My fs2 (was scalaz-stream) User Notes

Recipe: Stop an infinite Stream when you want to


Last updated 5 years ago


Sometimes you need to stop a stream that runs indefinitely, or one that is running in the "wild" (meaning you may have several streams running independently in your application). With the "pull" model it is clear that when the input stream runs out of data, the overall stream stops. But what if the stream is infinite? We can use a Signal to interrupt a Stream. You can also use a stream of booleans, but we will show the signal approach.

// set the effect to use (Task here) and ensure an implicit Async[Task] is available
val done = async.signalOf[Task, Boolean](false).unsafeRun()
// run your infinite stream in the background...
your_infinite_or_bounded_stream.interruptWhen(done).run.unsafeRunAsyncFuture()

println("Press Enter to stop...")
scala.io.StdIn.readLine()
done.set(true).unsafeRun()

We are using a Signal outside of a stream, which is perfectly allowable. Here's what's going on:

  • First, we need a Signal, so we create one. But it's wrapped in a Task (the effect), so we need to extract it using unsafeRun.

  • interruptWhen wraps the stream so that it stops when the Signal becomes true; run.unsafeRunAsyncFuture() then starts the stream in the background. Signal.interrupt, which takes a stream argument, uses Stream.interruptWhen underneath.

  • The readLine() simulates some condition that you control.

  • The last line sets the Signal to true. But since the Signal communicates through the "effect", set only returns a Task; you need to run that Task for the value to actually be set.

Once set, the stream will be interrupted and stop.
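
The stream-of-booleans variant mentioned at the top can be sketched as follows. This is a minimal sketch, assuming fs2 0.9.x (the version the rest of this recipe appears to use) and its interruptWhen overload that accepts a Stream[F, Boolean]. The names BooleanStreamInterruptDemo, ticks, stop and collect are made up for illustration, and the pool sizes are arbitrary.

```scala
import fs2._
import scala.concurrent.duration._

object BooleanStreamInterruptDemo {
  // Assumption: fs2 0.9.x, where Task needs a Strategy and time.* needs a Scheduler.
  implicit val strategy: Strategy = Strategy.fromFixedDaemonPool(4)
  implicit val scheduler: Scheduler = Scheduler.fromFixedDaemonPool(2)

  // Hypothetical stand-in for your own stream: one tick per second, forever.
  val ticks: Stream[Task, FiniteDuration] = time.awakeEvery[Task](1.second)

  // Stays silent for about 2.5 seconds, then emits a single `true`.
  val stop: Stream[Task, Boolean] =
    time.sleep_[Task](2500.millis) ++ Stream.eval(Task.now(true))

  // The first `true` on `stop` interrupts `ticks`, so runLog can complete.
  def collect(): Vector[FiniteDuration] =
    ticks.interruptWhen(stop).runLog.unsafeRun()

  def main(args: Array[String]): Unit =
    println(s"Collected ${collect().size} ticks before the interrupt")
}
```

The trade-off compared with the Signal version is that nothing outside the stream holds a handle to flip; the stopping condition has to be expressible as a stream itself.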

Notice that we used unsafeRun, which is not strictly necessary. You could do all of this in a streamy way. The approach was described in this gist. There is also a good description of pausing.

scala> /** Interrupt immediately. */
     | def interruptNow[F[_], A](implicit F: Async[F]): Pipe[F, A, A] =
     |     toBeInterrupted => 
     |         // Create a cancellation signal
     |         Stream.eval(async.signalOf(false)).flatMap { cancellationSignal =>
     |            // Attach it to the input stream
     |            toBeInterrupted.interruptWhen(cancellationSignal).merge(Stream.eval_(cancellationSignal.set(true)))
     |     }
interruptNow: [F[_], A](implicit F: fs2.util.Async[F])fs2.Pipe[F,A,A]

scala> /** Delay interruption. */
     | def interruptAfter[F[_], A](delay: FiniteDuration)(implicit F: Async[F]): Pipe[F, A, A] =
     |   toBeInterrupted =>
     |     Stream.eval(async.signalOf(false)).flatMap { cancellationSignal =>
     |       toBeInterrupted.interruptWhen(cancellationSignal).merge(time.sleep_(delay) ++ Stream.eval_(cancellationSignal.set(true)))
     |     }
interruptAfter: [F[_], A](delay: scala.concurrent.duration.FiniteDuration)(implicit F: fs2.util.Async[F])fs2.Pipe[F,A,A]

How does this work?

  • Create a pipe because we want to create a combinator that is easy to use and that would convert an existing stream.

  • Create a signal that allows the use of Stream.interruptWhen. The signal must hold a Boolean.

  • Since signals are wrapped in effects, use Stream.eval to unwrap the signal so we can use it directly in toBeInterrupted.interruptWhen.

  • Merge the "interruptible" stream with a stream that sets the interrupt signal to true.

  • Yes, interruptNow is a silly example, because interruption is really only useful if it is criteria-based.

  • interruptAfter interrupts based on a delay criterion, so it is not silly.

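// Test setup: start/finish log markers plus an infinite stream that emits a random Int every second.
// Assumes import fs2._ and import scala.concurrent.duration._, with an implicit Strategy and Scheduler in scope.
val logStart = Stream.eval_(Task.delay(println("Started: " + System.currentTimeMillis)))
val logFinished = Stream.eval_(Task.delay(println("Finished: " + System.currentTimeMillis)))

def randomIntEffect(): Task[Int] = Task.delay { (math.random * 100).toInt }

val infiniteStream = time.awakeEvery[Task](1.seconds).evalMap { delta =>
    println(s"Delta: $delta")
    randomIntEffect()
}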

Now we can run this, first interrupting immediately:

scala> val solutionNow = logStart ++ infiniteStream.through(interruptNow) ++ logFinished
solutionNow: fs2.Stream[fs2.Task,Int] = append(append(attemptEval(Task).flatMap(<function1>).flatMap(<function1>), Segment(Emit(Chunk(()))).flatMap(<function1>)), Segment(Emit(Chunk(()))).flatMap(<function1>))

scala> solutionNow.runLog.unsafeRun
Started: 1502804969299
res4: Vector[Int] = Vector()

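Then interrupt after 5 seconds. The number of results can vary by one between runs, depending on whether the last tick arrives before the interruption; this run produced 5: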

scala> val solutionAfter = logStart ++ infiniteStream.through(interruptAfter(5.seconds)) ++ logFinished
solutionAfter: fs2.Stream[fs2.Task,Int] = append(append(attemptEval(Task).flatMap(<function1>).flatMap(<function1>), Segment(Emit(Chunk(()))).flatMap(<function1>)), Segment(Emit(Chunk(()))).flatMap(<function1>))

scala> solutionAfter.runLog.unsafeRun
Started: 1502804969481
res5: Vector[Int] = Vector(98, 51, 39, 28, 83)

We could generalize this further to any trigger wrapped in an effect that interrupts the stream when "triggered." We could even use another "signal," as in the first approach. If you want to interrupt a stream that is running asynchronously in the background of a REPL session, you can set up a small effect, compose it into the stream, then "run" the effect in the REPL to terminate the background stream.
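
One way the generalization to an arbitrary trigger could look is sketched below. It follows the shape of interruptAfter but replaces the sleep with any effect. This is a sketch under assumptions, not a library combinator: interruptOn, trigger, InterruptOnDemo and collect are hypothetical names, fs2 0.9.x is assumed, and the blocking Thread.sleep merely stands in for whatever condition you actually care about.

```scala
import fs2._
import fs2.util.Async
import scala.concurrent.duration._

object InterruptOnDemo {
  // Assumption: fs2 0.9.x, where Task needs a Strategy and time.* needs a Scheduler.
  implicit val strategy: Strategy = Strategy.fromFixedDaemonPool(4)
  implicit val scheduler: Scheduler = Scheduler.fromFixedDaemonPool(2)

  /** Hypothetical combinator: interrupt the input once `trigger` completes. */
  def interruptOn[F[_], A](trigger: F[Unit])(implicit F: Async[F]): Pipe[F, A, A] =
    toBeInterrupted =>
      Stream.eval(async.signalOf[F, Boolean](false)).flatMap { cancellationSignal =>
        toBeInterrupted
          .interruptWhen(cancellationSignal)
          // Run the trigger, then flip the signal; neither step emits output.
          .merge(Stream.eval_(trigger) ++ Stream.eval_(cancellationSignal.set(true)))
      }

  // Stand-in trigger for the demo: completes after roughly 2.5 seconds.
  // It blocks a pool thread, which is acceptable only for a small example.
  val trigger: Task[Unit] = Task.delay(Thread.sleep(2500))

  def collect(): Vector[FiniteDuration] =
    time.awakeEvery[Task](1.second)
      .through(interruptOn[Task, FiniteDuration](trigger))
      .runLog
      .unsafeRun()

  def main(args: Array[String]): Unit =
    println(s"Collected ${collect().size} ticks before the trigger fired")
}
```

As with interruptAfter, the merged stream should only finish once both sides have finished, so if the input ends on its own the result still waits for the trigger to complete.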

INSERT COOL EXAMPLE HERE
