无法在Python中创建新线程(Can't create new threads in Python)

编程入门 行业动态 更新时间:2024-10-28 10:23:42
无法在Python中创建新线程(Can't create new threads in Python) import threading threads = [] for n in range(0, 60000): t = threading.Thread(target=function,args=(x, n)) t.start() threads.append(t) for t in threads: t.join()

它在我的笔记本电脑上运行范围高达800,但如果我将范围增加到800以上,则会出现can't create new thread的错误。

我怎样才能控制号码线程创建或任何其他方式来使它像超时工作? 我尝试使用threading.BoundedSemaphore函数,但似乎并没有正常工作。

import threading threads = [] for n in range(0, 60000): t = threading.Thread(target=function,args=(x, n)) t.start() threads.append(t) for t in threads: t.join()

It is working well for range up to 800 on my laptop, but if I increase range to more than 800 I get the error can't create new thread.

How can I control number to threads to get created or any other way to make it work like timeout? I tried using threading.BoundedSemaphore function but that doesn't seem to work properly.

最满意答案

问题在于,没有一个主要的平台(截至2013年中)可以让你在这个数量的线程附近创建任何地方。 有很多种不同的限制你可以遇到,并且不知道你的平台,它的配置和你得到的确切的错误,你不可能知道你遇到了哪一个。 但是这里有两个例子:

在32位Windows上,默认线程堆栈为1MB,并且所有线程堆栈必须与程序中的所有其他内存空间相同,以便在60000之前运行很久。 在64位的Linux上,你可能会耗尽你的会话的一个软ulimit值,然后你会在任何地方接近页面空间。 (Linux有许多不同于POSIX所要求的限制)。

那么,我怎样才能控制数字线程来创建或任何其他方式来使它像超时或其他任何工作?

使用尽可能多的线程是不太可能成为你真正想要做的。 在8核机器上运行800个线程意味着你花费了大量时间在线程之间进行上下文切换,并且缓存在获得准备就绪之前不断刷新,等等。

最有可能的是,你真正想要的是以下之一:

每个CPU有一个线程,服务于60000个任务池。 可能是进程而不是线程(如果主要工作是在Python中,或者在没有明确释放GIL的C代码中)。 也许有固定数量的线程(例如,一个Web浏览器可能会执行12个并发请求,无论您是否拥有1个内核或64个内核)。 也许是一批600个批次的100个任务,而不是60000个单一任务。 60000合作定制的光纤/ greenlets / microthreads共享一个真正的线程。 也许显式的协程而不是调度器。 或通过例如gevent “魔术”合作绿色gevent 。 也许每个CPU有一个线程,每个线程运行1 / N的光纤。

但这当然是可能的

一旦你达到了你打的任何限制,很可能再次尝试失败,直到一个线程完成了它的工作并被加入,并且很有可能在这种事情发生后再次尝试成功。 所以,鉴于你显然正在发生异常,你可以像处理Python中的其他任何东西一样处理:使用try / except块。 例如,像这样的东西:

threads = [] for n in range(0, 60000): while True: t = threading.Thread(target=function,args=(x, n)) try: t.start() threads.append(t) except WhateverTheExceptionIs as e: if threads: threads[0].join() del threads[0] else: raise else: break for t in threads: t.join()

当然,这假设推出的第一项任务可能是完成的首批任务之一。 如果不是这样,你需要一些方法来明确表示完成(条件,信号量,队列等),否则你需要使用一些底层(特定于平台)的库,它可以为你提供一种方式等待整个列表,直到至少有一个线程完成。

另外请注意,在某些平台(例如Windows XP)上,您可能会遇到离奇的行为,只是越来越接近极限。


除了做得更好之外,做正确的事情也可能会更简单。 例如,以下是每个CPU进程池:

with concurrent.futures.ProcessPoolExecutor() as executor: fs = [executor.submit(function, x, n) for n in range(60000)] concurrent.futures.wait(fs)

......和一个固定线程数池:

with concurrent.futures.ThreadPoolExecutor(12) as executor: fs = [executor.submit(function, x, n) for n in range(60000)] concurrent.futures.wait(fs)

...以及一个平衡--CPU平行度与numpy矢量化批处理池:

with concurrent.futures.ThreadPoolExecutor() as executor: batchsize = 60000 // os.cpu_count() fs = [executor.submit(np.vector_function, x, np.arange(n, min(n+batchsize, 60000))) for n in range(0, 60000, batchsize)] concurrent.futures.wait(fs)

在上面的例子中,我使用列表理解来提交所有的工作并收集他们的未来,因为我们没有在循环内做任何其他事情。 但是从你的评论看来,这听起来像你在循环中有其他想做的事情。 所以,让我们将其转换回显式声明:

with concurrent.futures.ProcessPoolExecutor() as executor: fs = [] for n in range(60000): fs.append(executor.submit(function, x, n)) concurrent.futures.wait(fs)

现在,无论你想在循环中添加什么,你都可以。


但是,我不认为你真的想在循环中添加任何东西。 循环只是尽可能快地提交所有工作; 这是等待它们完成的wait功能,并且可能在那里你想早点退出。

要做到这一点,可以使用FIRST_COMPLETED标志wait ,但使用as_completed要简单得多。

另外,我假设error是某种由任务设置的值。 在这种情况下,您需要在其中放置一个Lock ,就像在线程之间共享任何其他可变值一样。 (这是一个在ProcessPoolExecutor和ThreadPoolExecutor之间稍微超过一行差异的地方 - 如果您使用进程,则需要使用multiprocessing.Lock而不是threading.Lock 。)

所以:

error_lock = threading.Lock error = [] def function(x, n): # blah blah try: # blah blah except Exception as e: with error_lock: error.append(e) # blah blah with concurrent.futures.ProcessPoolExecutor() as executor: fs = [executor.submit(function, x, n) for n in range(60000)] for f in concurrent.futures.as_completed(fs): do_something_with(f.result()) with error_lock: if len(error) > 1: exit()

但是,您可能想要考虑其他设计。 一般来说,如果你可以避免线程之间的共享,你的生活变得更容易。 而期货的设计就是为了让您轻松实现,就像让普通函数调用一样,让您返回值或引发异常。 那f.result()会给你返回的值或引发引发的异常。 因此,您可以将该代码重写为:

def function(x, n): # blah blah # don't bother to catch exceptions here, let them propagate out with concurrent.futures.ProcessPoolExecutor() as executor: fs = [executor.submit(function, x, n) for n in range(60000)] error = [] for f in concurrent.futures.as_completed(fs): try: result = f.result() except Exception as e: error.append(e) if len(error) > 1: exit() else: do_something_with(result)

请注意,这与文档中的ThreadPoolExecutor示例看起来有多相似。 这个简单的模式足以处理几乎没有锁的任何事物,只要这些任务不需要彼此交互即可。

The problem is that no major platform (as of mid-2013) will let you create anywhere near this number of threads. There are a wide variety of different limitations you could run into, and without knowing your platform, its configuration, and the exact error you got, it's impossible to know which one you ran into. But here are two examples:

On 32-bit Windows, the default thread stack is 1MB, and all of your thread stacks have to fit into the same 2GB of virtual memory space as everything else in your program, so you will run out long before 60000. On 64-bit linux, you will likely exhaust one of your session's soft ulimit values before you get anywhere near running out of page space. (Linux has a variety of different limits beyond the ones required by POSIX.)

So, how can i control number to threads to get created or any other way to make it work like timeout or whatever?

Using as many threads as possible is very unlikely to be what you actually want to do. Running 800 threads on an 8-core machine means that you're spending a whole lot of time context-switching between the threads, and the cache keeps getting flushed before it ever gets primed, and so on.

Most likely, what you really want is one of the following:

One thread per CPU, serving a pool of 60000 tasks. Maybe processes instead of threads (if the primary work is in Python, or in C code that doesn't explicitly release the GIL). Maybe a fixed number of threads (e.g., a web browsers may do, say, 12 concurrent requests at a time, whether you have 1 core or 64). Maybe a pool of, say, 600 batches of 100 tasks apiece, instead of 60000 single tasks. 60000 cooperatively-scheduled fibers/greenlets/microthreads all sharing one real thread. Maybe explicit coroutines instead of a scheduler. Or "magic" cooperative greenlets via, e.g. gevent. Maybe one thread per CPU, each running 1/Nth of the fibers.

But it's certainly possible.

Once you've hit whichever limit you're hitting, it's very likely that trying again will fail until a thread has finished its job and been joined, and it's pretty likely that trying again will succeed after that happens. So, given that you're apparently getting an exception, you could handle this the same way as anything else in Python: with a try/except block. For example, something like this:

threads = [] for n in range(0, 60000): while True: t = threading.Thread(target=function,args=(x, n)) try: t.start() threads.append(t) except WhateverTheExceptionIs as e: if threads: threads[0].join() del threads[0] else: raise else: break for t in threads: t.join()

Of course this assumes that the first task launched is likely to be the one of the first tasks finished. If this is not true, you'll need some way to explicitly signal doneness (condition, semaphore, queue, etc.), or you'll need to use some lower-level (platform-specific) library that gives you a way to wait on a whole list until at least one thread is finished.

Also, note that on some platforms (e.g., Windows XP), you can get bizarre behavior just getting near the limits.


On top of being a lot better, doing the right thing will probably be a lot simpler as well. For example, here's a process-per-CPU pool:

with concurrent.futures.ProcessPoolExecutor() as executor: fs = [executor.submit(function, x, n) for n in range(60000)] concurrent.futures.wait(fs)

… and a fixed-thread-count pool:

with concurrent.futures.ThreadPoolExecutor(12) as executor: fs = [executor.submit(function, x, n) for n in range(60000)] concurrent.futures.wait(fs)

… and a balancing-CPU-parallelism-with-numpy-vectorization batching pool:

with concurrent.futures.ThreadPoolExecutor() as executor: batchsize = 60000 // os.cpu_count() fs = [executor.submit(np.vector_function, x, np.arange(n, min(n+batchsize, 60000))) for n in range(0, 60000, batchsize)] concurrent.futures.wait(fs)

In the examples above, I used a list comprehension to submit all of the jobs and gather their futures, because we're not doing anything else inside the loop. But from your comments, it sounds like you do have other stuff you want to do inside the loop. So, let's convert it back into an explicit for statement:

with concurrent.futures.ProcessPoolExecutor() as executor: fs = [] for n in range(60000): fs.append(executor.submit(function, x, n)) concurrent.futures.wait(fs)

And now, whatever you want to add inside that loop, you can.


However, I don't think you actually want to add anything inside that loop. The loop just submits all the jobs as fast as possible; it's the wait function that sits around waiting for them all to finish, and it's probably there that you want to exit early.

To do this, you can use wait with the FIRST_COMPLETED flag, but it's much simpler to use as_completed.

Also, I'm assuming error is some kind of value that gets set by the tasks. In that case, you will need to put a Lock around it, as with any other mutable value shared between threads. (This is one place where there's slightly more than a one-line difference between a ProcessPoolExecutor and a ThreadPoolExecutor—if you use processes, you need multiprocessing.Lock instead of threading.Lock.)

So:

error_lock = threading.Lock error = [] def function(x, n): # blah blah try: # blah blah except Exception as e: with error_lock: error.append(e) # blah blah with concurrent.futures.ProcessPoolExecutor() as executor: fs = [executor.submit(function, x, n) for n in range(60000)] for f in concurrent.futures.as_completed(fs): do_something_with(f.result()) with error_lock: if len(error) > 1: exit()

However, you might want to consider a different design. In general, if you can avoid sharing between threads, your life gets a lot easier. And futures are designed to make that easy, by letting you return a value or raise an exception, just like a regular function call. That f.result() will give you the returned value or raise the raised exception. So, you can rewrite that code as:

def function(x, n): # blah blah # don't bother to catch exceptions here, let them propagate out with concurrent.futures.ProcessPoolExecutor() as executor: fs = [executor.submit(function, x, n) for n in range(60000)] error = [] for f in concurrent.futures.as_completed(fs): try: result = f.result() except Exception as e: error.append(e) if len(error) > 1: exit() else: do_something_with(result)

Notice how similar this looks to the ThreadPoolExecutor Example in the docs. This simple pattern is enough to handle almost anything without locks, as long as the tasks don't need to interact with each other.

更多推荐

本文发布于:2023-07-16 13:33:00,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1129067.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:线程   建新   中创   Python   create

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!