Operations

In this section, we'll explore some essential operations you can perform on streams. These operations allow you to manipulate and interact with stream elements in various ways.

Tapping

Tapping is an operation that involves running an effect on each emission of the stream. It allows you to observe each element, perform some effectful operation, and discard the result of this observation. Importantly, the Stream.tap operation does not alter the elements of the stream, and it does not affect the return type of the stream.

For instance, you can use Stream.tap to print each element of a stream:

ts
import { Stream, Console, Effect } from "effect"
 
const stream = Stream.make(1, 2, 3).pipe(
  Stream.tap((n) => Console.log(`before mapping: ${n}`)),
  Stream.map((n) => n * 2),
  Stream.tap((n) => Console.log(`after mapping: ${n}`))
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
before mapping: 1
after mapping: 2
before mapping: 2
after mapping: 4
before mapping: 3
after mapping: 6
{
  _id: "Chunk",
  values: [ 2, 4, 6 ]
}
*/

Taking Elements

Another essential operation is taking elements, which allows you to extract a specific number of elements from a stream. Here are several ways to achieve this:

  • take: extracts a fixed number of elements.
  • takeWhile: extracts elements while a given condition holds.
  • takeUntil: extracts elements until a given condition is met, including the element that meets it.
  • takeRight: extracts a specified number of elements from the end.

ts
import { Stream, Effect } from "effect"
 
const stream = Stream.iterate(0, (n) => n + 1)
 
// Using `take` to extract a fixed number of elements:
const s1 = Stream.take(stream, 5)
Effect.runPromise(Stream.runCollect(s1)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 0, 1, 2, 3, 4 ]
}
*/
 
// Using `takeWhile` to extract elements while a condition holds:
const s2 = Stream.takeWhile(stream, (n) => n < 5)
Effect.runPromise(Stream.runCollect(s2)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 0, 1, 2, 3, 4 ]
}
*/
 
// Using `takeUntil` to extract elements until a specific condition is met:
const s3 = Stream.takeUntil(stream, (n) => n === 5)
Effect.runPromise(Stream.runCollect(s3)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 0, 1, 2, 3, 4, 5 ]
}
*/
 
// Using `takeRight` to extract a specified number of elements from the end:
const s4 = Stream.takeRight(s3, 3)
Effect.runPromise(Stream.runCollect(s4)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 3, 4, 5 ]
}
*/

Exploring Streams as an Alternative to Async Iterables

When working with asynchronous data sources, like async iterables, you often need to consume data in a loop until a certain condition is met. This section shows how to achieve similar behavior using Streams.

In async iterables, data consumption can continue in a loop until a break or return statement is encountered. To replicate this behavior with Streams, you have a couple of options:

  1. Stream.takeUntil: This function allows you to take elements from a stream until a specified condition evaluates to true. It's akin to breaking out of a loop in async iterables when a certain condition is met (see the sketch after this list).

  2. Stream.toPull: The Stream.toPull function is another way to replicate looping over async iterables. It returns an effect that repeatedly pulls data chunks from the stream. This effect fails with None when the stream is finished, or with Some(error) if the stream fails.
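
As a sketch of the first option, Stream.takeUntil ends consumption once the condition holds, much like a break statement (as shown earlier, the element that satisfies the condition is included):

ts
import { Stream, Effect } from "effect"

// An infinite source standing in for an async iterable
const source = Stream.iterate(0, (n) => n + 1)

// Equivalent of breaking out of the loop once n reaches 5
const program = Stream.runCollect(
  source.pipe(Stream.takeUntil((n) => n === 5))
)

Effect.runPromise(program).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 0, 1, 2, 3, 4, 5 ]
}
*/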

Let's take a closer look at the second option, Stream.toPull.

ts
import { Stream, Effect } from "effect"
 
// Simulate a chunked stream
const stream = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(Stream.rechunk(2))
 
const program = Effect.gen(function* (_) {
  // Create an effect to get data chunks from the stream
  const getChunk = yield* _(Stream.toPull(stream))

  // Continuously fetch and process chunks
  while (true) {
    const chunk = yield* _(getChunk)
    console.log(chunk)
  }
})
 
Effect.runPromise(Effect.scoped(program)).then(console.log, console.error)
/*
Output:
{ _id: 'Chunk', values: [ 1, 2 ] }
{ _id: 'Chunk', values: [ 3, 4 ] }
{ _id: 'Chunk', values: [ 5 ] }
{
  _id: 'FiberFailure',
  cause: {
    _id: 'Cause',
    _tag: 'Fail',
    failure: { _id: 'Option', _tag: 'None' }
  }
}
*/

In this example, we're using Stream.toPull to repeatedly pull data chunks from the stream. The code enters a loop and continues to fetch and display chunks until there's no more data left to process.

Mapping

In this section, we'll explore how to transform elements within a stream using the Stream.map family of operations. These operations allow you to apply a function to each element of the stream, producing a new stream with the transformed values.

Basic Mapping

The Stream.map operation applies a given function to all elements of the stream, creating another stream with the transformed values. Let's illustrate this with an example:

ts
import { Stream, Effect } from "effect"
 
const stream = Stream.make(1, 2, 3).pipe(Stream.map((n) => n + 1))
 
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 2, 3, 4 ]
}
*/

Effectful Mapping

If your transformation involves effects, you can use Stream.mapEffect instead. It allows you to apply an effectful function to each element of the stream, producing a new stream with effectful results:

ts
import { Stream, Random, Effect } from "effect"
 
const stream = Stream.make(10, 20, 30).pipe(
  Stream.mapEffect((n) => Random.nextIntBetween(0, n))
)
 
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 6, 13, 5 ]
}
*/

You can evaluate effects concurrently using the concurrency option, which specifies how many effects may run at the same time. The results are emitted downstream in their original order.

Let's write a simple page downloader that fetches URLs concurrently:

ts
import { Stream, Effect } from "effect"
 
const getUrls = Effect.succeed(["url0", "url1", "url2"])
 
const fetchUrl = (url: string) =>
  Effect.succeed([
    `Resource 0-${url}`,
    `Resource 1-${url}`,
    `Resource 2-${url}`
  ])

const stream = Stream.fromIterableEffect(getUrls).pipe(
  Stream.mapEffect(fetchUrl, { concurrency: 4 })
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    [ "Resource 0-url0", "Resource 1-url0", "Resource 2-url0" ],
    [ "Resource 0-url1", "Resource 1-url1", "Resource 2-url1" ],
    [ "Resource 0-url2", "Resource 1-url2", "Resource 2-url2" ]
  ]
}
*/

Stateful Mapping

The Stream.mapAccum operation is similar to map, but it transforms elements statefully and allows you to map and accumulate in a single operation. Let's see how you can use it to calculate the running total of an input stream:

ts
import { Stream, Effect } from "effect"
 
const runningTotal = (stream: Stream.Stream<number>): Stream.Stream<number> =>
  stream.pipe(Stream.mapAccum(0, (s, a) => [s + a, s + a]))

// input: 0, 1, 2, 3, 4, 5
Effect.runPromise(Stream.runCollect(runningTotal(Stream.range(0, 6)))).then(
  console.log
)
/*
Output:
{
  _id: "Chunk",
  values: [ 0, 1, 3, 6, 10, 15 ]
}
*/

Mapping and Flattening

The Stream.mapConcat operation is akin to Stream.map, but it takes things a step further. It maps each element to an iterable of zero or more elements and then flattens the entire stream. Let's illustrate this with an example:

ts
import { Stream, Effect } from "effect"
 
const numbers = Stream.make("1-2-3", "4-5", "6").pipe(
  Stream.mapConcat((s) => s.split("-")),
  Stream.map((s) => parseInt(s))
)
 
Effect.runPromise(Stream.runCollect(numbers)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 2, 3, 4, 5, 6 ]
}
*/

In this example, we take a stream of strings like "1-2-3" and split them into individual numbers, resulting in a flattened stream of integers.

Mapping to a Constant Value

The Stream.as method allows you to map the success values of a stream to a specified constant value. This can be handy when you want to transform elements into a uniform value. Here's an example where we map all elements to the null value:

ts
import { Stream, Effect } from "effect"
 
const stream = Stream.range(1, 5).pipe(Stream.as(null))
 
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ null, null, null, null ]
}
*/

In this case, regardless of the original values in the stream, we've mapped them all to null.

Filtering

The Stream.filter operation is like a sieve that lets through elements that meet a specified condition. Think of it as a way to sift through a stream and keep only the elements that satisfy the given criteria. Here's an example:

ts
import { Stream, Effect } from "effect"
 
const stream = Stream.range(1, 11).pipe(Stream.filter((n) => n % 2 === 0))
 
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 2, 4, 6, 8, 10 ]
}
*/

In this example, we start with a stream of numbers from 1 to 10 and use Stream.filter to retain only the even numbers (those that satisfy the condition n % 2 === 0). The result is a filtered stream containing the even numbers from the original stream.

Scanning

In this section, we'll explore the concept of stream scanning. Scans are similar to folds, but they provide a historical perspective. Like folds, scans also involve a binary operator and an initial value. However, what makes scans unique is that they emit every intermediate result as part of the stream.

ts
import { Stream, Effect } from "effect"
 
const stream = Stream.range(1, 6).pipe(Stream.scan(0, (a, b) => a + b))
 
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 0, 1, 3, 6, 10, 15 ]
}
*/

In this example, we have a stream of numbers from 1 to 5, and we use Stream.scan to perform a cumulative addition starting from an initial value of 0. The result is a stream that emits the accumulated sum at each step: 0, 1, 3, 6, 10, and 15.

Stream scans provide a way to keep a historical record of your stream transformations, which can be invaluable for various applications.

Additionally, if you only need the final result of the scan, you can use Stream.runFold:

ts
import { Stream, Effect } from "effect"
 
const fold = Stream.range(1, 6).pipe(Stream.runFold(0, (a, b) => a + b))
 
Effect.runPromise(fold).then(console.log) // Output: 15

In this case, Stream.runFold gives you the final accumulated value, which is 15 in this example.

Draining

In this section, we'll explore the concept of stream draining. Imagine you have a stream filled with effectful operations, but you're not interested in the values they produce; instead, you want to execute these effects and discard the results. This is where the Stream.drain function comes into play.

Let's go through a few examples:

Example 1: Discarding Values

ts
import { Stream, Effect } from "effect"
 
// We create a stream and immediately drain it.
const s1 = Stream.range(1, 6).pipe(Stream.drain)
 
Effect.runPromise(Stream.runCollect(s1)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: []
}
*/

In this example, we have a stream with values from 1 to 5, but we use Stream.drain to discard these values. As a result, the output stream is empty.

Example 2: Executing Random Effects

ts
import { Stream, Effect, Random } from "effect"
 
const s2 = Stream.repeatEffect(
  Effect.gen(function* (_) {
    const nextInt = yield* _(Random.nextInt)
    const number = Math.abs(nextInt % 10)
    console.log(`random number: ${number}`)
    return number
  })
).pipe(Stream.take(3))
 
Effect.runPromise(Stream.runCollect(s2)).then(console.log)
/*
Output:
random number: 4
random number: 2
random number: 7
{
  _id: "Chunk",
  values: [ 4, 2, 7 ]
}
*/
 
const s3 = Stream.drain(s2)
 
Effect.runPromise(Stream.runCollect(s3)).then(console.log)
/*
Output:
random number: 1
random number: 6
random number: 0
{
  _id: "Chunk",
  values: []
}
*/

In this example, we create a stream with random effects and collect the values of these effects initially. Later, we use Stream.drain to execute the same effects without collecting the values. This demonstrates how you can use draining to trigger effectful operations when you're not interested in the emitted values.

Stream draining can be particularly useful when you need to perform certain actions or cleanup tasks in your application without affecting the main stream of data.
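
A common pattern here (using Stream.merge, covered later on this page) is to merge a drained stream of background effects into a main stream: the effects still run, but only the main elements are emitted. A minimal sketch:

ts
import { Stream, Console, Effect } from "effect"

// A background stream whose only purpose is its logging effect
const logging = Stream.make("task A", "task B").pipe(
  Stream.tap((task) => Console.log(`running ${task}`)),
  Stream.drain
)

// Merging the drained stream leaves the main elements untouched
const main = Stream.merge(Stream.make(1, 2, 3), logging)

Effect.runPromise(Stream.runCollect(main)).then(console.log)
// Logs "running task A" and "running task B"; collects [ 1, 2, 3 ]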

Detecting Changes in a Stream

In this section, we'll explore the Stream.changes operation, which allows you to detect and emit elements that are different from their preceding elements within the stream.

ts
import { Stream, Effect } from "effect"
 
const stream = Stream.make(1, 1, 1, 2, 2, 3, 4).pipe(Stream.changes)
 
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 2, 3, 4 ]
}
*/

Zipping

Zipping is the process of combining two or more streams to create a new stream by pairing elements from the input streams. We can achieve this using the Stream.zip and Stream.zipWith operators. Let's dive into some examples:

ts
import { Stream, Effect } from "effect"
 
// We create two streams and zip them together.
const s1 = Stream.zip(
  Stream.make(1, 2, 3, 4, 5, 6),
  Stream.make("a", "b", "c")
)

Effect.runPromise(Stream.runCollect(s1)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    [ 1, "a" ], [ 2, "b" ], [ 3, "c" ]
  ]
}
*/

// We create two streams and zip them with custom logic.
const s2 = Stream.zipWith(
  Stream.make(1, 2, 3, 4, 5, 6),
  Stream.make("a", "b", "c"),
  (n, s) => [n - s.length, s]
)

Effect.runPromise(Stream.runCollect(s2)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    [ 0, "a" ], [ 1, "b" ], [ 2, "c" ]
  ]
}
*/

The new stream will end when one of the streams ends.

Handling Stream Endings

When one of the input streams ends before the other, you might need to zip with default values. The Stream.zipAll and Stream.zipAllWith operations allow you to specify default values for both sides to handle such scenarios. Let's see an example:

ts
import { Stream, Effect } from "effect"
 
const s1 = Stream.zipAll(Stream.make(1, 2, 3, 4, 5, 6), {
  other: Stream.make("a", "b", "c"),
  defaultSelf: 0,
  defaultOther: "x"
})

Effect.runPromise(Stream.runCollect(s1)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    [ 1, "a" ], [ 2, "b" ], [ 3, "c" ], [ 4, "x" ], [ 5, "x" ], [ 6, "x" ]
  ]
}
*/

const s2 = Stream.zipAllWith(Stream.make(1, 2, 3, 4, 5, 6), {
  other: Stream.make("a", "b", "c"),
  onSelf: (n) => [n, "x"],
  onOther: (s) => [0, s],
  onBoth: (n, s) => [n - s.length, s]
})

Effect.runPromise(Stream.runCollect(s2)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    [ 0, "a" ], [ 1, "b" ], [ 2, "c" ], [ 4, "x" ], [ 5, "x" ], [ 6, "x" ]
  ]
}
*/

This allows you to handle zipping when one stream completes earlier than the other.

Zipping Streams at Different Rates

Sometimes, you might have two streams producing elements at different speeds. If you don't want to wait for the slower one when zipping elements, you can use Stream.zipLatest or Stream.zipLatestWith. These operations combine elements in a way that when a value is emitted by either of the two streams, it is combined with the latest value from the other stream to produce a result. Here's an example:

ts
import { Stream, Schedule, Effect } from "effect"
 
const s1 = Stream.make(1, 2, 3).pipe(
  Stream.schedule(Schedule.spaced("1 seconds"))
)

const s2 = Stream.make("a", "b", "c", "d").pipe(
  Stream.schedule(Schedule.spaced("500 millis"))
)

const stream = Stream.zipLatest(s1, s2)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    [ 1, "a" ], [ 1, "b" ], [ 2, "b" ], [ 2, "c" ], [ 2, "d" ], [ 3, "d" ]
  ]
}
*/

Here, Stream.zipLatest combines elements from both streams without waiting for the slower one, resulting in a more responsive output.

Pairing with Previous and Next Elements

  • zipWithPrevious: pairs each element of a stream with its previous element.

  • zipWithNext: pairs each element of a stream with its next element.

  • zipWithPreviousAndNext: pairs each element with both its previous and next elements.

Here's an example illustrating these operations:

ts
import { Stream } from "effect"
 
const stream = Stream.make(1, 2, 3, 4)
 
const s1 = Stream.zipWithPrevious(stream)
 
const s2 = Stream.zipWithNext(stream)
 
const s3 = Stream.zipWithPreviousAndNext(stream)
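
For instance, collecting s1 pairs each element with an Option of its previous element. Here's a minimal sketch of running it (the exact printed form of the Option values may vary):

ts
import { Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4)

// Each element is paired with an Option of its predecessor
const s1 = Stream.zipWithPrevious(stream)

Effect.runPromise(Stream.runCollect(s1)).then(console.log)
// Expected pairs: [none, 1], [some(1), 2], [some(2), 3], [some(3), 4]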

Indexing Stream Elements

Another handy operator is Stream.zipWithIndex, which indexes each element of a stream by pairing it with its respective index. This is especially useful when you need to keep track of the position of elements within the stream.

Here's an example of indexing elements in a stream:

ts
import { Stream, Effect } from "effect"
 
const stream = Stream.make("Mary", "James", "Robert", "Patricia")
 
const indexedStream = Stream.zipWithIndex(stream)
 
Effect.runPromise(Stream.runCollect(indexedStream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    [ "Mary", 0 ], [ "James", 1 ], [ "Robert", 2 ], [ "Patricia", 3 ]
  ]
}
*/

Cartesian Product of Streams

The Stream module introduces a powerful feature: the ability to compute the Cartesian product of two streams. This operation allows you to generate combinations of elements from two separate streams. Let's explore this concept further:

Imagine you have two sets of items, and you want to generate all possible pairs by taking one item from each set. This process is known as finding the Cartesian product of the sets. In the context of streams, it means creating combinations of elements from two streams.

To achieve this, the Stream module provides the Stream.cross operator, along with its variants. These operators take two streams and generate a new stream containing all possible combinations of elements from the original streams.

Here's a practical example:

ts
import { Stream, Effect } from "effect"
 
const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make("a", "b")
 
const product = Stream.cross(s1, s2)
 
Effect.runPromise(Stream.runCollect(product)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    [ 1, "a" ], [ 1, "b" ], [ 2, "a" ], [ 2, "b" ], [ 3, "a" ], [ 3, "b" ]
  ]
}
*/

It's important to note that the right-hand side stream (s2 in this case) will be iterated multiple times, once for each element in the left-hand side stream (s1). This means that if the right-hand side stream involves expensive or side-effectful operations, they will be executed multiple times.
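
A small sketch makes this visible: tapping the right-hand stream shows its elements being pulled once per element of the left-hand stream (the exact interleaving of the log lines may vary):

ts
import { Stream, Console, Effect } from "effect"

const s1 = Stream.make(1, 2)
const s2 = Stream.make("a", "b").pipe(
  Stream.tap((s) => Console.log(`pulling ${s}`))
)

Effect.runPromise(Stream.runCollect(Stream.cross(s1, s2))).then(console.log)
// "pulling a" and "pulling b" each appear twice, once per element of s1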

Partitioning

Partitioning a stream means dividing it into two separate streams based on a specified condition. The Stream module provides two helpful functions for achieving this: Stream.partition and Stream.partitionEither. Let's explore how these functions work and when to use them.

partition

The Stream.partition function takes a predicate as input and splits the original stream into two substreams: one containing elements that satisfy the predicate (evaluate to true), and the other containing elements that do not (evaluate to false). Additionally, the two substreams are returned within an effect that requires a Scope.

Here's an example where we partition a stream of numbers into even and odd numbers:

ts
import { Stream, Effect } from "effect"
 
const partition = Stream.range(1, 10).pipe(
  Stream.partition((n) => n % 2 === 0, { bufferSize: 5 })
)

Effect.runPromise(
  Effect.scoped(
    Effect.gen(function* (_) {
      const [evens, odds] = yield* _(partition)
      console.log(yield* _(Stream.runCollect(evens)))
      console.log(yield* _(Stream.runCollect(odds)))
    })
  )
)
/*
Output:
{
  _id: "Chunk",
  values: [ 2, 4, 6, 8 ]
}
{
  _id: "Chunk",
  values: [ 1, 3, 5, 7, 9 ]
}
*/

In this example, we use the Stream.partition function with a predicate to split the stream into even and odd numbers. The bufferSize option controls how much the faster stream can advance beyond the slower one.

partitionEither

Sometimes, you may need to partition a stream using an effectful predicate. For such cases, the Stream.partitionEither function is available. This function accepts an effectful predicate and divides the stream into two substreams based on the result of the predicate: elements that yield Either.left values go to one substream, while elements yielding Either.right values go to the other.

Here's an example where we use Stream.partitionEither to partition a stream of numbers based on an effectful condition:

ts
import { Stream, Effect, Either } from "effect"
 
const partition = Stream.range(1, 10).pipe(
  Stream.partitionEither(
    (n) => Effect.succeed(n < 5 ? Either.left(n * 2) : Either.right(n)),
    { bufferSize: 5 }
  )
)

Effect.runPromise(
  Effect.scoped(
    Effect.gen(function* (_) {
      const [lefts, rights] = yield* _(partition)
      console.log(yield* _(Stream.runCollect(lefts)))
      console.log(yield* _(Stream.runCollect(rights)))
    })
  )
)
/*
Output:
{
  _id: "Chunk",
  values: [ 2, 4, 6, 8 ]
}
{
  _id: "Chunk",
  values: [ 5, 6, 7, 8, 9 ]
}
*/

In this case, the Stream.partitionEither function splits the stream into two substreams: one containing values that are less than 5 (doubled using Either.left), and the other containing values greater than or equal to 5 (using Either.right).

GroupBy

When working with streams of data, you may often need to group elements based on certain criteria. The Stream module provides two functions for achieving this: groupByKey and groupBy. Let's explore how these functions work and when to use them.

groupByKey

The Stream.groupByKey function allows you to partition a stream by a simple function of type (a: A) => K, where A represents the type of elements in your stream, and K represents the keys by which the stream should be partitioned. This function is not effectful; it simply groups elements by applying the provided function.

The Stream.groupByKey function returns a new data type called GroupBy. This GroupBy type represents a grouped stream. To work with the groups, you can use the GroupBy.evaluate function, which takes a function of type (key: K, stream: Stream<V, E>) => Stream<...>. This function runs across all groups and merges them in a non-deterministic fashion.

In the example below, we use groupByKey to group exam results by the tens place of their scores and count the number of results in each group:

ts
import { Stream, GroupBy, Effect, Chunk } from "effect"
 
class Exam {
  constructor(
    readonly person: string,
    readonly score: number
  ) {}
}

const examResults = [
  new Exam("Alex", 64),
  new Exam("Michael", 97),
  new Exam("Bill", 77),
  new Exam("John", 78),
  new Exam("Bobby", 71)
]

const groupByKeyResult = Stream.fromIterable(examResults).pipe(
  Stream.groupByKey((exam) => Math.floor(exam.score / 10) * 10)
)

const stream = GroupBy.evaluate(groupByKeyResult, (key, stream) =>
  Stream.fromEffect(
    Stream.runCollect(stream).pipe(
      Effect.map((chunk) => [key, Chunk.size(chunk)] as const)
    )
  )
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    [ 60, 1 ], [ 90, 1 ], [ 70, 3 ]
  ]
}
*/

In this example, we partition the exam results into groups based on the tens place of their scores (e.g., 60, 90, 70). The groupByKey function is ideal for simple, non-effectful partitioning.

groupBy

In more complex scenarios where partitioning involves effects, you can turn to the Stream.groupBy function. This function takes an effectful partitioning function and generates a GroupBy data type, representing a grouped stream. You can then use GroupBy.evaluate in a similar fashion as before to process the groups.

In the following example, we group names by their first character and count the number of names in each group. Note that the partitioning operation itself is simulated as effectful:

ts
import { Stream, GroupBy, Effect, Chunk } from "effect"
 
const groupByKeyResult = Stream.fromIterable([
  "Mary",
  "James",
  "Robert",
  "Patricia",
  "John",
  "Jennifer",
  "Rebecca",
  "Peter"
]).pipe(
  Stream.groupBy((name) => Effect.succeed([name.substring(0, 1), name]))
)

const stream = GroupBy.evaluate(groupByKeyResult, (key, stream) =>
  Stream.fromEffect(
    Stream.runCollect(stream).pipe(
      Effect.map((chunk) => [key, Chunk.size(chunk)] as const)
    )
  )
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    [ "M", 1 ], [ "J", 3 ], [ "R", 2 ], [ "P", 2 ]
  ]
}
*/

Grouping

When working with streams, you may encounter situations where you need to group elements in a more structured manner. The Stream module provides two helpful functions for achieving this: grouped and groupedWithin. In this section, we'll explore how these functions work and when to use them.

grouped

The Stream.grouped function is perfect for partitioning stream results into chunks of a specified size. It's especially useful when you want to work with data in smaller, more manageable pieces.

Here's an example that demonstrates the use of Stream.grouped:

ts
import { Stream, Effect } from "effect"
 
const stream = Stream.range(0, 8).pipe(Stream.grouped(3))
 
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    {
      _id: "Chunk",
      values: [ 0, 1, 2 ]
    }, {
      _id: "Chunk",
      values: [ 3, 4, 5 ]
    }, {
      _id: "Chunk",
      values: [ 6, 7 ]
    }
  ]
}
*/

In this example, we take a stream of numbers from 0 to 7 and use Stream.grouped(3) to divide it into chunks of size 3.

groupedWithin

The Stream.groupedWithin function provides more flexibility by allowing you to group events based on time intervals or chunk size, whichever condition is satisfied first. This is particularly useful when you want to group data based on time constraints.

ts
import { Stream, Schedule, Effect } from "effect"
 
const stream = Stream.range(0, 10).pipe(
  Stream.repeat(Schedule.spaced("1 seconds")),
  Stream.groupedWithin(18, "1.5 seconds"),
  Stream.take(3)
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    {
      _id: "Chunk",
      values: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7 ]
    }, {
      _id: "Chunk",
      values: [ 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 ]
    }, {
      _id: "Chunk",
      values: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7 ]
    }
  ]
}
*/

In this example, we use Stream.groupedWithin(18, "1.5 seconds") to create chunks of data. The grouping operation occurs either when 18 elements are reached or when 1.5 seconds have passed since the last chunk was created. This is particularly useful when dealing with time-sensitive data or when you want to control the chunk size dynamically.

Concatenation

In stream processing, there are scenarios where you may want to combine the contents of multiple streams. The Stream module provides several operators for achieving this, including Stream.concat, Stream.concatAll, and Stream.flatMap. Let's explore these operators and understand how to use them effectively.

Simple Concatenation

The Stream.concat operator is a straightforward way to concatenate two streams. It returns a new stream that emits elements from the left-hand stream followed by elements from the right-hand stream. This is useful when you want to combine two streams in a sequential manner.

Here's an example of using Stream.concat:

ts
import { Stream, Effect } from "effect"
 
const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make(4, 5)
 
const stream = Stream.concat(s1, s2)
 
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 2, 3, 4, 5 ]
}
*/

Concatenating Multiple Streams

Sometimes you may have multiple streams that you want to concatenate together. Instead of manually chaining Stream.concat operations, you can use Stream.concatAll to concatenate a Chunk of streams.

Here's an example:

ts
import { Stream, Effect, Chunk } from "effect"
 
const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make(4, 5)
const s3 = Stream.make(6, 7, 8)
 
const stream = Stream.concatAll(Chunk.make(s1, s2, s3))
 
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 2, 3, 4, 5, 6, 7, 8 ]
}
*/

Advanced Concatenation with flatMap

The Stream.flatMap operator allows you to create a stream whose elements are generated by applying a function of type (a: A) => Stream<...> to each output of the source stream. It concatenates all of the results.

Here's an example of using Stream.flatMap:

ts
import { Stream, Effect } from "effect"
 
const stream = Stream.make(1, 2, 3).pipe(
  Stream.flatMap((a) => Stream.repeatValue(a).pipe(Stream.take(4)))
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3 ]
}
*/

If we need to run the inner streams concurrently, we can use the concurrency option, and if the order of concatenation is not important for us, we can use the switch option, as sketched below.
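
Here's a minimal sketch of passing these options (the option names follow the text above; verify them against your Effect version):

ts
import { Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  // Run up to two inner streams at the same time; with `switch: true`,
  // a new outer element would interrupt the previous inner stream instead
  Stream.flatMap((a) => Stream.repeatValue(a).pipe(Stream.take(4)), {
    concurrency: 2
  })
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Same twelve values as before, but with concurrency their
// interleaving across inner streams is no longer guaranteed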

Merging

Sometimes we need to interleave the emission of two streams and create another stream. In these cases, we can't use the Stream.concat operation, because concat waits for the first stream to finish before consuming the second. So we need a way of picking elements from different sources. Effect Stream's merge operations do this for us. Let's discuss some variants of this operation:

merge

The Stream.merge operation allows us to pick elements from different source streams and merge them into a single stream. Unlike Stream.concat, which waits for the first stream to finish before moving to the second, Stream.merge interleaves elements from both streams as they become available.

Here's an example:

ts
import { Schedule, Stream, Effect } from "effect"
 
const s1 = Stream.make(1, 2, 3).pipe(
  Stream.schedule(Schedule.spaced("100 millis"))
)
const s2 = Stream.make(4, 5, 6).pipe(
  Stream.schedule(Schedule.spaced("200 millis"))
)

const stream = Stream.merge(s1, s2)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 4, 2, 3, 5, 6 ]
}
*/

Termination Strategy

When merging two streams, we should consider their termination strategy. Each stream has its own lifetime, some may finish quickly, while others may continue indefinitely. By default, when using Stream.merge, the resulting stream terminates only when both specified streams terminate.

However, you can define the termination strategy to align with your requirements. Stream offers four different termination strategies using the haltStrategy option:

  • "left". The resulting stream will terminate when the left-hand side stream terminates.
  • "right". The resulting stream will terminate when the right-hand side stream finishes.
  • "both". The resulting stream will terminate when both streams finish.
  • "either". The resulting stream will terminate when one of the streams finishes.

Here's an example of specifying a termination strategy:

ts
import { Stream, Schedule, Effect } from "effect"
 
const s1 = Stream.range(1, 6).pipe(
  Stream.schedule(Schedule.spaced("100 millis"))
)
const s2 = Stream.repeatValue(0).pipe(
  Stream.schedule(Schedule.spaced("200 millis"))
)

const stream = Stream.merge(s1, s2, { haltStrategy: "left" })

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 0, 2, 3, 0, 4, 5 ]
}
*/

In this example, we use haltStrategy: "left" to make the resulting stream terminate when the left-hand stream (s1) finishes.

mergeWith

In some cases, we not only want to merge two streams but also transform and unify their elements into new types. This is where Stream.mergeWith comes into play. It allows us to specify transformation functions for both source streams.

Here's an example:

ts
import { Schedule, Stream, Effect } from "effect"
 
const s1 = Stream.make("1", "2", "3").pipe(
Stream.schedule(Schedule.spaced("100 millis"))
)
const s2 = Stream.make(4.1, 5.3, 6.2).pipe(
Stream.schedule(Schedule.spaced("200 millis"))
)
 
const stream = Stream.mergeWith(s1, s2, {
onSelf: (s) => parseInt(s),
onOther: (n) => Math.floor(n)
})
 
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
_id: "Chunk",
values: [ 1, 4, 2, 3, 5, 6 ]
}
*/

In this example, we use Stream.mergeWith to merge s1 and s2 while converting string elements from s1 to integers and flooring the decimal elements from s2 with Math.floor.

Interleaving

The Stream.interleave operator allows us to pull one element at a time from each of two streams, creating a new interleaved stream. Once one of the streams is exhausted, the remaining values from the other stream are pulled.

Here's an example:

ts
import { Stream, Effect } from "effect"
 
const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make(4, 5, 6)
 
const stream = Stream.interleave(s1, s2)
 
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 4, 2, 5, 3, 6 ]
}
*/

For more advanced interleaving logic, Stream.interleaveWith provides additional flexibility. It allows you to specify the interleaving logic using a third stream of boolean values. When the boolean stream emits true, it chooses elements from the left-hand stream; otherwise, it selects elements from the right-hand stream.

Here's an example:

ts
import { Stream, Effect } from "effect"
 
const s1 = Stream.make(1, 3, 5, 7, 9)
const s2 = Stream.make(2, 4, 6, 8, 10)
 
const booleanStream = Stream.make(true, false, false).pipe(Stream.forever)
 
const stream = Stream.interleaveWith(s1, s2, booleanStream)
 
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 2, 4, 3, 6, 8, 5, 10, 7, 9 ]
}
*/

In this example, booleanStream decides which source stream to choose for interleaving. When true, it picks elements from s1, and when false, it selects elements from s2.

Interspersing

Interspersing is a technique that allows you to add separators in a stream. This can be especially useful when you want to format or structure the data in your streams.

intersperse

The Stream.intersperse operator lets you intersperse a delimiter element between the elements of a stream. This delimiter can be any value you choose. It's added between each pair of elements in the original stream.

Here's an example:

ts
import { Stream, Effect } from "effect"
 
const stream = Stream.make(1, 2, 3, 4, 5).pipe(Stream.intersperse(0))
 
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 1, 0, 2, 0, 3, 0, 4, 0, 5 ]
}
*/

In this example, we have a stream with numbers from 1 to 5, and we use Stream.intersperse(0) to add zeros between them.

intersperseAffixes

For more advanced interspersing needs, Stream.intersperseAffixes provides greater control. It allows you to specify different affixes for the start, middle, and end of your stream. These affixes can be strings or any other values you want.

Here's an example:

ts
import { Stream, Effect } from "effect"
 
const stream = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.intersperseAffixes({
    start: "[",
    middle: "-",
    end: "]"
  })
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ "[", 1, "-", 2, "-", 3, "-", 4, "-", 5, "]" ]
}
*/

In this example, we use Stream.intersperseAffixes to enclose the numbers from 1 to 5 within square brackets, separating them with hyphens.

Broadcasting

Broadcasting a stream is a way to create multiple streams that contain the same elements as the source stream. This operation allows you to send each element to multiple downstream streams simultaneously. However, the upstream stream can emit events only up to a certain limit, which is determined by the maximumLag parameter. Once this limit is reached, the upstream stream slows down to match the speed of the slowest downstream stream.

Let's take a closer look at how broadcasting works in the following example. Here, we are broadcasting a stream of numbers to two downstream streams. One of them calculates the maximum number in the stream, while the other performs some logging with an additional delay. The upstream stream adjusts its speed based on the slower logging stream:

ts
import { Effect, Stream, Console, Schedule, Fiber } from "effect"
 
const numbers = Effect.scoped(
  Stream.range(1, 21).pipe(
    Stream.tap((n) => Console.log(`Emit ${n} element before broadcasting`)),
    Stream.broadcast(2, 5),
    Stream.flatMap(([first, second]) =>
      Effect.gen(function* (_) {
        const fiber1 = yield* _(
          Stream.runFold(first, 0, (acc, e) => Math.max(acc, e)).pipe(
            Effect.flatMap((max) => Console.log(`Maximum: ${max}`)),
            Effect.fork
          )
        )
        const fiber2 = yield* _(
          second.pipe(
            Stream.schedule(Schedule.spaced("1 seconds")),
            Stream.runForEach((n) =>
              Console.log(`Logging to the Console: ${n}`)
            ),
            Effect.fork
          )
        )
        yield* _(
          Fiber.join(fiber1).pipe(
            Effect.zip(Fiber.join(fiber2), { concurrent: true })
          )
        )
      })
    ),
    Stream.runCollect
  )
)
 
Effect.runPromise(numbers).then(console.log)
/*
Output:
Emit 1 element before broadcasting
Emit 2 element before broadcasting
Emit 3 element before broadcasting
Emit 4 element before broadcasting
Emit 5 element before broadcasting
Emit 6 element before broadcasting
Emit 7 element before broadcasting
Emit 8 element before broadcasting
Emit 9 element before broadcasting
Emit 10 element before broadcasting
Emit 11 element before broadcasting
Logging to the Console: 1
Logging to the Console: 2
Logging to the Console: 3
Logging to the Console: 4
Logging to the Console: 5
Emit 12 element before broadcasting
Emit 13 element before broadcasting
Emit 14 element before broadcasting
Emit 15 element before broadcasting
Emit 16 element before broadcasting
Logging to the Console: 6
Logging to the Console: 7
Logging to the Console: 8
Logging to the Console: 9
Logging to the Console: 10
Emit 17 element before broadcasting
Emit 18 element before broadcasting
Emit 19 element before broadcasting
Emit 20 element before broadcasting
Logging to the Console: 11
Logging to the Console: 12
Logging to the Console: 13
Logging to the Console: 14
Logging to the Console: 15
Maximum: 20
Logging to the Console: 16
Logging to the Console: 17
Logging to the Console: 18
Logging to the Console: 19
Logging to the Console: 20
{
  _id: "Chunk",
  values: [ undefined ]
}
*/

Buffering

Effect streams operate in a pull-based manner, which means downstream consumers can request elements at their own pace without needing to signal the upstream to slow down. However, there are scenarios where you might need to handle producers and consumers independently, especially when there's a speed mismatch between them. This is where buffering comes into play, allowing you to manage communication between a faster producer and a slower consumer effectively. Effect streams provide a built-in Stream.buffer operator to assist with this.

buffer

The Stream.buffer operator is designed to facilitate scenarios where a faster producer needs to work independently of a slower consumer. It achieves this by buffering elements in a queue, allowing the producer to continue working even if the consumer lags behind. You can specify the maximum buffer capacity using the capacity option.

Let's walk through an example to see how it works:

ts
import { Stream, Console, Schedule, Effect } from "effect"
 
const stream = Stream.range(1, 11).pipe(
  Stream.tap((n) => Console.log(`before buffering: ${n}`)),
  Stream.buffer({ capacity: 4 }),
  Stream.tap((n) => Console.log(`after buffering: ${n}`)),
  Stream.schedule(Schedule.spaced("5 seconds"))
)
 
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
before buffering: 1
before buffering: 2
before buffering: 3
before buffering: 4
before buffering: 5
before buffering: 6
after buffering: 1
after buffering: 2
before buffering: 7
after buffering: 3
before buffering: 8
after buffering: 4
before buffering: 9
after buffering: 5
before buffering: 10
...
*/

In this example, we create a stream of numbers from 1 to 10. We use Stream.buffer({ capacity: 4 }) to buffer up to 4 elements at a time. As you can see, the Stream.tap operator allows us to log each element before and after buffering. We've also introduced a 5-second delay between each emission to illustrate the lag between producing and consuming messages.

You can choose from different buffering options based on the type of underlying queue you need (a sliding-queue sketch follows this list):

  • Bounded Queue: { capacity: number }
  • Unbounded Queue: { capacity: "unbounded" }
  • Sliding Queue: { capacity: number, strategy: "sliding" }
  • Dropping Queue: { capacity: number, strategy: "dropping" }
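
As a sketch of the sliding strategy (whether anything is actually dropped depends on how fast the consumer pulls): when the queue is full, the oldest buffered elements are discarded so the producer never suspends:

ts
import { Stream, Effect } from "effect"

const stream = Stream.range(1, 11).pipe(
  // Keep at most 4 elements buffered; drop the oldest when full
  Stream.buffer({ capacity: 4, strategy: "sliding" })
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// With a fast consumer all ten values arrive; a slow consumer
// may find earlier values dropped from the buffer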

Debouncing

The Stream.debounce function controls the rate at which elements are emitted: an element is emitted only after the specified duration has elapsed without another element arriving. This keeps rapid or frequent emissions at a more controlled pace.

ts
import { Stream, Effect } from "effect"
 
const stream = Stream.make(1, 2, 3).pipe(
  Stream.concat(Stream.fromEffect(Effect.sleep("500 millis"))),
  Stream.concat(Stream.make(4, 5)),
  Stream.concat(Stream.fromEffect(Effect.sleep("10 millis"))),
  Stream.concat(Stream.make(6)),
  Stream.debounce("100 millis") // Emit only after a pause of at least 100 ms
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [ 3, 6 ]
}
*/

In this example, we have a stream that emits elements at varying intervals. Some elements are emitted rapidly, while others are separated by pauses of different durations. We apply debouncing with a minimum pause requirement of 100 milliseconds using Stream.debounce("100 millis").

The result is that only elements followed by a pause of at least 100 milliseconds are emitted. Elements 1, 2, 4, and 5 are effectively skipped because another element arrives too soon after them. Only elements 3 and 6 are emitted: element 3 is followed by a 500-millisecond pause, and element 6 is the last element before the stream ends.