admin管理员组

文章数量:1568419

django

背景

在django项目中使用django-apschedule来实现定时任务,使用的是BackgroundScheduler调度类,该调度的实现是通过后台线程的方式执行定时任务。其中任务都是持久化到数据库中的。

在项目的运行过程中,因为数据库的异常,导致定时任务线程异常终止,即使数据库后续恢复正常,但也不再继续执行。我多次尝试复现未果,在开启定时任务期间,手动将数据库连接断开,定时任务执行失败,然后再将数据库建立连接,定时任务竟然重新恢复了,这让我一时摸不着头脑。

具体的错误日志如下,通过分析,是update_job连接数据库异常,没有任何捕获机制,然后层层网上抛,最终导致线程停止,可以很肯定的是,绝对是因为数据库连接失败导致的定时任务失败,那为什么无法复现呢?

Traceback (most recent call last):File "/usr/local/python3/lib/python3.7/threading.py", line 926, in _bootstrap_innerself.run()File "/usr/local/python3/lib/python3.7/threading.py", line 870, in runself._target(*self._args, **self._kwargs)File "/usr/local/python3/lib/python3.7/site-packages/apscheduler/schedulers/blocking.py", line 32, in _main_loopwait_seconds = self._process_jobs()File "/usr/local/python3/lib/python3.7/site-packages/apscheduler/schedulers/base.py", line 1009, in _process_jobsjobstore.update_job(job)File "/usr/local/python3/lib/python3.7/site-packages/django_apscheduler/util.py", line 105, in func_wrapperresult = func(*args, **kwargs)File "/usr/local/python3/lib/python3.7/site-packages/django_apscheduler/jobstores.py", line 249, in update_jobwith transaction.atomic():File "/usr/local/python3/lib/python3.7/site-packages/django/db/transaction.py", line 189, in __enter__if not connection.get_autocommit():File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 389, in get_autocommitself.ensure_connection()File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in innerreturn func(*args, **kwargs)File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 219, in ensure_connectionself.connect()File "/usr/local/python3/lib/python3.7/site-packages/django/db/utils.py", line 90, in __exit__raise dj_exc_value.with_traceback(traceback) from exc_valueFile "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 219, in ensure_connectionself.connect()File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in innerreturn func(*args, **kwargs)File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 200, in connectself.connection = self.get_new_connection(conn_params)File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in innerreturn func(*args, **kwargs)File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/postgresql/base.py", line 187, in get_new_connectionconnection = Database.connect(**conn_params)File "/usr/local/python3/lib/python3.7/site-packages/psycopg2/__init__.py", line 122, in connectconn = _connect(dsn, connection_factory=connection_factory, **kwasync)
django.db.utils.OperationalError: connection to server at "xxxx.postgresql.svc.cluster.local" (xx.xx.xx.xx), port xxxx failed: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.

源码分析原因

可以先看下BackgroundScheduler的实现方式,在start方法中创建了个子线程。

class BackgroundScheduler(BlockingScheduler):_thread = Nonedef start(self, *args, **kwargs):if self._event is None or self._event.is_set():self._event = Event()BaseScheduler.start(self, *args, **kwargs)self._thread = Thread(target=self._main_loop, name='APScheduler')self._thread.daemon = self._daemonself._thread.start()def shutdown(self, *args, **kwargs):super(BackgroundScheduler, self).shutdown(*args, **kwargs)self._thread.join()del self._thread

其中_main_loopBlockingScheduler中实现,是一个死循环,执行_process_jobs方法

class BlockingScheduler(BaseScheduler):...def _main_loop(self):wait_seconds = TIMEOUT_MAXwhile self.state != STATE_STOPPED:self._event.wait(wait_seconds)self._event.clear()wait_seconds = self._process_jobs()...

再看_process_jobs中的内容,在BaseScheduler实现的,主要流程如下,先找到所有要执行的job,然后进行遍历运行并更新Job的状态。之前的错误日志,也就是这里的update_job抛出异常,而这里并没有捕获异常,最终层层往上抛,update_job -> _process_jobs -> _main_loop,最终线程异常终止。

def _process_jobs(self):for jobstore_alias, jobstore in six.iteritems(self._jobstores):try:due_jobs = jobstore.get_due_jobs(now)except Exception as e:...continue...for job in due_jobs:...try:executor.submit_job(job, run_times)except BaseException:......jobstore.update_job(job)

那为什么复现不了呢?这个是因为,关闭数据库连接时,程序不一定可以正好运行在update_job,可以看到前面的get_due_jobs进行了异常捕获,如果这里抛出数据库连接异常是可以捕获到的,然后跳过后面的操作,等待下一次定时任务的执行,如果还是失败,则再次等待,所以这里的异常不会抛到最上层导致线程停止。

但如果某个时机,上面连接数据库都成功了,到update_job这里异常抛出,则会导致整个线程停止,定时任务不再执行。

那如何解决该问题呢?

搭建demo

首先我们搭建一个demo出来,模拟复现该问题。

  1. 创建django项目

django-admin startproject apschedule_demopython manage.py startapp demopython manage.py makemigrationspython manage.py migrate
  1. 在settings.py中配置到好数据库信息
DATABASES = {"default": {"ENGINE": "django.db.backends.postgresql","NAME": "apschedule_demo","HOST": "xxxx","PORT": 5432,"USER": "xxx","PASSWORD": "xxx"}
}
  1. 根据django-apschedule官方提供的文档搭建demo

在settings.py中添加该APP

INSTALLED_APPS = (# ..."django_apscheduler",
)

创建目录demo/management/commands,并在其下面创建runapscheduler.py文件,代码内容如下:

import loggingfrom django.conf import settingsfrom apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStorelogger = logging.getLogger(__name__)def my_job():# Your job processing logic here...print("job..")class Command(BaseCommand):help = "Runs APScheduler."def handle(self, *args, **options):scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)scheduler.add_jobstore(DjangoJobStore(), "default")scheduler.add_job(my_job,trigger=CronTrigger(second="*/3"),  # Every 3 secondsid="my_job",  # The `id` assigned to each job MUST be uniquemax_instances=1,replace_existing=True,)logger.info("Added job 'my_job'.")try:logger.info("Starting scheduler...")scheduler.start()# 因为上面是非阻塞开启定时任务,所以这里需要阻塞,不让主线程结束。while True:time.sleep(10)except KeyboardInterrupt:logger.info("Stopping scheduler...")scheduler.shutdown()logger.info("Scheduler shut down successfully!")

可以通过python manage.py runapscheduler执行上面的命令运行定时任务,该脚本创建了一个每3秒执行一次的任务。

  1. 复现

我们将断点打在jobstore.update_job(job)上,然后使用debug模式进行调试,当程序运行到断点上时,将数据库关闭,然后程序继续运行,则会报错,并抛出异常,线程停止了运行。至此,我们复现了该问题。

线程重启

我一开始想,我可以判断该线程是否异常,如果异常则将线程重启就好了

    while True:if not scheduler._thread.is_alive():scheduler._thread.start()time.sleep(10)

但事与愿违,抛出了异常,异常信息如下:

RuntimeError: threads can only be started once

通过查看官方文档可以知道,线程的start方法只能调用一次。

listener

apschedule中提供了监听器机制,也就是在定时任务的成功、失败等状态都可以通过提前注册的listener方法来进行回调。但通过分析源码,其并不能捕获到定时任务线程的异常。

下面是简化了代码的listeners的原理流程:

  1. 外部通过add_listener方法注册回调方法
  2. 在定时任务线程主流程_process_jobs中发生的各个事件添加到events中
  3. 遍历events事件,然后通过与注册的回调方法mask进行匹配,匹配上则调用回调方法
class BaseScheduler:def __init__(...):self._listeners = []def add_listener(self, callback, mask=EVENT_ALL):self._listeners.append((callback, mask))def _process_jobs(self):events = []...events.append(event)...for event in events:self._dispatch_event(event)def _dispatch_event(self, event):for cb, mask in listeners:if event.code & mask:try:cb(event)except BaseException:self._logger.exception('Error notifying listener')

如果线程本身挂了,回调方法是不可执行的。

捕获线程中函数的异常

如果update_job抛出异常导致线程停止,那我捕获它的异常,然后再continue,等待下次定时任务运行再重试不就好了,但是这就需要改动源码,能不能改源码就尽量不改。所以这边我采用了继承BackgroundScheduler类,然后再重写_process_jobs方法来解决。

在重写的_process_jobs方法中,对父类的_process_jobs()进行异常的捕获,然后再不断的进行重试,这样即使update_job抛出异常了,也可以不断的进行尝试恢复,直至成功。

class DemoBackgroundScheduler(BackgroundScheduler):def _process_jobs(self):while True:try:return super()._process_jobs()except BaseException:time.sleep(5)class Command(BaseCommand):help = "Runs APScheduler."def handle(self, *args, **options):scheduler = DemoBackgroundScheduler(timezone=settings.TIME_ZONE)...

然后再次尝试复现该问题,可以发现在断开数据库后,它能够一直进行重试,线程没有停止,当数据库恢复运行后,job执行成功,不再抛出异常。

相关链接

  • APScheduler官方文档

欢迎关注,互相学习,共同进步~

我的个人博客
公众号:编程黑洞

本文标签: django