2.2.1 旧高级消费者API
/**
 * Entry point of the old high-level consumer API: holds the static factory
 * method that returns a ConsumerConnector.
 */
class Consumer {
    /**
     * Create a ConsumerConnector.
     *
     * @param config at the minimum, needs to specify the groupid of the consumer and the
     *               zookeeper connection string zookeeper.connect.
     * @return a ConsumerConnector
     */
    public static kafka.javaapi.consumer.ConsumerConnector createJavaConsumerConnector(ConsumerConfig config);
}

/**
 * Connector through which message streams are created, offsets are committed,
 * and the consumer is shut down.
 *
 * Type parameters used by the generic methods of this interface:
 *   V: type of the message
 *   K: type of the optional key associated with the message
 */
public interface kafka.javaapi.consumer.ConsumerConnector {
    /**
     * Create a list of message streams for each topic, with keys of type K and
     * messages of type V.
     *
     * @param topicCountMap a map of (topic, #streams) pairs
     * @param keyDecoder    a decoder that decodes the message key
     * @param valueDecoder  a decoder that decodes the message itself
     * @return a map of (topic, list of KafkaStream) pairs.
     *         The number of items in the list is #streams. Each stream supports
     *         an iterator over message/metadata pairs.
     */
    public <K,V> Map<String, List<KafkaStream<K,V>>> createMessageStreams(Map<String, Integer> topicCountMap,
            Decoder<K> keyDecoder,
            Decoder<V> valueDecoder);

    /**
     * Create a list of message streams for each topic, using the default decoder
     * (keys and messages are exposed as byte[]).
     *
     * @param topicCountMap a map of (topic, #streams) pairs
     * @return a map of (topic, list of KafkaStream) pairs
     */
    public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);

    /**
     * Create a list of message streams for topics matching a wildcard.
     *
     * @param topicFilter  a TopicFilter that specifies which topics to
     *                     subscribe to (encapsulates a whitelist or a blacklist).
     * @param numStreams   the number of message streams to return.
     * @param keyDecoder   a decoder that decodes the message key
     * @param valueDecoder a decoder that decodes the message itself
     * @return a list of KafkaStream. Each stream supports an
     *         iterator over its MessageAndMetadata elements.
     */
    public <K,V> List<KafkaStream<K,V>> createMessageStreamsByFilter(TopicFilter topicFilter,
            int numStreams,
            Decoder<K> keyDecoder,
            Decoder<V> valueDecoder);

    /**
     * Create a list of message streams for topics matching a wildcard, using the default decoder.
     *
     * @param topicFilter a TopicFilter that specifies which topics to subscribe to
     * @param numStreams  the number of message streams to return
     * @return a list of KafkaStream whose keys and messages are byte[]
     */
    public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);

    /**
     * Create a list of message streams for topics matching a wildcard, using the default decoder,
     * with one stream.
     *
     * @param topicFilter a TopicFilter that specifies which topics to subscribe to
     * @return a list of KafkaStream whose keys and messages are byte[]
     */
    public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);

    /**
     * Commit the offsets of all topic/partitions connected by this connector.
     */
    public void commitOffsets();

    /**
     * Shut down the connector.
     */
    public void shutdown();
}
高级消费者API的例子,请点击这里。