代码如下(示例):
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Description:线程池管理
* @author: lw
* @date: 2018年12月17日
*/
@Configuration
@EnableAsync
public class ThreadPoolTaskExecutorConfig {
//阻塞队列
private static final int workQueue = 20;
//线程空闲后的存活时长
private static final int keepAliveTime = 30;
//Cpu核数
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
//核心线程数量大小
private static final int corePoolSize = Math.max(2, Math.min(CPU_COUNT - 1, 4));
//线程池最大容纳线程数
private static final int maxPoolSize = CPU_COUNT * 2 + 1;
@Bean("asyncTaskExecutor")
public ThreadPoolTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("asyncTaskExecutor-");//线程前缀
threadPoolTaskExecutor.setCorePoolSize(corePoolSize);//核心线程数
threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);//最大线程数
threadPoolTaskExecutor.setQueueCapacity(workQueue);//等待队列
threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveTime);//线程池维护线程所允许的空闲时间,单位为秒
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 线程池对拒绝任务(无线程可用)的处理策略
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
代码如下(示例):
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.CompletableFuture;
@Autowired
@Qualifier("asyncTaskExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
public List<JSONObect> listResult(){
List<JSONObect> result = new ArrayList<>();
List<CompletableFuture<HttpResponse>> futures = new ArrayList<>();
for (String url UrlList) {
CompletableFuture<HttpResponse> future = CompletableFuture.supplyAsync(() -> {
//方法处理逻辑
HttpResponse res = null;
try {
res = HttpRequest.post(url)
.header("Authorization", accessToken)
.body(JSON.toJSONString(new JSONObject()))
.execute();
} catch (Exception e) {
log.error(e.getMessage());
if (res != null) {
log.error(res.toString());
}
}
return res;
}, threadPoolTaskExecutor);//指定线程池
futures.add(future);
try {
//方法处理完成回调
future.thenAccept((HttpResponse res) -> {
if (res.isOk()) {
String body = res.body();
if (JSONValidator.from(body).validate()) {
JSONObject dataJSON = JSON.parseObject(body);
if (dataJSON.containsKey("data")) {
if (!ObjectUtils.isEmpty(dataJSON.get("data"))) {
result.add(dataJSON.get("data"));
}
}
}
}
});
} catch (Exception e) {
log.error(e.getMessage());
}
}
//等待所有线程执行完毕
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.whenComplete((v, th) -> {
log.info("所有任务执行完成触发");
}).join();
//返回结果
return result;
}
总结
本内容仅是对配置线程池和指定线程池的异步并发执行
使用的时候一定要先理解再使用,勿在浮沙筑高台
Callable,有结果的同步行为,比如做蛋糕,产生蛋糕
Runnable,无结果的同步行为,比如喝牛奶,仅仅就是喝
Future,异步封装Callable/Runnable,比如委托给师傅(其他线程)去做糕点
CompletableFuture,封装Future,使其拥有回调功能,比如让师傅主动告诉我蛋糕做好了