智慧出行/FAQ"/>
智慧出行/FAQ
Offsets out of range with no configured reset policy for partition
假设我们有10000个数据
segment就把它分为0-1000,1000-2000,2000-3000…
当我们消费到4500的时候报错了,然后也没有进行处理,过了kafka的生命周期,kafka就把数据全部清理掉了,当kafka在次进行消费,4501时没有数据了就报Offsets out of range with no configured reset policy for partition
如何解决?
我们要实现一个offset的纠偏的工具类
scala语言编写的纠偏工具类:
注意事项:Kafkaconsumer传入conf时,他类型是java的map,而我们的kafkaparams是scala的,需要使用scala包下的collection.JavaConversions._
package com.cartravel.kafkaimport java.time.Durationimport org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafkamon.TopicPartitionimport scala.collection.mutable
import scala.util.Try
import scala.collection.JavaConversions._
class kafkaOffsetCorrection(kafkaParams:Map[String,Object], topics:Seq[String], topicPartitionMap:mutable.HashMap[TopicPartition,Long]) extends Serializable {//主逻辑//currentorOffset,earliestOffset,latestOffset//矫正逻辑:curr < ear || cur > latest 说明offset不在有效范围之内了,我们就要纠正offset位置--> 从ear开始消费//开始矫正def Correction():Map[TopicPartition,Long] ={val earliestOffsets = getEarliestOffset//获取earliestOffsetval latestOffset = getLatestOffsetfor ((k,v)<-topicPartitionMap){val current: Long = vval earliest: Long = earliestOffsets(k)val latest = latestOffset(k)if (current<earliest || current>latest){//如果成立说明需要进行纠偏-->offset到earliesttopicPartitionMap.put(k,earliest)}}topicPartitionMap.toMap}private def getEarliestOffset: Map[TopicPartition,Long] ={var newKafkaParams = mutable.Map[String,Object]()newKafkaParams ++= kafkaParamsnewKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest")//通过kafka的api去消费val consumer = new KafkaConsumer[String,Object](newKafkaParams) //这里主要,传入的是java的map,所以需要使用scala的JavaConversions._consumer.subscribe(topics)val pullData = Try {consumer.poll(Duration.ofMillis(0))}if (pullData.isFailure){//邮件报警}val topicPartitions: Set[TopicPartition] = consumer.assignment().toSet//暂停消费consumer.pause(topicPartitions)//从头开始consumer.seekToBeginning(topicPartitions)val earliestOffsetMap = topicPartitions.map(line=>(line,consumer.position(line))).toMapconsumer.unsubscribe()consumer.close()earliestOffsetMap}private def getLatestOffset: Map[TopicPartition,Long] ={val newKafkaParams = mutable.Map[String,Object]()newKafkaParams ++= kafkaParamsnewKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest")//通过kafka的api去消费val consumer: KafkaConsumer[String, Object] = new KafkaConsumer[String,Object](newKafkaParams)consumer.subscribe(topics)val pullData = Try {consumer.poll(Duration.ofMillis(0))}if (pullData.isFailure){//邮件报警}val topicPartitions: Set[TopicPartition] = consumer.assignment().toSet//暂停消费consumer.pause(topicPartitions)//从尾开始消费consumer.seekToEnd(topicPartitions)val earliestOffsetMap = topicPartitions.map(line=>(line,consumer.position(line))).toMapconsumer.unsubscribe()consumer.close()earliestOffsetMap}
}
更多推荐
智慧出行/FAQ
发布评论