Akka Streams Basics
конспект курса «Lightbend Akka Streams for Java - Professional» by Lightbend Academy
Введение
Stream - это последовательность данных, состоящая из отдельных элементов. Обычно размер stream’а заранее неизвестен, а размер данных, проходящих через stream, слишком велик для размещения в памяти. Примеры: онлайн видео поток, данные фитнес-браслета.
Reactive Streams - это инициатива по созданию стандарта асинхронной потоковой обработки данных с обеспечением неблокирующего контроля интенсивности их поступления (back pressure). Готовность к получению новой порции данных, при этом, исходит от потребителя данных (subscriber) к поставщику (publisher). Основная цель данной инициативы - обеспечение взаимодействия между различными (но соответствующими стандарту) реализациями.
Стандартными компонентами Reactive Streams в Java являются:
Модуль Akka Streams предоставляет высокоуровневое API для работы со stream’ами, а интерфейсы Reactive Streams используются только в низкоуровневом API.
Схема линейного потока данных:
{Внешний источник данных} --> [Source] --> [Flow] --> [Sink] --> {Внешний потребитель данных}
Пример простого stream’а:
akka.actor.ActorSystem actorSystem = akka.actor.ActorSystem.create("system");
akka.stream.Materializer materializer = akka.stream.Materializer.createMaterializer(actorSystem);
akka.stream.javadsl.Source.from(List.of(1, 2, 3, 4))
.via(akka.stream.javadsl.Flow.of(Integer.class).map(elem -> elem * 2))
.to(akka.stream.javadsl.Sink.foreach(System.out::println))
.run(materializer);
Sources
Source
- это вход в stream.
Source
публикует данные только по запросу от последующих стадий.
Некоторые типы Source
‘ов:
Source::empty()
Source::single(elem: T)
Source::repeat(elem: T)
Source::tick(delay: Duration, interval: Duration, elem: T)
Source::from(Iterable)
Source::fromIterator(akka.japi.function.Creator<Iterator<T>>)
Source::cycle(akka.japi.function.Creator<Iterator<T>>)
Source::unfold(initialValue: T, Function<T, Optional<Pair<T, R>>>)
Source::unfoldAsync(initialValue: T, Function<T, CompletionStage<Optional<Pair<T, R>>>>)
Source::actorRef(completionMatcher: akka.japi.function.Function<Object, Optional<akka.stream.CompletionStrategy>>, failureMatcher: akka.japi.function.Function<Object, Optional<Throwable>>, bufferSize: int, OverflowStrategy)
FileIO::fromPath(java.nio.file.Path, chunkSize: int)
Tcp.get(akka.actor.ActorSystem)::bind(interface: String, port: int)
StreamConverters::fromInputStream(akka.japi.function.Creator<InputStream>)
Sinks
Sink
- это выход из stream.
Sink
управляет поступлением новых данных.
Некоторые типы Sink
‘ов:
Sink::ignore()
Sink::forEach(akka.japi.function.Procedure<T>)
Sink::head()
Sink::last()
Sink::headOption()
Sink::lastOption()
Sink::seq()
Sink::fold(T zero, akka.japi.function.Function2<T, In, T> f)
Sink::reduce(akka.japi.function.Function2<In, In, In> f)
Sink::actorRef(ref: ActorRef, onCompleteMessage)
Sink::actorRefWithBackpressure(ref: ActorRef, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage: Function<Throwable, Object>)
FileIO::toPath(java.nio.file.Path)
StreamConverters::fromOutputStream(out: scala.Function0<java.io.OutputStream>, autoFlush: boolean)
Flows
Flow
- используется для манипулирования данными идущими от Source
к Sink
.
Некоторые типы Flow
:
Flow::map(akka.japi.function.Function<R, T>)
Flow::mapAsync(parallelism: int, Function<R, java.util.concurrent.CompletionStage<T>>)
Flow::mapConcat(akka.japi.function.Function<Out, Iterable<T>>)
Flow::grouped(chunkSize: int)
Flow::sliding(windowSize: int, step: int)
Flow::fold(zero: T, akka.japi.function.Function2<T, R, T>)
Flow::scan(zero: T, akka.japi.function.Function2<T, R, T>)
Flow::filter(akka.japi.function.Predicate<T>)
Flow::collect(scala.PartialFunction<R, T>)
Flow::takeWithin(Duration)
Flow::dropWithin(Duration)
Flow::groupedWithin(chinkSize: int, Duration)
Flow::log(name: String)
Flow::zip(source: akka.stream.Graph<akka.stream.SourceShape<T>, ?>)
Flow::flatMapConcat(akka.japi.function.Function<R, ? extends Graph<SourceShape<T>, M>>)
Flow::flatMapMerge(breadth: int, akka.japi.function.Function<R, ? extends Graph<SourceShape<T>, M>>)
Flow::buffer(size: int, akka.stream.OverflowStrategy)
Flow::batch(max: long, seed: akka.japi.function.Function<Out, S>, aggregate: akka.japi.function.Function2<S, Out, S>)
Некоторые, доступные во Flow
, трансформации могут быть выполнены методами Source
.
Runnable Graphs
RunnableGraph - представляет собой соединенные друг с другом Source
, Flow
(необязательно) и Sink
.
Материализация (materialization) заключается в выделение необходимых ресурсов для запуска stream’а, и выполняется с помощью так называемых терминальных операций, таких как run()
и runWith()
.
Результатом материализации stream’а является материализованное значение которое предоставляет возможность взаимодействия с ним.
Отказоустойчивость
Отказоустойчивость в Akka Streams реализована в виде стратегий, которые настраиваются с помощью атрибутов RunnableGraph
‘а (или отдельных стадий).
Для завершения, в случае ошибки, stream’а с определенным значением используется метод recover
.
Графы
Объединения и разветвления stream’ов реализованы в виде соединителей (junctions).
Для создания сложных графов Akka Streams предоставляет Graph DSL.
Fusing
Все операторы Akka Streams, по умолчанию, сливаются (fusing) вместе и выполняются последовательно. Т.е. каждый элемент должен пройти все этапы перед тем как в stream поступит новый элемент.
Данное поведение настраивается параметром akka.stream.materializer.auto-fusing
.
Для асинхронного выполнения любой стадии используется метод async()
.
Следует учитывать, что асинхронное выполнение стадии в stream приведет к накладным расходам в виде дополнительных акторов (и их mailbox’ов) и буферов.