конспект курса «Lightbend Akka for Scala - Professional» by Lightbend Academy

Что такое Akka

Akka - это набор инструментов для создания высоконагруженных, распределенных, и отказоустойчивых приложений основанных на обмене сообщениями для Java и Scala.

Также Akka представляет собой реализацию модели акторов для JVM.

Актор - это фундаментальная единица вычислений, воплощающая обработку, хранение и коммуникацию.

Основные концепции модели акторов:

  • все есть актор
  • у каждого актора есть адрес
  • в ответ на полученное сообщение актор может:
    • отправить сообщения другим акторам
    • создать новые акторы
    • выбрать поведение, которое будет использоваться при обработке следующего сообщения

Создание Актора

Акторы организованы в виде иерархической системы (actor system) в которой каждый актор имеет уникальный идентификатор (путь) и предка, который за ним наблюдает.

Верхнеуровневые акторы:

  • / - так называемый корневой страж, останавливается последним (прародитель всех акторов в системе);
  • /system - системный страж, останавливается после пользовательского стража (используется, например, в качестве родителя для актора-логгера);
  • /user - пользовательский страж, верхнеуровневый пользовательский актор, используемый приложением.

actor-top-tree

Система акторов Akka также предоставляет:

  • фабрику верхнеуровневых пользовательских акторов: actorOf
  • диспетчеры и пулы потоков
  • сервис планировщика
  • доступ к конфигурации
  • поток событий в стиле издатель-подписчик

Создание системы акторов:

ActorSystem system = ActorSystem.create(name + "-system");

Остановка:

system.terminate();
scala.concurrent.Await.ready(system.whenTerminated(), scala.concurrent.duration.Duration.Inf());

Каждый актор представлен своим akka.actor.ActorRef, который дает возможность отправки ему сообщений. Ссылка на сам актор, при этом, недоступна.

Отправитель --[сообщение]--> ActorRef --[сообщение]--> Mailbox <--(планирование/обработка)--> Dispatcher

Актор обрабатывает по одному сообщению за раз (возможно в разных потоках). Таким образом обеспечивается иллюзия последовательного выполнения и безопасная работа с изменяемым состоянием. Прием сообщения и его обработка происходят независимо и, обычно, в разных потоках.

class MyActor extends akka.actor.AbstractLoggingActor {

  public static Props props() {
    return akka.actor.Props.create(MyActor.class, MyActor::new);
  }

  @Override
  public Receive createReceive() {
    // начальное поведение (behavior)
    return receiveBuilder()
        .matchAny(msg -> log().info("{}", msg))
        .build();
  }
}

Логгирование необработанных сообщений настраивается параметром akka.actor.debug.unhandled = on.

Создание актора:

ActorRef myActor = system.actorOf(MyActor.props(), "my-actor");

Создание актора - асинхронная операция, но ActorRef доступен сразу.

Для конфигурации Akka использует формат HOCON и библиотеку Typesafe Config. Именем конфигурационного файла по-умолчанию является application.conf.

Пример конфигурации:

akka {
  loggers = [akka.event.slf4j.Slf4jLogger]
  loglevel = DEBUG
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

  actor {
    debug {
      unhandled = on
    }
    deployment {
      /my-actor {
        router = round-robin-pool
        nr-of-instances = 4
      }
    }
    default-dispatcher {
      fork-join-executor {
        parallelism-min = 4
        parallelism-factor = 2.0
        parallelism-max = 16
      }
    }
  }
}

Взаимодействие с Актором

Взаимодействие с актором происходит исключительно через обмен сообщениями:

// не дожидается ответа и сразу возвращает управление
myActor.tell("Hello, Actor!", akka.actor.Actor.noSender() /* или self() если из актора */);

Правило: сообщения должны быть неизменяемыми.

Сообщения (протокол) обычно определяются внутри самого актора в виде вложенных классов.

Пример ответа на сообщение:

class EchoActor extends akka.actor.AbstractActor {
  ...
  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchAny(msg -> sender().tell(msg, self()))
        .build();
  }
}

Если отправитель не существует или остановлен, то ответ попадет в ActorSystem.deadLetters.

Выборка подгруппы акторов осуществляется с помощью метода ActorRefFactory::actorSelection.

Пример:

system.actorSelection("../some-actor");

// из актора
context().actorSelection("/user/my-actor");

Получить ссылку на конкретный актор в выборке можно отправив ей сообщение Identify. См. документацию.

Внутреннее состояние Актора

Каждый актор имеет доступ к своему контексту с помощью метода context().

Контекст обеспечивает:

  • доступ к self() и отправителю текущего сообщения
  • возможность создания и остановки акторов
  • доступ к родительскому актору и акторам-потомкам
  • возможность изменения своего поведения
  • возможность перенаправления сообщений другому актору
  • и т.д.

Создание актора-потомка:

class MyActor extends akka.actor.AbstractActor {
  ...
  ActorRef createChildActor() {
    context().actorOf(ChildActor.props(), "child-actor");
  }
}

За счет последовательной обработки сообщений акторы могут безопасно работать с внутренним изменяемым состоянием.

Akka (на основе JMM) гарантирует, что изменения внутренних полей актора будут видимы при обработке следующего сообщения. Таким образом нет необходимости в использовании volatile или классов из пакета java.util.concurrent.atomic.

Планирование отправки сообщений Актору

Для планирования отправки актору сообщений Akka предоставляет:

  • глобальный (в рамках системы акторов) планировщик
  • таймеры для отправки актором сообщений саму себе

Пример использования планировщика:

system.scheduler().scheduleOnce(java.time.Duration.ofSeconds(1L), myActor, "Message", 
    system.dispatcher(), ActorRef.noSender());

Пример использования таймеров:

class MyActor extends AbstractActorWithTimers {

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchEquals(Messages.INIT, ignore -> timers()
            .startTimerWithFixedDelay("timer-key", Message.TICK, Duration.ofSeconds(1L)))
        .matchEquals(Messages.TICK, msg -> { /* do something */ })
        .matchEquals(Messages.STOP, ignore -> timers().cancel("timer-key"))
        .build();
  }

  enum Messages {
    INIT, TICK, STOP
  }
}

Преимущества таймеров при отправке актором сообщений самому себе:

  • таймеры автоматически отменяются при остановке или перезапуске актора
  • помещенные в mailbox сообщения от впоследствии отмененного таймера не будут переданы актору

Тестирование Акторов

Для тестирования акторов используется модуль akka-testkit.

Тестирование акторов напрямую в синхронном стиле возможно с помощью класса TestActorRef. Более предпочтительным является тестирование с использованием ActorRef и паттерна ask.

Проверка обмена сообщениями между акторами обычно выполняется с помощью актора-заглушка TestProbe.

Пример:

@Test
void test() {
    TestProbe probe = TestProbe.apply(system);
    ActorRef myActor = system.actorOf(MyActor.props(), "my-actor");
    myActor.tell("ping", probe);
    probe.expectMsg("pong");
}

Жизненный цикл Актора

Actor Lifecycle

Ссылка на документацию.

Актор обрабатывает сообщения только между фазами started и stopped.

Остановка актора выполняется с помощью:

  • вызова метода ActorRefFactory::stop(actor: ActorRef)
  • отправки актору сообщения PoisonPill - выполнение context().stop(self()) при извлечении из mailbox
  • отправки актору сообщения Kill - возбуждение ActorKilledException при извлечении из mailbox

Пример:

system.stop(myActor);

// из актора
context().stop(self());

myActor.tell(PoisonPill.getInstance(), ActorRef.noSender());

myActor.tell(Kill.getInstance(), ActorRef.noSender());

При остановке актор:

  1. обрабатывает текущее сообщение
  2. приостанавливает дальнейшую обработку сообщений
  3. останавливает своих потомков и ожидает их завершения
  4. останавливается сам

Т.к. сообщения PoisonPill и Kill не передаются актору, для выполнения перед остановкой завершающих действий необходимо использовать соответствующее пользовательское сообщение.

Актор может отслеживать остановку другого актора:

class MyActor extends akka.actor.AbstractLoggingActor {

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(Watch.class, msg -> context().watch(msg.actor))
        .match(akka.actor.Terminated.class, msg -> log().info("Actor {} is stopped.", msg.actor()))
        .build();
  }

  static class Watch {
    final ActorRef actor;
    
    Watch(ActorRef actor) {
      this.actor = actor;
    }
  }
}

Не обработанное сообщение Terminated приведет к возбуждению DeathPactException.

Отказоустойчивость

Отказоустойчивость в Akka реализована в виде родительского контроля (parental supervision).

В случае возникновения ошибки актор и его потомки приостанавливаются, а её обработка (зависит от выбранной стратегии) поручается родительскому актору.

Akka предоставляет две готовых стратегии:

  • OneForOneStrategy - применяет заданное действие только к актору в котором возникла ошибка
  • AllForOneStrategy - применяет заданное действие ко всем потомкам родителя

Стратегии конфигурируются соответствием исключений и директив.

Если для исключения не найдено подходящей директивы, то оно возбуждается повторно.

Возможные директивы:

  • SupervisorStrategy.Stop - возобновление обработки сообщений актором
  • SupervisorStrategy.Restart - замена актора и возобновление обработки сообщений
  • SupervisorStrategy.Stop - остановка актора
  • SupervisorStrategy.Escalate - эскалация ошибки (повторное возбуждение исключения)

Сообщения при этом не теряются (кроме того, что вызвало ошибку).

По-умолчанию используется стратегия OneForOneStrategy со следующим соответствием:

Роутеры и Диспетчеры

Конкурентность - это свойство программы, в то время как параллелизм - свойство компьютера.

Т.к. каждый актор обрабатывает по одному сообщению за раз, для увеличения производительности необходимо множество параллельно работающих акторов.

Для распределения сообщений между группой акторов используются роутеры (router).

Сообщения отправленные роутеру перераспределяются между акторами-получателями (routees).

Акторы-получатели могут создаваться как самим роутером (pool) (с возможностью конфигурирования) так и вне его (group).

Роутер-пул делегирует ошибки получателей своему предку. В роутере-группе обработка ошибок делегируется предкам самих получателей.

Пример:

akka {
  ...
  actor {
    deployment {
      /my-actor {
        router = round-robin-pool
        nr-of-instances = 4
      }
    }
  }
}
system.actorOf(FromConfig.getInstance().props(MyActor.props()), "my-actor");

Akka предоставляет следующие алгоритмы распределения сообщений:

Диспетчеры являются частью ядра Akka и отвечают за выделение актору процессорного времени на обработку следующего сообщения.

Явное назначение диспетчера актору:

my-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 16
  }
  # определяет количество сообщений
  # которое актор должен обработать
  # перед тем как поток перейдет другому актору
  throughput = 1
}
ActorRef myActor = system.actorOf(Props.create(MyActor.class).withDispatcher("my-dispatcher"), "my-actor");

По-умолчанию акторам назначается диспетчер akka.actor.default-dispatcher.

Существует несколько видов диспетчеров:

  • Dispatcher - событийный диспетчер, использующий пул потоков для множества акторов
  • PinnedDispatcher - назначает каждому актору отдельный поток (однопоточный пул)
  • CallingThreadDispatcher - запускает обработку сообщения в текущем потоке, используется для тестирования

Изменение поведения Актора

Akka позволяет изменять поведение актора в ходе выполнения:

class MyActor extends AbstractActor {

  @Override
  public Receive createReceive() {
    return initialBehavior();
  }

  Receive initialBehavior() {
    return receiveBuilder()
        .matchEquals("init", ignore -> context().become(mainBehavior().onMessage()))
        .build();
  }

  Receive mainBehavior() {
    return receiveBuilder()
        .matchAny(msg -> {
            // do something
        }).build();
  }
}

При перезапуске актор возвращается к своему начальному поведению.

Сообщения, которые не подходят для текущего поведения актора, могут быть спрятаны для последующей обработки.

Размер очереди спрятанных сообщений задается параметром akka.actor.default-mailbox.stash-capacity.

Получение ответа от Актора

Для получения от актора ответа используется паттерн ask. При его использовании (в контексте java) ответом будет объект класса java.util.concurrent.CompletionStage. CompletionStage завершится либо после получения от актора ответа, либо в случае возникновения ошибки.

Для передачи результата CompletionStage актору используется паттерн pipe.

Пример:

akka.pattern.Patterns
    .pipe(akka.pattern.Patterns.ask(otherActor, "How are you?", Duration.ofSeconds(5L)), system.dispatcher())
    .to(self());