If you need to delay running a stream, you can use the functions in fs2's time object.
import fs2._
import fs2.util._
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
implicit val strategy = Strategy.fromExecutionContext(global)
implicit val scheduler = Scheduler.fromFixedDaemonPool(4)
implicit val F = Async[Task]
scala> // delay a stream by 5 seconds
| time.sleep(5.seconds).run.unsafeRun
This creates a stream that sleeps for 5 seconds. Calling unsafeRun blocks the current thread, though, because unsafeRun waits for the task (our F effect) created from the sleep stream to complete.
If we want to run the stream in the background, we could:
scala> Await.ready( // Await is only here to capture the output for display
| time.sleep(5.seconds).run.unsafeRunAsyncFuture
| , Duration.Inf)
res2: scala.concurrent.Future[Unit] = Future(Success(()))
This runs the Task (created by run) as a Scala Future. Await.ready is only there so the result can be displayed; the future completes with Unit, so there is no other output, as expected.
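To see why the REPL result above prints as a completed Future, here is a small sketch that uses only the Scala standard library (no fs2). A plain Future stands in for the one returned by unsafeRunAsyncFuture; the name background and the 100 ms pause are illustrative assumptions. Await.ready blocks until the future completes and then returns that same future.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Stand-in for a task running in the background:
// a Future that completes with Unit after a short pause.
val background: Future[Unit] = Future { Thread.sleep(100) }

// Creating the Future does not block the caller.
// Await.ready blocks until it completes and hands back the same Future.
val done: Future[Unit] = Await.ready(background, 5.seconds)

println(done eq background) // true: Await.ready returns the future it was given
println(done.isCompleted)   // true
println(done.value)         // Some(Success(()))
```

In real code you would usually attach a callback or compose the Future rather than await it; the Await is only useful for display, as in the REPL session above.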
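To delay the contents of a stream, we need to append our own stream after the sleep. For example, concatenating a stream that emits true onto time.sleep(5.seconds) emits a Unit and then a true after a 5 second delay. We can use this to write a pipe that delays a stream's contents: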
/** A pipe that given a stream, delays it by delta. */
def sleepFirst[F[_], O](delta: FiniteDuration)(implicit F: Async[F]): Pipe[F, O, O] =
  delayed => time.sleep(delta).flatMap(_ => delayed)
Instead of stream concatenation, we used flatMap to drop the Unit emitted by the time.sleep.
There are other ways to delay a stream. Streams are parameterized on F so we can use our chosen effect to potentially delay things.
For example, let's generate stream elements by periodically calling the outside world to obtain a security token. Security tokens expire on a schedule that is often indicated by the current token itself. To generate a stream that sleeps until just before the previous token expires and then obtains a new one, we can use Stream.unfoldEval and Task.schedule:
import java.util.concurrent.{TimeUnit => TU}
case class AuthToken(expiresIn: FiniteDuration)
// Simulate a token that has a varying expiration time.
// A return value of None indicates an error of some sort.
// You can use a Try or Either to capture any error information.
// The Task reflects the asynchronous nature of obtaining a token
// from a web service.
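def getNextToken(): Task[Option[AuthToken]] =
  Task.delay {
    // Print inside the Task so the timestamp reflects when the token is actually fetched.
    println(s"Getting token: ${java.time.Instant.now}")
    Some(AuthToken((math.random * 5).seconds))
  }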
// Create a token stream. It will immediately try to get the first token.
val authTokenStream = Stream.unfoldEval((0.0).seconds) { expiresIn =>
  getNextToken()
    .schedule(expiresIn) // schedule getting the token through the F effect (=Task)
    .map { nextTokenOpt =>
      // If a token is found, emit the token and set the state to the next time it is needed
      // with a small decrease so it does not expire before the call is issued.
      nextTokenOpt.map { nextToken =>
        val delay = FiniteDuration((nextToken.expiresIn * 0.95).toNanos, TU.NANOSECONDS)
        (nextToken, delay)
      }
    }
}
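The way the unfold state is threaded from one step to the next can be hard to see through the effect types. The following is a library-free model of the same idea, not the fs2 code itself: FakeToken and the fixed list of lifetimes are hypothetical stand-ins, and no real waiting happens. It only computes the wait that would precede each fetch.

```scala
import scala.concurrent.duration._

// Hypothetical stand-in for the AuthToken case class on this page.
final case class FakeToken(expiresIn: FiniteDuration)

// Hypothetical, fixed token lifetimes in place of a real web service call.
val tokens = List(FakeToken(4.seconds), FakeToken(2.seconds), FakeToken(10.seconds))

// The state is the wait before the next fetch. It starts at zero, so the first
// fetch is immediate; each later wait is 95% of the previous token's lifetime.
val waits: List[FiniteDuration] =
  tokens.scanLeft(Duration.Zero: FiniteDuration) { (_, token) =>
    token.expiresIn * 95L / 100L
  }

// Pair each token with the wait that preceded its fetch.
// zip drops the last wait, which belongs to a fetch that never happens here.
val schedule: List[(FiniteDuration, FakeToken)] = waits.zip(tokens)

schedule.foreach { case (wait, token) =>
  println(s"wait $wait, then fetch a token valid for ${token.expiresIn}")
}
```

With these lifetimes the waits come out as 0, 3.8, 1.9 and 9.5 seconds. The fs2 version, authTokenStream, threads its state the same way but performs the waits for real through Task.schedule.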
Run authTokenStream and make sure there is a way to stop it, for example by taking only 10 values:
/** Unfold, periodically checking an effect for new values.
* Time between checks is obtained using getDelay potentially
* using the returned effect value.
* @tparam A Output element type.
* @param f Call an effect to get a value.
* @param getDelay Extract amount to delay from that value.
*/
def unfoldEvalWithDelay[A](f: => Task[Option[A]],
                           getDelay: A => FiniteDuration)
                          (implicit F: Async[Task], strategy: Strategy, scheduler: fs2.Scheduler) =
  Stream.unfoldEval((0.0).seconds) { delay =>
    f.schedule(delay)
      .map { opt =>
        opt.map { a => (a, getDelay(a)) }
      }
  }
/** Calculate a delay but use fraction to shorten the delay. */
def shortenDelay(fraction: Double = 0.95, delay: FiniteDuration): FiniteDuration =
  FiniteDuration((delay * fraction).toNanos, TU.NANOSECONDS)
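A helper like this is easy to check on its own, since it depends only on scala.concurrent.duration. The sketch below uses a hypothetical variant named scaleDelay, which takes the duration first so the default fraction can be left out without a named argument; the name and parameter order are assumptions for illustration, not part of the code above.

```scala
import scala.concurrent.duration._

// Hypothetical variant of the helper: duration first, fraction defaulted.
def scaleDelay(delay: FiniteDuration, fraction: Double = 0.95): FiniteDuration =
  FiniteDuration((delay * fraction).toNanos, NANOSECONDS)

// An explicit fraction of 0.5 halves the delay.
val half: FiniteDuration = scaleDelay(10.seconds, 0.5)

// The default fraction shortens the delay by about 5%.
val shortened: FiniteDuration = scaleDelay(10.seconds)

println(half == 5.seconds)      // true: FiniteDuration equality compares the actual length
println(shortened < 10.seconds) // true
```

With the original parameter order, where the defaulted fraction comes first, callers need a named argument to rely on the default, as in shortenDelay(delay = token.expiresIn).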