java8 使用parallelStream线程安全地收集数据

By | 5月 29, 2019

以下我们常用的数据结构,都是线程不安全的。

  • ArrayList, LinkedList
  • HashMap, LinkedHashMap, TreeMap
  • HashSet, LinkedHashSet, TreeSet
  • StringBuilder

parallelStream是以多线程的方式,执行定义的代码块。因为是多线程,所以在代码块里操作线程不安全的Collection,就会引发Concurrency问题。

List<String> results = Lists.newArrayList();
sources.parallelStream().forEach(source -> {
  results.add(sigmaString(source));
  results.add(lambdaString(source));
});

这段代码,是将sources中的每个source,转换成sigma string和lambda string,然后收集到results里面。实际运行时,results由于多线程问题,有些数据没有add进来,丢失了

Solution 1: 不用parallel

当sources数据不多时,可以去掉paralle,直接使用stream,就避免了Concurrency issue。

List<String> results = Lists.newArrayList();
sources.stream().forEach(source -> {
  results.add(sigmaString(source));
  results.add(lambdaString(source));
});

Solution 2: 使用collect方法

我们使用外面的集合,无非是为了收集元素。Java8 Stream的collect方法,就是收集Stream里的元素,返回List,Set或Map等,并且它是线程安全的。下面用collect改写上面的代码:

List<String> results = sources.parallelStream()
  .flatMap(source -> Stream.of(sigmaString(source), lambdaString(source)))
  .collect(Collectors.toList());

没有使用外部的集合,并且使用到了parallelStream并行处理的优势。