kafka producer生产数据server端堆外内存频繁OOM
kafka
[2018-06-11 17:57:59,303] ERROR [ReplicaManager broker=0] Error processing append operation on partition raw-0 (kafka.server.ReplicaManager)
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:693)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
at sun.nio.ch.IOUtil.read(IOUtil.java:195)
at sun.nio.ch.FileChannelImpl.readInternal(FileChannelImpl.java:741)
at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:727)
at org.apache.kafka.common.utils.Utils.readFully(Utils.java:831)
at org.apache.kafka.common.utils.Utils.readFullyOrFail(Utils.java:804)
at org.apache.kafka.common.record.FileLogInputStream$FileChannelRecordBatch.loadBatchWithSize(FileLogInputStream.java:210)
at org.apache.kafka.common.record.FileLogInputStream$FileChannelRecordBatch.loadFullBatch(FileLogInputStream.java:192)
at org.apache.kafka.common.record.FileLogInputStream$FileChannelRecordBatch.iterator(FileLogInputStream.java:149)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:84)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:242)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:550)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:548)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:548)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:538)
at scala.Option.flatMap(Option.scala:171)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:538)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:579)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:575)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:575)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$2.apply(KafkaApis.scala:596)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$2.apply(KafkaApis.scala:596)
at kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2221)
at kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:54)
at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2220)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:596)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:614)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:186)
at kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:172)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:613)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$4.apply(KafkaApis.scala:630)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$4.apply(KafkaApis.scala:630)
at kafka.server.DelayedFetch.onComplete(DelayedFetch.scala:167)
at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
at kafka.server.DelayedFetch.tryComplete(DelayedFetch.scala:136)
at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)
at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:371)
at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:277)
at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:306)
at kafka.cluster.Partition$$anonfun$13.apply(Partition.scala:580)
at kafka.cluster.Partition$$anonfun$13.apply(Partition.scala:566)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:256)
at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:565)
at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:744)
at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:728)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:728)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:469)
at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:466)
at kafka.server.KafkaApis.handle(KafkaApis.scala:104)
at kafka.server.KafkaRequestHandler.run(KafkaRe