Schedule Combinators
Schedules define stateful, possibly effectful, recurring schedules of events, and compose in a variety of ways. Combinators allow us to take schedules and combine them together to get other schedules.
To demonstrate the functionality of different combinators, we will be working with the following helper:
```ts
// Delay.ts
import { Effect } from "effect"

let start = new Date().getTime()

// Logs the time elapsed since the previous execution, then resets the timer
export const log = Effect.sync(() => {
  const now = new Date().getTime()
  console.log(`delay: ${now - start}`)
  start = now
})
```
The log helper, imported as Delay.log in the examples below, logs the time elapsed between executions. We will use this effect to showcase the behavior of various built-in combinators.
Composition
Schedules compose in the following primary ways:
- Union. This performs the union of the intervals of two schedules.
- Intersection. This performs the intersection of the intervals of two schedules.
- Sequencing. This concatenates the intervals of one schedule onto another.
Union
Combines two schedules through union, by recurring if either schedule wants to recur, using the minimum of the two delays between recurrences.
```ts
import { Effect, Schedule } from "effect"
import * as Delay from "./Delay"

const schedule = Schedule.union(
  Schedule.exponential("100 millis"),
  Schedule.spaced("1 seconds")
)

Effect.runPromise(Effect.repeat(Delay.log, schedule))
/*
Output:
delay: 3
delay: 115 < exponential
delay: 202
delay: 404
delay: 802
delay: 1002 < spaced
delay: 1006
delay: 1006
delay: 1006
delay: 1006
...
*/
```
When we use the combined schedule with Effect.repeat, we observe that the effect is executed repeatedly using the smaller of the two schedules' delays. In this case, the delay follows the exponential schedule (increasing delay) until it would exceed one second, and from then on the spaced schedule (constant delay) determines it.
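The "minimum of the two delays" rule can be sketched with plain arithmetic. This is a simplified model and not the library's implementation: the helper names are hypothetical, and it assumes the exponential schedule waits base * 2^n before recurrence n, which is consistent with the output shown above.

```typescript
// Simplified model of delays in milliseconds (hypothetical helpers, not library code).

// Assumed delay before recurrence n of an exponential schedule: base * 2^n.
const exponentialDelayMs = (baseMs: number, n: number): number =>
  baseMs * Math.pow(2, n)

// Union waits for the smaller of the two delays.
const unionDelayMs = (n: number): number =>
  Math.min(exponentialDelayMs(100, n), 1000) // 1000 models spaced("1 seconds")

const unionDelays: number[] = []
for (let n = 0; n < 6; n++) {
  unionDelays.push(unionDelayMs(n))
}

console.log(unionDelays.join(", ")) // 100, 200, 400, 800, 1000, 1000
```

The modeled values line up with the logged delays above, which are slightly larger because of execution overhead.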
Intersection
Combines two schedules through the intersection, by recurring only if both schedules want to recur, using the maximum of the two delays between recurrences.
```ts
import { Effect, Schedule } from "effect"
import * as Delay from "./Delay"

const schedule = Schedule.intersect(
  Schedule.exponential("10 millis"),
  Schedule.recurs(5)
)

Effect.runPromise(Effect.repeat(Delay.log, schedule))
/*
Output:
delay: 4
delay: 18 < recurs
delay: 22
delay: 45
delay: 84
delay: 166
*/
```
When we use the combined schedule with Effect.repeat, we observe that the effect is repeated only while both schedules want to recur. The delay between recurrences is the larger of the two schedules' delays. In this case, the delay follows the progression of the exponential schedule until the maximum number of recurrences specified by the recurs schedule is reached.
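The intersection rule can be modeled the same way. The sketch below is a simplified illustration with hypothetical helper names, assuming that a recurs schedule contributes a delay of zero and that a finished schedule is represented by undefined.

```typescript
// Simplified model of delays in milliseconds (hypothetical helpers, not library code).

// recurs(limit): allows `limit` recurrences with no delay, then stops (undefined).
const recursDelayMs = (limit: number, n: number): number | undefined =>
  n < limit ? 0 : undefined

// Intersection recurs only while both sides recur, and waits for the larger delay.
const intersectDelayMs = (n: number): number | undefined => {
  const exponential = 10 * Math.pow(2, n) // models exponential("10 millis"), never stops
  const recurs = recursDelayMs(5, n)
  return recurs === undefined ? undefined : Math.max(exponential, recurs)
}

const intersectDelays: number[] = []
for (let n = 0; ; n++) {
  const delay = intersectDelayMs(n)
  if (delay === undefined) break // one side stopped, so the intersection stops
  intersectDelays.push(delay)
}

console.log(intersectDelays.join(", ")) // 10, 20, 40, 80, 160
```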
Sequencing
Combines two schedules sequentially, by following the first policy until it ends, and then following the second policy.
```ts
import { Effect, Schedule } from "effect"
import * as Delay from "./Delay"

const schedule = Schedule.andThen(
  Schedule.recurs(5),
  Schedule.spaced("1 seconds")
)

Effect.runPromise(Effect.repeat(Delay.log, schedule))
/*
Output:
delay: 4
delay: 8 < recurs
delay: 4
delay: 2
delay: 0
delay: 2
delay: 1005 < spaced
delay: 1007
delay: 1006
delay: 1007
delay: 1005
...
*/
```
When we use the combined schedule with Effect.repeat, we observe that the effect follows the policy of the first schedule (recurs) until it completes the specified number of recurrences. After that, it switches to the policy of the second schedule (spaced) and continues repeating the effect with the fixed delay between recurrences.
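Sequencing can be pictured as concatenating two streams of delays. The following is a minimal sketch with a hypothetical helper, assuming the first schedule is finite and that the second schedule starts counting from its own first recurrence.

```typescript
// Simplified model of delays in milliseconds (hypothetical helper, not library code).

// Follow `first` for `firstCount` recurrences, then hand over to `second`,
// which is indexed from zero once it takes over.
const andThenDelayMs = (
  firstCount: number,
  first: (n: number) => number,
  second: (k: number) => number,
  n: number
): number => (n < firstCount ? first(n) : second(n - firstCount))

const sequencedDelays: number[] = []
for (let n = 0; n < 8; n++) {
  // recurs(5) is modeled as five zero delays, spaced("1 seconds") as a constant 1000.
  sequencedDelays.push(andThenDelayMs(5, () => 0, () => 1000, n))
}

console.log(sequencedDelays.join(", ")) // 0, 0, 0, 0, 0, 1000, 1000, 1000
```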
Jittering
jittered is a combinator that takes a schedule and returns another schedule of the same type, except that each delay is randomly adjusted.
When a resource is out of service due to overload or contention, retrying with backoff alone doesn't help. If all failed API calls back off to the same point in time, they cause another overload or contention. Jitter adds some randomness to the delay of the schedule, which helps avoid accidentally synchronizing retries and taking the service down again.
Research shows that jittering each delay by a random factor between 0.0 and 1.0 is well suited to retrying.
```ts
import { Effect, Schedule } from "effect"
import * as Delay from "./Delay"

const schedule = Schedule.jittered(Schedule.exponential("10 millis"))

Effect.runPromise(Effect.repeat(Delay.log, schedule))
/*
Output:
delay: 3
delay: 18
delay: 24
delay: 48
delay: 92
delay: 184
delay: 351
delay: 620
delay: 1129
delay: 2576
...
*/
```
In this example, we use the jittered combinator to apply jitter to an exponential schedule. The exponential schedule increases the delay between repetitions exponentially, and jitter randomly adjusts each of those delays within a range around its original value.
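To make the random adjustment concrete, here is a minimal sketch of jitter as a scaling factor. The helper is hypothetical, and the 0.8 to 1.2 range is an assumption chosen because it is consistent with the sample output above; check the library documentation for the actual bounds. The random source is injectable so the behavior can be checked deterministically.

```typescript
// Simplified model of jitter (hypothetical helper, not library code).
// ASSUMPTION: each delay is scaled by a random factor between 0.8 and 1.2.
const jitterDelayMs = (
  delayMs: number,
  random: () => number = Math.random, // expected to return a value in [0, 1)
  minFactor: number = 0.8,
  maxFactor: number = 1.2
): number => delayMs * (minFactor + (maxFactor - minFactor) * random())

// With the lowest possible random value, the delay shrinks to 80% of the original.
console.log(jitterDelayMs(100, () => 0)) // 80

// With the real random source, each call lands somewhere around the original delay.
const jitterSample = jitterDelayMs(100)
console.log(jitterSample > 79 && jitterSample < 121) // true
```

Because every client draws its own factor, retries that would otherwise fire at the same instant are spread over a window instead.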
Filtering
We can filter inputs or outputs of a schedule with whileInput and whileOutput.
```ts
import { Effect, Schedule } from "effect"
import * as Delay from "./Delay"

const schedule = Schedule.whileOutput(Schedule.recurs(5), (n) => n <= 2)

Effect.runPromise(Effect.repeat(Delay.log, schedule))
/*
Output:
delay: 2
delay: 11 < recurs
delay: 1
delay: 1
(end) < whileOutput
*/
```
In this example, we create a schedule using Schedule.recurs(5) to repeat an action up to 5 times. However, we apply the whileOutput combinator with the predicate n <= 2, so the schedule continues only while its output satisfies that predicate. As a result, the schedule stops as soon as the output exceeds 2, and the repetition ends.
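The stopping behavior resembles a "take while" over the schedule's outputs. The sketch below is a simplified model with a hypothetical helper, assuming a recurs schedule outputs its recurrence count starting at 0.

```typescript
// Simplified model of whileOutput over recurs (hypothetical helper, not library code).
// ASSUMPTION: recurs(limit) outputs the counter 0, 1, ..., limit - 1.
const whileOutputModel = (
  limit: number,
  keepGoing: (output: number) => boolean
): number[] => {
  const accepted: number[] = []
  for (let n = 0; n < limit; n++) {
    if (!keepGoing(n)) break // the first rejected output ends the schedule
    accepted.push(n)
  }
  return accepted
}

const acceptedOutputs = whileOutputModel(5, (n) => n <= 2)
console.log(acceptedOutputs.join(", ")) // 0, 1, 2

// One initial execution plus one repetition per accepted output.
console.log(1 + acceptedOutputs.length) // 4
```

The count of 4 matches the four delay lines in the output above.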
Modifying
Modifies the delay of a schedule.
```ts
import { Effect, Schedule } from "effect"
import * as Delay from "./Delay"

const schedule = Schedule.modifyDelay(
  Schedule.spaced("1 seconds"),
  (_) => "100 millis"
)

Effect.runPromise(Effect.repeat(Delay.log, schedule))
/*
Output:
delay: 4
delay: 110 < modifyDelay
delay: 103
delay: 103
delay: 106
...
*/
```
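Conceptually, modifying a delay amounts to passing each delay the schedule would have used through a function. The sketch below models that with a hypothetical helper over plain numbers; it does not mirror the callback signature of the library function.

```typescript
// Simplified model of delay modification (hypothetical helper, not library code).
const modifyDelaysMs = (
  delays: ReadonlyArray<number>,
  f: (delayMs: number) => number
): number[] => delays.map((delayMs) => f(delayMs))

// Ignore the original delay and always wait 100 ms, as in the example above.
const constantDelays = modifyDelaysMs([1000, 1000, 1000], () => 100)
console.log(constantDelays.join(", ")) // 100, 100, 100

// Another possible use: cap a growing delay at 250 ms.
const cappedDelays = modifyDelaysMs([100, 200, 400, 800], (d) => Math.min(d, 250))
console.log(cappedDelays.join(", ")) // 100, 200, 250, 250
```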
Tapping
Whenever we need to effectfully process each schedule input or output, we can use tapInput and tapOutput.
We can use these two functions for logging purposes:
```ts
import { Effect, Schedule, Console } from "effect"
import * as Delay from "./Delay"

const schedule = Schedule.tapOutput(Schedule.recurs(2), (n) =>
  Console.log(`repeating ${n}`)
)

Effect.runPromise(Effect.repeat(Delay.log, schedule))
/*
Output:
delay: 2
repeating 0
delay: 8
repeating 1
delay: 5
repeating 2
*/
```