EffectTalk
Back to Tour
concurrencyIntermediate

Concurrency Pattern 4: Distribute Work with Queue

Use Queue to decouple producers and consumers, enabling work distribution, pipeline stages, and backpressure handling across concurrent fibers.

Guideline

When multiple fibers need to coordinate work asynchronously, use Queue:

  • Producers add items (enqueue)
  • Consumers remove and process items (dequeue)
  • Backpressure built-in: producers wait if queue is full
  • Decoupling: Producers don't block on consumer speed

Queue variants: bounded (size limit), unbounded (unlimited), dropping (discards on overflow).


Rationale

Direct producer-consumer coordination creates problems:

  • Blocking: Producer waits for consumer to finish
  • Tight coupling: Producer depends on consumer speed
  • Memory pressure: Fast producer floods memory with results
  • No backpressure: Downstream overload propagates upstream

Queue solves these:

  • Asynchronous: Producer enqueues and continues
  • Decoupled: Producer/consumer independent
  • Backpressure: Producer waits when queue full (natural flow control)
  • Throughput: Consumer processes at own pace

Real-world example: API request handler + database writer

  • Direct: Handler waits for DB write (blocking, slow requests)
  • Queue: Handler enqueues write and returns immediately (responsive)

Good Example

This example demonstrates a producer-consumer pipeline with a bounded queue for buffering work items.

import { Effect, Queue, Fiber, Ref } from "effect";

interface WorkItem {
  readonly id: number;
  readonly data: string;
  readonly timestamp: number;
}

interface WorkResult {
  readonly itemId: number;
  readonly processed: string;
  readonly duration: number;
}

// Producer: generates work items
const producer = (
  queue: Queue.Enqueue<WorkItem>,
  count: number
): Effect.Effect<void> =>
  Effect.gen(function* () {
    yield* Effect.log(`[PRODUCER] Starting, generating ${count} items`);

    for (let i = 1; i <= count; i++) {
      const item: WorkItem = {
        id: i,
        data: `Item ${i}`,
        timestamp: Date.now(),
      };

      const start = Date.now();

      // Enqueue - will block if queue is full (backpressure)
      yield* Queue.offer(queue, item);

      const delay = Date.now() - start;

      if (delay > 0) {
        yield* Effect.log(
          `[PRODUCER] Item ${i} enqueued (waited ${delay}ms due to backpressure)`
        );
      } else {
        yield* Effect.log(`[PRODUCER] Item ${i} enqueued`);
      }

      // Simulate work
      yield* Effect.sleep("50 millis");
    }

    yield* Effect.log(`[PRODUCER] ✓ All items enqueued`);
  });

// Consumer: processes work items
const consumer = (
  queue: Queue.Dequeue<WorkItem>,
  consumerId: number,
  results: Ref.Ref<WorkResult[]>
): Effect.Effect<void> =>
  Effect.gen(function* () {
    yield* Effect.log(`[CONSUMER ${consumerId}] Starting`);

    while (true) {
      // Dequeue - will block if queue is empty
      const item = yield* Queue.take(queue).pipe(Effect.either);

      if (item._tag === "Left") {
        yield* Effect.log(`[CONSUMER ${consumerId}] Queue closed, stopping`);
        return;
      }

      const workItem = item.right;
      const startTime = Date.now();

      yield* Effect.log(
        `[CONSUMER ${consumerId}] Processing ${workItem.data}`
      );

      // Simulate processing
      yield* Effect.sleep("150 millis");

      const duration = Date.now() - startTime;
      const result: WorkResult = {
        itemId: workItem.id,
        processed: `${workItem.data} [processed by consumer ${consumerId}]`,
        duration,
      };

      yield* Ref.update(results, (rs) => [...rs, result]);

      yield* Effect.log(
        `[CONSUMER ${consumerId}] ✓ Completed ${workItem.data} in ${duration}ms`
      );
    }
  });

// Main: coordinate producer and consumers
const program = Effect.gen(function* () {
  // Create bounded queue with capacity 3
  const queue = yield* Queue.bounded<WorkItem>(3);
  const results = yield* Ref.make<WorkResult[]>([]);

  console.log(`\n[MAIN] Starting producer-consumer pipeline with queue size 3\n`);

  // Spawn producer
  const producerFiber = yield* producer(queue, 10).pipe(Effect.fork);

  // Spawn 2 consumers
  const consumer1 = yield* consumer(queue, 1, results).pipe(Effect.fork);
  const consumer2 = yield* consumer(queue, 2, results).pipe(Effect.fork);

  // Wait for producer to finish
  yield* Fiber.join(producerFiber);

  // Give consumers time to finish
  yield* Effect.sleep("3 seconds");

  // Close queue and wait for consumers
  yield* Queue.shutdown(queue);
  yield* Fiber.join(consumer1);
  yield* Fiber.join(consumer2);

  // Summary
  const allResults = yield* Ref.get(results);
  const totalDuration = allResults.reduce((sum, r) => sum + r.duration, 0);

  console.log(`\n[SUMMARY]`);
  console.log(`  Items processed: ${allResults.length}`);
  console.log(
    `  Avg processing time: ${Math.round(totalDuration / allResults.length)}ms`
  );
});

Effect.runPromise(program);

This pattern:

  1. Creates bounded queue with capacity (backpressure point)
  2. Producer enqueues items (blocks if full)
  3. Consumers dequeue and process (each at own pace)
  4. Queue coordinates flow automatically

Advanced: Dynamic Consumer Pool

Scale consumer count based on queue depth:

const adaptiveConsumerPool = (
  queue: Queue.Dequeue<WorkItem>,
  maxConsumers: number
) =>
  Effect.gen(function* () {
    const activeConsumers = yield* Ref.make(1);
    const metrics = yield* Ref.make({
      itemsProcessed: 0,
      queueDepth: 0,
      avgProcessTime: 0,
    });

    // Monitor queue and scale consumers
    const scaler = Effect.gen(function* () {
      while (true) {
        yield* Effect.sleep("500 millis");

        const depth = yield* Queue.size(queue);
        const consumers = yield* Ref.get(activeConsumers);

        // Scale up if queue building
        if (depth > consumers * 2 && consumers < maxConsumers) {
          yield* Ref.update(activeConsumers, (c) => c + 1);
          yield* Effect.log(
            `[SCALER] Increased to ${consumers + 1} consumers (queue depth: ${depth})`
          );
        }

        // Scale down if queue draining
        if (depth < consumers / 2 && consumers > 1) {
          yield* Ref.update(activeConsumers, (c) => c - 1);
          yield* Effect.log(
            `[SCALER] Decreased to ${consumers - 1} consumers (queue depth: ${depth})`
          );
        }
      }
    });

    return scaler;
  });

Advanced: Priority Queue with Multiple Queues

Separate queues for different priority levels:

interface PriorityWorkItem extends WorkItem {
  readonly priority: number; // Higher = more important
}

const createPriorityQueueSystem = () =>
  Effect.gen(function* () {
    // High priority queue (size 10), normal queue (size 50)
    const highPriorityQueue = yield* Queue.bounded<PriorityWorkItem>(10);
    const normalQueue = yield* Queue.bounded<PriorityWorkItem>(50);

    const enqueueWithPriority = (item: PriorityWorkItem) =>
      Effect.gen(function* () {
        if (item.priority > 5) {
          yield* Queue.offer(highPriorityQueue, item);
          yield* Effect.log(
            `[PRIORITY] Item ${item.id} enqueued to HIGH priority queue`
          );
        } else {
          yield* Queue.offer(normalQueue, item);
          yield* Effect.log(
            `[PRIORITY] Item ${item.id} enqueued to NORMAL queue`
          );
        }
      });

    // Consumer prioritizes high-priority queue
    const priorityConsumer = (consumerId: number) =>
      Effect.gen(function* () {
        while (true) {
          // Try high priority first
          const highItem = yield* Queue.poll(highPriorityQueue);

          if (highItem._tag === "Some") {
            yield* Effect.log(
              `[CONSUMER ${consumerId}] Processing HIGH priority item ${highItem.value.id}`
            );
            yield* Effect.sleep("100 millis");
            continue;
          }

          // Fall back to normal priority
          const normalItem = yield* Queue.poll(normalQueue);

          if (normalItem._tag === "Some") {
            yield* Effect.log(
              `[CONSUMER ${consumerId}] Processing NORMAL priority item ${normalItem.value.id}`
            );
            yield* Effect.sleep("100 millis");
            continue;
          }

          // No items available, wait a bit
          yield* Effect.sleep("100 millis");
        }
      });

    return {
      enqueueWithPriority,
      priorityConsumer,
      highPriorityQueue,
      normalQueue,
    };
  });

Advanced: Queue with Batch Processing

Dequeue multiple items and process as batch:

const batchConsumer = (
  queue: Queue.Dequeue<WorkItem>,
  batchSize: number,
  processBatch: (items: WorkItem[]) => Effect.Effect<void>
): Effect.Effect<void> =>
  Effect.gen(function* () {
    while (true) {
      const batch: WorkItem[] = [];

      // Collect up to batchSize items (with timeout)
      for (let i = 0; i < batchSize; i++) {
        const item = yield* Queue.take(queue).pipe(
          Effect.timeout("100 millis"),
          Effect.either
        );

        if (item._tag === "Left") {
          // Timeout - process whatever we have
          break;
        }

        batch.push(item.right);
      }

      if (batch.length > 0) {
        yield* Effect.log(
          `[BATCH] Processing ${batch.length} items`
        );

        yield* processBatch(batch);

        yield* Effect.log(
          `[BATCH] ✓ Completed batch of ${batch.length}`
        );
      } else {
        // No items, wait before retrying
        yield* Effect.sleep("100 millis");
      }
    }
  });

Advanced: Async Pipeline with Multiple Stages

Chain multiple queue-based processing stages:

interface PipelineStage<I, O> {
  readonly name: string;
  readonly inputQueue: Queue.Dequeue<I>;
  readonly outputQueue: Queue.Enqueue<O>;
  readonly process: (item: I) => Effect.Effect<O>;
}

const createPipeline = <I, O>(
  inputQueue: Queue.Dequeue<I>,
  stages: Array<(item: I) => Effect.Effect<I>>,
  outputQueue: Queue.Enqueue<O>
): Effect.Effect<void> =>
  Effect.gen(function* () {
    while (true) {
      const item = yield* Queue.take(inputQueue);
      let current: any = item;

      // Process through all stages
      for (const stage of stages) {
        current = yield* stage(current);
      }

      yield* Queue.offer(outputQueue, current);
    }
  });

// Example: 3-stage pipeline
const pipelineExample = Effect.gen(function* () {
  const inputQueue = yield* Queue.bounded<string>(10);
  const outputQueue = yield* Queue.bounded<string>(10);

  const stages = [
    (item: string) =>
      Effect.gen(function* () {
        yield* Effect.log(`[STAGE 1] Validating: ${item}`);
        return item.toUpperCase();
      }),

    (item: string) =>
      Effect.gen(function* () {
        yield* Effect.log(`[STAGE 2] Enriching: ${item}`);
        return `${item}-ENRICHED`;
      }),

    (item: string) =>
      Effect.gen(function* () {
        yield* Effect.log(`[STAGE 3] Formatting: ${item}`);
        return `[${item}]`;
      }),
  ];

  yield* createPipeline(inputQueue, stages, outputQueue);
});

When to Use This Pattern

Use Queue when:

  • Decoupling producers from consumers
  • Implementing pipeline stages
  • Handling backpressure naturally
  • Work distribution across fibers
  • Async work buffering
  • Load balancing across workers

⚠️ Trade-offs:

  • Queue copy overhead for large items
  • Memory usage grows with queue size
  • Bounded queues block producers
  • Unbounded queues can exhaust memory

Queue Type Guide

Type Behavior Use Case
bounded(n) Backpressure when full Producer/consumer control
unbounded() Grows unbounded Unbounded work streams
dropping() Discards on overflow High-frequency events (sampling)
sliding() Removes oldest on overflow Sliding window buffering

See Also