Does Kafka only subscribe to the latest messages?

Problem description

Sometimes (seemingly at random) Kafka sends old messages. I only want the latest messages, so a newer message should overwrite older ones with the same key. Currently it looks like I have multiple messages with the same key that are not being compacted.

I use this setting on the topic:

cleanup.policy=compact

I'm using Java/Kotlin with the Apache Kafka 1.1.1 client.

Properties(8).apply {
    // JAAS config for SASL/SCRAM authentication
    // (the login module class is org.apache.kafka.common.security.scram.ScramLoginModule)
    val jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"
    val jaasCfg = String.format(jaasTemplate, Configuration.kafkaUsername, Configuration.kafkaPassword)
    put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)
    put(ConsumerConfig.GROUP_ID_CONFIG, "ApiKafkaKotlinConsumer${Configuration.kafkaGroupId}")
    put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
    put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
    put("security.protocol", "SASL_SSL")
    put("sasl.mechanism", "SCRAM-SHA-256")
    put("sasl.jaas.config", jaasCfg)
    put("max.poll.records", 100)
    put("receive.buffer.bytes", 1000000)
}

Am I missing a setting?

Recommended answer

If you want to have only one value for each key, you have to use the KTable<K,V> abstraction: StreamsBuilder::table(final String topic) from Kafka Streams. The topic used here should have its cleanup policy set to compact.
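For illustration, here is a minimal Kafka Streams sketch of that approach in Kotlin. The application id, bootstrap servers, and topic name are placeholder assumptions; the point is that StreamsBuilder::table materializes a changelog view in which each key maps to its latest value only.

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put(StreamsConfig.APPLICATION_ID_CONFIG, "latest-value-per-key")  // placeholder app id
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")     // adjust to your cluster
        put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
        put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
    }

    val builder = StreamsBuilder()
    // table() treats the topic as a changelog: per key, only the latest value is kept
    val table = builder.table<String, String>("my-compacted-topic")       // placeholder topic name
    table.toStream().foreach { key, value -> println("$key -> $value") }

    KafkaStreams(builder.build(), props).start()
}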

If you use KafkaConsumer, you just pull data from the brokers. It doesn't give you any mechanism that performs deduplication. Depending on whether compaction has been performed, you can get anywhere from one to n messages for the same key.
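If you stay with the plain consumer, deduplication therefore has to happen client-side. A minimal sketch, assuming String keys and values and the consumer properties from the question (plus auto.offset.reset=earliest and a fresh group id, so the full log is read): later records simply overwrite earlier ones in a map. The topic name and the bounded poll loop are illustrative only; poll(long) is used because the Duration overload only arrived in client 2.0.

import org.apache.kafka.clients.consumer.KafkaConsumer

fun latestPerKey(consumer: KafkaConsumer<String, String>, topic: String): Map<String, String> {
    consumer.subscribe(listOf(topic))
    val latest = HashMap<String, String>()
    repeat(20) {                                  // bounded number of polls, for the example only
        for (record in consumer.poll(500L)) {     // 500 ms per poll; poll(long) matches the 1.1.1 client
            latest[record.key()] = record.value() // later records win per key
        }
    }
    return latest
}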

About compaction

Compaction doesn't mean that all old values for the same key are removed immediately. When an old message for a given key will be removed depends on several properties. The most important are:

  • log.cleaner.min.cleanable.ratio

    The minimum ratio of dirty log to total log for a log to be eligible for cleaning.

  • log.cleaner.min.compaction.lag.ms

    The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted.

  • log.cleaner.enable

    Enable the log cleaner process to run on the server. Should be enabled if using any topics with cleanup.policy=compact, including the internal offsets topic. If disabled, those topics will not be compacted and will continually grow in size.

More detail about compaction can be found at https://kafka.apache.org/documentation/#compaction
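Note that the properties above are broker-level defaults; they have topic-level counterparts (min.cleanable.dirty.ratio, min.compaction.lag.ms) that can be overridden per topic. As a hedged sketch in Kotlin, using the alterConfigs API of that client generation (later superseded by incrementalAlterConfigs), with placeholder topic name and example values:

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.Config
import org.apache.kafka.clients.admin.ConfigEntry
import org.apache.kafka.common.config.ConfigResource
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // adjust to your cluster
    }
    AdminClient.create(props).use { admin ->
        val topic = ConfigResource(ConfigResource.Type.TOPIC, "my-compacted-topic")  // placeholder
        val config = Config(listOf(
            ConfigEntry("cleanup.policy", "compact"),
            ConfigEntry("min.cleanable.dirty.ratio", "0.01"),  // compact more eagerly (example value)
            ConfigEntry("min.compaction.lag.ms", "0")          // no minimum lag (example value)
        ))
        admin.alterConfigs(mapOf(topic to config)).all().get()
    }
}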
