How to run Airflow on a custom Docker image with the CeleryExecutor

Problem Description

I am adding Airflow to a web application that manually adds a directory containing business logic to the PYTHON_PATH env var and does additional system-level setup that I want to be consistent across all servers in my cluster. I've been successfully running Celery for this application with RMQ as the broker and Redis as the task results backend for a while, and I have prior experience running Airflow with the LocalExecutor.
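
For context, the system-level setup in question amounts to something like the following (a hypothetical sketch; the directory path is a placeholder, and note that the variable Python actually reads is PYTHONPATH):

# expose the business-logic package to every process launched from this shell
export PYTHONPATH="${PYTHONPATH}:/home/backend/business_logic"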

Instead of using Puckel's image, I have an entrypoint for a base backend image that runs a different service based on the SERVICE env var. It looks like this:

if [ $SERVICE == "api" ]; then
    # upgrade the data model
    flask db upgrade
    # start the web application
    python wsgi.py
fi

if [ $SERVICE == "worker" ]; then
    celery -A tasks.celery.celery worker --loglevel=info --uid=nobody
fi

if [ $SERVICE == "scheduler" ]; then
    celery -A tasks.celery.celery beat --loglevel=info
fi

if [ $SERVICE == "airflow" ]; then
    airflow initdb
    airflow scheduler
    airflow webserver
fi

I have an .env file that I build the containers with, which defines my Airflow parameters:

AIRFLOW_HOME=/home/backend/airflow
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql+pymysql://${MYSQL_USER}:${MYSQL_ROOT_PASSWORD}@${MYSQL_HOST}:${MYSQL_PORT}/airflow?charset=utf8mb4
AIRFLOW__CELERY__BROKER_URL=amqp://${RABBITMQ_DEFAULT_USER}:${RABBITMQ_DEFAULT_PASS}@${RABBITMQ_HOST}:5672
AIRFLOW__CELERY__RESULT_BACKEND=redis://${REDIS_HOST}
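
For illustration, the containers can then be started with the .env file and a SERVICE selector along these lines (a sketch only; the image name is a hypothetical placeholder):

docker run --env-file .env -e SERVICE=airflow my-backend-image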

With how my entrypoint is set up currently, it never makes it to the webserver. Instead, it runs the scheduler in the foreground without ever invoking the webserver. I can change this to

airflow initdb
airflow scheduler -D
airflow webserver

Now the webserver runs, but it isn't aware of the scheduler, which is now running as a daemon.

Airflow does, however, know that I'm using the CeleryExecutor and looks for the DAGs in the right place:

airflow | [2020-07-29 21:48:35,006] {default_celery.py:88} WARNING - You have configured a result_backend of redis://redis, it is highly recommended to use an alternative result_backend (i.e. a database).
airflow | [2020-07-29 21:48:35,010] {__init__.py:50} INFO - Using executor CeleryExecutor
airflow | [2020-07-29 21:48:35,010] {dagbag.py:396} INFO - Filling up the DagBag from /home/backend/airflow/dags
airflow | [2020-07-29 21:48:35,113] {default_celery.py:88} WARNING - You have configured a result_backend of redis://redis, it is highly recommended to use an alternative result_backend (i.e. a database).

I can solve this by going inside the container and manually firing up the scheduler:
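
Presumably something along these lines, where the container name is a hypothetical placeholder:

docker exec -it airflow-container bash
airflow scheduler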

The trick seems to be running both processes in the foreground within the container, but I'm stuck on how to do that inside the entrypoint. I've checked out Puckel's entrypoint code, but it's not obvious to me what he's doing. I'm sure that with just a slight tweak this will be off to the races... Thanks in advance for the help. Also, if there's any major anti-pattern that I'm at risk of running into here, I'd love to get the feedback so that I can implement Airflow properly. This is my first time implementing the CeleryExecutor, and there's a decent amount involved.

Answer

Try using nohup: https://en.wikipedia.org/wiki/Nohup

nohup airflow scheduler > scheduler.log &

In your case, you would update your entrypoint as follows:

if [ $SERVICE == "airflow" ]; then
    airflow initdb
    nohup airflow scheduler > scheduler.log &
    nohup airflow webserver
fi
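
A variation on the same idea (not from the original answer): background the scheduler but exec the webserver, so the webserver becomes the container's PID 1 process and receives docker stop signals directly. A minimal sketch:

if [ $SERVICE == "airflow" ]; then
    airflow initdb
    # run the scheduler in the background, capturing its output
    airflow scheduler > scheduler.log 2>&1 &
    # replace the shell with the webserver so stop signals reach it
    exec airflow webserver
fi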
