ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

【并发】ExecutorCompletionService使用

2022-07-05 10:00:43  阅读:180  来源: 互联网

标签:task executor 并发 任务 线程 使用 null completionQueue ExecutorCompletionService


介绍

ExecutorCompletionService是线程池和队列的配合使用,可以将线程池执行完成的结果存入队列当中,通过take或者poll方法获取执行完成的结果

遇到的场景

举例需求是将Excel表中的学生信息导入到数据库当中,在将Excel数据读取出来之后,需要对某些列的数据进行计算处理,在将表数据全部读取出来之后,遍历每一行数据,提交给线程池执行,执行完成的任务将返回结果存入ExecutorCompletionService默认的BlockingQueue队列当中,通过take方法拿到我们需要的数据,再做数据批量插入

源码分析

一、属性

// 线程池
private final Executor executor;
// 将Runnable任务和Callable任务包装成FutureTask任务
private final AbstractExecutorService aes;
// 用于存放任务执行完成后的结果,如果任务执行失败,不会存入
private final BlockingQueue<Future<V>> completionQueue;

二、构造方法

ExecutorCompletionService提供了两个构造方法,必传参数是线程池,可选是否传入用于存放结果的队列,不传入默认使用BlockingQueue

public ExecutorCompletionService(Executor executor) {
	// 未传入线程池,抛出空指针
	if (executor == null)
	throw new NullPointerException();
	// 赋值
	this.executor = executor;
	// 如果线程池对象是AbstractExecutorService的子类,则给ase赋值
	this.aes = (executor instanceof AbstractExecutorService) ?(AbstractExecutorService) executor : null;
	// 未传入队列,初始化BlockingQueue
	this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue) {
	// 同上
	if (executor == null || completionQueue == null)
		throw new NullPointerException();
	this.executor = executor;
	this.aes = (executor instanceof AbstractExecutorService) ?
		(AbstractExecutorService) executor : null;
	this.completionQueue = completionQueue;
}

三、内部类

/*
 * QueueingFuture是FutureTask的子类,重写了done方法。
 * FutureTask中的done方法是个空方法,模板方法,可以进行重写
 */
private class QueueingFuture extends FutureTask<Void> {
	QueueingFuture(RunnableFuture<V> task) {
		super(task, null);
		this.task = task;
	}
	// 任务执行完成执行该方法,将执行完成的任务添加到队列当中
	protected void done() { completionQueue.add(task); }
	private final Future<V> task;
}

四、任务的封装

// 将传入的Callable任务封装成RunnableFuture任务
private RunnableFuture<V> newTaskFor(Callable<V> task) {
	if (aes == null)
		// 使用构造方法创建
		return new FutureTask<V>(task);
	else
		// aes的newTaskFor方法内部使用的也是构造方法创建
		return aes.newTaskFor(task);
}
// 将传入的Runnable任务封装成RunnableFuture任务
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
	if (aes == null)
		return new FutureTask<V>(task, result);
	else
		return aes.newTaskFor(task, result);
}

五、任务的执行

// 提交Callable任务
public Future<V> submit(Callable<V> task) {
	// 判空
	if (task == null) throw new NullPointerException();
	// 封装成RunnableFuture类型
	RunnableFuture<V> f = newTaskFor(task);
	// 线程池执行
	executor.execute(new QueueingFuture(f));
	return f;
}
public Future<V> submit(Runnable task, V result) {
	if (task == null) throw new NullPointerException();
	RunnableFuture<V> f = newTaskFor(task, result);
	executor.execute(new QueueingFuture(f));
	return f;
}

六、结果获取

// 获取完成的任务,获取不到会阻塞,直到获取到或者等待线程被中断抛出中断异常
public Future<V> take() throws InterruptedException {
	return completionQueue.take();
}
// 获取完成的任务,获取不到不会阻塞,不支持中断,从队列中获取不到完成的任务直接返回null
public Future<V> poll() {
	return completionQueue.poll();
}
// 获取完成的任务,循环的获取已完成的任务,直到获取到,或者超时,或者获取线程被中断,支持中断异常
public Future<V> poll(long timeout, TimeUnit unit)
	throws InterruptedException {
	return completionQueue.poll(timeout, unit);
}

参考文章:事故总结集锦-多线程使用不当导致的OOM -ExecutorCompletionService的 “套路”
参考文章:ExecutorCompletionService源码分析

标签:task,executor,并发,任务,线程,使用,null,completionQueue,ExecutorCompletionService
来源: https://www.cnblogs.com/hujh2022/p/16444908.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有