/*** 自定义线程池* * 优点:可以自定义参数*
*/
public static void newThreadPoolExecutor() {ThreadPoolExecutor executor = new ThreadPoolExecutor(// 核心线程数3,// 最大线程数5,// 空闲线程最大存活时间60L,// 空闲线程最大存活时间单位TimeUnit.SECONDS,// 等待队列及大小new ArrayBlockingQueue<>(100),// 创建新线程时使用的工厂Executors.defaultThreadFactory(),// 当线程池达到最大时的处理策略
// new ThreadPoolExecutor.AbortPolicy() // 抛出RejectedExecutionHandler异常new ThreadPoolExecutor.CallerRunsPolicy() // 交由调用者的线程执行
// new ThreadPoolExecutor.DiscardOldestPolicy() // 丢掉最早未处理的任务
// new ThreadPoolExecutor.DiscardPolicy() // 丢掉新提交的任务);// 总共5个任务for (int i = 1; i <= 5; i++) {int taskIndex = i;executor.execute(() -> {log.info("线程 " + Thread.currentThread().getName() + " 正在执行任务 " + taskIndex);// 每个任务耗时1秒try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}});}executor.shutdown();
}
控制台打印:
20:09:50.032 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 1
20:09:50.032 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-2 正在执行任务 2
20:09:50.032 [pool-1-thread-3] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-3 正在执行任务 3
20:09:51.038 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-2 正在执行任务 5
20:09:51.038 [pool-1-thread-3] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-3 正在执行任务 4
/*** 固定大小线程池* * 优点:当任务执行较快,且任务较少时使用方便*
* * 风险:当处理较慢时,等待队列的任务堆积会导致OOM*
*/
public static void newFixThreadPool() {// 3个固定线程ExecutorService executorService = Executors.newFixedThreadPool(3);// 总共5个任务for (int i = 1; i <= 5; i++) {int taskIndex = i;executorService.execute(() -> {log.info("线程 " + Thread.currentThread().getName() + " 正在执行任务 " + taskIndex);// 每个任务耗时1秒try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}});}executorService.shutdown();
}
控制台打印:
20:16:27.040 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-2 正在执行任务 2
20:16:27.040 [pool-1-thread-3] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-3 正在执行任务 3
20:16:27.040 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 1
20:16:28.048 [pool-1-thread-3] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-3 正在执行任务 4
20:16:28.048 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-2 正在执行任务 5
前3个任务被同时执行,因为刚好有3个核心线程。后2个任务会被存放到阻塞队列,当执行前3个任务的某个线程空闲时会从队列中获取任务并执行。
/*** Creates a thread pool that reuses a fixed number of threads* operating off a shared unbounded queue. At any point, at most* {@code nThreads} threads will be active processing tasks.* If additional tasks are submitted when all threads are active,* they will wait in the queue until a thread is available.* If any thread terminates due to a failure during execution* prior to shutdown, a new one will take its place if needed to* execute subsequent tasks. The threads in the pool will exist* until it is explicitly {@link ExecutorService#shutdown shutdown}.** @param nThreads the number of threads in the pool* @return the newly created thread pool* @throws IllegalArgumentException if {@code nThreads <= 0}*/
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue());
}
该类型线程池的核心线程数和最大线程数为指定的参数,空闲线程的存活线程时间为0毫秒,等待队列使用LinkedBlockingQueue,初始化大小为Integer.MAX_VALUE(即:2147483647)。
当任务执行较慢时,阻塞队列存有大量的任务等待,这些任务会占用大量的内存,从而可能导致OOM。
/*** 单一线程池* * 优势:保存任务按照提交的顺序执行*
* * 风险:当处理较慢时,等待队列的任务堆积会导致OOM*
*/
public static void newSingleThreadExecutor() {// 1个线程ExecutorService executor = Executors.newSingleThreadExecutor();// 总共5个任务for (int i = 1; i <= 5; i++) {int taskIndex = i;executor.execute(() -> {log.info("线程 " + Thread.currentThread().getName() + " 正在执行任务 " + taskIndex);// 每个任务耗时1秒try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}});}executor.shutdown();
}
控制台打印:
20:31:04.970 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 1
20:31:05.974 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 2
20:31:06.974 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 3
20:31:07.975 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 4
20:31:08.976 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 5
所有任务按照提交的顺序执行。
/*** Creates an Executor that uses a single worker thread operating* off an unbounded queue. (Note however that if this single* thread terminates due to a failure during execution prior to* shutdown, a new one will take its place if needed to execute* subsequent tasks.) Tasks are guaranteed to execute* sequentially, and no more than one task will be active at any* given time. Unlike the otherwise equivalent* {@code newFixedThreadPool(1)} the returned executor is* guaranteed not to be reconfigurable to use additional threads.** @return the newly created single-threaded Executor*/
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue()));
}
该类型线程池的核心线程数和最大线程数都为1,空闲线程的存活线程时间为0毫秒,等待队列使用LinkedBlockingQueue,初始化大小为Integer.MAX_VALUE(即:2147483647)。
当任务执行较慢时,阻塞队列存有大量的任务等待,这些任务会占用大量的内存,从而可能导致OOM。
/*** 共享线程池* * 优势:当在某一时间段内任务较多,且执行较快时方便使用*
* * 风险:当处理较慢时,会创建大量的线程*
*/
public static void newCachedThreadPool() {ExecutorService executor = Executors.newCachedThreadPool();// 总共5个任务for (int i = 1; i <= 5; i++) {int taskIndex = i;executor.execute(() -> {log.info("线程 " + Thread.currentThread().getName() + " 正在执行任务 " + taskIndex);// 每个任务耗时1秒try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}});}executor.shutdown();
}
控制台打印:
20:45:31.351 [pool-1-thread-4] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-4 正在执行任务 4
20:45:31.351 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-1 正在执行任务 1
20:45:31.351 [pool-1-thread-5] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-5 正在执行任务 5
20:45:31.358 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-2 正在执行任务 2
20:45:31.359 [pool-1-thread-3] INFO com.c3stones.test.ThreadPoolTest - 线程 pool-1-thread-3 正在执行任务 3
每一个任务都创建了新的线程。
/*** Creates a thread pool that creates new threads as needed, but* will reuse previously constructed threads when they are* available. These pools will typically improve the performance* of programs that execute many short-lived asynchronous tasks.* Calls to {@code execute} will reuse previously constructed* threads if available. If no existing thread is available, a new* thread will be created and added to the pool. Threads that have* not been used for sixty seconds are terminated and removed from* the cache. Thus, a pool that remains idle for long enough will* not consume any resources. Note that pools with similar* properties but different details (for example, timeout parameters)* may be created using {@link ThreadPoolExecutor} constructors.** @return the newly created thread pool*/
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue());
}
该类型线程池的核心线程数为0,最大线程数为Integer.MAX_VALUE(即:2147483647),空闲线程最大存活时间为60秒,等待队列使用SynchronousQueue,该队列不存储数据,只做转发,具体可参考:【面试专栏】Java 阻塞队列。
当任务较多或执行较慢时,会创建大量的线程,从而导致OOM。
/*** 定时线程池* * 优点:可以定时执行某些任务*
* * 风险:当处理较慢时,等待队列的任务堆积会导致OOM*
*/
public static void newScheduledThreadPool() {
// // 单一线程
// ExecutorService executor = Executors.newSingleThreadScheduledExecutor();// 指定核心线程数ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);executor.schedule(() -> {log.info("3秒后开始执行,以后不再执行");// 每个任务耗时1秒try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}, 3, TimeUnit.SECONDS);
//
// executor.scheduleAtFixedRate(() -> {
// log.info("3秒后开始执行,以后每2秒执行一次");
//
// // 每个任务耗时1秒
// try {
// TimeUnit.SECONDS.sleep(1);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }, 3, 2, TimeUnit.SECONDS);
//
// executor.scheduleWithFixedDelay(() -> {
// log.info("3秒后开始执行,以后延迟2秒执行一次");
//
// // 每个任务耗时1秒
// try {
// TimeUnit.SECONDS.sleep(1);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }, 3, 2, TimeUnit.SECONDS);
}
控制台打印 - 1:
21:18:46.494 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后不再执行
启动后3秒开始执行,执行完成后不再继续执行。
控制台打印 - 2:
21:22:47.078 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后每2秒执行一次
21:22:49.075 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后每2秒执行一次
21:22:51.075 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后每2秒执行一次
......
启动后3秒开始执行,以后每两秒执行一次。
控制台打印 - 3:
21:28:09.701 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后延迟2秒执行一次
21:28:12.705 [pool-1-thread-1] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后延迟2秒执行一次
21:28:15.707 [pool-1-thread-2] INFO com.c3stones.test.ThreadPoolTest - 3秒后开始执行,以后延迟2秒执行一次
启动后3秒开始执行,以后每次执行时间为任务的耗时时间加固定的延迟时间。
假设每次任务固定延迟2秒,第一次任务在第3秒开始执行,任务耗时1秒;第二次任务将在第一次完成后2秒开始执行(即第6秒),耗时2秒;第三次任务将在第二次完成后2秒开始执行(即第10秒),依次类推。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.ThreadPoolExecutor;/*** 自定义线程池配置类** @author CL*/
@Configuration
public class TaskExecutorConfig {/*** 自定义任务执行器** @return {@link TaskExecutor}*/@Beanpublic TaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 核心线程数,默认1int corePoolSize = Runtime.getRuntime().availableProcessors();executor.setCorePoolSize(corePoolSize);// 最大线程数,默认Integer.MAX_VALUEexecutor.setMaxPoolSize(corePoolSize * 2 + 1);// 空闲线程最大存活时间,默认60秒executor.setKeepAliveSeconds(3);// 等待队列及大小,默认Integer.MAX_VALUEexecutor.setQueueCapacity(500);// 线程的名称前缀,默认该Bean名称简写:org.springframework.util.ClassUtils.getShortName(java.lang.Class>)executor.setThreadNamePrefix("custom-thread-");// 当线程池达到最大时的处理策略,默认抛出RejectedExecutionHandler异常
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 抛出RejectedExecutionHandler异常executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 交由调用者的线程执行
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); // 丢掉最早未处理的任务
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); // 丢掉新提交的任务// 等待所有任务结束后再关闭线程池,默认falseexecutor.setWaitForTasksToCompleteOnShutdown(true);// 等待所有任务结束最长等待时间,默认0毫秒executor.setAwaitTerminationSeconds(10);// 执行初始化executor.initialize();return executor;}}
/*** 示例Service** @author CL*/
public interface DemoService {/*** 示例方法** @return {@link String}*/void demo();}
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Service;import javax.annotation.Resource;/*** 示例Service实现** @author CL*/
@Slf4j
@Service
public class DemoServiceImpl implements DemoService {@Resourceprivate TaskExecutor taskExecutor;/*** 示例方法*/@Overridepublic void demo() {taskExecutor.execute(() -> {log.info("线程 " + Thread.currentThread().getName() + " 正在执行Service中的方法");});}}
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;/*** 示例异步任务** @author CL*/
@Slf4j
@Component
@EnableAsync
public class DemoAsync {/*** 示例方法*/@Async(value = "taskExecutor")public void demo() {log.info("线程 " + Thread.currentThread().getName() + " 正在执行Async中的方法");}}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;/*** 自定义定时任务调度配置类** @author CL*/
@Configuration
public class SheduledConfig implements SchedulingConfigurer {/*** 配置定时任务** @param scheduledTaskRegistrar 配置任务注册器*/@Overridepublic void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {scheduledTaskRegistrar.setScheduler(taskScheduler());// // 第二种方式
// scheduledTaskRegistrar.setScheduler(scheduledExecutorService());}/*** 自定义任务调度器** @return {@link TaskScheduler}*/@Beanpublic TaskScheduler taskScheduler() {ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();executor.setPoolSize(5);executor.setThreadNamePrefix("custom-scheduler-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}// /**
// * 自定义任务线程池
// *
// * @return {@link ScheduledExecutorService}
// */
// @Bean
// public ScheduledExecutorService scheduledExecutorService() {
// return Executors.newScheduledThreadPool(5);
// }}
6.2 测试
import com.c3tones.async.DemoAsync;
import com.c3tones.service.DemoService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** 示例Controller** @author CL*/
@Slf4j
@RestController
public class DemoController {@Resourceprivate DemoService demoService;@Resourceprivate DemoAsync demoAsync;/*** Service示例方法** @return {@link String}*/@RequestMapping("/service")public void service() {log.info("Service示例方法开始执行");demoService.demo();log.info("Service示例方法结束执行");}/*** 异步示例方法** @return {@link String}*/@RequestMapping("/async")public void async() {log.info("异步示例方法开始执行");demoAsync.demo();log.info("异步示例方法结束执行");}}
curl http://127.0.0.1:8080/service
控制台打印:
2023-03-19 22:26:26.896 INFO 136568 --- [nio-8080-exec-3] com.c3tones.controller.DemoController : Service示例方法开始执行
2023-03-19 22:26:26.897 INFO 136568 --- [nio-8080-exec-3] com.c3tones.controller.DemoController : Service示例方法结束执行
2023-03-19 22:26:26.897 INFO 136568 --- [custom-thread-1] com.c3tones.service.DemoServiceImpl : 线程 custom-thread-1 正在执行Service中的方法
调用接口同步打印日志,自定义线程异步执行任务。
curl http://127.0.0.1:8080/async
控制台打印:
2023-03-19 22:28:08.349 INFO 136568 --- [nio-8080-exec-7] com.c3tones.controller.DemoController : 异步示例方法开始执行
2023-03-19 22:28:08.355 INFO 136568 --- [nio-8080-exec-7] com.c3tones.controller.DemoController : 异步示例方法结束执行
2023-03-19 22:28:08.363 INFO 136568 --- [custom-thread-2] com.c3tones.async.DemoAsync : 线程 custom-thread-2 正在执行Async中的方法
调用接口同步打印日志,异步线程异步执行任务。
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;/*** 示例定时任务** @author CL*/
@Slf4j
@Component
@EnableScheduling
public class DemoScheduled {/*** 示例方法*/@Scheduled(cron = "0/3 * * * * ? ")public void demo() {log.info("线程 " + Thread.currentThread().getName() + " 正在执行Scheduled中的方法");}}
2023-03-19 22:30:24.002 INFO 136568 --- [tom-scheduler-3] com.c3tones.sheduled.DemoScheduled : 线程 custom-scheduler-3 正在执行Scheduled中的方法
2023-03-19 22:30:27.002 INFO 136568 --- [tom-scheduler-3] com.c3tones.sheduled.DemoScheduled : 线程 custom-scheduler-3 正在执行Scheduled中的方法
2023-03-19 22:30:30.001 INFO 136568 --- [tom-scheduler-3] com.c3tones.sheduled.DemoScheduled : 线程 custom-scheduler-3 正在执行Scheduled中的方法
定时任务从0秒开始,每3秒执行一次任务。thread-demo