ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

使用ThreadPoolExecutor 创建线程池,完成并行操作

2020-03-26 18:58:01  阅读:288  来源: 互联网

标签:java calls 并行操作 util 线程 new import public ThreadPoolExecutor


日常工作中有很多效率极低的串行操作,往往可以改为并行执行,执行效率通常能提高数倍。废话不多说,先上代码。

1、用到的 Guava 依赖(Maven 坐标)

        <!-- Guava supplies ThreadFactoryBuilder (thread naming in ExecutorManager) and
             Preconditions (argument checks in ExecutorContext). -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>18.0</version>
        </dependency>
View Code

2、创建一个枚举保证线程池是单例

package com.hao.service;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Holder for the single, process-wide {@link ThreadPoolExecutor} used for parallel work.
 *
 * <p>The pool is created once, when this enum is initialised. It can be reached either through
 * the static field {@link #threadPoolExecutor} or through {@code INSTANCE.getExecutor()}; both
 * return the same object.
 *
 * <p>Sizing is derived from the processor count reported by the JVM: 50 core threads and at most
 * 80 threads per processor, fed by a bounded queue of 2000 entries per processor. The keep-alive
 * time is zero, so threads above the core size are discarded as soon as they become idle. No
 * rejection handler is supplied, so the JDK default handler applies once both the queue and the
 * pool are full.
 */
public enum ExecutorManager {

    INSTANCE;

    /** Processor count sampled once at initialisation; every size below is a multiple of it. */
    private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();

    /** The shared pool. Worker threads are named {@code ExecutorManager-pool-Thread-N}. */
    public static final ThreadPoolExecutor threadPoolExecutor =
        new ThreadPoolExecutor(AVAILABLE_PROCESSORS * 50, AVAILABLE_PROCESSORS * 80, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(AVAILABLE_PROCESSORS * 2000),
            new ThreadFactoryBuilder().setNameFormat("ExecutorManager-pool-Thread-%d").build());

    /**
     * Returns the shared pool through the singleton instance.
     *
     * @return the same executor as {@link #threadPoolExecutor}
     */
    public ThreadPoolExecutor getExecutor() {
        return threadPoolExecutor;
    }

}
View Code

3、创建一个工具类,封装并行执行的方法

package com.hao.service;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;

import org.springframework.stereotype.Service;

import com.google.common.base.Preconditions;

/**
 * Spring service that runs a batch of {@link Callable}s in parallel on the shared pool and waits,
 * with a timeout, until all of them have finished.
 *
 * <p>Each callable is wrapped so that it counts a shared {@link CountDownLatch} down when it ends,
 * whether it returned or threw. The waiting thread blocks on that latch rather than on the
 * individual futures.
 *
 * <p>Result contract shared by all {@code waitAll*} methods:
 * <ul>
 *   <li>If every task ends within the timeout, the returned list has one entry per submitted
 *       callable, in submission order.</li>
 *   <li>A task that threw (including a {@code null} callable) contributes {@code null}; its
 *       exception is not rethrown to the caller.</li>
 *   <li>If the timeout elapses first, an empty list is returned.</li>
 * </ul>
 */
@Service
public class ExecutorContext {

    /** Timeout used by {@link #waitAllCallables(Callable[])}, which takes no explicit timeout. */
    private static final int DEFAULT_WAIT_SECONDS = 2;

    /**
     * Executor the tasks are submitted to. Assigned at construction so the class is also usable
     * when it is created outside a container, where {@link #init()} is never invoked.
     */
    public ExecutorService executorService = ExecutorManager.threadPoolExecutor;

    /** Container callback that (re)binds this service to the shared pool. */
    @PostConstruct
    public void init() {
        executorService = ExecutorManager.threadPoolExecutor;
    }

    /**
     * Runs all callables in parallel and waits up to {@code milliseconds} for them to finish.
     * Tasks that are still unfinished when the wait ends unsuccessfully are cancelled with
     * interruption; that covers a timeout, an interrupt of the waiting thread and a submission
     * that was rejected part-way through the batch.
     *
     * @param calls        tasks to run; must be non-null and non-empty
     * @param milliseconds maximum time to wait, in milliseconds
     * @return the results in submission order, or an empty list on timeout
     * @throws IllegalArgumentException if {@code calls} is null or empty
     * @throws InterruptedException     if the calling thread is interrupted while waiting
     */
    public <T> List<T> waitAllFutures(List<Callable<T>> calls, int milliseconds) throws Exception {
        Preconditions.checkArgument(null != calls && !calls.isEmpty(), "callable empty.");
        LatchedCallables<T> latched = wrapCallables(calls);
        List<Future<T>> futures = new ArrayList<>(latched.wrappedCallables.size());
        boolean completed = false;
        try {
            for (CountdownedCallable<T> callable : latched.wrappedCallables) {
                futures.add(executorService.submit(callable));
            }
            completed = latched.latch.await(milliseconds, TimeUnit.MILLISECONDS);
        } finally {
            // Also reached when await() or submit() throws: do not leave orphaned work behind.
            if (!completed) {
                cancelUnfinished(futures);
            }
        }
        if (completed) {
            return collectResults(latched);
        }
        return new ArrayList<>();
    }

    /**
     * Runs all callables in parallel and waits up to {@code seconds} for them to finish. Unlike
     * {@link #waitAllFutures(List, int)}, tasks are not cancelled on timeout; they keep running
     * on the pool and their results are discarded.
     *
     * @param calls   tasks to run; must be non-null and non-empty
     * @param seconds maximum time to wait, in <b>seconds</b>
     * @return the results in submission order, or an empty list on timeout
     * @throws IllegalArgumentException if {@code calls} is null or empty
     * @throws InterruptedException     if the calling thread is interrupted while waiting
     */
    public <T> List<T> waitAllCallables(List<Callable<T>> calls, int seconds) throws Exception {
        Preconditions.checkArgument(null != calls && !calls.isEmpty(), "callable empty.");
        LatchedCallables<T> latched = wrapCallables(calls);
        for (CountdownedCallable<T> callable : latched.wrappedCallables) {
            executorService.submit(callable);
        }
        if (latched.latch.await(seconds, TimeUnit.SECONDS)) {
            return collectResults(latched);
        }
        return new ArrayList<>();
    }

    /**
     * Varargs convenience form of {@link #waitAllCallables(List, int)} using the default timeout
     * of {@value #DEFAULT_WAIT_SECONDS} seconds.
     *
     * @param calls tasks to run; must be non-null and contain at least one element
     * @return the results in submission order, or an empty list on timeout
     * @throws NullPointerException     if the array itself is null
     * @throws IllegalArgumentException if no callables are given
     */
    public <T> List<T> waitAllCallables(@SuppressWarnings("unchecked") Callable<T>... calls) throws Exception {
        Preconditions.checkNotNull(calls, "callable empty.");
        return waitAllCallables(Arrays.asList(calls), DEFAULT_WAIT_SECONDS);
    }

    /** Wraps every callable so it counts down one shared latch sized to the batch. */
    private static <T> LatchedCallables<T> wrapCallables(List<Callable<T>> callables) {
        CountDownLatch latch = new CountDownLatch(callables.size());
        List<CountdownedCallable<T>> wrapped = new ArrayList<>(callables.size());
        for (Callable<T> callable : callables) {
            wrapped.add(new CountdownedCallable<>(callable, latch));
        }

        LatchedCallables<T> returnVal = new LatchedCallables<>();
        returnVal.latch = latch;
        returnVal.wrappedCallables = wrapped;
        return returnVal;
    }

    /** Reads the stored result of every wrapper, in submission order. */
    private static <T> List<T> collectResults(LatchedCallables<T> latched) {
        List<T> results = new ArrayList<>(latched.wrappedCallables.size());
        for (CountdownedCallable<T> call : latched.wrappedCallables) {
            results.add(call.getResult());
        }
        return results;
    }

    /** Cancels, with interruption, every future that has not completed yet. */
    private static <T> void cancelUnfinished(List<Future<T>> futures) {
        for (Future<T> future : futures) {
            if (!future.isDone()) {
                future.cancel(true);
            }
        }
    }

    /** A batch of wrapped callables together with the latch they all count down. */
    public static class LatchedCallables<T> {
        public CountDownLatch latch;
        public List<CountdownedCallable<T>> wrappedCallables;
    }

    /**
     * Callable decorator that remembers the delegate's return value and counts a latch down when
     * the delegate ends, normally or exceptionally.
     */
    public static class CountdownedCallable<T> implements Callable<T> {
        private final Callable<T> wrapped;
        private final CountDownLatch latch;
        // Written by the worker before countDown(); read by the waiter after await() returns.
        private T result;

        public CountdownedCallable(Callable<T> wrapped, CountDownLatch latch) {
            this.wrapped = wrapped;
            this.latch = latch;
        }

        @Override
        public T call() throws Exception {
            try {
                result = wrapped.call();
                return result;
            } finally {
                // Always release the waiter, even if the delegate threw.
                latch.countDown();
            }
        }

        /**
         * @return the delegate's return value, or {@code null} if it has not finished or threw
         */
        public T getResult() {
            return result;
        }
    }

}
View Code

4、创建一个测试类

package com.hao;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

import com.hao.bean.Employee;
import com.hao.service.EmployeeService;
import com.hao.service.ExecutorContext;

/**
 * Demonstrates {@link ExecutorContext}: three slow employee lookups are run in parallel, so the
 * elapsed time printed at the end should be close to one lookup rather than the sum of three.
 */
public class ExecutorTest extends BaseTest {

    /** Simulated latency of one lookup, in milliseconds. */
    private static final long LOOKUP_DELAY_MILLIS = 5000L;

    /** Upper bound for the whole batch, in seconds; must exceed one simulated lookup. */
    private static final int WAIT_SECONDS = 10;

    @Autowired
    ExecutorContext executorContext;

    @Autowired
    EmployeeService employeeService;

    /**
     * Loads employees 1 to 3 in parallel and prints them followed by the elapsed milliseconds.
     *
     * @throws Exception if waiting for the batch fails; propagated so the test is reported as failed
     */
    @Test
    public void test01() throws Exception {
        long t0 = System.currentTimeMillis();

        List<Callable<Employee>> calls = new ArrayList<Callable<Employee>>();
        calls.add(employeeLookup(1L));
        calls.add(employeeLookup(2L));
        calls.add(employeeLookup(3L));

        // The second argument of waitAllCallables is in seconds, not milliseconds.
        // The results come back through the return value, so no collection is shared between
        // the worker threads.
        List<Employee> employees = executorContext.waitAllCallables(calls, WAIT_SECONDS);

        for (Employee employee : employees) {
            System.out.println(employee);
        }
        System.out.println(System.currentTimeMillis() - t0);
    }

    /** Builds a task that sleeps to simulate a slow call and then loads one employee by id. */
    private Callable<Employee> employeeLookup(final long id) {
        return new Callable<Employee>() {
            @Override
            public Employee call() throws Exception {
                Thread.sleep(LOOKUP_DELAY_MILLIS);
                return employeeService.getById(id);
            }
        };
    }

}
View Code

5、执行结果如下

 

 此工具类的好处在于能够像使用普通 service 一样使用线程池完成并行操作,当然不要忘记将 ExecutorContext 置于能被 Spring 扫描到的地方,

否则不能直接使用 @Autowired 依赖注入

 

标签:java,calls,并行操作,util,线程,new,import,public,ThreadPoolExecutor
来源: https://www.cnblogs.com/zhanh247/p/12576491.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有