1500字范文,内容丰富有趣,写作好帮手!
1500字范文 > Java使用多线程处理任务等待任务全部执行

Java使用多线程处理任务等待任务全部执行

时间:2023-03-03 12:56:09

相关推荐

Java使用多线程处理任务等待任务全部执行

日常的批量处理任务中,经常需要使用多线程同时处理大量任务,一次读取一定数量的数据,然后放入线程池中等待线程处理完成,再取一定数量数据进行循环处理。

效率比较低的方式是使用同步的for循环进行处理

其次就是使用多线程处理。一般情况使用多线程都会使用线程池来管理,有些情况下,不能把大量任务一次性丢进线程池中,以为内存有限,一般线程池的阻塞队列也是有界的,超出限制可能OOM或者触发拒绝策略,因此需要分批处理,假设一次性读取5000条数据,则需要先等待线程池处理完这5000条数据再进行下一次处理。这时候我们需要确认开启的多线程中的子任务全部结束,再让主线程去执行下一次处理。

大致总结的几种处理方案代码示例如下,本人水平有限,欢迎各位大佬指点留言,谢谢!

private static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool(2 * CPU_COUNT);/**1. 使用CountDownLatch*/public void testCountDownLatch() {//模拟查询到数据库中待处理数据List batchList = new ArrayList<>();for (int i = 0; i < 10; i++) {batchList.add(new java.lang.Object());}if (CollectionUtils.isEmpty(batchList)) {return;}log.info("开始本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now());final CountDownLatch countDownLatch = new CountDownLatch(batchList.size());batchList.forEach(Object -> FORK_JOIN_POOL.execute(() -> {try {TimeUnit.SECONDS.sleep(new Random().nextInt(10));log.info("当前线程休眠完成");countDownLatch.countDown();} catch (Exception e) {log.error("异常", e);}}));try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}log.info("完成本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now());}/**2. 使用CyclicBarrier*/public void testCyclicBarrier() {//模拟查询到数据库中待处理数据List<Object> batchList = new ArrayList<>();for (int i = 0; i < 10; i++) {batchList.add(new Object());}if (CollectionUtils.isEmpty(batchList)) {return;}log.info("开始本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now());final CyclicBarrier cyclicBarrier = new CyclicBarrier(batchList.size(), () -> {log.info("完成本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now());testCyclicBarrier();});batchList.forEach(Object -> FORK_JOIN_POOL.execute(() -> {try {TimeUnit.SECONDS.sleep(new Random().nextInt(10));log.info("当前线程休眠完成");cyclicBarrier.await();} catch (Throwable e) {log.error("异常", e);}}));}/**3. 使用CompletionService*/public void testCompletionService() {for (int j = 0; j < 3; j++) {List<Object> batchList = new ArrayList<>();for (int i = 0; i < 10; i++) {batchList.add(new Object());}if (CollectionUtils.isEmpty(batchList)) {return;}log.info("开始本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now());CompletionService completionService = new ExecutorCompletionService(FORK_JOIN_POOL);batchList.forEach(Object -> {completionService.submit(() -> {try {TimeUnit.SECONDS.sleep(new Random().nextInt(10));log.info("当前线程休眠完成");} catch (Throwable e) {log.error("异常", e);}return null;});});batchList.forEach(imgRecord -> {try {completionService.take().get();} catch (Exception e) {e.printStackTrace();}});log.info("完成本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now());}}/**4. 使用CompletableFuture*/public void testCompletableFuture() {for (int j = 0; j < 3; j++) {List<Object> batchList = new ArrayList<>();for (int i = 0; i < 10; i++) {batchList.add(new Object());}if (CollectionUtils.isEmpty(batchList)) {return;}log.info("开始本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now());ArrayList<CompletableFuture<?>> futureList = new ArrayList<>();batchList.forEach(Object -> {final CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(new Random().nextInt(10));log.info("当前线程休眠完成");} catch (Throwable e) {log.error("异常", e);}}, FORK_JOIN_POOL);futureList.add(future);});CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();log.info("完成本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now());}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。