Explorar o código

feature 动态设置分片量

xiahan hai 2 meses
pai
achega
21fe38a84b

+ 1 - 1
zhsw-common/src/main/java/com/rongwei/zhsw/system/config/TtlThreadPoolConfig.java

@@ -20,7 +20,7 @@ import java.util.concurrent.Executor;
 @EnableAsync
 public class TtlThreadPoolConfig {
     @Bean(name = "zhswThreadPool")
-    public Executor ttlTaskExecutor() {
+    public ThreadPoolTaskExecutor ttlTaskExecutor() {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
         executor.setCorePoolSize(16);
         executor.setMaxPoolSize(32);

+ 49 - 8
zhsw-common/src/main/java/com/rongwei/zhsw/system/service/impl/ImportExcelServiceImpl.java

@@ -7,7 +7,6 @@ import com.rongwe.zhsw.system.domain.SwWaterUsageEntryDo;
 import com.rongwe.zhsw.system.vo.ImportMeterReadingRecordVo;
 import com.rongwei.rwadmincommon.system.vo.SysUserVo;
 import com.rongwei.rwcommon.base.R;
-import com.rongwei.rwcommon.base.exception.CustomException;
 import com.rongwei.rwcommoncomponent.file.dao.SysFileItemDao;
 import com.rongwei.rwcommonentity.commonservers.domain.SysFileItemDo;
 import com.rongwei.zhsw.system.config.ContextHolder;
@@ -20,12 +19,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Service;
 
 import java.io.File;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import static com.rongwei.rwcommon.utils.UtilsChecks.parameterCheck;
 
@@ -49,9 +49,9 @@ public class ImportExcelServiceImpl implements ImportExcelService {
     private SqlSessionFactory sqlSessionFactory;
 
 
-    @Qualifier("zhswThreadPool")
     @Autowired
-    private Executor asyncTaskExecutor;
+    @Qualifier("zhswThreadPool")
+    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
 
     private static final Logger log = LoggerFactory.getLogger(ImportExcelServiceImpl.class);
 
@@ -90,9 +90,46 @@ public class ImportExcelServiceImpl implements ImportExcelService {
         return R.ok();
     }
 
+    /**
+     * 动态计算分片大小
+     *
+     * @param totalCount       总数据量
+     * @return 计算后的分片大小
+     */
+    private int calculateDynamicBatchSize(int totalCount, int availableThreads) {
+        // 基础配置参数(建议通过配置文件注入)
+         int MIN_BATCH_SIZE = 10;        // 最小分片大小
+         int MAX_BATCH_SIZE = 500;      // 最大分片大小
+         int DEFAULT_BATCH_SIZE = 100;   // 默认分片大小
+         int THREAD_THRESHOLD = 5;       // 线程数阈值(当可用线程数<=此值时使用默认大小)
+
+        // 特殊情况处理
+        if (totalCount == 0) return 0;
+        if (availableThreads <= 0) return DEFAULT_BATCH_SIZE;
+
+        // 动态计算逻辑
+        if (availableThreads >= THREAD_THRESHOLD) {
+            // 当可用线程数较多时,采用动态分片策略
+            int calculatedSize = (int) Math.ceil((double) totalCount / availableThreads);
+            return Math.min(Math.max(calculatedSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE);
+        } else {
+            // 当可用线程数较少时,使用默认分片大小
+            return DEFAULT_BATCH_SIZE;
+        }
+    }
+
     public void dataSaveAndPostProcessing(List<SwWaterUsageEntryDo> saveList, String dskey, Map<String, SwUserManagementDo> ownerMap) {
         long asyncTasksStart = System.currentTimeMillis();
-        List<List<SwWaterUsageEntryDo>> partition = Lists.partition(saveList, 1000); // 动态分片
+        // 获取线程池状态
+        ThreadPoolExecutor executor = threadPoolTaskExecutor.getThreadPoolExecutor();
+        int poolSize = executor.getMaximumPoolSize();
+        int activeCount = executor.getActiveCount();
+        int availableCount = poolSize - activeCount;
+
+        // 计算动态分片大小(带保护逻辑)
+        int batchSize = calculateDynamicBatchSize(saveList.size(), availableCount);
+        log.debug("获取到的分片大小{}", batchSize);
+        List<List<SwWaterUsageEntryDo>> partition = Lists.partition(saveList, batchSize); // 动态分片
         for (List<SwWaterUsageEntryDo> data : partition) {
             long startTime = System.currentTimeMillis();
             waterUsageEntryService.getBaseMapper().ListBatchSave(data, dskey);
@@ -111,9 +148,13 @@ public class ImportExcelServiceImpl implements ImportExcelService {
         //                 .toArray(CompletableFuture[]::new)
         // );
         // allFutures.thenRun(() -> {
-        ContextHolder.setValue("dsKey", dskey);
-        billGenerationService.asyncGenerateBill(saveList, ownerMap); // 同步生成账单
-        log.debug("数据保存总耗时: {}ms", System.currentTimeMillis() - asyncTasksStart);
+        // asyncTaskExecutor.get
+
+        partition.forEach(list -> {
+            ContextHolder.setValue("dsKey", dskey);
+            billGenerationService.asyncGenerateBill(list, ownerMap); // 异步生成账单
+            log.debug("数据保存总耗时: {}ms", System.currentTimeMillis() - asyncTasksStart);
+        });
         // });
     }