Zielony Smok - logo witryny

Ten wpis jest kontynuacją artykułu o strumieniach.

Operacje redukcji

Operacja redukcji (składania) pobiera sekwencję elementów wejściowych i składa je w pojedynczy wynik przez powtarzanie pewnych operacji. Przykładem operacji redukcji jest sumowanie elementów czy utworzenie listy elementów. Klasy strumieni wykorzystują metody reduce(), collect() podobnie jak wyspecjalizowane metody sum(), max(), czy count().

w praktyce programistycznej takie operacje jak operacja sumowania wykonuje się w prostej pętli sekwencyjnej:

int sum = 0;
for(int x: values:numbers {
sum += x;
}

Istnieje wiele przyczyn dla których należy przedkładać operacje redukcji ponad akumulację mutatywną, taką jak pokazana powyżej. Operacja redukcji operuje na strumieniu jako całości,a nie na pojedynczych elementach i jeśli tylko funkcje działające na strumieniu są asocjacyjne i bezstanowe to właściwie skonstruowana operacja redukcji jest zawsze zdolna do paralelizacji.

long sum = values.stream().reduce(0, (x,y) -> x + y);

Wystarczy stream() zamienić na parallelStream(), a operacja może przebiegać a trybie równoległym.

Redukcja pozwala na łatwą paralelizację ponieważ implementacja zapewnia wykonywanie operacji równoległych na podzbiorach, a następnie na odpowiednie połączenie wyników pośrednich w końcowy, poprawny wynik.

W trybie akumulacji mutatywnej należy zapewnić synchronizację, więc jeśli nawet istnieje możliwość paralelizacji to zastosowanie synchronizacji eliminuje wszystkie korzyści z równoległości.

Operacja reduce() zapewnia korzyści z paralelizacji bez obciążeń powodowanych przez konieczność synchronizacji zmiennych.

Operacja redukcji na elementach typu <T> dająca wynik typu <U> wymaga trzech parametrów: identity, accumulator,combiner.

<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);

Identyczność (identity) jest zarówno wartością początkową ziarna dla redukcji jak i domyślnym wynikiem, jeśli nie ma elementów wejściowych.

Funkcja akumulacyjna (accumulator) pobiera częściowy wynik oraz następny wynik i wytwarza nowy częściowy wynik.

Funkcja kombinacyjna (łącząca) (combiner) łączy dwa wyniki częściowe i tworzy nowy wynik częściowy. Funkcja kombinacyjna jest konieczna przy redukcji równoległej, gdyż wtedy wejście jest partycjonowane, obliczana jest częściowa akumulacja dla każdej partycji, a następnie częściowe wyniki są łączone do wytworzenia wyniku końcowego.

Bardziej formalnie, identyczność musi być identycznością dla funkcji łączącej. Jeżeli np. jest to suma to identyczność będzie 0, gdyż 0 nie zmienia wyniku przy sumowaniu. Jeżeli będzie to mnożenie czy dzielenie – będzie to 1, gdyż 1 nie zmienia wyniku przy mnożeniu i dzieleniu.

Dodatkowo funkcja łącząca musi być assocjacyjna i kompatybilna z funkcją akumulatorową.

Redukcja mutatywna

Operacja redukcji mutatywnej umieszcza elementy wejściowe w mutowalnym kontenerze takim jak Collection albo StringBuilder w trakcie przetwarzania elementów w strumieniu. Jeśli mamy strumień stringów i chcemy połączyć je w jeden łańcuch możemy uczynić to za pomocą operacji redukcji (streams2.Reduction01.java):

String concatenated = Arrays.stream(values).reduce("", String::concat);

Oczywiście otrzymamy to co chcemy, a operacja może nawet przebiegać równolegle. Problemem jednak jest wydajność operacji. Nieco wydajniej można tę operację przeprowadzić używając StringBuilder. Wtedy lepiej jest użyć operacji collect().

Aby umieścić strumień elementów w ArrayList nie możemy tego zrobić przez dodawanie elementów ze strumienia w trakcie iteracji po strumieniu. przy użyciu pętli forEach. Możemy użyć operacji collect() (streams2.Reduction02.java):

Stream<String> stream = Arrays.stream(values);
ArrayList<String> strings = stream.collect(() -> new
   ArrayList<>(), (c, e) -> c.add(e), (c1, c2) ->
c1.addAll(c2));

Zamiennie można wyciągnąć operację mapowania z funkcji akumulatorowej i zapisać operację nieco zwięźlej
(streams2.Reduction03.java):

Stream<String> stream = Arrays.stream(values);
List<String> strings1 = stream.map(Object::toString).collect(
	ArrayList::new, ArrayList::add, ArrayList::addAll);

Zamiennie można użyć abstrakcji Collector to uchwycenia wszystkich trzech aspektów (supplier, accumulator,
combiner) za jednych zamachem streams2.Reduction04.java):

Stream<String> stream = Arrays.stream(values);
List<String> strings1 =
   stream.map(Object::toString).collect(Collectors.toList());

Pakowanie mutowalnych redukcji w Collector oprócz uproszczenia składni przynosi jeszcze jedną korzyść:
kompozycyjność. Klasa Collector zawiera liczne predefiniowane fabryki kolektorów, włączając kombinatory, które
przekształcają jeden kolektor w drugi.

Przypuśćmy, że mamy kolektor obliczający sumę pensji w strumieniu pracowników:

Collector<Pracownik,?, Integer> sumowaniePensji =
    Collectors.summingInt(Pracownik::getPensja);

Znak zapytania oznacza, że nie jest dla nas istotne jakich pośrednich reprezentacji użyje wewnętrznie
kolektor.

Możemy go wykorzystać tak (streams2.Reduction05.java):

Integer aa = str.collect(Collectors.summingInt(Pracownik::getPensja));
System.out.println(aa);

albo tak:

Integer ab = str.collect(sumowaniePensji);

W przypadku, gdy chcemy utworzyć kolektor grupujący pensje według departamentu może ponownie użyć kolektora z jednoczesnym użyciem groupingBy (streams2.Reduction06.java):

Map<String, Integer> salByDept =
   str.collect(Collectors.groupingBy(
Pracownik::getDepartament, sumowaniePensji));

Redukcja konkurencyjna (współbieżna)

Wykonywanie niektórych złożonych operacji redukcji równolegle, np. collect(), w wyniku których powstają nowe kolekcje mogą być bardzo kosztowne czasowo. Dlatego na ogół równolegle wykonuje się operacje przy użyciu kolekcji współbieżnych

Redukcja konkurencyjna to redukcja równoległa z użyciem kolekcji współbieżnych np. ConcurrentHashMap.

Collector, który dostarcza redukcji konkurencyjnej jest oznakowany jako Collector.Characteristics.CONCURRENT. Niestety kolekcje współbieżne mają swoją ciemną stronę. Jeśli wiele wątków składa wyniki współbieżnie w dzielonym kontenerze, porządek w którym wyniki są układane nie jest deterministyczny. W konsekwencji redukcja konkurencyjna jest możliwa tylko wtedy, gdy uporządkowanie nie jest istotne dla przetwarzanego strumienia.

Implementacja Stream.collect(Collector) będzie wykonywała redukcję współbieżnie jeŚli:

  • Strumień jest równoległy
  • Kolektor ma charakterystykę Collector.Characteristics.CONCURRENT i
  • albo strumień jest nieuporządkowany albo kolektor ma charakterystykę Collector.Characteristics.UNORDERED

Używając metody BaseStream.unordered() można stwierdzić czy strumień jest uporządkowany.

W przypadku, gdy uporządkowanie elementów jest istotne nie można używać redukcji współbieżnej.

Asocjacyjność

Operator lub funkcja jest asocjacyjna gdy spełnia warunek:

(a op b) op c == a op (b op c)

gdzie 'op’ jest operacją np. mnożenia.

W równoległości wykorzystuje się szczególnie asocjacyjność typu:

a op b op c op d == (a op b) op (c op d)

Niskopoziomowa konstrukcja strumieni

Metody stream klasy Collection i Arrays pozwalają na otrzymywanie strumieni. Klasa StreamSupport zawiera kilka niskopoziomowych metod do tworzenia strumieni.

Spliterator jest równoległym analogiem iteratora Iterator. Spliterator opisuje (możliwie nieskończoną) kolekcję elementów z możliwością sekwencyjnego przechodzenia naprzód, bąbelkowego przechodzenia i dzielenia niektórych porcji wejścia na spliteratory, które mogą być przetwarzane równolegle. Na najniższym poziomie, wszystkie strumienie są kierowane przez spliterator.

Istnieją różne możliwości implementacji spliteratorów, z których niemal wszystkie są kompromisem pomiędzy prostotą implementacji, a ich efektywnością.

Najprostszym spliteratorem, ale najmniej efektywnym jest spliterator utworzony z iteratora przy użyciu metody Spliterators.spliteratorUnknownSize(java.util.Iterator, int).

Lepszy spliterator będzie dostarczał dobrze zbalansowanych podziałów o dobrze znanej wielkości oraz licznych charakterystyk spliteratora lub danych, które mogą być użyte przez implementację do optymalizacji wykonania.

Spliteratory dla mutowalnych danych podlegają dodatkowym wyzwaniom: taktowanie wiązania do danych ponieważ dane mogą zmieniać się pomiędzy czasem, gdy spliterator jest tworzony, a czasem gdy potok strumienia jest wykonywany.

Idealnie, spliterator powinien mieć w charakterystyce IMMUTABLE lub CONCURRENT, a jeśli nie ma powinien zapewniać późne wiązanie.

Jeśli źródło danych nie może bezpośrednio dostarczyć rekomendowanego spliteratora, może pośrednio dostarczyć spliteratora używając Supplier i skonstruować strumień przez wersje stream(), które akceptują Supplier. Spliterator jest otrzymywany z dostarczyciela tylko wtedy, gdy zacznie się kończąca operacja w potoku strumienia.