问题
kafka 数据拉取错误
版本、环境、场景等上下文信息
假设生产者生产的数据是:1,2,3,4,5,6,7,8,9.
如果第一次拉取到:1,2,3,4 提交offset
第二次拉取到:5,6,7,8 不提交offset
希望,第三次拉取到:5,6,7,8
相关代码
def kafka_con():
consumer = KafkaConsumer(group_id='alg',
bootstrap_servers=['10.100.1.**:9092','10.100.1.**:9092','10.100.1.**:9092'],
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
auto_offset_reset='earliest', enable_auto_commit=False)
partition_a = [TopicPartition('dT', 0),TopicPartition('dT',1),
TopicPartition('dT',2), TopicPartition('dT',3)]
consumer.assign(partition_a)
f,k=open('./data.json','w',encoding='utf-8'),0
while k<3:
msg = consumer.poll(timeout_ms=10000, max_records=10000, update_offsets=False)
for keys, values in msg.items(): # 不同partition的结果
dic = {}
for val in values: # 每个partition拉取的每行
dic['number'] = val.key.decode('utf-8')
j_data = json.dumps(dic)
f.write(j_data + '\n')
if k==0:
consumer.commit_async()
else:
pass
f.write('一次执行完毕{}\n'.format(k))
k+=1
f.close()
报错信息
但是现在按照代码运行,结果显示为:
1,2,3,4,5,6,7,8,1,2,3,4
已经尝试过哪些方法仍然没解决,操作步骤等
刚学习kafka,想解决实时拉取数据,poll数据报错:则不提交,挂起进程2s后再次拉取原数据重新处理,正常处理完数据没报错:则提交offset