消费者报找不到方法为String类型的
消费端配置
@Profile("kafka")
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
public KafkaConsumerConfig(){
System.out.println("kafka消费者配置加载...");
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setMessageListener(kafkaConsumerListener());
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory(consumerProperties());
}
@Bean
public Map<String, Object> consumerProperties() {
Map<String, Object> props= new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.3.31:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerListener");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
@Bean
public KafkaConsumerListenser kafkaConsumerListener(){
return new KafkaConsumerListenser();
}
}
监听类
@KafkaListener(topics = "test")
public class KafkaConsumerListenser implements AcknowledgingMessageListener<Integer, String> {
@Override
public void onMessage(ConsumerRecord<Integer, String> integerStringConsumerRecord, Acknowledgment acknowledgment) {
System.out.println(integerStringConsumerRecord.value()+"-----------------------------------------------------------------"+integerStringConsumerRecord.topic());
}
}
报错信息:
org.springframework.kafka.KafkaException: No method found for class java.lang.String
at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:92)
at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:146)
at org.springframework.kafka.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:60)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:166)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:764)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:708)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2500(KafkaMessageListenerContainer.java:230)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:981)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
看这个错,是缺少一个String参数的方法。你把Integer的先去掉。
public class KafkaConsumerListenser {
@KafkaListener(group="KafkaConsumerListener" ,topics = "test")
void listener(ConsumerRecord consumerRecords, Acknowledgment ack){
System.out.println("-----------------------------------------------------------------"+consumerRecords.value());
ack.acknowledge();
}
}
刚才试了下去掉Integer也不行,换成这种方式就可以。继承接口的话就不行
你的答案