prox part 2 — akka streams with cats effect
In the previous post, we saw how prox applies advanced type-level programming techniques to describe the execution of external system processes. The input and output of these processes can be connected to streams. The current version of prox uses the fs2 library to describe these streams and cats-effect as its IO abstraction, which lets it separate the specification of a process pipeline from its actual execution.
In this post, we will keep cats-effect but replace fs2 with the stream library of the Akka toolkit, Akka Streams. This will be a hybrid solution, as Akka Streams does not use any kind of IO abstraction, unlike fs2, which is implemented on top of cats-effect. We will experiment with implementing prox purely with the Akka libraries in a future post.
Please note that this post was written based on an older version of prox, using cats-effect and fs2 1.0 and Akka 2.5.
Replacing fs2 with Akka Streams
We start by removing the fs2 dependency and adding Akka Streams to the build.
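For reference, the dependency change might look roughly like this in build.sbt. This is a sketch. The artifact coordinates are the standard ones for cats-effect and Akka Streams, but the exact version numbers are assumptions, chosen to match the cats-effect 1.0 and Akka 2.5 setup this post was written against.

```scala
// build.sbt (sketch): the fs2 artifacts are gone, cats-effect stays, Akka Streams is added.
// The version numbers are assumptions; any cats-effect 1.x and Akka 2.5.x release should fit this post.
libraryDependencies ++= Seq(
  "org.typelevel"     %% "cats-effect" % "1.0.0",
  "com.typesafe.akka" %% "akka-stream" % "2.5.19"
)
```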
Then we have to change all the fs2 types used in the codebase to the matching Akka Streams types. The following list describes these pairs:
- Stream[IO, O] => Source[O, Any]
- Pipe[IO, I, O] => Flow[I, O, Any]
- Sink[IO, O] => Sink[O, Future[Done]]
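To see the three correspondences side by side, here is a small, self-contained sketch. It assumes akka-stream 2.5.x on the classpath. TypeMappingSketch and its values are hypothetical names, not part of prox. Each value is annotated with the Akka type from the list, and nothing executes until run() materializes the graph.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object TypeMappingSketch {
  // Stream[IO, Int] => Source[Int, Any]: no effect type parameter, but a materialized value type
  val numbers: Source[Int, Any] = Source(1 to 5)

  // Pipe[IO, Int, Int] => Flow[Int, Int, Any]
  val doubled: Flow[Int, Int, Any] = Flow[Int].map(_ * 2)

  // Sink[IO, Int] => Sink[Int, Future[Done]]: running into it yields a Future[Done]
  val printer: Sink[Int, Future[Done]] = Sink.foreach[Int](println)

  // Nothing runs until the graph is materialized, which requires a Materializer
  def run(): Done = {
    implicit val system: ActorSystem = ActorSystem("type-mapping")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try Await.result(numbers.via(doubled).runWith(printer), 5.seconds)
    finally system.terminate()
  }
}
```

The Source and Flow values are plain descriptions. Only runWith, with an implicit Materializer in scope, actually starts the stream.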
Another small difference, one that affects many of our function signatures, is the implicit context that each streaming library requires.
The original implementation needed:
- an implicit ContextShift[IO] instance
- and an explicitly passed blocking execution context of type ExecutionContext
We could treat the blocking execution context as part of prox’s implicit context too, and refactor the library to pass both of them wrapped together in a single context object.
Let’s see what we need for the Akka Streams based implementation!
- An implicit ContextShift[IO] is still needed, because we are still using cats-effect as our IO abstraction.
- The blocking execution context, however, was only passed through to fs2, so we can remove it.
- For Akka Streams, we need an execution context of type ExecutionContext and also a Materializer. Akka Streams uses the materializer to execute stream blueprints. The usual implementation is ActorMaterializer, which does this by spawning actors that implement the stream graph.
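So, for example, the start extension method now takes a ContextShift[IO], a Materializer and an ExecutionContext as implicit parameters, instead of an explicitly passed blocking execution context.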
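The following sketch shows one way to assemble that implicit context, assuming akka-stream 2.5.x and cats-effect 1.x. startSketch is a hypothetical stand-in for start, not prox’s actual method. It only demonstrates the new parameter list, and it uses IO.fromFuture to turn the stream’s Future into an IO.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext

object ImplicitContextSketch {
  // Hypothetical stand-in for prox's start: no blocking ExecutionContext any more, just
  // ContextShift[IO] for cats-effect plus a Materializer and an ExecutionContext for Akka.
  def startSketch(source: Source[Int, Any])(implicit
      contextShift: ContextShift[IO],
      materializer: Materializer,
      executionContext: ExecutionContext): IO[Int] =
    // IO(...) defers running the stream; Future.map uses the implicit ExecutionContext
    IO.fromFuture(IO(source.runWith(Sink.seq[Int]).map(_.sum)))

  def demo(): Int = {
    // One possible way to provide all the implicits, derived from a single ActorSystem
    implicit val system: ActorSystem = ActorSystem("implicit-context")
    implicit val materializer: Materializer = ActorMaterializer()
    implicit val executionContext: ExecutionContext = system.dispatcher
    implicit val contextShift: ContextShift[IO] = IO.contextShift(executionContext)
    try startSketch(Source(1 to 4)).unsafeRunSync()
    finally system.terminate()
  }
}
```

Deriving the ExecutionContext from the ActorSystem’s dispatcher is just one convenient choice. Any ExecutionContext would satisfy the signature.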
There is one more minor difference that requires changes to the internal type signatures.
In Akka Streams, byte streams are not represented as streams with element type Byte, as in fs2, but as streams of chunks called ByteStrings. So everywhere we used Byte as the element type, such as on the process boundaries, we now simply use ByteString instead. A byte pipe, for example, becomes a Flow[ByteString, ByteString, Any].
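Here is a minimal sketch of the Byte to ByteString change, assuming akka-stream 2.5.x. upperCase is a hypothetical flow, not something from prox. The example shows that the elements are whole chunks of arbitrary size rather than single bytes.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

object ByteStringSketch {
  // Where the fs2 version had Pipe[IO, Byte, Byte], the Akka version has
  // Flow[ByteString, ByteString, Any]. This hypothetical flow upper-cases each chunk.
  // Decoding per chunk is only safe here because the input is ASCII: a multi-byte
  // UTF-8 character could otherwise be split across two chunks.
  val upperCase: Flow[ByteString, ByteString, Any] =
    Flow[ByteString].map(chunk => ByteString(chunk.utf8String.toUpperCase))

  def demo(): (Int, String) = {
    implicit val system: ActorSystem = ActorSystem("bytestring")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    // One logical text delivered as three chunks of arbitrary size
    val chunks = Source(List(ByteString("hel"), ByteString("lo wo"), ByteString("rld")))
    try {
      val result = Await.result(chunks.via(upperCase).runWith(Sink.seq[ByteString]), 5.seconds)
      // The chunk structure survives the flow; concatenating restores the whole text
      (result.size, result.reduce(_ ++ _).utf8String)
    } finally system.terminate()
  }
}
```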
Another thing to notice is that fs2 types have a type parameter for the effect type the stream runs in (IO in our case). As I wrote earlier, Akka Streams does not depend on such abstractions, so this parameter is missing. On the other hand, Akka Streams types have an extra type parameter called Mat, which represents the type of the value the stream will materialize to. At this point we don’t care about it, so we set it to Any.
Let’s take a look at the connect function of the ProcessIO trait, starting with the fs2-based InputStreamingSource.
It wraps a source stream. During the setup of the process graph, once the system process has been created, we have to redirect this source stream to the process. This is separated into a connect step and a run step:
- The connect step creates an fs2 stream that observes the source stream and sends each byte to the system process’s standard input. It only defines this stream and returns it as a pure functional value.
- The run step, on the other hand, has the result type IO[Fiber[IO, Unit]]. It describes the effect of starting a new fiber and running the stream on it.
In the case of fs2, we can be sure that source.observe is pure just by checking its type signature: it returns a new Stream rather than an IO value.
All side-effecting operations in fs2 are suspended in the effect type (IO in our case), so we know that observe is not among them. That’s why connect was a pure, non-IO function in the original implementation. With Akka Streams, the type system carries no such information. Here we use the source.alsoTo function instead. It is actually also pure, because it only creates a blueprint of the graph to be executed, so we can safely use it to implement connect in the Akka Streams version.
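The sketch below mirrors this idea, with a ByteArrayOutputStream standing in for the process’s standard input. It assumes akka-stream 2.5.x, and connect here is a hypothetical function, not prox’s method. There is one deliberate deviation: it uses alsoToMat, which behaves like alsoTo but keeps the side sink’s materialized Future[IOResult]. That is done only so the demo can wait until all the bytes are written.

```scala
import java.io.{ByteArrayOutputStream, OutputStream}
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{Keep, Sink, Source, StreamConverters}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object AlsoToSketch {
  // Hypothetical connect: mirror every chunk into an OutputStream (a process's stdin in prox)
  // while passing the chunks downstream unchanged. alsoToMat is alsoTo plus access to the
  // side sink's materialized value, used here only so the demo can await the writes.
  // Building the returned Source is pure: nothing is written until it is run.
  def connect(source: Source[ByteString, Any], stdin: () => OutputStream): Source[ByteString, Future[IOResult]] =
    source.alsoToMat(StreamConverters.fromOutputStream(stdin, autoFlush = true))(Keep.right)

  def demo(): (String, String) = {
    implicit val system: ActorSystem = ActorSystem("also-to")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    val fakeStdin = new ByteArrayOutputStream()
    val blueprint = connect(Source(List(ByteString("a\n"), ByteString("b\n"))), () => fakeStdin)
    val beforeRun = fakeStdin.toString("UTF-8") // still empty: only a blueprint exists
    try {
      val (written, finished) = blueprint.toMat(Sink.ignore)(Keep.both).run()
      Await.result(finished, 5.seconds)
      Await.result(written, 5.seconds) // the side sink has written and closed the stream
      (beforeRun, fakeStdin.toString("UTF-8"))
    } finally system.terminate()
  }
}
```

Checking the output stream before and after running shows that building the Source alone writes nothing.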
The Akka Streams run implementation is a nice example of how to integrate asynchronous operations that are not implemented with cats-effect into an IO-based program. With IO.async, we define how to start the asynchronous operation (in this case, running the Akka stream), and we get a callback function, finish, to call when the asynchronous operation ends. The stream here materializes to a Future[T] value, so we can use its onComplete function to notify the IO system about the finished stream. The IO value returned by IO.async represents the whole asynchronous operation: it returns its final result when the callback is called, and “blocks” the program flow until then. This does not actually block a thread, but the next IO action is executed only after this one has finished (as its type is IO[A]). That is not what we need here, so we use Concurrent[IO].start to put this IO action on a separate fiber. This way, all the streams involved in the process graph execute concurrently.
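Here is a sketch of that run pattern, assuming akka-stream 2.5.x and cats-effect 1.x. runStream and start are hypothetical names. The first wraps a running Akka stream in IO.async, and the second moves it to a fiber with Concurrent[IO].start. The demo starts two streams before joining either of them.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}
import cats.effect.{Concurrent, ContextShift, Fiber, IO}
import scala.concurrent.ExecutionContext

object RunOnFiberSketch {
  // Hypothetical run step: IO.async hands us the `finish` callback; we start the Akka
  // stream inside it and complete the IO from the Future's onComplete.
  def runStream(stream: Source[Int, Any])(implicit
      materializer: Materializer,
      executionContext: ExecutionContext): IO[Done] =
    IO.async { finish =>
      stream.runWith(Sink.ignore).onComplete(result => finish(result.toEither))
    }

  // Concurrent[IO].start moves the whole async IO to its own fiber, so the caller does not
  // wait for the stream to finish; Fiber.join waits for it later.
  def start(stream: Source[Int, Any])(implicit
      contextShift: ContextShift[IO],
      materializer: Materializer,
      executionContext: ExecutionContext): IO[Fiber[IO, Done]] =
    Concurrent[IO].start(runStream(stream))

  def demo(): List[Done] = {
    implicit val system: ActorSystem = ActorSystem("run-on-fiber")
    implicit val materializer: Materializer = ActorMaterializer()
    implicit val executionContext: ExecutionContext = system.dispatcher
    implicit val contextShift: ContextShift[IO] = IO.contextShift(executionContext)
    val program = for {
      first  <- start(Source(1 to 1000))
      second <- start(Source(1 to 1000))
      a      <- first.join
      b      <- second.join
    } yield List(a, b)
    try program.unsafeRunSync()
    finally system.terminate()
  }
}
```

Because both fibers are started before the first join, the two streams run concurrently. Calling runStream directly in the for-comprehension instead of start would run them one after the other.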
Calculating the result
prox supports multiple ways to calculate a result of running a process graph:
- If the target is a Sink, the result type is Unit
- If the pipe’s output is Out and there is a Monoid instance for Out, the stream is folded into an Out value
- Otherwise, if the pipe’s output is Out, the result type will be Vector[Out]
These cases can be enforced by the Drain, ToVector and Fold wrapper classes.
Let’s see how we can implement them with Akka Streams compared to fs2.
With fs2, the sink version was implemented by compiling and draining the stream on a separate fiber:
- .compile returns an interface that can convert the stream into an IO[A] value in multiple ways.
- .drain is one of them. It runs the stream but ignores its elements, so its result type is IO[Unit].
- We want to run this concurrently with the other streams, so we move it to a fiber.
With Akka Streams there is one big difference. In fs2 a sink is represented as a Pipe[F, E, Unit], so we could treat it the same way as other stream segments. In Akka Streams, however, a Sink is not a Flow, so we use a trick to keep the interface as close to the original as possible.
The trick is that we create the OutputStreamingTarget with an identity flow and only use the Sink when we actually run the stream, by passing it to the runWith function. This materializes the stream into a Future[Done] value, which we can tie back to our IO system with IO.async, as described earlier.
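Here is a sketch of the identity-flow trick, assuming akka-stream 2.5.x and cats-effect 1.x. SinkTarget and runToSink are hypothetical simplifications of prox’s OutputStreamingTarget and its run step, not the library’s code.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Sink, Source}
import cats.effect.IO
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}

object SinkTargetSketch {
  // Hypothetical stand-in for a Sink-based OutputStreamingTarget: it exposes an identity
  // Flow, so it composes like any other segment, and keeps the Sink for run time.
  final case class SinkTarget[T](sink: Sink[T, Future[Done]]) {
    val flow: Flow[T, T, Any] = Flow[T] // identity flow
  }

  // The Sink is only used here, when the stream actually runs; its Future[Done] is tied
  // back to IO with IO.async, mapping Done to Unit (the sink case's result type).
  def runToSink[T](stream: Source[T, Any], target: SinkTarget[T])(implicit
      materializer: Materializer,
      executionContext: ExecutionContext): IO[Unit] =
    IO.async { finish =>
      stream.via(target.flow).runWith(target.sink)
        .onComplete(result => finish(result.toEither.map(_ => ())))
    }

  def demo(): List[String] = {
    implicit val system: ActorSystem = ActorSystem("sink-target")
    implicit val materializer: Materializer = ActorMaterializer()
    implicit val executionContext: ExecutionContext = system.dispatcher
    val received = new ConcurrentLinkedQueue[String]()
    val target = SinkTarget(Sink.foreach[String](line => received.add(line)))
    try {
      runToSink(Source(List("x", "y", "z")), target).unsafeRunSync()
      received.asScala.toList
    } finally system.terminate()
  }
}
```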
Combine with Monoid
When the element type has a Monoid instance, we can fold the stream into a single value, which fs2 supports directly.
Akka Streams does not use cats type classes, but it also has a way to fold a stream. We can therefore easily implement this case by using the Monoid instance’s empty value as the starting point and its combine function as the folding step.
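Here is one way to write the fold, assuming akka-stream 2.5.x, cats 1.x and cats-effect 1.x. foldMonoid is a hypothetical name. It uses runFold, Akka’s own fold that materializes into a Future, and bridges the result to IO as before.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import cats.Monoid
import cats.effect.IO
import cats.instances.int._
import cats.instances.string._
import scala.concurrent.ExecutionContext

object MonoidFoldSketch {
  // Hypothetical fold: Akka's runFold knows nothing about cats, so we pass it the
  // Monoid's empty element as the start value and its combine as the step function.
  def foldMonoid[T](stream: Source[T, Any])(implicit
      monoid: Monoid[T],
      materializer: Materializer,
      executionContext: ExecutionContext): IO[T] =
    IO.async { finish =>
      stream.runFold(monoid.empty)(monoid.combine).onComplete(result => finish(result.toEither))
    }

  def demo(): (Int, String) = {
    implicit val system: ActorSystem = ActorSystem("monoid-fold")
    implicit val materializer: Materializer = ActorMaterializer()
    implicit val executionContext: ExecutionContext = system.dispatcher
    val program = for {
      sum  <- foldMonoid(Source(1 to 10))
      text <- foldMonoid(Source(List("pr", "ox")))
    } yield (sum, text)
    try program.unsafeRunSync()
    finally system.terminate()
  }
}
```

Any type with a Monoid instance works, so the same function sums integers and concatenates strings.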
Vector of elements
Finally, let’s look at the version that collects all the stream elements into a vector as the result.
With Akka Streams, we can do this by running the stream into Sink.seq, a sink made for exactly this purpose. It materializes into a Future[Seq[T]] value that holds all the elements of the executed stream, which we can then convert into a Vector.
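Here is a sketch of the vector case under the same assumptions (akka-stream 2.5.x, cats-effect 1.x). toVector is a hypothetical helper.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}
import cats.effect.IO
import scala.concurrent.ExecutionContext

object ToVectorSketch {
  // Hypothetical ToVector case: Sink.seq collects every element and materializes to
  // Future[immutable.Seq[T]], which we convert to a Vector once it completes.
  def toVector[T](stream: Source[T, Any])(implicit
      materializer: Materializer,
      executionContext: ExecutionContext): IO[Vector[T]] =
    IO.async { finish =>
      stream.runWith(Sink.seq[T]).onComplete(result => finish(result.toEither.map(_.toVector)))
    }

  def demo(): Vector[String] = {
    implicit val system: ActorSystem = ActorSystem("to-vector")
    implicit val materializer: Materializer = ActorMaterializer()
    implicit val executionContext: ExecutionContext = system.dispatcher
    try toVector(Source(List("a", "b", "c"))).unsafeRunSync()
    finally system.terminate()
  }
}
```

Sink.seq keeps every element in memory, so this result type only makes sense for finite streams of manageable size.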
At this point, the only remaining task is to update the tests. One of the more complex examples is the customProcessPiping test case, which in the fs2 version takes advantage of some text processing pipe elements that come with the library.
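Akka Streams offers similar tools for expressing this in its Framing module, such as Framing.delimiter, which splits a stream of ByteString chunks into frames on a given delimiter.
The Akka Streams version of the test also uses utf8Decode, a small helper sink.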
First, it concatenates the ByteString chunks, then simply calls .utf8String on the result.
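The sketch below applies these text processing tools to a small in-memory input, assuming akka-stream 2.5.x. The utf8Decode here is a hypothetical reconstruction that follows the description above, except that it folds from ByteString.empty instead of requiring at least one chunk. The lines flow is an illustrative framing flow, not the actual test code.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Framing, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FramingSketch {
  // Hypothetical utf8Decode: concatenate every chunk, then decode the whole text once,
  // so multi-byte characters split across chunks decode correctly.
  // fold (rather than reduce) also emits "" for an empty stream instead of failing.
  val utf8Decode: Sink[ByteString, Future[String]] =
    Flow[ByteString]
      .fold(ByteString.empty)(_ ++ _)
      .map(_.utf8String)
      .toMat(Sink.head[String])(Keep.right)

  // Re-chunk arbitrary ByteStrings into lines; allowTruncation keeps a last line
  // that has no trailing newline. Empty lines are dropped.
  val lines: Flow[ByteString, String, Any] =
    Framing
      .delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true)
      .map(_.utf8String)
      .filter(_.nonEmpty)

  def demo(): (List[String], String) = {
    implicit val system: ActorSystem = ActorSystem("framing")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    // Chunk boundaries deliberately fall in the middle of lines
    val chunks = Source(List(ByteString("first li"), ByteString("ne\nsecond\n\nthi"), ByteString("rd")))
    try {
      val framed = Await.result(chunks.via(lines).runWith(Sink.seq[String]), 5.seconds)
      val whole  = Await.result(chunks.runWith(utf8Decode), 5.seconds)
      (framed.toList, whole)
    } finally system.terminate()
  }
}
```

The same chunks Source is run twice here, which works because an Akka Source is a reusable blueprint rather than a one-shot stream.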
We have seen that it is relatively easy to replace the stream library in prox without changing its interface much if we keep cats-effect for expressing the effectful computations. The complete working example is available on the akka-streams branch.