1 min readOct 13, 2019
Yes, flows using divertTo
can be tested using Akka Streams TestKit.
The following code contains a test suite for the divertLeft
operator introduced in the blog post:
"divertLeft" should "pass element downstream if right" in {
// given
val testErrorSink = TestSink.probe[Either[String, String]]
val element = Right("ok")
// when
val stream =
Source(List.fill(2)(element))
.mapMaterializedValue(_ ⇒ TestSubscriber.probe[Either[String, String]])
.divertLeft(testErrorSink)
// then
stream
.runWith(TestSink.probe[String])
.request(2)
.expectNext("ok")
.expectNext("ok")
.expectComplete()
}
"divertLeft" should "divert element if left" in {
// given
val passedElements: ListBuffer[String] = ListBuffer()
val divertedElements: ListBuffer[Either[String, String]] = ListBuffer()
val successfulElement = Right("Ok")
val failedElement = Left("Failed")
val errorSink = Sink
.foreach[Either[String, String]] { elm ⇒
divertedElements += elm
}
.mapMaterializedValue(_ ⇒ NotUsed)
// when
val sourceUnderTest =
Source(List(failedElement, successfulElement, failedElement))
.divertLeft(errorSink)
.via(Flow[String].wireTap(e ⇒ passedElements += e))
.toMat(Sink.ignore)(Keep.right)
sourceUnderTest.run().futureValue
// then
divertedElements.size shouldBe 2
divertedElements should contain only failedElement
passedElements.size shouldBe 1
passedElements should contain only "Ok"
}
with divertLeft
operator defined in the following way:
object SourceOps {
implicit class SourceEitherOps[L, R, Mat](s: Source[Either[L, R], Mat]) {
def divertLeft(to: Graph[SinkShape[Either[L, R]], Mat]): Source[R, Mat] =
s.via {
Flow[Either[L, R]]
.divertTo(to, _.isLeft)
.collect { case Right(element) ⇒ element }
}
}
}