智慧出行/FAQ

编程入门 行业动态 更新时间:2024-10-17 17:23:28

<a href=https://www.elefans.com/category/jswz/34/1770070.html style=智慧出行/FAQ"/>

智慧出行/FAQ

Offsets out of range with no configured reset policy for partition

假设我们有10000个数据
segment就把它分为0-1000,1000-2000,2000-3000…
当我们消费到4500的时候报错了,然后也没有进行处理,过了kafka的生命周期,kafka就把数据全部清理掉了,当kafka在次进行消费,4501时没有数据了就报Offsets out of range with no configured reset policy for partition

如何解决?

我们要实现一个offset的纠偏的工具类
scala语言编写的纠偏工具类:
注意事项:Kafkaconsumer传入conf时,他类型是java的map,而我们的kafkaparams是scala的,需要使用scala包下的collection.JavaConversions._

package com.cartravel.kafkaimport java.time.Durationimport org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafkamon.TopicPartitionimport scala.collection.mutable
import scala.util.Try
import scala.collection.JavaConversions._
class kafkaOffsetCorrection(kafkaParams:Map[String,Object], topics:Seq[String], topicPartitionMap:mutable.HashMap[TopicPartition,Long]) extends Serializable {//主逻辑//currentorOffset,earliestOffset,latestOffset//矫正逻辑:curr < ear || cur > latest 说明offset不在有效范围之内了,我们就要纠正offset位置--> 从ear开始消费//开始矫正def Correction():Map[TopicPartition,Long] ={val earliestOffsets = getEarliestOffset//获取earliestOffsetval latestOffset = getLatestOffsetfor ((k,v)<-topicPartitionMap){val current: Long = vval earliest: Long = earliestOffsets(k)val latest = latestOffset(k)if (current<earliest || current>latest){//如果成立说明需要进行纠偏-->offset到earliesttopicPartitionMap.put(k,earliest)}}topicPartitionMap.toMap}private def getEarliestOffset: Map[TopicPartition,Long] ={var newKafkaParams = mutable.Map[String,Object]()newKafkaParams ++= kafkaParamsnewKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest")//通过kafka的api去消费val consumer = new KafkaConsumer[String,Object](newKafkaParams) //这里主要,传入的是java的map,所以需要使用scala的JavaConversions._consumer.subscribe(topics)val pullData = Try {consumer.poll(Duration.ofMillis(0))}if (pullData.isFailure){//邮件报警}val topicPartitions: Set[TopicPartition] = consumer.assignment().toSet//暂停消费consumer.pause(topicPartitions)//从头开始consumer.seekToBeginning(topicPartitions)val earliestOffsetMap = topicPartitions.map(line=>(line,consumer.position(line))).toMapconsumer.unsubscribe()consumer.close()earliestOffsetMap}private def getLatestOffset: Map[TopicPartition,Long] ={val newKafkaParams = mutable.Map[String,Object]()newKafkaParams ++= kafkaParamsnewKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest")//通过kafka的api去消费val consumer: KafkaConsumer[String, Object] = new KafkaConsumer[String,Object](newKafkaParams)consumer.subscribe(topics)val pullData = Try {consumer.poll(Duration.ofMillis(0))}if (pullData.isFailure){//邮件报警}val topicPartitions: Set[TopicPartition] = consumer.assignment().toSet//暂停消费consumer.pause(topicPartitions)//从尾开始消费consumer.seekToEnd(topicPartitions)val earliestOffsetMap = topicPartitions.map(line=>(line,consumer.position(line))).toMapconsumer.unsubscribe()consumer.close()earliestOffsetMap}
}

更多推荐

智慧出行/FAQ

本文发布于:2024-02-26 23:38:52,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1704405.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:智慧   FAQ

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!