Queue
On this page
A Queue
is a lightweight in-memory queue built on Effect with composable and transparent back-pressure.
It is fully asynchronous (no locks or blocking), purely-functional and type-safe.
Basic Operations
A Queue<A>
stores values of type A
and provides two fundamental operations:
Queue.offer
: This operation adds a value of typeA
to theQueue
.Queue.take
: It removes and returns the oldest value from theQueue
.
Here's an example demonstrating these basic operations:
ts
import {Effect ,Queue } from "effect"constprogram =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .bounded <number>(100))yield*_ (Queue .offer (queue , 1)) // Add 1 to the queueconstvalue = yield*_ (Queue .take (queue )) // Retrieve and remove the oldest valuereturnvalue })Effect .runPromise (program ).then (console .log ) // Output: 1
ts
import {Effect ,Queue } from "effect"constprogram =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .bounded <number>(100))yield*_ (Queue .offer (queue , 1)) // Add 1 to the queueconstvalue = yield*_ (Queue .take (queue )) // Retrieve and remove the oldest valuereturnvalue })Effect .runPromise (program ).then (console .log ) // Output: 1
Creating a Queue
A Queue
can have bounded (limited capacity) or unbounded storage. Depending on your requirements, you can choose from various strategies to handle new values when the queue reaches its capacity.
Bounded Queue
A bounded queue provides back-pressure when it's full. This means that if the queue is full, any attempt to add more items will be suspended until there's space available.
ts
import {Queue } from "effect"// Creating a bounded queue with a capacity of 100constboundedQueue =Queue .bounded <number>(100)
ts
import {Queue } from "effect"// Creating a bounded queue with a capacity of 100constboundedQueue =Queue .bounded <number>(100)
Dropping Queue
A dropping queue simply drops new items when it's full. It doesn't wait for space to become available.
ts
import {Queue } from "effect"// Creating a dropping queue with a capacity of 100constdroppingQueue =Queue .dropping <number>(100)
ts
import {Queue } from "effect"// Creating a dropping queue with a capacity of 100constdroppingQueue =Queue .dropping <number>(100)
Sliding Queue
A sliding queue removes old items when it's full to accommodate new ones.
ts
import {Queue } from "effect"// Creating a sliding queue with a capacity of 100constslidingQueue =Queue .sliding <number>(100)
ts
import {Queue } from "effect"// Creating a sliding queue with a capacity of 100constslidingQueue =Queue .sliding <number>(100)
Unbounded Queue
An unbounded queue has no capacity limit.
ts
import {Queue } from "effect"// Creating an unbounded queueconstunboundedQueue =Queue .unbounded <number>()
ts
import {Queue } from "effect"// Creating an unbounded queueconstunboundedQueue =Queue .unbounded <number>()
Adding Items to a Queue
To add a value to the queue, you can use the Queue.offer
operation:
ts
import {Effect ,Queue } from "effect"constprogram =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .bounded <number>(100))yield*_ (Queue .offer (queue , 1)) // put 1 in the queue})
ts
import {Effect ,Queue } from "effect"constprogram =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .bounded <number>(100))yield*_ (Queue .offer (queue , 1)) // put 1 in the queue})
If you're using a back-pressured queue and it's full, the offer
operation might suspend. In such cases, you can use Effect.fork
to wait in a different execution context (fiber).
ts
import {Effect ,Queue ,Fiber } from "effect"constprogram =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .bounded <number>(1))yield*_ (Queue .offer (queue , 1))constfiber = yield*_ (Effect .fork (Queue .offer (queue , 2))) // will be suspended because the queue is fullyield*_ (Queue .take (queue ))yield*_ (Fiber .join (fiber ))})
ts
import {Effect ,Queue ,Fiber } from "effect"constprogram =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .bounded <number>(1))yield*_ (Queue .offer (queue , 1))constfiber = yield*_ (Effect .fork (Queue .offer (queue , 2))) // will be suspended because the queue is fullyield*_ (Queue .take (queue ))yield*_ (Fiber .join (fiber ))})
You can also add multiple values at once using Queue.offerAll
:
ts
import {Effect ,Queue ,ReadonlyArray } from "effect"constprogram =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .bounded <number>(100))constitems =ReadonlyArray .range (1, 10)yield*_ (Queue .offerAll (queue ,items ))return yield*_ (Queue .size (queue ))})Effect .runPromise (program ).then (console .log ) // Output: 10
ts
import {Effect ,Queue ,ReadonlyArray } from "effect"constprogram =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .bounded <number>(100))constitems =ReadonlyArray .range (1, 10)yield*_ (Queue .offerAll (queue ,items ))return yield*_ (Queue .size (queue ))})Effect .runPromise (program ).then (console .log ) // Output: 10
Consuming Items from a Queue
The Queue.take
operation removes the oldest item from the queue and returns it. If the queue is empty, it will suspend and resume only when an item is added to the queue. You can also use Effect.fork
to wait for the value in a different execution context (fiber).
ts
import {Effect ,Queue ,Fiber } from "effect"constoldestItem =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .bounded <string>(100))constfiber = yield*_ (Effect .fork (Queue .take (queue ))) // will be suspended because the queue is emptyyield*_ (Queue .offer (queue , "something"))constvalue = yield*_ (Fiber .join (fiber ))returnvalue })Effect .runPromise (oldestItem ).then (console .log ) // Output: something
ts
import {Effect ,Queue ,Fiber } from "effect"constoldestItem =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .bounded <string>(100))constfiber = yield*_ (Effect .fork (Queue .take (queue ))) // will be suspended because the queue is emptyyield*_ (Queue .offer (queue , "something"))constvalue = yield*_ (Fiber .join (fiber ))returnvalue })Effect .runPromise (oldestItem ).then (console .log ) // Output: something
You can retrieve the first item using Queue.poll
. If the queue is empty, you'll get None
; otherwise, the top item will be wrapped in Some
.
ts
import {Effect ,Queue } from "effect"constpolled =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .bounded <number>(100))yield*_ (Queue .offer (queue , 10))yield*_ (Queue .offer (queue , 20))consthead = yield*_ (Queue .poll (queue ))returnhead })Effect .runPromise (polled ).then (console .log )/*Output:{_id: "Option",_tag: "Some",value: 10}*/
ts
import {Effect ,Queue } from "effect"constpolled =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .bounded <number>(100))yield*_ (Queue .offer (queue , 10))yield*_ (Queue .offer (queue , 20))consthead = yield*_ (Queue .poll (queue ))returnhead })Effect .runPromise (polled ).then (console .log )/*Output:{_id: "Option",_tag: "Some",value: 10}*/
You can retrieve multiple items at once using Queue.takeUpTo
. If the queue doesn't have enough items to return, it will return all the available items without waiting for more offers.
ts
import {Effect ,Queue } from "effect"constpolled =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .bounded <number>(100))yield*_ (Queue .offer (queue , 10))yield*_ (Queue .offer (queue , 20))yield*_ (Queue .offer (queue , 30))constchunk = yield*_ (Queue .takeUpTo (queue , 2))returnchunk })Effect .runPromise (polled ).then (console .log )/*Output:{_id: "Chunk",values: [ 10, 20 ]}*/
ts
import {Effect ,Queue } from "effect"constpolled =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .bounded <number>(100))yield*_ (Queue .offer (queue , 10))yield*_ (Queue .offer (queue , 20))yield*_ (Queue .offer (queue , 30))constchunk = yield*_ (Queue .takeUpTo (queue , 2))returnchunk })Effect .runPromise (polled ).then (console .log )/*Output:{_id: "Chunk",values: [ 10, 20 ]}*/
Similarly, you can retrieve all items at once using Queue.takeAll
. It returns immediately, providing an empty collection if the queue is empty.
ts
import {Effect ,Queue } from "effect"constpolled =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .bounded <number>(100))yield*_ (Queue .offer (queue , 10))yield*_ (Queue .offer (queue , 20))yield*_ (Queue .offer (queue , 30))constchunk = yield*_ (Queue .takeAll (queue ))returnchunk })Effect .runPromise (polled ).then (console .log )/*Output:{_id: "Chunk",values: [ 10, 20, 30 ]}*/
ts
import {Effect ,Queue } from "effect"constpolled =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .bounded <number>(100))yield*_ (Queue .offer (queue , 10))yield*_ (Queue .offer (queue , 20))yield*_ (Queue .offer (queue , 30))constchunk = yield*_ (Queue .takeAll (queue ))returnchunk })Effect .runPromise (polled ).then (console .log )/*Output:{_id: "Chunk",values: [ 10, 20, 30 ]}*/
Shutting Down a Queue
With Queue.shutdown
, you can interrupt all fibers that are suspended on offer*
or take*
. It also empties the queue and causes all future offer*
and take*
calls to terminate immediately.
ts
import {Effect ,Queue ,Fiber } from "effect"constprogram =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .bounded <number>(3))constfiber = yield*_ (Effect .fork (Queue .take (queue )))yield*_ (Queue .shutdown (queue )) // will interrupt fiberyield*_ (Fiber .join (fiber )) // will terminate})
ts
import {Effect ,Queue ,Fiber } from "effect"constprogram =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .bounded <number>(3))constfiber = yield*_ (Effect .fork (Queue .take (queue )))yield*_ (Queue .shutdown (queue )) // will interrupt fiberyield*_ (Fiber .join (fiber )) // will terminate})
You can use Queue.awaitShutdown
to execute an effect when the queue is shut down. This function waits until the queue is shut down, and if it's already shut down, it resumes immediately.
ts
import {Effect ,Queue ,Fiber ,Console } from "effect"constprogram =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .bounded <number>(3))constfiber = yield*_ (Effect .fork (Queue .awaitShutdown (queue ).pipe (Effect .flatMap (() =>Console .log ("shutting down")))))yield*_ (Queue .shutdown (queue ))yield*_ (Fiber .join (fiber ))})Effect .runPromise (program ) // Output: shutting down
ts
import {Effect ,Queue ,Fiber ,Console } from "effect"constprogram =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .bounded <number>(3))constfiber = yield*_ (Effect .fork (Queue .awaitShutdown (queue ).pipe (Effect .flatMap (() =>Console .log ("shutting down")))))yield*_ (Queue .shutdown (queue ))yield*_ (Fiber .join (fiber ))})Effect .runPromise (program ) // Output: shutting down
Offer-only / Take-only Queues
In some situations, you may need specific parts of your code to have exclusive capabilities, such as only offering values (Enqueue
) or only taking values (Dequeue
) from a queue. Effect provides a straightforward way to achieve this.
All operations related to offering values are defined by the Enqueue
interface. Here's an example of how to use it:
ts
import {Queue } from "effect"constsend = (offerOnlyQueue :Queue .Enqueue <number>,value : number) => {// This enqueue can only be used to offer values// @ts-expect-errorQueue .take (offerOnlyQueue )// OkreturnQueue .offer (offerOnlyQueue ,value )}
ts
import {Queue } from "effect"constsend = (offerOnlyQueue :Queue .Enqueue <number>,value : number) => {// This enqueue can only be used to offer values// @ts-expect-errorQueue .take (offerOnlyQueue )// OkreturnQueue .offer (offerOnlyQueue ,value )}
Similarly, all operations related to taking values are defined by the Dequeue
interface. Here's an example:
ts
import {Queue } from "effect"constreceive = (takeOnlyQueue :Queue .Dequeue <number>) => {// This dequeue can only be used to take values// @ts-expect-errorQueue .offer (takeOnlyQueue , 1)// OkreturnQueue .take (takeOnlyQueue )}
ts
import {Queue } from "effect"constreceive = (takeOnlyQueue :Queue .Dequeue <number>) => {// This dequeue can only be used to take values// @ts-expect-errorQueue .offer (takeOnlyQueue , 1)// OkreturnQueue .take (takeOnlyQueue )}
The Queue
type extends both Enqueue
and Dequeue
, allowing you to conveniently pass it to different parts of your code where you want to enforce either Enqueue
or Dequeue
behavior:
ts
import {Effect ,Queue } from "effect"constsend = (offerOnlyQueue :Queue .Enqueue <number>,value : number) => {returnQueue .offer (offerOnlyQueue ,value )}constreceive = (takeOnlyQueue :Queue .Dequeue <number>) => {returnQueue .take (takeOnlyQueue )}constprogram =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .unbounded <number>())// Offer values to the queueyield*_ (send (queue , 1))yield*_ (send (queue , 2))// Take values from the queueconsole .log (yield*_ (receive (queue )))console .log (yield*_ (receive (queue )))})Effect .runPromise (program )/*Output:12*/
ts
import {Effect ,Queue } from "effect"constsend = (offerOnlyQueue :Queue .Enqueue <number>,value : number) => {returnQueue .offer (offerOnlyQueue ,value )}constreceive = (takeOnlyQueue :Queue .Dequeue <number>) => {returnQueue .take (takeOnlyQueue )}constprogram =Effect .gen (function* (_ ) {constqueue = yield*_ (Queue .unbounded <number>())// Offer values to the queueyield*_ (send (queue , 1))yield*_ (send (queue , 2))// Take values from the queueconsole .log (yield*_ (receive (queue )))console .log (yield*_ (receive (queue )))})Effect .runPromise (program )/*Output:12*/