Skip to content

Commit ef7fdff

Browse files
committed
Era-scope cache arms and harden store interaction paths
1 parent fb1c510 commit ef7fdff

4 files changed

Lines changed: 274 additions & 98 deletions

File tree

docs/advanced/caching.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ Cache keys also carry the **server's identity**: the URL string you dialed, with
9595
* **No coalescing.** Two concurrent identical calls are two fetches.
9696
* **No TTL beyond 24 hours.** A larger `ttlMs`, whether server-sent or configured, is clamped down on store (`mcp.client.caching.MAX_TTL_MS`), bounding how long any entry, however generously hinted, can be served.
9797
* On a **shared store**, clients race each other. Each client drops its own write when an eviction overtook the fetch in flight, but a *co-tenant* client can still write back an entry that had been removed by an eviction it never saw. That race bookkeeping is itself bounded: past 4096 tracked keys, the oldest key's guard is dropped first. Both windows are accepted, and bounded by the TTL cap above.
98-
* On a **shared persistent store**, a session that negotiated a different protocol era than the entry's writer may be served the writer's entry until TTL or eviction. Accepted, and likewise bounded by the TTL cap.
98+
* **No serving across protocol eras.** Entries are scoped to the negotiated protocol version: on a shared persistent store, a session never serves an entry written under a different negotiated version (the same listing genuinely differs by era, since the SDK strips the 2026 fields for older sessions). Eviction likewise touches only the current era's entries; another era's entries simply age out by TTL.
9999

100100
### Reading the hints yourself
101101

src/mcp/client/caching.py

Lines changed: 72 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from typing import Any, Final, Literal, Protocol
1111

1212
import anyio
13+
import anyio.lowlevel
1314
from mcp_types import (
1415
CacheableResult,
1516
PromptListChangedNotification,
@@ -135,35 +136,32 @@ class InMemoryResponseCacheStore:
135136
"""Default in-process `ResponseCacheStore`.
136137
137138
Method bodies are synchronous, so concurrent tasks never observe a torn
138-
write. Non-read methods form a small closed key set; `max_read_entries`
139-
caps the `resources/read` keys, FIFO-evicting at the cap (`0` disables it).
139+
write. `max_entries` caps the whole store, evicting least-recently-used
140+
at the cap (`0` disables it); `get` and `set` both refresh recency, so a
141+
hot entry survives churn from other keys.
140142
141143
Raises:
142-
ValueError: If `max_read_entries` is negative.
144+
ValueError: If `max_entries` is negative.
143145
"""
144146

145-
def __init__(self, *, max_read_entries: int = 512) -> None:
146-
if max_read_entries < 0:
147-
raise ValueError(f"max_read_entries must be >= 0, got {max_read_entries}")
148-
self._max_read_entries = max_read_entries
147+
def __init__(self, *, max_entries: int = 1024) -> None:
148+
if max_entries < 0:
149+
raise ValueError(f"max_entries must be >= 0, got {max_entries}")
150+
self._max_entries = max_entries
149151
self._entries: dict[CacheKey, CacheEntry] = {}
150152

151153
async def get(self, key: CacheKey) -> CacheEntry | None:
152-
return self._entries.get(key)
154+
entry = self._entries.get(key)
155+
if entry is not None:
156+
# Pop-and-reinsert moves the key to the back: the dict's insertion order is the LRU ledger.
157+
self._entries[key] = self._entries.pop(key)
158+
return entry
153159

154160
async def set(self, key: CacheKey, entry: CacheEntry) -> None:
155-
if (
156-
self._max_read_entries
157-
and key.method == "resources/read"
158-
and key not in self._entries
159-
# Total size below the cap implies the read subset is below it too - skip the scan.
160-
and len(self._entries) >= self._max_read_entries
161-
):
162-
# Insertion order (replacement keeps position) makes the dict itself the FIFO ledger.
163-
read_keys = [k for k in self._entries if k.method == "resources/read"]
164-
if len(read_keys) >= self._max_read_entries:
165-
del self._entries[read_keys[0]]
161+
self._entries.pop(key, None)
166162
self._entries[key] = entry
163+
if self._max_entries and len(self._entries) > self._max_entries:
164+
del self._entries[next(iter(self._entries))]
167165

168166
async def delete(self, key: CacheKey) -> None:
169167
self._entries.pop(key, None)
@@ -175,6 +173,10 @@ async def clear(self) -> None:
175173
_GENERATION_MAP_CAP: Final[int] = 4096
176174
"""Cap on the generation map; at the cap the oldest key's eviction-race guard is dropped (FIFO)."""
177175

176+
_STORE_CLEANUP_TIMEOUT: Final[float] = 5
177+
"""Bound, in seconds, for must-complete store cleanup deletes (mirrors the dispatcher's final-write bound);
178+
a wedged store delete must not hold client teardown uncancellably."""
179+
178180

179181
class ClientResponseCache:
180182
"""Coordinates the `Client` caching verbs with a `ResponseCacheStore`: keys, era gate, TTL/scope, eviction."""
@@ -190,27 +192,43 @@ def __init__(
190192
share_public: bool,
191193
negotiated_version: Callable[[], str | None],
192194
generation_map_cap: int = _GENERATION_MAP_CAP,
195+
store_cleanup_timeout: float = _STORE_CLEANUP_TIMEOUT,
193196
) -> None:
194197
self._store = store
198+
self._partition = partition
199+
self._arm_id = arm_id
200+
self._share_public = share_public
195201
self._default_ttl_ms = default_ttl_ms
196202
self._clock = clock
197203
self._negotiated_version = negotiated_version
198-
# JSON arrays so crafted arm_id/partition values cannot collide across field boundaries.
199-
self._private_arm = json.dumps(["private", arm_id, partition])
200-
self._public_arm = json.dumps(["public", arm_id] if share_public else ["public", arm_id, partition])
201204
# A key is eviction-race-guarded iff registered here.
202205
self._generations: dict[tuple[str, str], int] = {}
203206
self._generation_map_cap = generation_map_cap
207+
self._store_cleanup_timeout = store_cleanup_timeout
204208
self._warned_store_ops: set[str] = set()
205209

210+
def _arm(self, scope: Literal["public", "private"]) -> str:
211+
# JSON arrays so crafted arm_id/partition values cannot collide across field boundaries.
212+
# The negotiated version era-scopes every arm: a session never serves an entry written
213+
# under a different protocol era (its content differs - sieve-stripped fields, header
214+
# filtering). Every caller runs post-connect; were that ever untrue, the supplier's
215+
# None still partitions harmlessly.
216+
fields: list[str | None] = [scope, self._negotiated_version(), self._arm_id]
217+
if scope == "private" or not self._share_public:
218+
fields.append(self._partition)
219+
return json.dumps(fields)
220+
206221
async def read(self, method: str, params_key: str) -> CacheableResult | None:
207222
"""Serve a fresh entry for the key, or `None`; the served result is a deep copy."""
223+
# A hit completes without any other yielding await, so checkpoint here: a poll
224+
# loop over a fresh entry must not starve spawned tasks (eviction dispatch).
225+
await anyio.lowlevel.checkpoint()
208226
# A wrong-shape entry raises as late as the copy, so the boundary wraps the whole read path.
209227
try:
210-
entry = await self._get_fresh(CacheKey(method, params_key, self._private_arm))
228+
entry = await self._get_fresh(CacheKey(method, params_key, self._arm("private")))
211229
if entry is None:
212230
# After a scope flip, a stale private entry must not shadow a fresh public one.
213-
entry = await self._get_fresh(CacheKey(method, params_key, self._public_arm))
231+
entry = await self._get_fresh(CacheKey(method, params_key, self._arm("public")))
214232
if entry is not None and entry.scope != "public":
215233
# Never serve an entry the server scoped "private" out of the shared arm.
216234
entry = None
@@ -250,49 +268,52 @@ async def write(
250268
if self._generation_moved(gen_key, gen_at_capture):
251269
return # the key was evicted while the fetch was in flight
252270
ttl_ms, scope = self._resolve(result)
253-
private_key = CacheKey(method, params_key, self._private_arm)
254-
public_key = CacheKey(method, params_key, self._public_arm)
271+
private_key = CacheKey(method, params_key, self._arm("private"))
272+
public_key = CacheKey(method, params_key, self._arm("public"))
255273
if ttl_ms <= 0:
256274
if mode == "refresh":
257-
# The refetch superseded the warm entry; shielded so a cancellation cannot leave one arm warm.
258-
with anyio.CancelScope(shield=True):
259-
await self._delete(private_key)
260-
await self._delete(public_key)
275+
# The refetch superseded the warm entry, which a cancellation must not leave serving.
276+
await self._cleanup_delete(private_key, public_key)
261277
return
262278
own, opposite = (public_key, private_key) if scope == "public" else (private_key, public_key)
263279
# Opposite arm first: a failed delete aborts before the set - never two arms answering for one key.
264280
if not await self._delete(opposite):
265-
# The own arm's entry is superseded too: shielded best-effort delete, degrading to a full miss.
266-
with anyio.CancelScope(shield=True):
267-
await self._delete(own)
281+
# The own arm's entry is superseded too: best-effort delete, degrading to a full miss.
282+
await self._cleanup_delete(own)
268283
return
269284
entry = CacheEntry(value=result.model_copy(deep=True), scope=scope, expires_at=self._clock() + ttl_ms / 1000)
270285
try:
271-
await self._set(own, entry)
286+
if not await self._set(own, entry):
287+
# The fetch superseded any pre-existing own-arm entry, and the failed set
288+
# left it in place: purge it (mirrors the opposite-arm-failure path).
289+
await self._cleanup_delete(own)
272290
finally:
273291
# An eviction can land while the set commits - even when the await
274-
# is cancelled - so re-check on every exit; the delete is shielded
292+
# is cancelled - so re-check on every exit; the delete must complete
275293
# so the pending cancellation cannot resurrect the evicted entry.
276294
if self._generation_moved(gen_key, gen_at_capture):
277-
with anyio.CancelScope(shield=True):
278-
await self._delete(own)
295+
await self._cleanup_delete(own)
279296

280297
async def evict_method(self, method: str) -> None:
281298
"""Evict the method's cursor-less entry."""
282299
await self.evict_key(method, "")
283300

284301
async def evict_key(self, method: str, params_key: str) -> None:
285-
"""Evict one key from both arms."""
302+
"""Evict one key from both arms.
303+
304+
Only the current era's arms are touched; other-era entries in a persistent store age out by TTL.
305+
"""
286306
gen_key = (method, params_key)
287307
# Bump first so an in-flight fetch cannot write the evicted entry back.
288308
# Unregistered keys skip the bump (uris must not grow the map) but not
289309
# the deletes - a persistent store may hold uncaptured entries.
290310
if gen_key in self._generations:
291311
self._generations[gen_key] += 1
292-
# Shielded: a cancellation between the deletes would leave one arm serving the evicted entry.
293-
with anyio.CancelScope(shield=True):
294-
await self._delete(CacheKey(method, params_key, self._private_arm))
295-
await self._delete(CacheKey(method, params_key, self._public_arm))
312+
# Must complete: a cancellation between the deletes would leave one arm serving the evicted entry.
313+
await self._cleanup_delete(
314+
CacheKey(method, params_key, self._arm("private")),
315+
CacheKey(method, params_key, self._arm("public")),
316+
)
296317

297318
async def evict_for_notification(self, notification: ServerNotification) -> None:
298319
"""Map a server notification to the entries it makes stale.
@@ -340,6 +361,15 @@ async def _set(self, key: CacheKey, entry: CacheEntry) -> bool:
340361
self._warned_store_ops.discard("set")
341362
return True
342363

364+
async def _cleanup_delete(self, *keys: CacheKey) -> None:
365+
# Must-complete cleanup: shielded so a pending cancellation cannot skip the deletes,
366+
# bounded so a wedged store delete cannot hold client teardown uncancellably.
367+
with anyio.move_on_after(self._store_cleanup_timeout, shield=True) as scope:
368+
for key in keys:
369+
await self._delete(key)
370+
if scope.cancelled_caught:
371+
logger.warning("Response cache store delete timed out; the entry will age out by TTL")
372+
343373
async def _delete(self, key: CacheKey) -> bool:
344374
try:
345375
await self._store.delete(key)

0 commit comments

Comments
 (0)