问题描述
限时送ChatGPT账号..我正在尝试访问一些标题,同时以批处理模式使用消息.如果我设置侦听器来处理 Message<?>
我可以手动提取标题
I am trying to access some headers whilst consuming messages in batch mode.
If I setup the listener to handle Message<?>
I can manually extract the headers
@KafkaListener(topics = "${kafka.topic}")
public void receive (List<Message<?> data, Acknowledgment ack) throws SQLException {
for (int i = 0; i < data.size(); ++) {
Object message = data.get(i).getPayload();
MessageHeaders mh = data.get(i).getHeaders();
Object value = mh.get("test");
我想为我完成其中的一些,但是当我尝试时
I would like to have some of this done for me but when I try
@KafkaListener(topics = "${kafka.topic}")
public voic receive (List<string> data,
@Header (KafkaHeaders.OFFSET) List<Integer> offsets,
@Header ("test") List<String> testHeaders,
Acknowledgment ack) throws SQLException {
我收到 MessageHandlingException: Missing header 'test' for method parameter type [interface.java.util.list]
然而,这种方法对于偏移标头工作正常.
I get MessageHandlingException: Missing header 'test' for method parameter type [interface.java.util.list]
This method however works fine for the offset header.
这是因为有内置代码来处理标准标头,而这种方法不能用于自定义标头,还是我错过了一些可以使这种方法起作用的东西?
Is this because there is inbuilt code to handle the standard headers and this approach cannot be used for custom ones or have I missed something that would make this approach work?
推荐答案
是的,框架只映射它知道的头;它将所有其他映射的标头放入
Yes, the framework only maps the headers it knows about; it puts all other mapped headers into
/**
* The header for a list of Maps of converted native Kafka headers. Used for batch
* listeners; the map at a particular list position corresponds to the data in the
* payload list position.
*/
public static final String BATCH_CONVERTED_HEADERS = PREFIX + "batchConvertedHeaders";
如果您想要该标头的离散映射,则需要创建一个自定义 BatchMessageConverter
- 可能是 BatchMessagingMessageConverter
的子类.
If you want a discrete mapping for that header, you would need to create a custom BatchMessageConverter
- probably a subclass of BatchMessagingMessageConverter
.
可能最简单的方法是覆盖 这个方法,调用super.toMessage()
然后添加你的标题.
Probably the easiest would be to override this method, call super.toMessage()
then add your headers.
return MessageBuilder.fromMessage(super.toMessage(...))
.setHeader("test", ...)
.build();
如果您使用的是 Spring Boot,只需将转换器添加为 bean,boot 就会将其连接起来.
If you are using Spring Boot, just add the converter as a bean and boot will wire it in.
否则将转换器添加到容器工厂.
Otherwise add the converter to the container factory.
编辑
如果消息转换器没有头部映射器;所有标题都放在标题 KafkaHeaders.NATIVE_HEADERS
中,它是一个 List
.
If the message converter has no header mapper; all headers are put in the header KafkaHeaders.NATIVE_HEADERS
which is a List<Headers>
.
这篇关于Kafka 标题作为批处理模式的列表的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
更多推荐
[db:关键词]
发布评论