scala>// delay a stream by 10 seconds| time.sleep(5.seconds).run.unsafeRun
This creates a stream that sleeps for a short time. Using unsafeRun will block the current thread though as unsafeRun awaits the completion fo the task created (our F effect) from the sleep stream.
If we want to run the stream in the background, we could:
scala> Await.ready( // Await is only here to capture the output for display| time.sleep(5.seconds).run.unsafeRunAsyncFuture| , Duration.Inf)res2: scala.concurrent.Future[Unit] = Future(Success(()))
Which asks to run the Task (created from run) but use a scala Future. The .foreach prints out the result for convenience although in this case there is no output as expected.
To sleep a stream, we need to "add" on our own stream after the sleep:
This emits a Unit and a true after a 5 second delay. We can use this to write a pipe that sleeps the contents:
/** A pipe that given a stream, delays it by delta. */defsleepFirst[F[_], O](delta: FiniteDuration)(implicit F: Async[F]): Pipe[F, O, O] = delayed => time.sleep(delta).flatMap(_ => delayed)
Instead of stream concatenation, we used flatMap to drop the Unit emitted by the time.sleep.
There are other ways to delay a stream. Streams are parameterized on F so we can use our chosen effect to potentially delay things.
For example, lets generate stream elements that are a result of calling to the outside world to obtain a security token periodically. Security tokens expire on a schedule that is often indicated by the current security token. If we want to generate a stream that obtains a new token just before the previous token expires but otherwise sleep, we can use Stream.unfoldEval and Task.schedule:
import java.util.concurrent.{TimeUnit => TU}caseclass AuthToken(expiresIn: FiniteDuration)// Simulate a taken that has a varying expiration time// A return value of None indicates an error of some sort.// You can use a Try or Either to capture any error information.// The Task reflects the asynchronous nature of obtaining a token// from a web service.defgetNextToken() = { println(s"Getting token: ${java.time.Instant.now}") Task.delay(Some(AuthToken((math.random *5).seconds)))}// Create a token stream. It will immediately try to get the first token.val authTokenStream = Stream.unfoldEval((0.0).seconds){ expiresIn => getNextToken() .schedule(expiresIn) // schedule getting the token through the F effect (=Task) .map{ nextTokenOpt =>// If a token is found, emit the token and set the state to the next time it is needed// with a small decrease so it does not expire before the call is issued. nextTokenOpt.map { nextToken =>val delay = FiniteDuration((nextToken.expiresIn *0.95).toNanos, TU.NANOSECONDS) (nextToken, delay) }}}
Run it and make sure there is a way to stop i.e. only take 10 values:
/** Unfold, periodically checking an effect for new values. * Time between checks is obtained using getDelay potentially * using the returned effect value. * @param A Output element type. * @param f Call an effect to get a value. * @param getDelay Extract amount to delay from that value. */defunfoldEvalWithDelay[A](f: => Task[Option[A]], getDelay: A => FiniteDuration) (implicit F: Async[Task], strategy: Strategy, scheduler: fs2.Scheduler) = Stream.unfoldEval((0.0).seconds){ delay => f.schedule(delay) .map{ opt => opt.map { a => (a, getDelay(a))}}} /** Calculate a delay but use fraction to shorten the delay. */defshortenDelay(fraction: Double =0.95, delay: FiniteDuration): FiniteDuration = FiniteDuration((delay *0.95).toNanos, TU.NANOSECONDS)