
Intermediate

Claude API Streaming

A quick reference for streaming Claude responses with SSE events, text deltas, and the finalMessage helper.

TL;DR
  1. Use stream helpers to avoid raw SSE parsing in most cases.
  2. Iterate text_delta events to print tokens as they arrive.
  3. Call get_final_message() after the stream to get usage and stop_reason.

Why Stream

  • Streaming returns tokens as they are generated, so users see output immediately.

  • Use streaming for any response that may take more than a second to complete.

  • Streaming keeps the connection active, which helps avoid request timeouts on long outputs or high max_tokens values.

  • The SDK stream helper manages the SSE connection and event loop for you.

  • You can still get the complete message object after streaming ends via get_final_message().

Stream With the Helper

  • Use client.messages.stream() as a context manager in Python.

    import anthropic

    client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment

    with client.messages.stream(
        model="claude-opus-4-8",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Write a haiku about APIs."}],
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
    
  • Call stream.get_final_message() after the loop to get the complete response.

    with client.messages.stream(...) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
    
    final = stream.get_final_message()
    print(final.usage.output_tokens)
    
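  • In TypeScript, client.messages.stream() returns a MessageStream; print text from its "text" event and await finalMessage() for the complete response.

    import Anthropic from "@anthropic-ai/sdk";

    const client = new Anthropic(); // reads ANTHROPIC_API_KEY from the environment
    const stream = client.messages
        .stream({
            model: "claude-opus-4-8",
            max_tokens: 1024,
            messages: [{ role: "user", content: "Write a haiku about APIs." }],
        })
        .on("text", (text) => process.stdout.write(text)); // fires once per text delta
    const final = await stream.finalMessage();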
    
  • Use stream.finalMessage() (TypeScript) or stream.get_final_message() (Python) — same concept, different name.

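  • The with block closes the stream even if an exception is raised, so no extra try/finally is needed; wrap it in try/except to handle API errors.

    import anthropic

    client = anthropic.Anthropic()

    try:
        with client.messages.stream(...) as stream:
            for text in stream.text_stream:
                print(text, end="", flush=True)
    except anthropic.APIError as e:
        # APIError is the SDK's base class for API status and connection errors
        print(f"Stream error: {e}")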
    

Raw SSE Events

  • Use client.messages.create(stream=True) to handle raw SSE events manually.

    with client.messages.create(
        model="claude-opus-4-8",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello"}],
        stream=True,
    ) as stream:
        for event in stream:
            print(event.type, event)
    
  • Key event types and when they fire:

    Event type            When it fires                      Key fields
    message_start         Stream opens                       message.id, message.usage.input_tokens
    content_block_start   A new content block begins         index, content_block.type
    content_block_delta   A chunk of block content arrives   index, delta.type, delta.text
    content_block_stop    A content block is complete        index
    message_delta         Top-level message fields change    delta.stop_reason, usage.output_tokens
    message_stop          Stream ends                        (none)

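
On the wire, each event in the table arrives as a server-sent event frame: an event: line naming the type, a data: line carrying a JSON payload, and a blank line between frames. The SDK parses these for you; the sketch below only illustrates the framing. parse_sse is a hypothetical helper, and the sample payloads are trimmed, made-up examples rather than real API output.

```python
import json
from typing import Iterator, Optional


def parse_sse(raw: str) -> Iterator[tuple[str, dict]]:
    """Yield (event_type, payload) pairs from raw SSE text (hypothetical helper).

    Frames are separated by a blank line; each frame carries an "event:" field
    and one or more "data:" fields whose joined value is JSON.
    """
    event_type: Optional[str] = None
    data_lines: list[str] = []
    # The extra "" at the end flushes a final frame that lacks a trailing blank line
    for line in raw.splitlines() + [""]:
        if line == "":
            if data_lines:
                # SSE defaults the event type to "message" when no event: field is given
                yield event_type or "message", json.loads("\n".join(data_lines))
            event_type, data_lines = None, []
        elif line.startswith(":"):
            continue  # SSE comment line; ignored
        else:
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # strip the single space after the colon
            if field == "event":
                event_type = value
            elif field == "data":
                data_lines.append(value)


sample = (
    "event: message_start\n"
    'data: {"type": "message_start", "message": {"id": "msg_example", "usage": {"input_tokens": 12}}}\n'
    "\n"
    "event: content_block_delta\n"
    'data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hi"}}\n'
    "\n"
    "event: message_stop\n"
    'data: {"type": "message_stop"}\n'
    "\n"
)

for event_type, payload in parse_sse(sample):
    print(event_type, payload["type"])
```

Each frame's event: name matches the type field inside its JSON payload, which is why the SDK can expose a single event.type attribute.
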
  • Filter for text_delta events to extract streamed text tokens.

    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
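
A response can contain several content blocks (for example a thinking block followed by a text block), so a client that keeps the streamed text may want it grouped by each event's index field. A minimal sketch, assuming events are plain dicts shaped like the API's JSON payloads (SDK event objects expose the same fields as attributes); collect_text is a hypothetical name.

```python
def collect_text(events: list[dict]) -> dict[int, str]:
    """Concatenate text_delta fragments per content block index.

    Assumes dict-shaped events; with the SDK, read event.delta.text instead.
    """
    blocks: dict[int, list[str]] = {}
    for event in events:
        if event["type"] == "content_block_delta" and event["delta"]["type"] == "text_delta":
            blocks.setdefault(event["index"], []).append(event["delta"]["text"])
    return {index: "".join(parts) for index, parts in blocks.items()}


events = [
    {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": ", world"}},
    {"type": "content_block_stop", "index": 0},
    {"type": "message_stop"},
]
print(collect_text(events))  # {0: 'Hello, world'}
```

Collecting fragments in a list and joining once at the end avoids repeated string concatenation on long responses.
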
    
  • message_delta carries stop_reason and a cumulative output_tokens count; read the values from the last one before message_stop.

    for event in stream:
        if event.type == "message_delta":
            print(event.delta.stop_reason)    # e.g. end_turn, max_tokens, stop_sequence, tool_use
            print(event.usage.output_tokens)  # cumulative for the whole response
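
Combining message_start and message_delta yields the metadata that get_final_message() would otherwise give you: the message id, input and output token counts, and the stop reason. The sketch assumes dict-shaped events and a hypothetical StreamSummary type; because the usage on message_delta is cumulative, it overwrites output_tokens rather than summing.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StreamSummary:
    """Hypothetical container for per-message metadata gathered from a stream."""
    message_id: Optional[str] = None
    input_tokens: int = 0
    output_tokens: int = 0
    stop_reason: Optional[str] = None


def summarize(events: list[dict]) -> StreamSummary:
    """Pull id, token usage, and stop_reason out of dict-shaped stream events."""
    summary = StreamSummary()
    for event in events:
        if event["type"] == "message_start":
            message = event["message"]
            summary.message_id = message["id"]
            summary.input_tokens = message["usage"]["input_tokens"]
        elif event["type"] == "message_delta":
            # Cumulative count, so the latest message_delta wins
            summary.output_tokens = event["usage"]["output_tokens"]
            summary.stop_reason = event["delta"]["stop_reason"]
    return summary


events = [
    {"type": "message_start",
     "message": {"id": "msg_example", "usage": {"input_tokens": 25, "output_tokens": 1}}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hi"}},
    {"type": "message_delta",
     "delta": {"stop_reason": "end_turn", "stop_sequence": None},
     "usage": {"output_tokens": 15}},
    {"type": "message_stop"},
]
print(summarize(events))
# StreamSummary(message_id='msg_example', input_tokens=25, output_tokens=15, stop_reason='end_turn')
```
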
    
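  • With extended thinking enabled, thinking blocks stream as thinking_delta events; handle them with the same pattern as text_delta.

    for event in stream:
        if event.type == "content_block_delta" and event.delta.type == "thinking_delta":
            print(event.delta.thinking, end="", flush=True)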
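
To keep the model's reasoning separate from its answer, route each content_block_delta by its delta type. A sketch under the same dict-shaped-event assumption; split_thinking_and_text is a hypothetical helper, and the signature value in the sample is a placeholder.

```python
def split_thinking_and_text(events: list[dict]) -> tuple[str, str]:
    """Return (thinking, text) assembled from thinking_delta and text_delta events."""
    thinking: list[str] = []
    text: list[str] = []
    for event in events:
        if event["type"] != "content_block_delta":
            continue
        delta = event["delta"]
        if delta["type"] == "thinking_delta":
            thinking.append(delta["thinking"])
        elif delta["type"] == "text_delta":
            text.append(delta["text"])
        # Other delta types (e.g. signature_delta, input_json_delta) are skipped here
    return "".join(thinking), "".join(text)


events = [
    {"type": "content_block_start", "index": 0, "content_block": {"type": "thinking", "thinking": ""}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "thinking_delta", "thinking": "User wants a haiku. "}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "thinking_delta", "thinking": "Keep it 5-7-5."}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "signature_delta", "signature": "placeholder"}},
    {"type": "content_block_stop", "index": 0},
    {"type": "content_block_start", "index": 1, "content_block": {"type": "text", "text": ""}},
    {"type": "content_block_delta", "index": 1, "delta": {"type": "text_delta", "text": "Requests fly out"}},
    {"type": "content_block_stop", "index": 1},
]
thinking, text = split_thinking_and_text(events)
print(thinking)  # User wants a haiku. Keep it 5-7-5.
print(text)      # Requests fly out
```
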
    

Streaming in a Web App

  • In a Next.js route handler, return the MessageStream as a ReadableStream with toReadableStream(); the TypeScript SDK encodes each stream event as one line of JSON.

    import Anthropic from "@anthropic-ai/sdk";

    const client = new Anthropic();

    export async function POST(req: Request) {
        const stream = client.messages.stream({
            model: "claude-opus-4-8",
            max_tokens: 1024,
            messages: [{ role: "user", content: await req.text() }],
        });
        return new Response(stream.toReadableStream());
    }
    
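  • On the client, read the body with getReader() and a TextDecoder, buffering partial lines and parsing each complete line as a JSON event.

    const res = await fetch("/api/chat", { method: "POST", body: prompt });
    const reader = res.body!.getReader();
    const decoder = new TextDecoder();
    let buffer = "";
    while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split("\n");
        buffer = lines.pop() ?? ""; // keep a trailing partial line for the next chunk
        for (const line of lines.filter(Boolean)) {
            const event = JSON.parse(line); // one JSON-encoded stream event per line
            if (event.delta?.type === "text_delta") console.log(event.delta.text);
        }
    }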
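
Chunks from reader.read() do not line up with event boundaries: one chunk can hold half a JSON line, or even half of a multi-byte UTF-8 character. The Python sketch below shows the buffering logic a client needs, assuming the response body is newline-delimited JSON as produced by toReadableStream(); iter_ndjson is a hypothetical helper, not an SDK function.

```python
import codecs
import json
from typing import Iterable, Iterator


def iter_ndjson(chunks: Iterable[bytes]) -> Iterator[dict]:
    """Yield one JSON object per newline-terminated line of a byte stream.

    Chunks may split a line, or a multi-byte UTF-8 character, anywhere.
    """
    decoder = codecs.getincrementaldecoder("utf-8")()
    buffer = ""
    for chunk in chunks:
        buffer += decoder.decode(chunk)  # holds back incomplete UTF-8 sequences
        *lines, buffer = buffer.split("\n")  # the last piece may be incomplete
        for line in lines:
            if line.strip():
                yield json.loads(line)
    buffer += decoder.decode(b"", final=True)
    if buffer.strip():
        yield json.loads(buffer)  # final line without a trailing newline


payload = (
    json.dumps({"type": "content_block_delta", "index": 0,
                "delta": {"type": "text_delta", "text": "héllo"}}, ensure_ascii=False)
    + "\n"
    + json.dumps({"type": "message_stop"})
    + "\n"
).encode("utf-8")

# One-byte chunks simulate the worst case: every line and the "é" get split
chunks = [payload[i:i + 1] for i in range(len(payload))]
texts = [
    e["delta"]["text"]
    for e in iter_ndjson(chunks)
    if e["type"] == "content_block_delta" and e["delta"]["type"] == "text_delta"
]
print(texts)  # ['héllo']
```

The incremental decoder plays the same role as TextDecoder's { stream: true } option in the browser.
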
    
  • Use the Vercel AI SDK's streamText helper for a higher-level abstraction in Next.js.

  • Set a generous server-side timeout — long completions can take 60+ seconds.

Tip: Use stream.text_stream (Python) or the MessageStream "text" event (TypeScript) for simple text streaming; reserve raw SSE for cases where you need event metadata.

Warning: Do not await the entire response before streaming to the client — this defeats the purpose of streaming and risks timeout errors.
