Guideline
When dealing with a sequence of data that arrives asynchronously, model it as a Stream. A Stream<A, E, R> is like an asynchronous, effectful Array. It represents a sequence of values of type A that may fail with an error E and requires services R.
Rationale
Some data sources don't fit the one-shot request/response model of Effect. For example:
- Reading a multi-gigabyte file from disk.
- Receiving messages from a WebSocket.
- Fetching results from a paginated API.
Loading all this data into memory at once would be inefficient or impossible. Stream solves this by allowing you to process the data in chunks as it arrives. It provides a rich API of composable operators (map, filter, run, etc.) that mirror those on Effect and Array, but are designed for streaming data. This allows you to build efficient, constant-memory data processing pipelines.
Good Example
This example demonstrates creating a Stream from a paginated API. The Stream will make API calls as needed, processing one page of users at a time without ever holding the entire user list in memory.
import { Effect, Stream, Option } from "effect";
interface User {
id: number;
name: string;
}
interface PaginatedResponse {
users: User[];
nextPage: number | null;
}
// A mock API call that returns a page of users
const fetchUserPage = (
page: number
): Effect.Effect<PaginatedResponse, "ApiError"> =>
Effect.succeed(
page < 3
? {
users: [
{ id: page * 2 + 1, name: `User ${page * 2 + 1}` },
{ id: page * 2 + 2, name: `User ${page * 2 + 2}` },
],
nextPage: page + 1,
}
: { users: [], nextPage: null }
).pipe(Effect.delay("50 millis"));
// Stream.paginateEffect creates a stream from a paginated source
const userStream: Stream.Stream<User, "ApiError"> = Stream.paginateEffect(
0,
(page) =>
fetchUserPage(page).pipe(
Effect.map(
(response) =>
[response.users, Option.fromNullable(response.nextPage)] as const
)
)
).pipe(
// Flatten the stream of user arrays into a stream of individual users
Stream.flatMap((users) => Stream.fromIterable(users))
);
// We can now process the stream of users.
// Stream.runForEach will pull from the stream until it's exhausted.
const program = Stream.runForEach(userStream, (user: User) =>
Effect.log(`Processing user: ${user.name}`)
);
const programWithErrorHandling = program.pipe(
Effect.catchAll((error) =>
Effect.gen(function* () {
yield* Effect.logError(`Stream processing error: ${error}`);
return null;
})
)
);
Effect.runPromise(programWithErrorHandling);
Anti-Pattern
Manually managing pagination state with recursive functions. This is complex, stateful, and easy to get wrong. It also requires loading all results into memory, which is inefficient for large datasets.
import { Effect } from "effect";
import { fetchUserPage } from "./somewhere"; // From previous example
// ❌ WRONG: Manual, stateful, and inefficient recursion.
const fetchAllUsers = (
page: number,
acc: any[]
): Effect.Effect<any[], "ApiError"> =>
fetchUserPage(page).pipe(
Effect.flatMap((response) => {
const allUsers = [...acc, ...response.users];
if (response.nextPage) {
return fetchAllUsers(response.nextPage, allUsers);
}
return Effect.succeed(allUsers);
})
);
// This holds all users in memory at once.
const program = fetchAllUsers(0, []);