如何在排队之前让 ThreadPoolExecutor 将线程增加到最大值?

编程入门 行业动态 更新时间:2024-10-24 04:31:46
本文介绍了如何在排队之前让 ThreadPoolExecutor 将线程增加到最大值?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

一段时间以来,我一直对 ThreadPoolExecutor 的默认行为感到沮丧,它支持我们许多人使用的 ExecutorService 线程池.引用 Javadocs:

I've been frustrated for some time with the default behavior of ThreadPoolExecutor which backs the ExecutorService thread-pools that so many of us use. To quote from the Javadocs:

如果有超过corePoolSize但少于maximumPoolSize的线程在运行,则只有当队列已满时才会创建一个新线程.

If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full.

这意味着如果您使用以下代码定义线程池,它将永远启动第二个线程,因为 LinkedBlockingQueue 是无界的.

What this means is that if you define a thread pool with the following code, it will never start the 2nd thread because the LinkedBlockingQueue is unbounded.

ExecutorService threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));

仅当您有一个有界队列并且队列已满时,核心号以上的任何线程才会启动.我怀疑很多初级 Java 多线程程序员都不知道 ThreadPoolExecutor 的这种行为.

Only if you have a bounded queue and the queue is full are any threads above the core number started. I suspect a large number of junior Java multithreaded programmers are unaware of this behavior of the ThreadPoolExecutor.

现在我有特定的用例,这不是最佳的.我正在寻找方法来解决它,而无需编写自己的 TPE 类.

Now I have specific use case where this is not-optimal. I'm looking for ways, without writing my own TPE class, to work around it.

我的要求是针对可能不可靠的第三方进行回调的网络服务.

My requirements are for a web service that is making call-backs to a possibly unreliable 3rd party.

  • 我不想与网络请求同步进行回调,所以我想使用线程池.
  • 我通常一分钟就能得到几个这样的结果,所以我不想让 newFixedThreadPool(...) 有大量线程,而这些线程大多处于休眠状态.
  • 每隔一段时间,我就会收到大量流量,我想将线程数扩大到某个最大值(比如 50).
  • 我需要最好尝试执行所有回调,因此我想将任何超过 50 的其他回调排队.我不想通过使用newCachedThreadPool().
  • I don't want to make the call-back synchronously with the web-request, so I want to use a thread-pool.
  • I typically get a couple of these a minute so I don't want to have a newFixedThreadPool(...) with a large number of threads that mostly are dormant.
  • Every so often I get a burst of this traffic and I want to scale up the number of threads to some max value (let's say 50).
  • I need to make a best attempt to do all callbacks so I want to queue up any additional ones above 50. I don't want to overwhelm the rest of my web-server by using a newCachedThreadPool().

我如何解决 ThreadPoolExecutor 中的这个限制,其中队列需要在 之前被限制并已满 将启动更多线程?我怎样才能让它在排队任务之前启动更多线程?

How can I work around this limitation in ThreadPoolExecutor where the queue needs to be bounded and full before more threads will be started? How can I get it to start more threads before queuing tasks?

@Flavio 很好地说明了使用 ThreadPoolExecutor.allowCoreThreadTimeOut(true) 来让核心线程超时和退出.我考虑过,但我仍然想要核心线程功能.如果可能,我不希望池中的线程数低于核心大小.

@Flavio makes a good point about using the ThreadPoolExecutor.allowCoreThreadTimeOut(true) to have the core threads timeout and exit. I considered that but I still wanted the core-threads feature. I did not want the number of threads in the pool to drop below the core-size if possible.

推荐答案

如何解决 ThreadPoolExecutor 中的此限制,其中在启动更多线程之前,队列需要有界且已满.

How can I work around this limitation in ThreadPoolExecutor where the queue needs to be bounded and full before more threads will be started.

我相信我终于通过 ThreadPoolExecutor 找到了一个有点优雅(可能有点老套)的解决方案来解决这个限制.它涉及扩展 LinkedBlockingQueue 以使其在已经有一些任务排队时为 queue.offer(...) 返回 false.如果当前线程跟不上排队的任务,TPE 将添加额外的线程.如果池已达到最大线程数,则将调用 RejectedExecutionHandler.然后是处理程序将 put(...) 放入队列中.

I believe I have finally found a somewhat elegant (maybe a little hacky) solution to this limitation with ThreadPoolExecutor. It involves extending LinkedBlockingQueue to have it return false for queue.offer(...) when there are already some tasks queued. If the current threads are not keeping up with the queued tasks, the TPE will add additional threads. If the pool is already at max threads, then the RejectedExecutionHandler will be called. It is the handler which then does the put(...) into the queue.

编写一个offer(...) 可以返回false 并且put() 永远不会阻塞的队列当然很奇怪黑客部分.但这与 TPE 对队列的使用效果很好,因此我认为这样做没有任何问题.

It certainly is strange to write a queue where offer(...) can return false and put() never blocks so that's the hack part. But this works well with TPE's usage of the queue so I don't see any problem with doing this.

代码如下:

// extend LinkedBlockingQueue to force offer() to return false conditionally BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() { private static final long serialVersionUID = -6903933921423432194L; @Override public boolean offer(Runnable e) { // Offer it to the queue if there is 0 items already queued, else // return false so the TPE will add another thread. If we return false // and max threads have been reached then the RejectedExecutionHandler // will be called which will do the put into the queue. if (size() == 0) { return super.offer(e); } else { return false; } } }; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*secs*/, TimeUnit.SECONDS, queue); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { // This does the actual put into the queue. Once the max threads // have been reached, the tasks will then queue up. executor.getQueue().put(r); // we do this after the put() to stop race conditions if (executor.isShutdown()) { throw new RejectedExecutionException( "Task " + r + " rejected from " + e); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } } });

通过这种机制,当我向队列提交任务时,ThreadPoolExecutor 将:

With this mechanism, when I submit tasks to the queue, the ThreadPoolExecutor will:

  • 最初将线程数扩展到核心大小(此处为 1).
  • 将其提供给队列.如果队列为空,它将排队等待由现有线程处理.
  • 如果队列已经有 1 个或多个元素,offer(...) 将返回 false.
  • 如果返回 false,则增加池中线程的数量,直到达到最大数量(此处为 50).
  • 如果达到最大值,则调用 RejectedExecutionHandler
  • RejectedExecutionHandler 然后将任务放入队列,以按 FIFO 顺序由第一个可用线程处理.
  • Scale the number of threads up to the core size initially (here 1).
  • Offer it to the queue. If the queue is empty it will be queued to be handled by the existing threads.
  • If the queue has 1 or more elements already, the offer(...) will return false.
  • If false is returned, scale up the number of threads in the pool until they reach the max number (here 50).
  • If at the max then it calls the RejectedExecutionHandler
  • The RejectedExecutionHandler then puts the task into the queue to be processed by the first available thread in FIFO order.
  • 虽然在我上面的示例代码中,队列是无界的,但您也可以将其定义为有界队列.例如,如果您向 LinkedBlockingQueue 添加 1000 的容量,那么它将:

    Although in my example code above, the queue is unbounded, you could also define it as a bounded queue. For example, if you add a capacity of 1000 to the LinkedBlockingQueue then it will:

  • 将线程扩展到最大
  • 然后排队直到它有 1000 个任务
  • 然后阻塞调用者,直到队列有可用空间.
  • 此外,如果您需要在RejectedExecutionHandler 那么你可以使用 offer(E, long, TimeUnit) 方法代替 Long.MAX_VALUE 作为超时时间.

    Also, if you needed to use offer(...) in the RejectedExecutionHandler then you could use the offer(E, long, TimeUnit) method instead with Long.MAX_VALUE as the timeout.

    警告:

    如果您希望在执行程序关闭后 将任务添加到执行程序,那么您可能希望更聪明地将 RejectedExecutionException 从我们的自定义 中抛出RejectedExecutionHandler 当 executor-service 已关闭时.感谢@RaduToader 指出这一点.

    If you expect tasks to be added to the executor after it has been shutdown, then you may want to be smarter about throwing RejectedExecutionException out of our custom RejectedExecutionHandler when the executor-service has been shutdown. Thanks to @RaduToader for pointing this out.

    对此答案的另一个调整可能是询问 TPE 是否有空闲线程,并且只有在有空闲线程时才将项目入队.您必须为此创建一个真正的类并在其上添加 ourQueue.setThreadPoolExecutor(tpe); 方法.

    Another tweak to this answer could be to ask the TPE if there are idle threads and only enqueue the item if there is so. You would have to make a true class for this and add ourQueue.setThreadPoolExecutor(tpe); method on it.

    那么您的 offer(...) 方法可能类似于:

    Then your offer(...) method might look something like:

  • 检查是否 tpe.getPoolSize() == tpe.getMaximumPoolSize() 在这种情况下只需调用 super.offer(...).
  • 否则如果 tpe.getPoolSize() >tpe.getActiveCount() 然后调用 super.offer(...) 因为似乎有空闲线程.
  • 否则返回 false 以 fork 另一个线程.
  • Check to see if the tpe.getPoolSize() == tpe.getMaximumPoolSize() in which case just call super.offer(...).
  • Else if tpe.getPoolSize() > tpe.getActiveCount() then call super.offer(...) since there seem to be idle threads.
  • Otherwise return false to fork another thread.
  • 也许是这样:

    int poolSize = tpe.getPoolSize(); int maximumPoolSize = tpe.getMaximumPoolSize(); if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) { return super.offer(e); } else { return false; }

    请注意,TPE 上的 get 方法很昂贵,因为它们访问 volatile 字段或(在 getActiveCount() 的情况下)锁定 TPE 并遍历线程列表.此外,这里存在竞争条件,可能会导致任务未正确排队或在有空闲线程时分叉另一个线程.

    Note that the get methods on TPE are expensive since they access volatile fields or (in the case of getActiveCount()) lock the TPE and walk the thread-list. Also, there are race conditions here that may cause a task to be enqueued improperly or another thread forked when there was an idle thread.

    更多推荐

    如何在排队之前让 ThreadPoolExecutor 将线程增加到最大值?

    本文发布于:2023-10-28 08:30:30,感谢您对本站的认可!
    本文链接:https://www.elefans.com/category/jswz/34/1536101.html
    版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
    本文标签:最大值   线程   如何在   增加到   ThreadPoolExecutor

    发布评论

    评论列表 (有 0 条评论)
    草根站长

    >www.elefans.com

    编程频道|电子爱好者 - 技术资讯及电子产品介绍!