返回到文章

采纳

编辑于

kafka旧高级消费者API

旧高级消费者API
history
kafka_history


2.2.1 旧高级消费者API




class Consumer { /**

  • Create a ConsumerConnector
    *
  • @param config at the minimum, need to specify the groupid of the consumer and the zookeeper
  • connection string zookeeper.connect.
    */
    public static kafka.javaapi.consumer.ConsumerConnector createJavaConsumerConnector(ConsumerConfig config);
    }

/**

  • V: type of the message
  • K: type of the optional key assciated with the message
    /
    public interface kafka.javaapi.consumer.ConsumerConnector {
    /*

    • Create a list of message streams of type T for each topic.
      *
    • @param topicCountMap a map of (topic, #streams) pair
    • @param decoder a decoder that converts from Message to T
    • @return a map of (topic, list of KafkaStream) pairs.
    • The number of items in the list is #streams. Each stream supports
    • an iterator over message/metadata pairs.
      */
      public <K,V> Map<String, List<KafkaStream<K,V>>>
      createMessageStreams(Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);

    /**

    • Create a list of message streams of type T for each topic, using the default decoder.
      */
      public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);

    /**

    • Create a list of message streams for topics matching a wildcard.
      *
    • @param topicFilter a TopicFilter that specifies which topics to
    • subscribe to (encapsulates a whitelist or a blacklist).
    • @param numStreams the number of message streams to return.
    • @param keyDecoder a decoder that decodes the message key
    • @param valueDecoder a decoder that decodes the message itself
    • @return a list of KafkaStream. Each stream supports an
    • iterator over its MessageAndMetadata elements.
      */
      public <K,V> List<KafkaStream<K,V>>
      createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);

    /**

    • Create a list of message streams for topics matching a wildcard, using the default decoder.
      */
      public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);

    /**

    • Create a list of message streams for topics matching a wildcard, using the default decoder, with one stream.
      */
      public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);

    /**

    • Commit the offsets of all topic/partitions connected by this connector.
      */
      public void commitOffsets();

    /**

    • Shut down the connector
      */
      public void shutdown();
      }


高级消费者Api的例子,点击这里