Recipe: Stop an infinite Stream when you want to
Sometimes you need to stop a stream that you working on that runs indefinitely or running in the "wild" (which means you may have a few streams running independently in your application). It's obvious using the "pull" model that when the input stream runs out of data, the overall stream should stop. But what if the stream is infinite? We can use a Signal to interrupt a Stream. You can also use a stream of booleans but we will show the signal approach.
We are using a Signal outside of a stream, which is perfectly allowable. Here's what's going on:
First, we need a Signal, so we create one. But its wrapped in a Task (the effect), so we need to extract it out using unsafeRun.
Signal.interrupt takes a stream argument and runs that stream. Underneath, it uses
Stream.interruptWhen
that stops the stream when the Signal is set to true.The readLine() simulates some condition that you control.
The last line sets the Signal to true. But since the Signal communicates through the "effect", you need to run the Task returned from
set
that actually sets the value.
Once set, the stream will be interrupted and stop.
Notice that we used unsafeRun
which is not strictly necessary. You could do all of this in a streamy way. The approach was described in this gist. There is also a good description of pausing.
How does this work?
Create a pipe because we want to create a combinator that is easy to use and that would convert an existing stream.
Create a signal that allows the use of
Stream.interruptWhen
. The signal must be a boolean.Since signals are wrapped in effects, use a stream to unwrap the signal so we can use it directly in
toBeInterrupted.interruptWhen
Merge the "interruptable" stream with a stream that sets the interrupt signal to true.
Yes, this is a silly example, because interruption is really only useful if it is criteria based.
interruptAfter
interrupts based on a delay-based criteria. So this is not silly.
Now we can run this, first interrupting immediately:
then interrupting after a 6 seconds, which may gives 5 or 6 results:
We could generalize this more to any trigger wrapped in an effect and when "triggered," interrupts the stream. We could even use another "signal" based on the first approach. If you want to be able to interrupt a stream in a repl that is running asynchronously in the background, we can setup a small effect, compose it into the stream, then "run" the effect in the repl to terminate a stream running in the background.
INSERT COOL EXAMPLE HERE
Last updated