SubscriptionRef
A `SubscriptionRef<A>` is a specialized form of a `SynchronizedRef`. It allows us to subscribe and receive updates on the current value and any changes made to that value.
```ts
export interface SubscriptionRef<A> extends Synchronized.SynchronizedRef<A> {
  /**
   * A stream containing the current value of the `Ref` as well as all changes
   * to that value.
   */
  readonly changes: Stream<A>
}
```
You can perform all the usual operations on a `SubscriptionRef`, such as `get`, `set`, or `modify`, to work with the current value.
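As a minimal sketch of these operations, the snippet below uses the `get`, `set`, and `modify` functions exported by the `SubscriptionRef` module. The values `10` and `n + 1` are arbitrary, chosen only for illustration.

```ts
import { Effect, SubscriptionRef } from "effect"

const program = Effect.gen(function* (_) {
  // Create a SubscriptionRef holding 0
  const ref = yield* _(SubscriptionRef.make(0))

  // Overwrite the current value
  yield* _(SubscriptionRef.set(ref, 10))

  // modify returns the first tuple element and stores the second:
  // here it returns the old value and stores old value + 1
  const previous = yield* _(
    SubscriptionRef.modify(ref, (n) => [n, n + 1] as const)
  )

  // Read the current value
  const current = yield* _(SubscriptionRef.get(ref))

  return { previous, current }
})

Effect.runPromise(program).then(console.log)
// { previous: 10, current: 11 }
```

Each of these updates is also published to anyone subscribed through `changes`.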
The `changes` stream is where the magic happens. It lets you observe the current value and all subsequent changes. Each time you run this stream, you'll get the current value as of that moment and any changes that occurred afterward.
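To illustrate the "current value as of that moment" part, here is a small sketch in which the value is updated twice before anyone subscribes. A subscriber that starts afterward should see the latest value first, not the earlier ones. The specific values are illustrative only.

```ts
import { Chunk, Effect, Stream, SubscriptionRef } from "effect"

const program = Effect.gen(function* (_) {
  const ref = yield* _(SubscriptionRef.make(0))

  // Two updates happen before any subscriber exists
  yield* _(SubscriptionRef.set(ref, 1))
  yield* _(SubscriptionRef.set(ref, 2))

  // Running the stream now starts with the value at this moment
  const firstSeen = yield* _(
    Stream.runCollect(Stream.take(ref.changes, 1))
  )

  return Chunk.toReadonlyArray(firstSeen)
})

Effect.runPromise(program).then(console.log)
// [ 2 ]
```

The earlier values `0` and `1` are not replayed: a new run of `changes` begins with whatever the value is when the stream starts.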
To create a `SubscriptionRef`, you can use the `make` constructor, specifying the initial value:
```ts
import { SubscriptionRef } from "effect"

const ref = SubscriptionRef.make(0)
```
A `SubscriptionRef` can be invaluable when modeling shared state, especially when multiple observers need to react to every change in that shared state. For example, in a functional reactive programming context, the `SubscriptionRef` value might represent a part of the application state, and each observer could update various user interface elements based on changes to that state.
To see how this works, let's create a simple example where a "server" repeatedly updates a value observed by multiple "clients":
```ts
import { Ref, Effect } from "effect"

const server = (ref: Ref.Ref<number>) =>
  Ref.update(ref, (n) => n + 1).pipe(Effect.forever)
```
Note that the `server` function operates on a regular `Ref` and doesn't need to know about `SubscriptionRef`. It simply updates a value.
```ts
import { Ref, Effect, Stream, Random } from "effect"

const server = (ref: Ref.Ref<number>) =>
  Ref.update(ref, (n) => n + 1).pipe(Effect.forever)

const client = (changes: Stream.Stream<number>) =>
  Effect.gen(function* (_) {
    const n = yield* _(Random.nextIntBetween(1, 10))
    const chunk = yield* _(Stream.runCollect(Stream.take(changes, n)))
    return chunk
  })
```
Similarly, the `client` function only works with a `Stream` of values and doesn't concern itself with the source of these values.
To tie everything together, we start the server, launch multiple client instances in parallel, and then shut down the server when we're finished. We also create the `SubscriptionRef` in this process.
```ts
import { Ref, Effect, Stream, Random, SubscriptionRef, Fiber } from "effect"

const server = (ref: Ref.Ref<number>) =>
  Ref.update(ref, (n) => n + 1).pipe(Effect.forever)

const client = (changes: Stream.Stream<number>) =>
  Effect.gen(function* (_) {
    const n = yield* _(Random.nextIntBetween(1, 10))
    const chunk = yield* _(Stream.runCollect(Stream.take(changes, n)))
    return chunk
  })

const program = Effect.gen(function* (_) {
  const ref = yield* _(SubscriptionRef.make(0))
  const serverFiber = yield* _(Effect.fork(server(ref)))
  const clients = new Array(5).fill(null).map(() => client(ref.changes))
  const chunks = yield* _(Effect.all(clients, { concurrency: "unbounded" }))
  yield* _(Fiber.interrupt(serverFiber))
  for (const chunk of chunks) {
    console.log(chunk)
  }
})

Effect.runPromise(program)
/*
Output:
{
  _id: "Chunk",
  values: [ 2, 3, 4 ]
}
{
  _id: "Chunk",
  values: [ 2 ]
}
{
  _id: "Chunk",
  values: [ 2, 3, 4, 5, 6, 7 ]
}
{
  _id: "Chunk",
  values: [ 2, 3, 4 ]
}
{
  _id: "Chunk",
  values: [ 2, 3, 4, 5, 6, 7, 8, 9 ]
}
*/
```
This setup ensures that each client observes the current value when it starts and receives all subsequent changes to the value.
Since the changes are represented as streams, you can easily build more complex programs using familiar stream operators. You can transform, filter, or merge these streams with other streams to achieve more sophisticated behavior.
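As a rough sketch of that idea, the example below applies `Stream.filter` and `Stream.map` to `changes` before collecting three elements. It redefines the same incrementing `server` used earlier so that the snippet is self-contained. The even-number filter and the `even: ` label are arbitrary choices for illustration.

```ts
import { Chunk, Effect, Fiber, Ref, Stream, SubscriptionRef } from "effect"

// Same shape as the server above: increment the value forever
const server = (ref: Ref.Ref<number>) =>
  Ref.update(ref, (n) => n + 1).pipe(Effect.forever)

const program = Effect.gen(function* (_) {
  const ref = yield* _(SubscriptionRef.make(0))
  const serverFiber = yield* _(Effect.fork(server(ref)))

  // Keep only even values, turn them into labels, and stop after three
  const labels = yield* _(
    ref.changes.pipe(
      Stream.filter((n) => n % 2 === 0),
      Stream.map((n) => `even: ${n}`),
      Stream.take(3),
      Stream.runCollect
    )
  )

  yield* _(Fiber.interrupt(serverFiber))
  return Chunk.toReadonlyArray(labels)
})

Effect.runPromise(program).then(console.log)
```

The exact numbers depend on when the subscription starts relative to the server's updates, so the output varies between runs. Because the subscriber receives every change after it starts, the three labels should carry consecutive even numbers.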