Streaming input closes the CLI control channel (stdin) mid-conversation — in-process MCP tool calls die with "Stream closed" #348

@OoO256

Description

The SDK closes the CLI's stdin — the bidirectional control channel that carries in-process MCP tool results, hook callbacks, and canUseTool responses — the moment the prompt input iterable completes, as long as any result has already been seen. It does not check whether the CLI is still mid-turn or has unsettled background tasks. Any in-process MCP tool call made after that point fails with Stream closed; the host-side handler is never invoked, and the side effect is silently lost.

It shows up in two ordinary situations:

  1. The last message's turn loses its tools (Repro A). The final message asks the model for an in-process tool call; the input iterable ends right after it's sent, stdin closes, and the tool call dies a few seconds later. Deterministic.
  2. A background-task continuation loses its tools (Repro B). A run_in_background task settles after stdin is already closed; the CLI continues from the task notification, and that turn's tool calls die the same way. This is the common real-world shape.

Once it fires it is unrecoverable from the host: query.reconnectMcpServer() throws Query closed before response received / ProcessTransport is not ready for writing.

Reproductions

Everything here is reproducible from a self-contained repo — both repros, a model-free deterministic harness, and the fix: https://github.com/OoO256/claude-agent-sdk-stream-closed-repro

npm install && npm run patch flips the harness from CHANNEL_CLOSED to CHANNEL_ALIVE, with no model and no network.

Repro A — minimal, deterministic (no background tasks, ~15s)

npm i @anthropic-ai/claude-agent-sdk@0.3.165 zod
ANTHROPIC_API_KEY=... node repro.mjs

Two ordinary messages over streaming input: (1) "Reply READY" → a result is emitted; (2) "Now call record_result" → the app has nothing more to send, so the input iterable completes — exactly how any finite streaming-input script ends. The model, mid-turn on message 2, calls the tool.

Expected: the handler runs; the model reports the tool's output.
Actual (7/7 on 0.3.165, 2/2 on 0.3.153 — identical trace):

[result #1]                          ← message 1 answered
[tool_result ERROR] Stream closed    ← message 2's turn: input had just exhausted → SDK closed stdin
[result #2]
---
results=2 handlerInvocations=0 streamClosedErrors=1

handlerInvocations=0 shows the failure is in the SDK↔CLI bridge, not in the tool. It is deterministic because the race is structural: endInput() fires within milliseconds of the iterable completing, while the model needs seconds to reach its tool call — the teardown always wins. The input shape is the documented Streaming Input pattern ("Default & Recommended"), an async generator that awaits between messages exactly like the docs' own example; and completing the iterable is the only way to let query() terminate (keeping it open hangs forever).
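Here is a model-free sketch of the race. It assumes nothing about SDK internals beyond the rule just described: the teardown runs as soon as the iterable completes, and the "model" reaches its tool call on a later timer. `fakeChannel` and `simulate` are hypothetical names, and the delay stands in for model latency.

```javascript
// Model-free sketch of the Repro A timing. Nothing here is SDK API:
// fakeChannel() and simulate() are hypothetical stand-ins.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// A control channel that rejects responses once it has been closed.
function fakeChannel() {
  let open = true;
  return {
    close() { open = false; },                 // plays the role of endInput()
    respond() {                                // plays the role of writing a tool result
      if (!open) throw new Error("Stream closed");
      return "recorded: hello";
    },
  };
}

// Same two-message shape as repro.mjs; the generator then returns.
async function* input() {
  yield "Reply READY";
  yield "Now call record_result";
}

async function simulate(modelThinkMs) {
  const channel = fakeChannel();
  let resultSeen = false;
  let toolOutcome = null;

  for await (const msg of input()) {
    if (msg === "Reply READY") {
      resultSeen = true;                       // result #1 arrives before msg 2 is sent
    } else {
      // The model reaches its tool call only after "thinking" for modelThinkMs.
      setTimeout(() => {
        try { toolOutcome = channel.respond(); }
        catch (err) { toolOutcome = err.message; }
      }, modelThinkMs);
    }
  }

  // Pre-fix rule: input exhausted and some result already seen, so close right away.
  if (resultSeen) channel.close();

  await sleep(modelThinkMs + 20);              // give the tool-call timer time to fire
  return toolOutcome;
}

simulate(0).then(console.log); // prints "Stream closed"
```

Even `simulate(0)` loses. The close runs as soon as the `for await` loop finishes, before any timer callback can fire, so real model latency only widens a gap that already exists.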

repro.mjs (self-contained)
/**
 * Repro: the SDK closes the CLI's stdin (control channel) the moment the
 * input iterable completes — even though the CLI is still processing the
 * message the app just sent. Any in-process SDK MCP tool call in that turn
 * fails with "Stream closed"; the app's handler is never invoked.
 *
 * Scenario (minimal multi-turn streaming-input app — documented pattern):
 *   - msg 1: "Reply READY"            → model replies; a `result` is emitted
 *   - msg 2: "call record_result"     → the app has nothing more to send,
 *     so the input iterable completes right after yielding it
 *   - SDK: input exhausted + a result already seen → endInput() → stdin closed
 *   - the model — mid-turn on msg 2 — calls `record_result`
 *   → "Stream closed". handlerInvocations stays 0; the outcome is silently lost.
 *
 * No background tasks, no timers, no delays. ~15s, any model.
 *
 * Setup:  npm i @anthropic-ai/claude-agent-sdk@0.3.165 zod
 * Run:    ANTHROPIC_API_KEY=... node repro.mjs
 */
import { query, createSdkMcpServer, tool } from "@anthropic-ai/claude-agent-sdk";
import { z } from "zod";

// ---- the in-process MCP tool the app exposes --------------------------------
let handlerInvocations = 0;
const server = createSdkMcpServer({
  name: "app",
  tools: [
    tool(
      "record_result",
      "Persists the final outcome of a job. Call when work completes.",
      { summary: z.string().describe("one-line outcome summary") },
      async (args) => {
        handlerInvocations++;
        return { content: [{ type: "text", text: `recorded: ${args.summary}` }] };
      },
    ),
  ],
});

// ---- streaming input: two messages, the second sent after the first reply ---
const MSG_1 = "Reply with exactly: READY. Do not call any tools.";
const MSG_2 =
  "Now call mcp__app__record_result once with summary='hello' and report what it returns.";

const userMsg = (text) => ({
  type: "user",
  message: { role: "user", content: [{ type: "text", text }] },
  parent_tool_use_id: null,
});

let releaseFollowUp;
const firstResultSeen = new Promise((resolve) => (releaseFollowUp = resolve));

async function* input() {
  yield userMsg(MSG_1);
  await firstResultSeen; // send the follow-up only after the model has answered
  yield userMsg(MSG_2);
  // generator returns → input exhausted, like any app that has nothing more to send
}

// ---- run ---------------------------------------------------------------------
const streamClosedErrors = [];
let recorded = false;
let resultCount = 0;

const timeout = setTimeout(() => {
  console.log("TIMEOUT after 120s");
  process.exit(2);
}, 120_000);

for await (const msg of query({
  prompt: input(),
  options: {
    mcpServers: { app: server },
    model: "claude-haiku-4-5", // reproduces with any model
    settingSources: [],        // hermetic — no user/project settings
    permissionMode: "bypassPermissions",
    allowDangerouslySkipPermissions: true,
  },
})) {
  if (msg.type === "result") {
    resultCount++;
    console.log(`[result #${resultCount}]`);
    releaseFollowUp();
  } else if (msg.type === "user" && Array.isArray(msg.message?.content)) {
    for (const blk of msg.message.content) {
      if (blk.type !== "tool_result") continue;
      const text = Array.isArray(blk.content)
        ? blk.content.map((c) => c?.text ?? "").join(" ")
        : String(blk.content ?? "");
      if (blk.is_error) {
        console.log(`[tool_result ERROR] ${text.slice(0, 90)}`);
        if (/stream closed/i.test(text)) streamClosedErrors.push(text);
      } else if (/recorded:/.test(text)) {
        console.log(`[tool_result OK] ${text.slice(0, 90)}`);
        recorded = true;
      }
    }
  }
}
clearTimeout(timeout);

console.log("---");
console.log(`results=${resultCount} handlerInvocations=${handlerInvocations} streamClosedErrors=${streamClosedErrors.length}`);
if (streamClosedErrors.length > 0 && handlerInvocations === 0) {
  console.log("VERDICT: REPRODUCED — record_result died in the bridge; app handler never ran, outcome silently lost");
} else if (recorded) {
  console.log("VERDICT: NOT REPRODUCED — tool call succeeded");
} else {
  console.log("VERDICT: INCONCLUSIVE");
}

Repro B — background-task continuation (the production shape)

Same setup, but message 1 launches a background agent that calls a ~25s in-process tool (generate_result), and message 2 asks to record_result when it finishes.

Actual (3/3 on 0.3.165):

[task_started] task=a829...
[generate_result] handler started +8s     ← called while the channel is still alive → succeeds
[generate_result] handler finished +33s
[task_notification] task=a829... completed
[result #1]
[tool_result ERROR] Stream closed         ← record_result, in the post-teardown continuation → dies
[result #2]
---
results=2 handlerInvocations=0 streamClosedErrors=1

generate_result succeeds (called pre-teardown) while record_result dies (called in the continuation) — pinning the teardown moment between the two calls.

repro-background.mjs (self-contained)
/**
 * Repro B (background-continuation variant): the real-world shape.
 *
 * An in-process MCP tool call made in a background-task continuation — after
 * the SDK has torn down the control channel — fails with "Stream closed".
 *
 *   - msg 1: launch a background agent that calls `generate_result` (a ~25s tool)
 *   - msg 2: "when the job completes, call `record_result`" → model acks; input exhausts
 *   - SDK: input exhausted + a result already seen → endInput() → stdin closed
 *   - ~25s later the agent settles → the CLI continues from the task notification
 *     and calls `record_result`
 *   → "Stream closed"; record_result's handler is NEVER invoked (handlerInvocations=0).
 *
 * Note the trace shows `generate_result` *succeeds* (its handler runs +9s→+34s):
 * it was called while the channel was still alive. record_result, called in the
 * post-teardown continuation, is the one that dies — pinning the teardown moment
 * between the two calls. This is the flavor that hit us in production.
 *
 * Setup:  npm i @anthropic-ai/claude-agent-sdk@0.3.165 zod
 * Run:    ANTHROPIC_API_KEY=... node repro-background.mjs
 */
import { query, createSdkMcpServer, tool } from "@anthropic-ai/claude-agent-sdk";
import { z } from "zod";

// ---- in-process MCP tools the app exposes -----------------------------------
const t0 = Date.now();
const at = () => `+${Math.round((Date.now() - t0) / 1000)}s`;
let handlerInvocations = 0;
const server = createSdkMcpServer({
  name: "app",
  tools: [
    tool(
      "generate_result",
      "Generates the result of a job. Always takes about 25 seconds.",
      { job: z.string().describe("what to generate") },
      async (args) => {
        console.log(`[generate_result] handler started ${at()}`);
        await new Promise((resolve) => setTimeout(resolve, 25_000));
        console.log(`[generate_result] handler finished ${at()}`);
        return { content: [{ type: "text", text: `generated: result for ${args.job}` }] };
      },
    ),
    tool(
      "record_result",
      "Persists the final outcome of a job. Call when work completes.",
      { summary: z.string().describe("one-line outcome summary") },
      async (args) => {
        handlerInvocations++;
        return { content: [{ type: "text", text: `recorded: ${args.summary}` }] };
      },
    ),
  ],
});

// ---- streaming input: two messages, the second sent after the first result --
const userMsg = (text) => ({
  type: "user",
  message: { role: "user", content: [{ type: "text", text }] },
  parent_tool_use_id: null,
});

const MSG_1 = [
  "Use the Agent tool (run_in_background: true, general-purpose) to run this task:",
  '"Call mcp__app__generate_result once with job=\'demo\' and report its output."',
  "Just tell me once it's started.",
].join("\n");

const MSG_2 =
  "Thanks. When the background job completes, call mcp__app__record_result once with a one-line summary.";

let releaseFollowUp;
const firstResultSeen = new Promise((resolve) => (releaseFollowUp = resolve));

async function* input() {
  yield userMsg(MSG_1);
  await firstResultSeen; // wait until the model has answered message 1
  yield userMsg(MSG_2);
  // generator returns → input exhausted, like any app that has nothing more to send
}

// ---- run --------------------------------------------------------------------
const streamClosedErrors = [];
let recorded = false;
let resultCount = 0;

const timeout = setTimeout(() => {
  console.log("TIMEOUT after 240s");
  process.exit(2);
}, 240_000);

for await (const msg of query({
  prompt: input(),
  options: {
    mcpServers: { app: server },
    model: "claude-haiku-4-5", // reproduces with any model
    settingSources: [],        // hermetic — no user/project settings
    permissionMode: "bypassPermissions",
    allowDangerouslySkipPermissions: true,
  },
})) {
  if (msg.type === "result") {
    resultCount++;
    console.log(`[result #${resultCount}]`);
    releaseFollowUp();
  } else if (msg.type === "system" && (msg.subtype === "task_started" || msg.subtype === "task_notification")) {
    console.log(`[${msg.subtype}] task=${msg.task_id} ${msg.status ?? ""}`);
  } else if (msg.type === "user" && Array.isArray(msg.message?.content)) {
    for (const blk of msg.message.content) {
      if (blk.type !== "tool_result") continue;
      const text = Array.isArray(blk.content)
        ? blk.content.map((c) => c?.text ?? "").join(" ")
        : String(blk.content ?? "");
      if (blk.is_error) {
        console.log(`[tool_result ERROR] ${text.slice(0, 90)}`);
        if (/stream closed/i.test(text)) streamClosedErrors.push(text);
      } else if (/recorded:/.test(text)) {
        console.log(`[tool_result OK] ${text.slice(0, 90)}`);
        recorded = true;
      }
    }
  }
}
clearTimeout(timeout);

console.log("---");
console.log(`results=${resultCount} handlerInvocations=${handlerInvocations} streamClosedErrors=${streamClosedErrors.length}`);
if (streamClosedErrors.length > 0 && handlerInvocations === 0) {
  console.log("VERDICT: REPRODUCED — record_result died in the bridge; app handler never ran, outcome silently lost");
} else if (recorded) {
  console.log("VERDICT: NOT REPRODUCED — tool call succeeded");
} else {
  console.log("VERDICT: INCONCLUSIVE");
}

Environment

  • @anthropic-ai/claude-agent-sdk 0.3.165 (CLI 2.1.165) and 0.3.153 (CLI 2.1.153)
  • Node 20/22, macOS arm64 and Linux x64, any model (claude-haiku-4-5 in the scripts, for cost)
  • settingSources: [] (hermetic) — no user/project settings involved

Root cause

The SDK is a thin JS wrapper that drives the claude CLI binary over stdio. The control channel is multiplexed onto the same pipes as ordinary I/O:

host → CLI (stdin):   user messages   +   control responses (MCP results, hooks, canUseTool)
CLI → host (stdout):  SDK messages    +   control requests  ("run this tool")

A tool round-trip is stdout(request) → host runs handler → stdin(response). endInput() is stdin.end() — an EOF — so it tears down the response path along with user input, even though their lifetimes differ (input may end early; the control channel is needed as long as the CLI runs). From sdk.mjs (de-minified):

async streamInput(inputIterable) {
  let count = 0;
  for await (let msg of inputIterable) { count++; await this.transport.write(serialize(msg) + "\n"); }
  if (count > 0 && this.hasBidirectionalNeeds())   // SDK MCP / hooks / canUseTool
    await this.waitForFirstResult();               // satisfied by ANY earlier result
  this.transport.endInput();                        // → stdin.end() → "Stream closed" for later calls
}
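The coupling can be reproduced with plain Node streams. In this sketch a `PassThrough` stands in for the CLI's stdin and carries both kinds of traffic. The JSON shapes, including `control_response` and `request_id`, are illustrative assumptions rather than the exact wire format. After `end()`, which according to the excerpt above is what `endInput()` calls, a later control response is rejected.

```javascript
import { PassThrough } from "node:stream";

// Write one NDJSON line; resolve with the write error (or null) instead of throwing.
function writeLine(stream, obj) {
  return new Promise((resolve) => {
    stream.write(JSON.stringify(obj) + "\n", (err) => resolve(err ?? null));
  });
}

async function demo() {
  // Stand-in for the CLI's stdin: user messages and control responses share one pipe.
  const stdin = new PassThrough();
  stdin.resume();                 // drain it, as the CLI would by reading it
  stdin.on("error", () => {});    // write-after-end also emits 'error'; the write callback reports it

  const userWrite = await writeLine(stdin, { type: "user", text: "Now call record_result" });
  const earlyToolResult = await writeLine(stdin, { type: "control_response", request_id: "1" });
  stdin.end();                    // EOF: ends user input AND the response path
  const lateToolResult = await writeLine(stdin, { type: "control_response", request_id: "2" });
  return { userWrite, earlyToolResult, lateToolResult };
}
```

`await demo()` yields `null` for the two early writes and an error with code `ERR_STREAM_WRITE_AFTER_END` for the late one. A single pipe cannot be closed for user input alone.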

waitForFirstResult() resolves on any prior result — it does not wait for a result to the message just sent, and nothing consults the CLI's in-flight turn or its task registry (which lives inside the CLI binary, not the SDK). The continuation that fails is itself supported, modeled behavior — result messages even carry an origin field to distinguish "user-prompted results from task-notification followups".
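To show why "any result" is the wrong signal, the sketch below replays that rule over synthetic event logs shaped like Repro A and Repro B. The log format and `replayFirstResultRule` are hypothetical modelling devices, not SDK output. Each log lists events in the order the repro descriptions give.

```javascript
// Replays a synthetic event log against the pre-fix teardown rule:
// once input is exhausted, wait for the first result (ANY result), then close.
// The event log format is a hypothetical model, not SDK output.
function replayFirstResultRule(events) {
  let resultsSeen = 0;
  let inputExhausted = false;
  let closed = false;
  const failedToolCalls = [];

  for (const ev of events) {
    if (ev.type === "result") resultsSeen++;
    if (ev.type === "input_exhausted") inputExhausted = true;
    if (ev.type === "tool_call" && closed) failedToolCalls.push(ev.name);
    // waitForFirstResult(): satisfied by any earlier result, not the last message's own.
    if (!closed && inputExhausted && resultsSeen > 0) closed = true;
  }
  return failedToolCalls;
}

// Repro A: msg 2's result has not arrived yet when the input is exhausted.
const reproA = [
  { type: "write" }, { type: "result" },
  { type: "write" }, { type: "input_exhausted" },
  { type: "tool_call", name: "record_result" }, { type: "result" },
];

// Repro B: the continuation after a background task is the turn that calls the tool.
const reproB = [
  { type: "write" }, { type: "task_started", id: "t1" }, { type: "result" },
  { type: "write" }, { type: "input_exhausted" }, { type: "result" },
  { type: "task_notification", id: "t1" },
  { type: "tool_call", name: "record_result" }, { type: "result" },
];

console.log(replayFirstResultRule(reproA)); // [ 'record_result' ]
console.log(replayFirstResultRule(reproB)); // [ 'record_result' ]
```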

Why past fixes haven't held

This class has been patched at least three times; each fix changed how the SDK guesses the conversation is over (or hardened the aftermath) without removing the coupling — so it keeps resurfacing. Traced across published versions:

| Era | streamInput teardown | Outcome |
| --- | --- | --- |
| ≤ 0.1.69 | if (hasBidirectionalNeeds()) await waitForInactivity(); endInput() | "go quiet → close"; an in-flight tool that doesn't reset the activity timer is cut mid-call (#114) |
| v0.1.64 / v0.1.73 | SDK unchanged (the 0.1.63↔0.1.65 diff is a childStdin → processStdin rename) | CLI-side hardening to tolerate late control responses; the SDK still closed on a guess |
| 0.2.x → current | waitForInactivity() replaced by waitForFirstResult() | removed #114's misfire but introduced this bug: "any result → close" |
| v0.2.89 | adds ERR_STREAM_WRITE_AFTER_END handling | swallows the crash after a late response; root cause untouched |

Every iteration patched the guess or the aftermath, never the coupling: "no more user input" and "tear down the control channel" share one stdin lifetime, and the close decision is made from the SDK's view of the stream rather than the CLI's own knowledge of whether it is done. Swap the heuristic again and the failure just moves again.

Fix — SDK-only, implemented and verified

After the input iterable is exhausted, don't close stdin immediately. Defer endInput() to a turn boundary: close once a result arrives after the last message was sent and the background-task ledger (task_started / task_notification) is empty. A result is itself a turn boundary, so the close never lands mid-turn, with no timers or inactivity heuristics. It uses only signals the SDK already receives, and needs no CLI change.
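The rule can also be exercised outside the SDK. The sketch below is a hypothetical standalone helper (`TurnBoundaryCloser`, with `onClose` standing in for `endInput()`), not the patch itself. It mirrors the three guards and adds an at-most-once flag so that later results cannot trigger a second close. `write`, `input_complete` and `tool_call` are synthetic modelling events. The `result` and `system` messages use the field names that appear in the repro scripts (`subtype`, `task_id`).

```javascript
// Standalone sketch of the turn-boundary close rule. TurnBoundaryCloser is a
// hypothetical helper; onClose stands in for transport.endInput().
class TurnBoundaryCloser {
  #inputComplete = false;
  #resultCount = 0;
  #resultCountAtLastWrite = 0;
  #outstandingTasks = new Set();
  #closed = false;

  constructor(onClose) { this.onClose = onClose; }

  get closed() { return this.#closed; }

  // Call just before each input message is written to the CLI.
  beforeWrite() { this.#resultCountAtLastWrite = this.#resultCount; }

  // Call when the input iterable is exhausted (instead of closing stdin).
  inputComplete() {
    this.#inputComplete = true;
    this.#maybeClose();
  }

  // Call for every message read from the CLI.
  noteMessage(m) {
    if (m.type === "result") {
      this.#resultCount++;
      this.#maybeClose();
    } else if (m.type === "system" && m.subtype === "task_started") {
      this.#outstandingTasks.add(m.task_id);
    } else if (m.type === "system" && m.subtype === "task_notification") {
      this.#outstandingTasks.delete(m.task_id);
    }
  }

  #maybeClose() {
    if (this.#closed || !this.#inputComplete) return;
    if (this.#resultCount <= this.#resultCountAtLastWrite) return; // last message not answered yet
    if (this.#outstandingTasks.size > 0) return;                   // a continuation may still call tools
    this.#closed = true;                                           // close at most once
    this.onClose();
  }
}

// Replays a synthetic log and reports tool calls that would hit a closed channel.
function replay(events) {
  let closeCalls = 0;
  const closer = new TurnBoundaryCloser(() => closeCalls++);
  const failedToolCalls = [];
  for (const ev of events) {
    if (ev.type === "write") closer.beforeWrite();
    else if (ev.type === "input_complete") closer.inputComplete();
    else if (ev.type === "tool_call") { if (closer.closed) failedToolCalls.push(ev.name); }
    else closer.noteMessage(ev);
  }
  return { failedToolCalls, closeCalls };
}

// Repro A shape: the tool call now lands before the close.
console.log(replay([
  { type: "write" }, { type: "result" },
  { type: "write" }, { type: "input_complete" },
  { type: "tool_call", name: "record_result" }, { type: "result" },
])); // { failedToolCalls: [], closeCalls: 1 }
```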

This is also the natural generalisation of the SDK's existing single-turn behaviour: readMessages already closes stdin on the first result for single-turn queries (the isSingleUserTurn branch). The fix just drives the streaming close from the same place, by the same rule.

The change is three one-line call-site edits (marked (1), (2), (3) below) plus two small private methods (noteForShutdown, maybeFinishStreaming). It fixes both Repro A and Repro B, leaves the non-bidirectional path untouched, and cannot hang: because the result count is captured at the last write, an app that does work after its final yield still sees a fresh result.
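The no-hang argument depends on where the baseline result count is captured. A small replay makes the difference concrete. `closesWith` and the string event log are hypothetical, and only the result-count guard is modelled (no task ledger).

```javascript
// Hypothetical comparison of where to snapshot the result count. In this
// synthetic log the app yields its last message, the CLI answers it, and
// only then does the generator finish (it did more work after the yield).
const workAfterLastYield = ["write", "result", "write", "result", "input_complete"];

// True if the close rule fires for this log; false if it would keep waiting
// for a result that never arrives (a hang).
function closesWith(snapshotAt, events = workAfterLastYield) {
  let results = 0;
  let baseline = 0;
  let inputDone = false;
  let closed = false;
  const check = () => {
    if (inputDone && !closed && results > baseline) closed = true;
  };

  for (const ev of events) {
    if (ev === "write" && snapshotAt === "last_write") baseline = results;
    if (ev === "result") {
      results++;
      check();
    }
    if (ev === "input_complete") {
      if (snapshotAt === "input_complete") baseline = results;
      inputDone = true;
      check();
    }
  }
  return closed;
}

console.log(closesWith("last_write"));     // true
console.log(closesWith("input_complete")); // false: would wait forever
```

Snapshotting at the last write counts the answer to the final message even when it arrives before the iterable finishes. Snapshotting at input completion would instead wait for a result that never comes.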

The change — readable, source-level (TypeScript)
class Query {
  private inputComplete = false;
  private resultCount = 0;
  private resultCountAtLastWrite = 0;          // results seen as of the last input write
  private outstandingTasks = new Set<string>();

  async streamInput(inputStream) {
    let count = 0;
    for await (const m of inputStream) {
      count++;
      if (this.abortController?.signal.aborted) break;
      this.resultCountAtLastWrite = this.resultCount;     // (1)
      await this.transport.write(serialize(m) + "\n");
    }
    if (count > 0 && this.hasBidirectionalNeeds()) {
      this.inputComplete = true;                          // (2) defer the close…
      this.maybeFinishStreaming();                        //     …to the turn-boundary check
    } else {
      this.transport.endInput();                          // unchanged: nothing bidirectional to protect
    }
  }

  /** called for every message read from the CLI */
  private noteForShutdown(m) {
    if (m?.type === "result") { this.resultCount++; this.maybeFinishStreaming(); }
    else if (m?.type === "system") {
      if (m.subtype === "task_started")       this.outstandingTasks.add(m.task_id);
      else if (m.subtype === "task_notification") this.outstandingTasks.delete(m.task_id);
    }
  }

  /** close stdin only once the conversation is actually over */
  private maybeFinishStreaming() {
    if (!this.inputComplete) return;
    if (this.hasBidirectionalNeeds()) {
      if (this.resultCount <= this.resultCountAtLastWrite) return; // wait for the LAST turn's result
      if (this.outstandingTasks.size > 0) return;                  // wait for background continuations
    }
    this.transport.endInput();
  }

  async readMessages() {
    for await (const m of this.transport.readMessages()) {
      this.noteForShutdown(m);                            // (3)
      /* …existing dispatch unchanged… */
    }
  }
}

The three guards: resultCount > resultCountAtLastWrite waits for the last message's own result (fixes A); outstandingTasks.size === 0 keeps the channel open for a background-task continuation (fixes B); evaluating only at a result means the close never lands mid-turn, so a slow in-turn tool can't be cut either (the #114 shape).

Verified against 0.3.165:

| Test | Unpatched | Patched |
| --- | --- | --- |
| Repro A (real, deterministic) | REPRODUCED | NOT REPRODUCED |
| Repro B (real, model-driven) | REPRODUCED | NOT REPRODUCED |
| Model-free harness, A-shape / B-shape | CHANNEL_CLOSED | CHANNEL_ALIVE |
| Regression: single-turn / streaming-without-bidi | clean exit | clean exit |
| Edge: work after the last yield | | terminates, no hang |

Full write-up with the rationale for each guard: SOLUTION.md · the exact minified before→after applied to the bundle: apply-patch.mjs · model-free harness: fake-cli.mjs + harness.mjs. Happy to open a PR if the source repo is the right place for it.

Limitation, and a deeper fix only you can make. This keeps the close decision inside the SDK — it just makes that decision correct and aligns it with the single-turn path. The underlying coupling remains: "no more user input" and "tear down the control channel" still share one stdin lifetime via EOF. To dissolve the class for good, remove the coupling itself — signal "no more user input" with an application-level sentinel over stdin instead of an EOF, and keep the bidirectional bridge alive until the CLI process exits, letting the claude binary (which authoritatively knows its turn/task state) drive shutdown. That needs changes inside the CLI, so only Anthropic can do it; the SDK-only fix above is the most an external contributor can ship, and it holds until that re-architecture lands.
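To make the proposed split concrete, here is a toy model of a reader on the CLI side of the pipe. It is entirely hypothetical: `end_of_input` is an invented sentinel, `control_response` is an assumed message name, and the real protocol and shutdown logic live inside the claude binary. It only shows that user input can end while the pipe, and with it the control channel, stays open.

```javascript
// Hypothetical sketch of an in-band "no more user input" sentinel. The reader
// stops accepting user messages after the sentinel but keeps accepting control
// responses. "end_of_input" is invented; "control_response" is an assumed name.
function createInputReader() {
  const state = { userInputOpen: true, userMessages: [], controlResponses: [], rejected: [] };

  function feedLine(line) {
    if (line.trim() === "") return;
    const msg = JSON.parse(line);
    if (msg.type === "end_of_input") {
      state.userInputOpen = false;           // no more prompts; the pipe itself stays open
    } else if (msg.type === "control_response") {
      state.controlResponses.push(msg);      // tool results and hook replies remain deliverable
    } else if (msg.type === "user" && state.userInputOpen) {
      state.userMessages.push(msg);
    } else {
      state.rejected.push(msg);              // e.g. a user message sent after the sentinel
    }
  }

  return { feedLine, state };
}

// Simulated stdin contents: one prompt, the sentinel, a late tool result, a late prompt.
const wire = [
  { type: "user", text: "Now call record_result" },
  { type: "end_of_input" },
  { type: "control_response", request_id: "1" },
  { type: "user", text: "too late" },
].map((m) => JSON.stringify(m)).join("\n");

const reader = createInputReader();
for (const line of wire.split("\n")) reader.feedLine(line);
console.log(reader.state.userInputOpen, reader.state.controlResponses.length, reader.state.rejected.length);
// false 1 1
```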

Notes

Host-side workaround we currently ship (the precursor to the SDK fix above)

Never let the input iterable complete on its own; close it explicitly from the host. Track CLI task lifecycle from task_started / task_notification / task_updated system messages; at each result, close immediately if no background activity was ever observed, otherwise close once the task ledger is empty and a short message-quiet window passes. This eliminates the failures in our verification harness (deterministic repro: red before, green after, both shapes), at the cost of re-implementing conversation-lifetime tracking the CLI already has internally.
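A minimal sketch of that workaround follows. It assumes the host drives `query()` with its own queue-backed iterable and calls an observer for every yielded message. `HostControlledInput`, `HostShutdownPolicy` and `quietMs` are illustrative names, not SDK API. Treating `task_updated` purely as "background activity seen" is also an assumption, because its fields are not described here.

```javascript
// Hypothetical sketch of the host-side workaround: the host owns the input
// iterable's lifetime instead of letting it end on its own.
class HostControlledInput {
  #queue = [];
  #waiters = [];
  #closed = false;

  get closed() { return this.#closed; }

  push(msg) {
    if (this.#closed) throw new Error("input already closed");
    const waiter = this.#waiters.shift();
    if (waiter) waiter({ value: msg, done: false });
    else this.#queue.push(msg);
  }

  // Ends the iterable: already-queued messages are still delivered first.
  close() {
    if (this.#closed) return;
    this.#closed = true;
    for (const waiter of this.#waiters.splice(0)) waiter({ value: undefined, done: true });
  }

  [Symbol.asyncIterator]() {
    return {
      next: () => {
        if (this.#queue.length > 0) return Promise.resolve({ value: this.#queue.shift(), done: false });
        if (this.#closed) return Promise.resolve({ value: undefined, done: true });
        return new Promise((resolve) => this.#waiters.push(resolve));
      },
    };
  }
}

class HostShutdownPolicy {
  #tasks = new Set();
  #sawBackground = false;
  #finalMessageSent = false;
  #quietTimer = null;

  constructor(input, quietMs = 2000) {
    this.input = input;
    this.quietMs = quietMs;
  }

  // Call right after pushing the final message; the next result is assumed
  // to answer it (the app only pushes after the previous result arrived).
  finalMessageSent() { this.#finalMessageSent = true; }

  // Call for every message yielded by query().
  observe(msg) {
    clearTimeout(this.#quietTimer);                 // any traffic restarts the quiet window
    if (msg.type === "system") {
      if (msg.subtype === "task_started") {
        this.#tasks.add(msg.task_id);
        this.#sawBackground = true;
      } else if (msg.subtype === "task_notification") {
        this.#tasks.delete(msg.task_id);
      } else if (msg.subtype === "task_updated") {
        this.#sawBackground = true;                 // assumption: activity marker only
      }
    }
    if (!this.#finalMessageSent || this.input.closed) return;
    if (!this.#sawBackground) {
      if (msg.type === "result") this.input.close(); // no background work: close at the result
      return;
    }
    if (this.#tasks.size === 0) {
      // Background work was seen and the ledger is empty: close once traffic goes quiet.
      this.#quietTimer = setTimeout(() => this.input.close(), this.quietMs);
    }
  }
}
```

In an app, the `HostControlledInput` instance would be passed as `prompt` to `query()`, and `observe()` would be called inside the `for await` loop. The quiet window remains a heuristic, which is part of the cost of re-implementing lifetime tracking on the host.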

Metadata

Assignees

No one assigned

    Labels

    bug: Something isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
