Semaphore
A semaphore, in the context of programming, is a synchronization mechanism that allows you to control access to a shared resource. In Effect, semaphores are used to manage access to resources or coordinate tasks in an asynchronous and concurrent environment. Let's dive into the concept of semaphores and how they work in Effect.
What is a Semaphore?
A semaphore is a generalization of a mutex. It has a certain number of permits, which can be held and released concurrently by different parties. Think of permits as tickets that allow entities (e.g., tasks or fibers) to access a shared resource or perform a specific operation. If there are no permits available and an entity tries to acquire one, it will be suspended until a permit becomes available.
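To make the permit model concrete before turning to Effect, here is a minimal sketch of the idea using plain Promises. It is not how Effect implements semaphores; the `SimpleSemaphore` class, the `worker` helper, and the `events` log are all hypothetical names introduced only for illustration.

```ts
// Hypothetical, simplified semaphore built on Promises (not Effect's implementation).
class SimpleSemaphore {
  private available: number
  private waiters: Array<() => void> = []

  constructor(permits: number) {
    this.available = permits
  }

  // Resolves immediately if a permit is free; otherwise the caller is queued.
  acquire(): Promise<void> {
    if (this.available > 0) {
      this.available -= 1
      return Promise.resolve()
    }
    return new Promise<void>((resolve) => {
      this.waiters.push(resolve)
    })
  }

  // Hands the permit straight to the next waiter, or returns it to the pool.
  release(): void {
    const next = this.waiters.shift()
    if (next) next()
    else this.available += 1
  }
}

const events: string[] = []

const worker = async (sem: SimpleSemaphore, name: string): Promise<void> => {
  await sem.acquire()
  try {
    events.push(`${name} start`)
    await new Promise<void>((resolve) => setTimeout(resolve, 10))
    events.push(`${name} end`)
  } finally {
    // Always give the permit back, even if the body throws.
    sem.release()
  }
}

// One permit: "b" is suspended until "a" releases.
const sem = new SimpleSemaphore(1)
const finished = Promise.all([worker(sem, "a"), worker(sem, "b")]).then(() => events)
// finished resolves to ["a start", "a end", "b start", "b end"]
```

With a single permit the second worker cannot start until the first has released, which is the mutex-like behavior described above.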
Let's take a look at an example using asynchronous tasks:
ts
import { Effect } from "effect"

// A task that logs, waits two seconds, then logs again
const task = Effect.gen(function* (_) {
  yield* _(Effect.log("start"))
  yield* _(Effect.sleep("2 seconds"))
  yield* _(Effect.log("end"))
})

// Run the task while holding one permit
const semTask = (sem: Effect.Semaphore) => sem.withPermits(1)(task)

// Three copies of the task, each with its own log span
const semTaskSeq = (sem: Effect.Semaphore) =>
  [1, 2, 3].map(() => semTask(sem).pipe(Effect.withLogSpan("elapsed")))

const program = Effect.gen(function* (_) {
  // A semaphore with a single permit acts as a mutex
  const mutex = yield* _(Effect.makeSemaphore(1))
  yield* _(Effect.all(semTaskSeq(mutex), { concurrency: "unbounded" }))
})

Effect.runPromise(program)
/*
Output:
timestamp=... level=INFO fiber=#1 message=start elapsed=3ms
timestamp=... level=INFO fiber=#1 message=end elapsed=2010ms
timestamp=... level=INFO fiber=#2 message=start elapsed=2012ms
timestamp=... level=INFO fiber=#2 message=end elapsed=4017ms
timestamp=... level=INFO fiber=#3 message=start elapsed=4018ms
timestamp=... level=INFO fiber=#3 message=end elapsed=6026ms
*/
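Here, we synchronize and control the execution of asynchronous tasks using a semaphore with one permit. When all permits are in use, additional tasks attempting to acquire permits wait until some become available. Because there is only one permit, the three tasks run one after another even though they are started concurrently, which is why the log shows each task starting roughly two seconds after the previous one.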
In another scenario, we create a semaphore with five permits. We then use withPermits(n) to acquire and release a varying number of permits for each task:
ts
import { Effect } from "effect"

const program = Effect.gen(function* (_) {
  const sem = yield* _(Effect.makeSemaphore(5))

  yield* _(
    Effect.forEach(
      [1, 2, 3, 4, 5],
      (n) =>
        // Task n holds n permits while it runs
        sem
          .withPermits(n)(Effect.delay(Effect.log(`process: ${n}`), "2 seconds"))
          .pipe(Effect.withLogSpan("elapsed")),
      { concurrency: "unbounded" }
    )
  )
})

Effect.runPromise(program)
/*
Output:
timestamp=... level=INFO fiber=#1 message="process: 1" elapsed=2011ms
timestamp=... level=INFO fiber=#2 message="process: 2" elapsed=2017ms
timestamp=... level=INFO fiber=#3 message="process: 3" elapsed=4020ms
timestamp=... level=INFO fiber=#4 message="process: 4" elapsed=6025ms
timestamp=... level=INFO fiber=#5 message="process: 5" elapsed=8034ms
*/
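In this example, we show that you can acquire and release any number of permits with withPermits(n). Tasks 1 and 2 together need only three of the five permits, so they run at the same time. Tasks 3, 4, and 5 each need more permits than are left while an earlier task is still running, so they run one after another, as the elapsed times show. This flexibility allows for precise control over concurrency.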
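As a further illustration of using permits as a concurrency limit, the sketch below gives every task a single permit from a two-permit semaphore and records how many tasks overlap. The `running` and `peak` counters and the `tracked` and `limited` names are hypothetical and exist only to observe the behavior; the 50 millisecond sleep is an arbitrary stand-in for real work.

```ts
import { Effect } from "effect"

// Hypothetical counters used only to observe how many tasks overlap.
let running = 0
let peak = 0

// A task that records when it starts and finishes.
const tracked = Effect.gen(function* (_) {
  yield* _(
    Effect.sync(() => {
      running += 1
      peak = Math.max(peak, running)
    })
  )
  yield* _(Effect.sleep("50 millis"))
  yield* _(
    Effect.sync(() => {
      running -= 1
    })
  )
})

const limited = Effect.gen(function* (_) {
  // Two permits, one permit per task: no more than two tasks hold a permit at once.
  const sem = yield* _(Effect.makeSemaphore(2))
  yield* _(
    Effect.forEach([1, 2, 3, 4, 5, 6], () => sem.withPermits(1)(tracked), {
      concurrency: "unbounded"
    })
  )
  return peak
})

// Resolves to the highest number of tasks observed running together.
const peakConcurrency = Effect.runPromise(limited)
```

Even though all six tasks are started with unbounded concurrency, the observed peak should not exceed the number of permits.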
One crucial aspect to remember is that withPermits ensures that each acquisition is matched with an equivalent number of releases, regardless of whether the task succeeds, fails, or gets interrupted.
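The failure case of this guarantee can be checked with a small sketch. It covers only failure, not interruption, and the `releaseDemo` and `outcome` names and the `"boom"` error value are hypothetical, chosen for illustration.

```ts
import { Effect, Either } from "effect"

const releaseDemo = Effect.gen(function* (_) {
  const sem = yield* _(Effect.makeSemaphore(1))

  // The first task fails while holding the only permit.
  // Effect.either captures the failure as a value so the program can continue.
  const first = yield* _(Effect.either(sem.withPermits(1)(Effect.fail("boom"))))

  // If the permit had leaked, this second acquisition would suspend forever.
  const second = yield* _(sem.withPermits(1)(Effect.succeed("acquired again")))

  return { firstFailed: Either.isLeft(first), second }
})

const outcome = Effect.runPromise(releaseDemo)
// outcome resolves to { firstFailed: true, second: "acquired again" }
```

Because the second withPermits call completes, the permit taken by the failing task must have been returned to the semaphore.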