文章目录
- Springboot项目中线程池使用整理
- 学习目标
- 线程池类型及特点
- Java基础线程池
- Spring ThreadPoolTaskExecutor
- ThreadPoolTaskExecutor vs ThreadPoolExecutor
- 主要区别
- @Async注解使用
- AsyncConfigurer接口
- CompletableFuture
- 线程池实现方式比较
- 1. Spring @Bean方式
- 2. AsyncConfigurer方式
- 3. Guava线程池
- 4. Hutool线程池
- 5. Apache Commons Pool
- 选择建议
- 定时任务实现
- 第三方框架线程池实现
- Guava线程池
- Hutool线程池
- Apache Commons Pool
- 线程池使用总结
- 1. 线程池配置选择
- 2. 异步任务处理
- 3. 定时任务处理
- 4. 监控和管理
- 5. 常见问题避免
Springboot项目中线程池使用整理
整理一下在java多线程的使用过程中使用线程池的方式,代码示例见https://github.com/2Red1Blue/java-jvm-tuning其中的spring-threadpool模块
todo: 待补充一份动态线程池的实现: promethus+nacos/apollo,暂时没时间写了,等有空补充上来
学习目标
- 理解Spring Boot中线程池的概念和作用
- 掌握不同类型的线程池配置方法
- 学习线程池的常用使用方式
线程池类型及特点
Java基础线程池
单线程池(SingleThreadExecutor)
- 只有一个工作线程
- 保证任务按照提交顺序执行(FIFO)
- 适用于需要保证顺序的场景
固定线程池(FixedThreadPool)
- 固定数量的工作线程
- 任务队列无界
- 适用于负载较重的服务器,可控制线程数量
缓存线程池(CachedThreadPool)
- 按需创建新线程
- 空闲线程会被回收(默认60秒)
- 适用于执行大量短期异步任务
定时任务线程池(ScheduledThreadPool)
- 支持定时及周期性任务执行
- 可设置核心线程数
- 适用于需要定期执行的任务
Spring ThreadPoolTaskExecutor
Spring的ThreadPoolTaskExecutor是对Java ThreadPoolExecutor的封装,具有以下特点:
更好的Spring集成
- 可通过配置文件配置
- 支持Spring的生命周期管理
- 可以方便地注入到Spring组件中
增强的功能
- 支持任务优先级
- 提供线程池运行状态监控
- 支持优雅关闭
- 可配置线程名前缀,方便调试
配置更灵活
- 核心线程数
- 最大线程数
- 队列容量
- 线程存活时间
- 拒绝策略
- 等待任务完成的设置
使用建议
- 在Spring环境中优先使用ThreadPoolTaskExecutor
- 可以通过配置文件统一管理线程池参数
- 建议设置有界队列避免OOM
- 根据实际需求选择合适的拒绝策略
ThreadPoolTaskExecutor vs ThreadPoolExecutor
主要区别
生命周期管理
- ThreadPoolTaskExecutor:
- 集成Spring生命周期
- 支持优雅关闭
- 可以等待任务完成后再关闭
- ThreadPoolExecutor:
配置方式
- ThreadPoolTaskExecutor:
- 支持Spring配置文件配置
- 可以通过@Value注入参数
- 提供更友好的配置方法
- ThreadPoolExecutor:
- 需要手动创建和配置
- 配置相对复杂
- 需要自己实现ThreadFactory
功能扩展
- ThreadPoolTaskExecutor:
- ThreadPoolExecutor:
@Async注解使用
配置方式
@EnableAsync@ConfigurationpublicclassAsyncConfig{@Bean("threadPoolTaskExecutor")publicThreadPoolTaskExecutorthreadPoolTaskExecutor(){}}
使用方式
@Async("threadPoolTaskExecutor")publicCompletableFuture<String>asyncMethod(){}
注意事项
- @Async方法必须是public
- 返回值应该是void或Future类型
- 在同一个类中调用@Async方法无效
- 异常处理需要特别注意
最佳实践
- 指定线程池名称避免使用默认线程池
- 使用CompletableFuture获取异步结果
- 合理设置线程池参数
- 添加异常处理机制
AsyncConfigurer接口
AsyncConfigurer接口是Spring提供的用于配置异步执行的接口,它提供了两个重要方法:
- 配置方式
@ConfigurationpublicclassAsyncExecutorConfigimplementsAsyncConfigurer{@Value("${thread-pool.core-pool-size}")privateintcorePoolSize;@OverridepublicExecutorgetAsyncExecutor(){ThreadPoolTaskExecutorexecutor =newThreadPoolTaskExecutor();executor.setCorePoolSize(corePoolSize);executor.setMaxPoolSize(10);executor.setQueueCapacity(25);executor.setThreadNamePrefix("async-");executor.initialize();returnexecutor;}@OverridepublicAsyncUncaughtExceptionHandlergetAsyncUncaughtExceptionHandler(){return(ex,method,params)->{};}}
- 特点
- 提供全局的异步执行器配置
- 支持统一的异常处理
- 可以通过配置文件注入参数
- 适合需要统一管理异步配置的场景
CompletableFuture
- 基本使用
CompletableFuture<String>future =CompletableFuture.supplyAsync(()->{return"结果";},executor);future.thenAccept(result ->{System.out.println(result);});
- 常用方法
CompletableFuture<String>future1 =CompletableFuture.supplyAsync(()->"Hello");CompletableFuture<String>future2 =CompletableFuture.supplyAsync(()->"World");CompletableFuture<String>combined =future1.thenCombine(future2,(s1,s2)->s1 +" "+s2);future.exceptionally(throwable ->"默认值").thenAccept(System.out::println);CompletableFuture.allOf(future1,future2).join();
- 使用建议
- 指定自定义线程池
- 合理设置超时时间
- 正确处理异常
- 避免阻塞操作
线程池实现方式比较
1. Spring @Bean方式
@BeanpublicThreadPoolTaskExecutorthreadPoolTaskExecutor(){ThreadPoolTaskExecutorexecutor =newThreadPoolTaskExecutor();executor.setCorePoolSize(corePoolSize);executor.setMaxPoolSize(maxPoolSize);executor.setQueueCapacity(queueCapacity);executor.setThreadNamePrefix("spring-thread-");executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());executor.setWaitForTasksToCompleteOnShutdown(true);returnexecutor;}
优点:
- Spring容器管理生命周期
- 支持配置文件配置
- 可以被其他组件注入
- 支持优雅关闭
适用场景:
- Spring应用程序
- 需要统一配置管理
- 需要Spring生命周期管理
2. AsyncConfigurer方式
优点:
- 提供全局异步配置
- 统一的异常处理
- 更好的Spring集成
适用场景:
- 需要全局异步配置
- 需要统一异常处理
- @Async注解使用场景
3. Guava线程池
优点:
- 更强大的Future实现
- 更好的异常处理
- 链式调用支持
- 丰富的工具类
适用场景:
- 复杂的异步任务链
- 需要更好的Future功能
- 需要更强的异常处理
4. Hutool线程池
优点:
适用场景:
5. Apache Commons Pool
优点:
适用场景:
选择建议
- 一般Spring项目:使用@Bean配置ThreadPoolTaskExecutor
- 全局异步配置:使用AsyncConfigurer
- 复杂异步任务链:考虑Guava
- 简单异步任务:可以使用Hutool
- 对象池需求:使用Commons Pool
定时任务实现
Spring Task实现方式
@Scheduled注解方式
@Scheduled(fixedRate =5000)publicvoidfixedRateTask(){}@Scheduled(cron ="0 0 12 * * ?")publicvoidcronTask(){}
TaskScheduler方式
@AutowiredprivateTaskSchedulertaskScheduler;taskScheduler.schedule(()->{},newCronTrigger("0/10 * * * * ?"));taskScheduler.scheduleWithFixedDelay(()->{},TimeUnit.SECONDS.toMillis(30));publicvoidstartHeartbeatCheck(){taskScheduler.scheduleWithFixedDelay(()->{try{performHeartbeatCheck();}catch(Exceptione){log.error("心跳检测失败",e);}},TimeUnit.SECONDS.toMillis(30));
}
### 配置说明1. 启用定时任务```java@Configuration@EnableSchedulingpublic class SchedulerConfig { @Bean public TaskScheduler taskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(5); scheduler.setThreadNamePrefix("scheduled-task-"); return scheduler; }}
- Cron表达式说明
- 秒 分 时 日 月 周
- “0 0 12 * * ?” 每天12点执行
- “0/5 * * * * ?” 每5秒执行一次
- “0 0/30 * * * ?” 每30分钟执行一次
第三方框架线程池实现
Guava线程池
适用场景:
ThreadFactorythreadFactory =newThreadFactoryBuilder().setNameFormat("guava-pool-%d").setDaemon(true).setPriority(Thread.NORM_PRIORITY).setUncaughtExceptionHandler((thread,ex)->log.error("Thread error: {}",thread.getName(),ex)).build();ListeningExecutorServiceservice =MoreExecutors.listeningDecorator(newThreadPoolExecutor(10,10,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue<Runnable>(1000),threadFactory));ListenableFuture<Result>future =service.submit(task);Futures.addCallback(future,newFutureCallback<Result>(){publicvoidonSuccess(Resultresult){...}publicvoidonFailure(Throwablet){...}});
示例
一、订单处理流程
privatefinalListeningExecutorServiceguavaExecutor;publicThirdPartyThreadPoolExample(){ThreadFactorythreadFactory =newThreadFactoryBuilder().setNameFormat("business-process-pool-%d").setDaemon(true).setPriority(Thread.NORM_PRIORITY).setUncaughtExceptionHandler((thread,ex)->log.error("业务处理线程异常: {}",thread.getName(),ex)).build();guavaExecutor =MoreExecutors.listeningDecorator(newThreadPoolExecutor(5,20,60L,TimeUnit.SECONDS,newLinkedBlockingQueue<>(1000),threadFactory,newThreadPoolExecutor.CallerRunsPolicy()));}publicvoidprocessOrders(List<Order>orders){List<ListenableFuture<OrderResult>>futures =orders.stream().map(order ->guavaExecutor.submit(()->processOrder(order))).collect(Collectors.toList());ListenableFuture<List<OrderResult>>allFutures =Futures.allAsList(futures);Futures.addCallback(allFutures,newFutureCallback<>(){@OverridepublicvoidonSuccess(List<OrderResult>results){updateOrderStatuses(results);sendNotifications(results);}@OverridepublicvoidonFailure(Throwablet){log.error("批量订单处理失败",t);handleBatchProcessingFailure(orders);}},guavaExecutor);}
二、数据库连接管理
privatefinalGenericObjectPool<DatabaseConnection>connectionPool;publicThirdPartyThreadPoolExample(){GenericObjectPoolConfig<DatabaseConnection>poolConfig =newGenericObjectPoolConfig<>();poolConfig.setMaxTotal(20);poolConfig.setMaxIdle(10);poolConfig.setMinIdle(5);poolConfig.setTestOnBorrow(true);poolConfig.setTestWhileIdle(true);poolConfig.setTimeBetweenEvictionRuns(Duration.ofMinutes(1));poolConfig.setMinEvictableIdleTime(Duration.ofMinutes(5));connectionPool =newGenericObjectPool<>(newDatabaseConnectionFactory(),poolConfig);}publicvoidperformDatabaseOperations(Stringsql)throwsException{DatabaseConnectionconn =null;try{conn =connectionPool.borrowObject();conn.executeQuery(sql);}finally{if(conn !=null){connectionPool.returnObject(conn);}}}publicvoidprocessBatchData(List<String>dataItems)throwsException{List<DatabaseConnection>connections =newArrayList<>();try{for(inti =0;i <Math.min(dataItems.size(),5);i++){connections.add(connectionPool.borrowObject());}ExecutorServiceexecutor =Executors.newFixedThreadPool(connections.size());List<CompletableFuture<Void>>futures =newArrayList<>();for(inti =0;i <connections.size();i++){DatabaseConnectionconn =connections.get(i);List<String>batch =getBatch(dataItems,i,connections.size());futures.add(CompletableFuture.runAsync(()->{try{processBatch(conn,batch);}catch(Exceptione){log.error("批处理失败",e);thrownewRuntimeException(e);}},executor));}CompletableFuture.allOf(futures.toArray(newCompletableFuture[0])).join();executor.shutdown();}finally{for(DatabaseConnectionconn :connections){connectionPool.returnObject(conn);}}}
二、