In my Spark jobs there are some third-party libraries holding various data, metrics, etc. When I create and register a SparkListener, its callbacks are executed on the driver.
My question: how can I have some code executed on a worker when the worker starts a stage, and also when it finishes the stage (has run all of its tasks)? Or when it finishes a job?
In that case it would be a good trigger to extract some data from the third-party libs that are actually running on the workers.
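For reference, here is a minimal sketch of the listener setup described above (the class name and the println bodies are just illustrative). All of these callbacks fire on the driver's listener bus, never on the executors:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerStageCompleted, SparkListenerStageSubmitted}
import org.apache.spark.sql.SparkSession

// Every callback here runs on the driver, not on the workers.
class DriverSideListener extends SparkListener {
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit =
    println(s"Stage ${stageSubmitted.stageInfo.stageId} submitted")

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
    println(s"Stage ${stageCompleted.stageInfo.stageId} completed")

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} ended")
}

val spark = SparkSession.builder().getOrCreate()
spark.sparkContext.addSparkListener(new DriverSideListener)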
Answer: Starting with Spark 2.4 there is an executor plugin API:
ExecutorPlugin
A plugin which can be automatically instantiated within each Spark executor. Users can specify plugins which should be created with the "spark.executor.plugins" configuration. An instance of each plugin will be created for every executor, including those created by dynamic allocation, before the executor starts running any tasks.
There is an example project that aims to show some basic usage. It follows a proposed design; the actual implementation is quite different.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .config("spark.executor.plugins", CustomPlugin.getClass.getName.replace("$", ""))
  .getOrCreate()

The executor plugin:
import org.apache.spark.ExecutorPlugin

object CustomPlugin extends ExecutorPlugin {
  override def init(): Unit = {
    println("Started")
    // Note that TaskContext.get() throws an NPE here, as the task context hasn't been initialized yet
  }

  override def shutdown(): Unit = {
    println("Shutdown")
  }
}
Don't forget to stop the Spark session at the end:
spark.stop()
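To tie this back to the question, here is a sketch of how such a plugin could poll a third-party library from inside each executor. ThirdPartyMetrics is a hypothetical stand-in for whatever library is actually running on the workers, and the scheduling interval is arbitrary:

import java.util.concurrent.{Executors, TimeUnit}

import org.apache.spark.ExecutorPlugin

object MetricsPlugin extends ExecutorPlugin {
  // One instance of this object lives inside every executor JVM.
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  override def init(): Unit = {
    // Called once per executor, before any task starts running.
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        // val snapshot = ThirdPartyMetrics.snapshot() // hypothetical third-party call
        println("Collected third-party metrics on this executor")
      }
    }, 10, 10, TimeUnit.SECONDS)
  }

  override def shutdown(): Unit = {
    // Called when the executor shuts down; the last chance to flush anything out.
    scheduler.shutdown()
    scheduler.awaitTermination(5, TimeUnit.SECONDS)
  }
}

It would be registered the same way as CustomPlugin above, via the "spark.executor.plugins" configuration.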