Consuming Streams
On this page
When working with streams, it's essential to understand how to consume the data they produce. In this guide, we'll walk through several common methods for consuming streams.
runCollect
Using To gather all the elements from a stream into a single Chunk
, you can use the Stream.runCollect
function.
ts
import {Stream ,Effect } from "effect"conststream =Stream .make (1, 2, 3, 4, 5)constcollectedData =Stream .runCollect (stream )Effect .runPromise (collectedData ).then (console .log )/*Output:{_id: "Chunk",values: [ 1, 2, 3, 4, 5 ]}*/
ts
import {Stream ,Effect } from "effect"conststream =Stream .make (1, 2, 3, 4, 5)constcollectedData =Stream .runCollect (stream )Effect .runPromise (collectedData ).then (console .log )/*Output:{_id: "Chunk",values: [ 1, 2, 3, 4, 5 ]}*/
runForEach
Using Another way to consume elements of a stream is by using Stream.runForEach
. It takes a callback function that receives each element of the stream. Here's an example:
ts
import {Stream ,Effect ,Console } from "effect"consteffect =Stream .make (1, 2, 3).pipe (Stream .runForEach ((n ) =>Console .log (n )))Effect .runPromise (effect ).then (console .log )/*Output:123undefined*/
ts
import {Stream ,Effect ,Console } from "effect"consteffect =Stream .make (1, 2, 3).pipe (Stream .runForEach ((n ) =>Console .log (n )))Effect .runPromise (effect ).then (console .log )/*Output:123undefined*/
In this example, we use Stream.runForEach
to log each element to the console.
Using a Fold Operation
The Stream.fold
function is another way to consume a stream by performing a fold operation over the stream of values and returning an effect containing the result. Here are a couple of examples:
ts
import {Stream ,Effect } from "effect"conste1 =Stream .make (1, 2, 3, 4, 5).pipe (Stream .runFold (0, (a ,b ) =>a +b ))Effect .runPromise (e1 ).then (console .log ) // Output: 15conste2 =Stream .make (1, 2, 3, 4, 5).pipe (Stream .runFoldWhile (0,(n ) =>n <= 3,(a ,b ) =>a +b ))Effect .runPromise (e2 ).then (console .log ) // Output: 6
ts
import {Stream ,Effect } from "effect"conste1 =Stream .make (1, 2, 3, 4, 5).pipe (Stream .runFold (0, (a ,b ) =>a +b ))Effect .runPromise (e1 ).then (console .log ) // Output: 15conste2 =Stream .make (1, 2, 3, 4, 5).pipe (Stream .runFoldWhile (0,(n ) =>n <= 3,(a ,b ) =>a +b ))Effect .runPromise (e2 ).then (console .log ) // Output: 6
In the first example (e1
), we use Stream.runFold
to calculate the sum of all elements. In the second example (e2
), we use Stream.runFoldWhile
to calculate the sum but only until a certain condition is met.
Using a Sink
To consume a stream using a Sink, you can pass the Sink
to the Stream.run
function. Here's an example:
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3).pipe (Stream .run (Sink .sum ))Effect .runPromise (effect ).then (console .log ) // Output: 6
ts
import {Stream ,Sink ,Effect } from "effect"consteffect =Stream .make (1, 2, 3).pipe (Stream .run (Sink .sum ))Effect .runPromise (effect ).then (console .log ) // Output: 6
In this example, we use a Sink
to calculate the sum of the elements in the stream.