请问:我们公司有两个 Kafka 集群,其中一个集群只能写入数据,却读取不了。两个集群的配置完全一样,关键是程序不报任何错误,看起来像是消费者创建不成功。求解答,谢谢!

发表于: 2019-08-13   最后更新时间: 2019-08-13 22:38:20   1,856 浏览
object kafka_producer_consumer {
  /**
   * Entry point: runs the producer first, then starts the consumer.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Publish the test messages before the consumer begins reading.
    producer()
    cunsumer()
  }
  /**
   * Subscribes to the "result" topic and prints every received record to
   * standard output as `offset--key--value`.
   *
   * The poll loop has no exit condition, so this method only returns by
   * throwing. The consumer is closed in a `finally` block so that it is
   * released on that path.
   */
  def cunsumer(): Unit = {
    val props = new Properties()

    props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("group.id", "something")
    // Start from the earliest offset when the group has no committed offset.
    props.put("auto.offset.reset", "earliest")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      consumer.subscribe(Collections.singletonList("result"))
      while (true) {
        // NOTE(review): poll(long) is deprecated in newer Kafka clients in favour of
        // poll(java.time.Duration) -- confirm the client version before switching.
        val records = consumer.poll(100)
        // Use the Java iterator directly so the loop does not depend on an implicit
        // Java-to-Scala collection conversion being in scope.
        val it = records.iterator()
        while (it.hasNext()) {
          val record = it.next()
          println(s"${record.offset()}--${record.key()}--${record.value()}")
        }
      }
    } finally {
      // Reached only when the loop above throws; releases the consumer.
      consumer.close()
    }
  }

  /**
   * Sends 1000 JSON messages of the form `{"name": "jason<i>", "addr": "25<i>"}`
   * (i from 1 to 1000) to the "result" topic, then closes the producer.
   *
   * Sends are asynchronous; any delivery failure is reported on standard error
   * through a callback instead of being dropped silently.
   */
  def producer(): Unit = {
    val brokers_list = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
    val topic = "result"

    // Only producer settings are supplied here; "group.id" is a consumer-side
    // setting and is therefore not set on the producer.
    val properties = new Properties()
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers_list)
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) // key serializer
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) // value serializer

    val kafkaProducer = new KafkaProducer[String, String](properties)

    // Invoked by the client once a send completes; a non-null exception means the
    // record was not delivered.
    val reportFailure = new org.apache.kafka.clients.producer.Callback {
      override def onCompletion(
          metadata: org.apache.kafka.clients.producer.RecordMetadata,
          exception: Exception): Unit = {
        if (exception != null) {
          System.err.println(s"Failed to send record to topic $topic: $exception")
        }
      }
    }

    try {
      for (i <- 1 to 1000) {
        val json = new JSONObject()
        json.put("name", "jason" + i)
        json.put("addr", "25" + i)
        kafkaProducer.send(new ProducerRecord[String, String](topic, json.toString()), reportFailure)
      }
    } finally {
      // Close even if building or sending a record throws.
      kafkaProducer.close()
    }
  }
发表于 2019-08-13
添加评论

先把生产者修改成同步等待,看看报什么错:
producer.send(new ProducerRecord(topic,json.toString())).get()

你的答案

查看kafka相关的其他问题或提一个您自己的问题