Handling back-pressure in a FixedThreadPool

Updated: 2024-10-26 22:27:54

Question

How do I deal with back-pressure in Java using a thread pool?

How can I reject new tasks so that there are never more than N submitted tasks? N is the maximum number of tasks allowed in the submission queue, which includes new, running, and paused (not finished) tasks.

Use case

Users submit calculation tasks that run for some time. Sometimes many users submit tasks at the same time. How can I reject new tasks if N tasks have already been submitted?

In other words, the total number of submitted tasks (not finished, whether started or not) cannot be greater than N.

Example code

Here is the full version, and below are short snippets.

A long-running task, CalculationTask.

    public class CalculationTask {

        // Fields, constants (MIN_WAIT_TIME_MS, MAX_WAIT_TIME_MS, RANDOM), getName()
        // and the sleep() helper are omitted from this snippet.

        public CalculationTask(final String name) {
            this.name = name;
        }

        // Simulates a long-running calculation by sleeping for a random time.
        public CalculationResult calculate() {
            final long waitTimeMs = MIN_WAIT_TIME_MS + RANDOM.nextInt(MAX_WAIT_TIME_MS);
            sleep(waitTimeMs);
            final int result = Math.abs(RANDOM.nextInt());
            final String text = "This is result: " + result;
            final CalculationResult calculationResult = new CalculationResult(name, text, result);
            System.out.println("Calculation finished: " + calculationResult);
            return calculationResult;
        }
    }

Its result, CalculationResult.

    public class CalculationResult {

        private final String taskName;
        private final String text;
        private final Integer number;

        // Getters, setters, constructor, toString.
    }

This is how I submit jobs, CalculationBroker.

    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class CalculationBroker {

        private static final int MAX_WORKERS_NUMBER = 5;

        // newFixedThreadPool queues tasks without any upper bound.
        private final ExecutorService executorService = Executors.newFixedThreadPool(MAX_WORKERS_NUMBER);
        private final Map<String, CalculationResult> calculationCache = new ConcurrentHashMap<>();

        public CompletableFuture<CalculationResult> submit(final CalculationTask calculationTask) {
            // Return a cached result immediately if this task has already been calculated.
            final CalculationResult calculationResultCached = calculationCache.get(calculationTask.getName());
            if (calculationResultCached != null) {
                return CompletableFuture.completedFuture(calculationResultCached);
            }

            System.out.println("Calculation submitted: " + calculationTask.getName());

            final CompletableFuture<CalculationResult> calculated = CompletableFuture
                    .supplyAsync(calculationTask::calculate, executorService);
            calculated.thenAccept(this::updateCache);
            return calculated;
        }

        private void updateCache(final CalculationResult calculationResult) {
            calculationCache.put(calculationResult.getTaskName(), calculationResult);
        }

        // close(), which Main calls, is not shown in this snippet.
    }

And this is how I run them together, Main.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    public class Main {

        public static void main(String[] args) {
            final int N_TASKS = 100;
            final CalculationBroker calculationBroker = new CalculationBroker();
            final List<CompletableFuture<CalculationResult>> completableFutures = new ArrayList<>();

            // Submit all tasks at once; nothing limits how many are pending.
            for (int i = 0; i < N_TASKS; i++) {
                final CalculationTask calculationTask = createCalculationTask(i);
                final CompletableFuture<CalculationResult> calculationResultCompletableFuture =
                        calculationBroker.submit(calculationTask);
                completableFutures.add(calculationResultCompletableFuture);
            }

            calculationBroker.close();
        }

        private static CalculationTask createCalculationTask(final int counter) {
            return new CalculationTask("CalculationTask_" + counter);
        }
    }

This is the output.

    2020-05-23 14:14:53 [main] INFO c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_97.
    2020-05-23 14:14:53 [main] INFO c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_98.
    2020-05-23 14:14:53 [main] INFO c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_99.
    2020-05-23 14:14:54 [pool-1-thread-3] INFO c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_2', text='This is result: 1081871544', number=1081871544, durationMs=1066}
    2020-05-23 14:14:55 [pool-1-thread-1] INFO c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_0', text='This is result: 1942553785', number=1942553785, durationMs=1885}
    2020-05-23 14:14:56 [pool-1-thread-5] INFO c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_4', text='This is result: 104326011', number=104326011, durationMs=2120}
    20

My findings.

Details below

The code shown below is equivalent to Executors.newFixedThreadPool(n); however, instead of the default unbounded LinkedBlockingQueue, it uses an ArrayBlockingQueue with a fixed capacity of 100. This means that if 100 tasks are already queued (and n are being executed), a new task will be rejected with a RejectedExecutionException.

The ThreadPoolExecutor created by Executors.newFixedThreadPool uses a LinkedBlockingQueue, which is unbounded by default.
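The claim about the default queue can be checked with a few lines. This is only a sketch: the class name DefaultQueueCheck is hypothetical, and the cast to ThreadPoolExecutor relies on how OpenJDK implements newFixedThreadPool rather than on anything the API guarantees.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

class DefaultQueueCheck {

    /** Returns the free capacity of the work queue behind newFixedThreadPool. */
    static int defaultRemainingCapacity() {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            // Assumption: the returned executor is a plain ThreadPoolExecutor (true for OpenJDK).
            BlockingQueue<Runnable> queue = ((ThreadPoolExecutor) executor).getQueue();
            if (!(queue instanceof LinkedBlockingQueue)) {
                throw new IllegalStateException("unexpected queue type: " + queue.getClass());
            }
            return queue.remainingCapacity();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // An unbounded LinkedBlockingQueue reports Integer.MAX_VALUE free slots.
        System.out.println(defaultRemainingCapacity() == Integer.MAX_VALUE);
    }
}
```

Because the queue never fills up, the pool never has a reason to reject a task, which matches the output above where all 100 tasks are accepted.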

As the post above suggests:

    final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
    executorService = new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, queue);
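To see the rejection happen, here is a minimal sketch that uses small numbers instead of 100. The class BoundedPoolDemo and its method countRejected are hypothetical names for illustration; the tasks block on a latch so that none of them finishes while the loop is still submitting.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolDemo {

    /** Submits `attempts` blocking tasks and returns how many the pool rejected. */
    static int countRejected(int poolSize, int queueCapacity, int attempts) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                try {
                    executor.execute(() -> {
                        try {
                            release.await(); // keep the task "running" until released
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // Thrown by the default AbortPolicy once workers and queue are full.
                    rejected++;
                }
            }
        } finally {
            release.countDown();  // let the accepted tasks finish
            executor.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 2 running + 3 queued are accepted; the remaining 5 are rejected.
        System.out.println(countRejected(2, 3, 10)); // prints 5
    }
}
```

The pool accepts at most poolSize + queueCapacity unfinished tasks, so this sum is the value that plays the role of N in the question.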

Solution

You answered your own question: you can use the queue size to do that. With a fixed pool, at most poolSize tasks run and at most queueSize wait, so choosing queueSize = N - poolSize caps the number of unfinished tasks at N.

    int poolSize = ...;
    int queueSize = ...;
    CustomRejectedExecutionHandler handler = new CustomRejectedExecutionHandler();
    ExecutorService executorService = new ThreadPoolExecutor(poolSize, poolSize,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(queueSize),
            handler);

You could use a CustomRejectedExecutionHandler to handle rejected tasks.

    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    import org.apache.log4j.Logger;

    public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

        public static final Logger LOGGER = Logger.getLogger(CustomRejectedExecutionHandler.class);

        // Called by the pool when it cannot accept the task; this handler only logs it.
        @Override
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
            LOGGER.error(runnable.toString() + " execution rejected.");
        }
    }
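One caveat when combining this with CompletableFuture.supplyAsync, as the broker in the question does: a handler that only logs lets supplyAsync return normally, so the caller receives a future that never completes. The sketch below is an alternative, not the answer's method. It keeps the default AbortPolicy and turns the resulting RejectedExecutionException into a failed future. RejectingBroker and the maxSubmitted parameter are hypothetical names, and the sketch assumes maxSubmitted is larger than poolSize.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class RejectingBroker {

    private final ThreadPoolExecutor executor;

    RejectingBroker(int poolSize, int maxSubmitted) {
        if (maxSubmitted <= poolSize) {
            throw new IllegalArgumentException("maxSubmitted must exceed poolSize");
        }
        // At most poolSize tasks run and (maxSubmitted - poolSize) wait in the queue.
        // No handler is passed, so the default AbortPolicy throws on rejection.
        executor = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(maxSubmitted - poolSize));
    }

    /** Returns a future that fails immediately if the pool is saturated. */
    <T> CompletableFuture<T> submit(Supplier<T> task) {
        try {
            return CompletableFuture.supplyAsync(task, executor);
        } catch (RejectedExecutionException e) {
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    void close() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        CountDownLatch gate = new CountDownLatch(1);
        RejectingBroker broker = new RejectingBroker(2, 5);
        int rejected = 0;
        for (int i = 0; i < 8; i++) {
            CompletableFuture<Integer> future = broker.submit(() -> {
                try {
                    gate.await(); // simulate a long-running calculation
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return 1;
            });
            if (future.isCompletedExceptionally()) {
                rejected++;
            }
        }
        gate.countDown();
        broker.close();
        System.out.println(rejected); // prints 3: only 5 of the 8 tasks fit
    }
}
```

Returning a failed future keeps the broker's signature unchanged, so callers can handle back-pressure with exceptionally or handle instead of a separate code path.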

Published: 2023-11-25 12:19:11
Article link: https://www.elefans.com/category/jswz/34/1629721.html