|
@@ -4,6 +4,7 @@ import com.jay.monitor.data.core.enums.MsgTypeEnums;
|
|
|
import com.jay.monitor.data.server.config.props.MonitorProperties;
|
|
|
import com.jay.monitor.data.server.enums.DataSplitType;
|
|
|
import com.jay.monitor.data.server.enums.StoreType;
|
|
|
+import com.jay.monitor.data.server.factory.thread.TaskRunnable;
|
|
|
import com.jay.monitor.data.server.store.mysql.MysqlDDLProcess;
|
|
|
import com.jay.monitor.data.server.utils.MonitorPropertiesUtil;
|
|
|
import org.slf4j.Logger;
|
|
@@ -16,6 +17,7 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.ArrayBlockingQueue;
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.LongAdder;
|
|
|
|
|
|
/**
|
|
@@ -24,7 +26,7 @@ import java.util.concurrent.atomic.LongAdder;
|
|
|
* @Author liukaixiong
|
|
|
* @Date 2020/10/29 13:33
|
|
|
*/
|
|
|
-public abstract class AbstractStoreProcess<T> implements Runnable, InitializingBean {
|
|
|
+public abstract class AbstractStoreProcess<T> implements TaskRunnable, InitializingBean {
|
|
|
private Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
|
|
|
private LongAdder counter = new LongAdder();
|
|
@@ -33,6 +35,8 @@ public abstract class AbstractStoreProcess<T> implements Runnable, InitializingB
|
|
|
|
|
|
private int batchSize = 500;
|
|
|
|
|
|
+ private volatile boolean isConsumer = true;
|
|
|
+
|
|
|
private BlockingQueue<T> queue = new ArrayBlockingQueue<>(queueSize);
|
|
|
|
|
|
@Autowired
|
|
@@ -73,14 +77,17 @@ public abstract class AbstractStoreProcess<T> implements Runnable, InitializingB
|
|
|
|
|
|
public Object store(T request) throws Exception {
|
|
|
|
|
|
+ if (!this.isConsumer) {
|
|
|
+ logger.debug("当前消费标记:" + this.isConsumer + " 不处理消息!");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
storeBefore(request);
|
|
|
|
|
|
boolean add = getQueue().offer(request);
|
|
|
|
|
|
if (!add) {
|
|
|
-
|
|
|
counter.decrement();
|
|
|
-
|
|
|
if (counter.intValue() == 0 || counter.intValue() > queueSize) {
|
|
|
logger.warn(getClass() + " 队列消费满了,装不下去了");
|
|
|
counter.reset();
|
|
@@ -137,7 +144,8 @@ public abstract class AbstractStoreProcess<T> implements Runnable, InitializingB
|
|
|
int batchSize = getBatchSize();
|
|
|
int minBatch = batchSize / 10;
|
|
|
int thresholdValue = (int) (defaultSize * 0.8);
|
|
|
- while (true) {
|
|
|
+ long startHour = TimeUnit.MILLISECONDS.toHours(System.currentTimeMillis());
|
|
|
+ while (isConsumer) {
|
|
|
try {
|
|
|
int size = this.queue.size();
|
|
|
if (size > 0) {
|
|
@@ -151,14 +159,27 @@ public abstract class AbstractStoreProcess<T> implements Runnable, InitializingB
|
|
|
// 如果数量不多的情况下
|
|
|
speedUpConsumer(this.queue, 1);
|
|
|
}
|
|
|
+ } else {
|
|
|
+ long currentHour = TimeUnit.MILLISECONDS.toHours(System.currentTimeMillis());
|
|
|
+ if (currentHour > startHour) {
|
|
|
+ logger.warn(" current queue size : " + size + "\t hour : " + currentHour);
|
|
|
+ startHour = currentHour;
|
|
|
+ }
|
|
|
}
|
|
|
- } catch (Exception e) {
|
|
|
- logger.warn("异步队列消费失败", e);
|
|
|
+ } catch (Throwable e) {
|
|
|
+ logger.error("异步队列消费失败", e);
|
|
|
sleep(5);
|
|
|
} finally {
|
|
|
sleep(5);
|
|
|
}
|
|
|
}
|
|
|
+ stop();
|
|
|
+ logger.warn(getThreadName() + " stop!!!!");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void stop() {
|
|
|
+ this.isConsumer = false;
|
|
|
}
|
|
|
|
|
|
private void sleep(int ms) {
|
|
@@ -170,9 +191,20 @@ public abstract class AbstractStoreProcess<T> implements Runnable, InitializingB
|
|
|
}
|
|
|
|
|
|
public void startConsumerThread() {
|
|
|
- String threadName = getClass().getSimpleName() + "-consumer-thread";
|
|
|
+ String threadName = getThreadName();
|
|
|
logger.info("开启消费线程:" + threadName);
|
|
|
- new Thread(this, threadName).start();
|
|
|
+ this.isConsumer = true;
|
|
|
+ Thread thread = new Thread(this, threadName);
|
|
|
+ thread.setUncaughtExceptionHandler((currentThread, throwable) -> {
|
|
|
+ logger.error("极端异常出现未捕获的情况:" + currentThread.getName(), throwable);
|
|
|
+ });
|
|
|
+ thread.start();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String getThreadName() {
|
|
|
+ String threadName = getClass().getSimpleName() + "-consumer-thread";
|
|
|
+ return threadName;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -192,12 +224,16 @@ public abstract class AbstractStoreProcess<T> implements Runnable, InitializingB
|
|
|
if (tableRuleMap != null) {
|
|
|
DataSplitType dataSplitType = tableRuleMap.get(tableName());
|
|
|
if (dataSplitType != null) {
|
|
|
- String dataId = ddlProcess.getDataId(tableName(), dataSplitType);
|
|
|
- return dataId;
|
|
|
+ return ddlProcess.getDataId(tableName(), dataSplitType);
|
|
|
}
|
|
|
}
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 处理的表名
|
|
|
+ *
|
|
|
+ * @return
|
|
|
+ */
|
|
|
public abstract String tableName();
|
|
|
}
|