EN
/video/87947443.html

Java 创建线程池的几种方式

2025-06-24 11:47:32 来源: 新华社
字号:默认 超大 | 打印 |

一、创建线程池四种方式

  1. 使用 Executors类,Executors 类是 Java 中用于创建线程池的工厂类,它提供了多种静态方法来创建不同类型的线程池
  2. 使用 ThreadPoolExecutor类,ThreadPoolExecutor 是 Java 中线程池的一个核心类,它提供了更细粒度的控制来创建和管理线程池
  3. 使用 FutureCallable,Future 和 Callable 是并发编程中非常重要的两个接口,它们通常与 ExecutorService 一起使用来执行异步任务。
  4. 使用 SpringThreadPooltaskExecutor,ThreadPoolTaskExecutor 是一个基于 java.util.concurrent.ThreadPoolExecutor 的扩展,提供了更丰富的配置选项和与Spring集成的特性

二、线程池重要参数

  1. corePoolSize (int):线程池的基本大小,即在没有任务执行时线程池的大小。当新任务提交时,线程池会优先使用已有的空闲线程。
  2. maximumPoolSize (int):线程池能够容纳同时执行的最大线程数。这个参数用于控制线程池的最大规模,防止因任务过多而导致资源耗尽。
  3. keepAliveTime (long):当线程池中的线程数量超过 corePoolSize 时,多余的空闲线程能等待新任务的最长时间。超过这个时间后,多余的线程将被终止。
  4. unit (TimeUnit):keepAliveTime 参数的时间单位,常见的时间单位有 TimeUnit.SECONDS、TimeUnit.MINUTES 等。
  5. workQueue (BlockingQueue):一个阻塞队列,用于存储等待执行的任务。常用的阻塞队列有 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue 等。
  6. threadFactory (ThreadFactory):用于创建新线程的工厂。可以通过实现 ThreadFactory 接口来自定义线程的创建过程。
  7. handler (RejectedExecutionHandler):当任务太多而线程池无法处理时,用于定义拒绝任务的策略。常见的拒绝策略有 ThreadPoolExecutor.AbortPolicy、ThreadPoolExecutor.CallerRunsPolicy 和 ThreadPoolExecutor.DiscardPolicy 等。
packagecom.demo.threadPool;importjava.util.concurrent.*;publicclassMainDemo1{ publicstaticvoidmain(String[]args){ intcorePoolSize =5;// 核心线程数intmaximumPoolSize =10;// 最大线程数longkeepAliveTime =1;// 非核心线程空闲存活时间/**         * 存活时间单位         * TimeUnit.DAYS:天         * TimeUnit.HOURS:小时         * TimeUnit.MINUTES:分         * TimeUnit.SECONDS:秒         * TimeUnit.MILLISECONDS:毫秒         * TimeUnit.MICROSECONDS:微妙         * TimeUnit.NANOSECONDS:纳秒         */TimeUnitunit =TimeUnit.MINUTES;BlockingQueue<Runnable>workQueue =newLinkedBlockingQueue<Runnable>();// 工作队列ThreadFactorythreadFactory =Executors.defaultThreadFactory();// 线程工厂RejectedExecutionHandlerhandler =newThreadPoolExecutor.AbortPolicy();// 拒绝策略ThreadPoolExecutorexecutor =newThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);}}

三、线程池5种状态

  1. RUNNING:正常运行状态,可接收新任务,可处理阻塞队列中的任务
  2. SHUTDOWN:不会接收新任务,但会处理阻塞队列剩余任务
  3. STOP:会中断正在执行的任务,并抛弃阻塞队列任务
  4. TIDYING:任务全执行完毕,活动线程为 0,即将进入终结
  5. TERMINATED:终结状态

四、Executors 类创建线程池

  1. new newCachedThreadPool():创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。线程池的规模不存在限制。(数量不固定的线程池)
  2. new newFixedThreadPool():创建一个固定长度线程池,可控制线程最大并发数,超出的线程会在队列中等待。(固定数量的线程池)
  3. new newScheduledThreadPool():创建一个固定长度线程池,支持定时及周期性任务执行。(定时线程池)
  4. new newSingleThreadExecutor():创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。(单线程的线程池)
  • 固定线程池创建 ( Executors.newFixedThreadPool(5) ):创建一个固定大小的线程池。线程池中的线程数量是固定的,即使有些线程处于空闲状态,它们也不会被回收。
packagecom.demo.threadPool;importjava.util.List;importjava.util.concurrent.*;publicclassMainThreadPool{ publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{ //初始化固定大小线程池ExecutorServiceexecutor1 =Executors.newFixedThreadPool(5);//使用 execute(Runnable command) 方法提交一个不需要返回结果的任务,// 或者使用submit(Callable task) 方法提交一个需要返回结果的任务。for(inti =0;i <10;i++){ executor1.execute(newTaskR(i));}//使用 submit(Callable task) 任务并获取 Future//使用 Future.get() 方法等待任务完成并获取结果。这个方法会阻塞调用线程直到任务完成。for(inti =0;i <10;i++){ Future<String>future =executor1.submit(newTaskC(i));System.out.println("线程返回结果  "+future.get());}// 当所有任务都执行完毕,或者需要关闭线程池时,调用 shutdown() 方法。// 这将等待正在执行的任务完成,但不接收新任务。executor1.shutdown();//使用 shutdownNow() 方法尝试立即停止所有正在执行的任务,并返回等待执行的任务列表List<Runnable>notExecutedTasks =executor1.shutdownNow();for(Runnablels :notExecutedTasks){ System.out.println(ls);}//使用 awaitTermination() 方法等待线程池关闭,直到所有任务完成或超时。booleanres =executor1.awaitTermination(60,TimeUnit.SECONDS);System.out.println("执行结果:"+res);}}/** * 实现 Runnable 接口 */classTaskRimplementsRunnable{ privateintid;publicTaskR(intid){ this.id =id;}publicvoidrun(){ System.out.println("TaskR "+id +" is running...");}}/** * 实现 Callable 接口 * 有返回值 */classTaskCimplementsCallable{ privateintid;publicTaskC(intid){ this.id =id;}@OverridepublicObjectcall(){ System.out.println("TaskC "+id +" is running...");returnid+"--TaskC";}}
  • 单线程池 (newSingleThreadExecutor):创建一个只有一个线程的线程池。即使有多个任务提交,它们也会被排队,逐个由单个线程执行
packagecom.demo.threadPool;importjava.util.concurrent.ExecutionException;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.Future;/** * 单线程池 (newSingleThreadExecutor): * 创建一个只有一个线程的线程池。即使有多个任务提交,它们也会被排队,逐个由单个线程执行。 */publicclassMainOne{ publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{ /**         * 单线程:创建的执行服务内部有一个线程。所有提交给它的任务将会序列化执行,也就是说,它会在单个线程上依次执行任务,不会有并发执行的情况发生         * 任务队列:如果有多个任务提交给这个执行器,除了当前正在执行的任务外,其他任务将会在一个无界队列中等待,直到线程可用         * 处理任务失败:如果执行中的线程由于任务抛出异常而终止,执行服务会安排一个新的线程来替换它,以继续执行后续的任务         * 使用场景: newSingleThreadExecutor 非常适合需要顺序执行的任务,并且要求任务之间不受并发问题影响的场景         */ExecutorServiceexecutor =Executors.newSingleThreadExecutor();for(inti =0;i <10;i++){ executor.execute(newTaskR(i));}//使用 submit(Callable task) 任务并获取 Future//使用 Future.get() 方法等待任务完成并获取结果。这个方法会阻塞调用线程直到任务完成。for(inti =0;i <10;i++){ Future<String>future =executor.submit(newTaskC(i));System.out.println("线程返回结果  "+future.get());}// 当所有任务都执行完毕,或者需要关闭线程池时,调用 shutdown() 方法。// 这将等待正在执行的任务完成,但不接收新任务。executor.shutdown();}}
  • 缓存线程池 (newCachedThreadPool):创建一个可根据需要创建新线程的线程池。如果线程空闲超过60秒,它们将被终止并从池中移除
packagecom.demo.threadPool;importjava.util.Date;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;/** * 缓存线程池 (newCachedThreadPool): * 创建一个可根据需要创建新线程的线程池。如果线程空闲超过60秒,它们将被终止并从池中移除 */publicclassMainCacheThreadPool{ publicstaticvoidmain(String[]args)throwsInterruptedException{ System.out.println(Thread.currentThread().getName()+"线程: Start at: "+newDate());//初始化缓存线程池ExecutorServiceexec =Executors.newCachedThreadPool();for(inti =1;i <10;i++){ System.out.println("添加了第"+i +"个任务类");Thread.sleep(2000);exec.execute(newTaskR(i));}//所有任务结束后关闭线程池exec.shutdown();System.out.println(Thread.currentThread().getName()+" 线程: Finished all threads at:"+newDate());}}
  • 调度线程池 (newScheduledThreadPool):创建一个支持定时任务和周期性任务的线程池
packagecom.demo.threadPool;importjava.util.Date;importjava.util.concurrent.Executors;importjava.util.concurrent.ScheduledExecutorService;importjava.util.concurrent.TimeUnit;/** * 固定频率执行 * 调度线程池 (newScheduledThreadPool): * 创建一个支持定时任务和周期性任务的线程池 */publicclassMainScheduledThreadPool{ publicstaticvoidmain(String[]args){ /**         * 场景描述         * 假设你需要一个应用程序,该程序能够每10秒执行一次任务,并在启动后1分钟开始执行。此外,         * 你还需要能够安排一次性任务在未来的某个时间点执行         */ScheduledExecutorServicethreadPool =Executors.newScheduledThreadPool(10);// 安排定期任务// 初始延迟1分钟,之后每10秒执行一次threadPool.scheduleAtFixedRate(newTaskR(2),60,10,TimeUnit.SECONDS);// 安排一次性任务// 使用 schedule 方法安排一个任务,在指定的延迟后执行一次// 延迟5分钟后执行threadPool.schedule(newTaskR(3),5,TimeUnit.MINUTES);// 关闭线程池// 当不再需要线程池时,调用 shutdown 方法来关闭线程池。这将等待正在执行的任务完成,但不接收新任务threadPool.shutdown();// 等待线程池关闭// 使用 awaitTermination 方法等待线程池关闭,直到所有任务完成或超时。try{ threadPool.awaitTermination(1,TimeUnit.HOURS);}catch(InterruptedExceptione){ e.printStackTrace();}}}
  • 使用给定的线程工厂创建线程池:可以提供一个自定义的 ThreadFactory 来创建线程池中的线程
packagecom.demo.threadPool;importjava.util.concurrent.*;/** * 使用给定的线程工厂创建线程池 */publicclassMainFactory{ publicstaticvoidmain(String[]args){ //自定义线程工厂创建ThreadFactorythreadFactory =newThreadFactory(){ @OverridepublicThreadnewThread(Runnabler){ returnnewThread(r);}};//使用给定的线程工厂创建线程池ExecutorServiceexecutor =Executors.newFixedThreadPool(5,threadFactory);executor.execute(newTaskR(2));}}
  • 自定义线程工厂创建:自定义线程工厂可以设置自己的线程名,设置守护线程,设置线程优先级,处理未捕获的异常等
packagecom.demo.threadPool;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.ThreadFactory;importjava.util.concurrent.atomic.AtomicInteger;/** *  自定义线程工厂:设置线程名,守护线程,优先级以及UncaughtExceptionHandler */publicclassMainFactoryimplementsThreadFactory{ privatefinalThreadGroupgroup;privatefinalAtomicIntegerthreadNumber =newAtomicInteger(1);privatefinalStringnamePrefix;publicMainFactory(StringnamePrefix){ SecurityManagers =System.getSecurityManager();group =(s !=null)?s.getThreadGroup():Thread.currentThread().getThreadGroup();this.namePrefix =namePrefix +"-thread-";}publicMainFactory(ThreadGroupgroup,StringnamePrefix){ this.group =group;this.namePrefix =namePrefix;}@OverridepublicThreadnewThread(Runnabler){ Threadt =newThread(group,r,namePrefix +threadNumber.getAndIncrement(),0);//守护线程if(t.isDaemon())t.setDaemon(true);//线程优先级if(t.getPriority()!=Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);/**         * 处理未捕捉的异常         */t.setUncaughtExceptionHandler(newThread.UncaughtExceptionHandler(){ @OverridepublicvoiduncaughtException(Threadt,Throwablee){ System.out.println("处理未捕获的异常");}});returnt;}//测试方法publicstaticvoidmain(String[]args){ ExecutorServicepool =Executors.newFixedThreadPool(5,newMainFactory("测试线程"));for(inti =0;i <10;i++){ pool.execute(newRunnable(){ @Overridepublicvoidrun(){ System.out.println("线程处理");//未捕获的异常,走自定义的UncaughtExceptionHandler逻辑inti =1/0;}});}pool.shutdown();}}

五、ThreadPoolExecutor 类创建线程池

ThreadPoolExecutor 是 java.util.concurrent 包中用来创建线程池的一个类。它提供了一种灵活的方式来管理线程池,允许你控制线程的创建和销毁。

ThreadPoolExecutor类中的几个重要方法

  1. execute():向线程池提交一个任务,交由线程池去执行
  2. submit():也是向线程池提交任务,但是和execute()方法不同,它能够返回任务执行的结果它实际上还是调用的 execute() 方法,只不过它利用了 Future 来获取任务执行结果
  3. invokeAll():提交一个任务集合
  4. invokeAny(): 提交一个任务集合,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
  5. shutdown():关闭线程池,再也不会接受新的任务不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止
  6. shutdownNow():关闭线程池,再也不会接受新的任务立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
  7. isShutdown():不在 RUNNING 状态的线程池,此方法就返回 true
  8. isTerminated():线程池状态是否是 TERMINATED
packagecom.demo.threadPool;importjava.util.Random;importjava.util.concurrent.*;/** * ThreadPoolExecutor 是 java.util.concurrent 包中用来创建线程池的一个类 * 它提供了一种灵活的方式来管理线程池,允许你控制线程的创建和销毁。 * 以下是几种常见的创建 ThreadPoolExecutor 线程池的方式 * 实际上 Executors 类也是调用 ThreadPoolExecutor 类创建的线程 */publicclassMainThreadPoolExecutor{ //测试方法publicstaticvoidmain(String[]args){ /**         * 核心线程数,核心线程就是一直存在的线程         */intcorePoolSize =5;/**         * 最大线程数,表示线程池中最多能创建多少个线程         * 非核心线程数 = 最大线程数 - 核心线程数         */intmaximumPoolSize =10;/**         * 默认情况下,只有当线程池中的线程数大于corePoolSize时,         * keepAliveTime才会起作用,则会终止,直到线程池中的线程数不超过corePoolSize         * 则会终止,直到线程池中的线程数不超过corePoolSize         * 但是如果调用了 allowCoreThreadTimeOut(boolean) 方法         * 在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为 0         * 针对非核心线程而言,表示线程没有任务执行时最多保持多久时间会终止         */longkeepAliveTime =60;/**         * 时间单位         * 与 keepAliveTime 配合使用,针对非核心线程         */TimeUnitunit =TimeUnit.SECONDS;/**         * 存放任务的阻塞队列         */BlockingQueue<Runnable>workQueue =newLinkedBlockingQueue<>(5);/**         * 创建线程的工厂,可以为线程创建时起个好名字         */ThreadFactorythreadFactory =newThreadFactory(){ @OverridepublicThreadnewThread(Runnabler){ returnnewThread(r);}};/**         * 拒绝策略         * 任务太多的时候会进行拒绝操作         * 核心线程,非核心线程,任务队列都放不下时         */// 自定义拒绝策略RejectedExecutionHandlerdefaultHandler1 =newMyRejectedExecutionHandler();// 默认策略,在需要拒绝任务时抛出RejectedExecutionExceptionRejectedExecutionHandlerdefaultHandler3 =newThreadPoolExecutor.AbortPolicy();// 直接在 execute 方法的调用线程中运行被拒绝的任务,如果线程池已经关闭,任务将被丢弃;RejectedExecutionHandlerdefaultHandler2 =newThreadPoolExecutor.CallerRunsPolicy();// 直接丢弃任务RejectedExecutionHandlerdefaultHandler4 =newThreadPoolExecutor.DiscardPolicy();// 丢弃队列中等待时间最长的任务,并执行当前提交的任务,如果线程池已经关闭,任务将被丢弃RejectedExecutionHandlerdefaultHandler5 =newThreadPoolExecutor.DiscardOldestPolicy();/**         * 创建线程池         */ExecutorServiceservice1 =newThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,defaultHandler1);for(inti =0;i <10;i++){ System.out.println("添加第"+i+"个任务");service1.execute(newMyThread("线程"+i));}service1.shutdown();}}/** * 自定义拒绝策略 */classMyRejectedExecutionHandlerimplementsRejectedExecutionHandler{ @OverridepublicvoidrejectedExecution(Runnabler,ThreadPoolExecutorexecutor){ newThread(r,"新线程"+newRandom().nextInt(10)).start();}}/** * 线程类 */classMyThreadimplementsRunnable{ Stringname;publicMyThread(Stringname){ this.name =name;}@Overridepublicvoidrun(){ try{ Thread.sleep(2000);}catch(InterruptedExceptione){ e.printStackTrace();}System.out.println("线程:"+Thread.currentThread().getName()+" 执行:"+name +"  run");}}

六、Future 和 Callable 类使用创建线程池

  1. Callable是一个函数式接口,它允许你定义一个任务,该任务可以返回一个结果并抛出异常。它是 Runnable 接口的扩展,增加了返回值和抛出异常的能力。
  • 返回值:与 Runnable 接口不同,Callable 任务可以返回一个值,返回值通过 Future 对象获取。
  • 异常:Callable 任务可以抛出异常,这些异常可以通过 Future 对象处理。
  1. Future接口代表异步计算的结果。它提供了检查计算是否完成的方法,以及获取计算结果的方法。
  • get():获取计算结果。如果计算尚未完成,此方法会阻塞,直到计算完成或抛出异常。
  • isDone():检查计算是否完成。
  • cancel():尝试取消任务。
  • isCancelled():检查任务是否被取消
packagecom.demo.threadPool;importjava.util.concurrent.*;/** * Future 使用 */publicclassMainFuture{ publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{ ExecutorServiceexecutorService =Executors.newFixedThreadPool(1);System.out.println("开始时间戳为:"+System.currentTimeMillis());Future<String>future =executorService.submit(newTest01());Stringresult =future.get();//获取计算结果。如果计算尚未完成,此方法会阻塞,直到计算完成或抛出异常booleanisdone =future.isDone();//检查计算是否完成booleancancel =future.cancel(true);//尝试取消任务booleaniscancelled =future.isCancelled();//检查任务是否被取消System.out.println("result:"+result);System.out.println("isdone:"+isdone);System.out.println("cancel:"+cancel);System.out.println("iscancelled:"+iscancelled);System.out.println("结束时间戳为:"+System.currentTimeMillis());     executorService.shutdown();}}classTest01implementsCallable{ @OverridepublicObjectcall()throwsException{ return"你好";}}

七、Spring 的 ThreadPoolTaskExecutor 类创建线程池

ThreadPoolTaskExecutor 是 Spring 框架提供的一个线程池实现,它扩展了 Java 的 ThreadPoolExecutor 并提供了一些额外的配置和功能
  1. 添加依赖: 如果你的项目是一个 Maven 项目,确保你的 pom.xml 文件中包含了 Spring Boot 的依赖
  2. 配置线程池: 在 Spring Boot 应用程序中,你可以通过 Java 配置类来配置 ThreadPoolTaskExecutor
packagecom.cnpc.epai.assetcatalog.dmp.controller;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;importjava.util.concurrent.ThreadPoolExecutor;/** * 线程池配置类 */@ConfigurationpublicclassConfigPoolConfiguration{ @Bean("TaskExecutorDemo")publicThreadPoolTaskExecutortaskExecutorDemo(){ ThreadPoolTaskExecutorthreadPoolTaskExecutor =newThreadPoolTaskExecutor();threadPoolTaskExecutor.setCorePoolSize(10);// 核心线程数threadPoolTaskExecutor.setMaxPoolSize(20);// 最大线程数threadPoolTaskExecutor.setQueueCapacity(100);//工作队列threadPoolTaskExecutor.setKeepAliveSeconds(60);// 非核心线程的空闲存活时间threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);//指定是否允许核心线程超时。这允许动态增长和收缩,即使与非零队列结合使用也是如此(因为最大池大小只有在队列已满时才会增长)threadPoolTaskExecutor.setThreadNamePrefix("monitor-thread-pool-");// 设置线程名前缀threadPoolTaskExecutor.setRejectedExecutionHandler(newThreadPoolExecutor.AbortPolicy());// 拒绝策略threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);// 设置线程池关闭时需要等待子任务执行完毕,才销毁对应的beanthreadPoolTaskExecutor.initialize();//初始化线程池returnthreadPoolTaskExecutor;}}

测试类

packagecom.cnpc.epai.assetcatalog.dmp.controller;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.scheduling.annotation.Async;importorg.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;importorg.springframework.stereotype.Service;@ServicepublicclassTestService{ @AutowiredprivateThreadPoolTaskExecutortaskExecutor;@Async("taskExecutor")publicvoidexecuteTask(){ taskExecutor.execute(()->{ System.out.println("Executing task in thread: "+Thread.currentThread().getName());});}}

【我要纠错】责任编辑:新华社