Airflow FileSensor for sensing files on the local drive

Problem description

Does anybody have any idea about FileSensor? I came across it while researching how to sense files in my local directory. The code is as follows:

task = FileSensor(
    task_id="senseFile",
    filepath="etc/hosts",
    fs_conn_id='fs_local',
    _hook=self.hook,
    dag=self.dag,
)

I have also set my conn_id and conn type to File (path) and gave it {'path': 'mypath'}, but even when I set a non-existent path, or the file isn't at the specified path, the task completes and the DAG succeeds. The FileSensor doesn't seem to sense files at all.
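For comparison, here is a minimal sketch (not from the original post) of how the built-in FileSensor is usually wired up in Airflow 2.x; the connection name fs_local, the base path /tmp/data and the file name myfile.csv are assumptions for illustration only:

from datetime import datetime

from airflow import DAG
from airflow.sensors.filesystem import FileSensor

with DAG(
    dag_id="file_sensor_example",
    start_date=datetime(2017, 6, 26),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Assumes a connection "fs_local" of type "File (path)" whose extra is
    # {"path": "/tmp/data"}; filepath is joined onto that base path.
    wait_for_file = FileSensor(
        task_id="senseFile",
        fs_conn_id="fs_local",
        filepath="myfile.csv",   # hypothetical file name, for illustration only
        poke_interval=30,        # re-check every 30 seconds
        timeout=60 * 60,         # give up (and fail the task) after one hour
    )

With poke_interval and timeout set, the sensor keeps re-checking and fails the task if the file never shows up, rather than succeeding immediately.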

Recommended answer

I found the community-contributed FileSensor a little underwhelming, so I wrote my own.

I got it working for files local to where the server/scheduler was running, but ran into problems when using network paths.

The trick I found for network paths was to mount the network drive to my Linux box.

This is my DAG, which runs sensor_task >> proccess_task >> archive_task >> trigger rerun.

Note: We use variables (sourcePath, filePattern and archivePath) entered via the WebGUI.
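As a side note, the same variables can also be created outside the WebGUI, for example programmatically; a small sketch with placeholder values that are assumptions, not from the original post:

from airflow.models import Variable

# Placeholder values for illustration; in the original setup these are
# entered through the Airflow web UI (Admin -> Variables).
Variable.set("sourcePath", "/mnt/share/incoming/")
Variable.set("filePattern", r".*\.csv")
Variable.set("archivePath", "/mnt/share/archive/")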

from airflow import DAG
from airflow.operators import PythonOperator, OmegaFileSensor, ArchiveFileOperator, TriggerDagRunOperator
from datetime import datetime, timedelta
from airflow.models import Variable

default_args = {
    'owner': 'glsam',
    'depends_on_past': False,
    'start_date': datetime(2017, 6, 26),
    'provide_context': True,
    'retries': 100,
    'retry_delay': timedelta(seconds=30)
}

task_name = 'my_first_file_sensor_task'
filepath = Variable.get("sourcePath")
filepattern = Variable.get("filePattern")
archivepath = Variable.get("archivePath")

dag = DAG(
    task_name,  # use task_name as the dag_id so the TriggerDagRunOperator below can re-trigger this DAG
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
    max_active_runs=1,
    concurrency=1)

# Wait until a file matching filepattern appears in filepath.
sensor_task = OmegaFileSensor(
    task_id=task_name,
    filepath=filepath,
    filepattern=filepattern,
    poke_interval=3,
    dag=dag)

def process_file(**context):
    # The sensor pushes the matched file name to XCom; pull it and process the file.
    file_to_process = context['task_instance'].xcom_pull(
        key='file_name', task_ids=task_name)
    file = open(filepath + file_to_process, 'w')
    file.write('This is a test\n')
    file.write('of processing the file')
    file.close()

proccess_task = PythonOperator(
    task_id='process_the_file',
    python_callable=process_file,
    dag=dag)

# Move the processed file to the archive directory.
archive_task = ArchiveFileOperator(
    task_id='archive_file',
    filepath=filepath,
    task_name=task_name,
    archivepath=archivepath,
    dag=dag)

# Re-trigger this DAG so it goes back to waiting for the next file.
trigger = TriggerDagRunOperator(
    task_id='trigger_dag_rerun',
    trigger_dag_id=task_name,
    dag=dag)

sensor_task >> proccess_task >> archive_task >> trigger

And then this is my FileSensor, implemented as a plugin:

import os
import re
from datetime import datetime

from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from airflow.operators.sensors import BaseSensorOperator


class ArchiveFileOperator(BaseOperator):
    @apply_defaults
    def __init__(self, filepath, archivepath, task_name, *args, **kwargs):
        super(ArchiveFileOperator, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.archivepath = archivepath
        self.task_name = task_name

    def execute(self, context):
        # Pull the file name found by the sensor and move the file into the archive.
        file_name = context['task_instance'].xcom_pull(self.task_name, key='file_name')
        os.rename(self.filepath + file_name, self.archivepath + file_name)


class OmegaFileSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, filepath, filepattern, *args, **kwargs):
        super(OmegaFileSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.filepattern = filepattern

    def poke(self, context):
        # Succeed (and push the file name to XCom) as soon as a file in
        # filepath matches filepattern; otherwise keep poking.
        full_path = self.filepath
        file_pattern = re.compile(self.filepattern)

        directory = os.listdir(full_path)
        for files in directory:
            if re.match(file_pattern, files):
                context['task_instance'].xcom_push('file_name', files)
                return True
        return False


class OmegaPlugin(AirflowPlugin):
    name = "omega_plugin"
    operators = [OmegaFileSensor, ArchiveFileOperator]
