【笔记】Win10 Python3.6 celery4.0+学习整理

编程入门 行业动态 更新时间:2024-10-16 18:37:00

【<a href=https://www.elefans.com/category/jswz/34/1770047.html style=笔记】Win10 Python3.6 celery4.0+学习整理"/>

【笔记】Win10 Python3.6 celery4.0+学习整理

学习视频:=7

文章目录

  • 前言
  • 一、celery 工作原理
  • 二、使用步骤
    • 1. 安装
      • 1. 安装python库
      • 2. 安装redis
    • 2. 异步任务
      • 1.任务定义
      • 2. 创建celery
      • 3.启动
        • 1. 启动redis
        • 2. 启动celery
      • 4. 生产者发布任务
      • 5. 任务结果查询
    • 3. 定时任务
      • 1. 任务定义
      • 2. 创建celery
      • 3. 启动
        • 1. 启动worker
        • 2. 启动beat
    • 4. Django 下的定时任务
      • 1. 任务定义
      • 2. 创建celery
      • 3. 启动celery
      • 4. 建立生产者
      • 5. 配置路由
      • 6. 启动Django
  • 总结


前言

之前项目里有用到celery,当时忙着赶项目挑了要用的功能点看了下文档就写了。
最近有时间,学习整理一下


提示:以下是本篇文章正文内容,下面案例可供参考

一、celery 工作原理

  • 对于异步任务
    生产者:指有任务需求的一端,会发布任务。
    worker:celery内部工作者,会来领取任务执行。
  1. 用户自定义许多任务;
  2. 生产者可以不断地发送任务指令给redis的其中一个库 #1,每个任务对应一个id,生产者就可以去做别的事情了;
  3. redis#1拿到任务后会把这些任务指令按顺序存放;
  4. celery的worker监听着redis#1,一旦发现里面有任务,就按序取出,并发执行;
  5. 执行完毕后,结果保存到redis#2内;
  6. 生产者可以根据任务id查看任务的进展。
  • 对于定时任务

beater:celery内部工作者,代替生产者发布任务

  1. 用户自定义任务和每个任务的执行时间、频率;
  2. celery的beater会根据设定时间向redis内传入任务指令;

其余步骤与异步一致

二、使用步骤

1. 安装

1. 安装python库

pip install redispip install celerypip install eventlet

2. 安装redis


2. 异步任务

目录结构

├── multi_task_celery│   ├── celery_produce.py│   ├── celery_result.py│   ├── celery_tasks.py│   ├── task01.py│   └── task02.py└──
└──

1.任务定义

task01.py(示例):

import time
from celery_tasks import app@app.task
def send_mail(name):print("向%s发送邮件..." % name)time.sleep(5)print("向%s发送邮件完成" % name)return '邮件完成!'

task02.py(示例):

import time
from celery_tasks import app@app.task
def send_msg(name):print("向%s发送短信..." % name)time.sleep(5)print("向%s发送短信完成" % name)return '短信完成!'

2. 创建celery

celery_task.py(示例):

import celery
broker = 'redis://127.0.0.1:6379/1'   # 消息中间件 库1
backend = 'redis://127.0.0.1:6379/2'  # 存储结果   库2# 多任务文件,一般多任务不会写在同一个py内,如果写在不同py内就需要通过列表方式引入
includes = ['task01','task02'
]
app = celery.Celery('celery_demo', backend=backend, broker=broker, include=includes)# 时区
app.conf.timezone = 'Asia/Shanghai'

3.启动

1. 启动redis

cmd命令行到redis安装路径下,执行

redis-server redis.windows.conf
2. 启动celery
celery -A celery_task worker -l info -P eventlet

eventlet是开启协程,这个版本不开就不行

开启后, worker就开始监听redis#1

4. 生产者发布任务

celery_produce.py(示例):

from task01 import send_mail
from task02 import send_msgresult = send_mail.delay("Jack")
print(result.id)result2 = send_mail.delay("Rose")
print(result2.id)

执行celery_produce.py后,celery会将任务加入到redis#1队列,监听到的worker会并发取出任务执行
worker的数量默认为4

5. 任务结果查询

celery_result.py(示例):

from celery_tasks import app
from celery.result import AsyncResultresult = AsyncResult(id='a04b540d-2896-4d41-98b9-9b427bf79e70', app=app)if result.successful():res = result.get()print(res)result.forget()  # 将结果删除 (执行完毕后,结果不会自动删除)
elif result.failed():print("执行失败")
elif result.status == "PENDING":print("等待执行")
elif result.status == "RETRY":print("正在重试")
elif result.status == "STARTED":print("正在开始")

3. 定时任务

目录结构

├── multi_time_celery│   ├──  tasks│   │   ├── task01.py│   │   └──task02.py│   └──  celery_tasks.py└──
└──

1. 任务定义

task01.py(示例):

from celery_task import app
import time@app.task(name='tasks.task01.send_mail') # 这种模式下一定要加路径
def send_mail(name):print("向%s发送邮件..." % name)time.sleep(5)print("向%s发送邮件完成" % name)return '邮件完成!'

task02.py(示例):

from celery_task import app
import time@app.task(name='tasks.task02.send_msg')
def send_msg(name):print("向%s发送短信..." % name)time.sleep(5)print("向%s发送短信完成" % name)return '短信完成!'

2. 创建celery

celery_task.py(示例):

import celery
from datetime import timedelta
from celery.schedules import crontabbackend = 'redis://localhost:6379/1'
broker = 'redis://localhost:6379/2'
include = ['tasks.task01', 'tasks.task02']
app = celery.Celery('multi_time_task', backend=backend, broker=broker, include=include)# 定时任务配置
app.conf.beat_schedule = {# 名字自定义'add-every-1-minute': {'task': 'tasks.task01.send_mail',# 'schedule': 1  # 每隔1秒执行一次'schedule': crontab(minute="*/1"), # 每分钟执行一次'args': ('张三', ),},'add-every-1-day': {'task': 'tasks.task02.send_msg','schedule': timedelta(days=1),  # 每天执行一次'args': ('李四', ),}
}

3. 启动

1. 启动worker
celery -A celery_task worker -l info -P eventlet
2. 启动beat
celery -A celery_task beat

如果出现启动worker且还没启动beat时,worker就已经不断接收任务、执行任务,说明之前redis#1中的旧任务没有清除;
如果业务中已经不需要旧任务了,则手动清空redis#1中数据


4. Django 下的定时任务

创建一个django项目django_celery

目录结构

├── django_celery│   ├──  app01│   │   ├── ...│   │   └── views.py│   ├──  django_celery│   ├──  mycelery│   │   ├──  ...│   │   └── urls.py│   │   ├──  sms│   │   │   ├──  tasks.py│   │   │   └── __init__.py│   │   ├──  email│   │   │   ├──  tasks.py│   │   │   └── __init__.py│   │   ├──  config.py│   │   └──  main.py│   └──└──
└──    
  • app01为app项目
  • 在此例中,通过请求页面触发任务,所以任务发起代码写在views.py
  • mycelery内创建各种类型的任务,如email为发邮件任务,sms为发短信任务,各任务被main.py扫描,然后被celery worker监听

1. 任务定义

/mycelery/sms/tasks.py(示例):

# 此文件的文件名必须为tasks,其他的父目录叫什么无所谓
from mycelery.main import app
import time
import logginglog = logging.getLogger('django')@app.task
def send_sms(mobile):print("向%s发送短信"%mobile)time.sleep(5)return 'send sms ok'@app.task
def send_sms2(mobile):print("向%s发送短信2"%mobile)time.sleep(5)return 'send sms2 ok'

2. 创建celery

/mycelery/main.py(示例):

import os
from celery import Celeryapp = Celery('test_celery')# 加载django的配置变量,识别并加载django的配置文件,把celery和django绑定
# os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery.settings.dev')# 加载配置
app.config_from_object('mycelery.config')# 扫描任务,自动寻找目录下的task
app.autodiscover_tasks(["mycelery.sms", ])

3. 启动celery

/路径下启动

celery -A mycelery.main worker --loglevel=info -P eventlet

4. 建立生产者

/app01/views.py(示例):


from django.shortcuts import render, HttpResponse
from mycelery.sms.tasks import send_sms2, send_sms
from datetime import datetime, timedelta# 生产者
# 注意:生产者和消费者如果不在一个服务器下,需要两台服务器两份一模一样的任务代码,没有别的办法
def celery_test(request):# 异步任务send_sms.delay('110')send_sms2.delay('119')# 定时任务nowtime = datetime.now()utc_time = datetime.utcfromtimestamp(nowtime.timestamp())delta = timedelta(seconds=10) # 定时在当前时间的10秒后执行task_time = utc_time + delta# 注意此处eta一定要是UTCsend_sms.apply_async(args=['114', ], eta=task_time)send_sms2.apply_async(args=['120', ], eta=task_time)return HttpResponse('ok')

5. 配置路由

/django_celery/urls.py

from django.contrib import admin
from django.urls import path
from app01 import viewsurlpatterns = [path('admin/', admin.site.urls),path('test/', views.celery_test)
]

6. 启动Django

然后启动django项目
访问http://localhost:8000/test
页面会即时返回Response,实现异步任务发送和定时任务发送

总结

本文仅简单介绍celery 的异步任务和定时任务,具体功能配置详见官方文档:
/

更多推荐

【笔记】Win10 Python3.6 celery4.0+学习整理

本文发布于:2024-03-07 23:30:23,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1719212.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:笔记

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!