Context managers and multiprocessing pools

Updated: 2024-10-23 19:21:26
Problem description

Suppose you are using a multiprocessing.Pool object, and you are using the initializer argument of the constructor to pass an initializer function that then creates a resource in the global namespace. Assume the resource has a context manager. How would you handle the life cycle of the context-managed resource, given that it has to live for the whole life of the process but still be properly cleaned up at the end?

So far, I have something somewhat like this:

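```python
resource_cm = None
resource = None


def _worker_init(args):
    # Both names are module-level; without declaring resource_cm global,
    # the assignment below would only create a local variable.
    global resource_cm, resource
    # open_resource() is a placeholder for whatever returns the context manager.
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()
```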

From here on, the pool processes can use the resource. So far so good. But handling cleanup is a bit trickier, since the multiprocessing.Pool class does not provide a destructor or deinitializer argument.

One of my ideas is to use the atexit module and register the cleanup in the initializer. Something like this:

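```python
import atexit


def _worker_init(args):
    global resource
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()

    def _clean_up():
        # __exit__ takes (exc_type, exc_value, traceback); None means "no exception".
        resource_cm.__exit__(None, None, None)

    atexit.register(_clean_up)
```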

Is this a good approach? Is there an easier way of doing this?

atexit does not seem to work, at least not the way I am using it above, so as of right now I still do not have a solution for this problem.

Recommended answer

First, this is a really great question! After digging around a bit in the multiprocessing code, I think I've found a way to do this:

When you start a multiprocessing.Pool, internally the Pool object creates a multiprocessing.Process object for each member of the pool. When those sub-processes are starting up, they call a _bootstrap function, which looks like this:

```python
def _bootstrap(self):
    from . import util
    global _current_process

    try:
        # ... (stuff we don't care about)
        util._finalizer_registry.clear()
        util._run_after_forkers()
        util.info('child process calling self.run()')
        try:
            self.run()
            exitcode = 0
        finally:
            util._exit_function()
        # ... (more stuff we don't care about)
```

The run method is what actually runs the target you gave the Process object. For a Pool process, that target is a function with a long-running while loop that waits for work items to come in over an internal queue. What's really interesting for us is what happens after self.run: util._exit_function() is called, and because it sits in a finally block, it is called even if run raises.
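To see the run()-then-_exit_function() ordering in action, here is a minimal sketch with a single plain Process rather than a Pool. It assumes a POSIX platform where the "fork" start method is available, and it borrows multiprocessing.util.Finalize, the undocumented helper the answer turns to below; the file-based trace and the names log, target and run_child are illustrative only.

```python
import multiprocessing
import os
import tempfile
from multiprocessing.util import Finalize  # undocumented helper


def log(path, msg):
    # Append one line so the file records the order in which things happened.
    with open(path, "a") as f:
        f.write(msg + "\n")


def target(path):
    # Registered inside run(); executed later by util._exit_function().
    Finalize(None, log, args=(path, "finalizer"), exitpriority=0)
    log(path, "run body")


def run_child(path):
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX platform
    p = ctx.Process(target=target, args=(path,))
    p.start()
    p.join()
    with open(path) as f:
        return f.read().splitlines()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        print(run_child(os.path.join(d, "trace.txt")))  # ['run body', 'finalizer']
```

Because the finalizer has an exit priority of 0, it is picked up by the _run_finalizers(0) pass that _exit_function() makes after run() returns, so its line lands after the one written by the target itself.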

As it turns out, that function does some cleanup that sounds a lot like what you're looking for:

```python
def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=active_children,
                   current_process=current_process):
    # NB: we hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')
    # Very interesting!
    _run_finalizers(0)
```

Here's the docstring of _run_finalizers:

```python
def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''
```
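The ordering rule in that docstring can be modeled in plain Python. The sketch below does not touch multiprocessing at all: _registry, _counter, register and run_order are hypothetical stand-ins that only mimic how the real registry keys finalizers by (exitpriority, creation counter) and sorts those keys in reverse.

```python
import itertools

# Hypothetical stand-ins for multiprocessing.util's private registry and counter.
_registry = {}
_counter = itertools.count()


def register(name, exitpriority):
    # Key mirrors the (exitpriority, creation counter) tuple built by Finalize.
    _registry[(exitpriority, next(_counter))] = name


def run_order(minpriority=None):
    # Keep only entries whose priority is not None and at least minpriority.
    keys = [k for k in _registry
            if k[0] is not None and (minpriority is None or k[0] >= minpriority)]
    # Reverse sort: highest priority first, later-created first within a priority.
    keys.sort(reverse=True)
    return [_registry[k] for k in keys]


register("a", 0)
register("b", 16)
register("c", 0)
register("d", None)  # no exit priority: never picked up by the exit-time sweep
register("e", -5)

print(run_order(0))  # ['b', 'c', 'a']
print(run_order())   # ['b', 'c', 'a', 'e']
```

Reverse-sorting the tuples gives both halves of the rule at once: the first element puts higher priorities first, and the counter breaks ties so that later registrations run earlier. Entries with a None priority are filtered out before sorting, which also keeps Python 3 from trying to compare None with an int.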

The method actually runs through a list of finalizer callbacks and executes them:

```python
items = [x for x in _finalizer_registry.items() if f(x)]
items.sort(reverse=True)

for key, finalizer in items:
    sub_debug('calling %s', finalizer)
    try:
        finalizer()
    except Exception:
        import traceback
        traceback.print_exc()
```
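One practical consequence of that try/except: a cleanup callback that raises does not stop the remaining ones. The following sketch reproduces just that policy with an ordinary list of callables; run_all and boom are made-up names, and nothing here calls into multiprocessing.

```python
import traceback


def run_all(finalizers):
    """Call each (name, callback) pair in order; report failures, keep going."""
    ran = []
    for name, callback in finalizers:
        try:
            callback()
            ran.append(name)
        except Exception:
            # Same policy as the excerpt above: print the traceback, move on.
            traceback.print_exc()
    return ran


def boom():
    raise RuntimeError("cleanup failed")


result = run_all([("first", lambda: None), ("broken", boom), ("last", lambda: None)])
print(result)  # ['first', 'last']
```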

Perfect. So how do we get into the _finalizer_registry? There's an undocumented object called Finalize in multiprocessing.util that is responsible for adding a callback to the registry:

```python
class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        assert exitpriority is None or type(exitpriority) is int

        if obj is not None:
            self._weakref = weakref.ref(obj, self)
        else:
            assert exitpriority is not None

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, _finalizer_counter.next())
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self  # That's what we're looking for!
```
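Note the weakref.ref(obj, self) line: when obj is not None, Finalize holds only a weak reference to it, and the finalizer also fires as soon as obj is garbage collected. The sketch below shows that in a single process. It assumes CPython 3, where the same undocumented class also has a still_active() method; Resource and calls are illustrative names.

```python
import gc
from multiprocessing.util import Finalize  # undocumented; CPython 3 assumed

calls = []


class Resource:
    """Stand-in object; any weak-referenceable instance works."""


res = Resource()
# exitpriority=16 would also make it run at process exit, but here the
# weakref callback gets there first when `res` is collected.
fin = Finalize(res, calls.append, args=("cleaned up",), exitpriority=16)
print(fin.still_active())  # True

del res       # drop the only strong reference
gc.collect()  # not needed on CPython (refcounting), harmless elsewhere
print(calls)               # ['cleaned up']
print(fin.still_active())  # False
```

In the answer's example the callback is a bound method of the resource object itself, so the registry keeps that object alive and this early firing does not happen there; with a callback that does not reference obj, dropping the last reference triggers the cleanup right away, as above. Passing None as obj (which requires an exitpriority) avoids the weakref altogether.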

Ok, so putting it all together into an example:

```python
import multiprocessing
from multiprocessing.util import Finalize

resource_cm = None
resource = None


class Resource(object):
    def __init__(self, args):
        self.args = args

    def __enter__(self):
        print("in __enter__ of %s" % multiprocessing.current_process())
        return self

    def __exit__(self, *args, **kwargs):
        print("in __exit__ of %s" % multiprocessing.current_process())


def open_resource(args):
    return Resource(args)


def _worker_init(args):
    global resource_cm, resource
    print("calling init")
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()
    # Register a finalizer that runs when the worker process exits normally.
    # Pass the standard (exc_type, exc_value, traceback) arguments to __exit__.
    Finalize(resource, resource_cm.__exit__, args=(None, None, None), exitpriority=16)


def hi(*args):
    print("we're in the worker")


if __name__ == "__main__":
    pool = multiprocessing.Pool(initializer=_worker_init, initargs=("abc",))
    pool.map(hi, range(pool._processes))
    pool.close()
    pool.join()
```

Output:

```
calling init
in __enter__ of <Process(PoolWorker-1, started daemon)>
calling init
calling init
in __enter__ of <Process(PoolWorker-2, started daemon)>
in __enter__ of <Process(PoolWorker-3, started daemon)>
calling init
in __enter__ of <Process(PoolWorker-4, started daemon)>
we're in the worker
we're in the worker
we're in the worker
we're in the worker
in __exit__ of <Process(PoolWorker-1, started daemon)>
in __exit__ of <Process(PoolWorker-2, started daemon)>
in __exit__ of <Process(PoolWorker-3, started daemon)>
in __exit__ of <Process(PoolWorker-4, started daemon)>
```

As you can see, __exit__ gets called in all our workers when we close() and join() the pool.
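For a more realistic setting, the same trick can be applied to a generator-based context manager in Python 3. The sketch below is an assumption-laden variant, not the answer's code: it assumes a POSIX platform with the "fork" start method, uses a hypothetical open_resource() that just writes a per-worker log file, and passes None as Finalize's obj because the yielded value (a str path) cannot be weakly referenced.

```python
import contextlib
import multiprocessing
import os
import tempfile
from multiprocessing.util import Finalize  # undocumented helper from the answer

resource = None  # per-worker global, set by the initializer


@contextlib.contextmanager
def open_resource(log_dir):
    # Hypothetical resource: a per-worker log file standing in for a
    # connection, file handle, etc.
    path = os.path.join(log_dir, "worker-%d.log" % os.getpid())
    with open(path, "w") as f:
        f.write("entered\n")
    yield path
    # Reached only when __exit__ is called, i.e. when our finalizer fires.
    with open(path, "a") as f:
        f.write("exited\n")


def _worker_init(log_dir):
    global resource
    cm = open_resource(log_dir)
    resource = cm.__enter__()
    # obj=None: no weakref (a str cannot be weakly referenced), so the
    # finalizer stays registered until the exit-time sweep. Generator-based
    # context managers need the full (exc_type, exc_value, traceback) call.
    Finalize(None, cm.__exit__, args=(None, None, None), exitpriority=16)


def task(x):
    # Every task in this worker can use the resource set up by the initializer.
    assert resource is not None
    return x * x


def run_pool(log_dir, processes=2):
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX platform
    pool = ctx.Pool(processes, initializer=_worker_init, initargs=(log_dir,))
    results = pool.map(task, range(8))
    pool.close()  # workers finish normally, so _exit_function() runs
    pool.join()
    return results


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        print(run_pool(d))
        for name in sorted(os.listdir(d)):
            with open(os.path.join(d, name)) as f:
                print(name, f.read().split())
```

One caveat: the cleanup relies on the workers exiting normally after close() and join(). Using the pool in a with statement calls terminate() on exit, which stops the workers with SIGTERM, so these finalizers should not be expected to run in that case.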

Published: 2023-11-23 10:06:05
Link: https://www.elefans.com/category/jswz/34/1621049.html