Stream的使用
首先我們看一下stream的基本使用方法:
ArrayList<String> list = Lists.newArrayList("America", "ABC", "CNN", "OK", "ASYNC"); List<String> strings = list.stream().filter(e -> e.startsWith("A")).map(e -> e + " nice").collect(Collectors.toList());
最終我們會得到ArrayList中以A開頭的字母加上“nice”的字符串List,如果放在jdk7裡我們會這樣寫:
ArrayList<String> strings = Lists.newArrayList(); for (String s : list) { if(s.startsWith("A")){ String newStr = s + "nice"; strings.add(newStr); } }
我試著去看源代碼,發現Stream實質上就是這樣執行我們的需求的。下面就說說我看到了什麼。
Stream相關類的介紹
打開java.util.stream
包,可以看到核心接口Stream類,顧名思義就是流水的意思,官方文檔原話說的是
A sequence of elements supporting sequential and parallel aggregate operations.
Stream就是一個支持串行和並行的聚集操作的一系列元素。
定義了一些中間操作(Intermediate operations)和結束操作(Terminal operations),
中間操作包括無狀態(Stateless)操作比如:filter, map, flatMap等,有狀態(Stateful)操作比如:distinct, sorted, limit等;
結束操作(Terminal operations)包括非短路操作(short-circuiting)比如:forEach, reduce, collect等和短路操作如:findFirst, findAny;
中間操作不是真正的操作而是一種操作的描述,只有執行到結束操作才會觸發實際計算,在結束操作執行之前只是把中間操作記錄了下來。無狀態中間操作指元素的操作不受其他元素的影響,比如以某一Predicate去filter元素,元素和元素之前不互相影響。而有狀態中間操作指的是元素和元素之間是有關聯的,比如sorted,只有讀取所有元素之後才能確定排序結果。
短路結束操作指的是不用處理所有元素才能返回結果,比如findFirst,只要找到第一個符合條件的元素即可返回結果。非短路結束操作則必須處理完所有元素才能返回結果。
Stream繼承了BaseStream,定義了一些Stream的基本操作。
Pipeline記錄操作
以上所說的操作需要被按順序記錄下來,這裡就需要管道流水線Pipeline的概念來實現。
管道有一個基類PipelineHelper,他是執行Stream管道的一個helper,將Stream的所有信息收集到一個地方。
上面所說的操作其實都定義在PipelineHelper的一個子類ReferencePipeline中,包括Head(Source stage of a ReferencePipeline)、StatelessOp(Base class for a stateless intermediate stage of a Stream.)、StatefulOp(Base class for a stateful intermediate stage of a Stream.)靜態內部類。
ReferencePipeline是描述中間操作管道流和源管道流的一個類,同時也實現了Stream接口
在Stream中使用stage(階段)來描述一個完整的操作,而Head、StatelessOp、StatefulOp這三個操作都是實例化的PipelineHelper,也就是stage。可以把stage理解為帶管道的流(Stream with Pipeline)
在本文一開始的例子中,我們分析一下有幾個stage,下圖:
每一步Stream的方法調用都產生一個新的stage,在隨後的分析中會發現,這些stage會以雙向鏈表的方式鏈接,而每個stage都記錄了每一個階段的操作,這樣我們就可以依賴這種數據結構來保存對數據源的所有操作了。
鏈接stage
stage的鏈接靠Sink來實現,我們先看一下Sink的接口,我們這裡只看ChainedReference
ChainedReference包括:
- begin:在遍歷元素前調用,做好遍歷準備
- accept:遍歷每個元素的時候調用,包含每個stage的操作和回掉函數
- end:遍歷結束後調用
- cancellationRequested:是否能夠儘早結束遍歷,用於短路操作
每個stage都把操作實現在Sink裡,上游stage調用下游stage的accept方法,達到按順序執行每個操作的目的。
stage的自動執行
直接上代碼
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) { Objects.requireNonNull(predicate); return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { return new Sink.ChainedReference<P_OUT, P_OUT>(sink) { @Override public void begin(long size) { downstream.begin(-1); } @Override public void accept(P_OUT u) { if (predicate.test(u)) downstream.accept(u); } }; } }; }
上面代碼是Stream的filter方法,fiter是一個無狀態操作,返回一個新的stage,還實現了AbstractPipeline.opWrapSink來返回stage實現的sink。這裡filter的參數是一個predicate,在predicate.test返回true時調用下游的stage的sink的accept方法,這樣整個操作流就連續執行下去了。
stage的雙向鏈接
在說Stream自動執行之前,有必要說一說每個stage是怎麼鏈接起來的。Stream在操作時產生的Operation類是如何用雙向鏈表的結構來前後鏈接的?
在上面Stream.filter的源代碼可以看到,filter返回了一個StatelessOp對象,構造函數接受了當前對象this為第一個參數,然後來看StatelessOp的代碼:
abstract static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> { /** * Construct a new Stream by appending a stateless intermediate * operation to an existing stream. * * @param upstream The upstream pipeline stage * @param inputShape The stream shape for the upstream pipeline stage * @param opFlags Operation flags for the new stage */ StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, StreamShape inputShape, int opFlags) { super(upstream, opFlags); assert upstream.getOutputShape() == inputShape; } @Override final boolean opIsStateful() { return false; } }
可以看到StatelessOp實現了ReferencePipeline接口,在構造函數裡調用了super(upstream, opFlags),而這個upstream(上游流)參數就是上面傳入的this,下游流StatelessOp的upstream就指向this了,這樣就通過下游流的upstream鏈接上游流。目前每個操作之間還只是單鏈表。
那有人就會想了,下游流保存了上游流的引用,那上游流是怎麼保存下游流的引用呢?這就要看最後的結束操作了,我們來看Stream.collect代碼:
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) { A container; if (isParallel() && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) { container = collector.supplier().get(); BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator(); forEach(u -> accumulator.accept(container, u)); } else { container = evaluate(ReduceOps.makeRef(collector)); } return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) ? (R) container : collector.finisher().apply(container); }
這裡我們只看串行操作的分支。filter返回了一個結束操作的計算結果。我們來看evaluate方法:
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); }
AbstractPipeline.evaluate方法接收了一個結束操作對象,我們只看串行操作:
public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { return helper.wrapAndCopyInto(makeSink(), spliterator).get(); }
繼續看AbstractPipeline.wrapAndCopyInto:
@Override final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) { copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator); return sink; } @Override final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { copyIntoWithCancel(wrappedSink, spliterator); } }
AbstractPipeline.wrapAndCopyInto接收了結束操作的sink,繼續看AbstractPipeline.wrapSink:
@Override @SuppressWarnings("unchecked") final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) { Objects.requireNonNull(sink); for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink<P_IN>) sink; }
從結束操作的sink開始,一層一層包裝sink,最後第一個中間操作的sink在最外層,在每個操作的opWrapSink方法裡返回的sink都維護了一個downstream指向後一個操作,這樣,雙向鏈表的結構就完成了。這樣,我們在copyInto方法裡調用begin、accept、end的時候就會通過downstream一層一層的調用下去,最終在結束操作執行實際計算。
結束
Stream的基本原理就分析到這裡,希望大家和我一起討論學習。希望看不明白的同學可以向我提問,看過源碼的同學歡迎指出錯誤!大家一起學習!