Recipe: Delay running a stream

If you need to delay running a stream, you can use the time functions, such as time.sleep.

import fs2._
import fs2.util._
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
implicit val strategy = Strategy.fromExecutionContext(global)
implicit val scheduler = Scheduler.fromFixedDaemonPool(4)
implicit val F = Async[Task]
scala> // delay a stream by 5 seconds
     | time.sleep(5.seconds).run.unsafeRun

This creates a stream that sleeps for 5 seconds. Note that unsafeRun blocks the current thread, because it waits for the task (our F effect) created from the sleep stream to complete.

If we want to run the stream in the background, we could:

scala> Await.ready( // Await is only here to capture the output for display
     |     time.sleep(5.seconds).run.unsafeRunAsyncFuture
     |     , Duration.Inf)
res2: scala.concurrent.Future[Unit] = Future(Success(()))

This runs the Task (created by run) asynchronously and returns a Scala Future. The Await.ready call is there only to display the completed result, which is just Unit because the stream produces no other output.

To delay a stream, we append our own stream after the sleep:

scala> (time.sleep(5.seconds) ++ Stream.emit(true)).runLog.unsafeRun
res3: Vector[AnyVal] = Vector((), true)

This emits a Unit and then true after a 5-second delay. We can use this to write a pipe that delays its contents:
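A minimal sketch of such a pipe, assuming the fs2 0.9-era API used in the session above (time.sleep emitting a single Unit and requiring an implicit Async and Scheduler). The name sleepFirst is hypothetical and not part of fs2:

```scala
import fs2._
import fs2.util.Async
import scala.concurrent.duration._

// Hypothetical helper: waits for `delay`, then emits everything from the source.
// flatMap swaps the single Unit from time.sleep for the source stream,
// so the Unit never reaches downstream.
def sleepFirst[F[_], A](delay: FiniteDuration)(
    implicit F: Async[F], scheduler: Scheduler): Pipe[F, A, A] =
  source => time.sleep[F](delay).flatMap(_ => source)
```

With the implicits from the session above in scope, Stream(1, 2, 3).through(sleepFirst[Task, Int](5.seconds)).runLog.unsafeRun should yield the three elements after the delay, with no leading Unit.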

Instead of stream concatenation, we used flatMap to drop the Unit emitted by the time.sleep.

There are other ways to delay a stream. Streams are parameterized on F so we can use our chosen effect to potentially delay things.

For example, let's generate stream elements by periodically calling the outside world to obtain a security token. Security tokens expire on a schedule that is often indicated by the current token itself. If we want a stream that obtains a new token just before the previous one expires, and otherwise sleeps, we can use Stream.unfoldEval and Task.schedule:
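As a rough sketch under stated assumptions: AuthToken, fetchToken and margin below are hypothetical stand-ins (a real fetchToken would call the token service), and the code assumes the fs2 0.9-era Task.schedule and Stream.unfoldEval. The unfold state is the delay to wait before the next fetch:

```scala
import fs2._
import scala.concurrent.duration._

// Hypothetical token type: the token itself says how long it stays valid.
final case class AuthToken(value: String, expiresIn: FiniteDuration)

// Hypothetical stand-in for the call to the outside world.
def fetchToken: Task[AuthToken] =
  Task.delay(AuthToken(java.util.UUID.randomUUID().toString, 1.second))

// Assumed safety margin: refresh this long before the token expires.
val margin: FiniteDuration = 100.millis

def tokens(implicit strategy: Strategy, scheduler: Scheduler): Stream[Task, AuthToken] =
  // Start with a zero delay so the first token is fetched immediately.
  Stream.unfoldEval(Duration.Zero) { delay =>
    fetchToken.schedule(delay).map { token =>
      // Next state: wait until shortly before this token expires (never negative).
      val next = (token.expiresIn - margin).max(Duration.Zero)
      Option((token, next))
    }
  }
```

Subtracting a fixed margin is only one possible policy; scaling expiresIn by a fraction would work as well, as long as the result is converted back to a FiniteDuration.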

Run it and make sure there is a way to stop, e.g. by taking only 10 values:
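To illustrate the stopping condition in isolation, here is a minimal sketch in which a simple counter stands in for the token stream (the names endless and firstTen are hypothetical). Like the token stream, this unfold never ends on its own, so take(10) is what lets runLog complete:

```scala
import fs2._

// A counter standing in for the token stream: the unfold never returns None,
// so the stream is endless.
val endless: Stream[Task, Int] =
  Stream.unfoldEval(0)(i => Task.delay(Option((i, i + 1))))

// take(10) ends the stream after ten elements, so runLog can complete.
val firstTen: Vector[Int] = endless.take(10).runLog.unsafeRun()
```

The same pattern applies to the token stream: add take(10) before runLog.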

We could make this a bit more general:

and then call it like:
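A minimal sketch of both steps, the generalized function and a call to it, again assuming the fs2 0.9-era API. The names unfoldEvalWithDelay, Token and fetchToken are hypothetical, and the fetch is stubbed rather than calling a real service:

```scala
import fs2._
import scala.concurrent.duration._

// Hypothetical generalization: `fetch` produces a value, and `nextDelay` reads
// from that value how long to wait before fetching again.
def unfoldEvalWithDelay[A](fetch: Task[A], nextDelay: A => FiniteDuration)(
    implicit strategy: Strategy, scheduler: Scheduler): Stream[Task, A] =
  Stream.unfoldEval(Duration.Zero) { delay =>
    fetch.schedule(delay).map(a => Option((a, nextDelay(a))))
  }

// Calling it with a hypothetical token type and a stubbed fetch.
final case class Token(value: String, expiresIn: FiniteDuration)
val fetchToken: Task[Token] = Task.delay(Token("stub-token", 100.millis))

implicit val strategy: Strategy = Strategy.fromFixedDaemonPool(2)
implicit val scheduler: Scheduler = Scheduler.fromFixedDaemonPool(2)

// Only take 3 values so that the run terminates.
val three: Vector[Token] =
  unfoldEvalWithDelay[Token](fetchToken, _.expiresIn).take(3).runLog.unsafeRun()
```

Passing the delay function separately keeps the scheduling policy (for example, refreshing slightly before expiry) out of the fetch itself.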
