Guideline
To run a stream purely for its side effects without accumulating its results in memory, use Stream.runDrain.
Rationale
Not all pipelines are designed to produce a final list of values. Often, the goal is to perform an action for each item: write it to a database, send it to a message queue, or log it to a file. In these scenarios, where only the effects matter, collecting the results is not just unnecessary; it's a performance anti-pattern.
Stream.runDrain is the perfect tool for this job:
- Memory Efficiency: This is its primary advantage. runDrain processes each item and then immediately discards it, so memory usage stays constant and minimal. This makes it a safe choice for processing extremely large streams, and for infinite streams that are meant to run until interrupted.
- Clarity of Intent: Using runDrain clearly communicates that you care about the successful execution of the stream's effects, not its output values. The final Effect it produces resolves to void, reinforcing that no value is returned.
- Performance: By avoiding the overhead of allocating and managing a growing collection in memory, runDrain can be faster for pipelines with a very large number of small items.
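The points above can be illustrated with a minimal sketch. It assumes a recent version of the effect package; the helper name drainAndCount is hypothetical, and the side effect is just a counter increment standing in for a real write. The source is an unbounded Stream.iterate limited with Stream.take, so no input array is ever materialized.

```typescript
import { Effect, Stream } from "effect";

// Hypothetical helper: runs `n` side effects through a stream and reports
// how many executed. Nothing emitted by the stream is retained.
const drainAndCount = (n: number): Effect.Effect<number> =>
  Effect.gen(function* () {
    let processed = 0;

    yield* Stream.iterate(1, (i) => i + 1).pipe(
      // Limit the otherwise infinite source to `n` items
      Stream.take(n),
      // Stand-in for a real side effect (DB write, queue publish, ...)
      Stream.mapEffect(() =>
        Effect.sync(() => {
          processed += 1;
        })
      ),
      // Execute every effect, discard every emitted value
      Stream.runDrain
    );

    return processed;
  });

// Every item is handled, yet no collection of results is ever built.
Effect.runPromise(drainAndCount(100_000)).then((count) => {
  console.log(`Processed ${count} items`);
});
```

The counter lives inside the effect rather than at module level, so each run of drainAndCount starts from zero.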
Good Example
This example creates a stream of tasks. For each task, it performs a side effect (logging it as "complete"). Stream.runDrain executes the pipeline, ensuring all logs are written, but without collecting the void results of each logging operation.
import { Effect, Stream } from "effect";

const tasks = ["task 1", "task 2", "task 3"];

// A function that performs a side effect for a task
const completeTask = (task: string): Effect.Effect<void, never> =>
  Effect.log(`Completing ${task}`);

const program = Stream.fromIterable(tasks).pipe(
  // For each task, run the side-effectful operation
  Stream.mapEffect(completeTask, { concurrency: 1 }),
  // Run the stream for its effects, discarding the `void` results
  Stream.runDrain
);

const programWithLogging = Effect.gen(function* () {
  yield* program;
  yield* Effect.log("All tasks have been processed.");
});

Effect.runPromise(programWithLogging);
/*
Output:
... level=INFO ... message="Completing task 1"
... level=INFO ... message="Completing task 2"
... level=INFO ... message="Completing task 3"
... level=INFO ... message="All tasks have been processed."
*/
Anti-Pattern
The anti-pattern is using Stream.runCollect when you only care about the side effects. Collecting retains every result in memory, which is wasteful for small streams and can exhaust memory for large ones.
import { Effect, Stream } from "effect";

// ... same tasks and completeTask function ...

const program = Stream.fromIterable(tasks).pipe(
  Stream.mapEffect(completeTask, { concurrency: 1 }),
  // Anti-pattern: Collecting results that we are just going to ignore
  Stream.runCollect
);

Effect.runPromise(program).then((results) => {
  // The `results` variable here is a Chunk of `[void, void, void]`.
  // It served no purpose but consumed memory.
  console.log(
    `\nAll tasks processed. Unnecessarily collected ${results.length} empty results.`
  );
});
While this works for a small array of three items, it's a dangerous habit. If the tasks array contained millions of items, this code would create a Chunk with millions of void values, consuming a significant amount of memory for no reason and potentially crashing the application. Stream.runDrain avoids this problem entirely.
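To make the difference concrete, here is a rough sketch that runs the same kind of pipeline both ways. The helpers sideEffects and compareRuns are hypothetical names introduced only for illustration, and the per-item effect is a no-op standing in for real work. Note how the size of the collected Chunk grows with the number of items, while the drained run hands back nothing.

```typescript
import { Chunk, Effect, Stream } from "effect";

// Hypothetical helper: a stream of `n` side effects, each emitting `void`.
const sideEffects = (n: number) =>
  Stream.iterate(0, (i) => i + 1).pipe(
    Stream.take(n),
    // No-op stand-in for a real side effect
    Stream.mapEffect(() => Effect.sync(() => {}))
  );

// Hypothetical helper: runs the pipeline with runCollect and with runDrain.
const compareRuns = (n: number): Effect.Effect<{ collectedSize: number }> =>
  Effect.gen(function* () {
    // runCollect keeps one (useless) entry per item
    const collected = yield* Stream.runCollect(sideEffects(n));

    // runDrain executes the same effects but returns no value
    const drained: void = yield* Stream.runDrain(sideEffects(n));

    return { collectedSize: Chunk.size(collected) };
  });

Effect.runPromise(compareRuns(10_000)).then(({ collectedSize }) => {
  console.log(`runCollect retained ${collectedSize} void values; runDrain retained none.`);
});
```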