Context
Follow-up from PR review: #256 (comment)
One-shot request frames currently have no way to be cancelled. If a product issues a
request that bottoms out in a host RPC call and the peer never responds while the
connection stays open, the runtime awaits forever. Concretely, at the leaf
(truapi-server/src/host_rpc_client.rs):
```rust
rx.await
    .map_err(|_| client_error("json-rpc request was cancelled"))?
```
rx is a oneshot::Receiver with no timeout and no cancellation seam. It resolves only
when a matching response arrives or the whole connection is torn down. There is no
per-request cancellation.
The intended model (from the PR discussion): when the user cancels a request, its
CallContext (cx) should emit a cancellation signal that propagates down the stack so
the in-flight work unwinds promptly, the pending state is cleaned up, and the product
receives a terminal Cancelled frame.
This issue is to design cancellation properly in the protocol + runtime rather than bolt
on an ad-hoc timeout.
Current state (what already exists vs. what's missing)
Research over the current truapi / truapi-server code:
Already present
- cx: CallContext already carries a CancellationToken (truapi/src/lib.rs), but it is
  created fresh per request inside each generated handler
  (CallContext::with_request_id(request_id) in truapi-server/src/generated/dispatcher.rs).
- A wire-frame taxonomy with kinds request / response / start / stop / interrupt /
  receive, and a working cooperative-teardown precedent for subscriptions via stop_ids +
  SubscriptionManager::handle_stop (truapi-server/src/dispatcher.rs).
- Per-dispatch AbortHandles in ProductRuntime.in_flight (truapi-server/src/host_core.rs),
  but keyed by a monotonic dispatch_id and only fired wholesale on dispose().
Missing (the four gaps)
- The token is a bare Arc<AtomicBool>, i.e. poll-only. You cannot select! on it, so it
  cannot interrupt a blocked rx.await. Its own doc comment already notes that "request
  tokens only fire when a future runtime explicitly cancels them."
- There is no client-to-server cancel frame for one-shot requests (only the subscription
  _stop).
- The per-request token is never registered anywhere, so nothing can find it to fire it.
- cx is not threaded below the handler. For example, get_spec_genesis_hash(&_cx, ...)
  ignores it; ChainRuntime / subxt / HostRpcClient never see it.
Structural obstacle
subxt sits in the middle. RpcClientT::request_raw has a fixed signature with no
cancellation parameter, so a token cannot be threaded through subxt to the rx.await.
Propagation must happen by racing-and-dropping around subxt, with RAII cleanup at the
leaf.
Proposed propagation model (keyed by request_id)
```text
product -- cancel frame [request_id=X][CANCEL] --------------+
                                                             v
ProductRuntime.receive_frame                    RequestManager.cancel("X")
  routes CANCEL to the registry                   -> token_X.cancel()   <- fires awaitable
                                                             |
        ( the in-flight dispatch for X is parked at rx.await )
                                                             | wakes
                                                             v
dispatcher: select! { handler(cx_X)  vs  cx_X.cancel().cancelled() }
            cancel wins => DROP the handler future
                                                             | drop cascades
        +----------------------------------------------------+
        v
host.method future drops -> ChainRuntime future drops ->
subxt request future drops -> HostRpcClientInner::request future drops ->
        v
rx (oneshot receiver) drops. A PendingGuard on the way out
removes "truapi:N" from `pending`   <- clean, no leak
        v
dispatcher emits ONE terminal frame for X (Cancelled response)
```
Design pieces
1. Make the token awaitable (truapi/src/lib.rs). Keep is_cancelled() for cheap
polling; add a future so a blocked await can be raced:
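```rust
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::sync::{Arc, Mutex};
use futures::channel::oneshot;
use futures::future::{FutureExt, Shared};

#[derive(Clone)]
pub struct CancellationToken {
    cancelled: Arc<AtomicBool>,                  // cheap poll-only flag
    notify: Shared<oneshot::Receiver<()>>,       // resolves on cancel
    tx: Arc<Mutex<Option<oneshot::Sender<()>>>>, // fired once by cancel()
}
impl CancellationToken {
    pub fn new() -> Self { // constructor name assumed; shown so the fields are initialised
        let (tx, rx) = oneshot::channel();
        Self { cancelled: Arc::new(AtomicBool::new(false)), notify: rx.shared(), tx: Arc::new(Mutex::new(Some(tx))) }
    }
    pub fn is_cancelled(&self) -> bool { self.cancelled.load(SeqCst) }
    pub fn cancel(&self) {
        self.cancelled.store(true, SeqCst);
        if let Some(tx) = self.tx.lock().unwrap().take() { let _ = tx.send(()); }
    }
    /// Future that resolves once cancelled. Cheap to clone; multiple awaiters ok.
    pub async fn cancelled(&self) { let _ = self.notify.clone().await; }
}
```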
Any Notify / event-listener style primitive works; Shared<oneshot> keeps it
dependency-light and WASM-friendly.
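For comparison, the event-listener style can be sketched with std alone: a shared flag
plus the wakers of tasks currently awaiting it. This is an illustrative sketch, not the
truapi type; Token, Cancelled and block_on are hypothetical names, and block_on is a
minimal thread-parking executor included only so the example runs without a runtime.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical std-only token: a flag plus the wakers of tasks awaiting it.
#[derive(Clone, Default)]
pub struct Token(Arc<Mutex<State>>);

#[derive(Default)]
struct State {
    cancelled: bool,
    waiters: Vec<Waker>,
}

impl Token {
    pub fn is_cancelled(&self) -> bool {
        self.0.lock().unwrap().cancelled
    }

    /// Idempotent: sets the flag and wakes every registered awaiter once.
    pub fn cancel(&self) {
        let waiters = {
            let mut s = self.0.lock().unwrap();
            s.cancelled = true;
            std::mem::take(&mut s.waiters)
        };
        // Wake outside the lock so a woken task can re-poll immediately.
        for w in waiters {
            w.wake();
        }
    }

    /// Future that resolves once `cancel()` has been called on any clone.
    pub fn cancelled(&self) -> Cancelled {
        Cancelled(self.clone())
    }
}

pub struct Cancelled(Token);

impl Future for Cancelled {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut s = (self.0).0.lock().unwrap();
        if s.cancelled {
            return Poll::Ready(());
        }
        // Register this task's waker (once) so cancel() can wake it.
        if !s.waiters.iter().any(|w| w.will_wake(cx.waker())) {
            s.waiters.push(cx.waker().clone());
        }
        Poll::Pending
    }
}

/// Demo-only executor: polls on the current thread, parks between wakes.
struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
        thread::park();
    }
}
```

The trade-off against Shared<oneshot> is mostly about dependencies: this version needs
nothing beyond std, at the cost of hand-written waker bookkeeping.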
2. A method-agnostic cancel frame. Do not add a per-method cancel_id (that bloats
RequestFrameIds / the wire table for every method). Cancellation correlates on
request_id, not method, so reserve one control discriminant CANCEL. Wire:
[request_id: X][CANCEL][empty]. Mirrors _stop but is global. (Wire-table ordering is
append-only, so this is additive.)
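A sketch of how intake could route frames once CANCEL is reserved. The Frame struct, the
Route enum, the u16 discriminant width and the concrete CANCEL value are assumptions for
illustration; the real values would come from the generated wire table.

```rust
use std::collections::HashMap;

/// Hypothetical reserved control discriminant: allocated once in the append-only
/// wire table and never reused by a method.
pub const CANCEL: u16 = 0xFFFF;

/// Hypothetical decoded frame: [request_id][discriminant][payload].
pub struct Frame {
    pub request_id: String,
    pub discriminant: u16,
    pub payload: Vec<u8>,
}

#[derive(Debug, PartialEq)]
pub enum Route {
    /// One-shot request for a known method.
    Request { request_id: String, method: &'static str },
    /// Cancel correlates on request_id only; the payload is expected to be empty.
    Cancel { request_id: String },
    Unknown(u16),
}

pub fn route(frame: Frame, methods: &HashMap<u16, &'static str>) -> Route {
    // Checked before the per-method lookup, so it applies to every method.
    if frame.discriminant == CANCEL {
        return Route::Cancel { request_id: frame.request_id };
    }
    match methods.get(&frame.discriminant) {
        Some(&method) => Route::Request { request_id: frame.request_id, method },
        None => Route::Unknown(frame.discriminant),
    }
}
```

One reserved value serves every method, which is the point of not adding a per-method
cancel_id.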
3. A RequestManager in the dispatcher, the exact analog of SubscriptionManager:
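```rust
use std::collections::HashMap;
use std::sync::Mutex;
struct RequestManager { in_flight: Mutex<HashMap<String, CancellationToken>> }
impl RequestManager {
    // Called before the handler is awaited (reserve-before-await).
    fn register(&self, id: String, tok: CancellationToken) { self.in_flight.lock().unwrap().insert(id, tok); }
    // Called once the terminal frame is decided; later cancels become no-ops.
    fn finish(&self, id: &str) { self.in_flight.lock().unwrap().remove(id); }
    // Fires the token only if the request is still in flight.
    fn cancel(&self, id: &str) { if let Some(t) = self.in_flight.lock().unwrap().get(id) { t.cancel() } }
}
```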
4. Dispatcher wires it up (dispatcher.rs). Register the token before awaiting (same
reserve-before-await discipline subscriptions already use to avoid the race), race the
handler against cancel, always emit exactly one terminal frame:
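```rust
use futures::FutureExt; // for .fuse()

if let Some(entry) = self.by_request.get(&id) {
    let cx = CallContext::with_request_id(message.request_id.clone());
    // Clone the token first: `cx` is moved into the handler below.
    let token = cx.cancel().clone();
    // Reserve before awaiting so a CANCEL arriving mid-flight finds the token.
    self.requests.register(message.request_id.clone(), token.clone());
    let value = futures::select! {
        v = (entry.handler)(cx, message.payload.value).fuse() => v.unwrap_or_else(|e| e),
        // Cancel wins: the handler future is dropped, cascading down to rx.
        _ = token.cancelled().fuse() => encode_cancelled_payload(),
    };
    self.requests.finish(&message.request_id);
    // Exactly one terminal frame per request_id.
    transport.send(ProtocolMessage {
        request_id: message.request_id,
        payload: Payload { id: entry.ids.response_id, value },
    });
} else if id == CANCEL {
    self.requests.cancel(&message.request_id); // no-op if already finished
}
```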
Codegen change this implies: the generated handler closure takes cx from the
dispatcher instead of building its own (generated/dispatcher.rs). Mechanical, one line
per method in the emitter.
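What matters in step 4 is that when the cancel branch wins, the handler future is
dropped, and everything it owns is dropped with it. Below is a std-only sketch of that
race-and-drop, assuming a condensed awaitable token; Token, race_cancel, stuck_handler,
DropFlag and block_on are hypothetical names. stuck_handler stands in for a request
parked at rx.await whose peer never answers, and DropFlag plays the role of a leaf RAII
guard such as the PendingGuard in step 5.

```rust
use std::future::{pending, poll_fn, Future};
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Condensed awaitable token (hypothetical stand-in for CancellationToken).
#[derive(Clone, Default)]
struct Token(Arc<Mutex<(bool, Vec<Waker>)>>);

impl Token {
    fn cancel(&self) {
        let waiters = {
            let mut s = self.0.lock().unwrap();
            s.0 = true;
            std::mem::take(&mut s.1)
        };
        waiters.into_iter().for_each(Waker::wake);
    }
    async fn cancelled(&self) {
        poll_fn(|cx: &mut Context<'_>| {
            let mut s = self.0.lock().unwrap();
            if s.0 {
                return Poll::Ready(());
            }
            if !s.1.iter().any(|w| w.will_wake(cx.waker())) {
                s.1.push(cx.waker().clone());
            }
            Poll::Pending
        })
        .await
    }
}

/// Mirrors the two select! arms.
enum Outcome<T> {
    Completed(T),
    Cancelled,
}

/// Whichever side finishes first wins. Both futures are locals here, so the loser
/// is dropped before the outcome reaches the caller.
async fn race_cancel<F: Future>(handler: F, token: &Token) -> Outcome<F::Output> {
    let mut handler = pin!(handler);
    let mut cancel = pin!(token.cancelled());
    let outcome = poll_fn(|cx: &mut Context<'_>| {
        if let Poll::Ready(v) = handler.as_mut().poll(cx) {
            return Poll::Ready(Outcome::Completed(v));
        }
        if cancel.as_mut().poll(cx).is_ready() {
            return Poll::Ready(Outcome::Cancelled);
        }
        Poll::Pending
    })
    .await;
    outcome
}

/// Stand-in for a leaf RAII guard: records when it is dropped.
struct DropFlag(Arc<AtomicBool>);
impl Drop for DropFlag {
    fn drop(&mut self) {
        self.0.store(true, SeqCst);
    }
}

/// Stand-in handler parked forever, like rx.await when the peer never answers.
async fn stuck_handler(dropped: Arc<AtomicBool>) -> &'static str {
    let _guard = DropFlag(dropped);
    pending::<()>().await;
    "response"
}

struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
        thread::park();
    }
}
```

In the dispatcher, futures::select! does the same job; the explicit poll_fn loop only
makes the drop ordering visible.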
5. Clean leaf cleanup via RAII (host_rpc_client.rs). This is what makes
drop-propagation clean without threading anything through subxt. Today dropping the
request future drops rx but leaves the pending entry dangling until a late
response or connection close. Add a guard so drop removes it:
```rust
async fn request(&self, method: &str, params: ...) -> Result<Box<RawValue>, RpcError> {
    let id = self.next_request_id();
    let (tx, rx) = oneshot::channel();
    { /* insert PendingRequest { tx } */ }
    let _guard = PendingGuard { inner: self, id: id.clone() }; // removes on drop
    self.send_request(&id, method, params.as_deref())?;
    rx.await.map_err(|_| client_error("json-rpc request was cancelled"))?
}
```
Now cancellation is purely "drop the future," and the pending map self-heals whether the
drop came from cancel, dispose, or client teardown.
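A minimal sketch of the guard itself, assuming the pending map is a
Mutex<HashMap<String, PendingRequest>> on the client's inner state. Inner,
PendingRequest, begin, complete and pending_len are hypothetical stand-ins for the
host_rpc_client.rs internals; the real PendingRequest would carry the oneshot::Sender.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical stand-in for HostRpcClientInner; only the pending map matters here.
#[derive(Default)]
pub struct Inner {
    pending: Mutex<HashMap<String, PendingRequest>>,
}

/// The real entry would hold the oneshot::Sender for the response.
pub struct PendingRequest;

/// Removes `id` from `pending` on every exit path: normal return, an early `?`,
/// or the request future being dropped mid-await (cancel, dispose, teardown).
pub struct PendingGuard<'a> {
    inner: &'a Inner,
    id: String,
}

impl Drop for PendingGuard<'_> {
    fn drop(&mut self) {
        // Skip cleanup rather than panic inside Drop if the lock was poisoned.
        if let Ok(mut pending) = self.inner.pending.lock() {
            // No-op if the response path already removed the entry.
            pending.remove(&self.id);
        }
    }
}

impl Inner {
    /// Registers a pending request and returns the guard that owns its cleanup.
    pub fn begin(&self, id: &str) -> PendingGuard<'_> {
        self.pending.lock().unwrap().insert(id.to_string(), PendingRequest);
        PendingGuard { inner: self, id: id.to_string() }
    }

    /// What the response path does when a matching reply arrives.
    pub fn complete(&self, id: &str) -> Option<PendingRequest> {
        self.pending.lock().unwrap().remove(id)
    }

    pub fn pending_len(&self) -> usize {
        self.pending.lock().unwrap().len()
    }
}
```

Both removal paths are idempotent, so it does not matter whether the response handler
or the guard gets there first.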
6. (Optional) cooperative + graceful remote stop. To do more than an abrupt drop
(e.g. tell the node chainHead_v1_stopOperation, or return a typed Cancelled domain error
instead of a framework one), thread cx into the ChainRuntime methods (they already receive
it at the trait boundary and ignore it) and select! on cx.cancel().cancelled() at the
ChainRuntime layer, which can issue the stop RPC before unwinding. This cannot live inside
HostRpcClient because subxt hides the per-request seam; ChainRuntime is the lowest layer
that still holds both cx and the connection.
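A std-only sketch of the cooperative variant at the ChainRuntime layer. It races the
operation against the token; if cancel wins, it drops the operation, awaits a
caller-supplied stop future (for example one that sends chainHead_v1_stopOperation) and
returns a typed error. with_graceful_stop, ChainError, the condensed Token and block_on
are hypothetical, and how the stop RPC is actually issued is left to the caller.

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Condensed awaitable token (hypothetical stand-in for CancellationToken).
#[derive(Clone, Default)]
struct Token(Arc<Mutex<(bool, Vec<Waker>)>>);

impl Token {
    fn cancel(&self) {
        let waiters = {
            let mut s = self.0.lock().unwrap();
            s.0 = true;
            std::mem::take(&mut s.1)
        };
        waiters.into_iter().for_each(Waker::wake);
    }
    async fn cancelled(&self) {
        poll_fn(|cx: &mut Context<'_>| {
            let mut s = self.0.lock().unwrap();
            if s.0 {
                return Poll::Ready(());
            }
            if !s.1.iter().any(|w| w.will_wake(cx.waker())) {
                s.1.push(cx.waker().clone());
            }
            Poll::Pending
        })
        .await
    }
}

/// Hypothetical typed domain error, returned instead of a framework-level one.
#[derive(Debug, PartialEq)]
enum ChainError {
    Cancelled,
}

/// Races `op` against `token`. If cancel wins, `op` is dropped, then `stop` is
/// awaited before the typed error is returned.
async fn with_graceful_stop<T, Op, Stop, StopFut>(op: Op, token: &Token, stop: Stop) -> Result<T, ChainError>
where
    Op: Future<Output = T>,
    Stop: FnOnce() -> StopFut,
    StopFut: Future<Output = ()>,
{
    let finished = {
        let mut op = pin!(op);
        let mut cancel = pin!(token.cancelled());
        let first = poll_fn(|cx: &mut Context<'_>| {
            if let Poll::Ready(v) = op.as_mut().poll(cx) {
                return Poll::Ready(Some(v));
            }
            if cancel.as_mut().poll(cx).is_ready() {
                return Poll::Ready(None);
            }
            Poll::Pending
        })
        .await;
        first
    }; // `op` and the cancel future are dropped here
    match finished {
        Some(v) => Ok(v),
        None => {
            stop().await;
            Err(ChainError::Cancelled)
        }
    }
}

struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
        thread::park();
    }
}
```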
Gotchas to resolve during design
- Head-of-line blocking. receive_frame awaits core.dispatch to completion
  (host_core.rs). If the host pumps frames strictly serially, a long request blocks intake
  of its own cancel frame, so cancellation can never arrive mid-flight. Either the host
  must pump frames concurrently, or (cleaner) the dispatcher should spawn request handlers
  the way it already spawns subscriptions (via the Spawner), returning from intake
  immediately and tracking the task in RequestManager. Folding the existing AbortHandle
  into RequestManager gives a hard backstop alongside the cooperative token.
- Terminal-frame exactly-once. A cancel can land in three windows: before registration
(reserve-before-await closes this), during flight (the select! handles it), or after the
response already went out (RequestManager::cancel misses -> no-op, same as handle_stop
on a dead subscription). The finish() / select! structure must guarantee exactly one
response frame per request_id.
- Client side. The generated TS client needs an API to trigger the cancel frame
(e.g. an AbortSignal wired to each request) and to settle the pending promise on the
terminal Cancelled frame.
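A synchronous model of the exactly-once rule, assuming the dispatch path is the only
producer of terminal frames and a CANCEL frame only ever fires a token. RequestManager
here is a simplified stand-in with a poll-only token, and settle is a hypothetical name
for the step after the race where the dispatcher deregisters and emits the frame. It
covers the in-flight and after-response windows; the before-registration window is
closed by reserve-before-await.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::sync::{Arc, Mutex};

// Poll-only token stand-in; the real one is the awaitable CancellationToken.
#[derive(Clone, Default)]
struct Token(Arc<AtomicBool>);

#[derive(Default)]
struct RequestManager {
    in_flight: Mutex<HashMap<String, Token>>,
}

impl RequestManager {
    fn register(&self, id: &str) -> Token {
        let tok = Token::default();
        self.in_flight.lock().unwrap().insert(id.to_string(), tok.clone());
        tok
    }
    /// Fires the token if the request is still in flight; never emits a frame.
    /// Returns whether the cancel reached an in-flight request.
    fn cancel(&self, id: &str) -> bool {
        match self.in_flight.lock().unwrap().get(id) {
            Some(t) => {
                t.0.store(true, SeqCst);
                true
            }
            None => false,
        }
    }
    fn finish(&self, id: &str) {
        self.in_flight.lock().unwrap().remove(id);
    }
}

#[derive(Debug, PartialEq)]
enum Terminal {
    Response(String),
    Cancelled(String),
}

/// Dispatch path after the race is decided: deregister, then emit exactly one
/// terminal frame. A CANCEL arriving after `finish` misses and is a no-op.
fn settle(mgr: &RequestManager, id: &str, cancel_won: bool, out: &mut Vec<Terminal>) {
    mgr.finish(id);
    out.push(if cancel_won { Terminal::Cancelled(id.into()) } else { Terminal::Response(id.into()) });
}
```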
Scope options
- Minimal (delivers the core ask): awaitable token (1) + global cancel frame (2) +
RequestManager (3) + dispatcher select! (4) + PendingGuard (5). Cancellation
propagates, rx.await unwinds promptly, the pending map stays clean, the product gets a
Cancelled frame. No subxt or ChainRuntime signature changes.
- Full: add (6) for typed cancelled errors and remote stopOperation, plus spawning
  request handlers with an abort backstop, and the TS client AbortSignal surface.
Open questions
- Wire: CANCEL as a single reserved global discriminant vs. a per-method cancel_id?
  (Leaning global: cancellation is per request_id, not per method.)
- Terminal frame shape: reuse the method's response_id with a Cancelled variant, or a
  dedicated cancel-ack discriminant?
- Should requests be spawned (like subscriptions) unconditionally, or only when the host
opts into concurrent intake?
- Do we also want a host/config-level default timeout as a backstop independent of explicit
cancellation?