Recipe: Delay running a stream
If you need to delay running a stream, you can use the functions in the time package, such as time.sleep.
import fs2._
import fs2.util._
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
implicit val strategy = Strategy.fromExecutionContext(global)
implicit val scheduler = Scheduler.fromFixedDaemonPool(4)
implicit val F = Async[Task]
scala> // delay a stream by 5 seconds
     | time.sleep(5.seconds).run.unsafeRun
This creates a stream that sleeps for 5 seconds. Note that unsafeRun blocks the current thread, because it awaits the completion of the task (our F effect) created from the sleep stream.
If we want to run the stream in the background instead, we can use unsafeRunAsyncFuture:
scala> Await.ready( // Await is only here to capture the output for display
| time.sleep(5.seconds).run.unsafeRunAsyncFuture
| , Duration.Inf)
res2: scala.concurrent.Future[Unit] = Future(Success(()))
This asks to run the Task (created by run) as a Scala Future. Await.ready is there only to display the result; in this case the result is just Unit, as expected.
To delay a stream, we append our own stream after the sleep:
scala> (time.sleep(5.seconds) ++ Stream.emit(true)).runLog.unsafeRun
res3: Vector[AnyVal] = Vector((), true)
After a 5 second delay, this emits a Unit followed by true. We can use this to write a pipe that delays its contents:
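A minimal sketch of such a pipe, assuming the fs2 0.9 API used in this recipe. The names delayBy and demo are hypothetical and not part of fs2; the delay is shortened to 200 milliseconds so the example finishes quickly:

```scala
import fs2._
import fs2.util.Async
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object DelayPipe {
  // Hypothetical helper: wait for `delay`, then pass the input stream through unchanged.
  // flatMap replaces the single Unit emitted by time.sleep with the whole input stream.
  def delayBy[F[_], A](delay: FiniteDuration)(implicit F: Async[F], scheduler: Scheduler): Pipe[F, A, A] =
    in => time.sleep[F](delay).flatMap(_ => in)

  def demo(): Vector[Int] = {
    implicit val strategy: Strategy = Strategy.fromExecutionContext(global)
    implicit val scheduler: Scheduler = Scheduler.fromFixedDaemonPool(2)

    val source: Stream[Task, Int] = Stream(1, 2, 3)
    // Blocks for roughly 200 ms, then yields the elements of `source`.
    source.through(delayBy[Task, Int](200.millis)).runLog.unsafeRun()
  }
}
```

Because the Unit never reaches the output, the element type stays A rather than widening to AnyVal as it did with ++.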
Instead of stream concatenation, the pipe uses flatMap, which drops the Unit emitted by time.sleep.
There are other ways to delay a stream. Streams are parameterized on F so we can use our chosen effect to potentially delay things.
For example, let's generate a stream whose elements come from periodically calling the outside world to obtain a security token. Security tokens expire on a schedule that is often indicated by the current token itself. If we want a stream that obtains a new token just before the previous one expires, and otherwise sleeps, we can use Stream.unfoldEval and Task.schedule:
When running it, make sure there is a way to stop, for example by taking only 10 values:
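A minimal sketch of a token stream and a bounded run, assuming the fs2 0.9 API used in this recipe. The Token type, fetchToken, and the refresh margin are hypothetical stand-ins for a real token service, and Task.schedule is assumed to take a by-name value and a delay. The durations are in milliseconds so the example finishes quickly:

```scala
import fs2._
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object TokenStream {
  implicit val strategy: Strategy = Strategy.fromExecutionContext(global)
  implicit val scheduler: Scheduler = Scheduler.fromFixedDaemonPool(4)

  // Hypothetical token: it carries its own time-to-live.
  final case class Token(value: String, expiresIn: FiniteDuration)

  // Stand-in for the real call to the outside world.
  private val counter = new AtomicInteger(0)
  val fetchToken: Task[Token] =
    Task.delay(Token(s"token-${counter.incrementAndGet()}", 200.millis))

  // Refresh this long before the current token expires (illustrative value).
  val margin: FiniteDuration = 50.millis

  // The unfold state is the delay before the next fetch; the first fetch is immediate.
  val tokens: Stream[Task, Token] =
    Stream.unfoldEval[Task, FiniteDuration, Token](Duration.Zero) { delay =>
      Task.schedule((), delay)
        .flatMap(_ => fetchToken)
        .map(t => Option((t, (t.expiresIn - margin).max(Duration.Zero))))
    }

  // The stream is infinite, so bound it before running.
  def firstTen(): Vector[Token] = tokens.take(10).runLog.unsafeRun()
}
```

Here the sleeping happens inside the Task effect rather than in a separate sleep stream, so each element is only produced once its scheduled delay has elapsed.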
We could make this a bit more general:
and then call it like:
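One possible generalization, sketched under the same fs2 0.9 assumptions: abstract over the effect that produces each element and over the function that derives the next delay from that element. The names repeatAfter and demo are hypothetical, and Task.schedule is again assumed to take a by-name value and a delay:

```scala
import fs2._
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object DelayedUnfold {
  // Hypothetical helper: run `fetch` immediately, then again after whatever
  // delay `nextDelay` computes from the most recent element, indefinitely.
  def repeatAfter[A](fetch: Task[A])(nextDelay: A => FiniteDuration)(
      implicit S: Strategy, scheduler: Scheduler): Stream[Task, A] =
    Stream.unfoldEval[Task, FiniteDuration, A](Duration.Zero) { delay =>
      Task.schedule((), delay)
        .flatMap(_ => fetch)
        .map(a => Option((a, nextDelay(a))))
    }

  def demo(): Vector[Int] = {
    implicit val strategy: Strategy = Strategy.fromExecutionContext(global)
    implicit val scheduler: Scheduler = Scheduler.fromFixedDaemonPool(2)

    val n = new AtomicInteger(0)
    // Emit an incrementing counter, waiting 100 ms between fetches, and stop after 3.
    repeatAfter(Task.delay(n.incrementAndGet()))(_ => 100.millis)
      .take(3)
      .runLog
      .unsafeRun()
  }
}
```

For the token case, the call would pass the token fetch as the first argument and a function such as "time-to-live minus a safety margin" as the second.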