Functional IO with FS2 Streams

Functional IO with FS2 Streams

One of the main principles of functional programming is to avoid side-effects. For the most part, working with immutable instances would be sufficient to satisfy that principle. But sometimes it’s needed to do some side effects e.g. when we want to read from file. One straight-forward way of doing it is to get a BufferedSource, and process it using an iterator as shown below.

def readFromFile(fileName: String) {
	val src: BufferedSource = io.Source.fromFile(fileName)
	val it: Iterator[String] = src.getLines
	val _ = while(it.hasNext) {
		val line = it.next()
		println(line)
	}
	src.close()
}

Then it is not too difficult to recursively convert the iterator to a Scala Stream of lines, without processing the whole iterator, using lazy evaluation. So we would end up with a function with signature:

   def read(pathToFile: Path): Stream[String]

There are few problems that can arise from this function:

  • First, if an exception occurs and we still haven’t reached the end of the resulting Stream, or if we forget to close the BufferedSource, then the file would stay open, which is called resource leaking. Therefore, the above function is not resource-safe.- Second and more tricky, when we call the function read for a 2nd time, it may return a different result as someone may have modified the file between the two calls. We are forced to have some intrinsic knowledge about the file, therefore, the function read breaks referential transparency.

In this blog-post we are going to tackle those two problems and learn how to avoid imperative troublesome IO using a specialized Scala library - FS2 (previously known as Scalaz-Stream).

The Idea of Functional Streams

The idea for solving the problem of resource-safety is to define a wrapper data type - Process[F[],O], where the left generic type F would know how to close the resource (the file) when it’s needed. In that way we can guarantee resource-safety without having to do anything non-related to reading from file (such as handling an exception). We can think of Process[F[],O] as a type that represents a Stream builder, possibly producing/using some side effect. For example when reading from file we would work with Process[Task, String], where Task knows how to execute and handle an operation (by assigning it to a thread). But note that it doesn’t start the execution immediately. It only starts when we call some of its executor-methods. Another example is when writing to output stream, in which case we would use Process[Task, Unit]. Hence, we can say Process[F[_],O] is like a function F[O] ⇒ Stream[O], but more powerful, and we will see exactly how is it more powerful.

The idea for solving the second problem is to separate the logic of computation from actual computation. Note that the logic of how reading from file is done doesn’t change through the course of our program’s lifecycle. Only the result of executing that logic changes (since it involves a side-effect). So we are going to say that the left type - F[O] in Process knows how to evaluate effects. In that way we can represent our logic of computation, and delegate the responsibilities of executing that logic (and handling possible exceptions of the execution) to F. Most often we would substitute the generic type F with Task (which in addition allows concurrent composition of streams).

Right now it may seem too much is going on. But as with every other paradigm, once we get a hands-on experience, we are going to feel comfortable working with it. The important conceptual thing to remember is that the left type F is responsible for closing resources and assigning operations to threads. In what follows, we are going to see examples of functional streams using the library FS2, a library that implements the above ideas.

How to Use FS2

This is how we use FS2 for reading a file…​

First we must add its dependencies:

"co.fs2" %% "fs2-core" % "0.9.1",
"co.fs2" %% "fs2-io" % "0.9.1",

Then, similar to Scala’s Future, we must provide a strategy for execution:

implicit val strategy = Strategy.fromFixedDaemonPool(4)

Then we define the pipeline of transformations:

val lines: Stream[Task, String] =
	io.file.readAllAsync[Task](pathToFile, 4096)
						.through(text.utf8Decode)
 						.through(text.lines)

val lines: Stream[Task, String] = io.file.readAllAsync[Task](pathToFile, 4096) - we get a Stream[Task, Byte].through(text.utf8Decode) - we get a Stream[Task, String], but newline is disregarded.through(text.lines) - we get the final Stream[Task, String] representing lines.

Stream[F[], O] is the FS2 implementation of what we denoted by Process[F[],O].

Note that the above code doesn’t read anything. We postpone it as much as possible, usually until the end of the world (our main method). At that point we must do two things:

  • First, we compile the pipeline of transformations, combining all the intermediate Tasks into a single Task. We do that by calling lines.run, which gives us a Task[Unit].- Second, we execute the pipeline by calling lines.run.unsafeRun(), so we end up with a single result-value, in this case Unit.

In addition, Stream[F[], O] is also a Monad, so we can do almost everything that we can do with a Seq. That is one argument why the FS2 is much more powerful than a builder-function F[] ⇒ O.

Pulling

Another very useful and powerful functionality that FS2 supports is pulling. Sometimes we don’t want to map over all the elements of an FS2Stream , but halt the process of mapping and end up with a smaller FS2Stream. We can implement that with the method pull defined on FS2Stream:

.pull[Task, Path](using: (Handle[Task, Path]) => Pull[Task, Path, Nothing]))

Note that we wrote FS2Stream, so to avoid mixing it with the standard Scala Stream.

Now comes the tricky part - how to define the auxiliary function using. The simplest way to explain that function is to say Handle knows how to retrieve the next element in a FS2Stream, and Pull knows how to pick up elements that we want to select and output as a side-effect of the pulling.

For example, imagine we want to output elements produced in the process of iterating, but produce element only in some steps of the iteration. Then we can do the following:

def using(): (Handle[Task, A], accumulator: FS2Stream[Task, B]) => Pull[Task, A, Nothing] = {
	newHandle: Handle[Task, A] =>
    	val nextPull: Pull[Task, A, Handle[Task, A]] =
          for {
              (nextElement: A, newHandle: Handle[Task, A]) <- newHandle.await1   			  updatedAcc = ...
              //update the accumulator and pass back the updated one
              _ <- someCheckingOfA match {
                      case ... => Pull.pure(())
                      //nothing to pick up
                      case ... =>  Pull.output1(something)
                      //pick up something
              }
          } yield (nextHandle, updatedAcc)
        nextPull.flatMap((nextHandle: Handle[Task, A]) => using()(nextHandle))
}

, and we would apply using() as follows:

someFS2Stream.pull[Task, A]((handle: Handle[Task, A]) => using()(handle, emptyAccumulator))

Conclusion

FS2 is a masterpiece library that allows us to work with side effects in a resource-safe, consistent, and memory-efficient way. It does all that in a fully functional and composable way, and on top of all that, it supports concurrency. In another blog-post I am going to explain how we can do concurrent computations with FS2.