object kafka_producer_consumer {
/**
 * Entry point: runs the producer first, then the consumer.
 *
 * @param args command-line arguments (not used)
 */
def main(args: Array[String]): Unit = {
// Publish the test records first; producer() returns once its send loop is done and the producer is closed.
producer()
// Then start consuming. cunsumer() polls in an endless loop, so this call does not return normally.
cunsumer()
}
/**
 * Subscribes to the "result" topic and prints every received record to stdout
 * as `offset--key--value`.
 *
 * The poll loop never terminates on its own, so this method only exits when
 * polling throws. The consumer is closed in a `finally` block so that its
 * resources are released in that case (previously the `close()` call was
 * placed after the infinite loop and could never be reached).
 *
 * The method name keeps its original spelling because `main` calls it by
 * that name.
 */
def cunsumer(): Unit = {
  val props = new Properties()
  props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("group.id", "something")
  // Start from the earliest available offset when the group has no committed offset yet.
  props.put("auto.offset.reset", "earliest")
  // Let the client commit offsets automatically once per second.
  props.put("enable.auto.commit", "true")
  props.put("auto.commit.interval.ms", "1000")

  val consumer = new KafkaConsumer[String, String](props)
  try {
    consumer.subscribe(Collections.singletonList("result"))
    while (true) {
      // NOTE(review): poll(long) is deprecated in newer Kafka clients in favour of
      // poll(java.time.Duration); kept as-is because the client version is not visible here.
      val records = consumer.poll(100)
      // Walk the Java iterator directly so no implicit Java-to-Scala collection
      // conversion has to be in scope.
      val it = records.iterator()
      while (it.hasNext) {
        val record = it.next()
        println(s"${record.offset()}--${record.key()}--${record.value()}")
      }
    }
  } finally {
    consumer.close()
  }
}
/**
 * Publishes 1000 JSON messages of the form {"name": "jason<i>", "addr": "25<i>"}
 * (i = 1..1000) to the "result" topic, then closes the producer.
 *
 * Sends stay asynchronous, but every send now carries a completion callback that
 * reports failures on stderr; previously the result of `send` was ignored, so
 * delivery errors were silently lost. The producer is closed in a `finally`
 * block so it is released even if building or sending a record throws.
 *
 * `group.id` is no longer set: it is a consumer setting and is not part of the
 * producer configuration.
 */
def producer(): Unit = {
  val brokersList = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
  val topic = "result"

  val properties = new Properties()
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersList)
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) // key serializer
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) // value serializer

  val producer = new KafkaProducer[String, String](properties)
  try {
    for (i <- 1 to 1000) {
      val json = new JSONObject()
      json.put("name", "jason" + i)
      json.put("addr", "25" + i)
      producer.send(
        // Explicit type parameters so inference does not depend on overload resolution of send.
        new ProducerRecord[String, String](topic, json.toString()),
        new org.apache.kafka.clients.producer.Callback {
          // Kafka passes a null exception on success; only failures are reported.
          override def onCompletion(
              metadata: org.apache.kafka.clients.producer.RecordMetadata,
              exception: Exception): Unit =
            Option(exception).foreach { e =>
              System.err.println(s"Failed to send record $i to topic $topic: $e")
            }
        }
      )
    }
  } finally {
    // close() waits for previously sent records to complete before releasing resources.
    producer.close()
  }
}
昵称
0 声望
这家伙太懒,什么都没留下
先把生产者改成同步发送(对 send() 返回的 Future 调用 get() 阻塞等待结果),看看具体报什么错:
producer.send(new ProducerRecord(topic,json.toString())).get()
你的答案