Akka Streams: error handling in event processing pipelines
A short read about being careful with error handling in stream design.
Akka Streams is a reasonable choice when it comes to event processing. A wide variety of connectors and integrations makes it easy to assemble a working pipeline and get up to speed quickly.
On the other hand, we have a long list of predefined operators, GraphDSL, support for error handling, a possibility of rolling out custom stages and more. It means, that having a rough idea of what do we want to achieve, there is always a number of ways to approach the stream design. It’s quite easy to assemble a working graph. But to make the code scalable, maintainable and easily extendable, it usually needs experimentation and playing around with different approaches. Especially when building a non-trivial processing pipeline.
One hint, that might help to narrow down the choices is starting with types. If you’re not sure, how to design this Flow or that Sink, think about what could be a reasonable type. In other words, think, how to express the role of this stage in types. This usually helps when facing multiple choices, and can guide you to the final design.
One of the most fundamental things when designing a stream is error handling and there are multiple ways to approach it in Akka Streams. Now, having all of this in mind, let’s see how to iteratively tackle this problem in a real-world example.
Example
As an example, we’re using Akka Streams 2.5.21 to model an event processing pipeline. The stream consists of the following stages:
- Consuming an event (from e.g. SQS, Kafka, …).
- Decoding an event.
- Applying validation to exclude events, that do not fulfill some conditions.
- Persisting an event.
As you might notice, each stage might fail. And this is where error handling is important to get right. Otherwise, it might have a negative impact on the overall stream design, what you’ll see next.
Iteration #1
Let’s start from the beginning. Our stream begins with a Source of events:
val source: Source[Event, NotUsed] = ???
In the next step, we’d probably want to decode each event to give it a domain meaning. Let’s use circe for it.
val decodingFlow: Flow[Event, Either[Error, OrderShipped], NotUsed] = Flow.fromFunction(event ⇒ decode[OrderShipped](event.content))
So far so good. Since the types align, our stages are composable:
val decodingSource: Source[Either[Error, OrderShipped], NotUsed] = source.via(decodingFlow)
Let’s now implement the validation flow:
val validatingFlow: Flow[Either[Error, OrderShipped],
Either[Error, OrderShipped], NotUsed] = ???
In order to make this stage composable, it has to accept Either[Error, OrderShipped]
on its input. Let’s pause here and think. Why would it accept Either on its input? What would it actually mean?
Going forward with this idea, every stage would more or less look like the following:
val flow = Flow[Either[Error, OrderShipped]].map {
case Right(event) ⇒ ??? // actual business logic here
case Left(_) ⇒ ??? // pass downstream
}
Now every flow in our stream does 2 things:
- Handles the
Right
case, and executes the business logic. - Handles the
Left
case, and immediately passes it forward to the output.
And it’s bad if all our stages, by a wrong design decision, do two things, instead of one.
Iteration #2 — better types
Let’s improve! In the second iteration, let’s think about what could be the reasonable types. The first iteration taught us, that the flows shouldn’t accept Eithers. It’d be better if they accepted a simple value, and returned the result of its processing. Having this in mind, we could start with the following types:
val source: Source[Event, NotUsed]
val decodingFlow: Flow[Event, Either[ProcessingError, OrderShipped], NotUsed]
val validatingFlow: Flow[OrderShipped, Either[ProcessingError, OrderShipped], NotUsed]
val persistingFlow: Flow[OrderShipped, Either[ProcessingError, OrderShipped], NotUsed]
Each flow is now simpler. It accepts a single element, does the processing, and returns the result. Looks way better than in the first iteration. The ProcessingError
could be defined as follows:
sealed trait ProcessingError
case class DecodingError(...) extends ProcessingError
case class ValidationError(...) extends ProcessingError
case class PersistenceError(...) extends ProcessingError
Unfortunately, now we’re not able to easily wire those stages, as the types do not align. It means, that the following will not compile anymore:
source
.via(decodingFlow) // out: Either[ProcessingError, OrderShipped]
.via(validatingFlow) // in: OrderShipped
.via(persistingFlow)
.to(sink)
What we’d want to achieve now is to immediately plug error handling sink after each stage, if an error occurred. In other words, if the output of the flow is Left, we’d attach error handling sink. Otherwise, we’d push the successful element downstream to the next stage. What we want to achieve can be depicted in the following diagram:
In this way, we can apply custom error handling for each type of error:
DecodingError
can e.g. include the rawEvent
, push it to a Dead Letter Queue, and commit the cursor.ValidationError
can e.g. commit the cursor immediately, as we might not care about invalid events.PersistanceError
can e.g. also include the rawEvent
, push it to a Dead Letter Queue, and commit the cursor. Or it can use a completely different error handling strategy.
One way to wire the stream to implement the above idea is to leverage using flatMapConcat, which has the following signature:
def flatMapConcat[T, M](f: Out ⇒ Graph[SourceShape[T], M]): Repr[T]
in this way:
val graph: RunnableGraph[NotUsed] = source
.via(decodingFlow)
.flatMapConcat {
case l @ Left(_) ⇒ Source.single(l)
case Right(orderShipped) ⇒
Source
.single(orderShipped)
.via(validatingFlow)
.flatMapConcat {
case l @ Left(_) ⇒ Source.single(l)
case Right(orderShipped) ⇒
Source
.single(orderShipped)
.via(persistingFlow)
}
}
.to(sink)val sink: Sink[Either[ProcessingError, OrderShipped], NotUsed] = ??? // handle all errors
This could potentially work. But this approach of wiring the graph is cumbersome. If the graph consisted of more processing stages, the “tree” of branches would grow. It’s a clear indication, that this approach doesn’t scale and somewhat reminds me of:
Let’s now think about how to improve on the graph wiring itself.
Iteration #3 — better wiring
We’re getting closer to a better design. We know, that we want to immediately plug error handling sink after each stage if an error occurred. What we want to achieve can be depicted in the following diagram:
Now we have almost all pieces in place, but ???
.
It turns out, that such stage can be easily created with Partition graph operator. Fortunately, it also turns out, that such stage is generic enough to be already defined in Akka Streams. And it’s called divertTo
.
def divertTo(that: Graph[SinkShape[Out], _], when: Out ⇒ Boolean): Repr[Out]
The documentation says:
Each upstream element will either be diverted to the given sink, or the downstream consumer according to the predicate function applied to the element.
Which is exactly what we want. With divertTo
, the stream could be wired in the following way:
val source: Source[Event, NotUsed] = ???
val decodingFlowDiverted =
decodingFlow.divertTo(errorHandlingSink, _.isLeft).collect { case Right(e) ⇒ e }
val validatingFlowDiverted =
validatingFlow.divertTo(errorHandlingSink, _.isLeft).collect { case Right(e) ⇒ e }
val persistingFlowDiverted =
persistingFlow.divertTo(errorHandlingSink, _.isLeft).collect { case Right(e) ⇒ e }
val graph: RunnableGraph[NotUsed] =
source
.via(decodingFlowDiverted)
.via(validatingFlowDiverted)
.via(persistingFlowDiverted)
.to(sink)
and this time it compiles. As you might notice, there’s a repeating part:
.divertTo(errorHandlingSink, _.isLeft).collect { case Right(e) ⇒ e }
so we can introduce a small improvement here as well:
object FlowOps {
implicit class FlowEitherOps[A, L, R, Mat](flow: Flow[A, Either[L, R], Mat]) {
def divertLeft(to: Graph[SinkShape[Either[L, R]], Mat]): Flow[A, R, Mat] =
flow.via {
Flow[Either[L, R]]
.divertTo(to, _.isLeft)
.collect { case Right(element) ⇒ element }
}
}
}
and the wiring is now simplified even more:
import FlowOps._val source: Source[Event, NotUsed] = ???
val decodingFlowDiverted: Flow[Event, OrderShipped, NotUsed] =
decodingFlow.divertLeft(to = errorHandlingSink)
val validatingFlowDiverted: Flow[OrderShipped, OrderShipped, NotUsed] =
validatingFlow.divertLeft(to = errorHandlingSink)
val persistingFlowDiverted: Flow[OrderShipped, OrderShipped, NotUsed] =
persistingFlow.divertLeft(to = errorHandlingSink)
val graph: RunnableGraph[NotUsed] =
source
.via(decodingFlowDiverted)
.via(validatingFlowDiverted)
.via(persistingFlowDiverted)
.to(sink)
It’s also important to emphasize, that each errorHandlingSink
can be of course different, and hence apply a different error handling strategy.
Summary
Akka Streams gives a solid foundation for building data-intensive pipelines. It comes with a wide variety of building blocks, but sometimes because of this reason, it can be easy to shoot yourself in the foot.
When designing a stream, there’s no single best way to do it. You usually explore different possibilities, try, and fail. As we went through the problem of error handling, we did 3 iterations, but ultimately landed with an acceptable design.
There are three general tips, that can be taken from this lesson:
- Try not to start with Graph DSL. Start by looking at the list of operators. It’s growing from release to release.
- Start with types. It really helps.
- Try different approaches and experiment. There are always multiple ways of designing a non-trivial stream.
Thanks for reading!