|
@@ -8,7 +8,6 @@ import com.alibaba.fastjson.JSON;
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
|
|
import com.google.common.collect.Lists;
|
|
|
import com.rongwe.scentity.domian.*;
|
|
|
-import com.rongwe.scentity.vo.CheckTemplateItemsVo;
|
|
|
import com.rongwe.scentity.vo.CompanyInfoVo;
|
|
|
import com.rongwe.scentity.vo.DistributeVo;
|
|
|
import com.rongwe.scentity.vo.FourColorSynchronizationVo;
|
|
@@ -17,7 +16,6 @@ import com.rongwei.rwcommonentity.commonservers.domain.SysFileItemDo;
|
|
|
import com.rongwei.sfcommon.sys.dao.RiskSynchronizationDao;
|
|
|
import com.rongwei.sfcommon.sys.service.*;
|
|
|
import com.rongwei.sfcommon.utils.Result;
|
|
|
-import com.sun.org.apache.xpath.internal.operations.Equals;
|
|
|
import org.apache.commons.beanutils.PropertyUtils;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
@@ -69,8 +67,9 @@ public class SynchronizationImpl implements SynchronizationService {
|
|
|
|
|
|
@Value("${third-party.service.ip}")
|
|
|
private String ip;
|
|
|
-
|
|
|
- private static final int BATCH_SIZE = 50;
|
|
|
+ @Value("${third-party.service.token}")
|
|
|
+ private String token;
|
|
|
+ private static final int BATCH_SIZE = 2;
|
|
|
|
|
|
|
|
|
/**
|
|
@@ -591,20 +590,7 @@ public class SynchronizationImpl implements SynchronizationService {
|
|
|
|
|
|
}
|
|
|
|
|
|
-
|
|
|
- private ExecutorService createCustomThreadPool() {
|
|
|
- int corePoolSize = 4;
|
|
|
- int maxPoolSize = 8;
|
|
|
- long keepAliveTime = 60; // 空闲线程存活时间
|
|
|
-
|
|
|
- return new ThreadPoolExecutor(
|
|
|
- corePoolSize,
|
|
|
- maxPoolSize,
|
|
|
- keepAliveTime, TimeUnit.SECONDS,
|
|
|
- new LinkedBlockingQueue<>(100),
|
|
|
- new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
|
|
|
- );
|
|
|
- }
|
|
|
+
|
|
|
|
|
|
//2.四色图信息存在,分布信息不存在
|
|
|
//筛选出DistributeVo中imageBase64不为空的数据
|
|
@@ -740,14 +726,12 @@ public class SynchronizationImpl implements SynchronizationService {
|
|
|
String ip,
|
|
|
List<AspRiskMapDo> dataSave) {
|
|
|
|
|
|
- ExecutorService executorService = createCustomThreadPool();
|
|
|
+
|
|
|
|
|
|
try {
|
|
|
// 处理四色图同步任务
|
|
|
for (List<Map<String, Object>> syncData : fourColorSyncDataList) {
|
|
|
- Future<Result> future = executorService.submit(() -> dataPush(ip, FOURCOLORCONSTANT, syncData));
|
|
|
- Result result = future.get();// 等待结果
|
|
|
-
|
|
|
+ Result result = dataPush(ip, FOURCOLORCONSTANT, syncData, token);
|
|
|
AspRiskMapDo aspRiskMapDo;
|
|
|
if (Objects.equals(result.getStatus(), 200)) {
|
|
|
// 当前批次成功,记录日志和状态
|
|
@@ -771,12 +755,12 @@ public class SynchronizationImpl implements SynchronizationService {
|
|
|
dataSave.add(aspRiskMapDo);
|
|
|
}
|
|
|
}
|
|
|
+ Thread.sleep(5000);
|
|
|
}
|
|
|
|
|
|
// 处理分布信息同步任务
|
|
|
for (List<Map<String, Object>> syncData : distributeSyncDataList) {
|
|
|
- Future<Result> future = executorService.submit(() -> dataPush(ip, DISTRIBUTECONSTANT, syncData));
|
|
|
- Result result = future.get(); // 等待结果
|
|
|
+ Result result = dataPush(ip, DISTRIBUTECONSTANT, syncData, token);
|
|
|
|
|
|
//2.无论四色图信息是否同步成功,只要分布信息同步失败,状态为同步失败
|
|
|
if (!Objects.equals(result.getStatus(), 200)) {
|
|
@@ -790,6 +774,7 @@ public class SynchronizationImpl implements SynchronizationService {
|
|
|
dataSave.add(aspRiskMapDo);
|
|
|
}
|
|
|
}
|
|
|
+ Thread.sleep(5000);
|
|
|
}
|
|
|
|
|
|
// 处理校验失败的数据
|
|
@@ -804,9 +789,7 @@ public class SynchronizationImpl implements SynchronizationService {
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
|
- } finally {
|
|
|
- executorService.shutdown(); // 关闭线程池
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -823,14 +806,11 @@ public class SynchronizationImpl implements SynchronizationService {
|
|
|
List<AspRiskEventDo> eventDataSave = new ArrayList<>();
|
|
|
List<AspRiskControlMeasuresDo> controlMeasureDataSave = new ArrayList<>();
|
|
|
|
|
|
- ExecutorService executorService = createCustomThreadPool();
|
|
|
|
|
|
try {
|
|
|
// 处理单元同步任务
|
|
|
for (List<Map<String, Object>> syncData : unitPartitionedLists) {
|
|
|
- Future<Result> future = executorService.submit(() -> dataPush(ip, UNITCONSTANT, syncData));
|
|
|
- Result result = future.get(); // 等待结果
|
|
|
-
|
|
|
+ Result result = dataPush(ip, UNITCONSTANT, syncData, token);
|
|
|
if (Objects.equals(result.getStatus(), 200)) {
|
|
|
// 当前批次成功,记录日志和状态
|
|
|
for (Map<String, Object> waitSyncDatum : syncData) {
|
|
@@ -852,13 +832,12 @@ public class SynchronizationImpl implements SynchronizationService {
|
|
|
dataSave.add(aspRiskUnitDo);
|
|
|
}
|
|
|
}
|
|
|
+ Thread.sleep(5000);
|
|
|
}
|
|
|
|
|
|
// 处理事件信息同步任务
|
|
|
for (List<Map<String, Object>> syncData : eventPartitionedLists) {
|
|
|
- Future<Result> future = executorService.submit(() -> dataPush(ip, DISTRIBUTECONSTANT, syncData));
|
|
|
- Result result = future.get(); // 等待结果
|
|
|
-
|
|
|
+ Result result = dataPush(ip, DISTRIBUTECONSTANT, syncData, token);
|
|
|
//2.无论四色图信息是否同步成功,只要分布信息同步失败,状态为同步失败
|
|
|
if (!Objects.equals(result.getStatus(), 200)) {
|
|
|
// 分布信息同步失败,记录日志和状态
|
|
@@ -882,14 +861,15 @@ public class SynchronizationImpl implements SynchronizationService {
|
|
|
eventDataSave.add(aspRiskEventDo);
|
|
|
}
|
|
|
}
|
|
|
+ Thread.sleep(5000);
|
|
|
}
|
|
|
- if (eventDataSave.size() > 0) {
|
|
|
+ if (!eventDataSave.isEmpty()) {
|
|
|
aspRiskEventService.updateBatchById(eventDataSave);
|
|
|
}
|
|
|
// 处理措施信息同步任务
|
|
|
for (List<Map<String, Object>> syncData : controlMeasurePartitionedLists) {
|
|
|
- Future<Result> future = executorService.submit(() -> dataPush(ip, CONTROLMEASURECONSTANT, syncData));
|
|
|
- Result result = future.get();
|
|
|
+ Result result=dataPush(ip, CONTROLMEASURECONSTANT, syncData, token);
|
|
|
+
|
|
|
//2.无论四色图信息是否同步成功,只要分布信息同步失败,状态为同步失败
|
|
|
if (!Objects.equals(result.getStatus(), 200)) {
|
|
|
// 分布信息同步失败,记录日志和状态
|
|
@@ -915,9 +895,10 @@ public class SynchronizationImpl implements SynchronizationService {
|
|
|
controlMeasureDataSave.add(aspRiskControlMeasuresDo);
|
|
|
}
|
|
|
}
|
|
|
- if (controlMeasureDataSave.size() > 0) {
|
|
|
+ if (!controlMeasureDataSave.isEmpty()) {
|
|
|
aspRiskControlMeasuresService.updateBatchById(controlMeasureDataSave);
|
|
|
}
|
|
|
+ Thread.sleep(5000);
|
|
|
}
|
|
|
|
|
|
// 处理校验失败的数据
|
|
@@ -932,8 +913,6 @@ public class SynchronizationImpl implements SynchronizationService {
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
|
- } finally {
|
|
|
- executorService.shutdown(); // 关闭线程池
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -948,14 +927,10 @@ public class SynchronizationImpl implements SynchronizationService {
|
|
|
String ip,
|
|
|
List<CheckTemplateDo> dataSave) {
|
|
|
|
|
|
- ExecutorService executorService = createCustomThreadPool();
|
|
|
-
|
|
|
try {
|
|
|
// 处理安全维护同步任务
|
|
|
for (List<Map<String, Object>> syncData : safeMaintenanceSyncDataList) {
|
|
|
- Future<Result> future = executorService.submit(() -> dataPush(ip, SAFEMAINTENANCECONSTANT, syncData));
|
|
|
- Result result = future.get();// 等待结果
|
|
|
-
|
|
|
+ Result result= dataPush(ip, SAFEMAINTENANCECONSTANT, syncData, token);
|
|
|
CheckTemplateDo checkTemplateDo;
|
|
|
if (Objects.equals(result.getStatus(), 200)) {
|
|
|
// 当前批次成功,记录日志和状态
|
|
@@ -979,6 +954,7 @@ public class SynchronizationImpl implements SynchronizationService {
|
|
|
dataSave.add(checkTemplateDo);
|
|
|
}
|
|
|
}
|
|
|
+ Thread.sleep(5000);
|
|
|
}
|
|
|
|
|
|
// 处理校验失败的数据
|
|
@@ -993,8 +969,6 @@ public class SynchronizationImpl implements SynchronizationService {
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
|
- } finally {
|
|
|
- executorService.shutdown(); // 关闭线程池
|
|
|
}
|
|
|
}
|
|
|
|