返回顶部
分享到

Java使用多线程处理未知任务数

java 来源:互联网 作者:佚名 发布时间:2025-03-21 21:20:37 人浏览
摘要

知道任务个数,你可以定义好线程数规则,生成线程数去跑 代码说明: 1.虚拟线程池: 使用 Executors.newVirtualThreadPerTaskExecutor() 创建虚拟线程池,每个任务将分配一个虚拟线程来执行。 2.提交任

知道任务个数,你可以定义好线程数规则,生成线程数去跑

代码说明:

1.虚拟线程池:

使用 Executors.newVirtualThreadPerTaskExecutor() 创建虚拟线程池,每个任务将分配一个虚拟线程来执行。

2.提交任务并返回结果:

  • 每个任务通过 CompletableFuture.supplyAsync() 提交,任务会返回一个结果(这里是字符串,模拟了任务的处理结果)。
  • 每个 CompletableFuture 都会保存任务的返回值。

3.等待所有任务完成:

使用 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) 等待所有的 CompletableFuture 完成。allOf.join() 会阻塞当前线程,直到所有任务完成。

4.收集结果:

  • 使用 Java 8 的 stream() 方法和 Collectors.toList() 来收集所有任务的结果,并将它们合并到一个 List 中。
  • CompletableFuture::join 会获取每个任务的结果,并且如果任务有异常,它会抛出 CompletionException,因此你可以根据需要进行异常处理。

5.关闭虚拟线程池:

最后,通过 executorService.shutdown() 关闭线程池,释放资源。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

public static void main(String[] args) throws InterruptedException {

        // 创建虚拟线程的线程池

        ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();

 

        // 假设我们有10个任务,每个任务返回一个字符串

        int numTasks = 10;

        List<CompletableFuture<String>> futures = new ArrayList<>(numTasks);

 

        // 提交任务到虚拟线程池

        for (int i = 0; i < numTasks; i++) {

            int taskId = i;

            // 将每个任务的结果放入 CompletableFuture 中

            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {

                try {

                    // 模拟工作

                    System.out.println("Task " + taskId + " started on " + Thread.currentThread());

                    Thread.sleep(1000);  // 模拟延迟

                    String result = "Result of task " + taskId;

                    System.out.println("Task " + taskId + " completed on " + Thread.currentThread());

                    return result;

                } catch (InterruptedException e) {

                    Thread.currentThread().interrupt();

                    return "Task " + taskId + " was interrupted";

                }

            }, executorService);

 

            futures.add(future);  // 将每个 future 加入集合

        }

 

        // 等待所有任务完成并获取结果

        CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        allOf.join();  // 阻塞直到所有任务完成

 

        // 合并所有任务的结果到一个集合中

        List<String> results = futures.stream()

                                      .map(CompletableFuture::join)  // 获取每个任务的结果

                                      .collect(Collectors.toList());  // 合并到列表

 

        // 打印结果

        System.out.println("All results: " + results);

 

        // 关闭虚拟线程池

        executorService.shutdown();

    }

Java 不确定线程数,要异步多线程执行,还要等待所有线程执行结束,然后获取结果合并

解释:

任务列表 (tasks):我们创建了一个 List<Callable> 来保存所有要执行的异步任务,每个任务返回一个 Integer 结果。

创建线程池:使用 Executors.newFixedThreadPool(5) 创建了一个大小为 5 的线程池,可以并发执行 5 个线程。线程池的大小可以根据实际需要动态调整。

提交任务并获取 Future 列表:executorService.invokeAll(tasks) 方法会提交所有任务,并返回一个 List<Future>。每个 Future 对象代表一个异步任务的结果。

等待任务完成并合并结果:通过 future.get() 方法阻塞当前线程,直到任务完成并返回结果。我们在 sum 中累加所有任务的结果。

关闭线程池:最后,使用 executorService.shutdown() 关闭线程池,确保所有线程在任务完成后能够被正确回收。

重要事项:

  • invokeAll():会阻塞当前线程,直到所有任务完成。如果任务执行的时间不确定,使用 invokeAll() 是比较合适的,它会等待所有任务完成,并返回 Future 列表。
  • Future.get():该方法会阻塞当前线程,直到任务完成。如果任务执行有异常,get() 会抛出异常。
  • 线程池管理:使用 ExecutorService 方便管理线程池的大小,避免频繁创建和销毁线程带来的性能损失。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

public static void main(String[] args) throws InterruptedException, ExecutionException {

        // 假设我们有一些任务需要并发执行

        List<Callable<Integer>> tasks = new ArrayList<>();

         

        // 创建一些任务

        for (int i = 0; i < 10; i++) {

            final int taskId = i;

            tasks.add(() -> {

                // 模拟任务执行,返回一个结果

                Thread.sleep(1000);  // 模拟任务耗时

                return taskId * 2;   // 假设任务返回 taskId 的 2 倍

            });

        }

 

        // 创建一个固定大小的线程池

        ExecutorService executorService = Executors.newFixedThreadPool(5);

 

        try {

            // 提交所有任务并返回一个 Future 列表

            List<Future<Integer>> futures = executorService.invokeAll(tasks);

 

            // 等待所有任务完成并合并结果

            int sum = 0;

            for (Future<Integer> future : futures) {

                sum += future.get();  // 获取任务结果并合并

            }

 

            // 输出所有任务的合并结果

            System.out.println("Total sum: " + sum);

 

        } finally {

            // 关闭线程池

            executorService.shutdown();

        }

    }

实际案例 多线程调API然后合并API的结果返回给前端

1.声明任务队列集合

1

2

3

4

/*变量值对应Map*/

List<VarResultDto> results = new ArrayList<>();

// 假设我们有一些任务需要并发执行

List<Callable<Map<String, Object>>> tasks = new ArrayList<>();

2.将任务加入然后加入任务队列

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

tasks.add(() -> {

                 Map<String, Object> respTask = new HashMap<>();

                 List<VarResultDto> listTaskResp = new ArrayList<>();

                 List<String> listTaskError = new ArrayList<>();

                 try {

                     log.info("执行API请求{} apiId:[{}]", vo.getApiUrl(), vo.getId());

                     /*请求API获取结果*/

                     R<Object> objectR = apiDataInfoService.executeApi(vo);

                     // 解析结果

                     JSONObject apiResp = JSONUtil.parseObj(objectR);

                     if (apiResp.getInt("code") == 200 || apiResp.getInt("code") == 0) {

                         apiResp = apiResp.getJSONObject("data");

                     }

                     // JavaScript数据处理

                     if (StringUtils.isNotBlank(apiVarInfoDto.getJs())) {

                         try {

                             String newJson = SpringUtils.execJavaScript(JSON.toJSONString(apiResp), apiVarInfoDto.getJs());

                             apiResp = JSONUtil.parseObj(newJson);

                             log.info("JavaScript数据处理完成");

                         } catch (Exception e) {

                             log.warn("JavaScript数据处理异常: {}", JSON.toJSONString(apiVarInfoDto));

                         }

                     }

 

                     final JSONObject tempData = apiResp;

                     relations.forEach(relation -> {

                         String value = JSONUtil.getByPath(tempData, relation.getResult(), "");

                         if (StringUtils.isNotBlank(value)) {

                             // *设置变量及实际值*

                             VarResultDto resultDto = new VarResultDto();

                             resultDto.setId(relation.getId());

                             resultDto.setName(relation.getName());

                             resultDto.setResult(value);

                             listTaskResp.add(resultDto);

                         } else {

                             String error = "API接口:[" + vo.getApiName() + "]无法取得变量:[" + relation.getName() + "]有效数据,原因:[" + "API地址:" + apiDataInfo.getApiUrl() + "->返回错误:" + tempData.toString() + "]";

                             listTaskError.add(error);

                         }

                     });

                     respTask.put("results", listTaskResp);

                     respTask.put("errorLogs", listTaskError);

                 } catch (Exception e) {

                     log.error("请求API->{}失败!{}", vo.getApiUrl(), e.getMessage(), e);

                     boolean contains = e.getMessage().contains("TIMEOUT");

                     /*记录错误结果*/

                     relations.forEach(relation -> {

                         String error = "API接口:[" + vo.getApiName() + "]无法取得变量:[" + relation.getName() + "]有效数据,原因:[" + (contains ? "数据接口获取超时" : e.getMessage()) + "]";

                         listTaskError.add(error);

                     });

                     respTask.put("errorLogs", listTaskError);

                 }

                 return respTask;

             });

3.提交任务去执行,获取所有任务的结果,合并结果

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

String defaultThreadPoolSize = configService.getConfigValue("api_fork_join_size", "5");

       // 创建一个固定大小的线程池

       try (ExecutorService executorService = Executors.newFixedThreadPool(Integer.parseInt(defaultThreadPoolSize))) {

           try {

               // 提交所有任务并返回一个 Future 列表

               List<Future<Map<String, Object>>> futures = executorService.invokeAll(tasks);

 

               // 等待所有任务完成并合并结果

               List<Map<String, Object>> sum = new ArrayList<>();

               for (Future<Map<String, Object>> future : futures) {

                   // 获取任务结果并合并

                   sum.add(future.get());

               }

               // 输出所有任务的合并结果

               for (Map<String, Object> stringObjectMap : sum) {

                   Object results1 = stringObjectMap.get("results");

                   if (results1 != null) {

                       results.addAll((List<VarResultDto>) results1);

                   }

                   Object errorLogs1 = stringObjectMap.get("errorLogs");

                   if (errorLogs1 != null) {

                       errorLogs.addAll((List<String>) errorLogs1);

                   }

               }

           } catch (Exception e) {

               log.error("多线程---并行处理--出错了{}", e.getMessage(), e);

           } finally {

               // 关闭线程池

               executorService.shutdown();

           }

       }


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 :
相关文章
  • SpringBoot2.6.x与Swagger3兼容问题及解决方法
    报错: Failed to start bean documentationPluginsBootstrapper; nested exception is java.lang.NullPointerException 解决: ? 如果项目中未引入spring-boot-starter-actuator,
  • Java使用多线程处理未知任务数
    知道任务个数,你可以定义好线程数规则,生成线程数去跑 代码说明: 1.虚拟线程池: 使用 Executors.newVirtualThreadPerTaskExecutor() 创建虚拟线程
  • Java高级-反射&动态代理介绍
    1. 反射 1.1 反射的概述 专业的解释(了解一下): 是在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法; 对于任意一个
  • 使用EasyPoi实现多Sheet页导出的代码
    因多次遇到导出多Sheet页的需求,故记录下来,以备后续参考使用 一、Pom依赖 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 !-- 集成easypoi组件 .导出excel ht
  • idea导入若依项目教程

    idea导入若依项目教程
    IDEA导入若依管理系统 项目官网地址:https://gitee.com/y_project/RuoYi-Vue 前提 系统需求: JDK = 1.8 MySQL = 5.5 Maven = 3.0 redis 必须启动(可以下载一个
  • Java中实现订单超时自动取消功能(最新推荐)

    Java中实现订单超时自动取消功能(最新推荐)
    在开发中,我们会遇到需要延时任务的业务场景,例如:用户下单之后未在规定的时间内支付成功,该订单会自动取消; 用户注册成功15分
  • 阿里巴巴TransmittableThreadLocal使用介绍
    ThreadLocal在上下文的数据传输上非常的方便和简洁。 工业实践中,比较常用的有三个,ThreadLocal、InheritableThreadLocal、TransmittableThreadLocal,那
  • SpringBoot使用Jackson介绍
    概述 Springboot配置JackSon处理类属性,JavaBean序列化为JSON格式,常用框架:阿里fastjson,谷歌gson、Jackson等。 ① 性能:Jackson Fastjson Gson 同个结
  • springboot结合JWT实现单点登录的代码

    springboot结合JWT实现单点登录的代码
    JWT实现单点登录 登录流程: 校验用户名密码-生成随机JWT Token-返回给前端。之后前端发请求携带该Token就能验证是哪个用户了。 校验流程:
  • java正则表达式匹配Matcher类的使用
    Matcher类 用法 在 Java 中,Matcher类是用于匹配正则表达式的工具,而group()方法是Matcher类中的一个重要方法,用于提取匹配结果中的捕获组(
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计