如何在MySqlOperator中使用气流xcoms

编程入门 行业动态 更新时间:2024-10-22 07:31:17
本文介绍了如何在MySqlOperator中使用气流xcoms的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述 def mysql_operator_test(): DEFAULT_DATE = datetime(2017, 10, 9) t = MySqlOperator( task_id='basic_mysql', sql="SELECT count(*) from table 1 where id>100;", mysql_conn_id='mysql_default', dag=dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=False) run_this = PythonOperator( task_id='getRecoReq', python_callable=mysql_operator_test, # xcom_push=True, dag=dag) task2 = PythonOperator( task_id= 'mysql_select', provide_context=True, python_callable = blah, templates_dict = {'requests': "{{ ti.xcom_pull(task_ids='getReq') }}" }, dag=dag) run_this.set_downstream(task2)

我想使用xcoms捕获MySqlOperator返回的计数.有人可以指导一下吗?

I want to capture the count returned by the MySqlOperator using xcoms. Can someone please guide regarding the same?

推荐答案

您非常亲密!但是,您问这个问题的方式有点像反模式.您不想在Airflow中的各个任务之间共享数据.另外,您也不想像在mysql_operator_test中那样使用运算符.这很诱人,刚开始时我做了同样的事情.

You're very close! However, the way you're asking this question is kind of an anti-pattern. You don't want to share data across tasks in Airflow. Also, you don't want to use the operator like you are in mysql_operator_test. It's tempting, I did the same thing when I was getting started.

我尝试了与此非常相似的东西,但是使用了SFTP连接.我最终只是在PythonOperator内完成了所有操作,并使用了底层的钩子.

I tried something very similar to this but with SFTP connections. I eventually just did everything inside of the PythonOperator and used the underlying hooks.

我建议您在python_callable内使用MySQLHook.像这样:

I'd recommend you use the MySQLHook inside of a python_callable. Something like this:

def count_mysql_and_then_use_the_count(): """ Returns an SFTP connection created using the SSHHook """ mysql_hook = MySQLHook(...) cur = conn.cursor() cur.execute("""SELECT count(*) from table 1 where id>100""") for count in cur: # Do something with the count...

我不确定这是否可以正常运行,但是我的想法是在可调用的Python内使用钩子,我不经常使用MySQLHook,但是我使用SSHHook效果很好.

I'm not sure if this will work as is but the idea is use a hook inside your Python callable, I don't use the MySQLHook often but I did this with the SSHHook and it's been working great.

更多推荐

如何在MySqlOperator中使用气流xcoms

本文发布于:2023-11-23 19:49:18,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1622675.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:气流   如何在   MySqlOperator   xcoms

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!