Channels and Exchanges
Channels
UPDATE FOR FS2 A sstream Channel is a process of functions. It may seem strange to have a process that contains functions, but in scala, functions can be treated as data. The signature for Channel is:
The function signature is quite specific. Given some input value, return an environment that somehow returns an output value when the environment is accessed. The input value usually arrives via another process. The environment is often a Task from which a value could be obtained by running the task. The Task should return a value. The reason why the "effectful" doc comment is there is because if you can access the environment, synchronously or asynchronously, that returns a value then you can perform a side effect in that environment. A common environment is a Task. In this sense, Task could be thought of as IO
in the haskell sense---some type of effect is wrapped up in an object so it's runtime behavior can be controlled.
The main way to create a custom channel, since each effect will probably be specific to your application is to use io.channel
:
Here getUrl
uses a side effect, such as a call to a webservice, to obtain and return a result. You could imagine any side effect modeled this way. Instead of getUrl
you could use insertIntoDB
and the input could be some type of record that needs to be inserted into the database.
Another standard use of Channel is to zip a function with an argument to the function and within the zip, apply the function to the argument. You could do this with a "tee" and tee's zipApply
:
In the above, we used the argument, the stream of integers, and wanted to apply the addOne
function to them. addOne
is turned into a channel using io.channel
. Because addOne already performed the "effect" inside of a task, it was easy wrap call to create the channel. Then, using "tee" and "zipApply" we applied the function, inside the channel, to the arguments. For zipApply to work, the arguments need to be on the "left" and the function to be applied on the "right." In the next to last expression that merely running this stream results in a vector of Tasks. That's because the addOne function returned tasks. You need to extract the values from the Tasks using "run". sstream's eval
evaluates the Task using run
and returns the value. You could have also just used the vector of Tasks and evaluated that result outside of sstreams e.g. .run.run.map(_.run)
.
It is probably more often the case that you already have a function and need to convert it into a channel. If your function body is not wrapped in a Task, it needs to be. But you can wrap any function with a little boilerplate:
There's a slightly better way to express this by implicitly grabbing a scalaz Functor[F]
but hopefully this illustrates the idea.
Exchanges
Last updated