Fibers
On this page
What is a Fiber?
A "fiber" is a small unit of work or a lightweight thread of execution. It represents a specific computation or an effectful operation in a program. Fibers are used to manage concurrency and asynchronous tasks.
Think of a fiber as a worker that performs a specific job. It can be started, paused, resumed, and even interrupted. Fibers are useful when we want to perform multiple tasks simultaneously or handle long-running operations without blocking the main program.
By using fibers, developers can control and coordinate the execution of tasks, allowing for efficient multitasking and responsiveness in their applications.
To summarize:
- An
Effect
is a higher-level concept that describes an effectful computation. It is lazy and immutable, meaning it represents a computation that may produce a value or fail but does not immediately execute. - A fiber, on the other hand, represents the running execution of an
Effect
. It can be interrupted or awaited to retrieve its result. Think of it as a way to control and interact with the ongoing computation.
Creating Fibers
A fiber is created any time an effect is run. When running effects concurrently, a fiber is created for each concurrent effect.
Lifetime of Child Fibers
When we fork fibers, depending on how we fork them we can have four different lifetime strategies for the child fibers:
-
Fork With Automatic Supervision. If we use the ordinary
Effect.fork
operation, the child fiber will be automatically supervised by the parent fiber. The lifetime child fibers are tied to the lifetime of their parent fiber. This means that these fibers will be terminated either when they end naturally, or when their parent fiber is terminated. -
Fork in Global Scope (Daemon). Sometimes we want to run long-running background fibers that aren't tied to their parent fiber, and also we want to fork them in a global scope. Any fiber that is forked in global scope will become daemon fiber. This can be achieved by using the
Effect.forkDaemon
operator. As these fibers have no parent, they are not supervised, and they will be terminated when they end naturally, or when our application is terminated. -
Fork in Local Scope. Sometimes, we want to run a background fiber that isn't tied to its parent fiber, but we want to live that fiber in the local scope. We can fork fibers in the local scope by using
Effect.forkScoped
. Such fibers can outlive their parent fiber (so they are not supervised by their parents), and they will be terminated when their life end or their local scope is closed. -
Fork in Specific Scope. This is similar to the previous strategy, but we can have more fine-grained control over the lifetime of the child fiber by forking it in a specific scope. We can do this by using the
Effect.forkIn
operator.
Fork with Automatic Supervision
Effect employs a structured concurrency model where the lifetimes of fibers are neatly nested. Simply put, the lifespan of a fiber depends on the lifespan of its parent fiber.
To help clarify this concept, let's explore the following example. In this scenario, the foo
fiber spawns the bar
fiber. The bar
fiber is engaged in a long-running task that never completes. What's important to note here is that Effect ensures the bar
fiber will not outlive the foo
fiber:
ts
import {Effect ,Console ,Schedule } from "effect"constbarJob =Effect .repeat (Console .log ("Bar: still running!"),Schedule .fixed ("1 seconds"))constfooJob =Effect .gen (function* (_ ) {console .log ("Foo: started!")yield*_ (Effect .fork (barJob ))yield*_ (Effect .sleep ("3 seconds"))console .log ("Foo: finished!")})Effect .runPromise (fooJob )
ts
import {Effect ,Console ,Schedule } from "effect"constbarJob =Effect .repeat (Console .log ("Bar: still running!"),Schedule .fixed ("1 seconds"))constfooJob =Effect .gen (function* (_ ) {console .log ("Foo: started!")yield*_ (Effect .fork (barJob ))yield*_ (Effect .sleep ("3 seconds"))console .log ("Foo: finished!")})Effect .runPromise (fooJob )
When you run the above program, you'll see the following output:
Foo: started!Bar: still running!Bar: still running!Bar: still running!Foo: finished!
Foo: started!Bar: still running!Bar: still running!Bar: still running!Foo: finished!
This pattern can be extended to any level of nested fibers.
Fork in Global Scope (Daemon)
Using Effect.forkDaemon
we can create a daemon fiber from an effect. Its lifetime is tied to the global scope. So if the parent fiber terminates, the daemon fiber will not be terminated. It will only will be terminated when the global scope is closed, or its life end naturally.
ts
import {Effect ,Console ,Schedule } from "effect"constbarJob =Effect .repeat (Console .log ("Bar: still running!"),Schedule .fixed ("1 seconds"))constfooJob =Effect .gen (function* (_ ) {console .log ("Foo: started!")yield*_ (Effect .forkDaemon (barJob ))yield*_ (Effect .sleep ("3 seconds"))console .log ("Foo: finished!")})Effect .runPromise (fooJob )
ts
import {Effect ,Console ,Schedule } from "effect"constbarJob =Effect .repeat (Console .log ("Bar: still running!"),Schedule .fixed ("1 seconds"))constfooJob =Effect .gen (function* (_ ) {console .log ("Foo: started!")yield*_ (Effect .forkDaemon (barJob ))yield*_ (Effect .sleep ("3 seconds"))console .log ("Foo: finished!")})Effect .runPromise (fooJob )
If we run the above program, we will see the following output which shows that while the lifetime of the foo
fiber ends after 3 seconds, the daemon fiber (bar
) is still running:
Foo: started!Bar: still running!Bar: still running!Bar: still running!Foo: finished!Bar: still running!Bar: still running!Bar: still running!Bar: still running!Bar: still running!...etc...
Foo: started!Bar: still running!Bar: still running!Bar: still running!Foo: finished!Bar: still running!Bar: still running!Bar: still running!Bar: still running!Bar: still running!...etc...
Even if we interrupt the foo
fiber, the daemon fiber (bar
) will not be interrupted:
ts
import {Effect ,Console ,Schedule ,Fiber } from "effect"constbarJob =Effect .repeat (Console .log ("Bar: still running!"),Schedule .fixed ("1 seconds"))constfooJob =Effect .gen (function* (_ ) {console .log ("Foo: started!")yield*_ (Effect .forkDaemon (barJob ))yield*_ (Effect .sleep ("3 seconds"))console .log ("Foo: finished!")}).pipe (Effect .onInterrupt (() =>Console .log ("Foo: interrupted!")))constprogram =Effect .gen (function* (_ ) {constf = yield*_ (Effect .fork (fooJob ))yield*_ (Effect .sleep ("2 seconds"))yield*_ (Fiber .interrupt (f ))})Effect .runPromise (program )
ts
import {Effect ,Console ,Schedule ,Fiber } from "effect"constbarJob =Effect .repeat (Console .log ("Bar: still running!"),Schedule .fixed ("1 seconds"))constfooJob =Effect .gen (function* (_ ) {console .log ("Foo: started!")yield*_ (Effect .forkDaemon (barJob ))yield*_ (Effect .sleep ("3 seconds"))console .log ("Foo: finished!")}).pipe (Effect .onInterrupt (() =>Console .log ("Foo: interrupted!")))constprogram =Effect .gen (function* (_ ) {constf = yield*_ (Effect .fork (fooJob ))yield*_ (Effect .sleep ("2 seconds"))yield*_ (Fiber .interrupt (f ))})Effect .runPromise (program )
The output:
Foo: started!Bar: still running!Bar: still running!Foo: interrupted!Bar: still running!Bar: still running!Bar: still running!Bar: still running!Bar: still running!...etc...
Foo: started!Bar: still running!Bar: still running!Foo: interrupted!Bar: still running!Bar: still running!Bar: still running!Bar: still running!Bar: still running!...etc...
Fork in Local Scope
Sometimes we want to attach fiber to a local scope. In such cases, we can use the Effect.forkScoped
operator. By using this operator, the lifetime of the forked fiber can be outlived the lifetime of its parent fiber, and it will be terminated when the local scope is closed:
ts
import {Effect ,Console ,Schedule } from "effect"constbarJob =Effect .repeat (Console .log ("Bar: still running!"),Schedule .fixed ("1 seconds"))constfooJob =Effect .gen (function* (_ ) {console .log ("Foo: started!")yield*_ (Effect .forkScoped (barJob ))yield*_ (Effect .sleep ("3 seconds"))console .log ("Foo: finished!")})constprogram =Effect .scoped (Effect .gen (function* (_ ) {console .log ("Local scope started!")yield*_ (Effect .fork (fooJob ))yield*_ (Effect .sleep ("5 seconds"))console .log ("Leaving the local scope!")}))Effect .runPromise (program )
ts
import {Effect ,Console ,Schedule } from "effect"constbarJob =Effect .repeat (Console .log ("Bar: still running!"),Schedule .fixed ("1 seconds"))constfooJob =Effect .gen (function* (_ ) {console .log ("Foo: started!")yield*_ (Effect .forkScoped (barJob ))yield*_ (Effect .sleep ("3 seconds"))console .log ("Foo: finished!")})constprogram =Effect .scoped (Effect .gen (function* (_ ) {console .log ("Local scope started!")yield*_ (Effect .fork (fooJob ))yield*_ (Effect .sleep ("5 seconds"))console .log ("Leaving the local scope!")}))Effect .runPromise (program )
In the above example, the bar
fiber forked in the local scope has bigger lifetime than its parent fiber (foo
).
So, when its parent fiber (foo
) is terminated, the bar
fiber still running in the local scope until the local scope is closed.
Let's see the output:
Local scope started!Foo: started!Bar: still running!Bar: still running!Bar: still running!Foo: finished!Bar: still running!Bar: still running!Leaving the local scope!
Local scope started!Foo: started!Bar: still running!Bar: still running!Bar: still running!Foo: finished!Bar: still running!Bar: still running!Leaving the local scope!
Fork in Specific Scope
There are some cases where we need more fine-grained control, so we want to fork a fiber in a specific scope. We can use the Effect.forkIn
operator which takes the target scope as an argument:
ts
import {Effect ,Console ,Schedule } from "effect"constbarJob =Effect .repeat (Console .log ("Still running!"),Schedule .fixed ("1 seconds"))constprogram =Effect .scoped (Effect .gen (function* (_ ) {constscope = yield*_ (Effect .scope )yield*_ (Effect .scoped (Effect .gen (function* (_ ) {yield*_ (Effect .forkIn (barJob ,scope ))yield*_ (Effect .sleep ("3 seconds"))console .log ("The innermost scope is about to be closed.")})))yield*_ (Effect .sleep ("5 seconds"))console .log ("The outer scope is about to be closed.")}))Effect .runPromise (program )
ts
import {Effect ,Console ,Schedule } from "effect"constbarJob =Effect .repeat (Console .log ("Still running!"),Schedule .fixed ("1 seconds"))constprogram =Effect .scoped (Effect .gen (function* (_ ) {constscope = yield*_ (Effect .scope )yield*_ (Effect .scoped (Effect .gen (function* (_ ) {yield*_ (Effect .forkIn (barJob ,scope ))yield*_ (Effect .sleep ("3 seconds"))console .log ("The innermost scope is about to be closed.")})))yield*_ (Effect .sleep ("5 seconds"))console .log ("The outer scope is about to be closed.")}))Effect .runPromise (program )
The output:
Still running!Still running!Still running!The innermost scope is about to be closed.Still running!Still running!Still running!Still running!Still running!Still running!The outer scope is about to be closed.
Still running!Still running!Still running!The innermost scope is about to be closed.Still running!Still running!Still running!Still running!Still running!Still running!The outer scope is about to be closed.
When do Fibers run?
New fibers begin execution after the current fiber completes or yields. This is necessary to prevent infinite loops in some cases, and is useful to know when using the fork
APIs.
In the following example the SubscriptionRef
changes
stream only contains a single value 2
because the fiber (created by fork
) to run the stream is started after the value has been updated.
ts
import {Effect ,SubscriptionRef ,Stream ,Console } from "effect"constprogram =Effect .gen (function* (_ ) {constref = yield*_ (SubscriptionRef .make (0))yield*_ (ref .changes ,Stream .tap ((n ) =>Console .log (`SubscriptionRef changed to ${n }`)),Stream .runDrain ,Effect .fork )yield*_ (SubscriptionRef .set (ref , 1))yield*_ (SubscriptionRef .set (ref , 2))})Effect .runPromise (program )/*Output:SubscriptionRef changed to 2*/
ts
import {Effect ,SubscriptionRef ,Stream ,Console } from "effect"constprogram =Effect .gen (function* (_ ) {constref = yield*_ (SubscriptionRef .make (0))yield*_ (ref .changes ,Stream .tap ((n ) =>Console .log (`SubscriptionRef changed to ${n }`)),Stream .runDrain ,Effect .fork )yield*_ (SubscriptionRef .set (ref , 1))yield*_ (SubscriptionRef .set (ref , 2))})Effect .runPromise (program )/*Output:SubscriptionRef changed to 2*/
If we add Effect.yieldNow()
to force the current fiber to yield then the stream will contain all values 0
, 1
, and 2
because the fiber running the stream has an opportunity to start before the value is changed.
ts
import {Effect ,SubscriptionRef ,Stream ,Console } from "effect"constprogram =Effect .gen (function* (_ ) {constref = yield*_ (SubscriptionRef .make (0))yield*_ (ref .changes ,Stream .tap ((n ) =>Console .log (`SubscriptionRef changed to ${n }`)),Stream .runDrain ,Effect .fork )yield*_ (Effect .yieldNow ())yield*_ (SubscriptionRef .set (ref , 1))yield*_ (SubscriptionRef .set (ref , 2))})Effect .runPromise (program )/*Output:SubscriptionRef changed to 0SubscriptionRef changed to 1SubscriptionRef changed to 2*/
ts
import {Effect ,SubscriptionRef ,Stream ,Console } from "effect"constprogram =Effect .gen (function* (_ ) {constref = yield*_ (SubscriptionRef .make (0))yield*_ (ref .changes ,Stream .tap ((n ) =>Console .log (`SubscriptionRef changed to ${n }`)),Stream .runDrain ,Effect .fork )yield*_ (Effect .yieldNow ())yield*_ (SubscriptionRef .set (ref , 1))yield*_ (SubscriptionRef .set (ref , 2))})Effect .runPromise (program )/*Output:SubscriptionRef changed to 0SubscriptionRef changed to 1SubscriptionRef changed to 2*/