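Flink Restart Strategies
Why configure a restart strategy?
When a task fails, Flink has to restart the failed task and any other affected tasks to bring the job back to a normal state.
Restart strategies and failover strategies control how tasks are restarted. The restart strategy decides whether and when failed or affected tasks may be restarted. The failover strategy decides which tasks have to be restarted to recover the job.
NOTE: Use a restart strategy together with checkpointing. A restarted task can only recover Flink's internal state from a completed checkpoint.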
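Using RestartStrategy
Configuration via the configuration file
Settings in the configuration file apply to both DataSet and DataStream jobs.
If checkpointing is not enabled (enableCheckpointing() is never called), restart-strategy defaults to none.
If checkpointing is enabled and no restart strategy is configured, restart-strategy defaults to fixed-delay with Integer.MAX_VALUE restart attempts and a delay of 1s.
# Three restart strategies are available; each one accepts several equivalent spellings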
restart-strategy: none, off, disable|fixeddelay, fixed-delay|failurerate, failure-rate
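Configuration via ExecutionConfig
// Limit the number of restarts
import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRestartStrategy(
  RestartStrategies.fixedDelayRestart(
    3, // number of restart attempts
    Time.of(10, TimeUnit.SECONDS) // delay between two consecutive restart attempts
  )
)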
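// Limit the failure rate
// The job fails for good once more than 3 failures occur within one 5-minute interval.
// Until that threshold is reached, Flink waits 10 seconds before each restart attempt.
val env = ExecutionEnvironment.getExecutionEnvironment
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // maximum number of failures allowed within the interval; tune as needed
  Time.of(5, TimeUnit.MINUTES), // interval over which the failure rate is measured
  Time.of(10, TimeUnit.SECONDS) // delay between two consecutive restart attempts
))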
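Restart strategies
fixed-delay
# With restart-strategy: fixed-delay (example values rather than Flink's built-in defaults)
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 2 s
For example:
===> With attempts=3 and delay=2s, Flink waits 2 seconds after each failure and then tries to restart. Either the restart succeeds, or the job fails again and the next attempt begins. If the job still fails after 3 restart attempts have been used up, the job is declared failed.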
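The fixed-delay rule above can be sketched as a tiny decision function. This is a simplified model for illustration only, not Flink's internal implementation; the names `FixedDelayModel`, `Decision`, `onFailure` and `simulate` are hypothetical and do not exist in Flink.

```scala
// Simplified model of the fixed-delay policy (hypothetical names, not Flink classes).
object FixedDelayModel {
  /** Outcome of handling one failure. */
  sealed trait Decision
  final case class RestartAfter(delayMillis: Long) extends Decision
  case object FailJob extends Decision

  /**
   * @param maxAttempts   plays the role of restart-strategy.fixed-delay.attempts
   * @param delayMillis   plays the role of restart-strategy.fixed-delay.delay
   * @param restartsSoFar number of restarts already performed for this job
   */
  def onFailure(maxAttempts: Int, delayMillis: Long, restartsSoFar: Int): Decision =
    if (restartsSoFar < maxAttempts) RestartAfter(delayMillis) else FailJob

  /** Decisions taken for a job that fails `failures` times in a row. */
  def simulate(maxAttempts: Int, delayMillis: Long, failures: Int): List[Decision] =
    (0 until failures).toList.map(i => onFailure(maxAttempts, delayMillis, i))
}
```

With `attempts=3` and `delay=2s`, `FixedDelayModel.simulate(3, 2000L, 4)` yields three `RestartAfter(2000)` decisions followed by `FailJob`: the fourth failure finds the restart budget exhausted.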
failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
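For example:
===> With failure-rate-interval=5min, max-failures-per-interval=3 and delay=10s, Flink waits 10 seconds before each restart attempt. If more than 3 failures occur within a 5-minute interval, the job is declared failed.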
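The failure-rate rule can be modelled as a sliding-window check over failure timestamps. The sketch below is a simplified illustration under that assumption, not Flink's own code; `FailureRateModel`, `canRestart` and `firstFatalFailure` are hypothetical names, and timestamps are plain milliseconds supplied by the caller in ascending order.

```scala
// Simplified sliding-window model of the failure-rate policy (hypothetical names).
object FailureRateModel {
  /**
   * Decides whether a failure at `nowMillis` may still be answered with a restart.
   *
   * @param maxFailuresPerInterval plays the role of max-failures-per-interval
   * @param intervalMillis         plays the role of failure-rate-interval
   * @param earlierFailures        timestamps (ms) of the failures seen before this one
   */
  def canRestart(maxFailuresPerInterval: Int,
                 intervalMillis: Long,
                 earlierFailures: Seq[Long],
                 nowMillis: Long): Boolean = {
    // Count only the earlier failures that still fall inside the window.
    val inWindow = earlierFailures.count(t => nowMillis - t <= intervalMillis)
    inWindow < maxFailuresPerInterval
  }

  /** Index of the first failure that can no longer be restarted, if any. */
  def firstFatalFailure(maxFailuresPerInterval: Int,
                        intervalMillis: Long,
                        failureTimes: Seq[Long]): Option[Int] =
    failureTimes.indices.find { i =>
      !canRestart(maxFailuresPerInterval, intervalMillis, failureTimes.take(i), failureTimes(i))
    }
}
```

With a limit of 3 failures per 5 minutes, four failures spaced one minute apart make the fourth one fatal, whereas a fourth failure that arrives after the earliest ones have aged out of the window is restarted as usual. The 10-second delay only spaces out the restarts and does not affect this decision, so it is left out of the model.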
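no-restart
No restart strategy is used: the job fails as soon as a task fails, without any restart attempt.
fallback-restart
The job falls back to the restart strategy configured for the cluster. If checkpointing is enabled and no other strategy is configured, this is fixed-delay.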
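Failover strategies
Official reference: https://ci.apache/projects/flink/flink-docs-release-1.10/dev/task_failure_recovery.html
The failover strategy is set in flink-conf.yaml.
Restart All Failover Strategy
- Restarts every task in the job to recover from a task failure.
Restart Pipelined Region Failover Strategy
- Restarts only the tasks in the regions affected by the failure, which is much cheaper than restarting every task. A region is a set of tasks that exchange data through pipelined connections, so the execution mode influences where the region boundaries fall:
env.getConfig.setExecutionMode(ExecutionMode.PIPELINED)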
Failover strategy | Value for jobmanager.execution.failover-strategy |
---|---|
Restart all (restarts every task in the job) | full |
Restart pipelined region (restarts only the tasks in the affected regions) | region |
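To make the idea of a pipelined region concrete, the sketch below groups tasks into regions and computes which tasks a failure would restart. It is an illustrative model under simplifying assumptions, not Flink's scheduler code: tasks are plain strings, `PipelinedRegionSketch`, `Edge`, `regions` and `tasksToRestart` are hypothetical names, and the model only restarts the failed region plus its downstream consumer regions. It ignores the case where a missing upstream result forces the producing region to restart as well.

```scala
// Illustrative model of pipelined regions (hypothetical names, not Flink classes).
object PipelinedRegionSketch {
  /** A data exchange from `from` to `to`; pipelined = false means a blocking exchange. */
  final case class Edge(from: String, to: String, pipelined: Boolean)

  /** Tasks connected through pipelined edges end up in the same region. */
  def regions(tasks: Set[String], edges: Seq[Edge]): Set[Set[String]] = {
    val neighbours: Map[String, Set[String]] =
      edges.filter(_.pipelined)
        .flatMap(e => Seq(e.from -> e.to, e.to -> e.from))
        .groupBy(_._1)
        .map { case (task, pairs) => task -> pairs.map(_._2).toSet }

    @annotation.tailrec
    def expand(frontier: Set[String], seen: Set[String]): Set[String] =
      if (frontier.isEmpty) seen
      else {
        val next = frontier.flatMap(t => neighbours.getOrElse(t, Set.empty[String])) -- seen
        expand(next, seen ++ next)
      }

    tasks.map(t => expand(Set(t), Set(t)))
  }

  /** The failed task's region plus every region that consumes its output, transitively. */
  def tasksToRestart(failed: String, tasks: Set[String], edges: Seq[Edge]): Set[String] = {
    val all = regions(tasks, edges)
    def regionOf(task: String): Set[String] = all.find(_.contains(task)).getOrElse(Set(task))

    @annotation.tailrec
    def close(frontier: Set[String], restart: Set[String]): Set[String] =
      if (frontier.isEmpty) restart
      else {
        val consumers: Set[String] =
          edges.filter(e => frontier.contains(e.from)).flatMap(e => regionOf(e.to)).toSet
        val fresh = consumers -- restart
        close(fresh, restart ++ fresh)
      }

    val start = regionOf(failed)
    close(start, start)
  }
}
```

For a chain A → B (pipelined), B → C (blocking), C → D (pipelined), the model yields two regions, {A, B} and {C, D}. A failure in C restarts only C and D, while a failure in A restarts all four tasks. When every exchange is pipelined, the whole job forms a single region and the result matches the restart-all strategy.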
A simple checkpointing example
package com.shufang.state.chekpoint

import com.shufang.broadcast.People
import com.shufang.entities.WorkPeople
import com.shufang.source.MyUDFPeopleSource
import org.apache.flink.api.common.ExecutionMode
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object CheckPointDemo {
  def main(args: Array[String]): Unit = {
    // Obtain the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // The execution mode influences the region boundaries used by the region failover strategy
    env.getConfig.setExecutionMode(ExecutionMode.PIPELINED)

    /**
     * -------------------------------------- Checkpoint configuration --------------------------------------
     */
    env.enableCheckpointing(1000) // trigger a checkpoint every 1s
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // EXACTLY_ONCE is the default
    env.getCheckpointConfig.setCheckpointInterval(1000) // same 1s interval as above, set explicitly
    env.getCheckpointConfig.setCheckpointTimeout(6000) // a checkpoint that does not finish within 6s is aborted and discarded
    // true:  an error during checkpointing fails the task
    // false: the task keeps running and only the checkpoint is declined
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false) // default is true
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) // at most 1 checkpoint in flight; the next one waits until it completes or times out
    // Externalized checkpoints are deleted when the job is cancelled; RETAIN_ON_CANCELLATION keeps them
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(0) // minimum pause before the next checkpoint is triggered, must be >= 0

    /** -------------------------------------- Restart strategy configuration --------------------------------------
     * When a task failure happens, Flink needs to restart the failed task and other
     * affected tasks to recover the job to a normal state.
     * The default restart strategy depends on checkpointing:
     *   checkpointing disabled: no restart
     *   checkpointing enabled:  fixed-delay with Integer.MAX_VALUE restart attempts
     *
     * The restart strategy can be set in flink-conf.yaml or through env.setRestartStrategy().
     */
    // Alternatives:
    // env.setRestartStrategy(RestartStrategies.noRestart())
    // env.setRestartStrategy(RestartStrategies.fallBackRestart()) // falls back to the cluster-level strategy
    // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.seconds(4)))
    // env.setRestartStrategy(RestartStrategies.failureRateRestart(10, Time.minutes(5), Time.seconds(10)))
    env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(5), Time.seconds(10)))

    val ds: DataStream[WorkPeople] = env.addSource(new MyUDFPeopleSource)
    // Lookup data: gender code -> gender character ('男' = male, '女' = female)
    val ds1: DataStream[(Int, Char)] = env.fromElements((1, '男'), (2, '女'))
    val describer = new MapStateDescriptor[Int, Char]("genderInfo", classOf[Int], classOf[Char])
    val bcStream: BroadcastStream[(Int, Char)] = ds1.broadcast(describer)

    val resultStream: DataStream[People] = ds.connect(bcStream).process(
      new BroadcastProcessFunction[WorkPeople, (Int, Char), People] {
        // Enrich each record with the gender looked up from broadcast state
        override def processElement(value: WorkPeople,
                                    ctx: BroadcastProcessFunction[WorkPeople, (Int, Char), People]#ReadOnlyContext,
                                    out: Collector[People]): Unit = {
          val gender: Char = ctx.getBroadcastState(describer).get(value.genderCode).charValue()
          out.collect(People(value.id, value.name, gender, value.address, value.price))
        }

        // Store each broadcast element in broadcast state
        override def processBroadcastElement(value: (Int, Char),
                                             ctx: BroadcastProcessFunction[WorkPeople, (Int, Char), People]#Context,
                                             out: Collector[People]): Unit = {
          ctx.getBroadcastState(describer).put(value._1, value._2)
        }
      }
    )

    ds.print("before:")
    resultStream.print("after:")
    env.execute("checkpoint")
  }
}