|
@@ -7,6 +7,8 @@ import com.elab.core.async.pruducer.ITaskProducer;
|
|
|
import com.elab.log.ext.CatSupplier;
|
|
|
import com.elab.spring.config.prop.ThreadPoolProperties;
|
|
|
import com.elab.spring.config.prop.ThreadProperties;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.BeansException;
|
|
|
import org.springframework.beans.factory.InitializingBean;
|
|
|
import org.springframework.context.ApplicationContext;
|
|
@@ -24,7 +26,7 @@ import java.util.function.Supplier;
|
|
|
* @Date 2020/11/18 15:51
|
|
|
*/
|
|
|
public class ThreadProcessUtils implements ApplicationContextAware, InitializingBean {
|
|
|
-
|
|
|
+ private static Logger logger = LoggerFactory.getLogger(ThreadProcessUtils.class);
|
|
|
private ApplicationContext applicationContext;
|
|
|
|
|
|
private static ITaskProducer taskProducer;
|
|
@@ -53,10 +55,12 @@ public class ThreadProcessUtils implements ApplicationContextAware, Initializing
|
|
|
this.applicationContext = applicationContext;
|
|
|
}
|
|
|
|
|
|
- public static void addRealTimeQueue(RealTaskExecutor taskExecutor, TaskStoreData taskStoreData) {
|
|
|
+ public static boolean addRealTimeQueue(RealTaskExecutor taskExecutor, TaskStoreData taskStoreData) {
|
|
|
if (taskProducer != null) {
|
|
|
- taskProducer.sendRealTimeQueue(taskExecutor, taskStoreData);
|
|
|
+ return taskProducer.sendRealTimeQueue(taskExecutor, taskStoreData);
|
|
|
}
|
|
|
+ logger.warn("加入队列失败,taskProducer 为空!");
|
|
|
+ return false;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -91,7 +95,8 @@ public class ThreadProcessUtils implements ApplicationContextAware, Initializing
|
|
|
* @return 阻塞
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
- public static <U> CompletableFuture<U>[] supplyAsync(boolean isBlockResult, Long timeOutMs, Supplier<U>... supplier) throws Exception {
|
|
|
+ public static <U> CompletableFuture<U>[] supplyAsync(boolean isBlockResult, Long timeOutMs, Supplier<U>... supplier)
|
|
|
+ throws Exception {
|
|
|
CompletableFuture<U>[] result = new CompletableFuture[supplier.length];
|
|
|
for (int i = 0; i < supplier.length; i++) {
|
|
|
CompletableFuture<U> completableFuture = supplyAsync(supplier[i]);
|
|
@@ -103,8 +108,8 @@ public class ThreadProcessUtils implements ApplicationContextAware, Initializing
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
- public static <U> Map<String, CompletableFuture<U>> supplyAsync(Map<String, Supplier<U>> taskGroupMap) throws Exception {
|
|
|
+ public static <U> Map<String, CompletableFuture<U>> supplyAsync(Map<String, Supplier<U>> taskGroupMap)
|
|
|
+ throws Exception {
|
|
|
return supplyAsync(true, taskGroupMap);
|
|
|
}
|
|
|
|
|
@@ -117,21 +122,23 @@ public class ThreadProcessUtils implements ApplicationContextAware, Initializing
|
|
|
* @return
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
- public static <U> Map<String, CompletableFuture<U>> supplyAsync(boolean isBlockResult, Map<String, Supplier<U>> taskGroupMap) throws Exception {
|
|
|
+ public static <U> Map<String, CompletableFuture<U>> supplyAsync(boolean isBlockResult,
|
|
|
+ Map<String, Supplier<U>> taskGroupMap) throws Exception {
|
|
|
return supplyAsync(isBlockResult, taskGroupMap, null);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 异步调用
|
|
|
*
|
|
|
- * @param isBlockResult 是否阻塞直到结果获取完成
|
|
|
- * @param taskGroupMap 任务组
|
|
|
- * @param timeOutMs 等待获取时间
|
|
|
+ * @param isBlockResult 是否阻塞直到结果获取完成
|
|
|
+ * @param taskGroupMap 任务组
|
|
|
+ * @param timeOutMs 等待获取时间
|
|
|
* @param <U>
|
|
|
* @return
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
- public static <U> Map<String, CompletableFuture<U>> supplyAsync(boolean isBlockResult, Map<String, Supplier<U>> taskGroupMap, Long timeOutMs) throws Exception {
|
|
|
+ public static <U> Map<String, CompletableFuture<U>> supplyAsync(boolean isBlockResult,
|
|
|
+ Map<String, Supplier<U>> taskGroupMap, Long timeOutMs) throws Exception {
|
|
|
CompletableFuture<U>[] result = new CompletableFuture[taskGroupMap.size()];
|
|
|
|
|
|
Map<String, CompletableFuture<U>> resultMap = new HashMap<>();
|
|
@@ -148,7 +155,8 @@ public class ThreadProcessUtils implements ApplicationContextAware, Initializing
|
|
|
return resultMap;
|
|
|
}
|
|
|
|
|
|
- private static <U> void allOf(boolean isBlockResult, Long timeOutMs, CompletableFuture<U>[] result) throws InterruptedException, ExecutionException, TimeoutException {
|
|
|
+ private static <U> void allOf(boolean isBlockResult, Long timeOutMs, CompletableFuture<U>[] result)
|
|
|
+ throws InterruptedException, ExecutionException, TimeoutException {
|
|
|
if (isBlockResult) {
|
|
|
CompletableFuture<Void> completableFuture = CompletableFuture.allOf(result);
|
|
|
if (timeOutMs != null) {
|
|
@@ -165,18 +173,21 @@ public class ThreadProcessUtils implements ApplicationContextAware, Initializing
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
@Override
|
|
|
public void afterPropertiesSet() throws Exception {
|
|
|
taskProducer = this.applicationContext.getBean(ITaskProducer.class);
|
|
|
this.threadProperties = this.applicationContext.getBean(ThreadProperties.class);
|
|
|
if (this.threadProperties != null && this.threadProperties.getThreadPool() != null) {
|
|
|
ThreadPoolProperties threadPool = this.threadProperties.getThreadPool();
|
|
|
- ExecutorService executorService = new ThreadPoolExecutor(threadPool.getCorePoolSize(), threadPool.getMaximumPoolSize(), threadPool.getKeepAliveTime(), TimeUnit.SECONDS, new ArrayBlockingQueue(threadPool.getQueueSize()));
|
|
|
+ ExecutorService executorService =
|
|
|
+ new ThreadPoolExecutor(threadPool.getCorePoolSize(), threadPool.getMaximumPoolSize(),
|
|
|
+ threadPool.getKeepAliveTime(), TimeUnit.SECONDS, new ArrayBlockingQueue(threadPool.getQueueSize()));
|
|
|
setExecutorService(executorService);
|
|
|
} else {
|
|
|
ThreadPoolProperties threadPool = new ThreadPoolProperties();
|
|
|
- ExecutorService executorService = new ThreadPoolExecutor(threadPool.getCorePoolSize(), threadPool.getMaximumPoolSize(), threadPool.getKeepAliveTime(), TimeUnit.SECONDS, new ArrayBlockingQueue(threadPool.getQueueSize()));
|
|
|
+ ExecutorService executorService =
|
|
|
+ new ThreadPoolExecutor(threadPool.getCorePoolSize(), threadPool.getMaximumPoolSize(),
|
|
|
+ threadPool.getKeepAliveTime(), TimeUnit.SECONDS, new ArrayBlockingQueue(threadPool.getQueueSize()));
|
|
|
setExecutorService(executorService);
|
|
|
}
|
|
|
}
|