
Process Streaming Data with Stream

Use Stream<A, E, R> to represent and process data that arrives over time, such as file reads, WebSocket messages, or paginated API results.

Guideline

When dealing with a sequence of data that arrives asynchronously, model it as a Stream. A Stream<A, E, R> is like an asynchronous, effectful Array. It represents a sequence of values of type A that may fail with an error E and requires services R.
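For intuition, here is a minimal sketch of that analogy (the number list is made up for illustration):

import { Effect, Stream } from "effect";

// Stream<number> (E and R default to never) is a lazy, effectful sequence.
const numbers = Stream.fromIterable([1, 2, 3]);

// Array-like operators transform the elements...
const labels = Stream.map(numbers, (n) => `#${n}`);

// ...and nothing executes until a run* operator turns the stream into an Effect.
const program: Effect.Effect<void> = Stream.runForEach(labels, (label) =>
  Effect.log(label)
);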


Rationale

Some data sources don't fit the one-shot request/response model of Effect. For example:

  • Reading a multi-gigabyte file from disk.
  • Receiving messages from a WebSocket.
  • Fetching results from a paginated API.

Loading all this data into memory at once would be inefficient or impossible. Stream solves this by allowing you to process the data in chunks as it arrives. It provides a rich API of composable operators (map, filter, run, etc.) that mirror those on Effect and Array, but are designed for streaming data. This allows you to build efficient, constant-memory data processing pipelines.
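As a sketch of that constant-memory claim, here is a pipeline over a large range of numbers, with Stream.range standing in for any large data source:

import { Effect, Stream } from "effect";

// 100,000 numbers flow through in small chunks; the full sequence is
// never materialized in memory.
const sumOfSquaredEvens = Stream.range(1, 100_000).pipe(
  Stream.filter((n) => n % 2 === 0),
  Stream.map((n) => n * n),
  Stream.runFold(0, (acc, n) => acc + n)
);

Effect.runPromise(sumOfSquaredEvens).then(console.log);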


Good Example

This example demonstrates creating a Stream from a paginated API. The Stream will make API calls as needed, processing one page of users at a time without ever holding the entire user list in memory.

import { Effect, Stream, Option } from "effect";

interface User {
  id: number;
  name: string;
}
interface PaginatedResponse {
  users: User[];
  nextPage: number | null;
}

// A mock API call that returns a page of users
const fetchUserPage = (
  page: number
): Effect.Effect<PaginatedResponse, "ApiError"> =>
  Effect.succeed(
    page < 3
      ? {
          users: [
            { id: page * 2 + 1, name: `User ${page * 2 + 1}` },
            { id: page * 2 + 2, name: `User ${page * 2 + 2}` },
          ],
          nextPage: page + 1,
        }
      : { users: [], nextPage: null }
  ).pipe(Effect.delay("50 millis"));

// Stream.paginateEffect creates a stream from a paginated source
const userStream: Stream.Stream<User, "ApiError"> = Stream.paginateEffect(
  0,
  (page) =>
    fetchUserPage(page).pipe(
      Effect.map(
        (response) =>
          [response.users, Option.fromNullable(response.nextPage)] as const
      )
    )
).pipe(
  // Flatten the stream of user arrays into a stream of individual users
  Stream.flatMap((users) => Stream.fromIterable(users))
);

// We can now process the stream of users.
// Stream.runForEach will pull from the stream until it's exhausted.
const program = Stream.runForEach(userStream, (user: User) =>
  Effect.log(`Processing user: ${user.name}`)
);

const programWithErrorHandling = program.pipe(
  Effect.catchAll((error) =>
    Effect.gen(function* () {
      yield* Effect.logError(`Stream processing error: ${error}`);
      return null;
    })
  )
);

Effect.runPromise(programWithErrorHandling);
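
Running this logs six users (User 1 through User 6): the stream pulls two users per page across three pages, then completes when fetchUserPage returns nextPage: null.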

Anti-Pattern

Manually managing pagination state with a recursive function is complex, stateful, and easy to get wrong. It also accumulates every result in memory, which is inefficient or impossible for large datasets.

import { Effect } from "effect";
import { fetchUserPage, type User } from "./somewhere"; // From the previous example

// ❌ WRONG: Manual, stateful, and inefficient recursion.
const fetchAllUsers = (
  page: number,
  acc: User[]
): Effect.Effect<User[], "ApiError"> =>
  fetchUserPage(page).pipe(
    Effect.flatMap((response) => {
      const allUsers = [...acc, ...response.users];
      if (response.nextPage !== null) {
        return fetchAllUsers(response.nextPage, allUsers);
      }
      return Effect.succeed(allUsers);
    })
  );

// This holds all users in memory at once.
const program = fetchAllUsers(0, []);