Guideline
When multiple fibers need to coordinate work asynchronously, use Queue:
- Producers add items (enqueue)
- Consumers remove and process items (dequeue)
- Backpressure built-in: producers wait if queue is full
- Decoupling: Producers don't block on consumer speed
Queue variants: bounded (suspends producers when full), unbounded (no size limit), dropping (discards new items on overflow), sliding (evicts the oldest items on overflow).
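A minimal sketch of constructing each variant (capacities are illustrative):
import { Effect, Queue } from "effect";

const makeQueues = Effect.gen(function* () {
  const bounded = yield* Queue.bounded<string>(16);   // offer suspends when full
  const unbounded = yield* Queue.unbounded<string>(); // never suspends; memory is the only limit
  const dropping = yield* Queue.dropping<string>(16); // offer returns false when full
  const sliding = yield* Queue.sliding<string>(16);   // offer evicts the oldest item when full
  return { bounded, unbounded, dropping, sliding };
});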
Rationale
Direct producer-consumer coordination creates problems:
- Blocking: Producer waits for consumer to finish
- Tight coupling: Producer depends on consumer speed
- Memory pressure: Fast producer floods memory with results
- No backpressure: Downstream overload propagates upstream
Queue solves these:
- Asynchronous: Producer enqueues and continues
- Decoupled: Producer/consumer independent
- Backpressure: Producer waits when queue full (natural flow control)
- Throughput: Consumer processes at own pace
Real-world example: API request handler + database writer
- Direct: Handler waits for DB write (blocking, slow requests)
- Queue: Handler enqueues write and returns immediately (responsive)
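A minimal sketch of the queued-writer shape (WriteOp and writeToDb are hypothetical placeholders for a real payload type and database call):
import { Effect, Queue } from "effect";

interface WriteOp {
  readonly sql: string;
}

// Hypothetical DB write - stands in for a real driver call
declare const writeToDb: (op: WriteOp) => Effect.Effect<void>;

const makeHandler = Effect.gen(function* () {
  const writes = yield* Queue.bounded<WriteOp>(100);
  // Background writer drains the queue at the database's pace
  yield* Queue.take(writes).pipe(
    Effect.flatMap(writeToDb),
    Effect.forever,
    Effect.fork
  );
  // The handler enqueues and returns immediately - no waiting on the DB
  const handleRequest = (op: WriteOp) => Queue.offer(writes, op);
  return { handleRequest };
});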
Good Example
This example demonstrates a producer-consumer pipeline with a bounded queue for buffering work items.
import { Effect, Queue, Fiber, Ref } from "effect";
interface WorkItem {
readonly id: number;
readonly data: string;
readonly timestamp: number;
}
interface WorkResult {
readonly itemId: number;
readonly processed: string;
readonly duration: number;
}
// Producer: generates work items
const producer = (
queue: Queue.Enqueue<WorkItem>,
count: number
): Effect.Effect<void> =>
Effect.gen(function* () {
yield* Effect.log(`[PRODUCER] Starting, generating ${count} items`);
for (let i = 1; i <= count; i++) {
const item: WorkItem = {
id: i,
data: `Item ${i}`,
timestamp: Date.now(),
};
const start = Date.now();
// Enqueue - suspends this fiber while the queue is full (backpressure)
yield* Queue.offer(queue, item);
const delay = Date.now() - start;
if (delay > 0) {
yield* Effect.log(
`[PRODUCER] Item ${i} enqueued (waited ${delay}ms due to backpressure)`
);
} else {
yield* Effect.log(`[PRODUCER] Item ${i} enqueued`);
}
// Simulate work
yield* Effect.sleep("50 millis");
}
yield* Effect.log(`[PRODUCER] ✓ All items enqueued`);
});
// Consumer: processes work items
const consumer = (
queue: Queue.Dequeue<WorkItem>,
consumerId: number,
results: Ref.Ref<WorkResult[]>
): Effect.Effect<void> =>
Effect.gen(function* () {
yield* Effect.log(`[CONSUMER ${consumerId}] Starting`);
while (true) {
// Dequeue - suspends while the queue is empty. Queue.take never fails,
// so wrapping it in Effect.either is a no-op; instead, Queue.shutdown
// interrupts any fiber suspended here, which ends this loop.
const workItem = yield* Queue.take(queue);
const startTime = Date.now();
yield* Effect.log(
`[CONSUMER ${consumerId}] Processing ${workItem.data}`
);
// Simulate processing
yield* Effect.sleep("150 millis");
const duration = Date.now() - startTime;
const result: WorkResult = {
itemId: workItem.id,
processed: `${workItem.data} [processed by consumer ${consumerId}]`,
duration,
};
yield* Ref.update(results, (rs) => [...rs, result]);
yield* Effect.log(
`[CONSUMER ${consumerId}] ✓ Completed ${workItem.data} in ${duration}ms`
);
}
});
// Main: coordinate producer and consumers
const program = Effect.gen(function* () {
// Create bounded queue with capacity 3
const queue = yield* Queue.bounded<WorkItem>(3);
const results = yield* Ref.make<WorkResult[]>([]);
console.log(`\n[MAIN] Starting producer-consumer pipeline with queue size 3\n`);
// Spawn producer
const producerFiber = yield* producer(queue, 10).pipe(Effect.fork);
// Spawn 2 consumers
const consumer1 = yield* consumer(queue, 1, results).pipe(Effect.fork);
const consumer2 = yield* consumer(queue, 2, results).pipe(Effect.fork);
// Wait for producer to finish
yield* Fiber.join(producerFiber);
// Give consumers time to drain the queue
yield* Effect.sleep("3 seconds");
// Shut down the queue; this interrupts consumers suspended on take
yield* Queue.shutdown(queue);
// Await (not join): joining an interrupted fiber would propagate
// the interruption into this fiber
yield* Fiber.await(consumer1);
yield* Fiber.await(consumer2);
// Summary
const allResults = yield* Ref.get(results);
const totalDuration = allResults.reduce((sum, r) => sum + r.duration, 0);
console.log(`\n[SUMMARY]`);
console.log(` Items processed: ${allResults.length}`);
console.log(
` Avg processing time: ${Math.round(totalDuration / allResults.length)}ms`
);
});
Effect.runPromise(program);
This pattern:
- Creates bounded queue with capacity (backpressure point)
- Producer enqueues items (suspends when full)
- Consumers dequeue and process (each at own pace)
- Queue coordinates flow automatically
Advanced: Dynamic Consumer Pool
Scale consumer count based on queue depth:
const adaptiveConsumerPool = (
queue: Queue.Dequeue<WorkItem>,
maxConsumers: number
) =>
Effect.gen(function* () {
const activeConsumers = yield* Ref.make(1);
// Monitor queue and scale consumers
const scaler = Effect.gen(function* () {
while (true) {
yield* Effect.sleep("500 millis");
const depth = yield* Queue.size(queue);
const consumers = yield* Ref.get(activeConsumers);
// Scale up if queue building
if (depth > consumers * 2 && consumers < maxConsumers) {
yield* Ref.update(activeConsumers, (c) => c + 1);
yield* Effect.log(
`[SCALER] Increased to ${consumers + 1} consumers (queue depth: ${depth})`
);
}
// Scale down if queue draining
if (depth < consumers / 2 && consumers > 1) {
yield* Ref.update(activeConsumers, (c) => c - 1);
yield* Effect.log(
`[SCALER] Decreased to ${consumers - 1} consumers (queue depth: ${depth})`
);
}
}
});
// Expose the target count and the monitor; the caller forks the
// scaler and sizes its worker fibers from activeConsumers
return { activeConsumers, scaler } as const;
});
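The scaler only maintains a target count; forking and interrupting the actual consumer fibers is left to the caller. A minimal usage sketch, reusing WorkItem and adaptiveConsumerPool from above:
const poolExample = Effect.gen(function* () {
  const queue = yield* Queue.bounded<WorkItem>(32);
  const { activeConsumers, scaler } = yield* adaptiveConsumerPool(queue, 4);
  // Run the monitor in the background
  yield* scaler.pipe(Effect.fork);
  // A real pool would reconcile worker fibers against this target;
  // here we only observe it
  const target = yield* Ref.get(activeConsumers);
  yield* Effect.log(`[POOL] Current consumer target: ${target}`);
});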
Advanced: Priority Queue with Multiple Queues
Separate queues for different priority levels:
interface PriorityWorkItem extends WorkItem {
readonly priority: number; // Higher = more important
}
const createPriorityQueueSystem = () =>
Effect.gen(function* () {
// High priority queue (size 10), normal queue (size 50)
const highPriorityQueue = yield* Queue.bounded<PriorityWorkItem>(10);
const normalQueue = yield* Queue.bounded<PriorityWorkItem>(50);
const enqueueWithPriority = (item: PriorityWorkItem) =>
Effect.gen(function* () {
if (item.priority > 5) {
yield* Queue.offer(highPriorityQueue, item);
yield* Effect.log(
`[PRIORITY] Item ${item.id} enqueued to HIGH priority queue`
);
} else {
yield* Queue.offer(normalQueue, item);
yield* Effect.log(
`[PRIORITY] Item ${item.id} enqueued to NORMAL queue`
);
}
});
// Consumer prioritizes high-priority queue
const priorityConsumer = (consumerId: number) =>
Effect.gen(function* () {
while (true) {
// Try high priority first
const highItem = yield* Queue.poll(highPriorityQueue);
if (highItem._tag === "Some") {
yield* Effect.log(
`[CONSUMER ${consumerId}] Processing HIGH priority item ${highItem.value.id}`
);
yield* Effect.sleep("100 millis");
continue;
}
// Fall back to normal priority
const normalItem = yield* Queue.poll(normalQueue);
if (normalItem._tag === "Some") {
yield* Effect.log(
`[CONSUMER ${consumerId}] Processing NORMAL priority item ${normalItem.value.id}`
);
yield* Effect.sleep("100 millis");
continue;
}
// No items available, wait a bit
yield* Effect.sleep("100 millis");
}
});
return {
enqueueWithPriority,
priorityConsumer,
highPriorityQueue,
normalQueue,
};
});
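A minimal usage sketch, reusing the WorkItem fields from above:
const priorityExample = Effect.gen(function* () {
  const system = yield* createPriorityQueueSystem();
  // Run one consumer in the background
  yield* system.priorityConsumer(1).pipe(Effect.fork);
  yield* system.enqueueWithPriority({
    id: 1, data: "routine", timestamp: Date.now(), priority: 2,
  });
  yield* system.enqueueWithPriority({
    id: 2, data: "urgent", timestamp: Date.now(), priority: 9,
  });
  // The consumer polls the high-priority queue first on each iteration,
  // so item 2 is typically picked up ahead of queued normal items
  yield* Effect.sleep("1 second");
});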
Advanced: Queue with Batch Processing
Dequeue multiple items and process as batch:
const batchConsumer = (
queue: Queue.Dequeue<WorkItem>,
batchSize: number,
processBatch: (items: WorkItem[]) => Effect.Effect<void>
): Effect.Effect<void> =>
Effect.gen(function* () {
while (true) {
const batch: WorkItem[] = [];
// Collect up to batchSize items (with timeout)
for (let i = 0; i < batchSize; i++) {
const item = yield* Queue.take(queue).pipe(
Effect.timeout("100 millis"),
Effect.either
);
if (item._tag === "Left") {
// Timeout - process whatever we have
break;
}
batch.push(item.right);
}
if (batch.length > 0) {
yield* Effect.log(
`[BATCH] Processing ${batch.length} items`
);
yield* processBatch(batch);
yield* Effect.log(
`[BATCH] ✓ Completed batch of ${batch.length}`
);
} else {
// No items, wait before retrying
yield* Effect.sleep("100 millis");
}
}
});
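A usage sketch with a stand-in batch handler (the bulk-insert effect is a hypothetical placeholder for, e.g., one database round trip per batch):
const batchExample = Effect.gen(function* () {
  const queue = yield* Queue.bounded<WorkItem>(100);
  const bulkInsert = (items: WorkItem[]) =>
    Effect.log(`[DB] Bulk insert of ${items.length} rows`);
  yield* batchConsumer(queue, 10, bulkInsert).pipe(Effect.fork);
  yield* Queue.offerAll(
    queue,
    Array.from({ length: 25 }, (_, i) => ({
      id: i + 1,
      data: `Item ${i + 1}`,
      timestamp: Date.now(),
    }))
  );
  yield* Effect.sleep("1 second"); // three batches: 10, 10, 5
});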
Advanced: Async Pipeline with Multiple Stages
Chain multiple queue-based processing stages:
const createPipeline = <A>(
inputQueue: Queue.Dequeue<A>,
stages: Array<(item: A) => Effect.Effect<A>>,
outputQueue: Queue.Enqueue<A>
): Effect.Effect<void> =>
Effect.gen(function* () {
while (true) {
let current = yield* Queue.take(inputQueue);
// Thread the item through every stage in order
for (const stage of stages) {
current = yield* stage(current);
}
yield* Queue.offer(outputQueue, current);
}
});
// Example: 3-stage pipeline
const pipelineExample = Effect.gen(function* () {
const inputQueue = yield* Queue.bounded<string>(10);
const outputQueue = yield* Queue.bounded<string>(10);
const stages = [
(item: string) =>
Effect.gen(function* () {
yield* Effect.log(`[STAGE 1] Validating: ${item}`);
return item.toUpperCase();
}),
(item: string) =>
Effect.gen(function* () {
yield* Effect.log(`[STAGE 2] Enriching: ${item}`);
return `${item}-ENRICHED`;
}),
(item: string) =>
Effect.gen(function* () {
yield* Effect.log(`[STAGE 3] Formatting: ${item}`);
return `[${item}]`;
}),
];
// Run the pipeline in the background, then feed an item through
yield* createPipeline(inputQueue, stages, outputQueue).pipe(Effect.fork);
yield* Queue.offer(inputQueue, "order-1");
const result = yield* Queue.take(outputQueue);
yield* Effect.log(`[PIPELINE] Result: ${result}`); // [ORDER-1-ENRICHED]
});
When to Use This Pattern
✅ Use Queue when:
- Decoupling producers from consumers
- Implementing pipeline stages
- Handling backpressure naturally
- Work distribution across fibers
- Async work buffering
- Load balancing across workers
⚠️ Trade-offs:
- Added latency: items wait in the buffer before processing
- Memory usage grows with queue depth
- Bounded queues suspend producers when full
- Unbounded queues can exhaust memory under sustained overload
Queue Type Guide
| Type | Behavior | Use Case |
|---|---|---|
| bounded(n) | Suspends producers when full (backpressure) | Producer/consumer flow control |
| unbounded() | Grows without limit | Unbounded work streams |
| dropping(n) | Discards new items on overflow | High-frequency events (sampling) |
| sliding(n) | Evicts oldest items on overflow | Sliding-window buffering |
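A small sketch contrasting the two overflow strategies (capacity 3, five offers):
import { Chunk, Effect, Queue } from "effect";

const overflowDemo = Effect.gen(function* () {
  const sliding = yield* Queue.sliding<number>(3);
  const dropping = yield* Queue.dropping<number>(3);
  // Neither variant suspends the producer on overflow
  yield* Queue.offerAll(sliding, [1, 2, 3, 4, 5]);
  yield* Queue.offerAll(dropping, [1, 2, 3, 4, 5]);
  const keptBySliding = yield* Queue.takeAll(sliding);   // 3, 4, 5 (oldest evicted)
  const keptByDropping = yield* Queue.takeAll(dropping); // 1, 2, 3 (newest dropped)
  yield* Effect.log(
    `sliding kept [${Chunk.toReadonlyArray(keptBySliding)}], dropping kept [${Chunk.toReadonlyArray(keptByDropping)}]`
  );
});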
See Also
- Decouple Fibers with Queue/PubSub - Queue basics
- Concurrency Pattern 3: Coordinate with Latch - Multi-fiber sync
- Concurrency Pattern 2: Rate Limit with Semaphore - Resource limiting
- Run Background Tasks with Fork - Background fibers