首先,作为Java开发的同学来说,java.util.concurrent并发包一定不会陌生,多多少少也会接触或使用过。今天的主角就是java.util.concurrent.ThreadPoolExecutor和java.util.concurrent.CountDownLatch。本文不详细赘述线程池的原理和数据结构,只是先普及下入门知识,然后再看如何正确的把这两种技术结合起来使用,来达到理想的效果。idea中直接command+N(mac下)输入这两个类即可打开JDK的源码:
CounDownLatch部分源码
CountDownLatch看起来很简单,就几个方法:
ThreadPoolExecutor类图
上图是JDK自带的ThreadPoolExecutor类结构图,其实也很简单,大家可以自行打开看一下源码。这里我们直接说JDK自带的几种线程池:
了解各种线程池的特性,以便在不同的业务场景中使用不同的线程池。下图是使用线程池时候阿里规约扫描插件给出的建议。个人认为在使用线程池时候基本上注意事项就是:不要自己随意去创建一个线程池、不要在方法里创建线程池(内存泄漏)、建议项目中统一配置公共线程池,线程池的各个参数统一配置(区分计算密集型业务和IO密集型业务来配置不同的线程数)。
线程池使用规范
下面给出我在项目开发中自定义的线程池策略,经过压测环境长时间的高并发的压测,能够保证CPU及内存稳定。并结合CountDownLatch让我们的API能够快起来!
package com.test.app.conf.executor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线程池
* 提供打印出当前线程池状态的方法
*
* @Author: javaer
* @Date: 2019-07-08 18:21
*/
@Slf4j
public class AppThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
private static final long serialVersionUID = -4778941758120026886L;
/**
* 打印线程池状态日志
* @param prefix 执行的方法
*/
private void showThreadPoolInfo(String prefix) {
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
log.info("{},{},taskCount[{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
this.getThreadNamePrefix(),
prefix,
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getQueue().size());
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("execute");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("execute with timeout");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("submit");
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("submit callable");
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("submitListenable");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("submitListenable callable");
return super.submitListenable(task);
}
}
package com.test.app.conf.executor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线程池配置
* @Author: javaer
* @Date: 2019-07-08 18:16
*/
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {
/**
* 线程名称前缀
*/
private final static String THREAD_NAME_PREFIX = "app-async-service-";
/**
* 核心线程数
*/
@Value("${spring.threadPool.corePoolSize}")
private int corePoolSize;
/**
* 最大线程数
*/
@Value("${spring.threadPool.maxPoolSize}")
private int maxPoolSize;
/**
* 队列长度
*/
@Value("${spring.threadPool.queueCapacity}")
private int queueCapacity;
/**
* 线程存活时长
*/
@Value(("${spring.threadPool.keepAliveSeconds}"))
private int keepAliveSeconds;
@Bean
public ThreadPoolTaskExecutor appServiceExecutor() {
ThreadPoolTaskExecutor executor = new AppThreadPoolTaskExecutor();
//核心线程数
executor.setCorePoolSize(corePoolSize);
//最大线程数
executor.setMaxPoolSize(maxPoolSize);
//线程池队列大小
executor.setQueueCapacity(queueCapacity);
//线程存活时长
executor.setKeepAliveSeconds(keepAliveSeconds);
//配置线程池中的线程名称前缀
executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
//当线程池maxPoolSize满了,采取由当前调用者所在的线程处理任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化线程池
executor.initialize();
log.info("thread pool executor initialized");
return executor;
}
}
CountDownLatch结合线程池使用
由于业务代码无法开放出来,所以在CountDownLatch+线程池使用的地方我提供了一个简单的使用示例,相信大家也能看明白。之前我在做我们APP的首页的时候,由于之前APP的首页全部数据由一个接口提供(这里就不吐槽接口设计的问题),压测的时候平均响应耗时8000ms,完全跑不起来的节奏。后来我采用了上面的方案优化了一波,再加了一层redis缓存(真的害怕哪天量突然上来,数据库就挂了),最终压测并发100的情况下,平均响应时间100ms左右,CPU和内存在压测的30分钟内也保持平稳的运行。
当然,CountDownLatch+线程池可以广泛应用于业务开发的很多地方,只要是这个API需要聚合多个地方的数据,那么它就有用武之地。让你的API飞起来,就是这么简单!
留言与评论(共有 0 条评论) |