记一次多线程并发代码优化

2020/07/29

一、前言

并发运行相比串行执行很好,因为其可以减少执行时间,但是并发用的不对,也会造成资源浪费,本文我们就来探究一例子。

二、案例介绍与优化

有这样一段代码,根据传递的url列表,并发的去下载url对于的文件内容,原来代码模拟如下:

//0
	private final static ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(8, 8, 0L, TimeUnit.MILLISECONDS,
			new LinkedBlockingQueue<Runnable>(1));

	public static void main(String[] args) {

		// 1.创建图片列表
		List<String> imageList = new ArrayList<String>();
		for (int i = 0; i < 3; ++i) {
			imageList.add(i + "");
		}

		long start = System.currentTimeMillis();

		// 2.并发处理url
		Map<String, String> resultMap = imageList.parallelStream().collect(Collectors.toMap(url -> url, url -> {
			try {
				EXECUTOR_SERVICE.submit(() -> {
					// 2.1模拟同步处理url,并返回结果
					System.out.println(Thread.currentThread().getName() + " " + url);
					Thread.sleep(30000);
					return "jiaduo" + url;
				}).get();
			} catch (Exception e) {
				e.printStackTrace();
			}
			return "";
		}));

		// 3.打印结果
		long costs = System.currentTimeMillis() - start;
		System.out.println("result:" + costs + " " + JSON.toJSONString(resultMap));
		System.out.println("main is over");
	}
  • 如上代码0创建了一个线程个数为8的线程池。
  • 代码1创建了一个图片列表
  • 代码2意为使用并行流把图片url下载任务投递到线程池EXECUTOR_SERVICE后,通过流的collect方法,把url和url处理结果收集起来变成map返回。

  • 代码2.1我们模拟同步根据url下载文件,并返回处理结果。

  • 代码3则使用main线程打印整个处理耗时和处理结果。

如上代码,运行起来确实可以实现并发下载图片功能,但是里面有个细节,就是默认并行流使用的是对于这段代码来说,首先使用了并行流,而并行流默认使用的是ForkJoinPool中的commonPool,而该commonPool是真个JVM内单实例的,如果commonPool内的线程全部阻塞了,则其他使用它的地方将转换为调用线程来执行。

另外这里会导致多个commonPool中的线程处于阻塞状态等待异步任务执行完毕。这里假设imageList中有3个URL,则我们会有3个线程(一个main函数所在调用线程,2个commonpool中的线程)分别把下载图片任务投递到executorService,然后这3个线程各自调用了返回的future的get系列方法等待上传任务的完成,所以这会导致commonPool内的2个线程和调用线程被阻塞。

为了验证上面说法,大家运行上面代码,然后可以自行jstack线程堆栈,会发现如下:

首先我们会看到3个EXECUTOR_SERVICE线程池中的线程在执行图片下载任务: image.png

然后我们会看到有两个commonPool中的线程,在等待投递到EXECUTOR_SERVICE中的两个任务执行完毕: image.png

并且可以看到main线程也在等待投递到EXECUTOR_SERVICE中的一个任务的执行结果: image.png

所以这里为了等待投递到线程池EXECUTOR_SERVICE中的三个任务执行完毕,耗费了三个线程。其实可以把上面代码改造为如下:

	private final static ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(8, 8, 0L, TimeUnit.MILLISECONDS,
			new LinkedBlockingQueue<Runnable>(1));

	public static void main(String[] args) {

		// 1.创建图片列表
		List<String> imageList = new ArrayList<String>();
		for (int i = 0; i < 3; ++i) {
			imageList.add(i + "");
		}

		long start = System.currentTimeMillis();

		// 2.并发执行,并保持url对应的future
		Map<String, Future<String>> futureMap = imageList.stream().collect(Collectors.toMap(url -> url, url -> {
			return EXECUTOR_SERVICE.submit(() -> {
				// 2.1模拟rpc同步处理url,并返回结果
				System.out.println(Thread.currentThread().getName() + " " + url);
				Thread.sleep(300000);
				return "jiaduo" + url;
			});
		}));

		// 3.调用线程同步等待所有任务执行完毕
		Map<String, String> resultMap = futureMap.entrySet().stream()
				.collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
					try {
						entry.getValue().get();
					} catch (Exception e) {
						e.printStackTrace();
					}
					return "";
				}));

		// 3.打印结果
		long costs = System.currentTimeMillis() - start;
		System.out.println("result:" + costs + " " + JSON.toJSONString(resultMap));
		System.out.println("main is over");
	}
  • 如上代码2我们把图片下载任务投递到了线程池EXECUTOR_SERVICE后调用线程马上返回了对于的Future对象,然后我们通过流的collect方法把url和对应的future对象收集到了futureMap中,这个过程是异步的,不会阻塞调用线程。

  • 代码3 main线程则循环获取每个future的执行结果,并且通过流的collect方法把url和对应的future执行结果收集到map.

运行上面代码,我们会发现除了有EXECUTOR_SERVICE中的三个线程在执行文件下载任务外,只有一个main线程在等待全部任务执行完毕,相比原来方法节省了2个commonPool里面的线程。

三、总结

并发固然好,但是用不对则会起到副作用,本例中原来方法如果url列表很大,则会导致commonpool里面的线程打满,则当前jvm内其它使用commonpool的地方则会自动转换为调用线程来执行了,会起不到预设的效果。


微信扫描二维码,关注获取第一手技术分享

(转载本站文章请注明作者和出处 加多-技术原始积累

Show Disqus Comments

Post Directory

扫码关注公众号:加多
发送 290992
即可立即永久解锁本站全部文章