Cookbook
This is a collection of recipes for common design problems when creating solutions using fs2.
The recipes were gathered from mailing lists, chat rooms, Stack Overflow, and other places where a problem came up and it seemed helpful to capture the solution and consolidate it in a single place.
Group-By
NEED FS2 UPDATE In Scala, groupBy takes a collection of objects and creates a Map. The keys are the grouping keys you specified, and the value for each key is the list of objects that fell into that group. Here's an example that breaks up a list into even and odd values:
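A minimal sketch of what that looks like in plain Scala:

```scala
List(1, 2, 3, 4, 5, 6).groupBy(_ % 2 == 0)
// Map(false -> List(1, 3, 5), true -> List(2, 4, 6))
```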
The important thing to note is that groupBy acts well on a finite list, because all rows can be scanned to perform the grouping. For infinite streams, this does not work. In sstream, instead of groupBy you perform a chunkBy.
You may be familiar with chunk already:
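Roughly, chunk breaks the input into fixed-size windows (a sketch using scalaz-stream's Process):

```scala
import scalaz.stream.Process

Process(1, 2, 3, 4, 5, 6).chunk(2).toList
// List(Vector(1, 2), Vector(3, 4), Vector(5, 6))
```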
You can use chunkBy to specify the groupBy criteria:
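Something along these lines (a sketch; chunkBy starts a new chunk when the predicate flips from true to false):

```scala
import scalaz.stream.Process

Process(1, 2, 3, 4, 5, 6).chunkBy(_ % 2 == 0).toList
// List(Vector(1, 2, 3), Vector(4, 5), Vector(6))
```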
But there is a twist. Your data needs to be sorted first so that the chunkBy function can detect when a change in the "group" occurs. This imposes some constraints on your input, so it's not as easy to use as the standard groupBy:
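For example, sorting the even values to the front and then chunking (a sketch; note where the boundary falls):

```scala
import scalaz.stream.Process

Process(2, 4, 6, 1, 3, 5).chunkBy(_ % 2 == 0).toList
// List(Vector(2, 4, 6, 1), Vector(3, 5))
```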
Ouch, we picked up a 1 in the "even" group. You would need to be more careful about your logic.
When you treat a stream as an infinite stream, it's harder to get the boundary just right. But in reality, you would never process the stream this way, because the above still implicitly assumes a finite stream. You should really just create a large enough window and then groupBy the windowed data directly. Your process then consumes the results of many smaller groupBy operations:
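A sketch of the windowed approach; the window size of 3 is illustrative:

```scala
import scalaz.stream.Process

Process(1, 2, 3, 4, 5, 6).chunk(3).map(_.groupBy(_ % 2 == 0)).toList
// List(Map(false -> Vector(1, 3), true -> Vector(2)),
//      Map(true -> Vector(4, 6), false -> Vector(5)))
```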
If you have a finite stream of elements and you have enough memory, then you just need to chunk everything and perform the groupBy:
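A sketch using chunkAll to gather everything into one chunk first:

```scala
import scalaz.stream.Process

Process(1, 2, 3, 4, 5, 6).chunkAll.map(_.groupBy(_ % 2 == 0)).toList
// List(Map(false -> Vector(1, 3, 5), true -> Vector(2, 4, 6)))
```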
Reduce-By
Halt a Process Early (and stop the stream)
NEED FS2 UPDATE After a posting on scalaz-stream (here), you may want to halt a stream early. We saw that a stream is just a series of Process instances that is interpreted by an interpreter.
For example:
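Something like this (a sketch):

```scala
import scalaz.stream.Process._

(emit(10) ++ halt ++ emit(20)).toSource.runLog.run
// Vector(10, 20)
```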
Recall that when a process is formulated as a series of process instances, the interpreter continues to process the instances in the series until it hits the end. In the example above, the halt was skipped.
halt is defined as Halt(End). The Halt case class takes a Cause argument. Causes can be End, Kill or Error. Kill just kills the process. But does that work?
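A sketch with a Kill cause in the middle of the sequence:

```scala
import scalaz.stream.Cause._
import scalaz.stream.Process._

(emit(10) ++ Halt(Kill) ++ emit(20)).toSource.runLog.run
// Vector(10) -- processing stops at the kill
```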
It looks like it does. Halt(Kill), unlike Halt(End), actually stops the processing. You can add an exception to Error to propagate the exception:
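For example (a sketch):

```scala
import scalaz.stream.Cause._
import scalaz.stream.Process._

(emit(10) ++ Halt(Error(new IllegalArgumentException("bad input"))) ++ emit(20)).toSource.runLog.run
// throws java.lang.IllegalArgumentException: bad input
```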
But in this case the exception is thrown! Not very useful.
It is also a bit more difficult when you need to cause a Halt inside an environment like Task. Inside a Task, you can throw a Cause.Terminated or Cause.CausedBy exception, which will get caught by the interpreter and halt the process. Again, this causes a throw versus graceful handling.
One problem with early termination is whether the right handlers are called. For example, if your stream is embedded in a larger program that contains error handling, then you can use halt (or whatever) and, outside the environment, perform some actions, e.g. print a message that the process terminated. This is not really the functional way though.
However, as you saw above, you may not know what really happened. In the expression (emit(10) ++ Halt(Kill) ++ emit(20)).toSource.runLog.run you actually get a result back, but you do not really know if something caused early termination. You may also have a requirement to indicate which specific data element caused the issue.
You have a couple of approaches:

1. Issue a Halt inside the stream as described above. Something still needs to catch the error; see the other items in this list.
2. Attach handlers to your F[_] environment, e.g. if using Task you can call yourProcess.run.attemptRun to get back a disjunction of the final value or the error (see the sketch after this list). This is equivalent to handling the error outside the stream.
3. Handle the error in the stream (and integrate error reporting, e.g. using a Writer). You can insert a halt directly into your processing using flatMap. See (2) to handle the kill when running the flow.
4. Print out an error message and keep processing.
5. Publish to an error "stream" that is waiting for "errors" and reports them to the user in some fashion.
6. Write your own Catchable.
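For example, approach (2) looks roughly like this (a sketch):

```scala
import scalaz.stream.Cause._
import scalaz.stream.Process._

val p = emit(10) ++ Halt(Error(new Exception("boom"))) ++ emit(20)
val result = p.toSource.runLog.attemptRun
// -\/(java.lang.Exception: boom) -- the error comes back as a disjunction instead of a throw
```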
sstreams was designed such that if an exception is thrown within the stream, resource management should clean up opened resources. It's not clear how cleanup occurs if an exception is caught in the F environment as described in item 2. (Note to me: I need to validate this across a bunch of scenarios.)
Since this recipe is about halting a process early, some of these approaches are skipped in this section. It is more common that you want to keep processing the data or print an informative error message, so you should really identify the error when it occurs, print a robust error message (see Writer for an approach to this), then continue processing normally or stop.
If you really have to stop the process, then you should ensure that you have the right handlers attached to the stream to perform your cleanup logic. For example, onFailure, attempt and partialAttempt allow you to specify a new process to be returned after processing hits an exception. You could use these to send messages to other streams or perform other logic. These combinators give you the exception to process in your error handling logic. onKill allows you to specify another, perhaps nicer, process to execute if the process is killed. swallowKill converts a kill into a nice End. Note that onComplete may not perform its "completion process" if there is an error, so use it carefully.
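A sketch of keeping the error inside the stream with attempt (the failing Task is illustrative):

```scala
import scalaz.concurrent.Task
import scalaz.stream.Process

val p = Process.range(0, 5).toSource.evalMap { i =>
  Task.delay { if (i == 3) throw new RuntimeException(s"bad record $i") else i }
}

p.attempt().runLog.run
// e.g. Vector(\/-(0), \/-(1), \/-(2), -\/(java.lang.RuntimeException: bad record 3))
```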
You could also write your own Catchable and use it explicitly or implicitly when you build your stream using yourprocess.run. run takes a Monad and a Catchable as implicit parameters, so you could supply your own and define your own handler when you materialize your stream. You may need to do this if you are unable to combine one Process with another using the combinators listed above to handle errors within the stream.
Personally, I like to write out the errors robustly then continue processing. If I must halt, I like to write the error nicely within the stream, so error handling is integrated from the start and can be based on different types of errors. Then I end the Process normally using a swallowKill or a flatMap into an end. Many times, the rest of my program is running as a Process and multiple processes are interacting. In that case, the process with an error needs to continue processing, perhaps after adjusting some configuration state; I just need to "restart" or "resume" processing normally, and the combinators mentioned above allow me to do that easily.
If I need to get an exception wrapped in a Monad like Option or a disjunction, I'll use yourprocess.run.attemptRun. This is essentially the same as getting an "exit" code from a process that is handled outside the stream. I use this in one-off programs or short programs that are not part of a set of streams running together.
Fan-Out
NEED FS2 UPDATE
sstream is designed to have one down-stream final process, the top-level process. Because of this design approach, it's easier to manage back pressure, but harder to implement some topologies such as fan-out.
Fan-out can be achieved multiple ways. First, you can set up subscribers and have data broadcast to them. Second, you can create some new combinators that help with fan-out, albeit with some limitations due to the pull-model design.
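A sketch of the subscriber approach using a Topic (names and values are illustrative; in real code make sure the subscribers are attached before publishing):

```scala
import scalaz.concurrent.Task
import scalaz.stream.{Process, async, io, merge}

val events = async.topic[Int]()

// Two independent subscribers fanning out from the same topic.
val subscriberA = events.subscribe.map(i => s"A saw $i").to(io.stdOutLines)
val subscriberB = events.subscribe.map(i => s"B saw $i").to(io.stdOutLines)

// One producer publishing to the topic, closing it when done.
val producer =
  Process.range(0, 10).toSource
    .to(events.publish)
    .onComplete(Process.eval_(events.close))

// Run producer and subscribers concurrently (see Run Concurrent Processes).
merge.mergeN(Process(subscriberA, subscriberB, producer).toSource).run.run
```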
Fan-In
NEED FS2 UPDATE
Fan-in can be implemented three ways:

- By using tee or wye.
- By using subscribers.
- By using coproducts.
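A sketch of the tee/wye approach: wye.merge combines two Task-backed sources non-deterministically.

```scala
import scalaz.concurrent.Task
import scalaz.stream.{Process, wye}

val left  = Process.range(0, 5).toSource
val right = Process.range(100, 105).toSource

// Elements from both sources, interleaved in whatever order they arrive.
left.wye(right)(wye.merge).runLog.run
```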
Request/Response
Managing State Across Process Iterations
NEED FS2 UPDATE
State can be managed within a process, but you need to use the proper pattern to capture the state and transfer it to each newly created process instance. Since a Process is a relatively cheap object to produce, you can do something like the following.
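A sketch of the pattern (the names addTag, getTag and lastTag follow the discussion below; the exact signatures are illustrative):

```scala
import scalaz.stream.{Process, Process1}

def addTag[A](initialTag: String)(getTag: A => Option[String]): Process1[A, (String, A)] = {
  // Each iteration re-creates the process, carrying the last tag seen.
  def go(lastTag: String): Process1[A, (String, A)] =
    Process.receive1 { (event: A) =>
      val tag = getTag(event).getOrElse(lastTag)
      Process.emit((tag, event)) ++ go(tag)
    }
  go(initialTag)
}
```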
A small variation forces extractAttribute to be explicit about its return value; it also receives the lastTag in case it wants to return it. This helps get rid of the Option.
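Roughly (a sketch): the extractor always returns a tag and is handed the previous one.

```scala
import scalaz.stream.{Process, Process1}

def addTag2[A](initialTag: String)(extractAttribute: (A, String) => String): Process1[A, (String, A)] = {
  def go(lastTag: String): Process1[A, (String, A)] =
    Process.receive1 { (event: A) =>
      val tag = extractAttribute(event, lastTag)
      Process.emit((tag, event)) ++ go(tag)
    }
  go(initialTag)
}
```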
But we can rewrite this more compactly:
This is a general purpose process that finds a "tag" in the data stream and creates a tuple with that tag and the data as output. Based on your getTag you can compute the tag of your choice. For example, when the data object contains a "new" tag, you can start adding that tag to the output until the tag changes. You need a starting tag to use first in case the first data object does not contain a tag. We renamed the function addTag because it adds a "tag" to the input record with a value extracted from the data stream values.
This pattern looks like it could be much like the signal in sstream, but it's different in that we want this to be within the stream. Downstream, a process can process the tag and the data object together, or rip the tag off and pass through only the data object. Since a Tuple is really a product, we see that we can use this technique to add state derived from the data stream to data objects.
This pattern looks different than [[Database Sinks]] because it needs to transfer state across process invocations. In [[Database Sinks]], the Process.repeatEval acts on what is a relatively constant expression, a function that writes to the database. In this pattern, we need to pass state, the last tag found, to the new process "instance." You need a new process instance each time the process is stepped forward (the iteration by the driver). If the tag was the same each time, we could have used Process.repeatEval. Of course, a constant tag does not solve our data processing problem.
The expression Process.emit((tag,event)) ++ go(tag) shows how a stream should be thought about as a sequence of process object instances that are driven by a driver program. Since we need to create a sequence of process instances, we can use the append operator (++ or fby) to sequence two process steps: one to emit a value and the other to generate more process object instances.
I came across the need for this recipe when parsing logfiles. The logfiles have events. Some of the events indicate the start of a business activity and others indicate the end of the business activity. In between are other events related to the business activity but they do not have a business activity tag. I needed to derive the business activity tag from the data itself.
A more recent version of sstreams included the following definition, which is much more compact than the one above and something you can use; however, it is different from the above. The zipWithState below actually uses the initial state in the first tuple returned instead of calculating it from the actual data element. You can use this to carry state across, but it's not really an extraction process based on the current element.
You should notice that instead of the model of using a go function for iteration, the process steps were appended together.
Database Sinks
Writing data to a database is a side effect. To manage side effects, create a Sink. A Sink is a process that outputs a function that takes an input and returns an environment (F[_]) that can be run to perform the side effect when the driver runs. The database operation itself is only a small part of the overall machinery. You need to arrange for your data to be sent to the sink using yourProcess.to(yourDbSink).
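A minimal sketch of the Sink shape, independent of any database: a constant stream of effectful functions.

```scala
import scalaz.concurrent.Task
import scalaz.stream.{Process, Sink}

val printSink: Sink[Task, String] =
  Process.constant((s: String) => Task.delay(println(s)))

Process("a", "b", "c").toSource.to(printSink).run.run
```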
Here's an example using Typesafe's slick. It is assumed that the database machinery is in scope through implicits. The data type is Event and Events is the TableQuery object that can perform the inserts. In slick 3+, all database operations on the slick objects return an Action object that has to be run. Since we want to print out the returned rows, we use an Await to wait for the result. This blocks the thread until the inserts are complete; however, the process is running in a different thread. We could have attached an onComplete callback to the future to print out the records, but this is not needed and would result in unnecessary thread usage.
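A sketch of that direct approach (the Slick machinery such as db, Event and the Events TableQuery is assumed to be in scope):

```scala
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scalaz.concurrent.Task
import scalaz.stream.{Process, Sink}

val eventSink: Sink[Task, Seq[Event]] =
  Process.constant { (es: Seq[Event]) =>
    Task.delay {
      // Run the insert Action and block inside the Task until it completes.
      val inserted = Await.result(db.run(Events ++= es), Duration.Inf)
      println(s"inserted: $inserted")
    }
  }
```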
We have another way to compose this. First let's just write a general function that returns a task that runs the actions when requested and, importantly, returns the T result:
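A sketch (the Slick db handle is assumed to be in scope):

```scala
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scalaz.concurrent.Task
import slick.dbio.DBIO

// Wrap any Slick Action in a Task that blocks for, and returns, the T result.
def runAction[T](action: DBIO[T]): Task[T] =
  Task { Await.result(db.run(action), Duration.Inf) }
```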
Now we can use this to compose the Event writing function:
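Roughly (a sketch; Events is the TableQuery mentioned above):

```scala
def writeEvents(events: Seq[Event]): Task[Option[Int]] =
  runAction(Events ++= events)   // ++= is Slick's batch insert, returning the row count
```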
And now, to convert this to a channel:
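A sketch using channel.lift:

```scala
import scalaz.concurrent.Task
import scalaz.stream.{Channel, channel}

val writeChannel: Channel[Task, Seq[Event], Option[Int]] =
  channel.lift(writeEvents)
```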
Since this formulation blocks inside the Task, the channel should be used concurrently using merge. See the Concurrent recipe in the cookbook for more details. You could obviously refine the type signature further and make the TableQuery an implicit parameter to make this function even more general purpose.
Instead of a Sink we have a Channel, so we need to use it differently. With a Sink we do something like:
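Something like (a sketch; eventBatches is an assumed Process[Task, Seq[Event]] and eventSink a Sink like the one above):

```scala
eventBatches.to(eventSink).run.run
```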
Now with a channel, you do:
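With the channel you use through and keep the returned insert counts (a sketch):

```scala
eventBatches.through(writeChannel).runLog.run
```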
If you really want to use that .to() syntax on a Sink then you can map the output to get a sink:
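A sketch of that mapping: discard the channel's result to recover a Sink.

```scala
val eventSink2: Sink[Task, Seq[Event]] =
  writeChannel.map(f => (es: Seq[Event]) => f(es).map(_ => ()))
```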
This is a bit hacky. Perhaps a convenience function will be provided on Channel to do this automatically.
A channel or sink needs to output functions that return an environment, in this case a Task. The Task wraps some function but does not specify how the wrapped function (in this case the database write) is performed. The caller that runs the task decides how the task is to be run. We could alter the function above to return a Future (by using an internal Promise) and complete the promise with the result.
Generally you want to batch your data before it arrives. You can use the batch process to do this on the process producing events:
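If a dedicated batch helper is not at hand, process1's chunk gives similar grouping (a sketch; eventProcess is an assumed Process[Task, Event] and the batch size is illustrative):

```scala
eventProcess.chunk(100).through(writeChannel).run.run
```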
Database Sources
NEED FS2 UPDATE
How you model a database source depends on the type of asynchronous capability the API has. For this recipe, we will assume an asynchronous (reactive) source like slick's 3.0 streaming interface.
In slick 3.0.0 you can return a result set that is materialized in memory all at once. This is what you get when you call db.run(yourQuery.result). If you use db.stream you receive a DatabasePublisher. You can interface to this publisher in different ways. The way below is fully asynchronous and returns the number of items processed.
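A sketch (names are illustrative; an implicit ExecutionContext is assumed): push every row from the publisher onto a scalaz-stream queue and complete a Task with the number of rows processed.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
import scalaz.{-\/, \/-}
import scalaz.concurrent.Task
import scalaz.stream.async
import slick.backend.DatabasePublisher

def getData[A](publisher: DatabasePublisher[A], q: async.mutable.Queue[A])(
    implicit ec: ExecutionContext): Task[Long] =
  Task.async { cb =>
    val count = new AtomicLong(0)
    publisher
      .foreach { a => q.enqueueOne(a).run; count.incrementAndGet(); () }
      .onComplete {
        case Success(_) => cb(\/-(count.get))
        case Failure(t) => cb(-\/(t))
      }
  }
```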
When you are ready to use the source you can use a queue to create a new process that you source from:
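A sketch (Event, publisher and an implicit ExecutionContext are assumed to be in scope):

```scala
import scalaz.concurrent.Task
import scalaz.stream.{Process, async}

val q = async.boundedQueue[Event](1000)
val rows: Process[Task, Event] = q.dequeue

// Compose on the Task so the queue is closed once the publisher is done,
// start it asynchronously, then consume the queue and print each row.
val feed: Task[Unit] =
  getData(publisher, q).flatMap(n => q.close.map(_ => println(s"pushed $n rows")))

feed.runAsync(_ => ())
rows.map(println).run.run
```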
This will print your data using the slick streaming model. In this case we composed on the scalaz Task to ensure the queue was closed. We could also alter the return value of the getData call so that it returns immediately and then compose on the returned value.
Since the call will return immediately while the slick database source streams on another thread, you can compose on the returned future to close the queue:
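Roughly (a sketch; getDataF is a hypothetical variant of getData that returns the publisher's Future immediately):

```scala
import scala.concurrent.ExecutionContext.Implicits.global

getDataF(publisher, q).onComplete(_ => q.close.run)
rows.map(println).run.run
```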
You should not run the stream itself asynchronously and then immediately close the queue, because the queue will be closed too quickly. For example, this will not work:
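Sketch of the anti-pattern:

```scala
// runAsync returns immediately, so the queue is closed before the publisher
// has had a chance to fill it.
rows.map(println).run.runAsync(_ => ())
q.close.run
```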
Filtering Out Nones
I often find myself looping over a set of input files, checking that they are valid to process, and then extracting and processing multiple records. For example, I often need to process events from logfiles or entities from master files as part of some analytics I need to perform.
If the processing of an individual logical record fails, it's usually good to capture the error and return an Option[T] from the processing step to show that no object was produced from that iteration. Downstream, you'll need to filter out the Nones, assuming you have already logged the error message as appropriate.
To filter Nones out of a stream, you can do the following:
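One way to write it (a sketch):

```scala
import scalaz.stream.{Process1, process1}

def filterNones[A]: Process1[Option[A], A] =
  process1.collect { case Some(a) => a }
```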
Then use this in your processing
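For example (a sketch; records is an assumed Process[Task, Option[Record]] from the per-record parsing):

```scala
records.pipe(filterNones)   // Process[Task, Record] with the Nones removed
```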
Processing a Set of External Files
NEED FS2 UPDATE
It is quite common to need to process a set of files whose content becomes the stream data.
If the number of files is not large and the file system access time is reasonable, you do not need to pre-test the files using sstreams. For example, you could do the following:
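A sketch matching the description below (directory is the command-line entry holding the comma-separated file list):

```scala
import java.io.File

val dfiles: List[File] =
  directory.split(",").toList
    .map(name => new File(name.trim))
    .filter(f => f.exists && f.getName.endsWith(".xml"))
```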
This processes an entry called directory that contains a comma-separated list of files to process, taken from the command line along with the rest of the command-line arguments in args. dfiles becomes a list of validated files. In the example above, we only accept XML files. You can turn this into a function if you wish.
Once you have a collection of "validated" filenames you can process them using:
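A sketch: emit the validated files and stream each file's lines.

```scala
import scalaz.stream.{Process, io}

Process.emitAll(dfiles)
  .flatMap(f => io.linesR(f.getPath))
  .map(parseLine)   // parseLine is a hypothetical per-line parser
  .run.run
```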
Generally, the "existance" test is not to burdensome on the file system. However, you may also want to ensure that they have non-zero size. If you define a function like:
Then you can filter the original dfiles or process them in a stream:
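For example (a sketch):

```scala
import scalaz.stream.Process

val usable = dfiles.filter(nonZeroSize)          // filter the list directly...
Process.emitAll(dfiles).filter(nonZeroSize)      // ...or filter inside the stream
```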
You may find that you need to validate the file and that the validation is very costly. Since the validation probably contains a side-effect, e.g. reading a file, you really should process this in a channel and then use concurrency to ensure adequate performance:
In the above, we used the concurrently implicit class that is described in another cookbook entry.
Obviously, since we used println in the nonZeroSize function, we should really restructure the function to return \/ and siphon off any messages using the Writer approach described in the Writer section. If you are just writing a quick stream for analysis versus production processing, the println approach is good enough.
Run Concurrent Processes
NEED FS2 UPDATE
Tasks can be run asynchronously or synchronously but you need to be explicit.
Here's an example that uses Task.delay. The Task runs in the main thread once it is asked to run.
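A sketch:

```scala
import scalaz.concurrent.Task
import scalaz.stream.Process

// Task.delay runs on whichever thread runs the stream.
val syncTask = Task.delay { println(s"running on ${Thread.currentThread.getName}"); 10 }

Process.eval(syncTask).runLog.run
// prints something like "running on main"
```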
But a slight change in definition allows the println to occur in a separate thread:
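A sketch (imports as above):

```scala
// Task.apply submits the work to an executor thread pool instead.
val asyncTask = Task { println(s"running on ${Thread.currentThread.getName}"); 10 }

Process.eval(asyncTask).runLog.run
// prints a pool thread name such as "pool-1-thread-1"
```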
You'll notice that the order of the emits was preserved, so the real issue is not just ensuring that your task is called asynchronously, but also that the stream is set up to run and process the outputs concurrently, either in order or out of order if you want results to stream as they become available. Otherwise, you are "semantically" blocking.
The idea is the same regardless of how you want to compose your streams. You need to produce a "stream" of Tasks that are evaluated somehow. The Tasks below are evaluated in sequence, some asynchronously and some synchronously.
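A sketch of a stream of Tasks evaluated one after another with eval (values are illustrative):

```scala
import scalaz.concurrent.Task
import scalaz.stream.Process

val tasks = Process(
  Task.delay { 10 },
  Task       { 70 },
  Task.delay { 20 }
).toSource

tasks.eval.runLog.run
// Vector(10, 70, 20) -- evaluated in sequence, order preserved
```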
We can use gather to let the task objects run as they need to. gather uses an <yourstream>.eval to obtain the Task results. The .eval causes the Task to run as you have intended, e.g. Task(..) runs asynchronously, but Task.delay(..) runs in the same thread. Below, we use gather, which runs the Tasks but allows the results to come back out of order, which is why you may see the 20 result come before the 70 even though in the sequential example above it comes after the 70. We have switched all the Tasks in the example above to run asynchronously:
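A sketch (the buffer size passed to gather and the sleeps are illustrative):

```scala
import scalaz.concurrent.Task
import scalaz.stream.Process

val asyncTasks = Process(
  Task { 10 },
  Task { Thread.sleep(50); 70 },
  Task { 20 }
).toSource

asyncTasks.gather(4).runLog.run
// e.g. Vector(10, 20, 70) -- the 20 can now arrive before the 70
```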
Because each Task is not performing much work you may have to run the above a few times before you see different results.
Asynchronous running can also be performed at the process level using merge.mergeN. Instead of specifying the Task directly, which may be inconvenient, you specify the processes themselves. The infrastructure inside merge makes sure that they run asynchronously.
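A sketch of the same idea at the process level:

```scala
import scalaz.concurrent.Task
import scalaz.stream.{Process, merge}

val procs = Process(
  Process.eval(Task { 10 }),
  Process.eval(Task { Thread.sleep(50); 70 }),
  Process.eval(Task { 20 })
).toSource

// mergeN runs the inner processes concurrently and emits results as they are ready.
merge.mergeN(procs).runLog.run
// e.g. Vector(10, 20, 70)
```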
Again, you may have to run this a few times to see the out-of-order behavior. This may seem like an odd way to create streams, but if you think about channels, a channel is a process of functions that need to be evaluated. If you zip a stream of functions with a stream of function arguments, you have a process that, when evaluated, evaluates the function. It can then be run in parallel chunks using merge.mergeN. Unlike gather, merge runs the processes asynchronously and makes each result available as soon as possible, refilling its internal run queue so another process can run as soon as possible. See the notes on Wye and Tee for an example of how to make the syntax for this scenario much easier and more intuitive.