fs2 Core Model

fs2 models a sequence of values that may have been created as the result of an effectful action. Internally, a Stream object holds a reference to some values, often bundled into chunks for efficiency.

Running a stream means that a sequence of computations is applied to the stream's values and those values are exposed through a context.

Both the context and the stream element type are customized to your application domain. The fs2 stream model is a "pull" model, which means that downstream functions call "upstream" functions to obtain values. The pull model integrates backpressure control so that downstream and upstream processing rates can be matched. fs2 focuses on resource safety in both synchronous and asynchronous environments. Hence, fs2 is useful in domains where you might otherwise consider Scala's streams or collections suitable.

A Stream can also hold other Streams as well as handle effects. Effects are evaluated as part of stream processing and allow you to interact with the "outside" world, such as filesystems or remote servers.

You should not think of the "output" data type as only being domain objects in your application. The data can represent control information that directs the execution of effects.

Although the following vocabulary does not exist in the fs2 library, we will use a few terms in these user notes:

  • Stream fragment: This is a Stream object that has not been "built" into its specified F context at the end of the world. For example, Stream(0,1,2) should be considered a fragment as it cannot be run using the specific context F's method to run it.

  • Runnable Stream: This is a Stream that has had its runLog or a similar call made so as to create a value with the type of its context, but whose final context has not yet been run to obtain the value. You get the output value or execute the side effects of the Stream once its F context is "run." In this sense, a runnable stream is an fs2 stream object that has been transformed into its final context F but not run.
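
The distinction between a fragment and a runnable stream can be sketched as follows. This is a minimal illustration that assumes fs2 0.9.x on the classpath (the version in which Task and runLog exist); the object and value names are made up for the example.

```scala
import fs2.{Stream, Task}

object FragmentVsRunnable {
  // A stream fragment: a description of three values. Nothing runs here.
  val fragment: Stream[Task, Int] = Stream(0, 1, 2)

  // A runnable stream: runLog turns the fragment into a value of the
  // context type (a Task). The Task itself has still not been run.
  val runnable: Task[Vector[Int]] = fragment.runLog

  // "End of the world": running the Task finally produces the values.
  val result: Vector[Int] = runnable.unsafeRun()
}
```

Only the last line executes anything; everything before it is a description that can be passed around and composed.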

fs2 has a few core data types that are described in the following sub-sections.

Stream

Stream[+F[_], +O]: The basic building block of all other components. It requires a context (also called an environment) of type F and outputs a value of type O. F could be a Task that runs an asynchronous call to a database or website.

A Stream should be thought of as a sequence of values that are operated on with pipes and effects.

Most importantly, a Stream is a monad. This means that fs2 streams are monadic computations. While the concept of a stream evokes a sequence of values being presented to a computation, because an fs2 Stream is also a monad, it can contain a single value and use map and flatMap to create a Stream with more values. In other words, you do not always create a Stream with a sequence of values immediately; you may need to build up a Stream from monadic operations on another Stream, e.g. your_stream_with_one_value.flatMap(singleValue => create_infinite_value_func(singleValue)). This is a very common pattern when developing fs2 Streams.
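
A small sketch of this pattern, assuming fs2 0.9.x. The helper expand is hypothetical and produces a finite stream so the example terminates; an infinite generator would be used the same way.

```scala
import fs2.{Stream, Task}

object BuildWithFlatMap {
  // Hypothetical helper: expands one seed value into several values.
  def expand(seed: Int): Stream[Task, Int] =
    Stream(seed, seed + 1, seed + 2)

  // A stream holding a single value obtained from an effect.
  val one: Stream[Task, Int] = Stream.eval(Task.delay(10))

  // flatMap builds a larger stream from the single value.
  val many: Stream[Task, Int] = one.flatMap(seed => expand(seed))

  val result: Vector[Int] = many.runLog.unsafeRun() // Vector(10, 11, 12)
}
```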

Pipe

A function that converts one stream of values into another. It is a Stream[F,I] => Stream[F,O].

Once you create a pipe, you could literally apply it to a stream object:

mypipe(inputStream)

But it's often best to use the more streamy-looking approach of

inputStream.through(mypipe)

A pipe is pretty much a map/flatMap type functional operation but the pipe concept fits nicely into the mental model of a Stream.
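
Both ways of applying a pipe can be sketched like this, assuming fs2 0.9.x. The names doubleIt and input are illustrative only.

```scala
import fs2.{Pipe, Stream, Task}

object PipeExample {
  // A Pipe is just a function from one stream to another.
  val doubleIt: Pipe[Task, Int, Int] = in => in.map(_ * 2)

  val input: Stream[Task, Int] = Stream(1, 2, 3)

  // Plain function application.
  val viaApply: Vector[Int] = doubleIt(input).runLog.unsafeRun()

  // The same pipe applied with `through`.
  val viaThrough: Vector[Int] = input.through(doubleIt).runLog.unsafeRun()
}
```

Both forms should produce the same values; `through` simply reads left to right in the direction the data flows.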

Pipe2

A function that takes two Streams and outputs a single Stream. Think of interleaving two streams together. You have two input Streams that you weave together, and you output a single Stream that implements your "weave" logic. For example, take from the right and then the left, or take whichever value is available first from either input Stream.
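
As a rough sketch of a "weave", the Pipe2 below alternates elements from its two inputs. It assumes fs2 0.9.x and that the interleave combinator is available on Stream in that version; the names weave, odds, and evens are made up for the example.

```scala
import fs2.{Pipe2, Stream, Task}

object Pipe2Example {
  // A Pipe2 is a function of two streams that returns one stream.
  // Here the weave logic alternates: left, right, left, right, ...
  val weave: Pipe2[Task, Int, Int, Int] =
    (left, right) => left.interleave(right)

  val odds: Stream[Task, Int] = Stream(1, 3, 5)
  val evens: Stream[Task, Int] = Stream(2, 4, 6)

  // Because a Pipe2 is an ordinary function, it can be applied directly.
  val result: Vector[Int] = weave(odds, evens).runLog.unsafeRun()
}
```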

Sink

A function that takes an input value and returns an effect that returns Unit indicating an effectful operation potentially using that input value. "Running" the effect should perform some action like writing to a file.

A sink is just a type alias for Pipe[F, I, Unit], which means it is really just a function Stream[F, I] => Stream[F, Unit].

Sinks are usually used by calling to(yoursink) on your stream.
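
A minimal sketch of a sink, assuming fs2 0.9.x. Instead of writing to a file, the effect appends to an in-memory buffer so the result can be inspected; recordLine and written are hypothetical names.

```scala
import fs2.{Sink, Stream, Task}
import scala.collection.mutable.ListBuffer

object SinkExample {
  // Stand-in for an external resource such as a file.
  val written: ListBuffer[String] = ListBuffer.empty[String]

  // A Sink evaluates an effect per element and emits Unit.
  val recordLine: Sink[Task, String] =
    in => in.evalMap(line => Task.delay { written += line; () })

  val lines: Stream[Task, String] = Stream("a", "b", "c")

  // `run` discards the Unit outputs and keeps only the effects.
  lines.to(recordLine).run.unsafeRun()
}
```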

Signal

A Signal is a memory cell that is hidden inside an "effect." You can only access (read) or set (write) the memory cell value through the effect. For example, you can set the value of a Signal by passing in an effect where the effect returns a value. The value is set into the memory cell. Similarly, to read a Signal's value, you can get an effect, run the effect, then you have the value that was inside the Signal.

You can think of a Signal as a way to share values across different Streams. One stream could set the value of the signal and another stream could read the value in the signal. Then, assuming you are composing the streams so that they are both "run", you can share information across streams without having to merge their content directly. The price you pay is the evaluation of the effect.

Generally, you'll find that you use an eval method on a Stream to evaluate the effect and get or set the signal's value. So if you see a lot of Stream.eval(somesignal.get).flatMap { valueOfSignal => useIt(valueOfSignal) }, you'll know that a single-element Stream is being created up front from the signal's value in order to build another stream from it.
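
The read and write access described above can be sketched as follows. This assumes fs2 0.9.x, where fs2.async.signalOf creates a signal and Task's asynchronous instance requires an implicit Strategy; the name cell is illustrative.

```scala
import fs2.{Strategy, Stream, Task, async}

object SignalExample {
  // Needed by the Async[Task] instance that signalOf relies on.
  implicit val strategy: Strategy = Strategy.fromFixedDaemonPool(2)

  val program: Stream[Task, Int] =
    // Creating the signal is itself an effect, so it is eval'd.
    Stream.eval(async.signalOf[Task, Int](0)).flatMap { cell =>
      // Writing and reading are effects too: set, then get.
      Stream.eval(cell.set(42).flatMap(_ => cell.get))
    }

  val result: Vector[Int] = program.runLog.unsafeRun()
}
```

Here one stream both writes and reads the cell; in practice the signal would be passed to two different streams that are composed and run together.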

Signals are also used to terminate a stream based on some criteria. For example, a Boolean Signal has an `interrupt` method that stops a stream when the signal is set to true. Or you could use the `interruptWhen` combinator to create a stream that stops when the signal is set to true.

Chunk

A sequence of values. A Chunk is more of an internal representation, but the basic idea is that a Chunk holds a sequence of values. These values may be emitted through a Stream one by one semantically, but underneath they may be chunked into groups for efficiency. If you write your own pipe or source function, outputting Chunks may make your Stream run more efficiently.
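
A brief sketch of emitting a whole chunk at once, assuming fs2 0.9.x and its Chunk.seq and Stream.chunk constructors.

```scala
import fs2.{Chunk, Stream, Task}

object ChunkExample {
  // Bundle three values into one chunk.
  val chunk: Chunk[Int] = Chunk.seq(Vector(1, 2, 3))

  // Emit the chunk in one step; consumers still see individual elements.
  val stream: Stream[Task, Int] = Stream.chunk(chunk)

  val result: Vector[Int] = stream.runLog.unsafeRun()
}
```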

Pull

A Pull is a monad that holds a "program" that can get a value. A caller would use the Pull object, run the "program" and thereby "pull" a value from the Pull object. The value it can "get" for you is a resource R. Pull is used heavily internally and is also exposed as part of the API.

The "program" that a Pull holds is a Free monad that has to be run to produce the value. The fs2 machinery that expects a Pull knows how to run the Free monad. The "program" returns a value. The easy way to think about a Pull is that instead of a Pull just holding a simple value, it holds a "method" to obtain a value. This is conceptually similar to the strategy design pattern where the strategy in this case is the strategy to obtain a value. The "strategy" object is a Free-monad based "program."

In many cases, the resource of type R is manipulated and an O is produced. The resource may have the need to be "closed". Resource "closing" is considered an effect and must run in the same F environment that you are outputting your values in, hence the signature Pull[F[_], +O, +R]. The resource may also be an exception, which indicates that an error occurred in some way associated with the resource.

When you need to operate at a lower level than a Stream object, instead of passing specific values around, a Pull is passed around and the caller can "pull" values from it by running the "program." Since fs2 controls the "program," fs2 can ensure that the resources are managed. fs2 prides itself on resource management, and a Pull object is one way that it controls managed resources.

When a Pull represents an "output," the resource is essentially a Stream and the caller picks up the Pull-return value and uses the stream items as output values where appropriate.

A Pull can enclose any value. You can map and flatMap that value as needed to manipulate it and produce another Pull. For example, when you use Stream.open you receive a Pull object that holds a Handle resource. You can use that Handle resource to wait for a value, transform the value in some way, then wrap it back up in a Pull to return a transformed value back to the caller (the "output" as described in the previous paragraph). This is exactly what happens when you call Stream.pull(Handle => Pull). A Handle is allocated and managed inside a Pull object. The function you provide operates on that value inside the Pull and produces another Pull. Essentially, the "pull" method is a flatMap on the Pull monad.

Resources can be anything, including a Handle as Handles must be opened and closed as mentioned above. A Pull can also manage a Stream as a resource.

The resource is "freed," "closed," or "finalized" by calling "close" on the Pull instance. "close" is called automatically by fs2. For example, the pull method on a Stream is bracketed by an open and a close to perform resource management.

Handle

A Handle is an indirect reference to a Stream. It is indirect because a Handle holds a small buffer of stream items and can, when necessary, access the underlying stream to obtain more items. A Handle can be used both to push values onto the stream and to pull values from it. A Handle is a temporary object used to manipulate Streams. Just as importantly, most of a Handle's combinators generate a Pull object which pulls values from the Handle's internal buffers. When you ask for a value from the underlying stream, the value is wrapped in a Pull.

You can use a Handle to read from the underlying stream. For example, by awaiting/receiving a value, you can obtain a value and process it in a domain-specific way. When reading the data, you receive a Pull that represents the underlying item wrapped in a resource-management-friendly monad (the Pull).

You can also create a new Handle from an existing Handle by adding an output value to it. The combinators in Handle use a buffer to make this efficient. The output elements you "push" into the Handle are prepended to the underlying stream elements. Again, in the effort to be efficient, if you create a new Handle by pushing elements onto it, then subsequently have an await called on this "new handle", that new handle would see the elements you pushed into the buffer. If the buffer is empty, it accesses the underlying stream for an element. This is essentially the "pull" design model that permeates fs2.

Given a Handle, you can call await. You get back a resource managed tuple of type Chunk (since working with chunks is more efficient than working with individual elements) and a new handle that represents the current handle but without the element it just returned--the rest of the "stream elements": (chunk, handle with the remainder).

Since a Handle is a monad, you can map or flatMap into it, and this is roughly equivalent to mapping the function into both the internal buffer and the underlying stream: you are mapping onto the elements of the semantic stream. Since the internal representation of a Handle should be off limits to callers, you generally map and flatMap into a Handle to change its values. Using map and flatMap to operate on "something inside a container" is a very standard functional programming concept.

You can call await on the Handle object and obtain a Pull object (the wrapped value from the underlying stream), or you could call receive, which allows you to write a more convenient function that takes the tuple of (value, next handle) and returns a Pull.
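
The interplay of Stream.pull, Handle, and Pull can be sketched with a hand-written take, modeled on the pattern used in the fs2 0.9 guide. It assumes fs2 0.9.x, where awaiting on a Handle yields a tuple of a chunk and the remaining handle; awaitLimit is used so that no more than n elements are requested.

```scala
import fs2.{Handle, Pull, Stream}

object PullExample {
  // Emit the first n elements, then stop.
  def take[F[_], O](n: Int)(h: Handle[F, O]): Pull[F, O, Nothing] =
    for {
      // Await a chunk of at most n elements plus the handle for the rest.
      (chunk, rest) <- if (n <= 0) Pull.done else h.awaitLimit(n)
      // Output the chunk, then continue with the remaining handle.
      tail <- Pull.output(chunk) >> take(n - chunk.size)(rest)
    } yield tail

  // pull opens a Handle, runs the Pull, and closes it afterwards.
  val result: List[Int] = Stream(1, 2, 3, 4).pure.pull(take(2)).toList
}
```

The function never touches the stream directly: it only awaits on the Handle and describes what to output, which leaves fs2 in control of running the program and managing the resources.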
