JAVA并发编程实战

编程入门 行业动态 更新时间:2024-10-09 13:24:29

JAVA并发编程<a href=https://www.elefans.com/category/jswz/34/1769775.html style=实战"/>

JAVA并发编程实战

目录

  • 前言
  • 思维导图
  • 1 任务取消
    • 1.1 中断
    • 1.2 中断策略
    • 1.3 响应中断
    • 1.4 示例:计时运行
    • 1.5 通过Future取消
    • 1.6 处理不可中断阻塞
    • 1.7 通过newTaskFor封装非标准取消
  • 2 停止基于线程的服务
    • 2.1 日志服务
    • 2.2 关闭ExecutorService
    • 2.3 致命药丸
    • 2.4 示例-只执行一次的任务
    • 2.5 shutdownNow局限性
  • 3 处理反常的线程终止
    • 3.1 未捕获异常的处理
  • 4 JVM关闭
    • 4.1 关闭钩子
  • 参考文献

前言

有时候我们希望任务或者线程自然结束前就停止它们,可能用户点击取消或者应用程序需要关闭。Java中提供了中断使一个线程可以要求另一个线程停止当前工作。

思维导图

1 任务取消

可取消的活动:当外部代码能够在活动自然完成前,将其状态改为完成状态。

可能有多种原因导致活动关闭如:

  • 用户请求的取消。用户点击取消按钮。
  • 限时活动。设置超时器,如果超出停止活动。
  • 应用程序事件,任务分解搜索,一个任务搜索到,停止其它任务。
  • 错误。当多个任务保存页面,一个遇到磁盘满,其它取消。
  • 关闭。当一个程序关闭,需要关闭正在处理的和还没有开始的任务。

在协作机制中有一种会设置标志位,任务定期检查,如果被设置,任务停止。如下demo:

public class PrimerGeneratorExample implements Runnable {/*** 取消标志位*/private volatile boolean cancelled;/*** 存储结果*/private final List<BigInteger> primers = new ArrayList<>();/*** 取消方法*/public void cancel() {this.cancelled = true;}/*** 执行方法*/@Overridepublic void run() {BigInteger one = BigInteger.ONE;while (!cancelled) {BigInteger nowPrimer = one.nextProbablePrime();synchronized (this) {nowPrimer = nowPrimer.nextProbablePrime();primers.add(nowPrimer);}}}public List<BigInteger> aSecondOfPrimer() throws InterruptedException {PrimerGeneratorExample primerGeneratorExample = new PrimerGeneratorExample();new Thread(primerGeneratorExample).start();try {Thread.sleep(1000);} finally {primerGeneratorExample.cancel();}return primerGeneratorExample.getPrimers();}public synchronized List<BigInteger> getPrimers() {return new ArrayList<>(primers);}
}

aSecondOfPrimer展示了使用这个任务的例子,让任务执行一秒,然后停止。

一个可取消的任务必须拥有取消策略。

1.1 中断

上述的例子,会导致任务退出,但是如果一个任务调用了一个阻塞方法,则任务可能永远不会检查标志位,导致永远不会终结。
这时我们需要另一种方式-中断来处理。

每个线程都有一个boolean类型的中断标志,中断时候会被设置为true。
Thread中中断相关主要有下面几个方法:
  interrupt():设置中断状态为true。
  interrupted():获取线程当前中断状态,并清除状态。
  isInterrupted():获取当前线程是否被中断。

当调用interrupt并不意味者线程必须停止工作,只是向线程传递了一个中断信号,具体处理由线程进行处理。
中断通常是实现取消最好的选择。

我们可以使用中断来处理上述素数查找任务,demo如下:

public class PrimerGeneratorInterruptedExample implements Runnable{/*** 阻塞队列存储结果*/private final BlockingQueue<BigInteger> primers;public PrimerGeneratorInterruptedExample(BlockingQueue<BigInteger> primers) {this.primers = primers;}@Overridepublic void run() {try {BigInteger primer = BigInteger.ONE;while (!Thread.currentThread().isInterrupted()) {primers.put(primer = primer.nextProbablePrime());}} catch (InterruptedException e) {//收到中断信息,允许线程退出}}/*** 调用中断方法进行中断*/public void cancel() {Thread.currentThread().interrupt();}
}

上述示例就通过显式检查中断和捕获中断异常两种方法处理了当我们遇到外部关闭任务,如何正确处理我们的任务取消。

1.2 中断策略

正如需要对任务制定取消策略,线程也应有中断策略。

如果对中断的处理不仅仅是把中断异常传递给调用者,那我们应该在捕获中断异常后,恢复中断状态:
  Thread.currentThread().interrupt();

因为每个线程都有自己的中断策略,所以你不应该中断线程,应该交由线程所有者进行处理。

1.3 响应中断

处理InterruptedException,有两种实用策略:

  • 传递异常,使你的方法可以成为阻塞方法。
  • 保存中断状态,上层调用者对其进行处理。

如下所示,我们此时只是传递异常。

private final BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(100);/*** take作为阻塞方法,可以响应中断,此时策略为,传递异常,交给调用者* @return 结果* @throws InterruptedException 中断异常*/public Integer task() throws InterruptedException {return blockingQueue.take();}

有些活动不支持取消,却仍可能调用可阻塞的阻塞方法如下demo:

/*** 任务不可取消,可以设置标志为作为是否中断的标志,在任务结束时,设置线程中断状态* @return*/public Integer getTask() {boolean interrupted = false;try {while (true) {try {return blockingQueue.take();} catch (InterruptedException e) {//设置中断标志interrupted = true;}}} finally {if (interrupted) {Thread.currentThread().interrupt();}}}

只有实现了线程中断策略的代码才可以接收中断请求。通用目的的任务和库的代码不应该接收中断请求。

1.4 示例:计时运行

如下demo是一个给定一段时间运行runnable的示例:

public class TimeRunOuter {private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newScheduledThreadPool(4);private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(4, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));/*** 使用一个定时取消任务,但是由于不清楚任务的中断策略,如果任务不响应中断,则不会返回,直到任务结束* @param runnable 任务* @param timeout 延迟时间* @param timeUnit 单位*/public static void timeRun(Runnable runnable, long timeout, TimeUnit timeUnit) {Thread currentThread = Thread.currentThread();SCHEDULED_EXECUTOR_SERVICE.schedule(currentThread::interrupt, timeout, timeUnit);//如果任务不响应中断,即使设置了中断位,线程也不会退出。runnable.run();}}

该方法在任务外部线程设置了一个取消任务线程,但是这种方式可能有如下问题:

  • 如果任务在时限内完成,这个取消线程也就可能在任务结束返回调用者之后调用,可能发生意想不到的问题。
  • 如果runnable不响应中断,那么该任务将不会返回,直至任务结束。

如下使用join是一种解决可能出现问题的方法:

/*** 任务中有自己的中断逻辑,同时使用join方法,可以解决上述版本中出现的问题* 依赖join方法* @param runnable 任务* @param timeout 延迟时间* @param timeUnit 单位* @throws Throwable 抛出的异常*/public static void timeRunImprove(Runnable runnable, long timeout, TimeUnit timeUnit) throws Throwable, InterruptedException {class RethrowTask implements Runnable {private volatile Throwable throwable;@Overridepublic void run() {try {runnable.run();} catch (Throwable t) {this.throwable = t;}}void rethrow () throws Throwable {if (throwable != null) {throw throwable;}}}RethrowTask task;Thread thread = new Thread(task = new RethrowTask());thread.start();SCHEDULED_EXECUTOR_SERVICE.schedule(thread::interrupt, timeout, timeUnit);//通过join在指定时间内等待任务执行情况,并处理后续的线程throwable如果有的话。thread.join(timeout);task.rethrow();}

该种方法由于任务中封装了中断策略,同时join方法设置了超时时间,timeRunImprove也可以结束,并获取任务线程的异常信息。

该方法依赖join:我们不知道控制权返回是任务正常结束还是join超时。

1.5 通过Future取消

Future可以管理任务生命周期,处理异常,有利于取消。

以下demo是上述计时任务的Future版:

/*** 利用future来取消任务。* @param runnable 任务* @param timeout 超时时间* @param timeUnit 时间单位*/public static void timeRunFuture(Runnable runnable, long timeout, TimeUnit timeUnit) {Future<?> future = EXECUTOR_SERVICE.submit(runnable);try {Object o = future.get(timeout, timeUnit);} catch (ExecutionException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();//任务被取消} finally {future.cancel(true);}}

ExecutorService创建线程,封装了中断策略,可以通过Future来中断任务。

1.6 处理不可中断阻塞

很多阻塞方法可以通过抛出中断异常实现中断响应。但是并不是所有阻塞方法都响应中断如下:

  • java.io中同步Socket IO。
  • java.nio中同步IO。
  • Selector的异步IO。
  • 获得锁。

下面的demo展示了一种封装非标准取消的技术:

public class ReaderThread extends Thread{private final Socket socket;private final InputStream inputStream;public ReaderThread(Socket socket) throws IOException {this.socket = socket;this.inputStream = socket.getInputStream();}@Overridepublic void run() {try {byte[] buf = new byte[1024];while (true) {int count = inputStream.read(buf);if (count < 0) break;else if (count > 0) {processBuf(buf, count);}}} catch (IOException e) {e.printStackTrace();}}private void processBuf(byte[] buf, int count) {System.out.println("process buf");}/*** 通过重写封装非标准的中断*/@Overridepublic void interrupt() {try {socket.close();} catch (IOException e) {e.printStackTrace();} finally {super.interrupt();}}
}

上述通过重写interrupt方法,使得可以响应中断和处理socket关闭。

1.7 通过newTaskFor封装非标准取消

通过利用ThreadPoolExecutor添加的钩子函数newTaskFor封装我们的任务,实现非标准取消。

示例demo结构图如下:

可取消任务接口定义:

public interface CancelCallable<T> extends Callable<T> {void cancel();RunnableFuture<T> newTask();
}

添加取消方法和封装任务方法

可处理新定义接口的线程池

public class CancelExecutor extends ThreadPoolExecutor {public CancelExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}@Overrideprotected<T> RunnableFuture<T> newTaskFor(Callable<T> callable) {if (callable instanceof CancelCallable) {return ((CancelCallable) callable).newTask();} else {return super.newTaskFor(callable);}}
}

自定义线程池,添加改写钩子函数支持我们自定义的任务。

任务实现

public abstract class SocketUsingTask<T> implements CancelCallable<T> {private Socket socket;protected synchronized void setSocket(Socket socket) {this.socket = socket;}@Overridepublic synchronized void cancel() {try {if (socket != null) {socket.close();}} catch (IOException e) {e.printStackTrace();}}@Overridepublic RunnableFuture<T> newTask() {return new FutureTask<T>(this) {@Overridepublic boolean cancel(boolean mayInterruptIfRunning) {SocketUsingTask.this.cancel();return super.cancel(mayInterruptIfRunning);}};}
}

任务抽象类,支持封装非标准的取消。

2 停止基于线程的服务

对于线程持有的服务,只要服务的存在时间大于创建线程的方法存在的时间,就应该提供生命周期管理。

2.1 日志服务

下面是一个简单的日志服务demo:

/*** 不支持关闭的日志服务*/
public class LogWriter {private final BlockingQueue<String> blockingQueue;private final LogThread logThread;public LogWriter(BlockingQueue<String> blockingQueue, LogThread logThread) {this.blockingQueue = blockingQueue;this.logThread = logThread;}/*** 启动日志打印服务*/public void start() {logThread.start();}/*** 生产者——放入队列中* @param msg* @throws InterruptedException*/public void log(String msg) throws InterruptedException {blockingQueue.put(msg);}/*** 消费者——打印日志*/private class LogThread extends Thread {private final PrintWriter printWriter;public LogThread(PrintWriter printWriter) {this.printWriter = printWriter;}@Overridepublic void run() {try {while (true) {printWriter.println(blockingQueue.take());}} catch (InterruptedException e) {e.printStackTrace();} finally {printWriter.close();}}}
}

可以看出这是一个生产者-消费者模型,写日志交给日志线程。

该方法不支持关闭服务

为了支持关闭日志服务,对代码做如下改变:

/*** 支持可停止的日志服务,对涉及到的竞争条件做了处理*/
public class CanCancelledLogService {private final LogThread logThread;private final PrintWriter printWriter;private final BlockingQueue<String> blockingQueue;/*** 是否取消标志*/private volatile boolean cancelled;/*** 消息条数*/private int reservations;public CanCancelledLogService(LogThread logThread, PrintWriter printWriter, BlockingQueue<String> blockingQueue) {this.logThread = logThread;this.printWriter = printWriter;this.blockingQueue = blockingQueue;}public void stop() {cancelled = true;logThread.interrupt();}public void start() {logThread.start();}public void log(String msg) throws InterruptedException {synchronized (this) {if (cancelled) {throw new IllegalStateException();}reservations++;}blockingQueue.put(msg);}private class LogThread extends Thread {@Overridepublic void run() {try {while (true) {try {synchronized (CanCancelledLogService.this) {if (cancelled && reservations == 0) {break;}}String msg = blockingQueue.take();synchronized (CanCancelledLogService.this) {reservations--;}printWriter.write(msg);} catch (InterruptedException e) {//重试}}} finally {printWriter.close();}}}
}

上述改进添加了关闭标志和消息数记录,从而实现了在关闭时,可以消费完剩余的消息日志。

由于上述改变涉及到竞争条件,比如reservations的修改,因此需要通过加锁解决。

2.2 关闭ExecutorService

ExecutorService提供了两种关闭方法:

  • shutdown:优雅关闭,不再接受任务,执行完剩余任务。
  • shutdownNow:强制关闭,不再执行任务,返回还未执行的任务清单。

这两种方式在安全性和响应性中做了权衡。

如下使用ExecutorService日志服务的示例:

public class LogServiceUsingExecutorService {private final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(100));private final PrintWriter printWriter;public LogServiceUsingExecutorService(PrintWriter printWriter) {this.printWriter = printWriter;}/*** 利用ExecutorService提供自己的生命周期服务* @throws InterruptedException 中断异常*/public void stop() throws InterruptedException {try {executorService.shutdown();executorService.awaitTermination(10L, TimeUnit.SECONDS);} finally {printWriter.close();}}public void log(String msg) {try {executorService.execute(() -> {printWriter.println(msg);//做日志相关功能});} catch (RejectedExecutionException rejectedExecutionException) {}}
}

通过利用ExecutorService的shutdown关闭服务。

2.3 致命药丸

这是另一种关闭生产者和消费者服务的方法。

致命药丸:一个可以识别的对象,置于队列中,得到它意味者需要停止工作。

如下是使用致命药丸关闭服务示例:

/*** 使用致命药丸停止生产者和消费者服务*/
public class IndexingService {private static final File POISON = new File("");private final BlockingQueue<File> blockingQueue;private final File root;private final IndexThread indexThread;private final CrawlerThread crawlerThread;public IndexingService(BlockingQueue<File> blockingQueue, File root, IndexThread indexThread, CrawlerThread crawlerThread) {this.blockingQueue = blockingQueue;this.root = root;this.indexThread = indexThread;this.crawlerThread = crawlerThread;}/*** 启动服务线程*/public void start() {crawlerThread.start();indexThread.start();}/*** 停止添加file,线程收到了执行添加药丸POISON*/public void stop() {crawlerThread.interrupt();}/*** 等待消费者停止,不管正常或者异常。* @throws InterruptedException 中断异常*/public void awaitIndexService() throws InterruptedException {indexThread.join();}private class IndexThread extends Thread {@Overridepublic void run() {try {while (true) {File fi = blockingQueue.take();//如果是药丸,停止检索if (fi == POISON) {break;}indexFile(fi);}} catch (InterruptedException e) {//处理过程遇到中断}}private void indexFile(File fi) {}}private class CrawlerThread extends Thread {@Overridepublic void run() {try {crawlerRoot(root);} catch (InterruptedException interruptedException) {//失败了添加POISON,执行finally} finally {//收到中断,执行添加药丸,需要进行重试,因为可能遇到队列满了,put阻塞,丹斯线程被中断情况,防止添加失败。while (true) {try {blockingQueue.put(POISON);break;} catch (InterruptedException e) {//如果添加中遇到阻塞,且被中断,则应该尝试重试,否则无法添加POISON}}}}private void crawlerRoot(File root) throws InterruptedException {}}
}

当我们需要停止服务,通过中断通知任务线程,任务线程捕获后添加致命药丸。
消费者取出的如果是致命药丸,则意味着需要停止任务。

适用于生产者和消费者数量已知,如果生产者和消费者数量过大,很难处理。

2.4 示例-只执行一次的任务

当一个方法需要处理一批任务,并且所有任务结束前不会返回。

可以使用私有的ExecutorService,将其周期限制在一次方法调用中。

如检索一个主机集合是否收到邮件的服务demo:

/***执行一次的服务*/
public class CheckMailExample {/*** 检查主机群是否有邮件服务,返回检测结果* @param hosts 主机set* @param timeout 设置超时时间* @param timeUnit 时间单位* @return 检测list* @throws InterruptedException 中断异常*/public List<Pair<String, Boolean>> checkMails(Set<String> hosts, long timeout, TimeUnit timeUnit) throws InterruptedException {ExecutorService executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());final List<Pair<String, Boolean>> pairList = new ArrayList<>();AtomicReference<List<Pair<String, Boolean>>>  checkResults = new AtomicReference<>(pairList);try {for (String host : hosts) {executorService.execute(() -> {if (checkMail(host)) {checkResults.get().add(new Pair<>(host, true));}});}} finally {executorService.shutdown();executorService.awaitTermination(timeout, timeUnit);}return checkResults.get();}private boolean checkMail(String host) {return false;}
}

这里使用了SynchronousQueue,因此当检查完一个主机后才能执行下一个检查任务。
在所有主机检查结束后,调用shutdown关闭服务。

2.5 shutdownNow局限性

我们知道shutdownNow是强制关闭,只会返回未执行的任务,不会返回正在执行未结束的任务。

我们需要在ExecutorService关闭之后,获取所有被取消的任务。下面demo展示了如何通过封装任务策略,获取所有未完成的任务:

/*** 记录强制关闭线程池时,有哪些正在执行的任务被取消*/
public class TrackingExecutor extends AbstractExecutorService {/*** 委托ExecutorService处理请求*/private final ExecutorService executorService;/*** 线程池停止时,记录的被取消任务集合,线程安全的。*/private final Set<Runnable> tasksShutdownAtCancelled = Collections.synchronizedSet(new HashSet<>());public TrackingExecutor(ExecutorService executorService) {this.executorService = executorService;}public List<Runnable> getTasksShutdownAtCancelled() {if (!executorService.isTerminated()) {throw new IllegalStateException();}return new ArrayList<>(tasksShutdownAtCancelled);}@Overridepublic void shutdown() {executorService.shutdown();}@Overridepublic List<Runnable> shutdownNow() {return executorService.shutdownNow();}@Overridepublic boolean isShutdown() {return executorService.isShutdown();}@Overridepublic boolean isTerminated() {return executorService.isTerminated();}@Overridepublic boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {return executorService.awaitTermination(timeout, unit);}@Overridepublic void execute(Runnable command) {executorService.execute(() -> {try {command.run();} finally {//任务执行结束不管是否异常,判断线程池状态为关闭,并且当前执行线程是被中断的,则该任务需要添加//可能出现假阳性(任务执行结束,恰好线程池关闭,可能添加完成的任务),如果任务是幂等的没什么问题,否则需要注意if (executorService.isShutdown() && Thread.currentThread().isInterrupted()) {tasksShutdownAtCancelled.add(command);}}});}
}

上述主要是通过判断线程池是否关闭并且真正执行任务的线程是否被中断来决定任务是否需要加入。

上述的代码可能出现假阳性。

3 处理反常的线程终止

导致线程死亡的最主要原因是RuntimeException.

当我们调用任务时,应该对该任务是否有未检异常保持怀疑。因此我们应该在构建工作者线程时对其进行捕获。一个典型的构建工作者线程如下(主动解决未受检异常):

/*** 处理反常的异常处理*/
public class WorkThreadBuild extends Thread{@Overridepublic void run() {Throwable throwable = null;try {while (!isInterrupted()) {runTask(getTaskFromWorkQueue());}} catch (Throwable throwable1) {throwable = throwable1;} finally {//通知框架处理异常notifyExecutorService(Thread.currentThread(), throwable);}}private void notifyExecutorService(Thread thread, Throwable throwable) {}private void runTask(Object taskFromWorkQueue) {}private Object getTaskFromWorkQueue() {return null;}
}

也就是我们处理任务时,如果存在未受检异常,应该捕获,并通知线程框架,由框架决定如何处理。

3.1 未捕获异常的处理

对于未捕获异常,线程提供了一个接口UncaughtExceptionHandler进行处理:

@FunctionalInterfacepublic interface UncaughtExceptionHandler {/*** Method invoked when the given thread terminates due to the* given uncaught exception.* <p>Any exception thrown by this method will be ignored by the* Java Virtual Machine.* @param t the thread* @param e the exception*/void uncaughtException(Thread t, Throwable e);}

当一个线程因为未捕获异常而退出时,会通过它的UncaughtExceptionHandler进行处理。如果处理器不存在,则默认通过System.err打印栈追踪信息。ThreadGroup源码如下:

如下是一个演示未捕获异常写入日志的demo:

public class UnCatchExceptionLog implements Thread.UncaughtExceptionHandler {/*** 定义线程死亡打印日志方法* @param t 未捕获异常线程* @param e 抛出的错误*/@Overridepublic void uncaughtException(Thread t, Throwable e) {Logger anonymousLogger = Logger.getAnonymousLogger();anonymousLogger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e);}public static void main(String[] args) {UnCatchExceptionLog unCatchExceptionLog = new UnCatchExceptionLog();ExecutorService executorService = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());executorService.execute(() -> {//直接在任务执行线程中设置未捕获异常处理器,这样当出现未捕获异常,就可以进行处理Thread.currentThread().setUncaughtExceptionHandler(unCatchExceptionLog);int number  = 10 / 0;});}
}

我们首先定义了UncaughtExceptionHandler实现类,之后通过在执行任务run方法中设置线程的未捕获异常处理器,这样就可以在出现未捕获异常时写入日志。
运行结果:

在一个长时间运行的服务中,所有的线程都应该设置一个未捕获异常处理器,至少应该将异常信息记录到日志。

4 JVM关闭

jvm可以正常关闭,也可以强行关闭。

4.1 关闭钩子

在正常关闭中jvm启动所有注册的钩子。钩子可以通过如下方法添加:Runtime.getRuntime().addShutdownHook

示例:

Runtime.getRuntime().addShutdownHook(new Thread(() -> {System.out.println("jvm关闭---钩子函数执行");}));

执行结果:

由于钩子是并发的,所以我们最好使用一个钩子关闭所有服务。

参考文献

[1]. 《JAVA并发编程实战》.

更多推荐

JAVA并发编程实战

本文发布于:2024-03-06 21:02:21,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1716366.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:实战   JAVA

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!