My fs2 (was scalaz-stream) User Notes



Streams and Graphs

fs2 is designed to work well with processes that follow a "pull" model, in which a consumer requests each element from its upstream source. Sticking to this model keeps stream construction simpler. fs2 also includes other constructs to help with the push model.
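As a rough illustration of the pull model, the sketch below assumes the current fs2 3.x API rather than the older scalaz-stream API used elsewhere in these notes. The `PullDemo` object and its value names are made up for this example. The source is infinite, but only the elements the consumer asks for are produced.

```scala
import fs2.{Pure, Stream}

object PullDemo {
  // An unbounded description: 1, 2, 3, ... Nothing is computed yet.
  val naturals: Stream[Pure, Int] = Stream.iterate(1)(_ + 1)

  // The consumer pulls three elements, so only three are produced and
  // the run terminates even though the source never ends on its own.
  val firstThree: List[Int] = naturals.map(_ * 10).take(3).toList
}
```

Here `PullDemo.firstThree` is `List(10, 20, 30)`.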

Do fs2 streams represent graphs? They do, because you can attach one Stream's output to the input of another Stream. However, the library focuses on composition: when one Stream's output is fed into another Stream's input, the result is a new Stream.

scala> val baseProcess = Process.range(1,10)
baseProcess: scalaz.stream.Process0[Int] = Append(Halt(End),Vector(<function1>))

scala> baseProcess.toSource.runLog.run
res0: IndexedSeq[Int] = Vector(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> baseProcess.toSource.runLog.run
res1: IndexedSeq[Int] = Vector(1, 2, 3, 4, 5, 6, 7, 8, 9)

We ran the same baseProcess twice. In sstreams, you build a description of a stream using immutable Process objects. You can reuse baseProcess without concern that it has already been run and consumed in some way. This is true even if your base process has side effects. For example, if you use the sstreams io module, you can create a Process that reads from a file. Creating the Process does not cause the file to be read. You can run that Process multiple times, as above, and each run reads the file the same way.
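The same behaviour can be sketched with the current fs2 API. This is a minimal sketch, assuming fs2 3.x and cats-effect 3 are on the classpath. `RerunDemo` and its method names are hypothetical, and an in-memory counter stands in for the file read described above.

```scala
import java.util.concurrent.atomic.AtomicInteger
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

object RerunDemo {
  // A pure description can be interpreted any number of times.
  def pureRuns: (List[Int], List[Int]) = {
    val base = Stream.range(1, 10)
    (base.toList, base.toList)
  }

  // A description with a side effect: building it performs nothing,
  // and every run performs the effect again.
  def effectRuns(): (Int, Int) = {
    val runs = new AtomicInteger(0) // stand-in for "the file was read"
    val effectful: Stream[IO, Int] = Stream.eval(IO(runs.incrementAndGet()))

    val beforeRunning = runs.get() // still 0: nothing has executed yet
    effectful.compile.drain.unsafeRunSync()
    effectful.compile.drain.unsafeRunSync()
    (beforeRunning, runs.get())
  }
}
```

Both lists from `pureRuns` contain 1 through 9, and `effectRuns()` returns `(0, 2)`: no effect at construction time, one effect per run.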

sstreams often appears to deal with multiple outputs from a single Process. However, when you look under the covers, sstreams is often using a Coproduct to represent two types bundled into a single type.
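To make the idea concrete, here is a small sketch that uses the standard library's `Either` as a stand-in for a Coproduct, again assuming fs2 3.x (and Scala 2.13 or later for `toIntOption`). The `Parsed` type, the `parse` function and the sample input are invented for illustration.

```scala
import fs2.{Pure, Stream}

object CoproductDemo {
  // Hypothetical element type: an error message or a parsed number.
  type Parsed = Either[String, Int]

  def parse(s: String): Parsed =
    s.toIntOption.toRight(s"not a number: $s")

  // One stream whose single element type bundles both cases.
  val parsed: Stream[Pure, Parsed] = Stream.emits(List("1", "x", "3")).map(parse)

  // This stage reads as if it only handles numbers, but the element
  // type is still the whole Either; errors pass through untouched.
  val doubled: Stream[Pure, Parsed] = parsed.map(_.map(_ * 2))

  val all: List[Parsed]    = doubled.toList
  val numbers: List[Int]   = doubled.collect { case Right(n) => n }.toList
  val errors: List[String] = doubled.collect { case Left(e) => e }.toList
}
```

The "two outputs" (`numbers` and `errors`) are just two views of the same single-typed stream.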

If a stream fans out and then merges, you can use a single top-level Process to create your stream.
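One possible shape for such a stream in current fs2 is sketched below. It assumes fs2 3.x with cats-effect 3 and uses `broadcastThrough` as one way to fan out and merge; the original notes do not say which combinator they had in mind. The pipe names are hypothetical.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.{Pipe, Stream}

object FanOutDemo {
  // Two hypothetical branches that each see every source element.
  val doubled: Pipe[IO, Int, String] = _.map(i => s"double:${i * 2}")
  val squared: Pipe[IO, Int, String] = _.map(i => s"square:${i * i}")

  // A single top-level stream: fan out to both branches, then merge
  // their outputs back into one stream.
  val combined: Stream[IO, String] =
    Stream(1, 2, 3).covary[IO].broadcastThrough(doubled, squared)

  // The branches run concurrently, so the interleaving is not fixed;
  // sorting makes the result stable for inspection.
  def run(): List[String] = combined.compile.toList.unsafeRunSync().sorted
}
```

Note that `combined` is still one value describing the whole fan-out and merge, so it composes and re-runs like any other stream.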

Other stream processing libraries that can create full graphs typically create "nodes" with routers in front of them. A router sends specific stream elements to a node based on some criterion, for example a value in the element object or the element type. The nodes are started and stopped independently. In that model, each node processes different objects. In sstreams, a stream processes a single object type, such as a Coproduct, even though it may appear to be operating on only one of the types. This is a very different model.


Last updated 5 years ago
