Ten wpis jest kontynuacją artykułu o strumieniach.
Operacje redukcji
Operacja redukcji (składania) pobiera sekwencję elementów wejściowych i składa je w pojedynczy wynik przez powtarzanie pewnych operacji. Przykładem operacji redukcji jest sumowanie elementów czy utworzenie listy elementów. Klasy strumieni wykorzystują metody reduce(), collect() podobnie jak wyspecjalizowane metody sum(), max(), czy count().
w praktyce programistycznej takie operacje jak operacja sumowania wykonuje się w prostej pętli sekwencyjnej:
int sum = 0; for(int x: values:numbers { sum += x; }
Istnieje wiele przyczyn dla których należy przedkładać operacje redukcji ponad akumulację mutatywną, taką jak pokazana powyżej. Operacja redukcji operuje na strumieniu jako całości,a nie na pojedynczych elementach i jeśli tylko funkcje działające na strumieniu są asocjacyjne i bezstanowe to właściwie skonstruowana operacja redukcji jest zawsze zdolna do paralelizacji.
long sum = values.stream().reduce(0, (x,y) -> x + y);
Wystarczy stream() zamienić na parallelStream(), a operacja może przebiegać a trybie równoległym.
Redukcja pozwala na łatwą paralelizację ponieważ implementacja zapewnia wykonywanie operacji równoległych na podzbiorach, a następnie na odpowiednie połączenie wyników pośrednich w końcowy, poprawny wynik.
W trybie akumulacji mutatywnej należy zapewnić synchronizację, więc jeśli nawet istnieje możliwość paralelizacji to zastosowanie synchronizacji eliminuje wszystkie korzyści z równoległości.
Operacja reduce() zapewnia korzyści z paralelizacji bez obciążeń powodowanych przez konieczność synchronizacji zmiennych.
Operacja redukcji na elementach typu <T> dająca wynik typu <U> wymaga trzech parametrów: identity, accumulator,combiner.
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
Identyczność (identity) jest zarówno wartością początkową ziarna dla redukcji jak i domyślnym wynikiem, jeśli nie ma elementów wejściowych.
Funkcja akumulacyjna (accumulator) pobiera częściowy wynik oraz następny wynik i wytwarza nowy częściowy wynik.
Funkcja kombinacyjna (łącząca) (combiner) łączy dwa wyniki częściowe i tworzy nowy wynik częściowy. Funkcja kombinacyjna jest konieczna przy redukcji równoległej, gdyż wtedy wejście jest partycjonowane, obliczana jest częściowa akumulacja dla każdej partycji, a następnie częściowe wyniki są łączone do wytworzenia wyniku końcowego.
Bardziej formalnie, identyczność musi być identycznością dla funkcji łączącej. Jeżeli np. jest to suma to identyczność będzie 0, gdyż 0 nie zmienia wyniku przy sumowaniu. Jeżeli będzie to mnożenie czy dzielenie – będzie to 1, gdyż 1 nie zmienia wyniku przy mnożeniu i dzieleniu.
Dodatkowo funkcja łącząca musi być assocjacyjna i kompatybilna z funkcją akumulatorową.
Redukcja mutatywna
Operacja redukcji mutatywnej umieszcza elementy wejściowe w mutowalnym kontenerze takim jak Collection albo StringBuilder w trakcie przetwarzania elementów w strumieniu. Jeśli mamy strumień stringów i chcemy połączyć je w jeden łańcuch możemy uczynić to za pomocą operacji redukcji (streams2.Reduction01.java):
String concatenated = Arrays.stream(values).reduce("", String::concat);
Oczywiście otrzymamy to co chcemy, a operacja może nawet przebiegać równolegle. Problemem jednak jest wydajność operacji. Nieco wydajniej można tę operację przeprowadzić używając StringBuilder. Wtedy lepiej jest użyć operacji collect().
Aby umieścić strumień elementów w ArrayList nie możemy tego zrobić przez dodawanie elementów ze strumienia w trakcie iteracji po strumieniu. przy użyciu pętli forEach. Możemy użyć operacji collect() (streams2.Reduction02.java):
Stream<String> stream = Arrays.stream(values); ArrayList<String> strings = stream.collect(() -> new ArrayList<>(), (c, e) -> c.add(e), (c1, c2) -> c1.addAll(c2));
Zamiennie można wyciągnąć operację mapowania z funkcji akumulatorowej i zapisać operację nieco zwięźlej
(streams2.Reduction03.java):
Stream<String> stream = Arrays.stream(values); List<String> strings1 = stream.map(Object::toString).collect( ArrayList::new, ArrayList::add, ArrayList::addAll);
Zamiennie można użyć abstrakcji Collector to uchwycenia wszystkich trzech aspektów (supplier, accumulator,
combiner) za jednych zamachem streams2.Reduction04.java):
Stream<String> stream = Arrays.stream(values); List<String> strings1 = stream.map(Object::toString).collect(Collectors.toList());
Pakowanie mutowalnych redukcji w Collector oprócz uproszczenia składni przynosi jeszcze jedną korzyść:
kompozycyjność. Klasa Collector zawiera liczne predefiniowane fabryki kolektorów, włączając kombinatory, które
przekształcają jeden kolektor w drugi.
Przypuśćmy, że mamy kolektor obliczający sumę pensji w strumieniu pracowników:
Collector<Pracownik,?, Integer> sumowaniePensji = Collectors.summingInt(Pracownik::getPensja);
Znak zapytania oznacza, że nie jest dla nas istotne jakich pośrednich reprezentacji użyje wewnętrznie
kolektor.
Możemy go wykorzystać tak (streams2.Reduction05.java):
Integer aa = str.collect(Collectors.summingInt(Pracownik::getPensja)); System.out.println(aa);
albo tak:
Integer ab = str.collect(sumowaniePensji);
W przypadku, gdy chcemy utworzyć kolektor grupujący pensje według departamentu może ponownie użyć kolektora z jednoczesnym użyciem groupingBy (streams2.Reduction06.java):
Map<String, Integer> salByDept = str.collect(Collectors.groupingBy( Pracownik::getDepartament, sumowaniePensji));
Redukcja konkurencyjna (współbieżna)
Wykonywanie niektórych złożonych operacji redukcji równolegle, np. collect(), w wyniku których powstają nowe kolekcje mogą być bardzo kosztowne czasowo. Dlatego na ogół równolegle wykonuje się operacje przy użyciu kolekcji współbieżnych
Redukcja konkurencyjna to redukcja równoległa z użyciem kolekcji współbieżnych np. ConcurrentHashMap.
Collector, który dostarcza redukcji konkurencyjnej jest oznakowany jako Collector.Characteristics.CONCURRENT. Niestety kolekcje współbieżne mają swoją ciemną stronę. Jeśli wiele wątków składa wyniki współbieżnie w dzielonym kontenerze, porządek w którym wyniki są układane nie jest deterministyczny. W konsekwencji redukcja konkurencyjna jest możliwa tylko wtedy, gdy uporządkowanie nie jest istotne dla przetwarzanego strumienia.
Implementacja Stream.collect(Collector) będzie wykonywała redukcję współbieżnie jeŚli:
- Strumień jest równoległy
- Kolektor ma charakterystykę Collector.Characteristics.CONCURRENT i
- albo strumień jest nieuporządkowany albo kolektor ma charakterystykę Collector.Characteristics.UNORDERED
Używając metody BaseStream.unordered() można stwierdzić czy strumień jest uporządkowany.
W przypadku, gdy uporządkowanie elementów jest istotne nie można używać redukcji współbieżnej.
Asocjacyjność
Operator lub funkcja jest asocjacyjna gdy spełnia warunek:
(a op b) op c == a op (b op c)
gdzie 'op’ jest operacją np. mnożenia.
W równoległości wykorzystuje się szczególnie asocjacyjność typu:
a op b op c op d == (a op b) op (c op d)
Niskopoziomowa konstrukcja strumieni
Metody stream klasy Collection i Arrays pozwalają na otrzymywanie strumieni. Klasa StreamSupport zawiera kilka niskopoziomowych metod do tworzenia strumieni.
Spliterator jest równoległym analogiem iteratora Iterator. Spliterator opisuje (możliwie nieskończoną) kolekcję elementów z możliwością sekwencyjnego przechodzenia naprzód, bąbelkowego przechodzenia i dzielenia niektórych porcji wejścia na spliteratory, które mogą być przetwarzane równolegle. Na najniższym poziomie, wszystkie strumienie są kierowane przez spliterator.
Istnieją różne możliwości implementacji spliteratorów, z których niemal wszystkie są kompromisem pomiędzy prostotą implementacji, a ich efektywnością.
Najprostszym spliteratorem, ale najmniej efektywnym jest spliterator utworzony z iteratora przy użyciu metody Spliterators.spliteratorUnknownSize(java.util.Iterator, int).
Lepszy spliterator będzie dostarczał dobrze zbalansowanych podziałów o dobrze znanej wielkości oraz licznych charakterystyk spliteratora lub danych, które mogą być użyte przez implementację do optymalizacji wykonania.
Spliteratory dla mutowalnych danych podlegają dodatkowym wyzwaniom: taktowanie wiązania do danych ponieważ dane mogą zmieniać się pomiędzy czasem, gdy spliterator jest tworzony, a czasem gdy potok strumienia jest wykonywany.
Idealnie, spliterator powinien mieć w charakterystyce IMMUTABLE lub CONCURRENT, a jeśli nie ma powinien zapewniać późne wiązanie.
Jeśli źródło danych nie może bezpośrednio dostarczyć rekomendowanego spliteratora, może pośrednio dostarczyć spliteratora używając Supplier i skonstruować strumień przez wersje stream(), które akceptują Supplier. Spliterator jest otrzymywany z dostarczyciela tylko wtedy, gdy zacznie się kończąca operacja w potoku strumienia.