Java może zrównoleglić operacje strumieniowe, aby wykorzystać systemy wielordzeniowe. Ten artykuł przedstawia perspektywę i pokazuje, jak strumień równoległy może poprawić wydajność na odpowiednich przykładach.
Strumienie w Javie
strumień w Javie to sekwencja obiektów reprezentowanych jako kanał danych. Zwykle ma źródło gdzie znajdują się dane i miejsce docelowe gdzie jest transmitowany. Zauważ, że strumień nie jest repozytorium; zamiast tego działa na źródle danych, takim jak tablica lub kolekcja. Bity pośrednie w przejściu są w rzeczywistości nazywane strumieniem. W trakcie transmisji strumień zwykle przechodzi jedną lub więcej możliwych transformacji, takich jak filtrowanie lub sortowanie, lub może to być dowolny inny proces operujący na danych. To dostosowuje oryginalne dane do innej formy, zazwyczaj zgodnie z potrzebami programisty. Dlatego tworzony jest nowy strumień zgodnie z zastosowaną na nim operacją. Na przykład, gdy strumień jest posortowany, powstaje nowy strumień, który daje wynik, który jest następnie sortowany. Oznacza to, że nowe dane są przekształconą kopią oryginału, a nie w oryginalnej formie.
Sekwencyjny strumień
Każda operacja strumieniowa w Javie, o ile nie została wyraźnie określona jako równoległa, jest przetwarzana sekwencyjnie. Są to zasadniczo strumienie nierównoległe, które wykorzystują pojedynczy wątek do przetwarzania potoku. Strumienie sekwencyjne nigdy nie korzystają z systemu wielordzeniowego, nawet jeśli system bazowy może obsługiwać wykonywanie równoległe. Co się dzieje, na przykład, gdy zastosujemy wielowątkowość do przetwarzania strumienia? Nawet wtedy działa na jednym rdzeniu na raz. Może jednak przeskakiwać z jednego rdzenia na drugi, chyba że zostanie wyraźnie przypięty do określonego rdzenia. Na przykład przetwarzanie w czterech różnych wątkach w porównaniu z czterema różnymi rdzeniami jest oczywiście inne, gdy pierwszy nie pasuje do drugiego. Wykonywanie wielu wątków w środowisku jednordzeniowym jest całkiem możliwe, ale przetwarzanie równoległe to zupełnie inny gatunek. Program musi być zaprojektowany od podstaw do programowania równoległego, oprócz wykonywania w środowisku, które go obsługuje. To jest powód, dla którego programowanie równoległe jest złożoną dziedziną.
Wypróbujmy przykład, aby lepiej zilustrować ten pomysł.
package org.mano.example; import java.util.Arrays; import java.util.List; public class Main2 { public static oid main(String[] args) { List<Integer> list=Arrays.asList(1,2,3,4,5,6,7,8,9); list.stream().forEach(System.out::println); System.out.println(); list.parallelStream().forEach(System.out::println); } }
Wyjście
123456789 685973214
Ten przykład jest ilustracją działania q strumienia sekwencyjnego oraz q równoległego strumienia. list.stream() działa sekwencyjnie w jednym wątku z println() operacja. list.parallelStream() z drugiej strony jest przetwarzany równolegle, w pełni wykorzystując podstawowe środowisko wielordzeniowe. Ciekawym aspektem jest wyjście poprzedniego programu. W przypadku strumienia sekwencyjnego zawartość listy jest drukowana w uporządkowanej kolejności. Z drugiej strony wyjście strumienia równoległego jest nieuporządkowane, a kolejność zmienia się za każdym razem, gdy program jest uruchamiany. Oznacza to przynajmniej jedną rzecz:to wywołanie list.parallelStream() Metoda sprawia, że println Instrukcja działa w wielu wątkach, coś, co list.stream() robi w jednym wątku.
Równoległy strumień
Podstawową motywacją do używania strumienia równoległego jest uczynienie przetwarzania strumienia częścią programowania równoległego, nawet jeśli cały program może nie być zrównoleglony. Strumień równoległy wykorzystuje procesory wielordzeniowe, co skutkuje znacznym wzrostem wydajności. W przeciwieństwie do programowania równoległego są one złożone i podatne na błędy. Jednak biblioteka strumieniowa Java zapewnia możliwość łatwego i niezawodnego robienia tego. Cały program nie może być zrównoleglony. ale przynajmniej część, która obsługuje strumień, może być zrównoleglona. W rzeczywistości są one dość proste w tym sensie, że możemy powołać się na kilka metod, a reszta jest już załatwiona. Jest na to kilka sposobów. Jednym z takich sposobów jest uzyskanie równoległego strumienia przez wywołanie funkcji parallelStream() metoda zdefiniowana przez Kolekcja . Innym sposobem jest wywołanie parallel() metoda zdefiniowana przez BaseStream w strumieniu sekwencyjnym. Strumień sekwencyjny jest równoległy przez wywołanie. Należy pamiętać, że podstawowa platforma musi obsługiwać programowanie równoległe, na przykład w systemie wielordzeniowym. W przeciwnym razie inwokacja nie ma sensu. W takim przypadku strumień byłby przetwarzany po kolei, nawet jeśli wykonaliśmy wywołanie. Jeśli wywołanie jest wykonywane w już równoległym strumieniu, nic nie robi i po prostu zwraca strumień.
Aby zapewnić, że wynik przetwarzania równoległego zastosowanego w strumieniu jest taki sam, jak w przypadku przetwarzania sekwencyjnego, strumienie równoległe muszą być bezstanowe, niezakłócające i asocjacyjne.
Szybki przykład
package org.mano.example; import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; public class Main { public static void main(String[] args) { List<Employee> employees = Arrays.asList( new Employee(1276, "FFF",2000.00), new Employee(7865, "AAA",1200.00), new Employee(4975, "DDD",3000.00), new Employee(4499, "CCC",1500.00), new Employee(9937, "GGG",2800.00), new Employee(5634, "HHH",1100.00), new Employee(9276, "BBB",3200.00), new Employee(6852, "EEE",3400.00)); System.out.println("Original List"); printList(employees); // Using sequential stream long start = System.currentTimeMillis(); List<Employee> sortedItems = employees.stream() .sorted(Comparator .comparing(Employee::getName)) .collect(Collectors.toList()); long end = System.currentTimeMillis(); System.out.println("sorted using sequential stream"); printList(sortedItems); System.out.println("Total the time taken process :" + (end - start) + " milisec."); // Using parallel stream start = System.currentTimeMillis(); List<Employee> anotherSortedItems = employees .parallelStream().sorted(Comparator .comparing(Employee::getName)) .collect(Collectors.toList()); end = System.currentTimeMillis(); System.out.println("sorted using parallel stream"); printList(anotherSortedItems); System.out.println("Total the time taken process :" + (end - start) + " milisec."); double totsal=employees.parallelStream() .map(e->e.getSalary()) .reduce(0.00,(a1,a2)->a1+a2); System.out.println("Total Salary expense: "+totsal); Optional<Employee> maxSal=employees.parallelStream() .reduce((Employee e1, Employee e2)-> e1.getSalary()<e2.getSalary()?e2:e1); if(maxSal.isPresent()) System.out.println(maxSal.get().toString()); } public static void printList(List<Employee> list) { for (Employee e : list) System.out.println(e.toString()); } } package org.mano.example; public class Employee { private int empid; private String name; private double salary; public Employee() { super(); } public Employee(int empid, String name, double salary) { super(); this.empid = empid; this.name = name; this.salary = salary; } public int getEmpid() { return empid; } public void setEmpid(int empid) { this.empid = empid; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getSalary() { return salary; } public void setSalary(double salary) { this.salary = salary; } @Override public String toString() { return "Employee [empid=" + empid + ", name=" + name + ", salary=" + salary + "]"; } }
W poprzednim kodzie zwróć uwagę, w jaki sposób zastosowaliśmy sortowanie w strumieniu jeden za pomocą wykonywania sekwencyjnego.
List<Employee> sortedItems = employees.stream() .sorted(Comparator .comparing(Employee::getName)) .collect(Collectors.toList());
a równoległe wykonanie jest osiągane poprzez nieznaczną zmianę kodu.
List<Employee> anotherSortedItems = employees .parallelStream().sorted(Comparator .comparing(Employee::getName)) .collect(Collectors.toList());
Porównamy również czas systemowy, aby zorientować się, która część kodu zajmuje więcej czasu. Operacja równoległa rozpoczyna się, gdy strumień równoległy zostanie jawnie uzyskany przez parallelStream() metoda. Jest jeszcze inna interesująca metoda, zwana reduce() . Kiedy zastosujemy tę metodę do równoległego strumienia, operacja może wystąpić w różnych wątkach.
Jednak zawsze możemy przełączać się między trybami równoległymi i sekwencyjnymi zgodnie z potrzebami. Jeśli chcemy zmienić strumień równoległy na sekwencyjny, możemy to zrobić, wywołując sekwencyjny() metoda określona przez BaseStream . Jak widzieliśmy w naszym pierwszym programie, operacja wykonywana na strumieniu może być uporządkowana lub nieuporządkowana zgodnie z kolejnością elementów. Oznacza to, że kolejność zależy od źródła danych. Tak jednak nie jest w przypadku strumieni równoległych. Aby zwiększyć wydajność, są one przetwarzane równolegle. Ponieważ odbywa się to bez żadnej sekwencji, gdzie każda partycja strumienia jest przetwarzana niezależnie od innych partycji bez żadnej koordynacji, konsekwencja jest nieprzewidywalna i nieuporządkowana. Ale jeśli chcemy konkretnie wykonać operację na każdym elemencie w strumieniu równoległym, który ma być uporządkowany, możemy rozważyć forEachOrdered() metoda, która jest alternatywą dla forEach() metoda.
Wniosek
Strumieniowe interfejsy API są częścią Javy od dłuższego czasu, ale dodanie usprawnienia przetwarzania równoległego jest bardzo przyjemne, a jednocześnie dość intrygujące. Jest to szczególnie prawdziwe, ponieważ nowoczesne maszyny są wielordzeniowe i istnieje piętno, że projektowanie programowania równoległego jest złożone. Interfejsy API dostarczane przez Javę zapewniają możliwość włączenia odrobiny poprawek programowania równoległego w programie Javy, który ma ogólny projekt wykonania sekwencyjnego. To chyba najlepsza część tej funkcji.