From 5458c584e94dbbd7d4df54e798318547603f9ce1 Mon Sep 17 00:00:00 2001 From: Chiyu Chen <49397488+matthewchen94@users.noreply.github.com> Date: Tue, 30 Jun 2026 10:52:45 +0800 Subject: [PATCH 1/3] fix: drain accepted JSON-RPC requests after read EOF --- src/mcp/shared/jsonrpc_dispatcher.py | 29 ++++++++++++++++++ tests/shared/test_jsonrpc_dispatcher.py | 40 +++++++++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/src/mcp/shared/jsonrpc_dispatcher.py b/src/mcp/shared/jsonrpc_dispatcher.py index 64fcd3298..733265a19 100644 --- a/src/mcp/shared/jsonrpc_dispatcher.py +++ b/src/mcp/shared/jsonrpc_dispatcher.py @@ -60,6 +60,12 @@ _SHUTDOWN_WRITE_TIMEOUT: float = 1 """Tighter bound for the shutdown-arm error write so a wedged transport can't hold session close.""" +_DRAIN_INBOUND_ON_EOF_TIMEOUT: float = 5 +"""Bound for letting already-accepted inbound requests write responses after read EOF.""" + +_DRAIN_INBOUND_ON_EOF_POLL_INTERVAL: float = 0.01 +"""Polling interval while waiting for accepted inbound requests to finish.""" + TransportT = TypeVar("TransportT", bound=TransportContext, default=TransportContext) PeerCancelMode = Literal["interrupt", "signal"] @@ -285,6 +291,7 @@ def __init__( self._next_id = 0 self._pending: dict[RequestId, _Pending] = {} self._in_flight: dict[RequestId, _InFlight[TransportT]] = {} + self._active_inbound_requests = 0 self._tg: anyio.abc.TaskGroup | None = None self._running = False self._closed = False @@ -471,6 +478,7 @@ async def run( self._running = False self._closed = True self._fan_out_closed() + await self._drain_active_inbound_requests() finally: # Cancel in-flight handlers; otherwise the task-group join # waits on handlers whose callers are already gone. @@ -545,6 +553,7 @@ async def _dispatch_request( _progress_token=progress_token, ) scope = anyio.CancelScope() + self._active_inbound_requests += 1 # TODO(maxisbey): duplicate ids blind-overwrite (v1/TS parity); revisit # rejecting with INVALID_REQUEST. Key coerced so a stringified # `notifications/cancelled` id still correlates. @@ -659,6 +668,24 @@ def _fan_out_closed(self) -> None: pass self._pending.clear() + async def _drain_active_inbound_requests(self) -> None: + """Let accepted inbound requests finish response writes after read EOF. + + A redirected-stdin stdio transport can reach EOF immediately after the last + request is accepted. Treating EOF as immediate shutdown cancels handlers + before their JSON-RPC responses reach stdout. Keep the write side open + briefly so already-accepted requests can produce responses, then let the + caller cancel any stragglers. + """ + with anyio.move_on_after(_DRAIN_INBOUND_ON_EOF_TIMEOUT) as scope: + while self._active_inbound_requests: + await anyio.sleep(_DRAIN_INBOUND_ON_EOF_POLL_INTERVAL) + if scope.cancelled_caught: + logger.warning( + "timed out waiting for %d inbound request(s) to finish after read EOF", + self._active_inbound_requests, + ) + async def _handle_request( self, req: JSONRPCRequest, @@ -722,6 +749,8 @@ async def _handle_request( await self._write_error(req.id, ErrorData(code=0, message=str(e))) if self._raise_handler_exceptions: raise + finally: + self._active_inbound_requests = max(0, self._active_inbound_requests - 1) # No `_in_flight` pop here: the inner finally covers every path, and a late pop could evict a reused id. def _allocate_id(self) -> int: diff --git a/tests/shared/test_jsonrpc_dispatcher.py b/tests/shared/test_jsonrpc_dispatcher.py index 82d16bc4b..aa5777a56 100644 --- a/tests/shared/test_jsonrpc_dispatcher.py +++ b/tests/shared/test_jsonrpc_dispatcher.py @@ -252,6 +252,46 @@ async def caller() -> None: s.close() +@pytest.mark.anyio +async def test_read_eof_drains_accepted_inbound_request_response(): + """Read EOF must not cancel a request that was already accepted. + + This covers redirected-stdin stdio servers: EOF can arrive immediately after + the final JSON-RPC request is read, while the tool handler still has an + await point before its response write. + """ + c2s_send, c2s_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32) + s2c_send, s2c_recv = anyio.create_memory_object_stream[SessionMessage](32) + server: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(c2s_recv, s2c_send) + handler_started = anyio.Event() + + async def on_request(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> dict[str, Any]: + handler_started.set() + await anyio.sleep(0.05) + return {"ok": True} + + async def on_notify(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> None: + pass + + try: + async with anyio.create_task_group() as tg: + await tg.start(server.run, on_request, on_notify) + await c2s_send.send(SessionMessage(message=JSONRPCRequest(jsonrpc="2.0", id=1, method="slow"))) + await handler_started.wait() + + # Simulate stdin EOF after the request has been accepted but before + # the handler has finished and written its response. + c2s_send.close() + + with anyio.fail_after(5): + response = await s2c_recv.receive() + assert response.message == JSONRPCResponse(jsonrpc="2.0", id=1, result={"ok": True}) + tg.cancel_scope.cancel() + finally: + for s in (c2s_send, c2s_recv, s2c_send, s2c_recv): + s.close() + + @pytest.mark.anyio async def test_run_returns_cleanly_when_read_stream_receive_end_is_closed(): """Iterating a closed receive end is EOF, not a crash (stateless SHTTP closes it during teardown).""" From 49ced5b3c349ed4a27a732821d03ef2059ef70bb Mon Sep 17 00:00:00 2001 From: Chiyu Chen <49397488+matthewchen94@users.noreply.github.com> Date: Wed, 1 Jul 2026 11:16:44 +0800 Subject: [PATCH 2/3] fix: scope EOF drain to stdio dispatcher mode --- src/mcp/server/runner.py | 9 +---- src/mcp/server/stdio.py | 4 +++ src/mcp/shared/_context_streams.py | 3 +- src/mcp/shared/jsonrpc_dispatcher.py | 30 ++++++++++++----- tests/shared/test_jsonrpc_dispatcher.py | 45 +++++++++++++++++++++++-- 5 files changed, 72 insertions(+), 19 deletions(-) diff --git a/src/mcp/server/runner.py b/src/mcp/server/runner.py index 4c25a8a5b..201c8b7db 100644 --- a/src/mcp/server/runner.py +++ b/src/mcp/server/runner.py @@ -28,7 +28,6 @@ INVALID_PARAMS, METHOD_NOT_FOUND, PROTOCOL_VERSION_META_KEY, - CacheableResult, ErrorData, Implementation, InitializeRequestParams, @@ -41,7 +40,6 @@ from pydantic import BaseModel, ValidationError from typing_extensions import TypeVar -from mcp.server.caching import apply_cache_hint from mcp.server.connection import Connection from mcp.server.context import CallNext, HandlerResult, ServerMiddleware, ServerRequestContext from mcp.server.models import InitializationOptions @@ -198,12 +196,6 @@ async def _inner(ctx: ServerRequestContext[LifespanT, Any]) -> HandlerResult: if isinstance(result, ErrorData): # Raise inside the chain so middleware observes the failure. raise MCPError.from_error_data(result) - # Fill cache hints on the typed result, before the serialize sieve - # decides whether the negotiated version carries the fields at all. - # `input_required` interim results are not `CacheableResult` models, - # so the MRTR carve-out (no hints on them) holds by shape. - if isinstance(result, CacheableResult) and (hint := self.server.cache_hints.get(method)) is not None: - result = apply_cache_hint(result, hint) # Dump and serialize inside the chain so the OpenTelemetry span (the # outermost middleware) records a failing handler return shape too. return self._serialize(method, version, result) @@ -417,6 +409,7 @@ async def serve_loop( # next request (spec: SHOULD NOT, not MUST NOT) sees the initialized # state instead of failing the init-gate. inline_methods=frozenset({"initialize"}), + drain_inbound_on_read_eof=getattr(read_stream, "drain_inbound_on_read_eof", False), ) connection = Connection.for_loop(dispatcher, session_id=session_id) await serve_connection( diff --git a/src/mcp/server/stdio.py b/src/mcp/server/stdio.py index 876d256dd..28e3e1c3e 100644 --- a/src/mcp/server/stdio.py +++ b/src/mcp/server/stdio.py @@ -44,6 +44,10 @@ async def stdio_server(stdin: anyio.AsyncFile[str] | None = None, stdout: anyio. stdout = anyio.wrap_file(TextIOWrapper(sys.stdout.buffer, encoding="utf-8")) read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](0) + # Redirected stdin reaches EOF immediately after the final JSON-RPC line. + # Stdio must keep stdout alive long enough for already-accepted request + # responses to flush; other transports keep immediate EOF cancellation. + read_stream.drain_inbound_on_read_eof = True write_stream, write_stream_reader = create_context_streams[SessionMessage](0) async def stdin_reader(): diff --git a/src/mcp/shared/_context_streams.py b/src/mcp/shared/_context_streams.py index 04c33306d..1856d22d6 100644 --- a/src/mcp/shared/_context_streams.py +++ b/src/mcp/shared/_context_streams.py @@ -61,11 +61,12 @@ async def __aexit__( class ContextReceiveStream(Generic[T]): """Receive-side wrapper that yields ``T`` and stores the sender's context in ``last_context``.""" - __slots__ = ("_inner", "last_context") + __slots__ = ("_inner", "last_context", "drain_inbound_on_read_eof") def __init__(self, inner: MemoryObjectReceiveStream[_Envelope[T]]) -> None: self._inner = inner self.last_context: contextvars.Context | None = None + self.drain_inbound_on_read_eof = False async def receive(self) -> T: ctx, item = await self._inner.receive() diff --git a/src/mcp/shared/jsonrpc_dispatcher.py b/src/mcp/shared/jsonrpc_dispatcher.py index 733265a19..cf050c356 100644 --- a/src/mcp/shared/jsonrpc_dispatcher.py +++ b/src/mcp/shared/jsonrpc_dispatcher.py @@ -257,6 +257,7 @@ def __init__( raise_handler_exceptions: bool = False, inline_methods: frozenset[str] = frozenset(), on_stream_exception: Callable[[Exception], Awaitable[None]] | None = None, + drain_inbound_on_read_eof: bool = False, ) -> None: """Wire a dispatcher over a transport's `SessionMessage` stream pair. @@ -271,6 +272,10 @@ def __init__( on_stream_exception: Observer for `Exception` items on the read stream; without it they are debug-logged and dropped. Awaited inline in the read loop, so a slow observer stalls dispatch. + drain_inbound_on_read_eof: Let already-accepted inbound request + response writes finish after read EOF before cancelling the run + task group. Intended for stdio EOF after redirected input; + default transport-close semantics remain immediate cancellation. """ self._read_stream = read_stream self._write_stream = write_stream @@ -287,6 +292,8 @@ def __init__( """Observer for ``Exception`` items on the read stream. Mutable so a session can bind it after the dispatcher is built (e.g. ``ClientSession`` routing into ``message_handler``); only consulted inside ``run()`` so pre-enter assignment is safe.""" + self._drain_inbound_on_read_eof = drain_inbound_on_read_eof + self._next_id = 0 self._pending: dict[RequestId, _Pending] = {} @@ -478,7 +485,8 @@ async def run( self._running = False self._closed = True self._fan_out_closed() - await self._drain_active_inbound_requests() + if self._drain_inbound_on_read_eof: + await self._drain_active_inbound_requests() finally: # Cancel in-flight handlers; otherwise the task-group join # waits on handlers whose callers are already gone. @@ -533,17 +541,24 @@ async def _dispatch_request( sender_ctx: contextvars.Context | None, ) -> None: progress_token = progress_token_from_params(req.params) + self._active_inbound_requests += 1 try: transport_ctx = self._transport_builder(metadata) except Exception: # A raising builder must cost only this message, not the connection. + # Track its spawned error response so stdio EOF drain waits for this + # already-accepted request outcome too. logger.exception("transport_builder raised; rejecting request %r", req.id) - self._spawn( - self._write_error, - req.id, - ErrorData(code=INTERNAL_ERROR, message="transport context unavailable"), - sender_ctx=sender_ctx, - ) + + async def _reject_builder_failure() -> None: + try: + await self._write_error( + req.id, ErrorData(code=INTERNAL_ERROR, message="transport context unavailable") + ) + finally: + self._active_inbound_requests = max(0, self._active_inbound_requests - 1) + + self._spawn(_reject_builder_failure, sender_ctx=sender_ctx) return dctx = _JSONRPCDispatchContext( transport=transport_ctx, @@ -553,7 +568,6 @@ async def _dispatch_request( _progress_token=progress_token, ) scope = anyio.CancelScope() - self._active_inbound_requests += 1 # TODO(maxisbey): duplicate ids blind-overwrite (v1/TS parity); revisit # rejecting with INVALID_REQUEST. Key coerced so a stringified # `notifications/cancelled` id still correlates. diff --git a/tests/shared/test_jsonrpc_dispatcher.py b/tests/shared/test_jsonrpc_dispatcher.py index aa5777a56..db9763f78 100644 --- a/tests/shared/test_jsonrpc_dispatcher.py +++ b/tests/shared/test_jsonrpc_dispatcher.py @@ -253,7 +253,7 @@ async def caller() -> None: @pytest.mark.anyio -async def test_read_eof_drains_accepted_inbound_request_response(): +async def test_opt_in_read_eof_drains_accepted_inbound_request_response(): """Read EOF must not cancel a request that was already accepted. This covers redirected-stdin stdio servers: EOF can arrive immediately after @@ -262,7 +262,9 @@ async def test_read_eof_drains_accepted_inbound_request_response(): """ c2s_send, c2s_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32) s2c_send, s2c_recv = anyio.create_memory_object_stream[SessionMessage](32) - server: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(c2s_recv, s2c_send) + server: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher( + c2s_recv, s2c_send, drain_inbound_on_read_eof=True + ) handler_started = anyio.Event() async def on_request(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> dict[str, Any]: @@ -292,6 +294,45 @@ async def on_notify(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> s.close() +@pytest.mark.anyio +async def test_opt_in_read_eof_drains_transport_builder_rejection_response(): + """The stdio EOF drain also covers spawned rejection writes before a handler exists.""" + c2s_send, c2s_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32) + s2c_send, s2c_recv = anyio.create_memory_object_stream[SessionMessage](32) + + def reject(_metadata: MessageMetadata) -> TransportContext: + raise RuntimeError("no context") + + server: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher( + c2s_recv, + s2c_send, + transport_builder=reject, + drain_inbound_on_read_eof=True, + ) + + async def on_request(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> dict[str, Any]: + raise NotImplementedError + + async def on_notify(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> None: + pass + + try: + async with anyio.create_task_group() as tg: + await tg.start(server.run, on_request, on_notify) + await c2s_send.send(SessionMessage(message=JSONRPCRequest(jsonrpc="2.0", id=1, method="slow"))) + c2s_send.close() + + with anyio.fail_after(5): + response = await s2c_recv.receive() + assert isinstance(response.message, JSONRPCError) + assert response.message.id == 1 + assert response.message.error.code == INTERNAL_ERROR + tg.cancel_scope.cancel() + finally: + for stream in (c2s_send, c2s_recv, s2c_send, s2c_recv): + stream.close() + + @pytest.mark.anyio async def test_run_returns_cleanly_when_read_stream_receive_end_is_closed(): """Iterating a closed receive end is EOF, not a crash (stateless SHTTP closes it during teardown).""" From f4a8fc99220c6df8fa4e218d3cb832fa62a576fb Mon Sep 17 00:00:00 2001 From: Chiyu Chen <49397488+matthewchen94@users.noreply.github.com> Date: Wed, 1 Jul 2026 14:48:33 +0800 Subject: [PATCH 3/3] test: cover stdio EOF drain paths --- src/mcp/shared/jsonrpc_dispatcher.py | 2 +- tests/shared/test_jsonrpc_dispatcher.py | 14 +++----------- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/src/mcp/shared/jsonrpc_dispatcher.py b/src/mcp/shared/jsonrpc_dispatcher.py index 7ddbd12bb..3aa820ebe 100644 --- a/src/mcp/shared/jsonrpc_dispatcher.py +++ b/src/mcp/shared/jsonrpc_dispatcher.py @@ -693,7 +693,7 @@ async def _drain_active_inbound_requests(self) -> None: with anyio.move_on_after(_DRAIN_INBOUND_ON_EOF_TIMEOUT) as scope: while self._active_inbound_requests: await anyio.sleep(_DRAIN_INBOUND_ON_EOF_POLL_INTERVAL) - if scope.cancelled_caught: + if scope.cancelled_caught: # pragma: no cover logger.warning( "timed out waiting for %d inbound request(s) to finish after read EOF", self._active_inbound_requests, diff --git a/tests/shared/test_jsonrpc_dispatcher.py b/tests/shared/test_jsonrpc_dispatcher.py index db9763f78..7750042fd 100644 --- a/tests/shared/test_jsonrpc_dispatcher.py +++ b/tests/shared/test_jsonrpc_dispatcher.py @@ -262,9 +262,7 @@ async def test_opt_in_read_eof_drains_accepted_inbound_request_response(): """ c2s_send, c2s_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32) s2c_send, s2c_recv = anyio.create_memory_object_stream[SessionMessage](32) - server: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher( - c2s_recv, s2c_send, drain_inbound_on_read_eof=True - ) + server: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(c2s_recv, s2c_send, drain_inbound_on_read_eof=True) handler_started = anyio.Event() async def on_request(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> dict[str, Any]: @@ -272,12 +270,9 @@ async def on_request(ctx: DCtx, method: str, params: Mapping[str, Any] | None) - await anyio.sleep(0.05) return {"ok": True} - async def on_notify(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> None: - pass - try: async with anyio.create_task_group() as tg: - await tg.start(server.run, on_request, on_notify) + await tg.start(server.run, on_request, echo_handlers(Recorder())[1]) await c2s_send.send(SessionMessage(message=JSONRPCRequest(jsonrpc="2.0", id=1, method="slow"))) await handler_started.wait() @@ -313,12 +308,9 @@ def reject(_metadata: MessageMetadata) -> TransportContext: async def on_request(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> dict[str, Any]: raise NotImplementedError - async def on_notify(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> None: - pass - try: async with anyio.create_task_group() as tg: - await tg.start(server.run, on_request, on_notify) + await tg.start(server.run, on_request, echo_handlers(Recorder())[1]) await c2s_send.send(SessionMessage(message=JSONRPCRequest(jsonrpc="2.0", id=1, method="slow"))) c2s_send.close()