Semaphore

A semaphore is a synchronization mechanism that controls access to a shared resource. In Effect, semaphores manage access to resources and coordinate tasks in asynchronous, concurrent programs. This page explains what semaphores are and how they work in Effect.

What is a Semaphore?

A semaphore is a generalization of a mutex: a mutex behaves like a semaphore with exactly one permit. A semaphore has a fixed number of permits, which can be held and released concurrently by different parties. Think of permits as tickets that allow entities (e.g., tasks or fibers) to access a shared resource or perform a specific operation. If an entity tries to acquire permits and not enough are available, it is suspended until enough permits are released.

Let's take a look at an example using asynchronous tasks:

mutex.ts
ts
import { Effect } from "effect"
 
// A task that logs, waits two seconds, then logs again
const task = Effect.gen(function* (_) {
  yield* _(Effect.log("start"))
  yield* _(Effect.sleep("2 seconds"))
  yield* _(Effect.log("end"))
})

// Run the task while holding one permit
const semTask = (sem: Effect.Semaphore) => sem.withPermits(1)(task)

// Three copies of the task, each tagged with an "elapsed" log span
const semTaskSeq = (sem: Effect.Semaphore) =>
  [1, 2, 3].map(() => semTask(sem).pipe(Effect.withLogSpan("elapsed")))

const program = Effect.gen(function* (_) {
  // A semaphore with a single permit behaves like a mutex
  const mutex = yield* _(Effect.makeSemaphore(1))
  yield* _(Effect.all(semTaskSeq(mutex), { concurrency: "unbounded" }))
})
 
Effect.runPromise(program)
/*
Output:
timestamp=... level=INFO fiber=#1 message=start elapsed=3ms
timestamp=... level=INFO fiber=#1 message=end elapsed=2010ms
timestamp=... level=INFO fiber=#2 message=start elapsed=2012ms
timestamp=... level=INFO fiber=#2 message=end elapsed=4017ms
timestamp=... level=INFO fiber=#3 message=start elapsed=4018ms
timestamp=... level=INFO fiber=#3 message=end elapsed=6026ms
*/

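Here, a semaphore with one permit acts as a mutex. Although the three tasks are started with unbounded concurrency, only one can hold the permit at a time, so they run one after another, finishing at roughly 2, 4, and 6 seconds. When all permits are in use, additional tasks attempting to acquire permits wait until some become available.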
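To make the "shared resource" part concrete, here is a minimal sketch, not taken from the original page, of a one-permit semaphore guarding a non-atomic update. The names `unsafeIncrement` and `run` are hypothetical helpers introduced only for illustration, and the `Ref` module is used as a stand-in for any shared state.

```ts
import { Effect, Ref } from "effect"

// Hypothetical helper: a deliberately non-atomic increment.
// It reads the value, suspends briefly, then writes back the stale value + 1.
const unsafeIncrement = (ref: Ref.Ref<number>) =>
  Effect.gen(function* (_) {
    const current = yield* _(Ref.get(ref))
    yield* _(Effect.sleep("10 millis"))
    yield* _(Ref.set(ref, current + 1))
  })

// Hypothetical helper: run three increments concurrently,
// optionally guarding each one with a one-permit semaphore.
const run = (protect: boolean) =>
  Effect.gen(function* (_) {
    const ref = yield* _(Ref.make(0))
    const mutex = yield* _(Effect.makeSemaphore(1))
    const step = protect
      ? mutex.withPermits(1)(unsafeIncrement(ref))
      : unsafeIncrement(ref)
    yield* _(Effect.all([step, step, step], { concurrency: "unbounded" }))
    return yield* _(Ref.get(ref))
  })

// Unprotected: all three fibers read 0 before any of them writes, so updates are lost.
// Protected: the increments are serialized, so every update is kept.
Effect.runPromise(Effect.all([run(false), run(true)])).then(console.log)
// prints [ 1, 3 ]
```

In real code an atomic `Ref.update` would avoid this particular race without a semaphore. The sketch only shows how holding a permit serializes a multi-step critical section.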

In another scenario, we create a semaphore with five permits and use withPermits(n) so that each task acquires, and later releases, a different number of permits:

ts
import { Effect } from "effect"
 
const program = Effect.gen(function* (_) {
  // A semaphore with five permits in total
  const sem = yield* _(Effect.makeSemaphore(5))

  yield* _(
    Effect.forEach(
      [1, 2, 3, 4, 5],
      (n) =>
        sem
          // Task n holds n permits while it runs
          .withPermits(n)(
            Effect.delay(Effect.log(`process: ${n}`), "2 seconds")
          )
          .pipe(Effect.withLogSpan("elapsed")),
      { concurrency: "unbounded" }
    )
  )
})

Effect.runPromise(program)
/*
Output:
timestamp=... level=INFO fiber=#1 message="process: 1" elapsed=2011ms
timestamp=... level=INFO fiber=#2 message="process: 2" elapsed=2017ms
timestamp=... level=INFO fiber=#3 message="process: 3" elapsed=4020ms
timestamp=... level=INFO fiber=#4 message="process: 4" elapsed=6025ms
timestamp=... level=INFO fiber=#5 message="process: 5" elapsed=8034ms
*/

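This example shows that withPermits(n) can acquire and release any number of permits, up to the semaphore's total. Tasks 1 and 2 together need three of the five permits, so they run at the same time. Task 3 needs three permits but only two remain, so it waits until the first two tasks finish; tasks 4 and 5 then run one after another for the same reason. This flexibility allows for precise control over concurrency.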

One crucial aspect to remember is that withPermits ensures that each acquisition is matched with an equivalent number of releases, regardless of whether the task succeeds, fails, or gets interrupted.
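The release guarantee can be checked with a small sketch. It is an illustration only, not part of the original page: a task fails while holding the semaphore's single permit, and a second acquisition afterwards still succeeds. The failure value "boom" and the returned strings are arbitrary placeholders.

```ts
import { Effect } from "effect"

const program = Effect.gen(function* (_) {
  const sem = yield* _(Effect.makeSemaphore(1))

  // This task fails while holding the semaphore's only permit.
  const failing = sem.withPermits(1)(Effect.fail("boom"))

  // Capture the failure as a value so the program can continue.
  const outcome = yield* _(Effect.either(failing))

  // If the permit had leaked, this acquisition would suspend forever.
  const next = yield* _(sem.withPermits(1)(Effect.succeed("acquired again")))

  return [outcome._tag, next]
})

Effect.runPromise(program).then(console.log)
// prints [ 'Left', 'acquired again' ]
```

The same holds for interruption, which is why withPermits is generally a safer choice than pairing acquisition and release by hand.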