从0.8.x, 0.9.x 或 0.10.0.X 升级到 0.10.1.0
0.10.1.0有线协议更改,通过遵循以下建议的滚动升级,在升级期间不会停机。但是,需要注意升0.10.1.0中潜在的突发状况。
注意:由于引入了新的协议,要在升级客户端之前先升级kafka集群(即,0.10.1.x仅支持 0.10.1.x或更高版本的broker,但是0.10.1.x的broker向下支持旧版本的客户端)
注意:如果你可接受停机,你可以简单地将所有broker关闭,更新版本并重启启动,它们将默认从新版本开始。
注意:变换协议版本和重启启动可以在broker升级完成后的任何时间去做,不必马上做。
日志保留时间不再基于日志段的最后修改时间。相反,它将基于日志段中消息的最大时间戳。
日志滚动时间不再取决于日志段的创建时间。而是基于消息中的时间戳。进一步来说。如果日志段中第一个消息的时间戳是T,则当新的消息的时间戳大于或等于T+log.roll.ms时,日志将推出。
0.10.0 的打开的文件处理将增加了约33%,因为为每个段增加时间索引文件。
时间索引和offset索引共享相同的索引大小配置。因为每个时间索引条目是offset索引条目的1.5备。用户可能需要增加log.index.size.max.bytes以避免频繁的日志滚动。
由于索引文件数量增加,对于一些有大量日志段的broker(即 >15k),在broker启动期间,日志加载处理可能更长。根据我们的实现,num.recovery.threads.per.data.dir设置为1可减少日志加载的时间。
新的java消费者不再是测试阶段了,我们建议将其应用到所有的新开发当中。旧的Scala使用仍然支持,但将在下一个版本中弃用,并在未来的主要版本中移除。
--new-consumer/--new.consumer
转换不再需要使用MirrorMaker和类似于Console消费者工具。只需要通过一个Kafka broker连接,而不是ZooKeeper了。另外,控制台消费者和旧消费者已弃用,并且将在未来的主要版本中移除。
Kafka集群现在可通过集群ID来标识唯一,broker升级到0.10.1.0时将自动的生成。集群ID可通过kafka.server:type=KafkaServer,name=ClusterId获取。它是元数据相应的一部分,序列化,客户端拦截器和度量记录器可通过实现ClusterResourceListener接口来接收集群ID。
BrokerState "RunningAsController" (value 4) 已被移除。由于一个bug,brpker仅在转换出来之前处于这种状态,因此移除影响应该是最小的。推荐的方法是通过kafka检查给定的broker是否是控制器。controller:type=KafkaController,name=ActiveControllerCount
新的Java消费者现在允许用户通过分区上的时间戳来搜索offset。
新的Java消费者现在支持后台线程心跳检测,有一个新的配置max.poll.interval.ms控制消费者主动离开组之前poll调用之间的最大时间(默认是5分钟)。配置request.timeout.ms的值必须始终大于max.poll.interval.ms,因为JoinGroup请求在消费者重新平衡时候阻塞服务器的最大时间。因此我们更改了其默认值超过5分钟,最后,session.timeout.ms的默认值已调整为10秒,并max.poll.records的默认值更改为500。
当使用Authorizer并且用户对topic没有描述授权时,broker将不再向请求返回TOPIC_AUTHORIZATION_FAILED错误,因为这会泄漏topic名称。 相反,将返回UNKNOWN_TOPIC_OR_PARTITION错误代码。 当使用生产者和消费者时,这可能导致意外的超时或延迟,因为Kafka客户端通常将在未知的topic错误时自动重试。 如果您怀疑这可能已经正在发生,你应该查阅客户端日志。
获取响应的默认的限制大小(消费者为50MB,副本为10MB)。现有的分区限制也适用(消费者和副本是1MB)。注意,这些限制不是绝对的最大值(下一节解释)。
如果一个消息大于响应/分区大小限制,消费者和副本可以继续使用。更具体的是,如果在第一个非空分区中的第一个消息大于限制,则消息将仍然返回。
kafka.api.FetchRequest和kafka.javaapi.FetchRequest中增加了重载的构造函数。以允许调用者去指定分区的顺序(因为在v3中顺序很重要)。之前的构造函数已弃用。在请求发送之前,以避免资源匮乏问题引起的混洗。
ListOffsetRequest v1支持基于时间戳的精确offset搜索。
MetadataResponse v2引入了一个新字段:“cluster_id”。
FetchRequest v3支持限制响应大小(除了现有的分区限制)。
JoinGroup v1引入了一个新字段:“rebalance_timeout”。
0.10.0.0具有潜在的突变更改(请在升级之前查看),以及升级后可能的性能影响。 通过遵循以下建议的滚动升级计划,可保障在升级期间和之后不会出现停机时间和性能影响。
注意:由于引入了新协议,因此在升级客户端之前先升级Kafka集群。
注意,对于版本0.9.0.0:由于0.9.0.0中有一个bug,依赖于Zookeeper(旧的Scala高级消费者和MirrorMaker如果一起使用)的客户端将无法在0.10.0.x中使用。因此,broker升级到0.10.0.x之前,先升级0.9.0.0客户端到0.9.0.1。对于0.8.X或0.9.0.1客户端,此步骤不是必需的。
更新所有broker的server.properties文件,并添加以下配置:
升级broker,关闭它,然后升级到新版本,最后重启它。
一旦整个集群升级完成,通过编辑inter.broker.protocol.version设置为0.10.0.0转换所有协议。注意:你现在应该还不需要设置message.format.version
- 此配置应该当所有的消费者升级为0.10.0.0时才需要设置。
依次重新启动broker,使新协议版本生效。
一旦所有的消费者已经升级为.10.0,设置每个broker的log.message.format.version为0.10.0,然后逐个重启。
注意 :如果你接受停机目,你可以简单粗暴的关闭所有broker,更新版本并重新启动。它们默认从新协议开始。
注意 :变换协议版本和重启启动可以在broker升级完成后的任何时间去做,不必马上做。
0.10.0中的消息格式包括新的时间戳字段,并使用压缩消息的相关联的offset。磁盘默认的消息格式是0.10.0,消息格式可以通过server.properties中的log.message.format.version配置。如果消费者客户端版本低于0.10.0.0。它只能“理解”0.10.0之前的消息格式。在这种情况下,broker在发送响应到旧版本消费者之前转换0.10.0格式到之前的格式。然而,这样的话,broker不是零复制传输。在Kafka社区关于性能影响的报告显示,在升级后,CPU利用率从20%提高100%。这迫使所有客户端马升级,促使性能恢复正常。为了避免消费者升级到0.10.0.0之前的消息转换,可以设置log.message.format.version为0.8.2或0.9.0。这样,broker仍然零复制传输将数据发送给旧的消费者。一旦消费者升级,就可以把消息格式更为0.10.0,就可以享受含新时间戳和优化后的压缩新消息格式。转换只是为了确保兼容性,尽可能避免消息转换才是至关重要的。
客户端升级到0.10.0.0,不会对性能产生影响。
注意 :通过设置消息格式版本,可以证明所有现有消息处于或低于该消息格式版本。否则消费者在0.10.0.0之前可能会中断。特别是,在消息格式设置为0.10.0之后,不应将其更改回较早的格式,因为它可能会在0.10.0.0之前的版本上中断消费者。
注意 :由于在每个消息中引入了额外的时间戳,生产者在发送少量消息可能会看到消息吞吐量下降(因为增加了开销)。 同样,复制每个消息传输也增加了8个字节。 如果你集群的能力与网络接近,可能会超过网卡,并看到由于过载的故障和性能问题。
注意 :如果生产者已经启用了压缩,则在某些情况下,可能注意到生产者吞吐量减少或broker的压缩率降低。当接收压缩消息时,0.10.0的broker避免再次压缩消息,这样减少延迟并提高吞吐量。然而,在某些情况下,这可能减少生产者的批次大小,导致较差的吞吐量。如果出现这种情况,可调整生产者的linger.ms 和 batch.size以提高吞吐量。另外,生产者用于压缩消息的缓存小于broker生产者使用的缓存,这可能对磁盘上的消息的压缩比有负面影响。 我们打算在未来的Kafka版本中进行配置。
0.10.0.0潜在的中断
从Kafka 0.10.0.0开始,Kafka中的消息格式版本表示为Kafka的版本。例如,消息格式0.9.0指的是支持的最高消息版本就是0.9.0。
消息格式0.10.0已经介绍过了,并且默认是使用的。消息包含了一个时间戳字段和压缩后消息的关系offset。
已经引入了ProduceRequest/Response v2,并默认使用支持消息格式0.10.0。
已经引入了FetchRequest/Response v2已经被引入,它默认使用支持消息格式0.10.0。
MessageFormatter 接口从def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
更改为 def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)
MessageReader 接口从 def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]
更改为 def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]
MessageFormatter的包从kafka.tools到kafka.common
MessageReader的包从kafka.tools到kafka.common
MirrorMakerMessageHandler不再处理(记录:MessageAndMetadata [Array [Byte],Array [Byte]])方法从未被调用用。
0.7版本的KafkaMigrationTool不再和kafka一起打包。如果你需要从0.7迁移到0.10.0,请先迁移到0.8,然后按照的升级步骤从0.8升级到0.10.0。
新消费者API已标准化,接收java.util.Collection作为方法参数的序列化类型。升级现有的版本才能使用0.10.0客户端库
LZ4压缩消息处理已更改为使用可互操作的规范框架(LZ4f v1.5.1)。为了保留与旧客户端的兼容性,此改变仅适用于消息格式为0.10.0和更高版本。使用v0/v1(消息格式0.9.0)Produce/Fetch LZ4压缩消息的客户端应继续使用0.9.0实现框架。使用Produce/Fetch协议v2或更高版本的客户端应使用可互操作的LZ4f框架。可互操作的LZ4库的列表可在https://www.lz4.org/查看
在0.10.0.0的显著变化
从0.10.0.0开始,增加一个新的客户端Kafka Streams客户端,用于流式处理存储在kafka topic的数据。这个新客户端仅支持0.10.x或更高的版本。
新消费者默认receive.buffer.bytes是64K。
新的消费者现在公开了exclude.internal.topics配置,以防止内部topic(例如消费者offset topic)被其他的正则匹配订阅。默认是启用。
旧的的Scala的生产者已经弃用。使用者尽快使用最新的Java客户端。
新的消费者API已标记为稳定。
9.0.0有潜在的中断更改风险(在升级之前需要知道),并且与之前版本的broker之间的协议改变。这意味着此次升级可能和客户端旧版本不兼容。因此在升级客户端之前,先升级kafka集群。如果你使用MirrorMaker下游集群,则同样应首先升级。
升级所有broker的server.properties
,并在其中添加inter.broker.protocol.version = 0.8.2.X
每次升级一个broker:关闭broker,替换新版本,然后重新启动。
一旦整个群集升级,通过编辑inter.broker.protocol.version并将其设置为0.9.0.0来转换所有协议。
逐个重新启动broker,使新协议版本生效。
注意 :如果你可接受停机,你可以简单地将所有broker关闭,更新版本并重启启动,协议将默认从新版本开始。
注意 :变换协议版本和重启启动可以在broker升级完成后的任何时间去做,不必马上做。
Java 1.6不再支持。
Scala 2.9不再支持。
默认情况下,1000以上的Broker ID为自动分配。如果你的集群高于该阈值,需相应地增加reserved.broker.max.id配置。
replica.lag.max.messages配置已经移除。分区leader在决定哪些副本处于同步时将不再考虑落后的消息的数。
配置参数replica.lag.time.max.ms现在不仅指自上次从副本获取请求后经过的时间,还指自副本上次被捕获以来的时间。 副本仍然从leader获取消息,但超过replica.lag.time.max.ms配置的最新消息将被认为不同步的。
压缩的topic不再接受没有key的消息,如果出现,生产者将抛出异常。 在0.8.x中,没有key的消息将导致日志压缩线程退出(并停止所有压缩的topic)。
MirrorMaker不再支持多个目标集群。 它只接受一个--consumer.config。 要镜像多个源集群,每个源集群至少需要一个MirrorMaker实例,每个源集群都有自己的消费者配置。
在org.apache.kafka.clients.tools。包下的Tools已移至org.apache.kafka.tools。。 所有包含的脚本仍将照常工作,只有直接导入这些类的自定义代码将受到影响。
在kafka-run-class.sh中更改了默认的Kafka JVM性能选项(KAFKA_JVM_PERFORMANCE_OPTS)。
kafka-topics.sh脚本(kafka.admin.TopicCommand)现在退出,失败时出现非零退出代码。
kafka-topics.sh脚本(kafka.admin.TopicCommand)现在将在topic名称由于使用“.”或“_”而导致风险度量标准冲突时打印警告。以及冲突的情况下的错误。
kafka-console-producer.sh脚本(kafka.tools.ConsoleProducer)将默认使用新的Java Producer,用户必须指定“old-producer”才能使用旧生产者。
默认情况下,所有命令行工具都会将所有日志消息打印到stderr而不是stdout。
可以通过将broker.id.generation.enable设置为false来禁用新的broker ID生成功能。
默认情况下,配置参数log.cleaner.enable为true。 这意味着topic会清理。
policy = compact现在将被默认压缩,并且128MB的堆(通过log.cleaner.dedupe.buffer.size)分配给清洗进程。你可能需要根据你对压缩topic的使用情况,查看log.cleaner.dedupe.buffer.size和其他log.cleaner配置值。
默认情况下,新消费者的配置参数fetch.min.bytes的默认值为1。
kafka-topics.sh脚本的变更topic配置已弃用(kafka.admin.ConfigCommand),以后将使用kafka-configs.sh(kafka.admin.ConfigCommand) 。
kafka-consumer-offset-checker.sh(kafka.tools.ConsumerOffsetChecker)已弃用,以后将使用kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand)
kafka.tools.ProducerPerformance已弃用。以后将使用org.apache.kafka.tools.ProducerPerformance(kafka-producer-perf-test.sh也将使用新类)
生产者的block.on.buffer.full已弃用,并将在以后的版本中移除。目前其默认已经更为false。KafkaProducer将不再抛出BufferExhaustedException,而是使用max.block.ms来中止,之后将抛出TimeoutException。如果block.on.buffer.full属性明确地设置为true,它将设置max.block.ms为Long.MAX_VALUE和metadata.fetch.timeout.ms将不执行。
0.8.2与0.8.1完全兼容。 关闭,更新代码并重新启动,逐个升级broker。
0.8.1与0.8完全兼容。 关闭,更新代码并重新启动,逐个升级broker。
版本0.7与较新版本不兼容。 对API,ZooKeeper数据结构,协议和配置进行了主要更改,以便添加复制(在0.7中缺失)。 从0.7版升级到更高版本需要一个特殊的迁移工具(通过下一章的API)。 此迁移可以在不停机的情况下完成。