22
33import lombok .RequiredArgsConstructor ;
44import lombok .extern .slf4j .Slf4j ;
5+ import org .dromara .dynamictp .core .DtpRegistry ;
6+ import org .dromara .dynamictp .core .spring .DtpLifecycle ;
7+ import org .dromara .dynamictp .core .support .ExecutorAdapter ;
8+ import org .dromara .dynamictp .core .support .ExecutorWrapper ;
59import org .dromara .dynamictp .core .thread .DtpExecutor ;
610import org .springframework .aop .interceptor .AsyncUncaughtExceptionHandler ;
11+ import org .springframework .beans .BeansException ;
12+ import org .springframework .beans .factory .config .BeanPostProcessor ;
13+ import org .springframework .context .Lifecycle ;
14+ import org .springframework .context .SmartLifecycle ;
715import org .springframework .context .annotation .Bean ;
816import org .springframework .context .annotation .Configuration ;
917import org .springframework .scheduling .annotation .AsyncConfigurer ;
1220import org .springframework .scheduling .annotation .SchedulingConfigurer ;
1321import org .springframework .scheduling .concurrent .ThreadPoolTaskScheduler ;
1422
23+ import java .util .Collections ;
24+ import java .util .List ;
1525import java .util .concurrent .Executor ;
26+ import java .util .concurrent .ForkJoinPool ;
27+ import java .util .concurrent .ThreadPoolExecutor ;
28+ import java .util .concurrent .TimeUnit ;
29+ import java .util .concurrent .atomic .AtomicBoolean ;
30+ import java .util .stream .Collectors ;
1631
1732
1833/**
2843@ Configuration
2944public class ThreadPoolConfig {
3045
31- private final DtpExecutor asyncExecutor ;
32-
3346 /**
34- * Spring 定时任务线程池
47+ * Spring 定时任务线程池 @Scheduled
3548 */
36- @ Bean (name = "taskScheduler" , destroyMethod = "shutdown" )
49+ @ Bean (name = "taskScheduler" )
3750 public ThreadPoolTaskScheduler taskScheduler () {
3851 ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler ();
3952 scheduler .setPoolSize (Runtime .getRuntime ().availableProcessors ());
@@ -51,8 +64,11 @@ public SchedulingConfigurer schedulingConfigurer(ThreadPoolTaskScheduler taskSch
5164 return taskRegistrar -> taskRegistrar .setTaskScheduler (taskScheduler );
5265 }
5366
67+ /**
68+ * spring 异步任务线程池 @Async
69+ */
5470 @ Bean
55- public AsyncConfigurer asyncConfigurer () {
71+ public AsyncConfigurer asyncConfigurer (DtpExecutor asyncExecutor ) {
5672 return new AsyncConfigurer () {
5773 @ Override
5874 public Executor getAsyncExecutor () {
@@ -67,4 +83,114 @@ public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
6783 }
6884 };
6985 }
86+
87+ /**
88+ * 定时任务线程池需要在普通线程池之前关闭,因为定时任务可能向其他线程池提交任务
89+ * 注:如果项目中存在普通线程池也向定时任务线程池中提交任务的情况,可能需要根据依赖关系细化定制关闭顺序
90+ */
91+ @ Bean
92+ public Lifecycle taskSchedulerLifeCycle (ThreadPoolTaskScheduler taskScheduler ) {
93+ return new TaskSchedulerLifeCycle (taskScheduler );
94+ }
95+
96+ /**
97+ * DynamicTp DtpLifCycle 后置处理,优雅关闭
98+ */
99+ @ Bean
100+ public BeanPostProcessor executorLifecyclePostProcess () {
101+ return new BeanPostProcessor () {
102+ @ Override
103+ public Object postProcessBeforeInitialization (Object bean , String beanName ) throws BeansException {
104+ if (bean instanceof DtpLifecycle ) {
105+ return new ExecutorLifeCycle ();
106+ }
107+ return bean ;
108+ }
109+ };
110+ }
111+
112+ /**
113+ * 判断线程池是否还有正则执行的任务
114+ */
115+ public static boolean isPoolActive (Executor executor ) {
116+ // 兼容 DtpExecutor
117+ if (executor instanceof ExecutorAdapter ) {
118+ ExecutorAdapter <?> adapter = (ExecutorAdapter <?>) executor ;
119+ return adapter .getActiveCount () == 0 ;
120+ } else if (executor instanceof ThreadPoolExecutor ) {
121+ ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor ) executor ;
122+ return threadPoolExecutor .getActiveCount () == 0 ;
123+ } else if (executor instanceof ForkJoinPool ) {
124+ ForkJoinPool forkJoinPool = (ForkJoinPool ) executor ;
125+ return forkJoinPool .getActiveThreadCount () == 0
126+ && forkJoinPool .getRunningThreadCount () == 0
127+ && forkJoinPool .getQueuedTaskCount () == 0
128+ && forkJoinPool .getQueuedSubmissionCount () == 0 ;
129+ }
130+ return true ;
131+ }
132+
133+ @ Slf4j
134+ public static class TaskSchedulerLifeCycle implements SmartLifecycle {
135+ private final AtomicBoolean running = new AtomicBoolean (false );
136+
137+ private final ThreadPoolTaskScheduler taskScheduler ;
138+
139+ public TaskSchedulerLifeCycle (ThreadPoolTaskScheduler taskScheduler ) {
140+ this .taskScheduler = taskScheduler ;
141+ }
142+
143+ @ Override
144+ public void start () {
145+ running .compareAndSet (false , true );
146+ }
147+
148+ @ Override
149+ public void stop () {
150+ log .info ("LifeCycle preparing to stop taskScheduler" );
151+ if (this .running .compareAndSet (true , false )) {
152+ taskScheduler .shutdown ();
153+ }
154+ }
155+
156+ @ Override
157+ public boolean isRunning () {
158+ return running .get ();
159+ }
160+ }
161+
162+ @ Slf4j
163+ public static class ExecutorLifeCycle extends DtpLifecycle {
164+ @ Override
165+ public int getPhase () {
166+ // webServer、taskScheduler、mq 的 phase 是 DEFAULT_PHASE
167+ // 在 webServer、taskScheduler、mq 关闭之后再关闭
168+ return SmartLifecycle .DEFAULT_PHASE - 1 ;
169+ }
170+
171+ @ Override
172+ public void stop () {
173+ log .info ("LifeCycle preparing to stop all thread pool" );
174+ // 由于存在线程池互相提交现象,不能直接全部 shutdown
175+ List <? extends ExecutorAdapter <?>> executorList = DtpRegistry .listAllExecutors ().values ().stream ()
176+ .map (ExecutorWrapper ::getExecutor ).collect (Collectors .toList ());
177+ // 打乱重复校验 5 次
178+ for (int i = 0 ; i < 5 ; ) {
179+ Collections .shuffle (executorList );
180+ if (executorList .stream ().allMatch (ThreadPoolConfig ::isPoolActive )) {
181+ i ++;
182+ log .info ("LifeCycle check all thread pool completed, check order is {}" , i );
183+ } else {
184+ i = 0 ;
185+ log .info ("LifeCycle check not all thread pool completed, wait 1s" );
186+ try {
187+ TimeUnit .SECONDS .sleep (1 );
188+ } catch (InterruptedException ignored ) {
189+ //
190+ }
191+ }
192+ }
193+ super .stop ();
194+ }
195+ }
70196}
0 commit comments