谈谈集合.Stream Api
2021-03-14 21:33
标签:star 数据源 cep 传统 raise 四种 出现 split map操作 Java8提供的stream API可以让程序员像操作数据库一样操作集合。Stream API可以极大提高Java程序员的生产力,让程序员写出高效率、干净、简洁的代码。同时它提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势,使用 fork/join 并行方式来拆分任务和加速处理过程。通常编写并行代码很难而且容易出错, 但使用 Stream API 无需编写一行多线程的代码,就可以很方便地写出高性能的并发程序。常用的stream API有如下; 简而言之,Stream API是一个非常高效的数据处理框架。 可以从不同的数据源创建stream。java collection包中的Collections,Lists,Sets这些类中新增stream()和parallelStream()方法,通过这些方法可以创建一个顺序stream(sequential streams)或者一个并发的stream(Parallel streams)。并发stream(Parallel streams)更适合在多线程中使用,本文先介绍顺序流(sequential streams)在结尾会描述并发stream(Parallel streams), List对象上调用stream()方法可以返回一个常规的对象流。在下面的例子中我们不需要创建一个collection对象也可以使用stream: 直接使用Stream.of()方法就能从一组对象创建一个stream对象, 除了常规的对象流,JAVA 8中的IntStream,LongStream,DoubleStream这些流能够处理基本数据类型如:int,long,double。比如:IntStream可以使用range()方法能够替换掉传统的for循环 基本类型流(primitive streams)使用方式与常规对象流类型(regular object streams)大部分相同,但是基本类型流(primitive streams)能使用一些特殊的lambda表达式,比如:用IntFunction代替Function,用IntPredicate代替Predicate,同时 可以通过常规对象流(regular object stream)的mapToInt(), mapToLong(),mapToDouble(),基本类型对象流(primitive streams)中的mapToObj()等方法完成常规对象流和基本类型流之间的 下面这个例子中doubles stream先被映射成int stream,然后又被映射成String类型的对象流: 我们用下面的一个列子来引入Stream的处理顺序: 想象中,上面的列子会输出下面的内容: 但是当我们执行这段代码的时候,控制台没有输出任何内容。下面会来讲下出现这个现象的原因。在讲这个原因之前我们先来引入两个Stream相关的概念,能帮助我们更好的理解Stream API: 中间操作和最终操作 stream包含 大多stream操作接受某种形式的lambda表达式作为参数,通过方法接口的形式指定操作的具体行为,这些方法接口的行为基本上都是无干扰(non-interfering)和无状态(stateless)。无干扰(non-interfering)的方法的定义是:该方法不修改stream的底层数据源,比如上述例子中:没有lambda表达式添加或者删除myList中的元素。无状态(stateless)方法的定义:操作的执行是独立的,比如上述例子中,没有lambda表达式在执行中依赖可能发生变化的外部变量或状态。 简单粗浅的总结下上面那段话:返回值是还是Stream类型的操作是中间操作,返回值是void或者是非Stream类型的操作的最终操作。Stream的API不会改变原始数据。 下面是Stream的接口,我们通过返回值就可以清楚的判断哪些是中间操作哪些是最终操作。像我们平时常用的操作filter、map、distinct、sort和limit等都是中间操作。 有了上面中间操作和最终操作的基础,我们再来看看上面的列子会发现列子中的操作只有中间操作而没有最终操作。说到这里大家可能已经知道答案了:Stream的中操作只有执行到最终操作时才会被触发。 下面就加上一个最终操作看看效果: 上面代码输出下面的结果: 我们会发现filter操作在过滤出一个元素后会立马进入下一步执行,而不是等待将整个集合过滤完再操作。(map操作也是类似的行为) 调整上面的filter和map的顺序能大量减少map的执行次数,提升执行效率; Sorting 是一种特殊的中间操作(intermediate operation),在对集合中元素进行排序过程中需要保存元素的状态,因此Sorting 是一种有状态的操作(stateful operation)。 首先,在整个输入集上执行排序操作(即先对集合进行水平操作),由于输入集合中的元素间存在多种组合,因此上面的例子中sorted操作被执行了8次。 可以通过对执行链重排序的方式,提升stream的执行效率。修改执行链顺序之后由于filter操作的过滤,导致sorted操作的输入集只有一个元素,在大数据量的情况下能够大幅度提高执行效率。 Stream有一个特性,就是当你 在同一个stream中执行完anyMatch后再执行noneMatch就会抛出如下异常: 可以通过为每个最终操作(terminal operation)创建一个新的stream链的方式来解决上面的重用问题,Stream api中已经提供了一个 Collect(收集)是一种是十分有用的最终操作,它可以把stream中的元素转换成另外一种形式,比如;list,set,map。Collect使用Collector作为参数,Collector包含四种不同的操作:supplier(初始构造器), accumulator(累加器), combiner(组合器), finisher(终结者)。这听起来很复杂,但是一个好消息是java 8通过Collectors类内置了各种复杂的收集操作,因此对于大部分常用的操作来说,你不需要自己去实现collector类。 使用Collectors可很轻松的生成List、Set和Map对象。 通过上面的demo可以看出,将stream转换为List十分简单,如果想转换为Set的话,只需使用Collectors.toSet()就可以了。 我们已经了解:通过map方法可以将stream中的一种对象转换成另外一种对象。但是map方法还是有使用场景限制,只能将一种对象映射为另外一种特定的已经存在的对象。是否能够将一个对象映射为多种对象,或者映射成一个根本不存在的对象呢。这就是flatMap方法出现的目的。 FlatMap方法可以将一个stream中的每一个元素对象转换为另一个stream中的另一种元素对象,因此可以将stream中的每个对象改造成零,一个或多个。flatMap操作的返回流包含这些改造后的对象。‘ 下面给出一个列子: flatMap方法接收一个Stream类型的参数,从它的名字可以看出来,这个方法的作用是将多个Stream合并成一个Stream(然不是我们想当然的生成一个Map)。 计算的结果是76,通过添加调试输出,我们可以详细地了解执行引擎中发生了什么。 从调试输出中可以看到,累加器做了所有的工作,它首先获取值为0的标示值和第一个用户Max,接下来的三步中持续sum值由于累加不断变大,在最后一步汇总的年纪增长到76。 注意,上面的调试输出中combiner没有执行,通过parallel执行上面相同stream。 通过并行的方式执行上面的stream操作,得到的是另外一种完全不相同的执行动作。在并行stream中combiner方法会被调用。这是由于累加器是被并行调用的,因此组合器需要对分开的累加操作进行求和。 为了提高大量输入时的执行效率,stream可以采用并行的放行执行。 在我的机器上公共池初始化为每个默认3并行,这个值可以通过调整jvm参数来修改: 为了了解并行流的执行动作,下面的例子会打印当前线程的执行信息。 执行的结果如下: 通过分析调试输出,我们可以更好地了解哪一个线程执行了哪些stream操作。从上面的输出中我们可以看到parallel stream使用了ForkJoinPool提供的所有可用的线程来执行流的各种操作。由于不能确定哪个线程会执行并行流的哪个操作,因此反复执行上面的代码,打印的结果会不同。 扩充上面的例子,添加sort操作 执行结果如下: 这个执行结果看起来比较奇怪,看起来sort操作只是在main线程中顺序执行的。实际上,parallel stream中的sort操作使用了JAVA 8的一个新方法:Arrays.parallelSort()。JAVA doc中是这样描述Arrays.parallelSort()的:待排序数组的长度决定了排序操作是顺序执行还是并行执行。java doc 描述如下: 回到上一章的例子,我们已经了解combiner方法只能在parallel streams中调用,让我们来看下那些线程被实际调用: 执行结果如下: 从控制台输出可以看到accumulator和combiner操作都被可用的线程并行执行了。 总结起来:在大数据量输入的时候,parallel streams可以带来比较大的性能提升。但是应该记住,一些并行操作,比如:reduce,collect需要额外的计算(组合操作),但是在顺序流中,这些组合操作是不需要的。 另外,我们知道 https://blog.csdn.net/m0_37556444/article/details/84975355 https://ifeve.com/stream/ 谈谈集合.Stream Api 标签:star 数据源 cep 传统 raise 四种 出现 split map操作 原文地址:https://www.cnblogs.com/54chensongxia/p/12449920.html1. 什么是stream API
+--------------------+ +------+ +------+ +---+ +-------+
| stream of elements +-----> |filter+-> |sorted+-> |map+-> |collect|
+--------------------+ +------+ +------+ +---+ +-------+
2. stream的几个特点
3. Stream API使用列子
3.1 Stream分类
Arrays.asList("a1", "a2", "a3")
.stream()
.findFirst()
.ifPresent(System.out::println); // a1
1234
Stream.of("a1", "a2", "a3")
.findFirst()
.ifPresent(System.out::println); // a1
123
IntStream.range(1, 4)
.forEach(System.out::println);
12
基本类型流(primitive streams)中可以支持一些聚合方法
,如:sum(),average()等。Arrays.stream(new int[] {1, 2, 3})
.map(n -> 2 * n + 1)
.average()
.ifPresent(System.out::println); // 5.0
1234
相互转换
IntStream.range(1, 4)
.mapToObj(i -> "a" + i)
.forEach(System.out::println);
123
Stream.of(1.0, 2.0, 3.0)
.mapToInt(Double::intValue)
.mapToObj(i -> "a" + i)
.forEach(System.out::println);
// a1
// a2
// a
3.2 Stream API的处理顺序
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
});
filter: d2
filter: a2
filter: b1
filter: b3
filter: c
中间(intermediate operations)
和最终(terminal operation)
两种形式的操作。中间操作(intermediate operations)的返回值还是一个stream
,因此可以通过链式调用将中间操作(intermediate operations)串联起来。最终操作(terminal operation)只能返回void或者一个非stream的结果。
在上述例子中:filter, map ,sorted是中间操作,而forEach是一个最终操作。更多关于stream的中可用的操作可以查看java doc。上面例子中的链式调用也被称为操作管道流。public interface Stream
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
})
.forEach(s -> System.out.println("forEach: " + s));
filter: d2
forEach: d2
filter: a2
forEach: a2
filter: b1
forEach: b1
filter: b3
forEach: b3
filter: c
forEach: c
3.3 执行效率与steream执行链顺序的关系
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("A");
})
.forEach(s -> System.out.println("forEach: " + s));
Stream.of("d2", "a2", "b1", "b3", "c")
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
3.4 流复用
执行完任何一个最终操作(terminal operation)的时候流就被关闭了
Stream
java.lang.IllegalStateException: stream has already been operated upon or closed
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
at com.winterbe.java8.Streams5.test7(Streams5.java:38)
at com.winterbe.java8.Streams5.main(Streams5.java:28)
12345
stream supplier
类来在已经存在的中间操作(intermediate operations )的stream基础上构建一个新的stream。Supplier
3.5 一些高级操作
3.5.1 Collect(收集)操作
List
3.5.2 FlatMap操作
String[] words = new String[]{"Hello","World"};
List
3.5.3 Reduce操作
reduce操作可以将stream中所有元素组合起来得到一个元素
,JAVA8支持三中不同的reduce方法。
persons
.stream()
.reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
.ifPresent(System.out::println);
Person result =
persons
.stream()
.reduce(new Person("", 0), (p1, p2) -> {
p1.age += p2.age;
p1.name += p2.name;
return p1;
});
System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76
1234567891011
Integer ageSum = persons
.stream()
.reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);
System.out.println(ageSum); // 76
12345
Integer ageSum = persons
.stream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});
// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David
12345678910111213141516
Integer ageSum = persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});
// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35
12345678910111213141516171819
4. 并行Stream
并行流(Parallel Streams)通过ForkJoinPool.commonPool() 方法获取一个可用的ForkJoinPool。这个ForkJoinPool使用5个线程(实际上是由底层可用的物理cpu核数决定的)
。ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism()); // 3
On my machine the common pool is initialized with a parallelism of 3 per default. This value can be decreased or increased by setting the following JVM parameter:
123
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
1
Collections中包含parallelStream()方法,通过这个方法能够为Collections中的元素创建并行流。另外也可以调用stream的parallel()方法将一个顺序流转变为一个并行流的拷贝。
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));
1234567891011121314
filter: b1 [main]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: c2 [ForkJoinPool.commonPool-worker-3]
map: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map: b1 [main]
forEach: B1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-3]
map: a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]
123456789101112131415
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.sorted((s1, s2) -> {
System.out.format("sort: %s %s [%s]\n",
s1, s2, Thread.currentThread().getName());
return s1.compareTo(s2);
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));
12345678910111213141516171819
filter: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: b1 [main]
map: b1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-2]
map: a1 [ForkJoinPool.commonPool-worker-2]
map: c2 [ForkJoinPool.commonPool-worker-3]
sort: A2 A1 [main]
sort: B1 A2 [main]
sort: C2 B1 [main]
sort: C1 C2 [main]
sort: C1 B1 [main]
sort: C1 C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]
123456789101112131415161718192021
If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.
1
List
accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2]
combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2]
1234567
所有的parallel stream操作共享一个jvm范围内的ForkJoinPool
,所以你应该注意避免在parallel stream上执行慢阻塞流操作,因为这些操作可能导致你应用中依赖parallel streams操作的其他部分也会响应变慢。5. 使用详细列子
package com.csx.demo.spring.boot.lambda;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.*;
public class LambdaStreamDemo {
public static void main(String[] args) {
List
6. 简单总结
7. 参考