My fs2 (was scalaz-stream) User Notes
  • Introduction
  • Foundations
  • Design Patterns
    • Category Theory Express Design Patterns
    • Category Theory
      • Semigroup
      • Monoid
      • Functor
      • Applicative
      • Products\/Coproducts
      • Monad
      • Free Monad
    • Category Theory for fs2
  • fs2 Core Model
    • Streams and Graphs
    • Composability
    • API
    • Effects
      • fs2 Effects
    • Smart Constructors and Combinators
  • fs2 Basics
    • Technical Setup
    • Simple Streams
    • Effectful Streams
    • Error Handling and Resource Management
    • Sinks
    • Channels and Exchanges
    • Combining Streams: Wye and Tee
    • Refs
  • Cookbook
    • Recipe: onFinish for Task
    • Recipe: Changing Seq[A] to A
    • Recipe: Debugging with toString
    • Recipe: Retrieving Web Pages
    • Recipe: Converting an Iterator to a Stream
    • Recipe: Data-Driven Stream
    • Recipe: Evaluate a Task Periodically
    • Recipe: Delay running a stream
    • Recipe: Stop an infinite Stream when you want to
    • Recipe: CSV Parsing with univocity
    • Recipe: CSV Parsing with node.js csv-parse using Readable => Stream
  • fs2 Examples
    • Akka Examples
    • Cafe
    • More Than One Stream
Powered by GitBook
On this page

Was this helpful?

  1. Cookbook

Recipe: onFinish for Task

import fs2._
import fs2.util._
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
implicit val strategy = Strategy.fromExecutionContext(global)
implicit val scheduler = Scheduler.fromFixedDaemonPool(4)
implicit val F = Async[Task]

There is no onFinish or onComplete method on Task. How can we run a Task "callback" after a Task has finished? Assume your callback looks like dispose: Task[Unit] = .... Also assume that that the dispose task could also throw an exception. nnnnnn We can use Task.flatMap to sequence the computations. Since Task allows us to catch an exception or a value as an Either we can:

scala> val dispose = Task.delay(println("Cleanup!"))
dispose: fs2.Task[Unit] = Task

scala> val goodTask = Task.delay(1+1)
goodTask: fs2.Task[Int] = Task

scala> val badTask = Task.delay(throw new RuntimeException("Boom!"))
badTask: fs2.Task[Nothing] = Task

scala> goodTask.attempt.flatMap{ result =>
     |   println(s"Original task result: $result")
     |   dispose.flatMap(_ => 
     |     result.fold(Task.fail, Task.now))}.unsafeRun
Original task result: Right(2)
Cleanup!
res0: Int = 2

But for a task that that throws.

scala> badTask.attempt.flatMap{ result =>
     |   println(s"Original task result: $result")
     |   dispose.flatMap(_ => result.fold(Task.fail, Task.now))}.unsafeRun
Original task result: Left(java.lang.RuntimeException: Boom!)
Cleanup!
java.lang.RuntimeException: Boom!
  at $anonfun$1.apply(<console>:28)
  at $anonfun$1.apply(<console>:28)
  at fs2.Task$.$anonfun$delay$1(Task.scala:191)
  at fs2.Task$.$anonfun$suspend$2(Task.scala:199)
  at fs2.util.Attempt$.apply(Attempt.scala:12)
  at fs2.Task$.$anonfun$suspend$1(Task.scala:199)
  at fs2.internal.Future.step(Future.scala:54)
  at fs2.internal.Future.listen(Future.scala:30)
  at fs2.internal.Future.runAsync(Future.scala:69)
  at fs2.internal.Future.run(Future.scala:79)
  at fs2.TaskPlatform$JvmSyntax.unsafeRun(TaskPlatform.scala:14)
  ... 164 elided

Ouch! This actually throws the exception from the original bad task. We actually want that exception because its important information, but we realize that the Task.fail plucks the exception out and returns it from the flatMap. Depending on how you want to handle any errors from your badTask you need to add that at the end after the flatMap for resource cleanup. The .attempt on badTask is there so we can run our cleanup handler. Note that resource management is builtin to fs2 streams with Stream.bracket but we are working with Tasks here.

We could just convert any errors to an Either, this time, for the purposes of handling the original badTask errors:

scala> val badTask = Task.delay(throw new RuntimeException("Boom!"))
badTask: fs2.Task[Nothing] = Task

scala> badTask.attempt.flatMap{ result =>
     |   println(s"Original task result: $result")
     |   dispose.flatMap(_ => result.fold(Task.fail, Task.now))}.attempt.unsafeRun
Original task result: Left(java.lang.RuntimeException: Boom!)
Cleanup!
res2: fs2.util.Attempt[Nothing] = Left(java.lang.RuntimeException: Boom!)

If your Task already returns an Either or other Monad that captures the concept of an error (Either, Option, Try) you can apply the same idea.

This recipe is sourced from gitter.

PreviousCookbookNextRecipe: Changing Seq[A] to A

Last updated 5 years ago

Was this helpful?