Akka Streams Basics
конспект курса «Lightbend Akka Streams for Java - Professional» by Lightbend Academy
Введение
Stream - это последовательность данных, состоящая из отдельных элементов. Обычно размер stream’а заранее неизвестен, а размер данных, проходящих через stream, слишком велик для размещения в памяти. Примеры: онлайн видео поток, данные фитнес-браслета.
Reactive Streams - это инициатива по созданию стандарта асинхронной потоковой обработки данных с обеспечением неблокирующего контроля интенсивности их поступления (back pressure). Готовность к получению новой порции данных, при этом, исходит от потребителя данных (subscriber) к поставщику (publisher). Основная цель данной инициативы - обеспечение взаимодействия между различными (но соответствующими стандарту) реализациями.
Стандартными компонентами Reactive Streams в Java являются:
Модуль Akka Streams предоставляет высокоуровневое API для работы со stream’ами, а интерфейсы Reactive Streams используются только в низкоуровневом API.
Схема линейного потока данных:
{Внешний источник данных} --> [Source] --> [Flow] --> [Sink] --> {Внешний потребитель данных}
Пример простого stream’а:
akka.actor.ActorSystem actorSystem = akka.actor.ActorSystem.create("system");
akka.stream.Materializer materializer = akka.stream.Materializer.createMaterializer(actorSystem);
akka.stream.javadsl.Source.from(List.of(1, 2, 3, 4))
.via(akka.stream.javadsl.Flow.of(Integer.class).map(elem -> elem * 2))
.to(akka.stream.javadsl.Sink.foreach(System.out::println))
.run(materializer);
Sources
Source - это вход в stream.
Source публикует данные только по запросу от последующих стадий.
Некоторые типы Source‘ов:
Source::empty()Source::single(elem: T)Source::repeat(elem: T)Source::tick(delay: Duration, interval: Duration, elem: T)Source::from(Iterable)Source::fromIterator(akka.japi.function.Creator<Iterator<T>>)Source::cycle(akka.japi.function.Creator<Iterator<T>>)Source::unfold(initialValue: T, Function<T, Optional<Pair<T, R>>>)Source::unfoldAsync(initialValue: T, Function<T, CompletionStage<Optional<Pair<T, R>>>>)Source::actorRef(completionMatcher: akka.japi.function.Function<Object, Optional<akka.stream.CompletionStrategy>>, failureMatcher: akka.japi.function.Function<Object, Optional<Throwable>>, bufferSize: int, OverflowStrategy)FileIO::fromPath(java.nio.file.Path, chunkSize: int)Tcp.get(akka.actor.ActorSystem)::bind(interface: String, port: int)StreamConverters::fromInputStream(akka.japi.function.Creator<InputStream>)
Sinks
Sink - это выход из stream.
Sink управляет поступлением новых данных.
Некоторые типы Sink‘ов:
Sink::ignore()Sink::forEach(akka.japi.function.Procedure<T>)Sink::head()Sink::last()Sink::headOption()Sink::lastOption()Sink::seq()Sink::fold(T zero, akka.japi.function.Function2<T, In, T> f)Sink::reduce(akka.japi.function.Function2<In, In, In> f)Sink::actorRef(ref: ActorRef, onCompleteMessage)Sink::actorRefWithBackpressure(ref: ActorRef, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage: Function<Throwable, Object>)FileIO::toPath(java.nio.file.Path)StreamConverters::fromOutputStream(out: scala.Function0<java.io.OutputStream>, autoFlush: boolean)
Flows
Flow - используется для манипулирования данными идущими от Source к Sink.
Некоторые типы Flow:
Flow::map(akka.japi.function.Function<R, T>)Flow::mapAsync(parallelism: int, Function<R, java.util.concurrent.CompletionStage<T>>)Flow::mapConcat(akka.japi.function.Function<Out, Iterable<T>>)Flow::grouped(chunkSize: int)Flow::sliding(windowSize: int, step: int)Flow::fold(zero: T, akka.japi.function.Function2<T, R, T>)Flow::scan(zero: T, akka.japi.function.Function2<T, R, T>)Flow::filter(akka.japi.function.Predicate<T>)Flow::collect(scala.PartialFunction<R, T>)Flow::takeWithin(Duration)Flow::dropWithin(Duration)Flow::groupedWithin(chinkSize: int, Duration)Flow::log(name: String)Flow::zip(source: akka.stream.Graph<akka.stream.SourceShape<T>, ?>)Flow::flatMapConcat(akka.japi.function.Function<R, ? extends Graph<SourceShape<T>, M>>)Flow::flatMapMerge(breadth: int, akka.japi.function.Function<R, ? extends Graph<SourceShape<T>, M>>)Flow::buffer(size: int, akka.stream.OverflowStrategy)Flow::batch(max: long, seed: akka.japi.function.Function<Out, S>, aggregate: akka.japi.function.Function2<S, Out, S>)
Некоторые, доступные во Flow, трансформации могут быть выполнены методами Source.
Runnable Graphs
RunnableGraph - представляет собой соединенные друг с другом Source, Flow (необязательно) и Sink.
Материализация (materialization) заключается в выделение необходимых ресурсов для запуска stream’а, и выполняется с помощью так называемых терминальных операций, таких как run() и runWith().
Результатом материализации stream’а является материализованное значение которое предоставляет возможность взаимодействия с ним.
Отказоустойчивость
Отказоустойчивость в Akka Streams реализована в виде стратегий, которые настраиваются с помощью атрибутов RunnableGraph‘а (или отдельных стадий).
Для завершения, в случае ошибки, stream’а с определенным значением используется метод recover.
Графы
Объединения и разветвления stream’ов реализованы в виде соединителей (junctions).
Для создания сложных графов Akka Streams предоставляет Graph DSL.
Fusing
Все операторы Akka Streams, по умолчанию, сливаются (fusing) вместе и выполняются последовательно. Т.е. каждый элемент должен пройти все этапы перед тем как в stream поступит новый элемент.
Данное поведение настраивается параметром akka.stream.materializer.auto-fusing.
Для асинхронного выполнения любой стадии используется метод async().
Следует учитывать, что асинхронное выполнение стадии в stream приведет к накладным расходам в виде дополнительных акторов (и их mailbox’ов) и буферов.