阿里二面:Java8的Stream api是迭代一次還是迭代多次
面試官:java8新增的stream api用過嗎?
我:這個必須用過啊。
面試官:給你下面一個字符串?dāng)?shù)組,如果用stream api來實現(xiàn),找出以字符'a'開頭長度最大的字符串,使用stream api該怎么實現(xiàn)呢?
- {"abb","abcd","fegc","efe","adfes"}
我:用下面這個方法來實現(xiàn):
- public static void maxLength(List<String> list){
- System.out.println(list.stream().filter(s -> s.startsWith("a")).mapToInt(r -> length(r)).max().orElse(0));;
- }
面試官:這個操作是迭代一次還是迭代兩次呢?也就是說是先迭代一遍,過濾出以字符'a'開頭的字符串?dāng)?shù)組,然后再迭代一次,找出最大長度,還是一次迭代完成呢?
我:這個是迭代一次完成,如果要是迭代多次,stream后面的操作函數(shù)很多的情況下效率會非常低。我們加個打印可以來驗證結(jié)果,代碼如下:
- public static void main(String[] args) {
- List<String> list = Arrays.asList("abb", "abcd", "fegc", "efe", "adfes");
- int maxLength = list.stream().
- filter(s -> isStartWitha(s)).
- mapToInt(StreamTest1::length).
- max().orElse(0);
- System.out.println("以字符a開頭的字符串最大長度:" + maxLength);
- }
- private static boolean isStartWitha(String a){
- System.out.println(a + " is start with a:" + a.startsWith("a"));
- return a.startsWith("a");
- }
- private static int length(String a){
- System.out.println("the length of" + a + ":" + a.length());
- return a.length();
- }
打印結(jié)果如下:
- abb is start with a:true
- the length of abb:3
- abcd is start with a:true
- the length of abcd:4
- fegc is start with a:false
- efe is start with a:false
- adfes is start with a:true
- the length of adfes:5
- 以字符a開頭的字符串最大長度:5
面試官:你確定只是迭代一次嗎?有其他情況嗎?
我:有。filter是一個無狀態(tài)的中間操作,對于這個中間操作來說,stream處理只需要迭代一次。但是對于有狀態(tài)的中間操作,就需要迭代多次。
面試官:你剛剛提到有狀態(tài)的操作和無狀態(tài)的操作,這個是怎么區(qū)分呢?
我:在stream api中,無狀態(tài)的操作是指當(dāng)前元素的操作不受前面元素的影響,主要包括如下方法:
- filter(),flatMap(),flatMapToInt(),flatMapToLong(),flatMapToDouble(),map(),mapToInt(),mapToDouble(),mapToLong(),peek(),unordered()
而有狀態(tài)的操作是指需要等所有元素處理完之后才能執(zhí)行當(dāng)前操作,主要包括下面方法:
- distinct(),limit(),skip(),sorted(),sorted()
面試官:有狀態(tài)的操作,能舉個例子嗎?
我:比如下面這段代碼:
- public static void main(String[] args) {
- List<Integer> list = Arrays.asList(5, 2, 3, 1, 4);
- List<Integer> newArray = list.stream()
- .map(StreamTest2::map1)
- .sorted((o1, o2) -> o1 - o2)
- .map(StreamTest2::map2)
- .collect(Collectors.toList());
- System.out.println("新的有序數(shù)組:" + newArray);
- }
- private static Integer map1(Integer i) {
- int result = i * 10;
- System.out.println("線程:" + Thread.currentThread().getName() + " 方法map1入?yún)ⅲ?quot; + i + ",輸出:" + result);
- return result;
- }
- private static Integer map2(Integer i) {
- int result = i * 10;
- System.out.println("線程:" + Thread.currentThread().getName() + " 方法map2入?yún)ⅲ?quot; + i + ",輸出:" + result);
- return result;
- }
上面代碼中,對原始數(shù)組進行了兩次迭代,第一次迭代對所有數(shù)組元素都調(diào)用了map1方法乘以10,然后對新數(shù)組進行排序,第二次迭代對排序后的數(shù)組元素調(diào)用map2方法,即對排序后的數(shù)組元素乘以10。方法輸出如下:
- 線程:main 方法map1入?yún)ⅲ?,輸出:50
- 線程:main 方法map1入?yún)ⅲ?,輸出:20
- 線程:main 方法map1入?yún)ⅲ?,輸出:30
- 線程:main 方法map1入?yún)ⅲ?,輸出:10
- 線程:main 方法map1入?yún)ⅲ?,輸出:40
- 線程:main 方法map2入?yún)ⅲ?0,輸出:100
- 線程:main 方法map2入?yún)ⅲ?0,輸出:200
- 線程:main 方法map2入?yún)ⅲ?0,輸出:300
- 線程:main 方法map2入?yún)ⅲ?0,輸出:400
- 線程:main 方法map2入?yún)ⅲ?0,輸出:500
- 新的有序數(shù)組:[100, 200, 300, 400, 500]
面試官:了解過底層原理嗎?
我:我來先畫一下Stream的UML類圖:
這個類圖說明以下幾點:
- AbstractPipeline有基本類型的子類,如LongPipeline和DoublePipeline,還有一個引用類型的子類ReferencePipeline。
- 無論是ReferencePipeline,還是LongPipeline和DoublePipeline等基本類型的Pipeline,都有3個內(nèi)部類來繼承自己。
- StatelessOp對應(yīng)無狀態(tài)的操作,StatefulOp對應(yīng)有狀態(tài)的操作,Head對應(yīng)Collection.stream()方法返回結(jié)果。
- 無論是StatelessOp、StatefulOp還是Head,都是一個Pipeline,這些Pipeline用雙向鏈表串聯(lián)起來,每個Pipeline節(jié)點被看作一個Stage,Head是鏈表的頭結(jié)點。上面UML類圖中AbstractPipeline類中previousStage和nextStage就代表雙向鏈表當(dāng)前節(jié)點指向前后節(jié)點的引用。如下圖:
面試官:上面用雙向鏈表把所有操作都串聯(lián)起來了,這樣可以實現(xiàn)從Head節(jié)點開始依次執(zhí)行所有的操作。但是這些操作怎么疊加在一起呢?比如下面這段代碼有三個map方法,后面的方法要依賴前面的計算結(jié)果:
- List<Integer> list = Arrays.asList(5, 2, 3, 1, 4);
- List<Integer> newArray = list.stream().map(StreamTest2::map1).map(StreamTest2::map2).map(StreamTest2::map3).collect(Collectors.toList());
我:Stream提供了Sink接口來處理操作的疊加。上面代碼的map方法把操作封裝到了Sink,每個節(jié)點執(zhí)行操作時,調(diào)用Sink的accept方法就可以把操作結(jié)果傳給下一個節(jié)點的Sink。比如map方法源代碼如下:
- public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
- Objects.requireNonNull(mapper);
- return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
- StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
- @Override
- //返回包裝成的Sink
- Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
- return new Sink.ChainedReference<P_OUT, R>(sink) {
- @Override
- public void accept(P_OUT u) {
- //downstream是下游節(jié)點的Sink,把當(dāng)前節(jié)點的執(zhí)行結(jié)果傳給下游節(jié)點
- downstream.accept(mapper.apply(u));
- }
- };
- }
- };
- }
面試官:能詳細(xì)講一下Sink嗎?
我:Sink主要提供了下面4個方法
- //執(zhí)行操作之前調(diào)用這個方法
- void begin(long size)
- //執(zhí)行操作之后調(diào)用這個方法
- void end()
- //是否可以結(jié)束操作
- boolean cancellationRequested()
- //操作執(zhí)行函數(shù)
- void accept()
對于有狀態(tài)的操作,必須實現(xiàn)begin和end兩個方法,因為begin方法會創(chuàng)建一個存放中間結(jié)果的容器,accept方法將元素放入該容器,end方法負(fù)責(zé)對容器中元素處理,比如排序。
面試官:那cancellationRequested方法什么時候用呢?
我:這個方法用于短路操作,比如stream.findAny。
面試官:你剛剛提到短路操作,怎么區(qū)分短路操作和非短路操作呢?
我:短路操作和非短路操作都是Stream的結(jié)束操作,結(jié)束操作是針對中間操作來說的。短路操作是指不用處理全部元素就可以結(jié)束,包括下面的方法:
- anyMatch(),allMatch(),noneMatch(),findFirst(),findAny()
非短路操作是指需要處理所有元素才能結(jié)束,包括下面的方法:
- forEach(),forEachOrdered(),toArray(),reduce(),collect(),max(),min(),count()
總結(jié)一下Stream操作,如下圖:
在遇到結(jié)束操作時,所有Pipeline節(jié)點封裝的Sink會串成一個鏈表,如下圖:
把Sink串成鏈表的過程可以參考下面這段源代碼:
- final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
- Objects.requireNonNull(sink);
- for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
- sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
- }
- return (Sink<P_IN>) sink;
- }
這樣從Head節(jié)點開始依次調(diào)用每個節(jié)點封裝的Sink中的begin,accept,cancellationRequested,end 四個方法就可以完成Steam流水線的執(zhí)行。
面試官:上面提到了Sink會串成一個鏈,那對于有返回結(jié)果的操作,返回的結(jié)果是保存在什么地方呢?
我:這里分三種情況:
- 如果返回結(jié)果是boolean(比如 anyMatch、allMatch、noneMatch)和Optional(比如 findFirst、findAny),返回結(jié)果存放在對應(yīng)的Sink。
- collect, reduce等規(guī)約操作,返回結(jié)果存放在用戶指定的容器中,比如如下代碼返回結(jié)果放在Optional容器中:
- Optional accResult = Stream.of(1, 2, 3, 4, 5).reduce((sum, item) -> {
- sum += item;
- return sum;
- });
max 和 min也是規(guī)約操作,因為底層是通過調(diào)用 reduce 方法實現(xiàn)的。
- 對于返回是數(shù)組的情況,返回數(shù)組之前,數(shù)據(jù)會存放在一種多叉樹數(shù)據(jù)結(jié)構(gòu)中,這種多叉樹結(jié)構(gòu)元素存儲在樹的葉子當(dāng)中,一個葉子節(jié)點可以存放多個元素。
面試官:上面你提到返回數(shù)組的時候用到了多叉樹的結(jié)構(gòu),這樣做對于Stream處理有什么好處呢?
我:按照官方的說法,這樣做是為了避免在并行操作期間不必要地復(fù)制數(shù)據(jù)。
面試官:能簡單介紹一下Stream的并行處理嗎?
我:Stream的并行處理用到了Fork/Join框架,如下圖:
計算過程中,先把任務(wù)拆解成子任務(wù),并行計算。計算完成后再把子任務(wù)計算結(jié)果合并成結(jié)果集。
面試官:Fork/Join框架跟普通線程池相比,有什么優(yōu)勢嗎?
我:fork/join框架的優(yōu)勢是, 如果某個子任務(wù)需要等待另外一個子任務(wù)完成才能繼續(xù)工作,那么處理線程會主動尋找其他未完成的子任務(wù)進行執(zhí)行。跟普通線程池相比,減少了等待時間。
面試官:使用Stream并行流,一定會比串行快嗎?
我:這個不一定,使用的時候要考慮以下幾個因素:
- 要處理的元素數(shù)量,數(shù)據(jù)越多,性能提升越明顯。
- 數(shù)據(jù)結(jié)構(gòu)的可分割性,數(shù)組、ArrayList支持隨機讀取,可分割性好,HashSet、TreeSet雖然可以分割,但不太容易分割均勻,LinkedList、Streams.iterate、BufferedReader.lines因為長度未知,可分解性差。
- 盡量使用基本類型,避免裝箱拆箱。
- 單個子任務(wù)花費時間越長,帶來的性能提升就會越大。
面試官:據(jù)說Stream api跟普通迭代相比有性能損耗,你怎么看?
我:對于簡單的處理操作,Stream api性能確實不如普通迭代。但是如果CPU性能好的話,使用Stream并行處理性能會明細(xì)提高。對于復(fù)雜處理操作,無論并行還是串行,Stream api有明顯的優(yōu)勢。
對于并行處理,要考慮CPU的核數(shù)。