Leftovers
In this section, we'll explore how to deal with elements that may be left unconsumed by sinks. Sinks can consume varying numbers of elements from their upstream, and we'll learn how to collect or ignore any leftovers.
Collecting Leftovers
When a sink consumes elements from an upstream source, it may not use all of them. These unconsumed elements are referred to as "leftovers." To collect these leftovers, we can use Sink.collectLeftover. It returns a tuple containing the result of the previous sink operation and any leftover elements:
```ts
import { Stream, Sink, Effect } from "effect"

const s1 = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.run(Sink.take<number>(3).pipe(Sink.collectLeftover))
)

Effect.runPromise(s1).then(console.log)
/*
Output:
[
  { _id: "Chunk", values: [ 1, 2, 3 ] },
  { _id: "Chunk", values: [ 4, 5 ] }
]
*/

const s2 = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.run(Sink.head<number>().pipe(Sink.collectLeftover))
)

Effect.runPromise(s2).then(console.log)
/*
Output:
[
  { _id: "Option", _tag: "Some", value: 1 },
  { _id: "Chunk", values: [ 2, 3, 4, 5 ] }
]
*/
```
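Because the value produced by Sink.collectLeftover is an ordinary tuple, it can be destructured before further use. The following is a minimal sketch, assuming the same five-element stream as above; the names program, taken, and leftover are illustrative only, and Chunk.toReadonlyArray is used here just to print plain arrays:

```ts
import { Chunk, Effect, Sink, Stream } from "effect"

// Take the first three elements and keep whatever the sink did not consume.
const program = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.run(Sink.take<number>(3).pipe(Sink.collectLeftover)),
  // Destructure the [result, leftover] tuple and convert both Chunks to arrays.
  Effect.map(([taken, leftover]) => ({
    taken: Chunk.toReadonlyArray(taken),
    leftover: Chunk.toReadonlyArray(leftover)
  }))
)

Effect.runPromise(program).then(console.log)
// { taken: [ 1, 2, 3 ], leftover: [ 4, 5 ] }
```

Keeping the two parts under separate names makes it easier to pass the leftover elements on to a later processing step.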
Ignoring Leftovers
When leftover elements are not needed, they can be ignored using Sink.ignoreLeftover:
```ts
import { Stream, Sink, Effect } from "effect"

const s1 = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.run(
    Sink.take<number>(3).pipe(Sink.ignoreLeftover).pipe(Sink.collectLeftover)
  )
)

Effect.runPromise(s1).then(console.log)
/*
Output:
[
  { _id: "Chunk", values: [ 1, 2, 3 ] },
  { _id: "Chunk", values: [] }
]
*/
```
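In the example above, Sink.collectLeftover is applied only to show that nothing remains. In everyday use the sink would likely be run without it. A minimal sketch under that assumption follows; firstThree is a hypothetical name, and the conversion with Chunk.toReadonlyArray is only for readable output:

```ts
import { Chunk, Effect, Sink, Stream } from "effect"

// ignoreLeftover discards the unconsumed elements, so the sink's result
// is just the Chunk of taken elements (no tuple is produced).
const firstThree = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.run(Sink.take<number>(3).pipe(Sink.ignoreLeftover)),
  Effect.map((taken) => Chunk.toReadonlyArray(taken))
)

Effect.runPromise(firstThree).then(console.log)
// [ 1, 2, 3 ]
```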