# Effectful Streams

In the last section, we saw that fs2 may use an effect to process a pure stream, e.g. to run it.

However, effects can intentionally be used anywhere in your stream. You can convert between different effects depending on your processing needs.

We can evaluate an arbitrary effect in a context and use that as a source:

```scala
fs2> var x = 2 
x: Int = 2
fs2> val p1 = Stream.eval(Task.delay(x + 2)) 
p1: Stream[Task, Int] = attemptEval(Task).flatMap(<function1>)
fs2> val p2 = Stream.eval(Task.delay(x + 3)) 
p2: Stream[Task, Int] = attemptEval(Task).flatMap(<function1>)
fs2> p1.runLog.unsafeRun 
res58: Vector[Int] = Vector(4)
fs2> p2.runLog.unsafeRun 
res59: Vector[Int] = Vector(5)
```

The stream became effectful because we asked it to `eval` something, and that something was a `Task`. The elements emitted by the stream are `Int`s, but the effect type is `Task`, which is why the stream's type is `Stream[Task, Int]`.

The example above just shows that a Task can return a value. Now let's have the Task perform a side effect:

```scala
fs2> val p3 = Stream.eval(Task.delay(x = 10)) 
p3: Stream[Task, Unit] = attemptEval(Task).flatMap(<function1>)
fs2> val p4 = Stream.eval(Task.delay(x = x + 10)) 
p4: Stream[Task, Unit] = attemptEval(Task).flatMap(<function1>)
fs2> p3.runLog.unsafeRun 
res62: Vector[Unit] = Vector(())
fs2> p4.runLog.unsafeRun 
res63: Vector[Unit] = Vector(())
fs2> x 
res64: Int = 20
```

We see that `runLog` produces a `Vector[Unit]`, which is expected since each Task performs an assignment to `x` (a side effect) rather than computing a value. Running both streams changes the environment: in this case, `x` ends up as 20.

This also shows that the body of `Task.delay` is not evaluated until the stream is run, because `Task.delay` takes a by-name parameter. As a slight detour about Task: `Task.now` evaluates its argument eagerly, at the point where `Task.now` is called.

It may not seem very useful to have a stream that emits only one value, but you can imagine repeating the side-effecting call, perhaps reading from an IO source, and getting a different value each time.
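To make the `Task.delay`/`Task.now` distinction concrete without depending on fs2, here is a small plain-Scala model. `MiniTask`, `delay`, `now` and `demo` are hypothetical names invented for this sketch, not fs2's API; the only point is that a by-name parameter postpones evaluation until the task is run, while a strict parameter is evaluated at the call site.

```scala
// A toy model of the idea, not fs2's Task: delay suspends, now evaluates eagerly.
object MiniTaskModel {
  // A task is just a suspended computation that runs each time unsafeRun() is called.
  final class MiniTask[A](thunk: () => A) {
    def unsafeRun(): A = thunk()
  }

  // By-name parameter: `a` is captured, not evaluated, when delay is called.
  def delay[A](a: => A): MiniTask[A] = new MiniTask(() => a)

  // Strict parameter: `a` has already been evaluated by the caller.
  def now[A](a: A): MiniTask[A] = new MiniTask(() => a)

  def demo(): (Int, Int) = {
    var x = 2
    val delayed = delay(x + 2) // x + 2 is not computed yet
    val eager   = now(x + 2)   // x + 2 is computed here, so this task holds 4
    x = 10
    (delayed.unsafeRun(), eager.unsafeRun()) // the delayed task sees x = 10
  }
}
```

Calling `MiniTaskModel.demo()` yields `(12, 4)`: the delayed task observes the later assignment to `x`, just as `p1` and `p2` above read `x` only when their streams run.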

## Synchronous Effects

The example above ran the Task synchronously. We can see that by doing the following:

```scala
fs2> Stream.eval(Task.delay { println(s"${Thread.currentThread.getName}"); System.currentTimeMillis }).repeat.take(5).runLog.unsafeRun 
main
main
main
main
main
res67: Vector[Long] = Vector(1475333678504L, 1475333678513L, 1475333678514L, 1475333678515L, 1475333678516L)
```

We can also be explicit and call `unsafeRunSync`, which returns a `Right` because everything completed synchronously:

```scala
fs2> Stream.eval(Task.delay { println(s"${Thread.currentThread.getName}"); System.currentTimeMillis }).repeat.take(5).runLog.unsafeRunSync 
main
main
main
main
main
res71: Either[Attempt[Vector[Long]] => Unit => Unit, Vector[Long]] = Right(Vector(1475333765381L, 1475333765382L, 1475333765382L, 1475333765383L, 1475333765383L))
```

`take(5)` does not modify the stream; it returns a new stream that emits at most the first five elements. Many other operators, such as `map` and `flatMap`, also work directly on a stream; see the `Stream` class documentation for the full list. We could also have used `Stream.repeatEval` instead of calling `eval` and `repeat` separately.
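For intuition, the synchronous behaviour of `Stream.repeatEval(Task.delay(...)).take(n).runLog.unsafeRun` resembles pulling from a plain Scala `Iterator` that re-runs a by-name effect. The sketch below is a stdlib-only model with a hypothetical `repeatEvalTake` helper, not fs2 code: every element re-evaluates the effect, in order, on the calling thread.

```scala
// Stdlib-only model of repeatEval(...).take(n) run synchronously; not fs2.
object RepeatEvalModel {
  // `effect` is by-name, so Iterator.continually re-evaluates it for every element.
  def repeatEvalTake[A](n: Int)(effect: => A): Vector[A] =
    Iterator.continually(effect).take(n).toVector

  def currentThreadName(): String = Thread.currentThread.getName
}
```

`RepeatEvalModel.repeatEvalTake(5)(RepeatEvalModel.currentThreadName())` returns the caller's thread name five times (typically `main` in a REPL), mirroring the output above.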

## Asynchronous Effects

Running tasks on the main thread is not ideal if the computation takes a while. However, we must be thoughtful about running on different threads, because where we indicate that we want to run asynchronously makes a difference.

For example, calling `runLog` on a stream creates a final effectful Task, and we can choose to run that Task asynchronously. Calling `runLog` by itself does not run anything yet:

```scala
fs2> Stream.repeatEval(Task.delay { println(s"${Thread.currentThread.getName}"); System.currentTimeMillis }).take(5).runLog 
res76: Task[Vector[Long]] = Task
```

The final output is a Task, so now we need to run it. There are many ways to run a Task, and Task's API can be a bit confusing.

```scala
fs2> Stream.repeatEval(Task.delay { println(s"${Thread.currentThread.getName}"); System.currentTimeMillis }).take(5).runLog.unsafeRunAsyncFuture 
main
main
main
main
main
res79: scala.concurrent.Future[Vector[Long]] = Success(Vector(1475334408289, 1475334408289, 1475334408290, 1475334408290, 1475334408290))
```

This returned a Future, but the internal tasks still ran on the main thread. `unsafeRunAsyncFuture` returns the result in a Future, but the internal tasks never "requested" to run asynchronously, so they simply ran on the main thread.

We could drop the `Future` and just call `unsafeRunAsync`, but then we need to supply a callback that handles both successful values and exceptions returned as values. In fs2, the callback receives an `Attempt[A]`, which is really an `Either[Throwable, A]`. `unsafeRunAsync` also has the same limitation as `unsafeRunAsyncFuture`: it does not move the inner tasks off the main thread.
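The callback style can be sketched with the standard library alone. The model below is a stand-in under stated assumptions, not fs2's `unsafeRunAsync`: `CallbackModel.runAsync` is a hypothetical helper that starts the work on a Scala `Future`, returns immediately, and later hands the callback an `Either[Throwable, A]` (the same shape as fs2's `Attempt[A]`).

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Stdlib-only model of a callback-based async run; not fs2's unsafeRunAsync.
object CallbackModel {
  type Attempt[A] = Either[Throwable, A]

  // Starts `body` on `ec` and returns immediately. Later, the callback receives
  // Right(value) on success or Left(exception) on failure.
  def runAsync[A](body: => A)(cb: Attempt[A] => Unit)(implicit ec: ExecutionContext): Unit =
    Future(body).onComplete {
      case Success(a) => cb(Right(a))
      case Failure(e) => cb(Left(e))
    }
}
```

Because `runAsync` returns `Unit` right away, the caller has to coordinate with the callback itself (for example with a `java.util.concurrent.CountDownLatch`) if it needs the value on the current thread.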

So let's try another method on Task: `async`. `async` requires a `Strategy` object, which our standard imports for amm provide implicitly:

```scala
fs2> strategy 
res85: Strategy = Strategy
```

We might expect `async` to start running immediately, but `async` only returns a Task that, when run, starts the asynchronous computation:

```scala
fs2> val asyncTask  = Stream.repeatEval(Task.delay { println(s"${Thread.currentThread.getName}"); System.currentTimeMillis }).take(5).runLog.async 
asyncTask: Task[Vector[Long]] = Task
fs2>
```

Since we do not see any output, we know that the inner Tasks did not run. Now we can run it:

```scala
fs2> asyncTask.unsafeRun 
ForkJoinPool-1-worker-5
ForkJoinPool-1-worker-5
ForkJoinPool-1-worker-5
ForkJoinPool-1-worker-5
ForkJoinPool-1-worker-5
res84: Vector[Long] = Vector(1475335219673L, 1475335219674L, 1475335219674L, 1475335219674L, 1475335219674L)
```

This shows that the inner Tasks ran on another thread, but all of them ran on the same one: nothing caused the individual Tasks to run on different threads. In other words, the stream as a whole ran on a worker thread, but the inner tasks of our stream still ran synchronously, one after another, on that thread.

That's because when we used `Task.delay` to define the stream, we did not tell each inner Task to run asynchronously using our `Strategy`. We can do that by using `Task.apply` (written `Task { ... }`) instead of `Task.delay`.

If we make that change and just call `unsafeRun` on the resulting Task (without `async`), we get:

```scala
fs2> val asyncTask  = Stream.repeatEval(Task { println(s"${Thread.currentThread.getName}"); System.currentTimeMillis }).take(5).runLog 
asyncTask: Task[Vector[Long]] = Task

fs2> asyncTask.unsafeRun 
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-7
res90: Vector[Long] = Vector(1475335626584L, 1475335626585L, 1475335626585L, 1475335626585L, 1475335626586L)
```

You may need to call `unsafeRun` several times before you see multiple threads, because which pool thread runs each task is non-deterministic.
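The difference between one async boundary around the whole run and one boundary per element can be modelled with plain Scala `Future`s. This is an analogy, not fs2's implementation: `outerOnly` plays the role of `runLog.async` (a single hop to the pool, then sequential work on that thread), while `perElement` plays the role of `Task { ... }` per element (each element is submitted to the pool separately). The names and the four-thread pool size are arbitrary choices for the sketch.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Stdlib analogy for one async boundary vs. one boundary per element; not fs2.
object AsyncBoundaryModel {
  private def threadName(): String = Thread.currentThread.getName

  // Like runLog.async: one hop onto the pool, then all n elements on that thread.
  def outerOnly(n: Int)(implicit ec: ExecutionContext): Future[Vector[String]] =
    Future(Vector.fill(n)(threadName()))

  // Like Task { ... } per element: every element is submitted to the pool separately,
  // so consecutive elements may run on different worker threads.
  def perElement(n: Int)(implicit ec: ExecutionContext): Future[Vector[String]] =
    (1 to n).foldLeft(Future.successful(Vector.empty[String])) { (acc, _) =>
      acc.flatMap(names => Future(threadName()).map(names :+ _))
    }

  def demo(n: Int): (Vector[String], Vector[String]) = {
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val outer = Await.result(outerOnly(n), 10.seconds)
      val inner = Await.result(perElement(n), 10.seconds)
      (outer, inner)
    } finally pool.shutdown()
  }
}
```

In `AsyncBoundaryModel.demo(5)`, the first vector always repeats a single pool thread name, while the second may contain several distinct names; as with `unsafeRun` above, which threads appear varies from run to run.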

The inner tasks did not do much; each computation was self-contained. We can add some internal state that is used to compute each task's output:

```scala
fs2> def supplyRandom(maxValue: Int): Stream[Task, Int] = {
       val r = new java.util.Random
       Stream.repeatEval(Task { println(s"${Thread.currentThread.getName}"); r.nextInt(maxValue) })
     } 
defined function supplyRandom
fs2> supplyRandom(100).take(10).runLog.unsafeRun 
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-7
ForkJoinPool-1-worker-7
ForkJoinPool-1-worker-7
ForkJoinPool-1-worker-7
ForkJoinPool-1-worker-7
ForkJoinPool-1-worker-5
ForkJoinPool-1-worker-5
ForkJoinPool-1-worker-5
res99: Vector[Int] = Vector(4, 87, 26, 75, 31, 52, 38, 18, 25, 90)
```

This shows that we can do asynchronous work that uses state. Note that this is still basically a "pull" model: the stream interpreter drives the "pull" of values from the source.
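The pull model can be illustrated with an `Iterator`, which also computes values only when the consumer asks for them. In this stdlib-only sketch, `CountingSource` is a hypothetical name, and the fixed seed is an assumption added so runs are repeatable; the counter shows that the stateful source runs its effect exactly once per pulled element.

```scala
import java.util.Random

// Stdlib-only model of a stateful, pull-driven source; not fs2.
object PullModel {
  final class CountingSource(maxValue: Int, seed: Long) {
    private val r = new Random(seed) // state shared by every pull
    private var count = 0           // how many times the "effect" has run

    def pulls: Int = count

    // Nothing runs until the consumer calls next() on this iterator.
    val values: Iterator[Int] = Iterator.continually { count += 1; r.nextInt(maxValue) }
  }
}
```

Taking three values from `new PullModel.CountingSource(100, 42L).values` leaves `pulls` at exactly 3, even though the source could keep producing values forever.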

## Side-Show: What are all those unsafe\* Task Methods?

We see a lot of "unsafe\*" methods on Task. What are they for? If we tab-complete on a Task, we get:

```scala
fs2> Task.now(42). 
!=                     ensure                 handleWith             race                   unsafeAttemptRunFor    unsafeRunAsyncFuture   |>
==                     equals                 hashCode               schedule               unsafeAttemptRunSync   unsafeRunFor
asInstanceOf           flatMap                isInstanceOf           self                   unsafeAttemptValue     unsafeRunSync
async                  getClass               map                    toString               unsafeRun              unsafeTimed
attempt                handle                 or                     unsafeAttemptRun       unsafeRunAsync         unsafeValue
```

That's a lot.

Here's the way to think about them. Task wants to promote good behavior on your part. When it runs something, the thing inside could be very effectful, e.g. reading from disk, which may lead to exceptions or other kinds of errors. The different unsafe\* methods let you choose how to run the Task and how to manage those potential errors. Recall that `Attempt` is just a type alias for `Either[Throwable, A]`, so any method with "Attempt" in its name returns errors as values instead of throwing them.

Here is what some of these methods do:

* async: Returns a Task that, when run, runs the overall Task on a thread supplied by the Strategy. This does not by itself make the other Tasks in the stream run on different threads.
* unsafeRun: Runs the Task, blocking the current thread until the result is available, and returns the value. If the Task fails, the exception is thrown, so you have to wrap the call in a try-catch to handle it.
* unsafeRunSync: Runs the Task synchronously on the current thread. It returns a `Right` with the value if the Task completes synchronously, or, if an asynchronous computation is encountered, a `Left` holding a function to which you pass a callback that later receives the result.
* unsafeRunAsyncFuture/unsafeRunAsync: Run the Task on the calling thread, but if an asynchronous computation is encountered, return immediately and let that computation complete in the background. The result is delivered either in a Future or to a callback that you provide.
* unsafeValue: Returns an Option instead of an Either. None means the value was not obtained, but you get no information about why.
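The split between throwing errors and returning them as values can be sketched without fs2. The two helpers below are hypothetical stand-ins, not Task's methods: `run` behaves like the throwing variants such as `unsafeRun`, and `attemptRun` behaves like the "Attempt" variants, returning `Either[Throwable, A]`.

```scala
import scala.util.control.NonFatal

// Stdlib-only model of "throw" vs "errors as values"; not fs2's Task API.
object RunModesModel {
  type Attempt[A] = Either[Throwable, A]

  // Like unsafeRun: the result is returned, or the exception propagates to the caller.
  def run[A](body: => A): A = body

  // Like the "Attempt" variants: non-fatal exceptions become a Left value.
  def attemptRun[A](body: => A): Attempt[A] =
    try Right(body)
    catch { case NonFatal(e) => Left(e) }
}
```

With `run`, the caller needs a `try`/`catch`; with `attemptRun`, a failure is just another value to pattern match on.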

You can also delay a Task before it runs ("schedule") or impose a timeout ("unsafeTimed", "unsafeRunFor") so that an error is raised if it takes too long.

You can handle errors in a Task by explicitly converting them to an Either (via "attempt") or by using an error handler such as:

* handle: This is just like `recover` on a Scala Future.
* handleWith: This is just like `recoverWith` on a Scala Future.

So fine-grained error handling is really done through "attempt", "handle"/"handleWith", or even "flatMap".
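Since "handle" and "handleWith" are compared to `Future`'s `recover` and `recoverWith`, here is the Scala `Future` side of that analogy. It uses only the standard library; `RecoverModel` and its methods are hypothetical names for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stdlib-only illustration of recover / recoverWith on a Scala Future.
object RecoverModel {
  def parse(s: String): Future[Int] = Future(s.toInt)

  // recover: turn selected errors into a plain fallback value (compare "handle").
  def parseOrZero(s: String): Future[Int] =
    parse(s).recover { case _: NumberFormatException => 0 }

  // recoverWith: turn selected errors into another Future (compare "handleWith").
  def parseOrElse(s: String, fallback: String): Future[Int] =
    parse(s).recoverWith { case _: NumberFormatException => parse(fallback) }

  def await[A](f: Future[A]): A = Await.result(f, 5.seconds)
}
```

The only difference between the two is what the partial function returns: a plain value for `recover`, another effect for `recoverWith`.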

This is all still pretty complex of course, but the intent is to make it less complex than rolling your own framework.

A Stream can be made up of many synchronous or asynchronous parts. A stream may be sync at one moment, turn async for a computation, and then join back together to be sync again. In general, you may not know exactly what's in your stream, but you still want to get the "output" value at some point, at the "end of the world" of your program.

Let's look at a specific example of this behavior using "unsafeRunSync".

```scala
fs2> Task.now(42).unsafeRunSync 
res106: Either[Attempt[Int] => Unit => Unit, Int] = Right(42)
```

The return type tells us that even when we explicitly ask to run a task synchronously, there may still be async processing going on: the result is an Either whose Left side holds a function for registering a callback. In this case, we received a Right(42) because the task was just a pure value lifted into a Task, so it finished immediately.

What causes a return of a Left(callback)?

```scala
fs2> val x = Task.delay{ println("doit") }.flatMap(_ => Task{Thread.sleep(5000); println(s"${Thread.currentThread.getName}") }.async) 
x: Task[Unit] = Task
fs2> x.unsafeRunSync 
doit
res125: Either[Attempt[Unit] => Unit => Unit, Unit] = Left(<function1>)
fs2> ForkJoinPool-1-worker-7
```

Here we create a Task whose "head" is a synchronous Task but whose flatMap runs a Task asynchronously. It's hard to see, but the worker thread name is printed after the prompt has returned. In other words, "unsafeRunSync" runs the synchronous part, reaches the async boundary, and returns a Left instead of waiting. If the Task inside the flatMap is also synchronous, we get:

```scala
fs2> val x = Task.delay{ println("doit") }.flatMap(_ => Task.delay{Thread.sleep(5000); println(s"${Thread.currentThread.getName}") }) 
x: Task[Unit] = Task
fs2> x.unsafeRunSync 
doit
main
res129: Either[Attempt[Unit] => Unit => Unit, Unit] = Right(())
fs2>
```

Here we still hit the sleep, but everything runs on the main thread (hence the thread name main), and that name is printed right after doit, before the result appears. So it all ran synchronously, and we got a Right back, indicating that the Task "knew" it could return a Right.

So unsafeRunSync is about async boundaries in the map/flatMap chain of the Task, NOT async boundaries inside a stream. It helps you manage your final "end of the world" computation when you extend a Task with map/flatMap after creating it.
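The `Either` returned by `unsafeRunSync` can be modelled with a two-case task type. This is a simplified sketch, not fs2's internals: `Now`, `Async` and `runSync` are hypothetical names, and the model leaves out `flatMap` entirely. A task that already has its value yields `Right`; a task that would have to wait yields `Left` holding a function to which the caller passes a callback.

```scala
// Simplified model of unsafeRunSync's Either result; not fs2's implementation.
object RunSyncModel {
  type Attempt[A] = Either[Throwable, A]
  type Callback[A] = Attempt[A] => Unit

  sealed trait MiniTask[A]
  // The value is already available: no async boundary.
  final case class Now[A](value: A) extends MiniTask[A]
  // The value arrives later: whoever runs the task must register a callback.
  final case class Async[A](register: Callback[A] => Unit) extends MiniTask[A]

  // Right: finished synchronously. Left: the caller must supply a callback.
  def runSync[A](task: MiniTask[A]): Either[Callback[A] => Unit, A] = task match {
    case Now(a)          => Right(a)
    case Async(register) => Left(register)
  }
}
```

In this model, `runSync(Now(42))` is `Right(42)`, matching the `Task.now(42)` example, while an `Async` task gives back a `Left` and delivers its value only once a callback is registered.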


