Design request cancellation in the protocol #263

Description

@pgherveou

Context

Follow-up from PR review: #256 (comment)

One-shot request frames currently have no way to be cancelled. If a product issues a
request that bottoms out in a host RPC call and the peer never responds while the
connection stays open, the runtime awaits forever. Concretely, at the leaf
(truapi-server/src/host_rpc_client.rs):

rx.await
    .map_err(|_| client_error("json-rpc request was cancelled"))?

rx is a oneshot::Receiver with no timeout and no cancellation seam. It resolves only
when a matching response arrives or the whole connection is torn down. There is no
per-request cancellation.

The intended model (from the PR discussion): when the user cancels a request, its
CallContext (cx) should emit a cancellation signal that propagates down the stack so
the in-flight work unwinds promptly, the pending state is cleaned up, and the product
receives a terminal Cancelled frame.

This issue is to design cancellation properly in the protocol + runtime rather than bolt
on an ad-hoc timeout.

Current state (what already exists vs. what's missing)

Research over the current truapi / truapi-server code:

Already present

  • cx: CallContext already carries a CancellationToken (truapi/src/lib.rs), but it is
    created fresh per request inside each generated handler
    (CallContext::with_request_id(request_id) in truapi-server/src/generated/dispatcher.rs).
  • A wire frame taxonomy with kinds request / response / start / stop / interrupt /
    receive, and a working cooperative-teardown precedent for subscriptions via stop_ids +
    SubscriptionManager::handle_stop (truapi-server/src/dispatcher.rs).
  • Per-dispatch AbortHandles in ProductRuntime.in_flight (truapi-server/src/host_core.rs),
    but keyed by a monotonic dispatch_id and only fired wholesale on dispose().

Missing (the four gaps)

  1. The token is a bare Arc<AtomicBool>, i.e. poll-only. You cannot select! on it, so
    it cannot interrupt a blocked rx.await. Its own doc comment already notes "request
    tokens only fire when a future runtime explicitly cancels them."
  2. There is no client-to-server cancel frame for one-shot requests; only subscriptions
    have one (_stop).
  3. The per-request token is never registered anywhere, so nothing can find it to fire it.
  4. cx is not threaded below the handler. For example, get_spec_genesis_hash(&_cx, ...)
    drops it; ChainRuntime / subxt / HostRpcClient never see it.
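A minimal, dependency-free sketch of gap 1. The type and function names here are illustrative, not truapi's: a flag-only token flips its bit, but because no waker was ever registered on it, a task parked on a future that never answers is never woken to re-check the flag.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Illustrative copy of today's poll-only token: a shared flag and nothing else.
#[derive(Clone, Default)]
pub struct PollOnlyToken(Arc<AtomicBool>);

impl PollOnlyToken {
    pub fn cancel(&self) { self.0.store(true, Ordering::SeqCst); }
    pub fn is_cancelled(&self) -> bool { self.0.load(Ordering::SeqCst) }
}

/// Stand-in for `rx.await` on a peer that never answers: always Pending.
pub struct NeverResponds;

impl Future for NeverResponds {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

/// Waker that only counts wake-up attempts.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) { self.0.fetch_add(1, Ordering::SeqCst); }
}

/// Polls the parked future once, cancels the token, and reports
/// (token observed as cancelled, number of wake-ups the task received).
pub fn wakeups_after_cancel() -> (bool, usize) {
    let token = PollOnlyToken::default();
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(Arc::clone(&counter));
    let mut cx = Context::from_waker(&waker);
    let mut parked = NeverResponds;
    assert!(Pin::new(&mut parked).poll(&mut cx).is_pending());
    token.cancel();
    // The flag flipped, but nothing registered interest in it, so the parked
    // task is never woken and would never re-check is_cancelled().
    (token.is_cancelled(), counter.0.load(Ordering::SeqCst))
}
```

Here `wakeups_after_cancel()` returns `(true, 0)`: the token is cancelled, yet the task stays parked.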

Structural obstacle

  • subxt sits in the middle. RpcClientT::request_raw has a fixed signature with no
    cancellation parameter, so a token cannot be threaded through subxt to the rx.await.
    Propagation must happen by racing-and-dropping around subxt, with RAII cleanup at the
    leaf.
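Race-and-drop needs nothing from subxt. The sketch below hand-rolls a two-way race with `std::future::poll_fn` (in the proposed dispatcher, `futures::select!` plays this role). `race`, `Raced`, `DropFlag`, `parked_leaf` and `block_on` are hypothetical helpers. The losing future is dropped, so any RAII guard it owns runs its destructor, which is the hook the leaf cleanup relies on.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Which side of the race finished first.
pub enum Raced<W, C> {
    Work(W),
    Cancelled(C),
}

/// Drives `work` and `cancel` together. The first to finish wins and the other
/// future is dropped before `race` completes, running its destructors.
/// The cancel arm is polled first, so a tie resolves as Cancelled.
pub async fn race<W: Future, C: Future>(work: W, cancel: C) -> Raced<W::Output, C::Output> {
    let mut work = Box::pin(work);
    let mut cancel = Box::pin(cancel);
    std::future::poll_fn(move |cx| {
        if let Poll::Ready(c) = cancel.as_mut().poll(cx) {
            return Poll::Ready(Raced::Cancelled(c));
        }
        if let Poll::Ready(w) = work.as_mut().poll(cx) {
            return Poll::Ready(Raced::Work(w));
        }
        Poll::Pending
    })
    .await
}

/// Stand-in for the leaf's RAII guard: flips a flag when dropped.
pub struct DropFlag(pub Arc<AtomicBool>);

impl Drop for DropFlag {
    fn drop(&mut self) { self.0.store(true, Ordering::SeqCst); }
}

/// Stand-in for a request parked at `rx.await` on a peer that never answers.
pub async fn parked_leaf(guard: DropFlag) {
    let _guard = guard;
    std::future::pending::<()>().await;
}

/// Minimal single-future executor for the demo (not part of the design).
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) { self.0.unpark(); }
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = std::pin::pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(v) => return v,
            Poll::Pending => thread::park(),
        }
    }
}
```

Racing `parked_leaf(...)` against an already-ready cancel signal returns `Raced::Cancelled` and leaves the flag set, i.e. the parked request's guard ran. Polling the cancel arm first biases ties toward cancellation; `futures::select!` instead picks randomly among ready arms, which is also acceptable as long as only one terminal frame is emitted.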

Proposed propagation model (keyed by request_id)

 product -- cancel frame [request_id=X][CANCEL] --------------+
                                                              v
 ProductRuntime.receive_frame        RequestManager.cancel("X")
   routes CANCEL to the registry  ->    token_X.cancel()   <- fires awaitable
                                                              |
        ( the in-flight dispatch for X is parked at rx.await )
                                                              |  wakes
                                                              v
   dispatcher: select! { handler(cx_X)  vs  cx_X.cancel().cancelled() }
        cancel wins => DROP the handler future
                                                              |  drop cascades
                    +------------------------------------------+
                    v
   host.method future drops -> ChainRuntime future drops ->
   subxt request future drops -> HostRpcClientInner::request future drops ->
                    v
   rx (oneshot receiver) drops.  A PendingGuard on the way out
   removes "truapi:N" from `pending`  <- clean, no leak
                    v
   dispatcher emits ONE terminal frame for X (Cancelled response)

Design pieces

1. Make the token awaitable (truapi/src/lib.rs). Keep is_cancelled() for cheap
polling; add a future so a blocked await can be raced:

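use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::sync::{Arc, Mutex};
use futures::channel::oneshot;          // assumed: the futures crate's oneshot
use futures::future::{FutureExt, Shared};

#[derive(Clone)]
pub struct CancellationToken {
    cancelled: Arc<AtomicBool>,                     // cheap poll path
    notify: Shared<oneshot::Receiver<()>>,          // resolves on cancel
    tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,    // fired once by cancel()
}

impl CancellationToken {
    pub fn new() -> Self {
        let (tx, rx) = oneshot::channel();
        Self {
            cancelled: Arc::new(AtomicBool::new(false)),
            notify: rx.shared(),
            tx: Arc::new(Mutex::new(Some(tx))),
        }
    }
    pub fn is_cancelled(&self) -> bool { self.cancelled.load(SeqCst) }
    pub fn cancel(&self) {
        self.cancelled.store(true, SeqCst);
        // take() makes cancel() idempotent: only the first call sends.
        if let Some(tx) = self.tx.lock().unwrap().take() { let _ = tx.send(()); }
    }
    /// Future that resolves once cancelled. Cheap to clone; multiple awaiters ok.
    pub async fn cancelled(&self) { let _ = self.notify.clone().await; }
}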

Any Notify / event-listener style primitive works; Shared<oneshot> keeps it
dependency-light and WASM-friendly.
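If pulling in `futures` is undesirable, a waker list behind a `Mutex` gives the same awaitable behaviour with only `std`. This is an alternative sketch, not the proposed type: the public method names mirror the proposal, while the internals (`Inner`, the `Cancelled` future and the `demo` helper) are assumptions. For brevity, repeated polls push duplicate wakers; a fuller version would dedupe with `Waker::will_wake`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

#[derive(Default)]
struct Inner {
    cancelled: AtomicBool,
    wakers: Mutex<Vec<Waker>>,
}

/// std-only awaitable token: a flag for cheap polling plus a list of parked wakers.
#[derive(Clone, Default)]
pub struct CancellationToken {
    inner: Arc<Inner>,
}

impl CancellationToken {
    pub fn is_cancelled(&self) -> bool {
        self.inner.cancelled.load(Ordering::SeqCst)
    }

    pub fn cancel(&self) {
        self.inner.cancelled.store(true, Ordering::SeqCst);
        // Take the wakers under the lock, then wake them after releasing it.
        let wakers = std::mem::take(&mut *self.inner.wakers.lock().unwrap());
        for waker in wakers {
            waker.wake();
        }
    }

    /// Future that resolves once the token is cancelled; any number may be awaited.
    pub fn cancelled(&self) -> Cancelled {
        Cancelled { token: self.clone() }
    }
}

pub struct Cancelled {
    token: CancellationToken,
}

impl Future for Cancelled {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut wakers = self.token.inner.wakers.lock().unwrap();
        // Checking under the lock closes the race with cancel(): it sets the flag
        // before locking, so we either see the flag or it sees our waker.
        if self.token.is_cancelled() {
            return Poll::Ready(());
        }
        wakers.push(cx.waker().clone());
        Poll::Pending
    }
}

/// Waker that counts wake-ups, used only by the demo below.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) { self.0.fetch_add(1, Ordering::SeqCst); }
}

/// Returns (first poll pending, wake-ups delivered by cancel(), second poll ready).
pub fn demo() -> (bool, usize, bool) {
    let token = CancellationToken::default();
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(Arc::clone(&counter));
    let mut cx = Context::from_waker(&waker);
    let mut fut = token.cancelled();
    let first = Pin::new(&mut fut).poll(&mut cx).is_pending();
    token.clone().cancel(); // any clone can fire it
    let wakes = counter.0.load(Ordering::SeqCst);
    let second = Pin::new(&mut fut).poll(&mut cx).is_ready();
    (first, wakes, second)
}
```

Unlike the poll-only flag, `cancel()` here actively wakes the parked task, which is what lets a `select!` arm on `cancelled()` win while the other arm is blocked.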

2. A method-agnostic cancel frame. Do not add a per-method cancel_id (that bloats
RequestFrameIds / the wire table for every method). Cancellation correlates on
request_id, not method, so reserve one control discriminant CANCEL. Wire:
[request_id: X][CANCEL][empty]. Mirrors _stop but is global. (Wire-table ordering is
append-only, so this is additive.)
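To make the frame shape concrete, here is a sketch under a hypothetical byte layout (a little-endian `u32` length prefix, the UTF-8 `request_id`, one discriminant byte, then the payload) with a placeholder `CANCEL` value. The real truapi encoding and discriminant assignment will differ; only the "request_id + CANCEL + empty payload" structure comes from the proposal.

```rust
/// Placeholder discriminant; the real value would be appended to the wire table.
pub const CANCEL: u8 = 0xFF;

/// Hypothetical layout: u32 LE length, request_id bytes, discriminant, payload.
pub fn encode_frame(request_id: &str, discriminant: u8, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(4 + request_id.len() + 1 + payload.len());
    out.extend_from_slice(&(request_id.len() as u32).to_le_bytes());
    out.extend_from_slice(request_id.as_bytes());
    out.push(discriminant);
    out.extend_from_slice(payload);
    out
}

/// A cancel frame is method-agnostic: just the request_id, CANCEL, and no payload.
pub fn encode_cancel(request_id: &str) -> Vec<u8> {
    encode_frame(request_id, CANCEL, &[])
}

/// Returns (request_id, discriminant, payload), or None if the bytes are truncated.
pub fn decode_frame(bytes: &[u8]) -> Option<(String, u8, Vec<u8>)> {
    let len_bytes = bytes.get(..4)?;
    let len = u32::from_le_bytes([len_bytes[0], len_bytes[1], len_bytes[2], len_bytes[3]]) as usize;
    let id_end = 4usize.checked_add(len)?;
    let request_id = std::str::from_utf8(bytes.get(4..id_end)?).ok()?.to_owned();
    let discriminant = *bytes.get(id_end)?;
    let payload = bytes.get(id_end + 1..)?.to_vec();
    Some((request_id, discriminant, payload))
}
```

Under this layout `encode_cancel("X")` is the six bytes `[1, 0, 0, 0, b'X', CANCEL]`, and no per-method table entry is involved.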

3. A RequestManager in the dispatcher, the exact analog of SubscriptionManager:

struct RequestManager { in_flight: Mutex<HashMap<String, CancellationToken>> }  // std::sync::Mutex
impl RequestManager {
    fn register(&self, id: String, tok: CancellationToken) { self.in_flight.lock().unwrap().insert(id, tok); }
    fn finish(&self, id: &str) { self.in_flight.lock().unwrap().remove(id); }
    fn cancel(&self, id: &str) { if let Some(t) = self.in_flight.lock().unwrap().get(id) { t.cancel() } }
}
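A runnable version of the registry, with two assumptions beyond the proposal: a poll-only `Token` stands in for `CancellationToken`, and `register` returns a hypothetical RAII `Registration` so that `finish` also runs when the dispatch future is aborted (for example by `dispose()` firing its `AbortHandle`) rather than completing. `cancel` returns whether it found a live request, which makes the no-op case visible.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Poll-only stand-in for CancellationToken; the registry logic is the same.
#[derive(Clone, Default)]
pub struct Token(Arc<AtomicBool>);

impl Token {
    pub fn cancel(&self) { self.0.store(true, Ordering::SeqCst); }
    pub fn is_cancelled(&self) -> bool { self.0.load(Ordering::SeqCst) }
}

#[derive(Default)]
pub struct RequestManager {
    in_flight: Mutex<HashMap<String, Token>>,
}

impl RequestManager {
    /// Registers the token and returns a guard whose Drop calls finish(), so the
    /// entry is removed even if the dispatch future is aborted instead of completing.
    pub fn register(&self, id: &str, tok: Token) -> Registration<'_> {
        self.in_flight.lock().unwrap().insert(id.to_owned(), tok);
        Registration { manager: self, id: id.to_owned() }
    }

    fn finish(&self, id: &str) {
        self.in_flight.lock().unwrap().remove(id);
    }

    /// Fires the token if the request is still in flight. Returns false for the
    /// no-op case (unknown id, or the response already went out).
    pub fn cancel(&self, id: &str) -> bool {
        match self.in_flight.lock().unwrap().get(id) {
            Some(tok) => {
                tok.cancel();
                true
            }
            None => false,
        }
    }
}

pub struct Registration<'a> {
    manager: &'a RequestManager,
    id: String,
}

impl Drop for Registration<'_> {
    fn drop(&mut self) {
        self.manager.finish(&self.id);
    }
}
```

This assumes request_ids are unique while in flight; if an id could be reused before the earlier guard drops, that guard would remove the newer entry.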

4. Dispatcher wires it up (dispatcher.rs). Register the token before awaiting (same
reserve-before-await discipline subscriptions already use to avoid the race), race the
handler against cancel, always emit exactly one terminal frame:

if let Some(entry) = self.by_request.get(&id) {
    let cx = CallContext::with_request_id(message.request_id.clone());
    // Clone the token first: cx moves into the handler, so the cancel arm
    // below cannot borrow cx itself.
    let token = cx.cancel().clone();
    // Register before awaiting, so a CANCEL processed from here on finds it.
    self.requests.register(message.request_id.clone(), token.clone());

    // .fuse() comes from futures::FutureExt.
    let value = futures::select! {
        v = (entry.handler)(cx, message.payload.value).fuse() => v.unwrap_or_else(|e| e),
        _ = token.cancelled().fuse()                         => encode_cancelled_payload(),
    };
    self.requests.finish(&message.request_id);
    transport.send(ProtocolMessage {
        request_id: message.request_id,
        payload: Payload { id: entry.ids.response_id, value },
    });
} else if id == CANCEL {
    self.requests.cancel(&message.request_id);   // no-op if already finished
}

Codegen change this implies: the generated handler closure takes cx from the dispatcher
instead of building its own (generated/dispatcher.rs). Mechanical, one line per method in
the emitter.
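A sketch of what the emitted handler type could look like after the change. `CallContext` here is a stripped-down stand-in, and `Handler`, `HandlerFuture`, `emitted_handler` and `poll_once` are hypothetical names; the only point is that `cx` becomes a parameter, so the dispatcher owns (and can register) its token.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for truapi's CallContext: only the request id matters here.
#[derive(Clone, Debug)]
pub struct CallContext {
    pub request_id: String,
}

pub type HandlerResult = Result<Vec<u8>, Vec<u8>>;
pub type HandlerFuture = Pin<Box<dyn Future<Output = HandlerResult> + Send>>;
/// After the codegen change every entry has this shape: cx comes from the dispatcher.
pub type Handler = Box<dyn Fn(CallContext, Vec<u8>) -> HandlerFuture + Send + Sync>;

/// What one emitted entry could look like. Previously the body started with
/// `let cx = CallContext::with_request_id(request_id);`; now cx is a parameter.
pub fn emitted_handler() -> Handler {
    Box::new(|cx: CallContext, payload: Vec<u8>| -> HandlerFuture {
        Box::pin(async move {
            // A real method would decode `payload` and await host.method(&cx, ...).
            let mut out = cx.request_id.into_bytes();
            out.extend_from_slice(&payload);
            let result: HandlerResult = Ok(out);
            result
        })
    })
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future once; enough for handlers that never actually suspend.
pub fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    Pin::new(fut).poll(&mut Context::from_waker(&waker))
}
```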

5. Leaf cleanup via RAII (host_rpc_client.rs). This is what makes drop-propagation clean
without threading anything through subxt. Today, dropping the request future drops rx but
leaves its pending entry dangling until a late response arrives or the connection closes.
Add a guard so that the drop removes it:

async fn request(&self, method: &str, params: ...) -> Result<Box<RawValue>, RpcError> {
    let id = self.next_request_id();
    let (tx, rx) = oneshot::channel();
    { /* insert PendingRequest { tx } */ }
    // Armed before send_request, so an early `?` return also removes the entry.
    let _guard = PendingGuard { inner: self, id: id.clone() }; // removes on drop
    self.send_request(&id, method, params.as_deref())?;
    // If this future is dropped while parked here, _guard runs and cleans up.
    rx.await.map_err(|_| client_error("json-rpc request was cancelled"))?
}

Now cancellation is purely "drop the future," and the pending map self-heals whether the
drop came from cancel, dispose, or client teardown.
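A dependency-free sketch of the guard pattern, assuming an `Arc<Mutex<HashMap>>` pending map and using `std::sync::mpsc` in place of the real oneshot; `Pending`, `PendingGuard`'s fields, `begin_request` and `deliver` are illustrative. It covers both orders: a dropped request removes its own entry, and a response delivered first leaves the guard nothing to do.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Stand-in for the client's pending map: request id -> response sender.
pub type Pending = Arc<Mutex<HashMap<String, Sender<String>>>>;

/// Removes its entry when dropped, whatever caused the drop.
pub struct PendingGuard {
    pending: Pending,
    id: String,
}

impl Drop for PendingGuard {
    fn drop(&mut self) {
        // No-op if the reader already removed the entry to deliver a response.
        self.pending.lock().unwrap().remove(&self.id);
    }
}

/// Registers a request and returns its guard plus the receiver the caller awaits.
pub fn begin_request(pending: &Pending, id: &str) -> (PendingGuard, Receiver<String>) {
    let (tx, rx) = channel();
    pending.lock().unwrap().insert(id.to_owned(), tx);
    (PendingGuard { pending: Arc::clone(pending), id: id.to_owned() }, rx)
}

/// Reader side: deliver a response if anyone is still waiting for it.
pub fn deliver(pending: &Pending, id: &str, response: String) -> bool {
    // Remove under the lock, send after releasing it.
    let entry = pending.lock().unwrap().remove(id);
    match entry {
        Some(tx) => tx.send(response).is_ok(),
        None => false, // late response for a cancelled request: dropped cleanly
    }
}
```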

6. (Optional) cooperative + graceful remote stop. To do more than an abrupt drop
(e.g. tell the node chainHead_v1_stopOperation, or return a typed Cancelled domain error
instead of a framework one), thread cx into the ChainRuntime methods (they already receive
it at the trait boundary and ignore it) and select! on cx.cancel().cancelled() at the
ChainRuntime layer, which can issue the stop RPC before unwinding. This cannot live inside
HostRpcClient because subxt hides the per-request seam; ChainRuntime is the lowest layer
that still holds both cx and the connection.
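A sketch of the cooperative layer, assuming the stop RPC can be modelled as an async closure; `with_graceful_stop`, `ChainError` and `block_on` are hypothetical. Unlike a plain drop, the cancel path gets to run `stop()` and return a typed error.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Typed domain error (other variants elided in this sketch).
#[derive(Debug, PartialEq)]
pub enum ChainError {
    Cancelled,
}

/// Runs `work` until it finishes or `cancelled` resolves. On cancellation it drops
/// the in-flight work, awaits `stop()` (where a stopOperation call would go), and
/// returns a typed error instead of unwinding silently.
pub async fn with_graceful_stop<T, W, C, S, SF>(work: W, cancelled: C, stop: S) -> Result<T, ChainError>
where
    W: Future<Output = Result<T, ChainError>>,
    C: Future<Output = ()>,
    S: FnOnce() -> SF,
    SF: Future<Output = ()>,
{
    let mut work = Box::pin(work);
    let mut cancelled = Box::pin(cancelled);
    let outcome = std::future::poll_fn(|cx| {
        if cancelled.as_mut().poll(cx).is_ready() {
            return Poll::Ready(None);
        }
        work.as_mut().poll(cx).map(Some)
    })
    .await;
    match outcome {
        Some(result) => result,
        None => {
            drop(work); // release the request (and its RAII guards) first
            stop().await;
            Err(ChainError::Cancelled)
        }
    }
}

/// Minimal single-future executor for the demo (not part of the design).
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) { self.0.unpark(); }
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = std::pin::pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(v) => return v,
            Poll::Pending => thread::park(),
        }
    }
}
```

Dropping `work` before calling `stop()` is a choice, not a requirement; keeping it alive until the node acknowledges the stop would also be reasonable.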

Gotchas to resolve during design

  • Head-of-line blocking. receive_frame awaits core.dispatch to completion
    (host_core.rs). If the host pumps frames strictly serially, a long request blocks intake
    of its own cancel frame and cancellation can never arrive mid-flight. Either the host must
    pump frames concurrently, or (cleaner) the dispatcher should spawn request handlers the
    way it already spawns subscriptions (via the Spawner), returning intake immediately and
    tracking the task in RequestManager. Folding the existing AbortHandle into
    RequestManager gives a hard backstop alongside the cooperative token.
  • Terminal-frame exactly-once. A cancel can land in three windows: before registration
    (reserve-before-await closes this), during flight (the select! handles it), or after the
    response already went out (RequestManager::cancel misses -> no-op, same as handle_stop
    on a dead subscription). The finish() / select! structure must guarantee exactly one
    response frame per request_id.
  • Client side. The generated TS client needs an API to trigger the cancel frame
    (e.g. an AbortSignal wired to each request) and to settle the pending promise on the
    terminal Cancelled frame.
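For the spawned-handler variant, one possible way to keep terminal frames exactly-once (an assumption, not part of the proposal above) is to let whichever path removes the request_id from the registry emit the frame: the handler's completion, or the cancel/abort path when an aborted handler can no longer answer. `Registry` and `Terminal` are illustrative, and the `sent` vector stands in for the transport.

```rust
use std::collections::HashSet;
use std::sync::Mutex;

#[derive(Debug, PartialEq)]
pub enum Terminal {
    Response(Vec<u8>),
    Cancelled,
}

/// Whichever path removes the request id first is the only one that may emit.
#[derive(Default)]
pub struct Registry {
    live: Mutex<HashSet<String>>,
    sent: Mutex<Vec<(String, Terminal)>>, // stand-in for transport.send
}

impl Registry {
    pub fn register(&self, id: &str) {
        self.live.lock().unwrap().insert(id.to_owned());
    }

    /// Handler finished: emits the response unless a cancel already settled it.
    pub fn complete(&self, id: &str, value: Vec<u8>) -> bool {
        self.settle(id, Terminal::Response(value))
    }

    /// Cancel frame or abort backstop: emits Cancelled unless already answered.
    pub fn cancel(&self, id: &str) -> bool {
        self.settle(id, Terminal::Cancelled)
    }

    pub fn frames_for(&self, id: &str) -> usize {
        self.sent.lock().unwrap().iter().filter(|(i, _)| i.as_str() == id).count()
    }

    fn settle(&self, id: &str, frame: Terminal) -> bool {
        let won = self.live.lock().unwrap().remove(id);
        if won {
            self.sent.lock().unwrap().push((id.to_owned(), frame));
        }
        won
    }
}
```

Because `HashSet::remove` under the lock succeeds for exactly one caller, a cancel landing after the response (or a response landing after an abort) is a no-op, matching the `handle_stop` precedent.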

Scope options

  • Minimal (delivers the core ask): awaitable token (1) + global cancel frame (2) +
    RequestManager (3) + dispatcher select! (4) + PendingGuard (5). Cancellation
    propagates, rx.await unwinds promptly, the pending map stays clean, the product gets a
    Cancelled frame. No subxt or ChainRuntime signature changes.
  • Full: add (6) for typed cancelled errors and remote stopOperation, plus spawning
    request handlers with an abort backstop, and the TS client AbortSignal surface.

Open questions

  • Wire CANCEL as a single reserved global discriminant vs. a per-method cancel_id?
    (Leaning global: cancellation is per request_id, not per method.)
  • Terminal frame shape: reuse the method's response_id with a Cancelled variant, or a
    dedicated cancel-ack discriminant?
  • Should requests be spawned (like subscriptions) unconditionally, or only when the host
    opts into concurrent intake?
  • Do we also want a host/config-level default timeout as a backstop independent of explicit
    cancellation?
