Sink Operations

In the previous sections, we learned how to create and use sinks. Now, let's explore some operations you can perform on sinks to transform or filter their behavior.

Changing the Input

Sometimes you have a sink that works perfectly with one type of input, but you want to use it with a different type. This is where Sink.mapInput comes in handy. While Sink.map transforms the result a sink produces, Sink.mapInput transforms each element before the sink consumes it, so you can adapt an existing sink to a different input type.

Imagine you have a Sink.sum that calculates the sum of incoming numeric values. However, your stream contains strings, not numbers. You can use mapInput to convert your strings into numbers and make Sink.sum compatible with your stream:

ts
import { Stream, Sink, Effect } from "effect"

const numericSum = Sink.sum

const stringSum = numericSum.pipe(
  Sink.mapInput((s: string) => Number.parseFloat(s))
)

Effect.runPromise(
  Stream.make("1", "2", "3", "4", "5").pipe(Stream.run(stringSum))
).then(console.log)
/*
Output:
15
*/
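The same adaptation applies when the stream carries records rather than strings. The following is a minimal sketch, assuming a hypothetical Order type and sample data that are not part of the original example; mapInput projects each record to the numeric field that Sink.sum expects:

```ts
import { Effect, Sink, Stream } from "effect"

// Hypothetical record type, introduced only for this illustration
interface Order {
  readonly id: string
  readonly amount: number
}

// Sample data (assumed, not from the original example)
const orders: ReadonlyArray<Order> = [
  { id: "a", amount: 10 },
  { id: "b", amount: 25 },
  { id: "c", amount: 7 }
]

// Sink.sum consumes numbers; mapInput lets it consume Order values instead
const totalAmount = Sink.sum.pipe(
  Sink.mapInput((order: Order) => order.amount)
)

const program = Stream.fromIterable(orders).pipe(Stream.run(totalAmount))

Effect.runPromise(program).then(console.log)
// 42
```

The sink itself is unchanged; only the shape of the elements it accepts differs.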

Transforming Both Input and Output

If you need to change both the input and output of a sink, you can use Sink.dimap. It extends mapInput by also transforming the sink's result: onInput converts each incoming element, and onDone converts the final value the sink produces. This is useful when both the input type and the output type need converting:

ts
import { Stream, Sink, Effect } from "effect"

// Convert the string input to numbers, sum them, then convert the result back to a string
const sumSink = Sink.sum.pipe(
  Sink.dimap({
    onInput: (s: string) => Number.parseFloat(s),
    onDone: (n) => String(n)
  })
)

Effect.runPromise(
  Stream.make("1", "2", "3", "4", "5").pipe(Stream.run(sumSink))
).then(console.log)
/*
Output:
15 <-- as string
*/
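One way to read dimap is as mapInput followed by Sink.map. The sketch below builds the same sink both ways and runs each against the same input; the names viaDimap and viaMapInputThenMap are illustrative only, and the comparison is meant as an aid to understanding rather than a statement about how the library is implemented:

```ts
import { Effect, Sink, Stream } from "effect"

// Both conversions in a single step
const viaDimap = Sink.sum.pipe(
  Sink.dimap({
    onInput: (s: string) => Number.parseFloat(s),
    onDone: (n: number) => String(n)
  })
)

// The same conversions, applied as two separate steps
const viaMapInputThenMap = Sink.sum.pipe(
  Sink.mapInput((s: string) => Number.parseFloat(s)),
  Sink.map((n: number) => String(n))
)

const input = Stream.make("1", "2", "3", "4", "5")

// Run the same stream through each sink and collect both results
const program = Effect.all([
  Stream.run(input, viaDimap),
  Stream.run(input, viaMapInputThenMap)
])

Effect.runPromise(program).then(console.log)
// [ '15', '15' ]
```

Using dimap keeps the two conversions together, which can be easier to read when they are logically a pair.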

Filtering Input

Sinks offer a way to filter incoming elements using Sink.filterInput. This allows you to collect or process only the elements that meet a specific condition. In the following example, we collect elements in chunks of three and keep only the positive numbers, so zeros and negative numbers never reach the sink:

ts
import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, -2, 0, 1, 3, -3, 4, 2, 0, 1, -3, 1, 1, 6).pipe(
  Stream.transduce(
    Sink.collectAllN<number>(3).pipe(Sink.filterInput((n) => n > 0))
  )
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
{
  _id: "Chunk",
  values: [
    {
      _id: "Chunk",
      values: [ 1, 1, 3 ]
    }, {
      _id: "Chunk",
      values: [ 4, 2, 1 ]
    }, {
      _id: "Chunk",
      values: [ 1, 1, 6 ]
    }, {
      _id: "Chunk",
      values: []
    }
  ]
}
*/
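filterInput is not limited to collecting sinks. As a small sketch reusing the same input values, the example below applies the same predicate to Sink.sum and runs it directly with Stream.run, so only the positive numbers contribute to the total; the name positiveSum is illustrative:

```ts
import { Effect, Sink, Stream } from "effect"

// Sum only the elements that satisfy the predicate; the rest are dropped
const positiveSum = Sink.sum.pipe(Sink.filterInput((n) => n > 0))

const program = Stream.make(1, -2, 0, 1, 3, -3, 4, 2, 0, 1, -3, 1, 1, 6).pipe(
  Stream.run(positiveSum)
)

Effect.runPromise(program).then(console.log)
// 20
```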