Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/mcp/shared/jsonrpc_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@
_SHUTDOWN_WRITE_TIMEOUT: float = 1
"""Tighter bound for the shutdown-arm error write so a wedged transport can't hold session close."""

_DRAIN_INBOUND_ON_EOF_TIMEOUT: float = 5
"""Bound for letting already-accepted inbound requests write responses after read EOF."""

_DRAIN_INBOUND_ON_EOF_POLL_INTERVAL: float = 0.01
"""Polling interval while waiting for accepted inbound requests to finish."""

TransportT = TypeVar("TransportT", bound=TransportContext, default=TransportContext)

PeerCancelMode = Literal["interrupt", "signal"]
Expand Down Expand Up @@ -285,6 +291,7 @@ def __init__(
self._next_id = 0
self._pending: dict[RequestId, _Pending] = {}
self._in_flight: dict[RequestId, _InFlight[TransportT]] = {}
self._active_inbound_requests = 0
self._tg: anyio.abc.TaskGroup | None = None
self._running = False
self._closed = False
Expand Down Expand Up @@ -471,6 +478,7 @@ async def run(
self._running = False
self._closed = True
self._fan_out_closed()
await self._drain_active_inbound_requests()
finally:
# Cancel in-flight handlers; otherwise the task-group join
# waits on handlers whose callers are already gone.
Expand Down Expand Up @@ -545,6 +553,7 @@ async def _dispatch_request(
_progress_token=progress_token,
)
scope = anyio.CancelScope()
self._active_inbound_requests += 1

@cubic-dev-ai cubic-dev-ai Bot Jun 30, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Track builder-rejection response writes in the EOF drain too. Otherwise a request whose transport_builder raises just before EOF can have its spawned INTERNAL_ERROR response cancelled before it reaches the peer.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/mcp/shared/jsonrpc_dispatcher.py, line 556:

<comment>Track builder-rejection response writes in the EOF drain too. Otherwise a request whose `transport_builder` raises just before EOF can have its spawned INTERNAL_ERROR response cancelled before it reaches the peer.</comment>

<file context>
@@ -545,6 +553,7 @@ async def _dispatch_request(
             _progress_token=progress_token,
         )
         scope = anyio.CancelScope()
+        self._active_inbound_requests += 1
         # TODO(maxisbey): duplicate ids blind-overwrite (v1/TS parity); revisit
         # rejecting with INVALID_REQUEST. Key coerced so a stringified
</file context>
Fix with cubic

# TODO(maxisbey): duplicate ids blind-overwrite (v1/TS parity); revisit
# rejecting with INVALID_REQUEST. Key coerced so a stringified
# `notifications/cancelled` id still correlates.
Expand Down Expand Up @@ -659,6 +668,24 @@ def _fan_out_closed(self) -> None:
pass
self._pending.clear()

async def _drain_active_inbound_requests(self) -> None:
"""Let accepted inbound requests finish response writes after read EOF.

A redirected-stdin stdio transport can reach EOF immediately after the last
request is accepted. Treating EOF as immediate shutdown cancels handlers
before their JSON-RPC responses reach stdout. Keep the write side open
briefly so already-accepted requests can produce responses, then let the
caller cancel any stragglers.
"""
with anyio.move_on_after(_DRAIN_INBOUND_ON_EOF_TIMEOUT) as scope:
while self._active_inbound_requests:
await anyio.sleep(_DRAIN_INBOUND_ON_EOF_POLL_INTERVAL)
if scope.cancelled_caught:
logger.warning(
"timed out waiting for %d inbound request(s) to finish after read EOF",
self._active_inbound_requests,
)

async def _handle_request(
self,
req: JSONRPCRequest,
Expand Down Expand Up @@ -722,6 +749,8 @@ async def _handle_request(
await self._write_error(req.id, ErrorData(code=0, message=str(e)))
if self._raise_handler_exceptions:
raise
finally:
self._active_inbound_requests = max(0, self._active_inbound_requests - 1)
# No `_in_flight` pop here: the inner finally covers every path, and a late pop could evict a reused id.

def _allocate_id(self) -> int:
Expand Down
40 changes: 40 additions & 0 deletions tests/shared/test_jsonrpc_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,46 @@ async def caller() -> None:
s.close()


@pytest.mark.anyio
async def test_read_eof_drains_accepted_inbound_request_response():
"""Read EOF must not cancel a request that was already accepted.

This covers redirected-stdin stdio servers: EOF can arrive immediately after
the final JSON-RPC request is read, while the tool handler still has an
await point before its response write.
"""
c2s_send, c2s_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32)
s2c_send, s2c_recv = anyio.create_memory_object_stream[SessionMessage](32)
server: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(c2s_recv, s2c_send)
handler_started = anyio.Event()

async def on_request(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> dict[str, Any]:
handler_started.set()
await anyio.sleep(0.05)
return {"ok": True}

async def on_notify(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> None:
pass

try:
async with anyio.create_task_group() as tg:
await tg.start(server.run, on_request, on_notify)
await c2s_send.send(SessionMessage(message=JSONRPCRequest(jsonrpc="2.0", id=1, method="slow")))
await handler_started.wait()

# Simulate stdin EOF after the request has been accepted but before
# the handler has finished and written its response.
c2s_send.close()

with anyio.fail_after(5):
response = await s2c_recv.receive()
assert response.message == JSONRPCResponse(jsonrpc="2.0", id=1, result={"ok": True})
tg.cancel_scope.cancel()
finally:
for s in (c2s_send, c2s_recv, s2c_send, s2c_recv):
s.close()


@pytest.mark.anyio
async def test_run_returns_cleanly_when_read_stream_receive_end_is_closed():
"""Iterating a closed receive end is EOF, not a crash (stateless SHTTP closes it during teardown)."""
Expand Down
Loading