线程池

配置线程池

代码如下(示例):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * @Description:线程池管理
 * @author: lw
 * @date: 2018年12月17日
 */
@Configuration
@EnableAsync
public class ThreadPoolTaskExecutorConfig {
    //阻塞队列
    private static final int workQueue = 20;
    //线程空闲后的存活时长
    private static final int keepAliveTime = 30;
    //Cpu核数
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    //核心线程数量大小
    private static final int corePoolSize = Math.max(2, Math.min(CPU_COUNT - 1, 4));
    //线程池最大容纳线程数
    private static final int maxPoolSize = CPU_COUNT * 2 + 1;

    @Bean("asyncTaskExecutor")
    public ThreadPoolTaskExecutor asyncTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setThreadNamePrefix("asyncTaskExecutor-");//线程前缀
        threadPoolTaskExecutor.setCorePoolSize(corePoolSize);//核心线程数
        threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);//最大线程数
        threadPoolTaskExecutor.setQueueCapacity(workQueue);//等待队列
        threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveTime);//线程池维护线程所允许的空闲时间,单位为秒
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 线程池对拒绝任务(无线程可用)的处理策略
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
}

异步执行方法,获取返回值

代码如下(示例):

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.CompletableFuture;

@Autowired
@Qualifier("asyncTaskExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;

public List<JSONObect> listResult(){
	List<JSONObect> result = new ArrayList<>();
	List<CompletableFuture<HttpResponse>> futures = new ArrayList<>();
	for (String url UrlList) {
		CompletableFuture<HttpResponse> future = CompletableFuture.supplyAsync(() -> {
					//方法处理逻辑
	                HttpResponse res = null;
	                try {
	                    res = HttpRequest.post(url)
	                            .header("Authorization", accessToken)
	                            .body(JSON.toJSONString(new JSONObject()))
	                            .execute();
	                    } catch (Exception e) {
	                        log.error(e.getMessage());
	                        if (res != null) {
	                            log.error(res.toString());
	                        }
	                    }
	                    return res;
	                }, threadPoolTaskExecutor);//指定线程池

	                futures.add(future);

	                try {
	                	//方法处理完成回调
	                    future.thenAccept((HttpResponse res) -> {
	                        if (res.isOk()) {
	                            String body = res.body();
	                            if (JSONValidator.from(body).validate()) {
	                                JSONObject dataJSON = JSON.parseObject(body);
	                                if (dataJSON.containsKey("data")) {
	                                    if (!ObjectUtils.isEmpty(dataJSON.get("data"))) {
	                                        result.add(dataJSON.get("data"));
	                                    }
	                                }
	                            }
	                        }
	                    });
	                } catch (Exception e) {
	                    log.error(e.getMessage());
	                }

	}
	//等待所有线程执行完毕
	CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
	        .whenComplete((v, th) -> {
	            log.info("所有任务执行完成触发");
	        }).join();
     //返回结果
	return result;
}

总结

本内容仅是对配置线程池和指定线程池的异步并发执行

使用的时候一定要先理解再使用,勿在浮沙筑高台

CompletableFuture原理解析

CompletableFuture原理解析

Callable,有结果的同步行为,比如做蛋糕,产生蛋糕

Runnable,无结果的同步行为,比如喝牛奶,仅仅就是喝

Future,异步封装Callable/Runnable,比如委托给师傅(其他线程)去做糕点

CompletableFuture,封装Future,使其拥有回调功能,比如让师傅主动告诉我蛋糕做好了