Looping over an Airflow Variable

Updated: 2024-10-17 08:31:35

Question

I am having a hard time looping over an Airflow Variable in my script.

I have a requirement to list all files in a bucket whose names start with a given prefix, then loop over them and perform some operations.

I tried using XCom and SubDAGs but couldn't figure it out, so I came up with a new approach. It involves two scripts, though.

The first script sets the Airflow Variable to a value I generate. Here is the code.

    #!/usr/bin/env python
    with DAG('Test_variable',
             default_args=default_args,
             schedule_interval=None
             ) as dag:

        GCS_File_list = GoogleCloudStorageListOperator(
            task_id='list_Files',
            bucket='bucketname',
            prefix='aaa',
            delimiter='.csv',
            google_cloud_storage_conn_id='google_cloud_default'
            #provide_context = True,
            #dag = dag
        )

        def update_variable(**context):
            files_list = Variable.get('files_list')
            print(type(Variable.get('files_list')))
            updated_file_list = context['ti'].xcom_pull(task_ids='list_Files')
            #updated_file_list = updated_file_list.strip('][').split(',')
            Variable.set("files_list", updated_file_list)
            #print(updated_file_list)
            #print(type(updated_file_list))

        python_task = PythonOperator(
            task_id='pass_list',
            provide_context=True,
            python_callable=update_variable,
            #op_kwargs={'input_file':file},
            trigger_rule=TriggerRule.ALL_SUCCESS,
            #provide_context = True,
            #xcom_push=True,
            dag=dag
        )

        GCS_File_list >> python_task

As you can see, the script above lists the files in a bucket and stores the result in an Airflow Variable.

The second script reads the Variable value set by script 1 and loops over it. This is where I can get the Variable value but can't iterate over it.

Script 2:

    #!/usr/bin/env python
    import re
    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.models import Variable
    from airflow.operators.bash_operator import BashOperator
    from airflow.utils.trigger_rule import TriggerRule
    from airflow.operators import PythonOperator

    YESTERDAY = datetime.combine(
        datetime.today() - timedelta(days=1), datetime.min.time())
    BQ_DATASET_NAME = 'abc'
    CURRENT_TIME = datetime
    files_list_str = Variable.get("files_list")
    files_list = files_list_str.strip('][').split(',')
    bucket = 'def'

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': YESTERDAY,
        'provide_context': True,
    }

    def gen_date(input_file, **kwargs):
        # step 1: check for file extension and remove it
        idx_extension = input_file.find(".")
        input_file_name = input_file[:idx_extension]
        find_date_time_part = re.findall(r"_(\d*?_\d*?_\d*)", input_file_name)
        find_date_time_part = str(find_date_time_part).split('_', 1)[-1].strip(']')
        find_date_time_part = str(find_date_time_part)
        find_date_time_part = re.sub("'", '', find_date_time_part)
        find_date_time_part_len = len(find_date_time_part)
        if find_date_time_part_len == 15:
            x = [a for a in find_date_time_part.split('_') if a]
            # get the date time part from the list
            x = (' '.join(x[-2:]))
            # use strptime to parse the string value as a datetime object;
            # here our date format is YYYYMMDD hhmiss
            dt_obj = datetime.strptime(x, "%Y%m%d %H%M%S")
            # use strftime to format the date object into the desired format,
            # in our case YYYY-MM-DD hh:mi:ss
            final_date_formatted = dt_obj.strftime("%Y-%m-%d %H:%M:%S")
            #print(type(find_date_time_part))
            return final_date_formatted
        else:
            print("{}_{}".format(files_list, input_file))

    with DAG('dag1',
             default_args=default_args,
             schedule_interval=None
             ) as dag:

        for file in enumerate(files_list):
            Python_Task = PythonOperator(
                task_id='pass_date',
                provide_context=True,
                python_callable=gen_date,
                op_kwargs={'input_file': file},
                trigger_rule=TriggerRule.ALL_SUCCESS,
                #provide_context = True,
                #xcom_push=True,
                dag=dag
            )
            Python_Task

As you can see, in the else block of gen_date() I print the value of files_list as well as the input file name.

The output of the statement print("{}_{}".format(files_list, input_file)) is ['abc.csv','def.csv']_[

I am not sure why a "[" is being passed instead of the input file name. Any advice is appreciated.

I now see that in the for loop, files_list is treated as a string rather than a list. How can I make files_list a list and not a string?
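The symptom can be reproduced without Airflow. The sketch below assumes the Variable holds the plain string form of the list, which is what a plain Variable.set() call stores; the name raw_value is hypothetical and stands in for the result of Variable.get("files_list").

```python
# Hypothetical stand-in for Variable.get("files_list"): the list as a string.
raw_value = str(['abc.csv', 'def.csv'])

# Iterating over a string walks its characters, so the first item is "[".
first_item = next(iter(raw_value))

# strip/split does produce a list, but quotes and spaces stay in the items.
naive_items = raw_value.strip('][').split(',')

print(repr(raw_value))
print(repr(first_item))
print(naive_items)
```

Looping over the raw string passes single characters to the task, and the strip/split approach yields names that still carry quote characters and a leading space.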

Recommended answer

When an Airflow Variable (files_list) is read back, it is stored and returned as a string, not a list. Convert the value to a list first, then loop over that list.
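One way to do that conversion is sketched below. The helper name parse_file_list is hypothetical and not part of Airflow; it assumes the stored value is either a JSON array or the Python string form of a list, and it runs without Airflow installed.

```python
import ast
import json


def parse_file_list(raw_value):
    """Turn the string stored in an Airflow Variable back into a list of names."""
    try:
        # Works when the value was stored as JSON, e.g. '["abc.csv", "def.csv"]'.
        parsed = json.loads(raw_value)
    except ValueError:
        # Falls back to the Python string form, e.g. "['abc.csv', 'def.csv']".
        # literal_eval only evaluates literals, so it does not run arbitrary code.
        parsed = ast.literal_eval(raw_value)
    if not isinstance(parsed, list):
        raise ValueError("expected a list, got {!r}".format(type(parsed)))
    return [str(item) for item in parsed]


# In the DAG file this would be used roughly as (assumption, not run here):
#   files_list = parse_file_list(Variable.get("files_list"))
#   for file_name in files_list:
#       ...create one task per file_name, each with its own task_id...
print(parse_file_list("['abc.csv', 'def.csv']"))
```

An alternative that avoids manual parsing is to store the value with Variable.set("files_list", value, serialize_json=True) and read it with Variable.get("files_list", deserialize_json=True), which returns a list directly. Whichever route is used, the loop should iterate over the list itself rather than over enumerate(files_list), so that each task receives a file name and not an (index, name) tuple.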

Published: 2023-11-23 19:05:44
Source: https://www.elefans.com/category/jswz/34/1622551.html