Airflow task running tweepy exits with return code -6

This post walks through an Airflow task running tweepy that exits with return code -6, and how the problem was diagnosed and resolved.

Problem description

I have a simple Airflow DAG which has only one task - stream_from_twitter_to_kafka

Here is the code for the DAG:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# read_stream_of_tweets is the task callable, shown below.

default_args = {
    "owner": "me",
    "depends_on_past": False,
    "start_date": datetime(2020, 1, 20),
    "email": ["makalaaneesh18@mail"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=1),
}

NO_OF_TWEETS_TO_STREAM = 100

with DAG("stream_from_twitter",
         catchup=False,
         default_args=default_args,
         schedule_interval="@hourly") as dag:
    task1 = PythonOperator(task_id="stream_from_twitter_to_kafka",
                           python_callable=read_stream_of_tweets,
                           op_args=(NO_OF_TWEETS_TO_STREAM,))
    task1

The code for read_stream_of_tweets uses tweepy to read the incoming stream of tweets and publish them to a Kafka topic:

import tweepy

# MyKafkaProducer, TOPIC_PUB and the Twitter credentials (consumer_token,
# consumer_secret, access_token, access_secret) come from the author's own
# modules and are not shown here.

# override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):
    def __init__(self, *args, **kwargs):
        self.num_tweets = kwargs.pop('num_tweets')
        self.current_num_tweets = 0
        super(MyStreamListener, self).__init__(*args, **kwargs)
        self.kafka_producer = MyKafkaProducer()

    def on_status(self, status):
        if self.current_num_tweets >= self.num_tweets:
            # Limiting to a number.
            return False
        if not status.text.startswith("RT"):
            print(status.text)
            status_info = {
                'id': status.id_str,
                'text': status.text
            }
            self.kafka_producer.publish_message(TOPIC_PUB, value=status_info)
            self.current_num_tweets = self.current_num_tweets + 1

    def on_error(self, status_code):
        if status_code == 420:
            # returning False in on_data disconnects the stream
            return False


def read_stream_of_tweets(n):
    auth = tweepy.OAuthHandler(consumer_token, consumer_secret)
    auth.set_access_token(access_token, access_secret)
    myStreamListener = MyStreamListener(num_tweets=n)
    myStream = tweepy.Stream(auth=auth, listener=myStreamListener)
    myStream.filter(track=['life'], languages=['en'])

Here is the log for the task:

*** Reading local file: /Users/aneeshmakala/Documents/ComputerScience/datascience/hapPy/airflow/logs/stream_from_twitter/stream_from_twitter_to_kafka/2020-01-20T12:27:48.408593+00:00/1.log
[2020-01-20 17:58:27,264] {base_task_runner.py:61} DEBUG - Planning to run as the user
[2020-01-20 17:58:27,272] {taskinstance.py:672} DEBUG - <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2020-01-20 17:58:27,272] {taskinstance.py:672} DEBUG - <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2020-01-20 17:58:27,273] {taskinstance.py:672} DEBUG - <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2020-01-20 17:58:27,273] {taskinstance.py:672} DEBUG - <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]> dependency 'Task Instance State' PASSED: True, Task state queued was valid.
[2020-01-20 17:58:27,273] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]>
[2020-01-20 17:58:27,277] {taskinstance.py:672} DEBUG - <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]> dependency 'Pool Slots Available' PASSED: True, ('There are enough open slots in %s to execute the task', 'default_pool')
[2020-01-20 17:58:27,280] {taskinstance.py:672} DEBUG - <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2020-01-20 17:58:27,280] {taskinstance.py:672} DEBUG - <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2020-01-20 17:58:27,280] {taskinstance.py:672} DEBUG - <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]> dependency 'Task Concurrency' PASSED: True, Task concurrency is not set.
[2020-01-20 17:58:27,280] {taskinstance.py:672} DEBUG - <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2020-01-20 17:58:27,280] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]>
[2020-01-20 17:58:27,280] {taskinstance.py:866} INFO - --------------------------------------------------------------------------------
[2020-01-20 17:58:27,280] {taskinstance.py:867} INFO - Starting attempt 1 of 1
[2020-01-20 17:58:27,280] {taskinstance.py:868} INFO - --------------------------------------------------------------------------------
[2020-01-20 17:58:27,286] {taskinstance.py:887} INFO - Executing <Task(PythonOperator): stream_from_twitter_to_kafka> on 2020-01-20T12:27:48.408593+00:00
[2020-01-20 17:58:27,288] {standard_task_runner.py:52} INFO - Started process 11912 to run task
[2020-01-20 17:58:27,315] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,314] {cli_action_loggers.py:68} DEBUG - Calling callbacks: [<function default_action_log at 0x10da70830>]
[2020-01-20 17:58:27,326] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,326] {settings.py:213} DEBUG - Setting up DB connection pool (PID 11912)
[2020-01-20 17:58:27,327] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,326] {settings.py:221} DEBUG - settings.configure_orm(): Using NullPool
[2020-01-20 17:58:27,329] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,329] {dagbag.py:403} INFO - Filling up the DagBag from /Users/aneeshmakala/Documents/ComputerScience/datascience/hapPy/airflow/dags/stream_from_twitter.py
[2020-01-20 17:58:27,330] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,330] {dagbag.py:232} DEBUG - Importing /Users/aneeshmakala/Documents/ComputerScience/datascience/hapPy/airflow/dags/stream_from_twitter.py
[2020-01-20 17:58:27,332] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,332] {dagbag.py:370} DEBUG - Loaded DAG <DAG: stream_from_twitter>
[2020-01-20 17:58:27,351] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [running]> 1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.ip6.arpa
[2020-01-20 17:58:27,364] {__init__.py:97} DEBUG - Preparing lineage inlets and outlets
[2020-01-20 17:58:27,364] {__init__.py:133} DEBUG - inlets: [], outlets: []
[2020-01-20 17:58:27,364] {python_operator.py:105} INFO - Exporting the following env vars: AIRFLOW_CTX_DAG_EMAIL=makalaaneesh18@mail AIRFLOW_CTX_DAG_OWNER=me AIRFLOW_CTX_DAG_ID=stream_from_twitter AIRFLOW_CTX_TASK_ID=stream_from_twitter_to_kafka AIRFLOW_CTX_EXECUTION_DATE=2020-01-20T12:27:48.408593+00:00 AIRFLOW_CTX_DAG_RUN_ID=manual__2020-01-20T12:27:48.408593+00:00
[2020-01-20 17:58:27,367] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,366] {oauth1_auth.py:77} DEBUG - Signing request <PreparedRequest [POST]> using client <Client client_key=XXXXXX, client_secret=****, resource_owner_key=XXXXXX, resource_owner_secret=****, signature_method=HMAC-SHA1, signature_type=AUTH_HEADER, callback_uri=None, rsa_key=None, verifier=None, realm=None, encoding=utf-8, decoding=None, nonce=None, timestamp=None>
[2020-01-20 17:58:27,368] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,367] {oauth1_auth.py:93} DEBUG - Including body in call to sign: True
[2020-01-20 17:58:27,369] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,369] {__init__.py:133} DEBUG - Collected params: [('delimited', 'length'), ('oauth_nonce', 'XXXXXX'), ('oauth_timestamp', '1579523307'), ('oauth_version', '1.0'), ('oauth_signature_method', 'HMAC-SHA1'), ('oauth_consumer_key', 'XXXXXX'), ('oauth_token', 'XXXXXX'), ('track', 'life'), ('language', 'en')]
[2020-01-20 17:58:27,370] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,370] {__init__.py:137} DEBUG - Normalized params: delimited=length&language=en&oauth_consumer_key=XXXXXX&oauth_nonce=XXXXXX&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1579523307&oauth_token=XXXXXX&oauth_version=1.0&track=life
[2020-01-20 17:58:27,370] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,370] {__init__.py:138} DEBUG - Normalized URI: stream.twitter/1.1/statuses/filter.json
[2020-01-20 17:58:27,371] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,371] {__init__.py:143} DEBUG - Signing: signature base string: POST&https%3A%2F%2Fstream.twitter%2F1.1%2Fstatuses%2Ffilter.json&delimited%3Dlength%26language%3Den%26oauth_consumer_key%3DXXXXXX%26oauth_nonce%3DXXXXXX%26oauth_signature_method%3DHMAC-SHA1%26oauth_timestamp%3D1579523307%26oauth_token%3DXXXXXX%26oauth_version%3D1.0%26track%3Dlife
[2020-01-20 17:58:27,371] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,371] {__init__.py:150} DEBUG - Signature: JEwre9zNc+Ge6ezoGop6oXpp5Js=
[2020-01-20 17:58:27,372] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,372] {oauth1_auth.py:114} DEBUG - Updated url: stream.twitter/1.1/statuses/filter.json?delimited=length
[2020-01-20 17:58:27,372] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,372] {oauth1_auth.py:115} DEBUG - Updated headers: {'Content-Type': 'application/x-www-form-urlencoded', 'Content-Length': '22', 'Authorization': 'OAuth oauth_nonce="XXXXXX", oauth_timestamp="1579523307", oauth_version="1.0", oauth_signature_method="HMAC-SHA1", oauth_consumer_key="XXXXXX", oauth_token="XXXXXX", oauth_signature="JEwre9zNc%2BGe6ezoGop6oXpp5Js%3D"'}
[2020-01-20 17:58:27,373] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,373] {oauth1_auth.py:116} DEBUG - Updated body: 'track=life&language=en'
[2020-01-20 17:58:32,274] {logging_mixin.py:112} INFO - [2020-01-20 17:58:32,273] {base_job.py:200} DEBUG - [heartbeat]
[2020-01-20 17:58:32,274] {logging_mixin.py:112} INFO - [2020-01-20 17:58:32,274] {local_task_job.py:124} DEBUG - Time since last heartbeat(0.01 s) < heartrate(5.0 s), sleeping for 4.990854 s
[2020-01-20 17:58:37,265] {logging_mixin.py:112} INFO - [2020-01-20 17:58:37,265] {local_task_job.py:103} INFO - Task exited with return code -6

After some research, apparently, return code -6 is associated with SIGABRT. I have no idea why the task process is aborting.
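For reference, a negative return code in the Airflow log is the number of the signal that killed the task process, negated: Python's subprocess machinery (which Airflow's task runner sits on top of) reports death-by-signal-N as return code -N, and SIGABRT is signal 6. A minimal sketch confirming this on a POSIX system (not Airflow-specific code):

```python
import signal
import subprocess
import sys

# Kill a child process with SIGABRT (signal 6) and inspect how Python's
# subprocess module reports it: as the negated signal number, i.e. -6.
proc = subprocess.run(
    [sys.executable, "-c", "import os, signal; os.kill(os.getpid(), signal.SIGABRT)"]
)
assert proc.returncode == -signal.SIGABRT  # -6 on POSIX systems
```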

Manually testing the task offline using airflow test stream_from_twitter stream_from_twitter_to_kafka 20200120 works like a charm. But it fails with the above log when run by the scheduler.

No idea what's going on here. I saw something similar at GoogleCloudStorageDownloadOperator "Task exited with return code -6", but there's no solution there either.

Edit:

  • Pasted DEBUG logs

    airflow version: 1.10.7

    Executor: SequentialExecutor (default)

    DB backend: I'm using the default settings here. I haven't modified the config to use mysql or postgres.
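For context, "default settings" with Airflow 1.10.x means roughly the following in airflow.cfg (a sketch; the exact SQLite path depends on AIRFLOW_HOME):

```ini
[core]
# Defaults: SequentialExecutor backed by a local SQLite database
executor = SequentialExecutor
sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/airflow.db
```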

    Accepted answer

    This has nothing to do with Airflow or tweepy.

    This issue is specific to Mac OS High Sierra and above. stackoverflow/a/52230415/4434664 solved my issue.

    Basically, airflow test merely runs the task in-process, whereas the scheduler starts a worker process that calls fork(); apparently, High Sierra introduced new security changes that break fork() usage in Python.
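Concretely, the workaround from the linked answer is an environment variable that disables macOS's Objective-C runtime fork-safety check (the source of the SIGABRT). A sketch of the answer's suggestion, not an Airflow setting: export it in the shell that launches the scheduler, so forked worker processes inherit it.

```shell
# Disable the macOS Objective-C fork-safety check introduced in High Sierra,
# then start the scheduler from this same shell so workers inherit the variable:
export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
#   airflow scheduler
```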

    This also caused problems in Ansible; see github/ansible/ansible/issues/32499#issuecomment-341578864
