Bartłomiej Szwej
1 min readOct 13, 2019

--

Yes, flows using divertTo can be tested using Akka Streams TestKit.

The following code contains a test suite for the divertLeft operator introduced in the blog post:

"divertLeft" should "pass element downstream if right" in {
// given
val testErrorSink = TestSink.probe[Either[String, String]]
val element = Right("ok")

// when
val stream =
Source(List.fill(2)(element))
.mapMaterializedValue(_ ⇒ TestSubscriber.probe[Either[String, String]])
.divertLeft(testErrorSink)

// then
stream
.runWith(TestSink.probe[String])
.request(2)
.expectNext("ok")
.expectNext("ok")
.expectComplete()
}

"divertLeft" should "divert element if left" in {
// given
val passedElements: ListBuffer[String] = ListBuffer()
val divertedElements: ListBuffer[Either[String, String]] = ListBuffer()

val successfulElement = Right("Ok")
val failedElement = Left("Failed")

val errorSink = Sink
.foreach[Either[String, String]] { elm ⇒
divertedElements += elm
}
.mapMaterializedValue(_ ⇒ NotUsed)

// when
val sourceUnderTest =
Source(List(failedElement, successfulElement, failedElement))
.divertLeft(errorSink)
.via(Flow[String].wireTap(e ⇒ passedElements += e))
.toMat(Sink.ignore)(Keep.right)

sourceUnderTest.run().futureValue

// then
divertedElements.size shouldBe 2
divertedElements should contain only failedElement

passedElements.size shouldBe 1
passedElements should contain only "Ok"
}

with divertLeft operator defined in the following way:

object SourceOps {
implicit class SourceEitherOps[L, R, Mat](s: Source[Either[L, R], Mat]) {
def divertLeft(to: Graph[SinkShape[Either[L, R]], Mat]): Source[R, Mat] =
s.via {
Flow[Either[L, R]]
.divertTo(to, _.isLeft)
.collect { case Right(element) ⇒ element }
}
}
}

--

--