Consuming Streams

On this page

When working with streams, it's essential to understand how to consume the data they produce. In this guide, we'll walk through several common methods for consuming streams.

Using runCollect

To gather all the elements from a stream into a single Chunk, you can use the Stream.runCollect function.

ts
import { Stream, Effect } from "effect"
 
const stream = Stream.make(1, 2, 3, 4, 5)
 
const collectedData = Stream.runCollect(stream)
 
Effect.runPromise(collectedData).then(console.log)
/*
Output:
{
_id: "Chunk",
values: [ 1, 2, 3, 4, 5 ]
}
*/
ts
import { Stream, Effect } from "effect"
 
const stream = Stream.make(1, 2, 3, 4, 5)
 
const collectedData = Stream.runCollect(stream)
 
Effect.runPromise(collectedData).then(console.log)
/*
Output:
{
_id: "Chunk",
values: [ 1, 2, 3, 4, 5 ]
}
*/

Using runForEach

Another way to consume elements of a stream is by using Stream.runForEach. It takes a callback function that receives each element of the stream. Here's an example:

ts
import { Stream, Effect, Console } from "effect"
 
const effect = Stream.make(1, 2, 3).pipe(
Stream.runForEach((n) => Console.log(n))
)
 
Effect.runPromise(effect).then(console.log)
/*
Output:
1
2
3
undefined
*/
ts
import { Stream, Effect, Console } from "effect"
 
const effect = Stream.make(1, 2, 3).pipe(
Stream.runForEach((n) => Console.log(n))
)
 
Effect.runPromise(effect).then(console.log)
/*
Output:
1
2
3
undefined
*/

In this example, we use Stream.runForEach to log each element to the console.

Using a Fold Operation

The Stream.fold function is another way to consume a stream by performing a fold operation over the stream of values and returning an effect containing the result. Here are a couple of examples:

ts
import { Stream, Effect } from "effect"
 
const e1 = Stream.make(1, 2, 3, 4, 5).pipe(Stream.runFold(0, (a, b) => a + b))
 
Effect.runPromise(e1).then(console.log) // Output: 15
 
const e2 = Stream.make(1, 2, 3, 4, 5).pipe(
Stream.runFoldWhile(
0,
(n) => n <= 3,
(a, b) => a + b
)
)
 
Effect.runPromise(e2).then(console.log) // Output: 6
ts
import { Stream, Effect } from "effect"
 
const e1 = Stream.make(1, 2, 3, 4, 5).pipe(Stream.runFold(0, (a, b) => a + b))
 
Effect.runPromise(e1).then(console.log) // Output: 15
 
const e2 = Stream.make(1, 2, 3, 4, 5).pipe(
Stream.runFoldWhile(
0,
(n) => n <= 3,
(a, b) => a + b
)
)
 
Effect.runPromise(e2).then(console.log) // Output: 6

In the first example (e1), we use Stream.runFold to calculate the sum of all elements. In the second example (e2), we use Stream.runFoldWhile to calculate the sum but only until a certain condition is met.

Using a Sink

To consume a stream using a Sink, you can pass the Sink to the Stream.run function. Here's an example:

ts
import { Stream, Sink, Effect } from "effect"
 
const effect = Stream.make(1, 2, 3).pipe(Stream.run(Sink.sum))
 
Effect.runPromise(effect).then(console.log) // Output: 6
ts
import { Stream, Sink, Effect } from "effect"
 
const effect = Stream.make(1, 2, 3).pipe(Stream.run(Sink.sum))
 
Effect.runPromise(effect).then(console.log) // Output: 6

In this example, we use a Sink to calculate the sum of the elements in the stream.