# Recipe: Delay running a stream

If you need to delay running a stream, you can use the `time` functions.

```scala
import fs2._
import fs2.util._
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
implicit val strategy = Strategy.fromExecutionContext(global)
implicit val scheduler = Scheduler.fromFixedDaemonPool(4)
implicit val F = Async[Task]
```

```scala
scala> // delay a stream by 5 seconds
     | time.sleep(5.seconds).run.unsafeRun
```

This creates a stream that sleeps for 5 seconds. Calling `unsafeRun` blocks the current thread, though, because it awaits the completion of the task (our `F` effect) created from the sleep stream.

If we want to run the stream in the background, we could:

```scala
scala> Await.ready( // Await is only here to capture the output for display
     |     time.sleep(5.seconds).run.unsafeRunAsyncFuture
     |     , Duration.Inf)
res2: scala.concurrent.Future[Unit] = Future(Success(()))
```

This asks to run the Task (created from `run`) but returns a Scala `Future`. The `Await.ready` call is only there so the result can be displayed; in this case there is no output, as expected.
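
As a rough standard-library analogy (no fs2 involved), the sketch below shows the difference between starting work in the background and blocking on it. `BackgroundDemo`, `startInBackground`, and `waitFor` are hypothetical names, and the plain `Future` here only stands in for the one returned by `unsafeRunAsyncFuture`.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object BackgroundDemo {
  // Hypothetical stand-in for a task started in the background:
  // the Future is returned immediately while the sleep runs on another thread.
  def startInBackground(delayMillis: Long): Future[Unit] =
    Future { Thread.sleep(delayMillis) }

  // Blocks the calling thread until the background work finishes,
  // which is the role Await.ready plays in the transcript above.
  def waitFor(f: Future[Unit]): Future[Unit] =
    Await.ready(f, 5.seconds)
}
```

In application code you would normally register a callback on the `Future` (for example with `foreach` or `onComplete`) rather than block with `Await`.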

To delay a stream, we need to append our own stream after the sleep:

```scala
scala> (time.sleep(5.seconds) ++ Stream.emit(true)).runLog.unsafeRun
res3: Vector[AnyVal] = Vector((), true)
```

This emits a Unit and then a `true` after a 5-second delay. We can use this to write a pipe that delays the contents of a stream:

```scala
/** A pipe that given a stream, delays it by delta. */
def sleepFirst[F[_], O](delta: FiniteDuration)(implicit F: Async[F]): Pipe[F, O, O] =
  delayed => time.sleep(delta).flatMap(_ =>  delayed)
```

```scala
scala> Stream.emit(true).through(sleepFirst(5.seconds)).runLog.unsafeRun
res5: Vector[Boolean] = Vector(true)
```

Instead of stream concatenation, we used `flatMap`, which drops the Unit emitted by `time.sleep`.
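
The same effect can be seen with plain Scala lists, which share the `++` and `flatMap` shape with streams. This is only an analogy, assuming a one-element `List[Unit]` as a stand-in for the sleep stream; `ConcatVsFlatMap` and its members are hypothetical names.

```scala
object ConcatVsFlatMap {
  // Stand-in for the single Unit emitted by the sleep stream.
  val sleepLike: List[Unit] = List(())

  // Stand-in for the stream we want to delay.
  val delayed: List[Boolean] = List(true)

  // Concatenation keeps the Unit, so the element type widens to AnyVal.
  val concatenated: List[AnyVal] = sleepLike ++ delayed

  // flatMap replaces the Unit with the delayed elements, so only Booleans remain.
  val flatMapped: List[Boolean] = sleepLike.flatMap(_ => delayed)
}
```

This mirrors the two results shown earlier: `Vector[AnyVal]` for concatenation and `Vector[Boolean]` for the pipe.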

There are other ways to delay a stream. Streams are parameterized on `F` so we can use our chosen effect to potentially delay things.

For example, let's generate stream elements by periodically calling the outside world to obtain a security token. Security tokens expire on a schedule that is often indicated by the current token. If we want a stream that obtains a new token just before the previous one expires, and otherwise sleeps, we can use `Stream.unfoldEval` and `Task.schedule`:

```scala
import java.util.concurrent.{TimeUnit  => TU}

case class AuthToken(expiresIn: FiniteDuration)

// Simulate a token that has a varying expiration time
// A return value of None indicates an error of some sort.
// You can use a Try or Either to capture any error information.
// The Task reflects the asynchronous nature of obtaining a token
// from a web service.
def getNextToken() = { 
   println(s"Getting token: ${java.time.Instant.now}")
   Task.delay(Some(AuthToken((math.random * 5).seconds)))
}

// Create a token stream. It will immediately try to get the first token.
val authTokenStream = Stream.unfoldEval((0.0).seconds){ expiresIn => 
    getNextToken()
    .schedule(expiresIn) // schedule getting the token through the F effect (=Task)
    .map{ nextTokenOpt => 
       // If a token is found, emit the token and set the state to the next time it is needed
       // with a small decrease so it does not expire before the call is issued.
       nextTokenOpt.map { nextToken => 
           val delay = FiniteDuration((nextToken.expiresIn * 0.95).toNanos, TU.NANOSECONDS)
           (nextToken, delay)
       }}}
```

Run it, making sure there is a way to stop, e.g. by taking only 10 values:

```scala
scala> authTokenStream.take(10).runLog.unsafeRun
Getting token: 2017-08-15T13:49:56.004Z
res15: Vector[AuthToken] = Vector(AuthToken(4871614083 nanoseconds), AuthToken(203938202 nanoseconds), AuthToken(20214022 nanoseconds), AuthToken(73392269 nanoseconds), AuthToken(423479764 nanoseconds), AuthToken(143081291 nanoseconds), AuthToken(4430848941 nanoseconds), AuthToken(1686590951 nanoseconds), AuthToken(417218776 nanoseconds), AuthToken(52172066 nanoseconds))
```
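
To see how the delay is threaded through as the unfold state, here is a small pure simulation that uses only `scala.concurrent.duration` and no fs2. It is a sketch under the assumption that each fetch waits for 95% of the previous token's lifetime, as in the code above; `DelaySchedule`, `delaysBeforeFetch`, and `example` are hypothetical names.

```scala
import scala.concurrent.duration._

object DelaySchedule {
  // Given the lifetimes of successive tokens, compute the delay used before each fetch.
  // The first fetch uses a zero delay (the initial unfold state); every later fetch
  // waits for 95% of the previous token's lifetime.
  def delaysBeforeFetch(lifetimes: List[FiniteDuration]): List[FiniteDuration] =
    lifetimes
      .scanLeft(Duration.Zero: FiniteDuration) { (_, lifetime) =>
        FiniteDuration((lifetime * 0.95).toNanos, NANOSECONDS)
      }
      .init // the state computed from the last token is not used for these fetches

  // Tokens living 4s, 2s and 1s are fetched after delays of 0s, 3.8s and 1.9s.
  val example: List[FiniteDuration] =
    delaysBeforeFetch(List(4.seconds, 2.seconds, 1.second))
}
```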

We could make `authTokenStream` a bit more general:

```scala
  /** Unfold, periodically checking an effect for new values.
    * The time between checks is obtained using getDelay, potentially
    * using the returned effect value.
    * @tparam A Output element type.
    * @param f Call an effect to get a value.
    * @param getDelay Extract amount to delay from that value.
    */
  def unfoldEvalWithDelay[A](f: => Task[Option[A]],
    getDelay: A => FiniteDuration)
    (implicit F: Async[Task], strategy: Strategy, scheduler: fs2.Scheduler) =
    Stream.unfoldEval((0.0).seconds){ delay =>
      f.schedule(delay)
        .map{ opt =>
          opt.map { a =>
            (a, getDelay(a))}}}

  /** Calculate a delay but use fraction to shorten the delay. */
  def shortenDelay(fraction: Double = 0.95, delay: FiniteDuration): FiniteDuration =
    FiniteDuration((delay * fraction).toNanos, TU.NANOSECONDS)
```

and then call it like:

```scala
scala> unfoldEvalWithDelay(getNextToken, (auth: AuthToken) => shortenDelay(delay=auth.expiresIn)).take(5).runLog.unsafeRun
Getting token: 2017-08-15T13:50:07.949Z
res19: Vector[AuthToken] = Vector(AuthToken(299479746 nanoseconds), AuthToken(2231396086 nanoseconds), AuthToken(619609193 nanoseconds), AuthToken(2901539413 nanoseconds), AuthToken(3456834684 nanoseconds))
```
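
As a quick check of the delay-shortening arithmetic, the sketch below scales a delay by a fraction using only `scala.concurrent.duration`. `ShortenDelayDemo` and `scaleDelay` are hypothetical names; `scaleDelay` is meant to mirror the intent of `shortenDelay` rather than replace it.

```scala
import scala.concurrent.duration._

object ShortenDelayDemo {
  // Hypothetical helper mirroring shortenDelay: scale a delay by `fraction`.
  def scaleDelay(delay: FiniteDuration, fraction: Double = 0.95): FiniteDuration =
    FiniteDuration((delay * fraction).toNanos, NANOSECONDS)

  // With the default fraction, a 10 second lifetime becomes a 9.5 second delay.
  val defaultFraction: FiniteDuration = scaleDelay(10.seconds)

  // An explicit fraction of 0.5 halves the delay to 5 seconds.
  val halfFraction: FiniteDuration = scaleDelay(10.seconds, 0.5)
}
```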

