Simple Streams
Stream fragments are created using smart constructors.
For example, if you process the contents of a file in batch, the initial fragment is often a Stream object from the fs2 io module. Combinators and stream creators in the io module essentially create a Stream[Task,String] object because an IO operation needs to happen within a context, in this case the Task context. When we initially create streams, they are neither runnable nor run automatically.
Creating Simple Streams
The fragment below is not run automatically. It needs to be converted to a runnable stream and then run explicitly. Notice that it produces a Stream, which signals to us that it is really acting as a source.
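A minimal sketch of such a fragment, assuming the fs2 0.9.x API (the series that ships fs2.Task). The object and value names are only illustrative:

```scala
import fs2.Stream

object EmitSketch {
  // A description of a stream that emits the single value 1 once it is run.
  // Constructing it executes nothing.
  val single = Stream.emit(1)

  // Stream.apply is the shorthand form of the same construction.
  val viaApply = Stream(1)
}
```

Because both values are pure, calling toList on either one is enough to see what they would emit.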
We knew to use emit or just apply because we looked at the documentation for the Stream companion object, which holds many smart constructors. By default, no output was created; only a description of a stream was created. The Emit inside the result is an instruction that tells the stream, once it is run, to emit a value. The value is wrapped in a Chunk because Chunks are used internally to manage collections of items more efficiently. You could also construct the stream using a chunk directly, which is of course more verbose.
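The chunk-based construction might look like the following sketch, assuming the fs2 0.9.x constructors Stream.chunk and Chunk.singleton:

```scala
import fs2.{Chunk, Stream}

object ChunkSketch {
  // Wrap the value in a Chunk by hand, then lift the Chunk into a stream.
  val viaChunk = Stream.chunk(Chunk.singleton(1))
}
```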
The following fragment skips the emit method and emits 3 values instead of just one:
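One way to write this, assuming the variadic Stream.apply constructor of fs2 0.9.x:

```scala
import fs2.Stream

object ThreeValuesSketch {
  // Stream.apply accepts any number of values and emits them in order.
  val three = Stream(1, 2, 3)
}
```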
Each stream object created is immutable. You can build up new streams from a base object without changing the underlying base object. So:
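A small sketch of reusing one base stream, assuming fs2 0.9.x. The names base, doubled and shifted are illustrative only:

```scala
import fs2.Stream

object ImmutableSketch {
  // The base fragment is an immutable value.
  val base = Stream(0, 1)

  // Each map call returns a new stream and leaves base untouched.
  val doubled = base.map(_ * 2)
  val shifted = base.map(_ + 10)
}
```

Evaluating base after deriving the other two still yields 0 and 1.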
This shows that you can create stream fragments, which are immutable values, and use them to create other, new streams by linking them together using Scala functional idioms such as map.
This is not true of all stream fragments. For example, if the stream fragment contains a side effect, you could get different results each time the side effect runs. Streams created from "strict" values such as the integers 0 and 1 are really "pure" streams. Streams are pure if they have no side effects. We can make the "pure" concept explicit in stream construction so that, instead of showing "Nothing" for the effect, the type shows "Pure". We also need to convert streams to the "Pure" effect at times to satisfy the Scala compiler:
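A hedged sketch of making the effect explicit, assuming the Stream.pure constructor and the fs2.Pure type of fs2 0.9.x (other ways of converting an existing stream may exist, but are not shown here):

```scala
import fs2.{Pure, Stream}

object PureSketch {
  // Stream.pure fixes the effect type to Pure instead of leaving it as Nothing.
  val pureStream: Stream[Pure, Int] = Stream.pure(0, 1)
}
```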
You can see that instead of Stream[Nothing, Int] we now have Stream[Pure, Int].
In the examples below, we will run these fragments immediately but later we will connect stream fragments together using pipes and factor in effects, which is harder.
Running Streams to Get Results
We need to create a runnable Stream and then run it. For Pure streams, this is pretty straightforward and matches what Scala already provides in its collection/stream objects. Creating a runnable stream when effects are involved is more complex, but it should still be less complex, and give you more control (i.e. make things very explicit), than using the basic building blocks contained in Scala. For example, composing a program using Scala Futures and thread pools to perform the same operations and remain composable may be more difficult than using fs2 Streams. fs2 may be overkill for some simple problems, but may be critical for more complex processing scenarios.
In the example below, the output is a single number. runLog converts a Stream to the F context, in this case a Task, which is then run to obtain the result. runLog indicates that the outputs of the stream should be "logged" to a collection. The default collection for runLog is a standard Scala Vector. Consider the fragment below:
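A minimal sketch, assuming fs2 0.9.x. The type ascription to Stream[Task, Int] is an assumption made here so that the effect type is explicit:

```scala
import fs2.{Stream, Task}

object RunLogSketch {
  // Ascribe the Task effect so that runLog has a concrete F to target.
  val source: Stream[Task, Int] = Stream.emit(1)

  // runLog only describes the work: the result is a Task, not a Vector.
  val logged: Task[Vector[Int]] = source.runLog
}
```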
If we tab complete on .run we see a few different run methods. Since the stream is pure, runLog actually runs the stream, as no other side effect needs to be considered (fs2 selects its own side effect to run a pure stream):
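Running the resulting Task could be sketched as follows, assuming the fs2 0.9.x Task.unsafeRun() method and, as an assumption of this sketch, an explicit Task effect on the stream:

```scala
import fs2.{Stream, Task}

object UnsafeRunSketch {
  // Build the Task that logs the stream output to a Vector.
  val logged: Task[Vector[Int]] = (Stream.emit(1): Stream[Task, Int]).runLog

  // Only this call executes the stream and produces the Vector.
  val result: Vector[Int] = logged.unsafeRun()
}
```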
We now see the stream output, since we took the final effect, a Task, and used the Task API to run the Task, which in this case was .unsafeRun. .runLog automatically created a Vector output in the sense that "log" logged the output to an in-memory structure, a Vector. Since the stream was pure, we could have also just called toVector on the stream fragment; no run command would have been needed.
A pure stream does not have an effect, so there is no Task to output from .run and hence no need to run .unsafeRun.
fs2 also focuses on resource safety. Its internal interpreter (using a free monad) catches exceptions and maps them into various outputs. It does this because in order to guarantee resource safety, fs2 must catch exceptions. You or fs2 may choose to rethrow the exceptions but that's your choice. Exceptions are actually mapped to a Fail "instruction" in the interpreter.
Let's consider a stream fragment where we only use run:
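A sketch of a run-only fragment, assuming fs2 0.9.x. The mutable counter is a hypothetical stand-in for a real side effect:

```scala
import fs2.{Stream, Task}

object RunOnlySketch {
  // Counts how many times the effect has actually executed.
  var effects = 0

  val counted: Stream[Task, Int] =
    Stream.eval(Task.delay { effects += 1; 1 })

  // run keeps the effects but discards every emitted value, so the result is Unit.
  val outcome: Unit = counted.run.unsafeRun()
}
```

The effect ran once, but the emitted 1 is not available anywhere afterwards.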
Unlike with runLog, we would not have any output. We can view runLog as adding an effect of writing to a Scala Vector as a convenience for us.
Internally, when a stream is run through Pure or a Task, fs2 adds a "fold" command to our stream that folds over the stream values and collects them into a data structure. The initial value of the .runLog fold is an empty vector and the operation during the fold is "append to vector."
We could use our own collection. Underneath runLog is runLogFree.run, which calls a fold: runFoldFree(Vector.empty)(_ :+ _). So let's say we want to output our stream output items to a set:
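A hedged sketch of folding into a Set, assuming fs2 0.9.x. It uses runFold, assumed here to be the convenience form of runFoldFree followed by run, rather than calling runFoldFree directly:

```scala
import fs2.{Stream, Task}

object SetFoldSketch {
  val source: Stream[Task, Int] = Stream(1, 2, 2, 3)

  // Start from an empty Set and add each emitted item to it.
  val asSet: Set[Int] = source.runFold(Set.empty[Int])(_ + _).unsafeRun()
}
```

The duplicate 2 collapses because the accumulator is a Set rather than a Vector.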
Note that we still called run to run it, as runFoldFree just says that when the stream is run, a fold should be run on it. There are other ways to accomplish writing to different outputs, but the idea is that we essentially transformed our stream from a stream of Ints to a stream with "item" type "Set" using a fold. The run* methods imply that it is the end of the world and time to "build" a runnable stream (and potentially run it).
We could just use a standard fold in our stream, but then we have to run it and collect the output using runLog:
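A sketch of the plain-fold variant, assuming fs2 0.9.x:

```scala
import fs2.{Stream, Task}

object PlainFoldSketch {
  val source: Stream[Task, Int] = Stream(1, 2, 3)

  // fold stays inside the stream: it yields a stream that emits one Set.
  val folded: Stream[Task, Set[Int]] = source.fold(Set.empty[Int])(_ + _)

  // runLog then collects that single element into a Vector.
  val result: Vector[Set[Int]] = folded.runLog.unsafeRun()
}
```

The result is a Vector holding one Set, not the Set itself.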
We needed runLog here because, unlike runFoldFree, which knew to do the run as well, a plain fold does not.
Simple Stream Composition
Now we can chain together several individual objects that each represent a "step" and have them work in sequence. We need to append (or add) individual emits together. In fs2, this can be done with the ++ operator:
which is the same as
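A minimal sketch of appending with ++, assuming fs2 0.9.x. The equivalence shown, appending two streams versus emitting all the values from one constructor call, is chosen here for illustration and holds for these pure streams:

```scala
import fs2.Stream

object AppendSketch {
  // Two fragments appended: the left side's output comes first.
  val appended = Stream(1, 2) ++ Stream(3, 4)

  // The same values emitted from a single constructor call.
  val direct = Stream(1, 2, 3, 4)
}
```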
There are also some smart constructors for creating ranges.
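For instance, assuming the Stream.range constructor of fs2 0.9.x (start inclusive, end exclusive), a range and a tuple-valued stream derived from it could be sketched like this:

```scala
import fs2.Stream

object RangeSketch {
  // Emits 0, 1, 2.
  val numbers = Stream.range(0, 3)

  // Elements can be any type; here each Int is paired with its String form.
  val pairs = Stream.range(0, 3).map(i => (i, i.toString))
}
```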
This also shows that the output can be any object, not just Strings or Ints, but in the last case, a Tuple.
The different ways to run a Task are covered in a later section. There are many .unsafeRun-like methods that you can use to return an Either to manage exceptions as values, or to run the Task asynchronously in the background and attach a callback so that you can process the result when it completes.
Important thoughts on composing streams.
The above compositions concatenated streams together so that the outputs from the first stream came first then the outputs from the second stream came second when the final stream was run. This is one way to "append" streams.
The other more useful way in advanced streams is to use map/flatMap. In this context, these monadic operators are used for sequencing. The key thing to recognize is that the stream that is being mapped/flatMapped runs first then the elements/stream elements from the inner expression come second. It's not the same thing as concatenating streams together but you can think of mapping/flatMapping as "feeding" the inner map/flatMap expression with the Stream results as the stream results are generated, assuming the first Stream emits anything at all.
The Stream below emits a curious set of values but makes a point:
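A sketch of such a stream, assuming fs2 0.9.x. Because the stream is pure, toList is used here in place of runLog:

```scala
import fs2.Stream

object FlatMapSketch {
  // Every upstream value is replaced by the one-element stream Stream(4).
  val fours = Stream(1, 2, 3).flatMap(_ => Stream(4))
}
```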
Each upstream value that is emitted makes the expression in the flatMap run, as if the upstream Stream were the outer loop of a nested for-loop. Then, because this is a flatMap and we output a Stream(4) each time, the downstream Stream becomes a Stream that emits a 4 for each upstream value. runLog sees three 4s.
Since Streams are monads, we use one-element Streams to initialize information "within the stream":
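A minimal sketch of this pattern, assuming fs2 0.9.x and using the Task.delay(10) and Stream(intResource+10) expressions mentioned in the text:

```scala
import fs2.{Stream, Task}

object ResourceSketch {
  // A one-element stream whose value is obtained by running a Task.
  val resource: Stream[Task, Int] = Stream.eval(Task.delay(10))

  // Feed the obtained value into the stream we actually care about.
  val derived: Stream[Task, Int] =
    resource.flatMap(intResource => Stream(intResource + 10))

  val result: Vector[Int] = derived.runLog.unsafeRun()
}
```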
Here we assumed that Task.delay(10) represents a resource that we had to run asynchronously to obtain, then we fed that resource, as a one-element Stream result, into the flatMap to be used to create another Stream that uses that resource. Since the result of the flatMap becomes the "stream", we can consume the resource without having to print or output it by passing it along. It is effectively consumed to create the real stream we cared about, Stream(intResource+10). This resource could also be fed into a downstream stream to be used as an initializer for Stream element generation. In the above example, we actually pass it along after adding 10 to it, so the "log" sees 20.
A Hint of Effectful Streams
All the streams up to now were pure in the sense that the streams did not factor in effects. However, internally, fs2 may convert your pure streams to effectful streams. It does this for some of the functions on Stream that may appear to be pure.
For example, instead of using runLog you could use toVector:
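A sketch of the toVector form on a pure stream, assuming fs2 0.9.x:

```scala
import fs2.Stream

object ToVectorSketch {
  // No explicit run or unsafeRun call is needed for a pure stream.
  val values: Vector[Int] = Stream(1, 2, 3).toVector
}
```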
Internally, the stream is converted to a Stream with a Task effect. The Task effect allows us to choose whether we just want the raw value out of the Task or have it throw an exception if there is an error.
When we use an effect, we use the effect's error handling. An fs2 Task has an unsafeRunSync/unsafeRunAsync method that returns a "Right" value directly or throws an exception if it encounters a Left. So out of fs2 we can get an Either; the Task interprets this Either in unsafeRunSync and either provides the value or throws an exception. It looks something like this:
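The interpretation step can be sketched with the standard library alone. getOrThrow below is a hypothetical helper written for illustration; it is not part of fs2 and is not a copy of how Task is implemented:

```scala
object EitherSketch {
  // Hypothetical helper (not an fs2 API): return the Right value or rethrow the Left.
  def getOrThrow[A](result: Either[Throwable, A]): A = result match {
    case Right(value) => value
    case Left(error)  => throw error
  }

  // A successful result carrying a Vector, as runLog would produce.
  val ok: Vector[Int] = getOrThrow(Right(Vector(1, 2, 3)))
}
```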
In this case the Right was a Vector.
Your choice of effect decides which function needs to be called to run a runnable stream. fs2 provides the Task effect, which is fairly flexible and useful not only in fs2 but also standalone. You could use your own effect, for example an Option, List or Either, although for asynchronous work that may not make a lot of sense.