kafka消费者低级实例
为什么使用SimpleConsumer?
使用“SimpleConsumer”的主要原因是你想比使用“消费者分组”更好的控制分区消费。
比如你想:
- 多次读取消息
- 在一个处理过程中只消费Partition其中的一部分消息
- 添加事务管理机制以保证消息被处理且仅被处理一次
使用SimpleConsumer有哪些弊端呢
这个SimpleConsumer确实需要很大的工作量:
- 必须在程序中跟踪offset值.
- 必须找出指定Topic(主题) Partition(分区)中的lead broker.
- 必须处理broker的变动.
使用SimpleConsumer的步骤
- 从所有活跃的broker中找出哪个是指定Topic(主题) Partition(分区)中的leader broker
- 找出指定Topic Partition中的所有备份broker
- 构造请求
-
发送请求获取数据
-
处理leader broker变更
寻找 Lead Broker 的Topic(主题)和Partition(分区)
这些不需要broker在集群中,你可以开始寻找活着的broker来查询Leader的信息。
private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) { PartitionMetadata returnMetaData = null; loop: for (String seed : a_seedBrokers) { SimpleConsumer consumer = null; try { consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup"); List<String> topics = Collections.singletonList(a_topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); List<TopicMetadata> metaData = resp.topicsMetadata(); for (TopicMetadata item : metaData) { for (PartitionMetadata part : item.partitionsMetadata()) { if (part.partitionId() == a_partition) { returnMetaData = part; break loop; } } } } catch (Exception e) { System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e); } finally { if (consumer != null) consumer.close(); } } if (returnMetaData != null) { m_replicaBrokers.clear(); for (kafka.cluster.Broker replica : returnMetaData.replicas()) { m_replicaBrokers.add(replica.host()); } } return returnMetaData; }
调用topicMetadata()方法连接到broker中查找到我们感兴趣的topic.
partitionsMetadata循环所有分区,直到找到我们需要的。.
查找读取起始偏移量
现在定义从哪里开始读取数据。kafka包括两个常数,kafka.api.OffsetRequest.EarliestTime()发现日志中的数据的开始,kafka.api.OffsetRequest.LatestTime()将只流新消息。不要以为补偿0开始偏移。
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) ); return 0; } long[] offsets = response.offsets(topic, partition); return offsets[0]; }
差错处理
由于SimpleConsumer不处理lead broker的失败,你必须写一些代码来处理它。
if (fetchResponse.hasError()) { numErrors++; // Something went wrong! short code = fetchResponse.errorCode(a_topic, a_partition); System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code); if (numErrors > 5) break; if (code == ErrorMapping.OffsetOutOfRangeCode()) { // We asked for an invalid offset. For simple case ask for the last element to reset readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName); continue; } consumer.close(); consumer = null; leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port); continue; }在这里,一旦获取返回的一个错误,我们记录原因,关闭消费者,然后试图找出新的leader。
private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception { for (int i = 0; i < 3; i++) { boolean goToSleep = false; PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition); if (metadata == null) { goToSleep = true; } else if (metadata.leader() == null) { goToSleep = true; } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) { // first time through if the leader hasn't changed give ZooKeeper a second to recover // second time, assume the broker did recover before failover, or it was a non-Broker issue // goToSleep = true; } else { return metadata.leader().host(); } if (goToSleep) { try { Thread.sleep(1000); } catch (InterruptedException ie) { } } } System.out.println("Unable to find new leader after Broker failure. Exiting"); throw new Exception("Unable to find new leader after Broker failure. Exiting"); }
此方法使用前面定义找到新领导者的findLeader()的逻辑,但在这里我们只尝试连接到该topic(主题)/partition(分区)的副本之一。这样,如果我们不能找到需要的数据,则退出。
因为它可能需要很短的时间内Zookeeper发现领袖失效并分配一个新的领导人,如果我们没有得到响应,则休眠。事实上,Zookeeper故障转移很快,所以你从不休眠。
读取数据
最后我们读取传输回来的数据,并把它写出来.
// When calling FetchRequestBuilder, it's important NOT to call .replicaId(), which is meant for internal use only. // Setting the replicaId incorrectly will cause the brokers to behave incorrectly. FetchRequest req = new FetchRequestBuilder() .clientId(clientName) .addFetch(a_topic, a_partition, readOffset, 100000) .build(); FetchResponse fetchResponse = consumer.fetch(req); if (fetchResponse.hasError()) { // See code in previous section } numErrors = 0; long numRead = 0; for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) { long currentOffset = messageAndOffset.offset(); if (currentOffset < readOffset) { System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset); continue; } readOffset = messageAndOffset.nextOffset(); ByteBuffer payload = messageAndOffset.message().payload(); byte[] bytes = new byte[payload.limit()]; payload.get(bytes); System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8")); numRead++; a_maxReads--; } if (numRead == 0) { try { Thread.sleep(1000); } catch (InterruptedException ie) { } }
请注意,“readOffset”询问最后一次读取的消息的下一个偏移。当消息块被处理,我们就知道从哪里开始获取下一个。
还要注意的是,检查正在读取的偏移量不小于我们要求的偏移。这是必要的,因为如果kafka被压缩的消息,读取请求将返回整个压缩的块,即使请求偏移量不是压缩块的开头。还需要注意的是,我们要求fetchSize 100000bytes。如果kafka生产商正在大批量生产,这可能是不够的并可能会返回一个空的消息集。在这种情况下,应增加 fetchSize,直到返回非空的集合。.
最后,我们跟中读取信息#,如果我们没有在最后一次请求读取到消息,我们就进行休眠。
运行示例
该示例预计以下参数:
- 消息的最大数量阅读(所以我们没有永远循环下去)
- 从Topic(主题)读取
- 从Partition(分区)读取
- 一个broker用于Metadata的查询
- Broker端口监听
完整源码
package com.test.simple; import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; import kafka.api.PartitionOffsetRequestInfo; import kafka.common.ErrorMapping; import kafka.common.TopicAndPartition; import kafka.javaapi.*; import kafka.javaapi.consumer.SimpleConsumer; import kafka.message.MessageAndOffset; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; public class SimpleExample { public static void main(String args[]) { SimpleExample example = new SimpleExample(); long maxReads = Long.parseLong(args[0]); String topic = args[1]; int partition = Integer.parseInt(args[2]); List<String> seeds = new ArrayList<String>(); seeds.add(args[3]); int port = Integer.parseInt(args[4]); try { example.run(maxReads, topic, partition, seeds, port); } catch (Exception e) { System.out.println("Oops:" + e); e.printStackTrace(); } } private List<String> m_replicaBrokers = new ArrayList<String>(); public SimpleExample() { m_replicaBrokers = new ArrayList<String>(); } public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception { // find the meta data about the topic and partition we are interested in // PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition); if (metadata == null) { System.out.println("Can't find metadata for Topic and Partition. Exiting"); return; } if (metadata.leader() == null) { System.out.println("Can't find Leader for Topic and Partition. Exiting"); return; } String leadBroker = metadata.leader().host(); String clientName = "Client_" + a_topic + "_" + a_partition; SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName); long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName); int numErrors = 0; while (a_maxReads > 0) { if (consumer == null) { consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName); } FetchRequest req = new FetchRequestBuilder() .clientId(clientName) .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka .build(); FetchResponse fetchResponse = consumer.fetch(req); if (fetchResponse.hasError()) { numErrors++; // Something went wrong! short code = fetchResponse.errorCode(a_topic, a_partition); System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code); if (numErrors > 5) break; if (code == ErrorMapping.OffsetOutOfRangeCode()) { // We asked for an invalid offset. For simple case ask for the last element to reset readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName); continue; } consumer.close(); consumer = null; leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port); continue; } numErrors = 0; long numRead = 0; for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) { long currentOffset = messageAndOffset.offset(); if (currentOffset < readOffset) { System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset); continue; } readOffset = messageAndOffset.nextOffset(); ByteBuffer payload = messageAndOffset.message().payload(); byte[] bytes = new byte[payload.limit()]; payload.get(bytes); System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8")); numRead++; a_maxReads--; } if (numRead == 0) { try { Thread.sleep(1000); } catch (InterruptedException ie) { } } } if (consumer != null) consumer.close(); } public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest( requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) ); return 0; } long[] offsets = response.offsets(topic, partition); return offsets[0]; } private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception { for (int i = 0; i < 3; i++) { boolean goToSleep = false; PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition); if (metadata == null) { goToSleep = true; } else if (metadata.leader() == null) { goToSleep = true; } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) { // first time through if the leader hasn't changed give ZooKeeper a second to recover // second time, assume the broker did recover before failover, or it was a non-Broker issue // goToSleep = true; } else { return metadata.leader().host(); } if (goToSleep) { try { Thread.sleep(1000); } catch (InterruptedException ie) { } } } System.out.println("Unable to find new leader after Broker failure. Exiting"); throw new Exception("Unable to find new leader after Broker failure. Exiting"); } private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) { PartitionMetadata returnMetaData = null; loop: for (String seed : a_seedBrokers) { SimpleConsumer consumer = null; try { consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup"); List<String> topics = Collections.singletonList(a_topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); List<TopicMetadata> metaData = resp.topicsMetadata(); for (TopicMetadata item : metaData) { for (PartitionMetadata part : item.partitionsMetadata()) { if (part.partitionId() == a_partition) { returnMetaData = part; break loop; } } } } catch (Exception e) { System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e); } finally { if (consumer != null) consumer.close(); } } if (returnMetaData != null) { m_replicaBrokers.clear(); for (kafka.cluster.Broker replica : returnMetaData.replicas()) { m_replicaBrokers.add(replica.host()); } } return returnMetaData; } }