Java8 Stream基本使用

By | 5月 12, 2016

1. 什么是Stream

Stream是一个数据处理接口,本身不存储任何数据。大概有20多个方法,每个都很好用,并且含有函数式编程里的filter,map,reduce方法。Stream的数据有三个来源:Collection, Array或者根据需要生成。

2. 创建Stream

在Java8中,添加了许多能够产生Stream的方法,如Pattern类中的splitAsStream方法,下面是三种典型常见的生成Stream的形式。

2.1 Collection创建Stream

Java8在Collection接口中添加了stream方法,可以从任何集合生成一个Stream。

List<String> elements = Lists.newArrayList("element1", "element2", "element3");
Stream<String> stream = elements.stream();

2.2 Array创建Stream

int[] nums = {1, 2, 3, 4};
Arrays.stream(nums);
// 使用数组的子集,包含start index,不包含end index。
Arrays.stream(nums, 0, 2);

2.3 直接生成Stream

有限Stream。给定元素,直接生成Stream。如果不是同一类型元素,就是Stream<Object>。

Stream<Integer> stream = Stream.of(1, 2, 3);

无限Stream。使用Stream中的generate静态方法,需要传入一个Supplier<T>接口实例。

// 常量Stream
Stream<String> generate = Stream.generate(() -> "Echo");

// 随机数字的Stream
Stream<Double> generate2 = Stream.generate(Math::random);

无限Stream。使用Stream中的iterate静态方法,第一个参数是种子(起始元素),第二个参数是UnaryOperator<T>接口实例。

// 0, 1, 2, 3, 4, ...
Stream<Integer> iterate = Stream.iterate(0, n -> n + 1);

3. Filter、Map、Reduce方法

Stream的转换,是指从一个Stream中读取数据,经过处理后产生一个新的Stream。对Stream执行任何操作,都不会更改底层的数据集合(Collection或者Array),都是生成新的Stream,JDK中称为不干扰(noninterference)。

3.1 Stream::filter(Predicate<T> p)

过滤一个Stream,新生成的Stream只含有符合条件的元素。

Stream<String> words = Stream.of("int", "double", "String", "BigNumber");
Stream<String> newWords = words.filter(w -> w.length() > 5);

3.2 Stream::map(Function<T, R> f)

对Stream中的每个元素T,执行Function:: R apply(T)方法,生成的元素R组成一个新的Stream。Python的map是对多个list元素执行function,Stream的map只是对自己元素执行function。

Stream<Integer> numbers = Stream.of(1, 2, 3, 4);
Stream<String> map = numbers.map(n -> n++ + "");

3.3 Stream::reduce(BinaryOperator<T> accumulator)

对Stream中的元素,两个两个循环处理,最后组合成一个元素。例如,对所有元素求和。

Stream<Integer> nums = Stream.of(1, 2, 3, 4);
nums.reduce((x, y) -> x + y);
// 指定第一个元素
nums.reduce(0, (x, y) -> x + y);

4. 提取子Stream和合并Stream

获取Stream里前n个元素,组成新的Stream。如果n小于Stream的size,就返回原来的Stream。

Stream<Double> randoms = Stream.generate(Math::random).limit(10);

丢掉前n个元素,组成新的Stream。如果n大于Stream的size,就返回一个空的Stream。

Stream<String> words = Stream.of("", "", "hello").skip(2);

将两个Stream合并成一个Stream

Stream<Integer> stream1 = Stream.of(1, 2);
Stream<Integer> stream2 = Stream.of(5, 6);
Stream<Integer> concat = Stream.concat(stream1, stream2);

5. 并行Stream – 并行处理

默认情况下,都是创建一个串行Stream,方法Collection.paralleStream()除外。调用一个Stream中的parallel方法,就可将其转换成一个并行Stream。对并行Stream施加的任何方法,都是并行执行的,Java自己决定用多少线程来处理。

Stream<Integer> nums = Stream.of(1, 2, 3, 4, 5, 6);
// 无序,打印每个元素
nums.parallel().forEach(n -> System.out.println(n));
// 强制有序处理
nums.parallel().forEachOrdered(System.out::println);

对于串行Stream,调用forEach()方法,始终是有序遍历。下面是并行求和的例子:

// 1, 2, 3 ... 100
Stream<Integer> nums = Stream.iterate(1, n -> n + 1).limit(100);
Stream<Integer> filtered = nums.parallel().filter(n -> n % 2 == 0);

注意:并行Stream使用上和串行Stream没有区别,但是并行Stream中应用的方法,必须是线程安全的。下面是个反面例子:

Stream<String> words = Stream.of("Hello", "the", "world", "!");
int[] count = {0};
// 多个线程并行累加count[0]
words.parallel().forEach(s -> count[0] += s.length());
System.out.println(count[0]);