有人对FileSensor有任何想法吗?我在研究本地目录中的文件时遇到了它。代码如下:
does anybody have any idea on FileSensor ? I came through it while i was researching on sensing files on my local directory. The code is as follows:
task= FileSensor( task_id="senseFile" filepath="etc/hosts", fs_conn_id='fs_local', _hook=self.hook, dag=self.dag,)我也将conn_id和conn类型设置为File(路径),并设置了{'path':'mypath'},但即使我设置了不存在的路径或者如果文件不在指定路径中,则说明任务已完成并且dag成功。
I have also set my conn_id and conn type as File (path) and gave the {'path':'mypath'} but even though i set a non existing path or if the file isnt there in the specified path, the task is completed and the dag is successful. The FileSensor doesnt seem to sense files at all.
推荐答案我发现社区对FileSenor的贡献有些不足,于是写了我自己的文件。
I found the community contributed FileSenor a little bit underwhelming so wrote my own.
我让它在服务器/调度程序运行所在的本地文件中正常工作,但是在使用网络路径时遇到了问题。
I got it working for files locally to where the server/scheduler was running however ran into problems when using network paths.
我发现网络路径的窍门是将网络驱动器安装到Linux Box。
The trick for network paths I found was to mount the network drive to my Linux Box.
这是我的DAG习惯于sensor_task >> proccess_task >> archive_task >>触发器重新运行
This is my DAG used to sensor_task >> proccess_task >> archive_task >> trigger rerun
注意:我们使用通过WebGUI输入的变量(sourcePath,filePattern和archivePath)
Note: We use variables (sourcePath, filePattern & archivePath) entered via the WebGUI
from airflow import DAG from airflow.operators import PythonOperator, OmegaFileSensor, ArchiveFileOperator, TriggerDagRunOperator from datetime import datetime, timedelta from airflow.models import Variable default_args = { 'owner': 'glsam', 'depends_on_past': False, 'start_date': datetime(2017, 6, 26), 'provide_context': True, 'retries': 100, 'retry_delay': timedelta(seconds=30) } task_name = 'my_first_file_sensor_task' filepath = Variable.get("soucePath") filepattern = Variable.get("filePattern") archivepath = Variable.get("archivePath") dag = DAG( 'task_name', default_args=default_args, schedule_interval=None, catchup=False, max_active_runs=1, concurrency=1) sensor_task = OmegaFileSensor( task_id=task_name, filepath=filepath, filepattern=filepattern, poke_interval=3, dag=dag) def process_file(**context): file_to_process = context['task_instance'].xcom_pull( key='file_name', task_ids=task_name) file = open(filepath + file_to_process, 'w') file.write('This is a test\n') file.write('of processing the file') file.close() proccess_task = PythonOperator( task_id='process_the_file', python_callable=process_file, dag=dag) archive_task = ArchiveFileOperator( task_id='archive_file', filepath=filepath, task_name=task_name, archivepath=archivepath, dag=dag) trigger = TriggerDagRunOperator( task_id='trigger_dag_rerun', trigger_dag_id=task_name, dag=dag) sensor_task >> proccess_task >> archive_task >> trigger然后这是我的FileSenor
And then this is my FileSenor
import os import re from datetime import datetime from airflow.models import BaseOperator from airflow.plugins_manager import AirflowPlugin from airflow.utils.decorators import apply_defaults from airflow.operators.sensors import BaseSensorOperator class ArchiveFileOperator(BaseOperator): @apply_defaults def __init__(self, filepath, archivepath, task_name, *args, **kwargs): super(ArchiveFileOperator, self).__init__(*args, **kwargs) self.filepath = filepath self.archivepath = archivepath self.task_name = task_name def execute(self, context): file_name = context['task_instance'].xcom_pull(self.task_name, key='file_name') os.rename(self.filepath + file_name, self.archivepath + file_name) class OmegaFileSensor(BaseSensorOperator): @apply_defaults def __init__(self, filepath, filepattern, *args, **kwargs): super(OmegaFileSensor, self).__init__(*args, **kwargs) self.filepath = filepath self.filepattern = filepattern def poke(self, context): full_path = self.filepath file_pattern = repile(self.filepattern) directory = os.listdir(full_path) for files in directory: if not re.match(file_pattern, files): # do nothing else: context['task_instance'].xcom_push('file_name', files) return True return False class OmegaPlugin(AirflowPlugin): name = "omega_plugin" operators = [OmegaFileSensor, ArchiveFileOperator]更多推荐
气流文件传感器,用于感应本地驱动器上的文件
发布评论