我有timeout上下文管理器,可以与信号完美配合,但是由于信号仅在主线程中起作用,因此它在多线程模式下会引发错误.
I have timeout context manager that works perfectly with signals but it raises error in multithread mode because signals work only in main thread.
def timeout_handler(signum, frame): raise TimeoutException() @contextmanager def timeout(seconds): old_handler = signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(seconds) try: yield finally: signal.alarm(0) signal.signal(signal.SIGALRM, old_handler)我已经看到了timeout的装饰器实现,但是我不知道如何在threading.Thread派生的类中传递yield.我的变体无法正常工作.
I've seen decorator implementation of timeout but I don't know how to pass yield inside class derived from threading.Thread. My variant won't work.
@contextmanager def timelimit(seconds): class FuncThread(threading.Thread): def run(self): yield it = FuncThread() it.start() it.join(seconds) if it.isAlive(): raise TimeoutException()推荐答案
如果上下文管理器保护的代码是基于循环的,请考虑按照人们处理线程杀死的方式进行处理.杀死另一个线程通常是不安全的,因此标准方法是让控制线程设置一个对工作线程可见的标志.辅助线程会定期检查该标志并彻底关闭自身.这是您可以执行类似于超时的操作的方法:
If the code guarded by the context manager is loop-based, consider handling this the way people handle thread killing. Killing another thread is generally unsafe, so the standard approach is to have the controlling thread set a flag that's visible to the worker thread. The worker thread periodically checks that flag and cleanly shuts itself down. Here's how you can do something analogous with timeouts:
class timeout(object): def __init__(self, seconds): self.seconds = seconds def __enter__(self): self.die_after = time.time() + self.seconds return self def __exit__(self, type, value, traceback): pass @property def timed_out(self): return time.time() > self.die_after这是一个单线程用法示例:
Here's a single-threaded usage example:
with timeout(1) as t: while True: # this will take a long time without a timeout # periodically check for timeouts if t.timed_out: break # or raise an exception # do some "useful" work print "." time.sleep(0.2)和一个多线程的:
import thread def print_for_n_secs(string, seconds): with timeout(seconds) as t: while True: if t.timed_out: break # or raise an exception print string, time.sleep(0.5) for i in xrange(5): thread.start_new_thread(print_for_n_secs, ('thread%d' % (i,), 2)) time.sleep(0.25)这种方法比使用信号更具侵入性,但它适用于任意线程.
This approach is more intrusive than using signals but it works for arbitrary threads.
更多推荐
带有线程的Python超时上下文管理器
发布评论