fs2 Effects
fs2 comes with a few effects of its own. You can plug in your own. This is important because you may not want to use fs2's Task; you may want to use someone else's Task. You can also use other types of effects such as Either or Option, but in practice you will want something that supports asynchronous effects.
Effects as implemented in most libraries, including effect4s and cats-effect, are really "strategy design patterns." They implement a few methods that allow you to create the intended effect. The target effect is a class such as Task. Once you have a stable effects "API" you can build on it, which is what monadless, a Scala macro library that generalizes async/await, did.
For example, when you summon an Async instance in fs2 via implicit val F = Async[Task], you are really asking for a strategy class to be found that has a few special methods to create Tasks.
In the case of Async, the effect is one of running code asynchronously. Async implements a critical method that allows you to interface to other code, whether synchronous or not, and returns an instance of the effect type. It may seem strange that you need a "strategy design pattern" object for this, but the APIs of different effects can vary dramatically or, sometimes, be confusingly similar. Future and Task, for example, have some similarities in their API but differ in their semantics: a Future starts computing right away. Async can abstract over this and provide a single API for you to use.
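To make the Future point concrete, here is a minimal sketch using only the Scala standard library. It does not use fs2; the plain thunk below is an assumed stand-in for a deferred effect such as Task, and the names EagerVsLazy and demo are hypothetical.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object EagerVsLazy {
  // Returns the counter value at three points in time.
  def demo(): (Int, Int, Int) = {
    val started = new AtomicInteger(0)

    // A Future begins running as soon as it is constructed.
    val eager: Future[Int] = Future { started.incrementAndGet() }
    Await.result(eager, 5.seconds)
    val afterFuture = started.get()

    // A deferred computation (here just a thunk) is only a description.
    val deferred: () => Int = () => started.incrementAndGet()
    val afterDefining = started.get() // unchanged: nothing has run yet

    deferred() // runs only when we ask for it
    val afterRunning = started.get()

    (afterFuture, afterDefining, afterRunning)
  }
}
```

The counter moves when the Future is created but not when the thunk is defined, which is the semantic difference an effect abstraction has to paper over.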
The effects API generally has to worry about two core areas:
Creating the target effect instance e.g. lifting a value to the target effect.
Handling errors
The effects object, such as Async, allows you to create the target effect, such as a Task, using the same "Async" API everywhere. It is still up to you to actually run the resulting Task if that is what your target effect requires.
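The two areas above can be sketched as a tiny strategy interface in plain Scala. This is a toy model written for illustration, not fs2's Async or Task: MiniEffect, MiniTask, and safeDivide are hypothetical names, and the real fs2 types have many more methods.

```scala
import scala.util.Try

object EffectStrategy {
  // Toy strategy interface: how to create an effect and how to handle errors.
  trait MiniEffect[F[_]] {
    def pure[A](a: A): F[A]                           // lift an already computed value
    def delay[A](a: => A): F[A]                       // lift code to be run later
    def attempt[A](fa: F[A]): F[Either[Throwable, A]] // turn exceptions into values
  }

  // Toy target effect: a deferred computation that runs when asked.
  final class MiniTask[A](thunk: () => A) {
    def unsafeRun(): A = thunk()
  }

  object MiniTask {
    // The strategy instance for MiniTask, found implicitly.
    implicit val effect: MiniEffect[MiniTask] = new MiniEffect[MiniTask] {
      def pure[A](a: A): MiniTask[A] = new MiniTask(() => a)
      def delay[A](a: => A): MiniTask[A] = new MiniTask(() => a)
      def attempt[A](fa: MiniTask[A]): MiniTask[Either[Throwable, A]] =
        new MiniTask(() => Try(fa.unsafeRun()).toEither)
    }
  }

  // Written only against the strategy; the caller chooses the target effect.
  def safeDivide[F[_]](a: Int, b: Int)(implicit F: MiniEffect[F]): F[Either[Throwable, Int]] =
    F.attempt(F.delay(a / b))
}
```

Calling EffectStrategy.safeDivide[EffectStrategy.MiniTask](1, 0) only builds a MiniTask; nothing is divided until unsafeRun() is called on the result, which mirrors the point that running the target effect is still up to you.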
Let's look at a few examples. First we need to establish, for convenience, an implicit effect called F. If we use F directly it does not need to be implicitly declared, but we will need the implicit part later in this section.
If you tab-complete F in the REPL, for this version of fs2, you get:
Async (or F) also has pure, which we know is a Haskell word that lifts a strict value into a monad. Hence we could do:
It is normal for the effect to be capitalized since it's not really a "data" class per se, nor is it the actual effect. The letters F and E are popular.
After we lifted the value using pure, we had a Task returned. To see what was in the Task, we have to run the Task using the Task's API, not the Async API. unsafeRun runs and returns a value, blocking until the result of the computation is available. unsafeRun could throw an exception if your task code throws an exception. We typically want different error handling.
Let's say we are concerned that an exception could be thrown, but we want to handle the exception as a value instead of using a try-catch block (which is very unfunctional).
We can use the Async API to do that by:
We provided F.attempt with the target effect, Task, directly, to show that .attempt belongs to F. Other strategy patterns that model effects may provide other methods to manage errors. Having to specify the target effect directly in F is not helpful and not the recommended approach, so we can just use F.pure like above.
An fs2 Attempt is a convenient name for Either[Throwable, A]. Let's compose this using only F:
Alright, a pure value of 1 does not throw an exception, but we see that we get an Either back. Since "right is good" for an Either, we get a Right(1) returned. If we lift something that throws an error:
Which shows that the exception is now a "value." We did not use "pure" to lift the "processing code" because we wanted to wait for the Task to be created and catch the exception. If we had used pure, the throw would have happened immediately and the Task would not have seen it:
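The difference comes down to strict versus by-name arguments. The sketch below models it in plain Scala with a toy MiniTask; pure, delay, boom, and attemptRun here are hypothetical stand-ins written for this example, not the fs2 methods themselves.

```scala
import scala.util.Try

object PureVsDelay {
  // Toy deferred computation; attemptRun reports exceptions as values.
  final class MiniTask[A](thunk: () => A) {
    def attemptRun(): Either[Throwable, A] = Try(thunk()).toEither
  }

  // Strict argument: `a` is evaluated by the caller before pure is entered.
  def pure[A](a: A): MiniTask[A] = new MiniTask(() => a)

  // By-name argument: `a` is evaluated only when the task is run.
  def delay[A](a: => A): MiniTask[A] = new MiniTask(() => a)

  def boom(): Int = throw new RuntimeException("boom")

  // The throw happens inside the task, so it is captured as a Left.
  val captured: Either[Throwable, Int] = delay(boom()).attemptRun()

  // The throw happens while building the argument, so no task is ever created.
  // The outer Try is only here so the example can observe the escape.
  val escaped: Try[MiniTask[Int]] = Try(pure(boom()))
}
```

Here captured is a Left holding the exception, while escaped is a Failure because the exception fired before any task existed to catch it.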
The Task interface is rich with methods for selecting how a Task should be run. For example, you can delay running the Task for a certain amount of time, or you can "race" a Task with another Task and receive the first result back. You can also have the Task return an Option that holds an Either[Throwable, A], where None indicates that the Task ran synchronously until an async boundary was hit and so has no result to hand back. This asynchronous "boundary" sounds strange, but the idea is that since you can compose Tasks together using, say, flatMap, and each flatMap could return a Task that runs asynchronously, you may want a way to control evaluation more precisely. This type of "boundary" behavior is specific to Task, so the Async API knows nothing about it and provides no API to address it.
You saw before that Async had a method .attempt. Task also has an .attempt. Previously, we used the Async method, but we could use Task's directly. It turns out to be more convenient in some cases:
Since .attempt occurs after the .pure, it is being applied to the Task, not Async. Now we could have done this using just the Async API like we showed earlier:
Of course, using .attempt twice, once for F and once for Task, would not make sense:
You will need to choose which one works for you. If your effect already includes an .attempt or something like it, you can use that. Alternatively, use F.attempt and F.delay, which let you use F without any knowledge of the underlying effect. That is the better choice when writing your own code that allows the user to specify the effect they would like to use.
Async also has two curious methods: unsafeRunAsync and async. Both of these methods take callbacks that allow you to interface with other code. These methods are useful when integrating with other code that itself requires callbacks.
Let's just look at Async.async, as the other method is very similar in concept. The signature is:
We know the Either captures an error or a value. The signature Either => Unit indicates that the parameter to async needs to be a function. It takes a callback as the first argument. You would call the callback when your domain-specific processing has an error or a value to return. Calling the callback returns Unit since your processing block needs nothing back from the async function, e.g. an acknowledgement. But async has the => F[Unit] at the end as well, so we are getting a <callback> => F[Unit]. The main idea here is that when you work asynchronously, you want to indicate "when" the callback should be called. In many languages you cannot delay computation, but in Scala you can. Hence, your application code needs to call the callback and return an F that tells F.async when to process that callback. Many libraries, especially in JavaScript, assume that the callback processing happens immediately. However, you never really know what the library may do with your callback. It's a bit more precise here.
Here's an example:
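As a minimal sketch of that shape in plain Scala (a toy model, not fs2's actual Async or Task), the block below defines an async function whose parameter has the form callback => effect. MiniTask, orderLunch, and lunch are hypothetical names, and the blocking wait inside async is a simplification that a real asynchronous effect would avoid.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Try

object AsyncSketch {
  // Toy deferred computation standing in for the target effect.
  final class MiniTask[A](thunk: () => A) {
    def unsafeRun(): A = thunk()
    def attemptRun(): Either[Throwable, A] = Try(thunk()).toEither
  }
  def delay[A](a: => A): MiniTask[A] = new MiniTask(() => a)

  type Callback[A] = Either[Throwable, A] => Unit

  // Same shape as the signature discussed above, specialised to MiniTask.
  def async[A](register: Callback[A] => MiniTask[Unit]): MiniTask[A] =
    new MiniTask(() => {
      val p = Promise[A]()
      val cb: Callback[A] = {
        case Right(a) => p.trySuccess(a); ()
        case Left(e)  => p.tryFailure(e); ()
      }
      register(cb).unsafeRun()          // run the effect that wires up the callback
      Await.result(p.future, 5.seconds) // toy simplification: wait for the callback
    })

  // Hypothetical callback-style API standing in for "other code".
  def orderLunch(onDone: String => Unit, onError: Throwable => Unit, fail: Boolean): Unit =
    if (fail) onError(new RuntimeException("kitchen closed")) else onDone("sandwich")

  // The returned effect says when the outside call is made: only when the task runs.
  def lunch(fail: Boolean): MiniTask[String] =
    async[String] { cb =>
      delay(orderLunch(s => cb(Right(s)), e => cb(Left(e)), fail))
    }
}
```

Building lunch(false) calls nothing in the outside world; orderLunch is invoked only once the task is run, and a failure reported through the callback surfaces as a Left from attemptRun.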
But we may want to delay processing a bit when we interface to the outside world:
Notice that we used Task.schedule, which means our function was not completely agnostic to the specific effect type. That's because Async does not know about "scheduling." The lunch delay occurs on our thread; we could have started it in the background and then asked it to delay. Don't schedule a delay directly on your thread.
We can also process bad things that happen:
All of these examples really only call the outside world once. If you wanted to repeat the call to the outside world, you would generate a stream of values, and for that you would use fs2 streams, of course.