How to deal with back-pressure in Java using thread pool?
How can I reject new tasks so that there are never more than N submitted tasks? N is the maximum number of tasks allowed in the submission queue, including new, running, and paused (not finished) tasks.
Use case
Users submit calculation tasks that run for some time. Sometimes many users submit tasks at the same time. How can I reject new tasks if N tasks have already been submitted?
In other words, the total number of submitted (not finished, started or not started) tasks cannot be greater than N.
Example code
Here is the full version, and below are short snippets.
A long running task. CalculationTask.
    public class CalculationTask {
        // Snippet only: the name field, getName(), the RANDOM and *_WAIT_TIME_MS
        // constants, and the sleep(...) helper are omitted here.

        public CalculationTask(final String name) {
            this.name = name;
        }

        public CalculationResult calculate() {
            // Simulate a long-running calculation by sleeping for a random time.
            final long waitTimeMs = MIN_WAIT_TIME_MS + RANDOM.nextInt(MAX_WAIT_TIME_MS);
            sleep(waitTimeMs);

            final int result = Math.abs(RANDOM.nextInt());
            final String text = "This is result: " + result;
            final CalculationResult calculationResult = new CalculationResult(name, text, result);
            System.out.println("Calculation finished: " + calculationResult);
            return calculationResult;
        }
    }

Its result. CalculationResult.
    public class CalculationResult {
        private final String taskName;
        private final String text;
        private final Integer number;
        // Getters, setters, constructor, toString.
    }

This is how I submit jobs. CalculationBroker.
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class CalculationBroker {
        private static final int MAX_WORKERS_NUMBER = 5;

        // Fixed pool of workers backed by an unbounded queue.
        private final ExecutorService executorService = Executors.newFixedThreadPool(MAX_WORKERS_NUMBER);
        private final Map<String, CalculationResult> calculationCache = new ConcurrentHashMap<>();

        public CompletableFuture<CalculationResult> submit(final CalculationTask calculationTask) {
            // Return the cached result if this task has already been calculated.
            final CalculationResult calculationResultCached = calculationCache.get(calculationTask.getName());
            if (calculationResultCached != null) {
                return CompletableFuture.completedFuture(calculationResultCached);
            }

            System.out.println("Calculation submitted: " + calculationTask.getName());

            final CompletableFuture<CalculationResult> calculated = CompletableFuture
                    .supplyAsync(calculationTask::calculate, executorService);
            calculated.thenAccept(this::updateCache);
            return calculated;
        }

        private void updateCache(final CalculationResult calculationResult) {
            calculationCache.put(calculationResult.getTaskName(), calculationResult);
        }
    }

And this is how I run them together. Main.
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    public class Main {
        public static void main(String[] args) {
            final int N_TASKS = 100;
            final CalculationBroker calculationBroker = new CalculationBroker();
            final List<CompletableFuture<CalculationResult>> completableFutures = new ArrayList<>();

            // Submit all tasks at once; nothing limits how many are queued.
            for (int i = 0; i < N_TASKS; i++) {
                final CalculationTask calculationTask = createCalculationTask(i);
                final CompletableFuture<CalculationResult> calculationResultCompletableFuture =
                        calculationBroker.submit(calculationTask);
                completableFutures.add(calculationResultCompletableFuture);
            }

            // close() belongs to the full version of CalculationBroker; it is not shown in the snippet.
            calculationBroker.close();
        }

        private static CalculationTask createCalculationTask(final int counter) {
            return new CalculationTask("CalculationTask_" + counter);
        }
    }

This is the output.
    2020-05-23 14:14:53 [main] INFO c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_97.
    2020-05-23 14:14:53 [main] INFO c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_98.
    2020-05-23 14:14:53 [main] INFO c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_99.
    2020-05-23 14:14:54 [pool-1-thread-3] INFO c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_2', text='This is result: 1081871544', number=1081871544, durationMs=1066}
    2020-05-23 14:14:55 [pool-1-thread-1] INFO c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_0', text='This is result: 1942553785', number=1942553785, durationMs=1885}
    2020-05-23 14:14:56 [pool-1-thread-5] INFO c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_4', text='This is result: 104326011', number=104326011, durationMs=2120}
    ...

My findings.
Details below.
The ThreadPoolExecutor setup quoted further below is equivalent to Executors.newFixedThreadPool(n), except that it replaces the default unbounded LinkedBlockingQueue with an ArrayBlockingQueue that has a fixed capacity of 100. If 100 tasks are already queued (and n are being executed), a new task is rejected with a RejectedExecutionException.
By comparison, the ThreadPoolExecutor created by Executors.newFixedThreadPool uses a LinkedBlockingQueue, which is unbounded by default.
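To make the limit concrete: with a pool of fixed size and a bounded queue, the executor accepts at most poolSize + queueSize unfinished tasks, so N can be expressed as that sum. The following is a minimal sketch of that behaviour, not the original code; the class BoundedPoolDemo, the method countRejected, and the sizes used are assumptions chosen for illustration. The tasks block on a latch so that none of them finishes while submissions are still in progress.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolDemo {

    /** Submits `tasks` blocking tasks and returns how many of them were rejected. */
    public static int countRejected(int poolSize, int queueSize, int tasks) {
        // No handler is passed, so the default AbortPolicy throws on rejection.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // Each task waits on the latch, so workers stay busy during submission.
                    executor.execute(() -> awaitQuietly(release));
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            release.countDown();   // let the accepted tasks finish
            executor.shutdown();   // allow the worker threads to exit
        }
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // 2 workers + 3 queue slots accept 5 tasks; the other 5 of 10 are rejected.
        System.out.println("Rejected: " + countRejected(2, 3, 10)); // prints "Rejected: 5"
    }
}
```

Under these assumptions, choosing queueSize = N - poolSize gives the cap asked for in the question.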
As the post above suggests:
    // Bounded queue: at most 100 tasks can wait while n tasks are running.
    final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
    executorService = new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, queue);

Solution
You answered your own question: you can use the queue size to do that.
    int poolSize = ...;
    int queueSize = ...;
    CustomRejectedExecutionHandler handler = new CustomRejectedExecutionHandler();

    // Fixed-size pool with a bounded queue; tasks that do not fit go to the handler.
    ExecutorService executorService = new ThreadPoolExecutor(poolSize, poolSize,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(queueSize),
            handler);

You can use a CustomRejectedExecutionHandler to handle the rejected tasks.
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;

    import org.apache.log4j.Logger;

    public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

        public static final Logger LOGGER = Logger.getLogger(CustomRejectedExecutionHandler.class);

        @Override
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
            // Called by the executor when it cannot accept the task,
            // for example because the pool and the queue are both full.
            LOGGER.error(runnable.toString() + " execution rejected.");
        }
    }
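One thing to watch for when such a handler is combined with CompletableFuture.supplyAsync: a handler that only logs drops the task silently, so the future returned to the caller never completes. A minimal sketch of one way around that, assuming the default AbortPolicy is kept and the rejection is turned into a failed future. RejectingBroker and its generic submit method are hypothetical names and are not part of the original code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class RejectingBroker implements AutoCloseable {

    private final ExecutorService executorService;

    public RejectingBroker(int poolSize, int queueSize) {
        // AbortPolicy (also the default) throws RejectedExecutionException when full.
        this.executorService = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize),
                new ThreadPoolExecutor.AbortPolicy());
    }

    /** Returns a future that fails immediately if the executor rejects the task. */
    public <T> CompletableFuture<T> submit(Supplier<T> task) {
        try {
            return CompletableFuture.supplyAsync(task, executorService);
        } catch (RejectedExecutionException e) {
            // supplyAsync propagates the rejection; report it through the future instead.
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    @Override
    public void close() {
        executorService.shutdown();
    }

    public static void main(String[] args) {
        CountDownLatch gate = new CountDownLatch(1);
        Supplier<String> slow = () -> {
            try {
                gate.await(); // keep the worker busy until the gate opens
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "done";
        };
        RejectingBroker broker = new RejectingBroker(1, 1);
        broker.submit(slow);                                  // runs on the single worker
        broker.submit(slow);                                  // waits in the queue
        CompletableFuture<String> third = broker.submit(slow); // no room left
        System.out.println("third rejected: " + third.isCompletedExceptionally());
        gate.countDown();
        broker.close();
    }
}
```

With this shape the caller can react to the rejection, for example through exceptionally or handle on the returned future, rather than waiting on a future that will never finish.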