
Starting and stopping an application properly is an often overlooked challenge. Tackling domain complexity is the obvious priority, and such mundane concerns are easily postponed. Yet, in today’s highly available distributed systems, robust and failsafe application initialization and termination are essential to maintaining service quality and data integrity during updates.

Here are some of the aspects involved in getting a service off the ground, and safely back down:

  • initialize and terminate internal components in a specific order
  • flush buffers to persistent storage to avoid loss of data
  • close database connections and, more generally, release resources to avoid timeouts and starvation
  • stop accepting new requests while finalizing in-flight responses
  • coordinate cluster membership and node hand-off

This article describes a set of simple abstractions that we use to start and shut down our services. This accelerates our development, ensures reliable continuous delivery operations, and makes our codebase more uniform.

We also describe how to take advantage of these abstractions with Akka to define application startup, while delegating shutdown to Akka’s built-in CoordinatedShutdown mechanism.

Stopping

Here’s how we define a Stoppable process:
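The listing below is a minimal sketch consistent with the descriptions in this section (the promise-based implementation is the one mentioned further down):

    import scala.concurrent.{Future, Promise}

    trait Stoppable[T] {
      private val stopTrigger = Promise[Unit]()
      private val stopResult  = Promise[T]()

      // Completed when a stop has been requested via triggerStop().
      def stopTriggered: Future[Unit] = stopTrigger.future

      // Completed once tear-down is finished, with a value of type T.
      def stopped: Future[T] = stopResult.future

      // Requests a graceful stop (idempotent: subsequent calls are no-ops).
      def triggerStop(): Unit = stopTrigger.trySuccess(())

      // Called by the implementing component once tear-down completes.
      protected def notifyStopped(result: T): Unit = stopResult.trySuccess(result)
    }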

This trait bears a type parameter T which represents the type of the value returned upon completion of the stop process. The typical candidate for this parameter is Either[E, Unit], where E is some error type describing a failure during graceful shutdown.

Reacting to stopped

Observers interested in termination subscribe to the future returned by stopped. Triggering a stop is done via a call to triggerStop(). Here as well, the trigger is handled by subscribing to a future, the one returned by stopTriggered. The subscriber, typically the very component extending Stoppable, is expected to carry out the necessary tear-down actions and call notifyStopped upon completion (which in turn signals anyone listening to stopped, as described above). Note that we are using simple promises to implement these notification contracts, but other mechanisms such as Monix’s Subject would work just as well.


Example of stoppable flow (dashed arrows represent Future callbacks).

This simple set of definitions already allows describing the asynchronous interactions between a process triggering the stop of another and the observers of its termination. Compared to a simpler design where a single future is used to terminate a process, this reactive solution decouples the concern of triggering the stop from that of keeping track of the shutdown. The stop can also be triggered from several points in the program flow without leading to competing shutdown sequences.

Starting

To start processes, we introduce an explicit asynchronous method, unsurprisingly named start(), which returns a Future[Either[E, Unit]]. Unlike stop, start is a direct call: the caller gets a Future of the result of the start operation. This simpler approach fits the common use case: processes typically do not start on their own but in response to some controllable stimulus (e.g. running the application). The call is also usually located in a single place in the code (the object extending App, for instance).

Start and stop define a Service
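A minimal sketch of this trait, using a concrete ServiceError in place of the error type E for simplicity (ServiceError and the name member are assumptions, modeled on the builder examples below):

    import scala.concurrent.Future

    trait Service extends Stoppable[Either[ServiceError, Unit]] {
      // Human-readable service name, used for logging and by the builder.
      def name: String

      // Asynchronous initialization, completing with an error or with unit.
      def start(): Future[Either[ServiceError, Unit]]
    }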

This trait extends Stoppable with the notion of an asynchronous start. We could also have defined start() in some separate Startable trait, but the need hasn’t arisen so far: components featuring asynchronous initialization typically also require the corresponding asynchronous tear-down. Nonetheless, we still support an easy way to create start-only services via our builder, as explained below.

Service builder

The builder covers several flavors of services, sketched right after this list:

  • a “startable”, for instance an in-memory cache which takes time to load
  • a service with error-free initialization and shut-down
  • a service requiring a specific Kafka topic and for which initialization can fail
  • a service with only stop logic, described using the previously mentioned Stoppable abstraction
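Only withName, withStoppable and build appear verbatim in the original stop-only example; the other combinators, as well as the cache, init, cleanUp and ensureTopic references, are hypothetical illustrations:

    // An in-memory cache which takes time to load (start-only):
    val cacheService =
      Service.withName("cache").withStart(() => cache.loadAll()).build

    // Error-free initialization and shut-down:
    val simpleService =
      Service.withName("simple").withStart(() => init()).withStop(() => cleanUp()).build

    // Initialization that can fail, e.g. creating a required Kafka topic:
    val topicService =
      Service.withName("topic").withFailableStart(() => ensureTopic("events")).build

    // Stop-only, built from an existing Stoppable:
    val stopOnlyService =
      Service.withName("stop-only").withStoppable(theStoppable).build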

Composite Service

When composing services together, we define the composed start as the ordered sequence of initialization steps of each service. By extension, the composed termination is the sequence of corresponding stop operations, applied in the reverse order. With these definitions, the composition itself can be considered a service, with start and stop implementing sequencing for “child” services.
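As a hypothetical illustration (the CompositeService constructor shape is an assumption):

    // Children start in declaration order (db, then http, then kafka)
    // and stop in reverse order (kafka, then http, then db).
    val app: Service = CompositeService("app", List(dbService, httpService, kafkaService))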

Although this generic composite behavior seems trivial at first sight, there are some interesting challenges:

  • services can stop at any point in time, in which case we want to stop the whole composition in turn
  • services can stop while the composition is still initializing, in which case we want to abort and stop only the services that were already started
  • similarly, one of the services may fail to start, in which case we also need to abort and stop already started services
  • triggering a stop of the composite while it is starting should abort the ongoing sequence and stop already started services
  • we should have a way to define timeouts to make sure stop methods do not exceed some maximal acceptable duration, to avoid freezing processes

The orchestration required to satisfy these constraints involves time and events, so we use the functional reactive operators available in Monix to express it compactly. Note that only the internals of the composite service require this machinery; the outer API of the composite is (by definition) the same as that of any other service and is thus expressed with Future. Akka Streams would also be fine for this job, but Monix has an edge when it comes to testing, as we’ll illustrate further below.

Composite start

The composite start is expressed in terms of the following ingredients:

  • startTriggered: Task[Unit]: start signal, represented with a Monix Task (tasks are, in essence, lazy Futures)
  • stopTriggered: Task[Unit]: stop signal
  • triggerStop: () => Unit: trigger for stop signal
  • services: List[Service]: list of services
  • def startService(service: Service): Task[Service]: wraps a call to the start method of the service within a Task, failing the task with ServiceStartFailed in case of error
  • case class StartResults(startedServices: List[Service], failure: Option[ServiceStartError]): data structure to capture the results of the composite start operation
  • def registerForServiceStopOrError(service: Service, triggerStop: () => Unit) = service.stopped.foreach(_ => triggerStop()): calls the triggerStop function mentioned above when a child service stops

Here’s the expression of the start sequence:
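The flow below is reconstructed from the operator-by-operator review that follows; lastOptionL and the empty-stream fallback are assumptions needed to materialize the final StartResults:

    import cats.syntax.either._
    import monix.eval.Task
    import monix.reactive.Observable

    def start: Task[StartResults] =
      Observable
        .fromTask(startTriggered)
        .ambWith(Observable.fromTask(stopTriggered))     // short-circuit if stop comes first
        .flatMap(_ => Observable.fromIterable(services)) // back-pressured stream of services
        .takeUntil(Observable.fromTask(stopTriggered))   // early stop while starting
        .mapEval(startService)                           // start services one at a time
        .map(_.asRight[ServiceStartError])
        .onErrorHandleWith { case failure: ServiceStartFailed =>
          Observable(failure.error.asLeft[Service])
        }
        .takeWhileInclusive(_.isRight)                   // abort after the first failure
        .scan(StartResults(Nil, None)) { (results, outcome) =>
          outcome match {
            case Right(service) =>
              registerForServiceStopOrError(service, triggerStop)
              results.copy(startedServices = results.startedServices :+ service)
            case Left(error) =>
              results.copy(failure = Some(error))
          }
        }
        .lastOptionL
        .map(_.getOrElse(StartResults(Nil, None)))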

Let’s review each of the operators involved (for a visual description, refer to the marble diagrams further below):

  • Observable.fromTask: convert a task to an Observable. Observable is an abstraction for a back-pressured stream
  • .ambWith(Observable.fromTask(stopTriggered)): amb picks the first stream which emits a value and “sticks” to it. We use it here to short-circuit the start if a stop is triggered before the start sequence even begins
  • Observable.fromIterable(services): iterate the collection of services into a stream, with built-in back-pressure, i.e. the next value is only delivered with the agreement of the downstream operator. Combined with mapEval (see below), this implements a sequential service start
  • takeUntil(Observable.fromTask(stopTriggered)): takeUntil stops the stream whenever a value is emitted on the other parameter stream. We use it here to implement early stop
  • mapEval(startService): call the startService method described above
  • .map(_.asRight).onErrorHandleWith { case failure: ServiceStartFailed => Observable(failure.error.asLeft)}: capture success or failure into an either value
  • .takeWhileInclusive(_.isRight): maintain the stream while initializations are successful – in case of failure, abort and include the failure as last value
  • scan: for each value in the stream, emit a new StartResults consolidated element. We also make a side-effecting call to registerForServiceStopOrError when the service starts successfully to enable observation of the child service stopped signal


Composite start marbles for various scenarios.

Composite stop

The composite stop relies on two inputs:

  • stopTriggered: Task[Unit]: stop signal
  • startResults: Task[StartResults]: results of the start operation

Stopping is about triggering the stop of each started service in reverse order, sequentially.

Stopping a service is implemented with:
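In sketch form (the 30-second timeout and the ServiceError type are assumptions):

    import cats.syntax.either._
    import monix.eval.Task
    import scala.concurrent.duration._

    def stopService(service: Service): Task[Either[ServiceError, Unit]] =
      Task
        .deferFuture {
          // Nothing happens until the task is run, thanks to deferFuture.
          service.triggerStop()
          service.stopped
        }
        .timeoutTo(
          30.seconds, // maximal acceptable stop duration
          Task.now(ServiceError(s"${service.name} stop timed out").asLeft[Unit])
        )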

Task.deferFuture turns the passed future into a lazy one, so that we trigger service stop only upon running the task.

We rely on the stopped signal of the service to continue on to the next, with the addition of a timeout mechanism. This prevents one service from holding up the stop of others indefinitely and limits the scope of any possible resource leak.

This stopService function is used in the following flow:
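Reconstructed from the line-by-line review below:

    def stop: Task[List[Either[ServiceError, Unit]]] =
      stopTriggered
        .flatMap(_ => startResults) // wait for the (possibly aborted) start to finish
        .flatMap { results =>
          Observable
            .fromIterable(results.startedServices.reverse) // reverse start order
            .mapEval(stopService)                          // sequential stops
            .foldLeftL(List.empty[Either[ServiceError, Unit]])(_ :+ _)
        }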

Let’s review it line-by-line:

  • stopTriggered.flatMap(_ => startResults): the stop signal is chained with the asynchronous result of start, so the shutdown is delayed until initialization has completed. This isn’t an issue, however, since the start is short-circuited in case of an early stop. The idea is that even in case of abort, stopping only happens after the ongoing start step terminates, so that the system state is always fully defined.
  • Observable.fromIterable(results.startedServices.reverse).mapEval(stopService): initiates the stream of sequential stop operations in reverse order for services that were successfully started (and only these)
  • foldLeftL(...): results of stop operations are aggregated into a list so that we can have informative logging feedback about the shutdown


Marble diagram for the successful stop scenario.

Testing

The scheduler abstraction allows taking full control of execution and time during testing, which becomes invaluable with complex streams.
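For illustration, here is how Monix’s TestScheduler can drive virtual time deterministically (the task under test is a placeholder):

    import monix.execution.schedulers.TestScheduler
    import scala.concurrent.duration._

    implicit val scheduler: TestScheduler = TestScheduler()

    // Run the flow under test; nothing executes until the clock is advanced.
    val result = compositeService.stop.runToFuture

    // Jump past the stop timeout without actually waiting.
    scheduler.tick(31.seconds)

    assert(result.value.isDefined) // completed, timeouts included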

Managing a distributed Akka application lifecycle

Even with the help of CoordinatedShutdown, bringing a distributed HTTP application down safely requires some careful orchestration, as not even one HTTP request is supposed to be lost when an instance goes down. To achieve this lofty goal, we’ll first need some integration with the container orchestrator.

Liveness and readiness probes

Container orchestrators such as Kubernetes rely on two probes:

  • liveness: indicates whether the application should be left running. The orchestrator will restart the container if the probe fails, to avoid nodes hanging forever in a limbo state;
  • readiness: indicates that the application is ready to receive traffic.

When operating a cluster with Akka management extensions, we can take advantage of the built-in support for health checks. In particular, two endpoints are exposed out of the box by the Akka management server:

  • /alive: meant to be used as a liveness probe (the default implementation simply returns ‘OK’);
  • /ready: meant to be used as a readiness probe. When using the Akka management cluster HTTP module, the result depends on cluster membership matching a configurable value (typically, that the node has joined the cluster).

Akka also lets us define custom health checks to be added to (or to replace) the built-in ones (when enabling more than one check, the probe only succeeds if all checks are green).

Not one HTTP request lost!

Here is the shutdown sequence we follow:

  1. We start by stopping message consumption from Kafka. The local consumer is generally part of a consumer group, leaving it up to the other consumers in the group to continue handling incoming data
  2. The readiness probe is disabled so that Kube eventually stops directing further traffic to the instance
  3. We allow ongoing requests to complete and give Kube time to find out that the probe is disabled. This phase supports a timeout as we can’t afford to wait indefinitely either
  4. After this request “draining” period, it is time to leave the cluster, which is entirely driven by CoordinatedShutdown. Any cluster singleton instance running on the node is respawned elsewhere. Incoming messages to actors residing on the leaving node are buffered during hand-off so that they can be recovered on another node without losing a single message (this obviously requires persistent actors; the state itself isn’t transferred)
  5. After leaving the cluster, we can take care of closing any remaining resources before the application exits

Here’s a sequence diagram of this orchestration (the terminology is explained further down):


Sequence diagram for Akka Cluster HTTP application shutdown

Akka’s Coordinated Shutdown
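A sketch of such helper methods, wrapping CoordinatedShutdown.addTask (the helper names are assumptions; the phase constants are Akka’s own):

    import akka.Done
    import akka.actor.{ActorSystem, CoordinatedShutdown}
    import scala.concurrent.Future

    def onServiceUnbind(name: String)(task: () => Future[Done])(implicit system: ActorSystem): Unit =
      CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseServiceUnbind, name)(task)

    def onServiceRequestsDone(name: String)(task: () => Future[Done])(implicit system: ActorSystem): Unit =
      CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseServiceRequestsDone, name)(task)

    def beforeClusterShutdown(name: String)(task: () => Future[Done])(implicit system: ActorSystem): Unit =
      CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseBeforeClusterShutdown, name)(task)

    def beforeActorSystemTerminate(name: String)(task: () => Future[Done])(implicit system: ActorSystem): Unit =
      CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, name)(task)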

The methods above expose only the phases which are meant to accommodate user-defined tasks.

Custom readiness probe
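A sketch of a switchable readiness check, following Akka Management’s health-check contract (a class taking an ActorSystem and extending () => Future[Boolean], registered under akka.management.health-checks.readiness-checks in configuration); the shared flag is an assumption:

    import java.util.concurrent.atomic.AtomicBoolean
    import akka.actor.ActorSystem
    import scala.concurrent.Future

    object Readiness {
      // Flipped to false at the beginning of the shutdown sequence.
      val ready = new AtomicBoolean(true)
    }

    class SwitchableReadinessCheck(system: ActorSystem) extends (() => Future[Boolean]) {
      override def apply(): Future[Boolean] = Future.successful(Readiness.ready.get())
    }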

Along with this probe, we also register some custom tasks to CoordinatedShutdown to orchestrate request drain. Parameters to this logic are:

  • binding: the Akka-HTTP binding
  • pendingRequestsHardDeadline: hard deadline after which open connections are force-closed
  • unbindingDelay: duration before rejection of incoming connections (the time allowed for traffic redirection)

Here’s the code, in the form of an extension method:
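A sketch consistent with the sequence described above (the phase choices and the method name are assumptions):

    import java.util.concurrent.atomic.AtomicBoolean
    import akka.Done
    import akka.actor.{ActorSystem, CoordinatedShutdown}
    import akka.http.scaladsl.Http
    import akka.pattern.after
    import scala.concurrent.duration.FiniteDuration

    implicit class DrainableBinding(binding: Http.ServerBinding) {
      def drainOnShutdown(
          readiness: AtomicBoolean,
          unbindingDelay: FiniteDuration,
          pendingRequestsHardDeadline: FiniteDuration
      )(implicit system: ActorSystem): Unit = {
        import system.dispatcher
        val shutdown = CoordinatedShutdown(system)

        // 1. Fail the readiness probe, give the orchestrator time to redirect
        //    traffic, then reject new incoming connections.
        shutdown.addTask(CoordinatedShutdown.PhaseServiceUnbind, "http-unbind") { () =>
          readiness.set(false)
          after(unbindingDelay, system.scheduler)(binding.unbind())
        }

        // 2. Let in-flight requests complete, force-closing connections
        //    still open after the hard deadline.
        shutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone, "http-drain") { () =>
          binding.terminate(hardDeadline = pendingRequestsHardDeadline).map(_ => Done)
        }
      }
    }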

Application Builder DSL

Bootstrapping a typical clustered application involves the following phases:

  1. Start services dependent on the actor system but which do not require cluster membership
  2. Bind the HTTP routes so that we are ready to receive traffic whenever the Akka Cluster readiness probe indicates we have joined the cluster
  3. Join the Akka Cluster
  4. Start services depending on cluster membership

We can implement each phase as a service and sequence them using CompositeService. To capture these common patterns and make it easier to bootstrap new Akka applications, we have created a small higher-level DSL which takes care of scaffolding the application, while also hooking the termination of its components up to coordinated shutdown phases.

Here’s an example composition of such an application:
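A hypothetical rendering of the DSL (all combinator names here are invented to match the service descriptions below):

    val app =
      AkkaClusterApp("my-app")
        .withServicesBeforeClusterJoin(system => List(SomeEtcdService(system)))
        .withHttp(httpServiceDefinition)
        .withServicesAfterClusterJoin(system =>
          List(SomeEntityService(system), SomeKafkaConsumerService(system))
        )
        .build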

  • SomeEtcdService: this imaginary service doesn’t require cluster membership to start pulling data from ETCD using Akka Streams, so we initialize it first. Its stop operation is hooked up with beforeActorSystemTerminate.
  • HttpServiceDefinition: this type captures the relevant information for exposing HTTP endpoints.
  • SomeEntityService: a service making use of sharded persistent actors. This is typically best initialized when cluster membership has been confirmed
  • SomeKafkaConsumerService: once the system is ready to send commands to persistent actors, it’s time to start consuming from Kafka sources and committing offsets

If any of these services fail during startup, CoordinatedShutdown is triggered. Since our services have hooked their stops into its various phases upon startup, the shutdown is entirely driven by Akka.

The composite service returned from the builder can even be composed further, which typically allows for describing an initial migration phase before the creation of the actor system. This can be beneficial in a cluster configuration since Akka already takes steps towards joining the cluster upon instantiation. Note however that by default CoordinatedShutdown will shut down the JVM, so care must be taken not to rely on outer stoppers (or to override this setting).

Summary

How do you deal with these challenges in your applications? We are curious to find out, so please let us know!
