Parallel Operators
In this section, we'll explore parallel operators that let you run multiple sinks concurrently. They are useful when several sinks need to process the same stream at the same time.
Parallel Zipping: Combining Results
When you want to run two sinks concurrently and combine their results, you can use Sink.zip with the concurrent option enabled. Both sinks then run concurrently, and their results are combined into a tuple:
ts
import { Sink, Console, Stream, Schedule, Effect } from "effect"

// Logs every element, then yields 1 as its result
const s1 = Sink.forEach((s: string) => Console.log(`sink 1: ${s}`)).pipe(
  Sink.as(1)
)

// Logs every element, then yields 2 as its result
const s2 = Sink.forEach((s: string) => Console.log(`sink 2: ${s}`)).pipe(
  Sink.as(2)
)

// Run both sinks concurrently and pair their results
const sink = s1.pipe(Sink.zip(s2, { concurrent: true }))

Effect.runPromise(
  Stream.make("1", "2", "3", "4", "5").pipe(
    Stream.schedule(Schedule.spaced("10 millis")),
    Stream.run(sink)
  )
).then(console.log)
/*
Output:
sink 1: 1
sink 2: 1
sink 1: 2
sink 2: 2
sink 1: 3
sink 2: 3
sink 1: 4
sink 2: 4
sink 1: 5
sink 2: 5
[ 1, 2 ]
*/
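To make the zipping behavior concrete, here is a minimal toy model in plain TypeScript. It is only a sketch and does not use or mirror the Effect implementation: ToySink, makeSum, makeCount, zipToy, and runToy are hypothetical names invented for this illustration. It shows the idea visible in the output above, where each sink sees every element and the two results end up in one tuple.

```typescript
// Toy model (hypothetical, not the Effect API): a sink folds inputs into a result.
interface ToySink<In, Out> {
  // Consume one element.
  push(value: In): void
  // Produce the final result once the input is exhausted.
  result(): Out
}

// A toy sink that sums every number it receives.
const makeSum = (): ToySink<number, number> => {
  let total = 0
  return {
    push: (n) => {
      total += n
    },
    result: () => total
  }
}

// A toy sink that counts how many numbers it receives.
const makeCount = (): ToySink<number, number> => {
  let count = 0
  return {
    push: () => {
      count += 1
    },
    result: () => count
  }
}

// Feed each element to both sinks, then pair their results in a tuple.
const zipToy = <In, A, B>(
  left: ToySink<In, A>,
  right: ToySink<In, B>
): ToySink<In, [A, B]> => ({
  push: (value) => {
    left.push(value)
    right.push(value)
  },
  result: () => [left.result(), right.result()]
})

// Drive a toy sink with a fixed list of inputs.
const runToy = <In, Out>(inputs: ReadonlyArray<In>, sink: ToySink<In, Out>): Out => {
  for (const value of inputs) sink.push(value)
  return sink.result()
}

const zipped = runToy([1, 2, 3, 4, 5], zipToy(makeSum(), makeCount()))
console.log(zipped) // [ 15, 5 ]
```

The toy version is synchronous, so it models only how elements and results flow, not the actual concurrency that Sink.zip provides.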
Racing: First One Wins
Another useful operation is Sink.race, which lets you race multiple sinks concurrently. The sink that completes first will provide the result for your program:
ts
import { Sink, Console, Stream, Schedule, Effect } from "effect"

// Logs every element, then yields 1 as its result
const s1 = Sink.forEach((s: string) => Console.log(`sink 1: ${s}`)).pipe(
  Sink.as(1)
)

// Logs every element, then yields 2 as its result
const s2 = Sink.forEach((s: string) => Console.log(`sink 2: ${s}`)).pipe(
  Sink.as(2)
)

// Race the two sinks; the first to complete provides the result
const sink = s1.pipe(Sink.race(s2))

Effect.runPromise(
  Stream.make("1", "2", "3", "4", "5").pipe(
    Stream.schedule(Schedule.spaced("10 millis")),
    Stream.run(sink)
  )
).then(console.log)
/*
Output:
sink 1: 1
sink 2: 1
sink 1: 2
sink 2: 2
sink 1: 3
sink 2: 3
sink 1: 4
sink 2: 4
sink 1: 5
sink 2: 5
1
*/
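In the example above both sinks finish at the same moment, so the "first one wins" rule is hard to see. The following toy model in plain TypeScript sketches the idea with one sink that finishes early. It is an illustration only and not how Effect implements Sink.race: ToySink, finishAfter, and raceToy are hypothetical names, and resolving ties in favor of the left sink is an arbitrary choice made for this sketch.

```typescript
// Toy model (hypothetical, not the Effect API).
interface ToySink<In, Out> {
  // Consume one element; return true once the sink needs no more input.
  push(value: In): boolean
  // The result this sink reports.
  result(): Out
}

// Hypothetical helper: finishes after seeing `limit` elements, then yields `label`.
const finishAfter = (limit: number, label: string): ToySink<string, string> => {
  let seen = 0
  return {
    push: () => {
      seen += 1
      return seen >= limit
    },
    result: () => label
  }
}

// Feed every element to both sinks; the first sink to finish supplies the result.
const raceToy = <In, Out>(
  inputs: ReadonlyArray<In>,
  left: ToySink<In, Out>,
  right: ToySink<In, Out>
): Out => {
  for (const value of inputs) {
    const leftDone = left.push(value)
    const rightDone = right.push(value)
    if (leftDone) return left.result() // ties go to the left sink (arbitrary choice)
    if (rightDone) return right.result()
  }
  // Input ended before either sink finished: fall back to the left sink.
  return left.result()
}

const winner = raceToy(
  ["1", "2", "3", "4", "5"],
  finishAfter(4, "slow"),
  finishAfter(2, "fast")
)
console.log(winner) // fast
```

Here the right sink finishes on the second element, so its result is returned and the remaining elements are never consumed.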