Kafka 消费者 offsetForTimes 方法仅返回少数分区偏移位置而不是全部

编程入门 行业动态 更新时间:2024-10-25 20:28:09
本文介绍了Kafka 消费者 offsetForTimes 方法仅返回少数分区偏移位置而不是全部的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

限时送ChatGPT账号..

我有一个有 8 个分区的 kafka 主题,从单个消费者订阅该主题,并且我为消费者提供了唯一的消费者组.现在我尝试只使用来自所有分区的最近消息(在我的情况下是从当前时间开始 3 分钟前).我使用了如下所示的 offsetForTimes 方法.

I've one kafka topic with 8 partitions, subscribing the topic from single consumer and I've unique consumer group for the consumer. Now I tried to consume only the recent messages (in my case 3 mins before from current time) from all partitions. I used offsetForTimes method like below.

List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
List<TopicPartition> topicPartions = partitionInfos.stream().......collect(Collectors.toList());
Long value = Instant.now().minus(120,ChronoUnit.SECONDS).toEpochMillis();
Map<TopicPartion,Long> topicPartitionTime = topicPartions.stream().collect(COllectors.toMap(tp -> tp,(value)));
Map<TopicPartition, OffsetAndTimeStamp> offsets = consumer.offsetsForTimes(topicPartitionTime);

现在的问题是offsetsForTimes 只返回一两个分区偏移位置,剩余的返回null.

now question is offsetsForTimes only returns one or two partitions offset positions and returns null for remaining.

我想消耗所有分区最近的消息,而不是一两个分区.

I want to consume all partitions recent messages not one or two partitions.

我也在下面试过

consumer.unsubscribe();
consumer.assign(allPartitions);
Map<TopicPartition, OffsetAndTimeStamp> offsets = consumer.offsetsForTimes(topicPartitionTime);

但仍然只有一两个偏移位置.在最坏的情况下,有时所有分区的偏移量为空.

but still getting only one or two offset positions.In worst case some times null offsets for all partitons.

如果 offsetForTimes 仅适用于一个/两个分区,如何从单个消费者轮询所有分区最近的记录?

if offsetForTimes works only with one/two partition, How to poll all partition recent records from single consumer ?

已我正在使用 Kafka 集群.3-4台机器共享8个分区.

EDITED : I'm using Kafka cluster. 8 partitions shared on 3-4 machines.

其他输入:-我能够在以下场景中重现该问题.

Additional Inputs:- I am able to reproduce the problem with below scenario.

创建三个主题 A(1-Partition)、B(10-Partition)和 C(10-Partition)KafkaStreams 消费来自 A 的消息并将消息推送到 B &C.向 A 主题推送了大约 100 条消息.KafkaStreams 消费并推送到 B&C 主题.我可以看到消息分布在 B&C 中的所有分区(即 10 个分区包含大约 10 条消息).我创建了单个 KafkaConsumer,Consuming B 主题.现在我用所有分区调用 offsetForTimes 方法,时间戳是从当前减去 5 分钟.确保consumer.assignment() 返回offsetForTimes 之前的所有分区.offsetForTimes 返回具有偏移位置的单个分区,但是当我调用 consumer.poll 方法时,它也返回来自其他分区的消息.

使用 apache kafka 版本 - 2.11-2.2.0Kafka 客户端 jar - 2.0.1

using apache kafka version - 2.11-2.2.0 Kafka clients jar - 2.0.1

提前感谢您的帮助.

推荐答案

我无法重现你的情况;我唯一一次获得偏移量的 null 是当该分区没有提交的偏移量时.例如我有 10 个分区,但只写入 8 个:

I can't reproduce your condition; the only time I get null for the offset is when there is no committed offset for that partition. e.g. I have 10 partitions but only write to 8:

@SpringBootApplication
public class So59200574Application implements ConsumerSeekAware {

    public static void main(String[] args) {
        SpringApplication.run(So59200574Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so59200574").partitions(10).replicas(1).build();
    }

    @KafkaListener(id = "so59200574", topics = "so59200574")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ConsumerAwareRebalanceListener rebal() {
        return new ConsumerAwareRebalanceListener() {

            @Override
            public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
                final long tenSecondsAgo = System.currentTimeMillis() - 10_000L;
                partitions.forEach(tp -> timestampsToSearchputeIfAbsent(tp, tp1 -> tenSecondsAgo));
                System.out.println(consumer.offsetsForTimes(timestampsToSearch));
            }

        };
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> IntStream.range(0, 8).forEach(i -> template.send("so59200574", i, null, "foo" + i));
    }

}

这篇关于Kafka 消费者 offsetForTimes 方法仅返回少数分区偏移位置而不是全部的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

更多推荐

[db:关键词]

本文发布于:2023-04-19 10:23:27,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/961744.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:分区   而不是   消费者   位置   方法

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!