Committing offsets to Kafka on Spark executors

Question

I am getting events from Kafka, enriching/filtering/transforming them on Spark, and then storing them in ES. I am committing the offsets back to Kafka.

I have two questions/issues:

(1) My current Spark job is very slow

I have 50 partitions for a topic and 20 executors. Each executor has 2 cores and 4 GB of memory. My driver has 8 GB of memory. I am consuming 1,000 events/partition/second and my batch interval is 10 seconds. This means I am consuming 50 × 1,000 × 10 = 500,000 events per batch.

My ES cluster is as follows:

20 shards / index

3 master instances c5.xlarge.elasticsearch

12 instances m4.xlarge.elasticsearch

disk / node = 1024 GB so 12 TB in total

And I am getting huge scheduling and processing delays.
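
Growing scheduling delay usually means each batch takes longer than the 10-second batch interval to process, so rate control is the first knob to try. A minimal sketch of the standard Spark Streaming rate-control settings (the configuration keys are standard; the app name and concrete values are illustrative assumptions, not taken from this job):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamingContextFactory {

    public static JavaStreamingContext create() {
        SparkConf conf = new SparkConf()
            .setAppName("kafka-to-es") // assumed app name
            // let Spark adapt the ingestion rate to the observed processing speed
            .set("spark.streaming.backpressure.enabled", "true")
            // hard cap on records read per Kafka partition per second (illustrative value)
            .set("spark.streaming.kafka.maxRatePerPartition", "1000")
            // finish in-flight batches before shutting down
            .set("spark.streaming.stopGracefullyOnShutdown", "true");

        return new JavaStreamingContext(conf, Durations.seconds(10));
    }
}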

(2) How do I commit offsets on the executors?

Currently, I enrich/transform/filter my events on executors and then send everything to ES using BulkRequest. It's a synchronous process. If I get positive feedback, I send the offset list to the driver. If not, I send back an empty list. On the driver, I commit the offsets to Kafka. I believe there should be a way to commit the offsets on the executors, but I don't know how to pass the Kafka stream to the executors:

((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges, this::onComplete);

This is the code for committing offsets to Kafka, which requires the Kafka stream.
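
The this::onComplete argument in that snippet must match org.apache.kafka.clients.consumer.OffsetCommitCallback, which is the type commitAsync accepts as its second parameter. A minimal sketch of such a callback on the driver class (assuming the same LOGGER used in the code below):

import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Matches the signature of org.apache.kafka.clients.consumer.OffsetCommitCallback,
// which commitAsync(offsetRanges, this::onComplete) expects.
private void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
    if (exception != null) {
        LOGGER.error("@@ Offset commit failed", exception);
    } else {
        LOGGER.info("@@ Committed offsets for " + offsets.size() + " partitions");
    }
}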

Here is my overall code:

kafkaStream.foreachRDD(
    // kafka topic
    rdd -> { // runs on driver
        rdd.cache();

        String batchIdentifier =
            Long.toHexString(Double.doubleToLongBits(Math.random()));

        LOGGER.info("@@ [" + batchIdentifier + "] Starting batch ...");

        Instant batchStart = Instant.now();

        List<OffsetRange> offsetsToCommit = rdd.mapPartitionsWithIndex(
            // kafka partition
            (index, eventsIterator) -> { // runs on worker
                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

                LOGGER.info(
                    "@@ Consuming " + offsetRanges[index].count() + " events"
                        + " partition: " + index
                );

                if (!eventsIterator.hasNext()) {
                    return Collections.emptyIterator();
                }

                // get single ES documents
                List<SingleEventBaseDocument> eventList = getSingleEventBaseDocuments(eventsIterator);

                // build request wrappers
                List<InsertRequestWrapper> requestWrapperList = getRequestsToInsert(eventList, offsetRanges[index]);

                LOGGER.info(
                    "@@ Processed " + offsetRanges[index].count() + " events"
                        + " partition: " + index + " list size: " + eventList.size()
                );

                BulkResponse bulkItemResponses =
                    elasticSearchRepository.addElasticSearchDocumentsSync(requestWrapperList);

                if (!bulkItemResponses.hasFailures()) {
                    return Arrays.asList(offsetRanges).iterator();
                }

                elasticSearchRepository.close();

                return Collections.emptyIterator();
            },
            true
        ).collect();

        LOGGER.info(
            "@@ [" + batchIdentifier + "] Collected all offsets in "
                + (Instant.now().toEpochMilli() - batchStart.toEpochMilli()) + "ms"
        );

        OffsetRange[] offsets = new OffsetRange[offsetsToCommit.size()];
        for (int i = 0; i < offsets.length; i++) {
            offsets[i] = offsetsToCommit.get(i);
        }

        try {
            offsetManagementMapper.commit(offsets);
        } catch (Exception e) {
            // ignore
        }

        LOGGER.info(
            "@@ [" + batchIdentifier + "] Finished batch of " + offsetsToCommit.size()
                + " messages in " + (Instant.now().toEpochMilli() - batchStart.toEpochMilli()) + "ms"
        );

        rdd.unpersist();
    });

Recommended answer

You can move the offset logic above the rdd loop ... I am using the template below for better offset handling and performance:

JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(
    jssc,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

kafkaStream.foreachRDD(kafkaStreamRDD -> {
    // fetch kafka offsets for manually committing them later
    OffsetRange[] offsetRanges = ((HasOffsetRanges) kafkaStreamRDD.rdd()).offsetRanges();

    // filter unwanted data
    kafkaStreamRDD.filter(
        new Function<ConsumerRecord<String, String>, Boolean>() {
            @Override
            public Boolean call(ConsumerRecord<String, String> kafkaRecord) throws Exception {
                if (kafkaRecord != null) {
                    if (!StringUtils.isAnyBlank(kafkaRecord.key(), kafkaRecord.value())) {
                        return Boolean.TRUE;
                    }
                }
                return Boolean.FALSE;
            }
        }).foreachPartition(kafkaRecords -> {
            // init connections here
            while (kafkaRecords.hasNext()) {
                ConsumerRecord<String, String> kafkaConsumerRecord = kafkaRecords.next();
                // work here
            }
        });

    // commit offsets
    ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
});
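
To tie this template back to the question, the "init connections here" / "work here" placeholders can hold the per-partition Elasticsearch bulk insert. A minimal sketch using the Elasticsearch high-level REST client (the host, port, and index name are assumptions; the question's own elasticSearchRepository would take this role):

import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

// Runs inside foreachPartition, so the client is created on the executor
// and never serialized from the driver.
try (RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(new HttpHost("es-host", 9200, "http")))) { // assumed host
    BulkRequest bulkRequest = new BulkRequest();
    while (kafkaRecords.hasNext()) {
        ConsumerRecord<String, String> record = kafkaRecords.next();
        bulkRequest.add(new IndexRequest("events") // assumed index name
            .source(record.value(), XContentType.JSON));
    }
    if (bulkRequest.numberOfActions() > 0) { // skip empty partitions
        BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        if (response.hasFailures()) {
            // failing the task keeps commitAsync from running, so the batch is replayed
            throw new RuntimeException(response.buildFailureMessage());
        }
    }
}

Because commitAsync is only reached after foreachPartition completes successfully, throwing here leaves the offsets uncommitted and the batch is reprocessed.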
