Cookbook

This is a collection of recipes for common design problems when creating solutions using fs2.

The recipes were gathered from mailing lists, chat rooms, Stack Overflow, and other sources where a problem was encountered and I thought it would be helpful to capture and consolidate it in a single place.

Group-By

NEED FS2 UPDATE In Scala, a groupBy takes a collection of objects and creates a Map. The keys are the grouping keys you specify, and each value is the list of objects that fell into that group. Here's an example that breaks up a list into even and odd values:

scala> Seq(1,2,3,4,5,6,7,8,9,10).groupBy(_ % 2 == 0)
res81: scala.collection.immutable.Map[Boolean,Seq[Int]] = Map(false -> List(1, 3, 5, 7, 9), true -> List(2, 4, 6, 8, 10))

scala> Seq(1,2,3,4,5,6,7,8,9,10).groupBy{x => if(x % 2 == 0) "even" else "odd"}
res84: scala.collection.immutable.Map[String,Seq[Int]] = Map(odd -> List(1, 3, 5, 7, 9), even -> List(2, 4, 6, 8, 10))

The important thing to note is that groupBy works well on a finite list, because all rows can be scanned to perform the grouping. For infinite streams, this does not work. In sstream, instead of a groupBy you perform a chunkBy.

You may be familiar with chunk already:

scala> Process.emitAll(Seq(1,2,3,4,5,6,7,8,9,10)).chunk(3).toSource.runLog.run
res88: IndexedSeq[Vector[Int]] = Vector(Vector(1, 2, 3), Vector(4, 5, 6), Vector(7, 8, 9), Vector(10))

You can use chunkBy to specify the groupBy criteria:

scala> Process.emitAll(Seq(1,2,3,4,5,6,7,8,9,10)).chunkBy(_<5).toSource.runLog.run
res90: IndexedSeq[Vector[Int]] = Vector(Vector(1, 2, 3, 4, 5), Vector(6, 7, 8, 9, 10))

But there is a twist. Your data needs to be sorted first so that the chunkBy function can detect when a change in the "group" occurs. This imposes some constraints on your input, so it's not as easy to use as the standard groupBy:

scala> Process.emitAll(Seq(2,4,6,8,10,1,3,5,7,9)).chunkBy(_%2==0).toSource.runLog.run
res91: IndexedSeq[Vector[Int]] = Vector(Vector(2, 4, 6, 8, 10, 1), Vector(3, 5, 7, 9))

Ouch, we picked up a 1 in the "even" group. You would need to be more careful about your logic.

When you treat a stream as an infinite stream, it's harder to get the boundary just right. But in reality, you would never process the stream this way because the above still implicitly assumes a finite stream. You should really just create a large enough window and then groupBy the windowed data directly. Your process then consumes a series of smaller groupBy results:

scala> Process.emitAll(Seq(1,2,3,4,5,6,7,8,9,10)).chunk(4).map(_.groupBy(_%2==0)).toSource.runLog.run
res98: IndexedSeq[scala.collection.immutable.Map[Boolean,scala.collection.immutable.Vector[Int]]] =
Vector(Map(false -> Vector(1, 3), true -> Vector(2, 4)), Map(false -> Vector(5, 7), true -> Vector(6, 8)),
Map(false -> Vector(9), true -> Vector(10)))

If you have a finite stream of elements and you have enough memory, then you just need to chunk everything and perform the groupBy:

scala> Process.emitAll(Seq(1,2,3,4,5,6,7,8,9,10)).chunkAll.map(_.groupBy(_%2==0)).toSource.runLog.run
res99: IndexedSeq[scala.collection.immutable.Map[Boolean,scala.collection.immutable.Vector[Int]]] =
Vector(Map(false -> Vector(1, 3, 5, 7, 9), true -> Vector(2, 4, 6, 8, 10)))

Reduce-By

Halt a Process Early (and stop the stream)

NEED FS2 UPDATE This recipe came from a posting on scalaz-stream: you may want to halt a stream early. We saw that a stream is just a series of Process instance objects that is interpreted by an interpreter.

For example:

scala> (emit(10) ++ emit(20)).toSource.runLog.run
res4: IndexedSeq[Int] = Vector(10, 20)

scala> (emit(10) ++ halt ++ emit(20)).toSource.runLog.run
res5: IndexedSeq[Int] = Vector(10, 20)

Recall that if a process is formulated as a series of process instances, the interpreter will keep stepping through the instances in the series until it hits the end. In the example above, the halt was skipped.

halt is defined as Halt(End). The Halt case class takes a Cause argument. Causes can be End, Kill or Error. Kill just kills the process. But does that work?

scala> (emit(10) ++ Halt(Kill) ++ emit(20)).toSource.runLog.run
res7: IndexedSeq[Int] = Vector(10)

It looks like it does. Halt(Kill), unlike Halt(End), actually stops the processing. You can wrap an exception in Error to propagate the exception:

scala> (emit(10) ++ Halt(Error(new IllegalArgumentException("bad arg"))) ++ emit(20)).toSource.runLog.run
java.lang.IllegalArgumentException: bad arg
...
  ... 43 elided

But in this case the exception is thrown! Not very useful.

It is also a bit more difficult when you need to cause a Halt inside an environment like Task. Inside a Task, you can throw a Cause.Terminated or Cause.CausedBy exception which will get caught by the interpreter and halt the process. Again, this causes a throw versus graceful handling.
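
For illustration, a hedged sketch of that approach; it assumes the Cause members named above and only builds the process without running it:

val stopsInsideTask: Process[Task, Int] =
  emit(10) ++ Process.eval(Task.delay[Int] { throw Cause.Terminated(Cause.Kill) }) ++ emit(20)
// the driver catches the Terminated exception and halts the stream, but it is
// still a throw rather than a graceful, in-stream end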

One problem with early termination is whether the right handlers are called. For example, if your stream is embedded in a larger program that contains error handling, then you can use halt (or whatever) and, outside the environment, perform some actions, e.g. print a message that the process terminated. This is not really the functional way, though.

However, as you saw above, you may not know what really happened. In the expression (emit(10) ++ Halt(Kill) ++ emit(20)).toSource.runLog.run you actually get a result back but you do not really know if something caused early termination. You may also have a requirement to indicate which specific data element caused the issue.

You have a couple of approaches:

  1. Issue a Halt inside the stream as described above. Something still needs to catch the error. See the other items in this list.

  2. Attach handlers to your F[_] environment e.g. if using Task you can call yourProcess.run.attemptRun to get back a disjunction of the final value or the error. This is equivalent to handling the error outside the stream.

  3. Handle the error in the stream (and integrate error reporting e.g. using a Writer). You can insert a halt directly into your processing using flatMap. See (2) to handle the kill when running the flow.

    yourProcess.flatMap(outVal =>
       if(errorCond(outVal)) { 
         println("Error!")
         Halt(Cause.Kill)
       } else emit(outVal))
  4. Print out an error message and keep processing.

  5. Publish to an error "stream" that is waiting for "errors" and reports them to the user in some fashion.

  6. Write your own Catchable.

sstreams was designed such that if an exception is thrown within the stream, resource management should clean up opened resources. It's not clear how cleanup occurs if an exception is caught in the F environment as described in item 2. (Note to me: I need to validate this across a bunch of scenarios).

Since this recipe is about halting a process early, some of these are skipped in this section. It is more common that you want to keep processing the data, so you should really identify the error when it occurs, write out a robust and informative error message (see Writer for an approach to this), then either continue processing normally or stop.

If you really have to stop the process, then you should ensure that you have the right handlers attached to the stream to perform your cleanup logic. For example, onFailure, attempt and partialAttempt allow you to specify a new process to be returned after processing hits an exception. You could use these to send messages to other streams or perform other logic. These combinators give you the exception to process in your error handling logic. onKill allows you to specify another, perhaps nicer, process to execute if the process is killed. swallowKill converts a kill into a nice End. Note that onComplete may not perform its "completion process" if there is an error, so use it carefully.
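
As a rough illustration, here is a minimal sketch using attempt with its default handler (which simply emits the Throwable); treat the commented result as indicative rather than definitive:

val withErrorsInline: Process[Task, Throwable \/ Int] =
  (emit(10) ++ Halt(Error(new IllegalArgumentException("bad arg"))) ++ emit(20))
    .toSource
    .attempt()   // catch the Error and let the handler emit a replacement value
// running withErrorsInline.runLog.run should yield something like
// Vector(\/-(10), -\/(java.lang.IllegalArgumentException: bad arg))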

You could also write your own Catchable and use it explicitly or implicitly when you build your stream using yourprocess.run. run takes a Monad and a Catchable as implicit parameters, so you could supply your own and define your own handler when you materialize your stream. You may need to do this if you are unable to combine one Process with another using the combinators listed above to handle errors within the stream.
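
A minimal sketch of that idea, assuming scalaz's Catchable shape and delegating to Task's own attempt; the println in fail is just a placeholder for your handler:

val loggingCatchable: Catchable[Task] = new Catchable[Task] {
  def attempt[A](t: Task[A]): Task[Throwable \/ A] = t.attempt
  def fail[A](err: Throwable): Task[A] = {
    println(s"stream failed: $err")   // your handler goes here
    Task.fail(err)
  }
}
// bring it into implicit scope where you call run/runLog (a local implicit takes
// precedence over the default Task instance), or pass it explicitly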

Personally, I like to write out the errors robustly and then continue processing. If I must halt, I like to write the error nicely within the stream, so error handling is integrated from the start and can distinguish different types of errors. Then I end the Process normally using a swallowKill or a flatMap into an end. Many times, the rest of my program is running as a Process and multiple processes are interacting. In that case, the process with an error needs to continue processing, perhaps after adjusting some configuration state, so I just need to "restart" or "resume" processing normally, and the combinators mentioned above let me do that easily.

If I need to get an exception wrapped in a Monad like Option or a disjunction, I'll use yourprocess.run.attemptRun. This is essentially the same as getting an "exit" code from a process that is handled outside the stream. I use this in one-off programs or short programs that are not part of a set of streams running together.
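
For example, a hedged sketch reusing the failing stream from earlier in this section:

val result: Throwable \/ IndexedSeq[Int] =
  (emit(10) ++ Halt(Error(new IllegalArgumentException("bad arg"))) ++ emit(20))
    .toSource.runLog.attemptRun
// result is a left (-\/) holding the IllegalArgumentException instead of a throw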

Fan-Out

NEED FS2 UPDATE

sstream is designed to have one downstream final process, the top-level process. Because of this design approach, it's easier to manage back pressure, but harder to implement some topologies such as fan-out.

Fan-out can be achieved in multiple ways. First, you can set up subscribers and broadcast data to them, for example with an async Topic as sketched below. Second, you can create some new combinators that help with fan-out, albeit with some limitations due to the pull-model design.
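
Here is a hedged sketch of the subscriber approach using scalaz-stream's async.topic; it assumes the same implicit setup (strategy, imports) as the queue examples later in this cookbook, and the consumers are just illustrative printers:

val topic = async.topic[Int]()

// each subscriber is its own source and sees anything published after it subscribes
val sub1 = topic.subscribe.map(i => s"sub1: $i").to(io.stdOutLines)
val sub2 = topic.subscribe.map(i => s"sub2: $i").to(io.stdOutLines)

// feed the topic from a source process
val publish = Process.range(1, 5).toSource.to(topic.publish)

// start the subscribers first (e.g. sub1.run.runAsync(_ => ())), then run publish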

Fan-In

NEED FS2 UPDATE

Fan-in can be implemented three ways:

  1. By using tee or wye (see the sketch after this list).

  2. By using subscribers.

  3. By using coproducts.
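
A hedged sketch of approach (1), merging two sources non-deterministically with the merge wye (an implicit Strategy is assumed to be in scope, as in the other concurrent examples):

val left  = Process.range(1, 4).toSource    // 1, 2, 3
val right = Process.range(10, 13).toSource  // 10, 11, 12

// emit from whichever side has output available; use a tee for a deterministic order
val fannedIn: Process[Task, Int] = left.wye(right)(wye.merge)

// fannedIn.runLog.run yields some interleaving of both sides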

Request/Response

Managing State Across Process Iterations

NEED FS2 UPDATE

State can be managed within a process, but you need to use the proper pattern to capture the state and transfer it to each new process instance as the process iterates. Since a Process is a relatively cheap object to produce, you can do something like the following.

/**
 * Look through stream data for a tag. Return a tuple of the tag and data. Use `lastTag`
 * as a default until the tag is found in the data.
 */
def extractAttribute[T, D](lastTag: T)(getTag: D => Option[T]): Process1[D, (T, D)] = {
  def go(lastTag: T): Process1[D, (T, D)] =
    Process.receive1[D, (T, D)] { event: D =>
      // fall back to the last tag seen if this event does not carry one
      val tag = getTag(event) getOrElse lastTag
      // emit the tagged event, then recurse carrying the (possibly new) tag forward
      Process.emit((tag, event)) ++ go(tag)
    }
  go(lastTag)
}

A small variation forces getTag to be explicit about its return value; it receives the lastTag in case it wants to return it unchanged. This gets rid of the Option.

def extractAttribute[T, I](lastTag: T)(getTag: Tuple2[T, I] => T): Process1[I, (T, I)] = {
  def go(lastTag: T): Process1[I, (T, I)] =
    Process.receive1[I, (T, I)] { event: I =>
      val tag = getTag((lastTag, event))
      Process.emit((tag, event)) ++ go(tag)
    }
  go(lastTag)
}

But we can rewrite this more compactly:

def addTag[T, I](defaultTag: T)(getTag: ((I, T)) => T): Process1[I, (I, T)] =
    Process.receive1[I, (I, T)] { e =>
      val tag = getTag((e, defaultTag))
      Process.emit((e, tag)) ++ addTag(tag)(getTag)
    }

This is a general-purpose process that finds a "tag" in the data stream and outputs a tuple of that tag and the data. Based on your getTag you can compute the tag of your choice. For example, when a data object contains a "new" tag, you can start adding that tag to the output until the tag changes. You need a starting tag, defaultTag, to use in case the first data object does not contain a tag. We renamed the function addTag because it adds a "tag" to the input record with a value extracted from the data stream values.

This pattern may look much like the signal in sstream, but it's different in that we want this to happen within the stream. Downstream, a process can handle the tag and the data object together, or strip the tag off and pass through only the data object. Since a Tuple is really a product, we see that we can use this technique to add state derived from the data stream to data objects.

This pattern differs from [[Database Sinks]] because it needs to transfer state across process invocations. In [[Database Sinks]], Process.repeatEval acts on a relatively constant expression, a function that writes to the database. In this pattern, we need to pass state, the last tag found, to the new process "instance." You need a new process instance each time the process is stepped forward (each iteration by the driver). If the tag were the same each time, we could have used Process.repeatEval, but of course a constant tag does not solve our data processing problem.

The expression Process.emit((tag, event)) ++ go(tag) shows how a stream should be thought of as a sequence of process object instances that are driven by a driver program. Since we need to create a sequence of process instances, we can use the append operator (++ or fby) to sequence two process steps: one to emit a value and one to generate more process instances.

I came across the need for this recipe when parsing logfiles. The logfiles have events. Some of the events indicate the start of a business activity and others indicate the end of the business activity. In between are other events related to the business activity but they do not have a business activity tag. I needed to derive the business activity tag from the data itself.

A more recent version of sstreams included the following definition, which is much more compact than the one above and something you can use; however, it is slightly different. The zipWithState below uses the initial state in the first tuple returned instead of calculating it from the actual data element. You can use this to carry state across, but it's not really an extraction process based on the current element.

/** Zips the input with state that begins with `z` and is updated by `next`. */
  def zipWithState[A,B](z: B)(next: (A, B) => B): Process1[A,(A,B)] =
    receive1(a => emit((a, z)) ++ zipWithState(next(a, z))(next))

Notice that instead of using an inner go helper for the iteration, the definition recurses on itself directly, appending the next step onto the emit.
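
A small usage sketch, assuming the zipWithState definition above is in scope; it pairs each element with a running count of previously seen elements:

Process.emitAll(Seq("a", "b", "c"))
  .pipe(zipWithState[String, Int](0)((_, n) => n + 1))
  .toSource.runLog.run
// should produce Vector((a,0), (b,1), (c,2)) -- note the initial state appears first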

Database Sinks

Writing data to a database is a side effect. To manage side effects, create a Sink. A Sink is a process that outputs a function that takes an input and returns an environment (F[_]) that can be run to perform the side effect when the driver runs. The database operation itself is only a small part of the setup. You need to arrange for your data to be sent to the sink using yourProcess.to(yourDbSink).

Here's an example using Typesafe's Slick. It is assumed that the database machinery is in scope through implicits. The data type is Event and Events is the TableQuery object that can perform the inserts. In Slick 3+, all database operations on the Slick objects return an Action object that has to be run. Since we want to print out the number of inserted rows, we use Await to wait for the result. This blocks the thread until the inserts are complete; however, the process is running in a different thread. We could have attached an onComplete callback to the returned Future to print the count, but that is not needed here and would only add unnecessary thread usage.

/**
   * Write events to a database as a side effect.
   */
  val dbWriter: Sink[Task, Vector[Event]] =
    sink.lift { events: Vector[Event] =>
          Task.delay {
            val result = Await.result(db.run(Events ++= events), Duration.Inf)
            println("Inserted: " + result.getOrElse(0) + " records.")
    }}

There is another way to compose this. First, let's write a general function that returns a Task that runs the actions when requested and, importantly, returns the result:

def doDbActions[T](actions: Seq[DBIOAction[T, NoStream, Effect.All]]) =
    Task.delay {
      logger.info(s"Batch update size: ${actions.size}")
      Await.result(db.run(slick.dbio.DBIO.sequence(actions)), Duration.Inf)
    }

Now we can use this to compose the Event writing function:

/** Create a Task that writes a set of events to the database. Blocks inside the task. */
  def eventsDbWriter(events: Vector[Event]) = {
    val IOs = (Events ++= events)
    doDbActions(Seq(IOs))
  }

And now, to convert this to a channel:

channel.lift(eventsDbWriter)

Since this formulation blocks inside the Task, the channel should be used concurrently using merge. See the Concurrent recipe in the cookbook for more details. You could obviously refine the type signature more and make the TableQuery an implicit parameter to make this function even more general purpose.

Instead of a Sink we now have a Channel, so we need to use it differently. With a Sink we do something like:

yourProcess.to(...your dbwriter sink...).run.run

Now with a channel, you do:

yourProcess.through(channel.lift(eventsDbWriter)).run.run

If you really want to use the .to() syntax, which expects a Sink, you can map the output to Unit to get a sink:

yourProcess.to(channel.lift(eventsDbWriter).mapOut(_ => ())).run.run

This is a bit hacky. Perhaps a convenience function will be provided on Channel to do this automatically.

A channel or sink needs to have a function that returns an environment, in this case a Task. The Task wraps some function but does not specify how the wrapped function (in this case the database write) is performed. The caller that runs the task decides how the task is to be run. We could alter the function above and return a Future (by using an internal Promise) and complete the promise with the result.
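
For example, a hedged sketch reusing the Slick objects from above (db, Events, Event); since db.run already returns a Future in Slick 3, the promise is implicit and the Task simply hands the Future back instead of blocking on it:

def eventsDbWriterAsync(events: Vector[Event]): Task[Future[Option[Int]]] =
  Task.delay {
    db.run(Events ++= events)   // Future completes when the batch insert finishes
  }

// use it as a channel and compose on the Future downstream:
// yourProcess.through(channel.lift(eventsDbWriterAsync))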

Generally you want to batch your data before it reaches the sink. You can apply a batching stage to the process producing events:

myEventProducingProcess.batch(10000).to(dbWriterSink(myDataAccess)).run.run

Database Sources

NEED FS2 UPDATE

How you model a database source depends on the type of asynchronous capability the API has. For this recipe, we will assume an asynchronous (reactive) source like Slick 3.0's streaming interface.

In slick 3.0.0 you can return a result set that is materialized in memory all at once. This is what you get when you call db.run(yourQuery.result). If you use db.stream you receive a DatabasePublisher. You can interface to this publisher in different ways. The way below is fully asynchronous and returns the number of items processed.

/**
 * Given a publisher and a queue, return a {{{Task}}} that, when run,
 * takes all objects offered by the publisher and places
 * them into the queue. The task completes when the database publisher
 * has returned all of its results. You will probably want to run the
 * returned task using {{{.runAsync}}}.
 */
def getData[T](publisher: slick.backend.DatabasePublisher[T],
  queue: scalaz.stream.async.mutable.Queue[T], batchRequest: Int = 1): Task[Long] =
  Task {
    val p = scala.concurrent.Promise[Long]()
    var counter: Long = 0
    val s = new org.reactivestreams.Subscriber[T] {
      var sub: Subscription = _

      def onSubscribe(s: Subscription): Unit = {
        sub = s
        sub.request(batchRequest)
      }

      def onComplete(): Unit = {
        sub.cancel()
        p.success(counter)
      }

      def onError(t: Throwable): Unit = p.failure(t)

      def onNext(e: T): Unit = {
        counter += 1
        queue.enqueueOne(e).run
        sub.request(batchRequest)
      }
    }
    publisher.subscribe(s)
    Await.ready(p.future, Duration.Inf)
    counter
  }

When you are ready to use the source you can use a queue to create a new process that you source from:

// source of data from "wherever"
  val q = async.boundedQueue[YourData](100)

  val dataSourceTask = getData(dbstr, q)
  // start the streaming on another thread
  dataSourceTask.runAsync { _.fold(t => println(s"Error streaming data: $t"), rowcount => q.close.run ) }
  q.dequeue.map(_.toString).to(io.stdOutLines).run.run // blocks until the stream finishes

This will print your data using the Slick streaming model. In this case we composed on the scalaz Task to ensure the queue was closed. We could also alter the return value of getData so that the Task returns immediately, and then compose on the returned value like:

/**
 * The `Task` returned here, when run, will return immediately. You can then
 * compose on the returned future for any cleanup.
 */
def getData[T](publisher: slick.backend.DatabasePublisher[T],
  queue: scalaz.stream.async.mutable.Queue[T], batchRequest: Int = 1): Task[Future[Long]] =
  Task {
    val p = scala.concurrent.Promise[Long]()
    var counter: Long = 0
    val s = new org.reactivestreams.Subscriber[T] {
      var sub: Subscription = _

      def onSubscribe(s: Subscription): Unit = {
        sub = s
        sub.request(batchRequest)
      }

      def onComplete(): Unit = {
        sub.cancel()
        p.success(counter)
      }

      def onError(t: Throwable): Unit = p.failure(t)

      def onNext(e: T): Unit = {
        counter += 1
        queue.enqueueOne(e).run
        sub.request(batchRequest)
      }
    }
    publisher.subscribe(s)
    p.future
  }

Since the Task will return immediately while the Slick database source streams on another thread, you can compose on the returned future to close the queue:

// source of data from "wherever"
val q = async.boundedQueue[YourData](100)

// start the streaming; the returned Future completes when the publisher is done
getData(dbstr, q).run onComplete { rowCountTry => q.close.run }
q.dequeue.map(_.toString).to(io.stdOutLines).run.run // blocks until the stream finishes

You should not run the stream itself asynchronously and then close the queue, because the queue will be closed too quickly. For example, this will not work:

q.dequeue.map(_.toString).to(io.stdOutLines).run.runAsync(_ => ())
    q.close.run

Filtering Out Nones

I often find myself looping over a set of input files, checking that they are valid to process, then extracting and processing multiple records from each. For example, I often need to process events from logfiles or entities from master files as part of some analytics I need to perform.

If the processing of an individual logical record fails, it's usually good to capture the error and return an Option[T] from the processing to show that no object was produced from that iteration. Downstream, you'll need to filter out the Nones, assuming you already logged the error message as appropriate.

To filter Nones out of a stream, you can do

def filterOutNones[T] = process1.filter[Option[T]](e => e.isDefined).map(_.get)

Then use this in your processing

Process.emitAll(args)
.pipe(processContentForObjects()) // print or route error messages here
...
.pipe(filterOutNones)
.to(outputSink)
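
For context, here is a purely illustrative sketch of what a processContentForObjects-style stage could look like; MyRecord and parseRecord are hypothetical stand-ins for your own types and parsing logic:

def processContentForObjects(): Process1[String, Option[MyRecord]] =
  process1.lift { content: String =>
    \/.fromTryCatchNonFatal(parseRecord(content)).fold(
      t => { println(s"Could not parse record: ${t.getMessage}"); None },  // report or route the error here
      record => Some(record))
  }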

Processing a Set of External Files

NEED FS2 UPDATE

It is quite common to need to process a set of files whose content then becomes the stream data.

If the number of files is not large and the file system access time is reasonable, you do not need to pre-test the files using sstreams. For example, you could do the following:

val dfiles =
      (directories.get.map(v => v.split(',')).map(_.toSeq) getOrElse Seq())
        .map(Paths.get(_))
        .filter(p => Files.isDirectory(p))
        .flatMap(Files.newDirectoryStream(_).asScala)
        .filter(p => p.getFileName.toString.endsWith("xml"))
        .map(_.toString) ++ args

This processes an entry called directories that contains a comma-separated list of directories to scan; the files found in those directories, plus the remaining command line arguments in args, become dfiles, the list of validated filenames. In the example above, we only accept XML files. You can turn this into a function if you wish.

Once you have a collection of "validated" filenames you can process them using:

Process.emitAll(dfiles)
.map(...)
.pipe(...)
...

Generally, the "existance" test is not to burdensome on the file system. However, you may also want to ensure that they have non-zero size. If you define a function like:

/** Test a filename for non-zero files. Return boolean. */
  def nonZeroSize(f: String) = {
    handling(classOf[java.io.IOException]) by { t =>
      println(s"Error accessing $f")
      t.printStackTrace
      false
    } apply { Files.size(Paths.get(f)) > 0 }
  }

Then you can filter the original dfiles or process them in a stream:

Process.emitAll(dfiles)
.filter(nonZeroSize)
.map(...)
.pipe(...)
...

You may find that you need to validate the files and that the validation is very costly. Since the validation probably involves a side effect, e.g. reading a file, you really should process this in a channel and then use concurrency to ensure adequate performance:

Process.emitAll(dfiles)
.filter(nonZeroSize)
.concurrently(8)(channel.lift(validationFunction))
.map(...)
.pipe(...)
...

In the above, we used the concurrently implicit class that is described in another cookbook entry.

Obviously, since we used println in the nonZeroSize function, we should really restructure the function to return \/ and siphon off any messages using the Writer approach described in the Writer section. If you are just writing a quick stream for analysis versus production processing, the println approach is good enough.
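
A hedged sketch of that restructuring, returning a disjunction so the messages can later be siphoned off with a Writer-style channel:

/** Like nonZeroSize, but returns the error message instead of printing it. */
def nonZeroSizeE(f: String): String \/ String =
  \/.fromTryCatchNonFatal(Files.size(Paths.get(f)) > 0).fold(
    t => -\/(s"Error accessing $f: ${t.getMessage}"),
    ok => if (ok) \/-(f) else -\/(s"Zero length file: $f"))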

Run Concurrent Processes

NEED FS2 UPDATE

Tasks can be run asynchronously or synchronously but you need to be explicit.

Here's an example that uses Task.delay. The Task runs in the main thread once it is asked to run.

val infochannel = io.channel { line:String => Task.delay{ println(Thread.currentThread.getName + ": " + line)}}
infochannel: scalaz.stream.Channel[scalaz.concurrent.Task,String,Unit] = Append(Emit(Vector(<function1>)),Vector(<function1>))

scala> Process.range(1,10).map(_.toString).through(infochannel).run.run
run-main-0: 1
run-main-0: 2
run-main-0: 3
run-main-0: 4
run-main-0: 5
run-main-0: 6
run-main-0: 7
run-main-0: 8
run-main-0: 9

But a slight change in definition allows the println to occur in a separate thread:

val infochannel = io.channel { line:String => Task{ println(Thread.currentThread.getName + ": " + line)}}
infochannel: scalaz.stream.Channel[scalaz.concurrent.Task,String,Unit] = Append(Emit(Vector(<function1>)),Vector(<function1>))

scala> Process.range(1,10).map(_.toString).through(infochannel).run.run
pool-10-thread-4: 1
pool-10-thread-4: 2
pool-10-thread-4: 3
pool-10-thread-4: 4
pool-10-thread-4: 5
pool-10-thread-4: 6
pool-10-thread-2: 7
pool-10-thread-2: 8
pool-10-thread-2: 9

You'll notice that the order of the emits was preserved, so the real issue is not just ensuring that your Task is called asynchronously but also that the stream is set up to run and process the outputs concurrently, either in order or out of order if you want results to stream as they become available. Otherwise, you are "semantically" blocking.

The idea is the same regardless of how you want to compose your streams. You need to produce a "stream" of Tasks that are evaluated somehow. The Tasks below are evaluated in sequence; some are built asynchronously and some synchronously.

scala> Process.emitAll(Seq(Task{1},Task{4},Task.delay{60}, Task.now{30+40})).eval.runLog.run
res28: IndexedSeq[Int] = Vector(1, 4, 60, 70)

We can use gather to let the Task objects run as they need to. Like <yourstream>.eval, gather evaluates the Tasks to obtain their results; .eval causes each Task to run as you intended, e.g. Task(..) runs asynchronously while Task.delay(..) runs in the same thread. gather, however, allows the results to come back out of order, which is why you see the 20 result come before the 70 in the gather example, even though it comes after the 70 in the first example below. In the first example we have also switched all the Tasks from the snippet above to run asynchronously:

scala> Process.emitAll(Seq(Task{1},Task{4},Task{60}, Task{30+40}, Task{20})).eval.runLog.run
res35: IndexedSeq[Int] = Vector(1, 4, 60, 70, 20)

scala> Process.emitAll(Seq(Task{1},Task{4},Task.delay{60}, Task.now{30+40}, Task{20})).gather(3).runLog.run
res36: IndexedSeq[Int] = Vector(1, 4, 60, 20, 70)

Because each Task is not performing much work you may have to run the above a few times before you see different results.

Asynchronous running can also be performed at the process level using merge.mergeN. Instead of specifying the Task directly, which may be inconvenient, you specify the processes themselves. The infrastructure inside merge makes sure that they run asynchronously.

scala> val processSource = Process.emitAll(Seq(Process.emit(100), Process.emit(90), Process.emit(60), Process.emit(40), Process.emit(20)))
processSource: scalaz.stream.Process0[scalaz.stream.Process0[Int]] = Emit(List(Emit(Vector(100)), Emit(Vector(90)), Emit(Vector(60)), Emit(Vector(40)), Emit(Vector(20))))

scala> merge.mergeN(2)(processSource).runLog.run
res40: IndexedSeq[Int] = Vector(90, 100, 60, 40, 20)

Again, you may have to run this a few times to see the out-of-order behavior. This may seem like an odd way to create streams, but if you think about channels, a channel is a process of functions that need to be evaluated. If you zip a stream of functions with a stream of function arguments, you have a process that, when evaluated, applies each function to its argument. It can then be run in parallel chunks using merge.mergeN. Unlike gather, merge runs the processes asynchronously and makes each result available as soon as possible, refilling its internal run queue with another process as soon as one finishes. See the notes on Wye and Tee for an example of how to make the syntax for this scenario easier and more intuitive.
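
A hedged sketch of that zip-then-merge idea; the work function is a hypothetical stand-in for whatever each channel function would do:

val work: Int => Task[Int] = i => Task { i * 2 }

val applied: Process[Task, Process[Task, Int]] =
  Process.range(1, 6).toSource
    .zip(Process.constant(work).toSource)
    .map { case (arg, f) => Process.eval(f(arg)) }   // one small process per application

merge.mergeN(3)(applied).runLog.run   // results arrive as each task completes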
