Operations
In this section, we'll explore some essential operations you can perform on streams. These operations allow you to manipulate and interact with stream elements in various ways.
Tapping
Tapping is an operation that involves running an effect on each emission of the stream. It allows you to observe each element, perform some effectful operation, and discard the result of this observation. Importantly, the `Stream.tap` operation does not alter the elements of the stream, and it does not affect the return type of the stream.
For instance, you can use `Stream.tap` to print each element of a stream:
```ts
import { Stream, Console, Effect } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.tap((n) => Console.log(`before mapping: ${n}`)),
  Stream.map((n) => n * 2),
  Stream.tap((n) => Console.log(`after mapping: ${n}`))
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
before mapping: 1
after mapping: 2
before mapping: 2
after mapping: 4
before mapping: 3
after mapping: 6
{
  _id: "Chunk",
  values: [ 2, 4, 6 ]
}
*/
```
Taking Elements
Another essential operation is taking elements, which allows you to extract a specific number of elements from a stream. Here are several ways to achieve this:
- `take`: extracts a fixed number of elements.
- `takeWhile`: extracts elements for as long as a condition holds.
- `takeUntil`: extracts elements until a condition is met, including the element that meets it.
- `takeRight`: extracts a specified number of elements from the end.
```ts
import { Stream, Effect } from "effect"

const stream = Stream.iterate(0, (n) => n + 1)

// Using `take` to extract a fixed number of elements:
const s1 = Stream.take(stream, 5)
Effect.runPromise(Stream.runCollect(s1)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 0, 1, 2, 3, 4 ]
}
*/

// Using `takeWhile` to extract elements for as long as a condition holds:
const s2 = Stream.takeWhile(stream, (n) => n < 5)
Effect.runPromise(Stream.runCollect(s2)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 0, 1, 2, 3, 4 ]
}
*/

// Using `takeUntil` to extract elements until a condition is met
// (the element that meets the condition is included):
const s3 = Stream.takeUntil(stream, (n) => n === 5)
Effect.runPromise(Stream.runCollect(s3)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 0, 1, 2, 3, 4, 5 ]
}
*/

// Using `takeRight` to extract a specified number of elements from the end:
const s4 = Stream.takeRight(s3, 3)
Effect.runPromise(Stream.runCollect(s4)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 3, 4, 5 ]
}
*/
```
Exploring Streams as an Alternative to Async Iterables
When working with asynchronous data sources, like async iterables, you often need to consume data in a loop until a certain condition is met. This section shows how to achieve similar behavior with streams.
In async iterables, data consumption can continue in a loop until a break or return statement is encountered. To replicate this behavior with Streams, you have a couple of options:
- `Stream.takeUntil`: This function allows you to take elements from a stream until a specified condition evaluates to true. It's akin to breaking out of a loop in async iterables when a certain condition is met.
- `Stream.toPull`: This function is another way to replicate looping through async iterables. It returns an effect that repeatedly pulls data chunks from the stream. This effect can fail with `None` when the stream is finished or with `Some` error if it fails.
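To make the first option concrete, here is a minimal sketch that mirrors a `for await` loop containing a `break`. The `readings` source and the threshold of 10 are made up for illustration, and a pure `Stream.make` stands in for a real asynchronous source.

```ts
import { Stream, Effect } from "effect"

// Hypothetical source; a real program might wrap an async iterable instead
const readings = Stream.make(3, 8, 12, 5, 20)

// Roughly equivalent to:
//   for await (const n of source) { use(n); if (n > 10) break }
const untilThreshold = readings.pipe(Stream.takeUntil((n) => n > 10))

Effect.runPromise(Stream.runCollect(untilThreshold)).then(console.log)
// Collected values: 3, 8, 12
```

Note that the element that triggers the stop (12) is still emitted, just as the body of a loop runs once more before reaching its `break`.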
Let's take a closer look at the second option, `Stream.toPull`.
```ts
import { Stream, Effect } from "effect"

// Simulate a chunked stream
const stream = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(Stream.rechunk(2))

const program = Effect.gen(function* (_) {
  // Create an effect to get data chunks from the stream
  const getChunk = yield* _(Stream.toPull(stream))

  // Continuously fetch and process chunks
  while (true) {
    const chunk = yield* _(getChunk)
    console.log(chunk)
  }
})

Effect.runPromise(Effect.scoped(program)).then(console.log, console.error)
/*
Output:
{ _id: 'Chunk', values: [ 1, 2 ] }
{ _id: 'Chunk', values: [ 3, 4 ] }
{ _id: 'Chunk', values: [ 5 ] }
{
  _id: 'FiberFailure',
  cause: {
    _id: 'Cause',
    _tag: 'Fail',
    failure: { _id: 'Option', _tag: 'None' }
  }
}
*/
```
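In this example, we're using `Stream.toPull` to repeatedly pull data chunks from the stream. The code enters a loop and continues to fetch and display chunks until there's no more data left to process. At that point the pull effect fails with `None`, which is why the output ends with a `FiberFailure`.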
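If the trailing failure is unwanted, one possible approach is to inspect each pull with `Effect.either`, break on `None`, and re-raise only genuine errors. This is a sketch under that assumption rather than the only way to do it; the `seen` accumulator exists purely for illustration.

```ts
import { Stream, Effect, Either, Option, Chunk } from "effect"

const stream = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(Stream.rechunk(2))

const program = Effect.gen(function* (_) {
  const getChunk = yield* _(Stream.toPull(stream))
  const seen: Array<number> = []

  while (true) {
    // Move the failure channel into a value so it can be inspected
    const result = yield* _(Effect.either(getChunk))
    if (Either.isLeft(result)) {
      // None signals the normal end of the stream
      if (Option.isNone(result.left)) break
      // Some(error) is a genuine failure: propagate it
      return yield* _(Effect.fail(result.left.value))
    }
    seen.push(...Chunk.toReadonlyArray(result.right))
  }

  return seen
})

Effect.runPromise(Effect.scoped(program)).then(console.log)
// Logs the collected elements: [ 1, 2, 3, 4, 5 ]
```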
Mapping
In this section, we'll explore how to transform elements within a stream using the `Stream.map` family of operations. These operations allow you to apply a function to each element of the stream, producing a new stream with the transformed values.
Basic Mapping
The `Stream.map` operation applies a given function to all elements of the stream, creating another stream with the transformed values. Let's illustrate this with an example:
```ts
import { Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3).pipe(Stream.map((n) => n + 1))

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 2, 3, 4 ]
}
*/
```
Effectful Mapping
If your transformation involves effects, you can use `Stream.mapEffect` instead. It applies an effectful function to each element of the stream and produces a new stream containing the results of those effects:
```ts
import { Stream, Random, Effect } from "effect"

const stream = Stream.make(10, 20, 30).pipe(
  Stream.mapEffect((n) => Random.nextIntBetween(0, n))
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 6, 13, 5 ]
}
*/
```
You can evaluate effects concurrently using the `concurrency` option. It allows you to specify the number of concurrent running effects. The results are emitted downstream in the original order.
Let's write a simple page downloader that fetches URLs concurrently:
```ts
import { Stream, Effect } from "effect"

const getUrls = Effect.succeed(["url0", "url1", "url2"])

const fetchUrl = (url: string) =>
  Effect.succeed([
    `Resource 0-${url}`,
    `Resource 1-${url}`,
    `Resource 2-${url}`
  ])

const stream = Stream.fromIterableEffect(getUrls).pipe(
  Stream.mapEffect(fetchUrl, { concurrency: 4 })
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    [ "Resource 0-url0", "Resource 1-url0", "Resource 2-url0" ],
    [ "Resource 0-url1", "Resource 1-url1", "Resource 2-url1" ],
    [ "Resource 0-url2", "Resource 1-url2", "Resource 2-url2" ]
  ]
}
*/
```
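Because the simulated downloads complete instantly, that example does not make the ordering guarantee visible. The following sketch uses arbitrary delays (via `Effect.sleep`) so that later elements finish first, and then collects what the stream emits.

```ts
import { Stream, Effect, Duration } from "effect"

// Each element sleeps for its own value in milliseconds, so 10 finishes
// before 20, which finishes before 30 (the delays are arbitrary)
const delayed = Stream.make(30, 10, 20).pipe(
  Stream.mapEffect(
    (ms) => Effect.sleep(Duration.millis(ms)).pipe(Effect.as(ms)),
    { concurrency: 3 }
  )
)

Effect.runPromise(Stream.runCollect(delayed)).then(console.log)
// Collected values keep the input order: 30, 10, 20
```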
Stateful Mapping
The `Stream.mapAccum` operation is similar to `map`, but it transforms elements statefully and allows you to map and accumulate in a single operation. Let's see how you can use it to calculate the running total of an input stream:
```ts
import { Stream, Effect } from "effect"

const runningTotal = (
  stream: Stream.Stream<number>
): Stream.Stream<number> =>
  stream.pipe(Stream.mapAccum(0, (s, a) => [s + a, s + a]))

// input: 0, 1, 2, 3, 4, 5
Effect.runPromise(
  Stream.runCollect(runningTotal(Stream.range(0, 6)))
).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 0, 1, 3, 6, 10, 15 ]
}
*/
```
Mapping and Flattening
The `Stream.mapConcat` operation is akin to `Stream.map`, but it takes things a step further. It maps each element to an `Iterable` of zero or more elements and then flattens the results into a single stream. Let's illustrate this with an example:
```ts
import { Stream, Effect } from "effect"

const numbers = Stream.make("1-2-3", "4-5", "6").pipe(
  Stream.mapConcat((s) => s.split("-")),
  Stream.map((s) => parseInt(s))
)

Effect.runPromise(Stream.runCollect(numbers)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 2, 3, 4, 5, 6 ]
}
*/
```
In this example, we take a stream of strings like `"1-2-3"` and split them into individual numbers, resulting in a flattened stream of integers.
Mapping to a Constant Value
The `Stream.as` method allows you to map the success values of a stream to a specified constant value. This can be handy when you want to transform elements into a uniform value. Here's an example where we map all elements to the `null` value:
```ts
import { Stream, Effect } from "effect"

const stream = Stream.range(1, 5).pipe(Stream.as(null))

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ null, null, null, null ]
}
*/
```
In this case, regardless of the original values in the stream, we've mapped them all to `null`.
Filtering
The `Stream.filter` operation is like a sieve that lets through elements that meet a specified condition. Think of it as a way to sift through a stream and keep only the elements that satisfy the given criteria. Here's an example:
```ts
import { Stream, Effect } from "effect"

const stream = Stream.range(1, 11).pipe(Stream.filter((n) => n % 2 === 0))

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 2, 4, 6, 8, 10 ]
}
*/
```
In this example, we start with a stream of numbers from 1 to 10 and use `Stream.filter` to retain only the even numbers (those that satisfy the condition `n % 2 === 0`). The result is a filtered stream containing the even numbers from the original stream.
Scanning
In this section, we'll explore the concept of stream scanning. Scans are similar to folds, but they provide a historical perspective. Like folds, scans also involve a binary operator and an initial value. However, what makes scans unique is that they emit every intermediate result as part of the stream.
```ts
import { Stream, Effect } from "effect"

const stream = Stream.range(1, 6).pipe(Stream.scan(0, (a, b) => a + b))

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 0, 1, 3, 6, 10, 15 ]
}
*/
```
In this example, we have a stream of numbers from 1 to 5, and we use `Stream.scan` to perform a cumulative addition starting from an initial value of 0. The result is a stream that emits the accumulated sum at each step: 0, 1, 3, 6, 10, and 15.
Stream scans provide a way to keep a historical record of your stream transformations, which is useful whenever the intermediate states matter and not just the final result.
Additionally, if you only need the final result of the scan, you can use `Stream.runFold`:
```ts
import { Stream, Effect } from "effect"

const fold = Stream.range(1, 6).pipe(Stream.runFold(0, (a, b) => a + b))

Effect.runPromise(fold).then(console.log) // Output: 15
```
In this case, `Stream.runFold` gives you the final accumulated value, which is 15 in this example.
Draining
In this section, we'll explore the concept of stream draining. Imagine you have a stream filled with effectful operations, but you're not interested in the values they produce; instead, you want to execute these effects and discard the results. This is where the `Stream.drain` function comes into play.
Let's go through a few examples:
Example 1: Discarding Values
```ts
import { Stream, Effect } from "effect"

// We create a stream and immediately drain it.
const s1 = Stream.range(1, 6).pipe(Stream.drain)

Effect.runPromise(Stream.runCollect(s1)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: []
}
*/
```
In this example, we have a stream with values from 1 to 5, but we use `Stream.drain` to discard these values. As a result, the output stream is empty.
Example 2: Executing Random Effects
```ts
import { Stream, Effect, Random } from "effect"

const s2 = Stream.repeatEffect(
  Effect.gen(function* (_) {
    const nextInt = yield* _(Random.nextInt)
    const number = Math.abs(nextInt % 10)
    console.log(`random number: ${number}`)
    return number
  })
).pipe(Stream.take(3))

Effect.runPromise(Stream.runCollect(s2)).then(console.log)
/*
Output:
random number: 4
random number: 2
random number: 7
{
  _id: "Chunk",
  values: [ 4, 2, 7 ]
}
*/

const s3 = Stream.drain(s2)

Effect.runPromise(Stream.runCollect(s3)).then(console.log)
/*
Output:
random number: 1
random number: 6
random number: 0
{
  _id: "Chunk",
  values: []
}
*/
```
In this example, we create a stream with random effects and collect the values of these effects initially. Later, we use `Stream.drain` to execute the same effects without collecting the values. This demonstrates how you can use draining to trigger effectful operations when you're not interested in the emitted values.
Stream draining can be particularly useful when you need to perform certain actions or cleanup tasks in your application without affecting the main stream of data.
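As one illustration of that idea, the sketch below runs some setup steps for their side effects only and then continues with the actual data. The step names and the `log` array are hypothetical stand-ins for real work; `Stream.concat` is used here to sequence the drained stream before the data stream.

```ts
import { Stream, Effect } from "effect"

// Records which setup steps ran; stands in for real side effects
const log: Array<string> = []

// Hypothetical setup steps: run for their effects, emit nothing
const setup = Stream.make("connect", "authenticate").pipe(
  Stream.tap((step) =>
    Effect.sync(() => {
      log.push(step)
    })
  ),
  Stream.drain
)

const data = Stream.make(1, 2, 3)

// The drained stream contributes its effects but no elements
const stream = Stream.concat(setup, data)

Effect.runPromise(Stream.runCollect(stream)).then((chunk) => {
  console.log(log) // the setup steps that ran
  console.log(chunk) // only 1, 2, 3 are collected
})
```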
Detecting Changes in a Stream
In this section, we'll explore the `Stream.changes` operation, which allows you to detect and emit elements that are different from their preceding elements within the stream.
```ts
import { Stream, Effect } from "effect"

const stream = Stream.make(1, 1, 1, 2, 2, 3, 4).pipe(Stream.changes)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 2, 3, 4 ]
}
*/
```
Zipping
Zipping is a process of combining two or more streams to create a new stream by pairing elements from the input streams. We can achieve this using the `Stream.zip` and `Stream.zipWith` operators. Let's dive into some examples:
```ts
import { Stream, Effect } from "effect"

// We create two streams and zip them together.
const s1 = Stream.zip(
  Stream.make(1, 2, 3, 4, 5, 6),
  Stream.make("a", "b", "c")
)

Effect.runPromise(Stream.runCollect(s1)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    [ 1, "a" ], [ 2, "b" ], [ 3, "c" ]
  ]
}
*/

// We create two streams and zip them with custom logic.
const s2 = Stream.zipWith(
  Stream.make(1, 2, 3, 4, 5, 6),
  Stream.make("a", "b", "c"),
  (n, s) => [n - s.length, s]
)

Effect.runPromise(Stream.runCollect(s2)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    [ 0, "a" ], [ 1, "b" ], [ 2, "c" ]
  ]
}
*/
```
The new stream will end when one of the streams ends.
Handling Stream Endings
When one of the input streams ends before the other, you might need to zip with default values. The `Stream.zipAll` and `Stream.zipAllWith` operations allow you to specify default values for both sides to handle such scenarios. Let's see an example:
```ts
import { Stream, Effect } from "effect"

const s1 = Stream.zipAll(Stream.make(1, 2, 3, 4, 5, 6), {
  other: Stream.make("a", "b", "c"),
  defaultSelf: 0,
  defaultOther: "x"
})

Effect.runPromise(Stream.runCollect(s1)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    [ 1, "a" ], [ 2, "b" ], [ 3, "c" ], [ 4, "x" ], [ 5, "x" ], [ 6, "x" ]
  ]
}
*/

const s2 = Stream.zipAllWith(Stream.make(1, 2, 3, 4, 5, 6), {
  other: Stream.make("a", "b", "c"),
  onSelf: (n) => [n, "x"],
  onOther: (s) => [0, s],
  onBoth: (n, s) => [n - s.length, s]
})

Effect.runPromise(Stream.runCollect(s2)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    [ 0, "a" ], [ 1, "b" ], [ 2, "c" ], [ 4, "x" ], [ 5, "x" ], [ 6, "x" ]
  ]
}
*/
```
This allows you to handle zipping when one stream completes earlier than the other.
Zipping Streams at Different Rates
Sometimes, you might have two streams producing elements at different speeds. If you don't want to wait for the slower one when zipping elements, you can use `Stream.zipLatest` or `Stream.zipLatestWith`. These operations combine elements in a way that when a value is emitted by either of the two streams, it is combined with the latest value from the other stream to produce a result. Here's an example:
```ts
import { Stream, Schedule, Effect } from "effect"

const s1 = Stream.make(1, 2, 3).pipe(
  Stream.schedule(Schedule.spaced("1 seconds"))
)

const s2 = Stream.make("a", "b", "c", "d").pipe(
  Stream.schedule(Schedule.spaced("500 millis"))
)

const stream = Stream.zipLatest(s1, s2)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    [ 1, "a" ], [ 1, "b" ], [ 2, "b" ], [ 2, "c" ], [ 2, "d" ], [ 3, "d" ]
  ]
}
*/
```
Here, `Stream.zipLatest` combines elements from both streams without waiting for the slower one, resulting in a more responsive output.
Pairing with Previous and Next Elements
- `zipWithPrevious`: pairs each element of a stream with its previous element.
- `zipWithNext`: pairs each element of a stream with its next element.
- `zipWithPreviousAndNext`: pairs each element with both its previous and next elements.
Here's an example illustrating these operations:
```ts
import { Stream } from "effect"

const stream = Stream.make(1, 2, 3, 4)

const s1 = Stream.zipWithPrevious(stream)

const s2 = Stream.zipWithNext(stream)

const s3 = Stream.zipWithPreviousAndNext(stream)
```
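To see what these operators emit, the following sketch collects each result. The missing neighbor at either end of the stream is represented as an `Option`; the `show` helper is a made-up convenience that renders an absent neighbor as the string `"none"` so the tuples print plainly.

```ts
import { Stream, Effect, Option } from "effect"

const stream = Stream.make(1, 2, 3, 4)

// Illustrative helper: render a missing neighbor as the string "none"
const show = (o: Option.Option<number>): number | string =>
  Option.getOrElse(o, () => "none")

const withPrevious = Stream.zipWithPrevious(stream).pipe(
  Stream.map(([prev, current]) => [show(prev), current])
)

const withNext = Stream.zipWithNext(stream).pipe(
  Stream.map(([current, next]) => [current, show(next)])
)

const withBoth = Stream.zipWithPreviousAndNext(stream).pipe(
  Stream.map(([prev, current, next]) => [show(prev), current, show(next)])
)

Effect.runPromise(Stream.runCollect(withPrevious)).then(console.log)
// values: ["none", 1], [1, 2], [2, 3], [3, 4]

Effect.runPromise(Stream.runCollect(withNext)).then(console.log)
// values: [1, 2], [2, 3], [3, 4], [4, "none"]

Effect.runPromise(Stream.runCollect(withBoth)).then(console.log)
// values: ["none", 1, 2], [1, 2, 3], [2, 3, 4], [3, 4, "none"]
```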
Indexing Stream Elements
Another handy operator is `Stream.zipWithIndex`, which indexes each element of a stream by pairing it with its respective index. This is especially useful when you need to keep track of the position of elements within the stream.
Here's an example of indexing elements in a stream:
```ts
import { Stream, Effect } from "effect"

const stream = Stream.make("Mary", "James", "Robert", "Patricia")

const indexedStream = Stream.zipWithIndex(stream)

Effect.runPromise(Stream.runCollect(indexedStream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    [ "Mary", 0 ], [ "James", 1 ], [ "Robert", 2 ], [ "Patricia", 3 ]
  ]
}
*/
```
Cartesian Product of Streams
The Stream module introduces a powerful feature: the ability to compute the Cartesian Product of two streams. This operation allows you to generate combinations of elements from two separate streams. Let's explore this concept further:
Imagine you have two sets of items, and you want to generate all possible pairs by taking one item from each set. This process is known as finding the Cartesian Product of the sets. In the context of streams, it means creating combinations of elements from two streams.
To achieve this, the Stream module provides the `Stream.cross` operator, along with its variants. These operators take two streams and generate a new stream containing all possible combinations of elements from the original streams.
Here's a practical example:
```ts
import { Stream, Effect } from "effect"

const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make("a", "b")

const product = Stream.cross(s1, s2)

Effect.runPromise(Stream.runCollect(product)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    [ 1, "a" ], [ 1, "b" ], [ 2, "a" ], [ 2, "b" ], [ 3, "a" ], [ 3, "b" ]
  ]
}
*/
```
It's important to note that the right-hand side stream (`s2` in this case) will be iterated multiple times, once for each element in the left-hand side stream (`s1`). This means that if the right-hand side stream involves expensive or side-effectful operations, they will be executed multiple times.
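The repeated evaluation can be observed with a small sketch. The counter and the single-element `right` stream below are made up for illustration; the effect behind `right` stands in for an expensive operation.

```ts
import { Stream, Effect } from "effect"

// Counts how often the right-hand stream's effect runs
let evaluations = 0

const left = Stream.make(1, 2, 3)

// Hypothetical "expensive" source: a one-element stream backed by an effect
const right = Stream.fromEffect(
  Effect.sync(() => {
    evaluations += 1
    return "a"
  })
)

const product = Stream.cross(left, right)

Effect.runPromise(Stream.runCollect(product)).then((chunk) => {
  console.log(chunk) // pairs: [1, "a"], [2, "a"], [3, "a"]
  console.log(evaluations) // 3: once per left-hand element
})
```

If the right-hand side is costly, one option is to compute its values once up front and build the right-hand stream from the cached result.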
Partitioning
Partitioning a stream means dividing it into two separate streams based on a specified condition. The Stream module provides two helpful functions for achieving this: `Stream.partition` and `Stream.partitionEither`. Let's explore how these functions work and when to use them.
partition
The `Stream.partition` function takes a predicate as input and splits the original stream into two substreams: one containing elements that satisfy the predicate (evaluate to `true`), and the other containing elements that do not (evaluate to `false`). Additionally, the pair of substreams is returned inside an effect that requires a `Scope`.
Here's an example where we partition a stream of numbers into even and odd numbers:
```ts
import { Stream, Effect } from "effect"

const partition = Stream.range(1, 10).pipe(
  Stream.partition((n) => n % 2 === 0, { bufferSize: 5 })
)

Effect.runPromise(
  Effect.scoped(
    Effect.gen(function* (_) {
      const [evens, odds] = yield* _(partition)
      console.log(yield* _(Stream.runCollect(evens)))
      console.log(yield* _(Stream.runCollect(odds)))
    })
  )
)
/*
Output:
{
  _id: "Chunk",
  values: [ 2, 4, 6, 8 ]
}
{
  _id: "Chunk",
  values: [ 1, 3, 5, 7, 9 ]
}
*/
```
In this example, we use the `Stream.partition` function with a predicate to split the stream into even and odd numbers. The `bufferSize` option controls how much the faster stream can advance beyond the slower one.
partitionEither
Sometimes, you may need to partition a stream using an effectful predicate. For such cases, the `Stream.partitionEither` function is available. This function accepts an effectful predicate and divides the stream into two substreams based on the result of the predicate: elements that yield `Either.left` values go to one substream, while elements yielding `Either.right` values go to the other.
Here's an example where we use `Stream.partitionEither` to partition a stream of numbers based on an effectful condition:
```ts
import { Stream, Effect, Either } from "effect"

const partition = Stream.range(1, 10).pipe(
  Stream.partitionEither(
    (n) => Effect.succeed(n < 5 ? Either.left(n * 2) : Either.right(n)),
    { bufferSize: 5 }
  )
)

Effect.runPromise(
  Effect.scoped(
    Effect.gen(function* (_) {
      // The first substream holds the Left values, the second the Right values
      const [lefts, rights] = yield* _(partition)
      console.log(yield* _(Stream.runCollect(lefts)))
      console.log(yield* _(Stream.runCollect(rights)))
    })
  )
)
/*
Output:
{
  _id: "Chunk",
  values: [ 2, 4, 6, 8 ]
}
{
  _id: "Chunk",
  values: [ 5, 6, 7, 8, 9 ]
}
*/
```
In this case, the `Stream.partitionEither` function splits the stream into two substreams: one containing values that are less than 5 (doubled using `Either.left`), and the other containing values greater than or equal to 5 (using `Either.right`).
GroupBy
When working with streams of data, you may often need to group elements based on certain criteria. The Stream module provides two functions for achieving this: `groupByKey` and `groupBy`. Let's explore how these functions work and when to use them.
groupByKey
The `Stream.groupByKey` function allows you to partition a stream by a simple function of type `(a: A) => K`, where `A` represents the type of elements in your stream, and `K` represents the keys by which the stream should be partitioned. This function is not effectful; it simply groups elements by applying the provided function.
The `Stream.groupByKey` function returns a new data type called `GroupBy`, which represents a grouped stream. To work with the groups, you can use the `GroupBy.evaluate` function, which takes a function of type `(key: K, stream: Stream<V, E>) => Stream.Stream<...>`. This function runs across all groups and merges their results in a non-deterministic fashion.
In the example below, we use `groupByKey` to group exam results by the tens place of their scores and count the number of results in each group:
ts
import {Stream ,GroupBy ,Effect ,Chunk } from "effect"classExam {constructor(readonlyperson : string,readonlyscore : number) {}}constexamResults = [newExam ("Alex", 64),newExam ("Michael", 97),newExam ("Bill", 77),newExam ("John", 78),newExam ("Bobby", 71)]constgroupByKeyResult =Stream .fromIterable (examResults ).pipe (Stream .groupByKey ((exam ) =>Math .floor (exam .score / 10) * 10))conststream =GroupBy .evaluate (groupByKeyResult , (key ,stream ) =>Stream .fromEffect (Stream .runCollect (stream ).pipe (Effect .map ((chunk ) => [key ,Chunk .size (chunk )] asconst ))))Effect .runPromise (Stream .runCollect (stream )).then (console .log )/*Output:{_id: "Chunk",values: [[ 60, 1 ], [ 90, 1 ], [ 70, 3 ]]}*/
In this example, we partition the exam results into groups based on the tens place of their scores (e.g., 60, 90, 70). The groupByKey function is ideal for simple, non-effectful partitioning.
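To make the partitioning step itself concrete, here is a minimal sketch in plain TypeScript that applies the same key function to an in-memory array. It is only a model of the idea, not how Effect implements groupByKey: the helper name groupByKey, the Exam interface, and the Map-based result are assumptions made for illustration, and unlike GroupBy.evaluate the result order here is deterministic (first-seen key order).

```ts
// Plain TypeScript model of key-based grouping (illustrative only, not Effect's implementation).
interface Exam {
  readonly person: string
  readonly score: number
}

// Groups items by the key returned from `keyOf`, keeping keys in first-seen order.
function groupByKey<A, K>(
  items: ReadonlyArray<A>,
  keyOf: (a: A) => K
): Map<K, A[]> {
  const groups = new Map<K, A[]>()
  for (const item of items) {
    const key = keyOf(item)
    const group = groups.get(key)
    if (group === undefined) {
      groups.set(key, [item])
    } else {
      group.push(item)
    }
  }
  return groups
}

const exams: ReadonlyArray<Exam> = [
  { person: "Alex", score: 64 },
  { person: "Michael", score: 97 },
  { person: "Bill", score: 77 },
  { person: "John", score: 78 },
  { person: "Bobby", score: 71 }
]

// Same key function as in the example above: the score rounded down to a multiple of 10.
const examGroups = groupByKey(exams, (exam) => Math.floor(exam.score / 10) * 10)

// Count the members of each group, mirroring the [key, size] pairs shown above.
const examCounts = Array.from(
  examGroups,
  ([key, group]) => [key, group.length] as const
)

console.log(examCounts) // [ [ 60, 1 ], [ 90, 1 ], [ 70, 3 ] ]
```

The real operator works on a stream whose elements arrive over time, so each group is itself a stream rather than an array.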
groupBy
In more complex scenarios where partitioning involves effects, you can turn to the Stream.groupBy function. This function takes an effectful partitioning function and generates a GroupBy data type, representing a grouped stream. You can then use GroupBy.evaluate in a similar fashion as before to process the groups.
In the following example, we group names by their first character and count the number of names in each group. Note that the partitioning operation itself is simulated as effectful:
ts
import { Stream, GroupBy, Effect, Chunk } from "effect"

const groupByKeyResult = Stream.fromIterable([
  "Mary",
  "James",
  "Robert",
  "Patricia",
  "John",
  "Jennifer",
  "Rebecca",
  "Peter"
]).pipe(
  Stream.groupBy((name) => Effect.succeed([name.substring(0, 1), name]))
)

const stream = GroupBy.evaluate(groupByKeyResult, (key, stream) =>
  Stream.fromEffect(
    Stream.runCollect(stream).pipe(
      Effect.map((chunk) => [key, Chunk.size(chunk)] as const)
    )
  )
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ [ "M", 1 ], [ "J", 3 ], [ "R", 2 ], [ "P", 2 ] ]
}
*/
Grouping
When working with streams, you may encounter situations where you need to group elements in a more structured manner. The Stream module provides two helpful functions for achieving this: grouped and groupedWithin. In this section, we'll explore how these functions work and when to use them.
grouped
The Stream.grouped function is perfect for partitioning stream results into chunks of a specified size. It's especially useful when you want to work with data in smaller, more manageable pieces.
Here's an example that demonstrates the use of Stream.grouped:
ts
import { Stream, Effect } from "effect"

const stream = Stream.range(0, 8).pipe(Stream.grouped(3))

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    { _id: "Chunk", values: [ 0, 1, 2 ] },
    { _id: "Chunk", values: [ 3, 4, 5 ] },
    { _id: "Chunk", values: [ 6, 7 ] }
  ]
}
*/
In this example, we take a stream of the numbers 0 through 7 and use Stream.grouped(3) to divide it into chunks of size 3. Because 8 is not a multiple of 3, the final chunk holds only the two remaining elements.
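The chunking rule can be sketched on a plain array. This is a hypothetical helper written for illustration (the name grouped and the array-based signature are assumptions, not Effect's API), showing that every chunk is full except possibly the last one:

```ts
// Plain TypeScript model of fixed-size chunking (illustrative only).
function grouped<A>(items: ReadonlyArray<A>, size: number): A[][] {
  if (size < 1 || Math.floor(size) !== size) {
    throw new RangeError("size must be a positive integer")
  }
  const chunks: A[][] = []
  // Step through the input `size` elements at a time; the last slice may be shorter.
  for (let start = 0; start < items.length; start += size) {
    chunks.push(items.slice(start, start + size))
  }
  return chunks
}

const groupedChunks = grouped([0, 1, 2, 3, 4, 5, 6, 7], 3)

console.log(groupedChunks) // [ [ 0, 1, 2 ], [ 3, 4, 5 ], [ 6, 7 ] ]
```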
groupedWithin
The Stream.groupedWithin function provides more flexibility by allowing you to group events based on time intervals or chunk size, whichever condition is satisfied first. This is particularly useful when you want to group data based on time constraints.
ts
import { Stream, Schedule, Effect } from "effect"

const stream = Stream.range(0, 10).pipe(
  Stream.repeat(Schedule.spaced("1 seconds")),
  Stream.groupedWithin(18, "1.5 seconds"),
  Stream.take(3)
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    { _id: "Chunk", values: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7 ] },
    { _id: "Chunk", values: [ 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 ] },
    { _id: "Chunk", values: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7 ] }
  ]
}
*/
In this example, we use Stream.groupedWithin(18, "1.5 seconds") to create chunks of data. A chunk is emitted either when it reaches 18 elements or when 1.5 seconds have passed since the last chunk was created, whichever happens first. This helps when dealing with time-sensitive data or when the chunk size should adapt to how quickly elements arrive.
Concatenation
In stream processing, there are scenarios where you may want to combine the contents of multiple streams. The Stream module provides several operators for achieving this, including Stream.concat, Stream.concatAll, and Stream.flatMap. Let's explore these operators and understand how to use them effectively.
Simple Concatenation
The Stream.concat operator is a straightforward way to concatenate two streams. It returns a new stream that emits elements from the left-hand stream followed by elements from the right-hand stream. This is useful when you want to combine two streams in a sequential manner.
Here's an example of using Stream.concat:
ts
import { Stream, Effect } from "effect"

const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make(4, 5)

const stream = Stream.concat(s1, s2)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 2, 3, 4, 5 ]
}
*/
Concatenating Multiple Streams
Sometimes you may have multiple streams that you want to concatenate together. Instead of manually chaining Stream.concat operations, you can use Stream.concatAll to concatenate a Chunk of streams.
Here's an example:
ts
import { Stream, Effect, Chunk } from "effect"

const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make(4, 5)
const s3 = Stream.make(6, 7, 8)

const stream = Stream.concatAll(Chunk.make(s1, s2, s3))

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 2, 3, 4, 5, 6, 7, 8 ]
}
*/
Advanced Concatenation with flatMap
The Stream.flatMap operator allows you to create a stream whose elements are generated by applying a function of type (a: A) => Stream<...> to each output of the source stream. It concatenates all of the results.
Here's an example of using Stream.flatMap:
ts
import { Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.flatMap((a) => Stream.repeatValue(a).pipe(Stream.take(4)))
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3 ]
}
*/
To run the inner streams concurrently, pass the concurrency option to flatMap. If the order of concatenation does not matter to you, you can also use the switch option.
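The default, sequential behavior of flatMap can be modeled with ordinary generators. The sketch below is an analogy rather than Effect's implementation: repeatValue, take, and flatMap are hypothetical generator helpers that happen to share names with the Stream functions used above. It shows why an infinite inner stream is safe here: take stops pulling after four elements, so the next source element gets its turn.

```ts
// Generator-based model of sequential flatMap (illustrative only, not Effect's implementation).

// An infinite lazy sequence that repeats one value.
function* repeatValue<A>(value: A): Generator<A> {
  while (true) {
    yield value
  }
}

// Yields at most `n` elements, then stops pulling from the source.
function* take<A>(source: Iterable<A>, n: number): Generator<A> {
  if (n <= 0) return
  let taken = 0
  for (const item of source) {
    yield item
    taken += 1
    if (taken >= n) return
  }
}

// Runs each inner sequence to completion before moving to the next source element.
function* flatMap<A, B>(
  source: Iterable<A>,
  f: (a: A) => Iterable<B>
): Generator<B> {
  for (const item of source) {
    yield* f(item)
  }
}

const flatMapped = Array.from(
  flatMap([1, 2, 3], (a) => take(repeatValue(a), 4))
)

console.log(flatMapped) // [ 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3 ]
```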
Merging
Sometimes we need to interleave the emissions of two streams to create another stream. We can't use Stream.concat for this, because it waits for the first stream to finish before consuming the second. Instead, we need a way to pick elements from both sources as they arrive. The merge operations of the Stream module do this for us. Let's discuss some variants of this operation:
merge
The Stream.merge operation allows us to pick elements from different source streams and merge them into a single stream. Unlike Stream.concat, which waits for the first stream to finish before moving to the second, Stream.merge interleaves elements from both streams as they become available.
Here's an example:
ts
import { Schedule, Stream, Effect } from "effect"

const s1 = Stream.make(1, 2, 3).pipe(
  Stream.schedule(Schedule.spaced("100 millis"))
)
const s2 = Stream.make(4, 5, 6).pipe(
  Stream.schedule(Schedule.spaced("200 millis"))
)

const stream = Stream.merge(s1, s2)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 4, 2, 3, 5, 6 ]
}
*/
Termination Strategy
When merging two streams, we should consider their termination strategy. Each stream has its own lifetime: some may finish quickly, while others may continue indefinitely. By default, the stream produced by Stream.merge terminates only when both source streams terminate.
However, you can define the termination strategy to suit your requirements. Stream offers four termination strategies through the haltStrategy option:
- "left". The resulting stream will terminate when the left-hand side stream terminates.
- "right". The resulting stream will terminate when the right-hand side stream finishes.
- "both". The resulting stream will terminate when both streams finish.
- "either". The resulting stream will terminate when one of the streams finishes.
Here's an example of specifying a termination strategy:
ts
import { Stream, Schedule, Effect } from "effect"

const s1 = Stream.range(1, 6).pipe(
  Stream.schedule(Schedule.spaced("100 millis"))
)
const s2 = Stream.repeatValue(0).pipe(
  Stream.schedule(Schedule.spaced("200 millis"))
)

const stream = Stream.merge(s1, s2, { haltStrategy: "left" })

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 0, 2, 3, 0, 4, 5 ]
}
*/
In this example, we use haltStrategy: "left" to make the resulting stream terminate when the left-hand stream (s1) finishes.
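To compare the four strategies side by side, the sketch below models each stream as a finite list of timestamped events. This is a deliberately simplified model and not Effect's implementation: the Timed type, the mergeTimed helper, and the timestamps are hypothetical, and it assumes a stream ends at the moment of its last event.

```ts
// Timestamp-based model of merge termination (illustrative only).
type HaltStrategy = "left" | "right" | "both" | "either"

interface Timed<A> {
  readonly at: number // hypothetical emission time in milliseconds
  readonly value: A
}

// Assumption: a stream ends at the time of its last event; an empty stream ends before any event.
function endTime<A>(events: ReadonlyArray<Timed<A>>): number {
  return events.length === 0
    ? Number.NEGATIVE_INFINITY
    : events[events.length - 1].at
}

// Merges two time-ordered event lists and cuts the result off where the strategy says to halt.
function mergeTimed<A>(
  left: ReadonlyArray<Timed<A>>,
  right: ReadonlyArray<Timed<A>>,
  haltStrategy: HaltStrategy = "both"
): A[] {
  const leftEnd = endTime(left)
  const rightEnd = endTime(right)
  const cutoff =
    haltStrategy === "left"
      ? leftEnd
      : haltStrategy === "right"
        ? rightEnd
        : haltStrategy === "either"
          ? Math.min(leftEnd, rightEnd)
          : Math.max(leftEnd, rightEnd)
  return left
    .concat(right)
    .filter((event) => event.at <= cutoff)
    .sort((a, b) => a.at - b.at)
    .map((event) => event.value)
}

const leftEvents: ReadonlyArray<Timed<number>> = [
  { at: 100, value: 1 },
  { at: 200, value: 2 },
  { at: 300, value: 3 }
]
const rightEvents: ReadonlyArray<Timed<number>> = [
  { at: 150, value: 10 },
  { at: 350, value: 20 },
  { at: 550, value: 30 }
]

const mergedBoth = mergeTimed(leftEvents, rightEvents) // default: wait for both
const mergedLeft = mergeTimed(leftEvents, rightEvents, "left")

console.log(mergedBoth) // [ 1, 10, 2, 3, 20, 30 ]
console.log(mergedLeft) // [ 1, 10, 2, 3 ]
```

With "left", the right-hand events at 350 ms and 550 ms are never emitted because the left-hand stream has already ended by then.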
mergeWith
In some cases, we not only want to merge two streams but also transform and unify their elements into new types. This is where Stream.mergeWith comes into play. It allows us to specify transformation functions for both source streams.
Here's an example:
ts
import { Schedule, Stream, Effect } from "effect"

const s1 = Stream.make("1", "2", "3").pipe(
  Stream.schedule(Schedule.spaced("100 millis"))
)
const s2 = Stream.make(4.1, 5.3, 6.2).pipe(
  Stream.schedule(Schedule.spaced("200 millis"))
)

const stream = Stream.mergeWith(s1, s2, {
  onSelf: (s) => parseInt(s),
  onOther: (n) => Math.floor(n)
})

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 4, 2, 3, 5, 6 ]
}
*/
In this example, we use Stream.mergeWith to merge s1 and s2 while converting the string elements from s1 to integers and rounding the decimal elements from s2 down to the nearest integer.
Interleaving
The Stream.interleave operator allows us to pull one element at a time from each of two streams, creating a new interleaved stream. Once one of the streams is exhausted, the remaining values from the other stream are pulled.
Here's an example:
ts
import { Stream, Effect } from "effect"

const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make(4, 5, 6)

const stream = Stream.interleave(s1, s2)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 4, 2, 5, 3, 6 ]
}
*/
For more advanced interleaving logic, Stream.interleaveWith provides additional flexibility. It allows you to specify the interleaving logic using a third stream of boolean values. When the boolean stream emits true, it chooses elements from the left-hand stream; otherwise, it selects elements from the right-hand stream.
Here's an example:
ts
import { Stream, Effect } from "effect"

const s1 = Stream.make(1, 3, 5, 7, 9)
const s2 = Stream.make(2, 4, 6, 8, 10)

const booleanStream = Stream.make(true, false, false).pipe(Stream.forever)

const stream = Stream.interleaveWith(s1, s2, booleanStream)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 2, 4, 3, 6, 8, 5, 10, 7, 9 ]
}
*/
In this example, booleanStream decides which source stream to choose for interleaving. When true, it picks elements from s1, and when false, it selects elements from s2.
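The selection rule can be traced step by step with a plain-array model. The sketch below is illustrative only: the interleaveWith helper and its cyclic pattern argument are hypothetical stand-ins for the boolean stream, and it assumes that when the chosen side is exhausted the element is taken from the other side instead. Effect's behavior in that edge case may differ; the assumption is chosen because it reproduces the output shown above.

```ts
// Plain-array model of pattern-driven interleaving (illustrative only).
function interleaveWith<A>(
  left: ReadonlyArray<A>,
  right: ReadonlyArray<A>,
  pattern: ReadonlyArray<boolean> // repeated cyclically, like a boolean stream run forever
): A[] {
  if (pattern.length === 0) {
    throw new RangeError("pattern must not be empty")
  }
  const out: A[] = []
  let l = 0
  let r = 0
  let step = 0
  while (l < left.length || r < right.length) {
    const wantLeft = pattern[step % pattern.length]
    step += 1
    // Assumption: if the requested side has run out, fall back to the other side.
    const takeLeft = wantLeft ? l < left.length : r >= right.length
    if (takeLeft) {
      out.push(left[l])
      l += 1
    } else {
      out.push(right[r])
      r += 1
    }
  }
  return out
}

const interleaved = interleaveWith(
  [1, 3, 5, 7, 9],
  [2, 4, 6, 8, 10],
  [true, false, false]
)

console.log(interleaved) // [ 1, 2, 4, 3, 6, 8, 5, 10, 7, 9 ]
```

Each true takes one element from the left and each false takes one from the right, so the right-hand side runs out first and the last left-hand elements (7 and 9) follow at the end.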
Interspersing
Interspersing is a technique that allows you to add separators in a stream. This can be especially useful when you want to format or structure the data in your streams.
intersperse
The Stream.intersperse operator lets you intersperse a delimiter element between the elements of a stream. This delimiter can be any value you choose. It's added between each pair of elements in the original stream.
Here's an example:
ts
import { Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5).pipe(Stream.intersperse(0))

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 0, 2, 0, 3, 0, 4, 0, 5 ]
}
*/
In this example, stream contains the numbers 1 through 5, and we use Stream.intersperse(0) to add a zero between each pair of adjacent elements.
intersperseAffixes
For more advanced interspersing needs, Stream.intersperseAffixes provides greater control. It allows you to specify different affixes for the start, middle, and end of your stream. These affixes can be strings or any other values you want.
Here's an example:
ts
import { Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.intersperseAffixes({
    start: "[",
    middle: "-",
    end: "]"
  })
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ "[", 1, "-", 2, "-", 3, "-", 4, "-", 5, "]" ]
}
*/
In this example, we use Stream.intersperseAffixes to enclose the numbers from 1 to 5 within square brackets, separating them with hyphens.
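Both operators follow a simple placement rule, sketched here on plain arrays. The helpers intersperse and intersperseAffixes below are hypothetical array-based functions written for illustration, not the Stream functions themselves. The point to notice is that the separator goes between elements only, never before the first or after the last.

```ts
// Plain-array model of interspersing (illustrative only).
function intersperse<A, B>(items: ReadonlyArray<A>, middle: B): Array<A | B> {
  const out: Array<A | B> = []
  items.forEach((item, index) => {
    // The separator is placed before every element except the first.
    if (index > 0) {
      out.push(middle)
    }
    out.push(item)
  })
  return out
}

function intersperseAffixes<A, B>(
  items: ReadonlyArray<A>,
  affixes: { readonly start: B; readonly middle: B; readonly end: B }
): Array<A | B> {
  const middleApplied: Array<A | B> = intersperse(items, affixes.middle)
  const out: Array<A | B> = [affixes.start]
  return out.concat(middleApplied, [affixes.end])
}

const withZeros = intersperse([1, 2, 3, 4, 5], 0)
const bracketed = intersperseAffixes([1, 2, 3, 4, 5], {
  start: "[",
  middle: "-",
  end: "]"
})

console.log(withZeros) // [ 1, 0, 2, 0, 3, 0, 4, 0, 5 ]
console.log(bracketed) // [ "[", 1, "-", 2, "-", 3, "-", 4, "-", 5, "]" ]
```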
Broadcasting
Broadcasting a stream is a way to create multiple streams that contain the same elements as the source stream. This operation allows you to send each element to multiple downstream streams simultaneously. However, the upstream stream can emit events only up to a certain limit, which is determined by the maximumLag parameter. Once this limit is reached, the upstream stream slows down to match the speed of the slowest downstream stream.
Let's take a closer look at how broadcasting works in the following example. Here, we are broadcasting a stream of numbers to two downstream streams. One of them calculates the maximum number in the stream, while the other performs some logging with an additional delay. The upstream stream adjusts its speed based on the slower logging stream:
ts
import { Effect, Stream, Console, Schedule, Fiber } from "effect"

const numbers = Effect.scoped(
  Stream.range(1, 21).pipe(
    Stream.tap((n) => Console.log(`Emit ${n} element before broadcasting`)),
    Stream.broadcast(2, 5),
    Stream.flatMap(([first, second]) =>
      Effect.gen(function* (_) {
        const fiber1 = yield* _(
          Stream.runFold(first, 0, (acc, e) => Math.max(acc, e)).pipe(
            Effect.flatMap((max) => Console.log(`Maximum: ${max}`)),
            Effect.fork
          )
        )
        const fiber2 = yield* _(
          second.pipe(
            Stream.schedule(Schedule.spaced("1 seconds")),
            Stream.runForEach((n) =>
              Console.log(`Logging to the Console: ${n}`)
            ),
            Effect.fork
          )
        )
        yield* _(
          Fiber.join(fiber1).pipe(
            Effect.zip(Fiber.join(fiber2), { concurrent: true })
          )
        )
      })
    ),
    Stream.runCollect
  )
)

Effect.runPromise(numbers).then(console.log)
/*
Output:
Emit 1 element before broadcasting
Emit 2 element before broadcasting
Emit 3 element before broadcasting
Emit 4 element before broadcasting
Emit 5 element before broadcasting
Emit 6 element before broadcasting
Emit 7 element before broadcasting
Emit 8 element before broadcasting
Emit 9 element before broadcasting
Emit 10 element before broadcasting
Emit 11 element before broadcasting
Logging to the Console: 1
Logging to the Console: 2
Logging to the Console: 3
Logging to the Console: 4
Logging to the Console: 5
Emit 12 element before broadcasting
Emit 13 element before broadcasting
Emit 14 element before broadcasting
Emit 15 element before broadcasting
Emit 16 element before broadcasting
Logging to the Console: 6
Logging to the Console: 7
Logging to the Console: 8
Logging to the Console: 9
Logging to the Console: 10
Emit 17 element before broadcasting
Emit 18 element before broadcasting
Emit 19 element before broadcasting
Emit 20 element before broadcasting
Logging to the Console: 11
Logging to the Console: 12
Logging to the Console: 13
Logging to the Console: 14
Logging to the Console: 15
Maximum: 20
Logging to the Console: 16
Logging to the Console: 17
Logging to the Console: 18
Logging to the Console: 19
Logging to the Console: 20
{
  _id: "Chunk",
  values: [ undefined ]
}
*/
Buffering
Effect streams operate in a pull-based manner, which means downstream consumers can request elements at their own pace without needing to signal the upstream to slow down. However, there are scenarios where you might need to handle producers and consumers independently, especially when there's a speed mismatch between them. This is where buffering comes into play, allowing you to manage communication between a faster producer and a slower consumer effectively. Effect streams provide a built-in Stream.buffer operator to assist with this.
buffer
The Stream.buffer operator is designed to facilitate scenarios where a faster producer needs to work independently of a slower consumer. It achieves this by buffering elements in a queue, allowing the producer to continue working even if the consumer lags behind. You can specify the maximum buffer capacity using the capacity option.
Let's walk through an example to see how it works:
ts
import { Stream, Console, Schedule, Effect } from "effect"

const stream = Stream.range(1, 11).pipe(
  Stream.tap((n) => Console.log(`before buffering: ${n}`)),
  Stream.buffer({ capacity: 4 }),
  Stream.tap((n) => Console.log(`after buffering: ${n}`)),
  Stream.schedule(Schedule.spaced("5 seconds"))
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
before buffering: 1
before buffering: 2
before buffering: 3
before buffering: 4
before buffering: 5
before buffering: 6
after buffering: 1
after buffering: 2
before buffering: 7
after buffering: 3
before buffering: 8
after buffering: 4
before buffering: 9
after buffering: 5
before buffering: 10
...
*/
In this example, we create a stream of numbers from 1 to 10, matching the output shown above. We use Stream.buffer({ capacity: 4 }) to buffer up to 4 elements at a time. The two Stream.tap operators log each element before and after buffering, and the 5-second schedule on the consuming side illustrates the lag between producing and consuming elements.
You can choose from different buffering options based on the type of underlying queue you need:
- Bounded Queue: { capacity: number }
- Unbounded Queue: { capacity: "unbounded" }
- Sliding Queue: { capacity: number, strategy: "sliding" }
- Dropping Queue: { capacity: number, strategy: "dropping" }
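The difference between the sliding and dropping strategies shows up only when the queue is full. The sketch below models that moment with a plain array. It is an illustration, not Effect's queue implementation: the offerAll helper and the OverflowStrategy type are hypothetical, and it assumes the consumer takes nothing while the producer offers every element. A sliding queue evicts its oldest element to make room, while a dropping queue discards the incoming element.

```ts
// Plain-array model of a full queue under two overflow strategies (illustrative only).
type OverflowStrategy = "sliding" | "dropping"

// Offers every item to a queue of fixed capacity and returns what the queue holds afterwards.
// Assumption: nothing is consumed while the items are being offered.
function offerAll<A>(
  items: ReadonlyArray<A>,
  capacity: number,
  strategy: OverflowStrategy
): A[] {
  if (capacity < 1 || Math.floor(capacity) !== capacity) {
    throw new RangeError("capacity must be a positive integer")
  }
  const queue: A[] = []
  for (const item of items) {
    if (queue.length < capacity) {
      queue.push(item)
    } else if (strategy === "sliding") {
      // Full queue: evict the oldest element, then accept the new one.
      queue.shift()
      queue.push(item)
    }
    // Full queue with "dropping": the new element is discarded.
  }
  return queue
}

const keptBySliding = offerAll([1, 2, 3, 4, 5, 6], 4, "sliding")
const keptByDropping = offerAll([1, 2, 3, 4, 5, 6], 4, "dropping")

console.log(keptBySliding) // [ 3, 4, 5, 6 ]
console.log(keptByDropping) // [ 1, 2, 3, 4 ]
```

A bounded queue without a strategy does neither: as in the Stream.buffer example above, the producer waits until the consumer frees a slot, so no elements are lost.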
Debouncing
The Stream.debounce function controls the rate at which elements are emitted when the source produces them in rapid bursts. It holds each element back until the stream has been quiet for a specified duration. If a newer element arrives before that duration elapses, the pending element is discarded and the wait starts again with the newer one.
ts
import { Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.concat(Stream.fromEffect(Effect.sleep("500 millis"))),
  Stream.concat(Stream.make(4, 5)),
  Stream.concat(Stream.fromEffect(Effect.sleep("10 millis"))),
  Stream.concat(Stream.make(6)),
  Stream.debounce("100 millis") // Emit only after a pause of at least 100 ms
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 3, 6 ]
}
*/
In this example, we have a stream that emits elements at varying intervals. Some elements are emitted rapidly, while others are separated by pauses of different durations. We apply debouncing with a minimum pause requirement of 100 milliseconds using Stream.debounce("100 millis").
As a result, only elements that are followed by a pause of at least 100 milliseconds are emitted. Elements 1, 2, 4, and 5 are dropped because a newer element arrives within 100 milliseconds of each of them. Element 3 is emitted because it is followed by the 500-millisecond sleep, and element 6 is emitted because it is the last element before the stream ends.
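The selection rule can be checked against a list of timestamped events. This is a minimal model rather than Effect's implementation: the TimedValue type, the debounce helper, and the timestamps are hypothetical, chosen to resemble the example above. An element survives if the next element arrives at least the quiet period later, or if there is no next element.

```ts
// Timestamp-based model of debouncing (illustrative only).
interface TimedValue<A> {
  readonly at: number // hypothetical arrival time in milliseconds
  readonly value: A
}

// Keeps an element only if it is followed by a quiet gap of at least `quietMs`,
// or if it is the last element before the input ends.
function debounce<A>(
  events: ReadonlyArray<TimedValue<A>>,
  quietMs: number
): A[] {
  const out: A[] = []
  events.forEach((event, index) => {
    const isLast = index === events.length - 1
    if (isLast || events[index + 1].at - event.at >= quietMs) {
      out.push(event.value)
    }
  })
  return out
}

const arrivals: ReadonlyArray<TimedValue<number>> = [
  { at: 0, value: 1 },
  { at: 1, value: 2 },
  { at: 2, value: 3 }, // followed by a 500 ms pause
  { at: 502, value: 4 },
  { at: 503, value: 5 }, // followed by only a 10 ms pause
  { at: 513, value: 6 } // last element
]

const debounced = debounce(arrivals, 100)

console.log(debounced) // [ 3, 6 ]
```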