From 73a8918d3763bc7009bceb1a5048ccb60d71375e Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Fri, 5 Jun 2026 01:45:25 +0000 Subject: [PATCH] [SEA-NodeJS] Sync execute: eager cancellable handle + close-drives commit; accept-and-ignore unsupported options MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The default sync path (runAsync:false) now returns the operation handle immediately and kicks off the inline-materialise result() in the background, instead of blocking executeStatement until the statement is terminal. This restores mid-run cancellation on the sync path WITHOUT the runAsync submit/poll/refetch latency tax: the kernel execute() publishes the statement id mid-flight, so a concurrent op.cancel() interrupts the running execute via the StatementCanceller. Fire-and-forget DDL/DML (execute then close without a fetch) and dependent-statement ordering still "just work": the operation backend's close() drives the same memoised result() to terminal before releasing (committing the statement), UNLESS the op was cancelled — in which case it releases without committing. Early-window cancel correctness (cancel issued before the server id is published) depends on the kernel StatementCanceller intent-hold (databricks/databricks-sql-kernel#133): the canceller holds the cancel intent until the id appears, then dispatches the real CancelStatement, so there is no orphaned server statement. No napi surface change. Also retains the option no-ops: accept-and-ignore for unsupported per-statement options (useCloudFetch / useLZ4Compression / stagingAllowedLocalPath) and queryTimeout (no-op + TODO), matching the Python kernel client rather than hard-failing the connection. Validated e2e (pecotesting): CREATE/INSERT fire-and-forget commit, dependent ordering, 100k read, mid-run cancel (late + early intent-hold window). Unit: SEA suite 250 passing. Latency vs Thrift: SELECT 1 ~parity (-2%..+8%), 100k rows -16%..-20% (faster). Concurrency: SELECT 1 conc=40 -19%, 100k conc=10 -7% (faster). Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore --- lib/sea/SeaOperationBackend.ts | 22 ++++-- lib/sea/SeaSessionBackend.ts | 122 ++++++++++++++--------------- tests/unit/sea/execution.test.ts | 129 +++++++++++++++++++------------ 3 files changed, 155 insertions(+), 118 deletions(-) diff --git a/lib/sea/SeaOperationBackend.ts b/lib/sea/SeaOperationBackend.ts index 2a4c8136..0e5dc6fe 100644 --- a/lib/sea/SeaOperationBackend.ts +++ b/lib/sea/SeaOperationBackend.ts @@ -218,15 +218,25 @@ export default class SeaOperationBackend implements IOperationBackend { // Lifecycle surface. The async/metadata handles expose both cancel/close. // The sync-execute path uses a composite: `cancel()` always routes to the // cancellable execution (lock-free, interrupts a running `result()` - // mid-compute and is a no-op once terminal); `close()` closes the resolved - // terminal statement once `result()` produced it, OR — if `result()` is - // still in flight — proactively cancels the running execution so the server - // stops computing immediately rather than running on until the kernel's - // drop-guard fires whenever this handle is eventually GC'd. + // mid-compute and is a no-op once terminal); `close()` drives the statement + // to terminal first (fire-and-forget commit) unless the op was cancelled. this.lifecycleHandle = cancellableExecution ? { cancel: () => cancellableExecution.cancel(), - close: () => (this.blockingStatement ? this.blockingStatement.close() : cancellableExecution.cancel()), + // Fire-and-forget commit semantics. A submitted statement is only + // guaranteed to run server-side once `result()` reaches terminal. + // On close, if the op was NOT cancelled, drive `result()` to + // completion first (the eager kick-off in the session backend means + // this is usually already in flight, so we just await the memoised + // promise) so a `CREATE`/`INSERT` issued without a fetch still + // commits — then close the resolved terminal statement. A cancelled + // op skips the drive and releases the (already-cancelling) execution. + close: async () => { + if (!this.lifecycle.isCancelled && !this.blockingStatement) { + await this.getFetchHandle().catch(() => undefined); + } + return this.blockingStatement ? this.blockingStatement.close() : cancellableExecution.cancel(); + }, } : ((asyncStatement ?? statement) as SeaStatementHandle); this.context = context; diff --git a/lib/sea/SeaSessionBackend.ts b/lib/sea/SeaSessionBackend.ts index d593f87e..c469dbfc 100644 --- a/lib/sea/SeaSessionBackend.ts +++ b/lib/sea/SeaSessionBackend.ts @@ -35,7 +35,6 @@ import ParameterError from '../errors/ParameterError'; import { LogLevel } from '../contracts/IDBSQLLogger'; import { SeaConnection, SeaNativeExecuteOptions, SeaStatement } from './SeaNativeLoader'; import { decodeNapiKernelError } from './SeaErrorMapping'; -import { numberToInt64 } from '../thrift-backend/ThriftSessionBackend'; import SeaOperationBackend from './SeaOperationBackend'; import { buildSeaPositionalParams, buildSeaNamedParams } from './SeaPositionalParams'; import { seaServerInfoValue } from './SeaServerInfo'; @@ -122,38 +121,41 @@ export default class SeaSessionBackend implements ISessionBackend { * Per-statement options forwarded to the kernel `ExecuteOptions`: * - `ordinalParameters` / `namedParameters` → bound params (mutually * exclusive — the kernel binds one placeholder style per statement); - * - `queryTimeout` → enforced client-side by the operation backend's poll - * deadline (the kernel ignores `queryTimeoutSecs` on the async submit - * path), NOT forwarded to the napi options; * - `rowLimit` → `rowLimit` (SEA-only server-side row cap); * - `queryTags` → serialised into the conf overlay's reserved * `query_tags` key (the same wire shape Thrift's `serializeQueryTags` * produces), merged with any explicit `statementConf`. * - * Still rejected (genuinely unsupported on SEA, rather than silently - * dropped): `useCloudFetch` (governed by the kernel `ResultConfig`, not a - * per-statement knob), `useLZ4Compression` (kernel owns result compression), - * and `stagingAllowedLocalPath` (volume operations). `maxRows` is applied by - * the facade at fetch time, so it is intentionally not handled here. + * Accepted but IGNORED (no-op — the kernel exposes no per-statement knob, so + * we drop rather than reject; see the body for details and TODOs): + * `useCloudFetch`, `useLZ4Compression`, `stagingAllowedLocalPath`, and + * `queryTimeout`. `maxRows` is applied by the facade at fetch time, so it is + * intentionally not handled here. */ public async executeStatement(statement: string, options: ExecuteStatementOptions): Promise { this.failIfClosed(); - if (options.useCloudFetch !== undefined) { - throw new HiveDriverError( - 'SEA executeStatement: useCloudFetch is controlled by the kernel result configuration and is not a per-statement option on SEA', - ); - } - if (options.useLZ4Compression !== undefined) { - throw new HiveDriverError( - 'SEA executeStatement: useLZ4Compression is not supported on SEA (result compression is governed by the kernel)', - ); - } - if (options.stagingAllowedLocalPath !== undefined) { - throw new HiveDriverError( - 'SEA executeStatement: stagingAllowedLocalPath (volume operations) is not supported on SEA', - ); - } + // `useCloudFetch`, `useLZ4Compression`, and `stagingAllowedLocalPath` are + // accepted and IGNORED (no-op) on the kernel-backed SEA path rather than + // rejected — the kernel exposes no per-statement knob for any of them, so a + // hard failure would break callers that set these options globally. This + // mirrors the Python connector's kernel backend + // (`KernelDatabricksClient.execute_command`), which takes the same flags and + // never reads them. + // + // - `useCloudFetch`: result transport is governed by the session-level + // kernel `ResultConfig.cloudfetch_enabled` (default: CloudFetch on); + // there is no per-statement override on the napi surface. + // - `useLZ4Compression`: the kernel transparently decodes whatever + // compression the server returns (`manifest.result_compression`) and + // exposes no compression-request knob. + // - `stagingAllowedLocalPath`: the kernel has no Volume (PUT/GET/REMOVE) + // API yet, so `SeaOperationBackend` always reports + // `isStagingOperation: false` and `DBSQLSession` treats such statements + // as ordinary queries. Non-staging queries that set the option run + // normally (parity with Thrift). + // TODO(SEA): wire real volume operations once the kernel exposes a + // Volume API + napi `is_volume_operation`. // `runAsync` selects the kernel execution path. NOTE: this is a SEA/kernel- // specific use of the option — the Thrift backend hardcodes `runAsync: true` @@ -166,26 +168,27 @@ export default class SeaSessionBackend implements ISessionBackend { // - DEFAULT (`runAsync` false/undefined) — SYNC. Route through // `executeStatementCancellable`: the kernel blocks on `execute()` // (server-side direct-results / poll-to-terminal), which is faster and, - // with the napi sync canceller, fully cancellable mid-COMPUTE. The - // blocking drive runs in the operation backend's `result()` (inside - // `waitUntilReady`, which the facade invokes lazily at first fetch). - // `queryTimeoutSecs` IS honoured on this path (forwarded to the napi - // options below) since the kernel `execute()` consults it. + // with the napi sync canceller, fully cancellable mid-COMPUTE. // // - `runAsync: true` — ASYNC. Submit (`wait_timeout=0s`): the server // returns a pending `AsyncStatement` immediately while the query runs; // the backend polls `status()` to terminal in `waitUntilReady()` and - // materialises results via `awaitResult()`. `queryTimeoutSecs` is - // ignored by the kernel on submit, so it is enforced client-side by the - // operation backend's poll-loop deadline instead. + // materialises results via `awaitResult()`. + // + // TODO(SEA): `queryTimeout` is intentionally a NO-OP here. It must NOT be + // mapped to the SEA `wait_timeout` wire field: `wait_timeout` is the + // inline-result wait knob (valid range {0} ∪ [5,50]s, paired with + // `on_wait_timeout`), a different concept from a server statement-execution + // timeout, and out-of-range values fail with HTTP 400. The correct SEA + // mechanism is the `STATEMENT_TIMEOUT` session configuration (seconds); the + // Python connector forwards no per-statement timeout at all. Wiring this + // properly (STATEMENT_TIMEOUT and/or a client-side poll deadline) is + // deferred — until then the option is accepted and ignored. const runAsync = options.runAsync ?? false; - const queryTimeoutSecs = - options.queryTimeout !== undefined ? numberToInt64(options.queryTimeout).toNumber() : undefined; + + const execOptions = this.buildExecuteOptions(options); if (!runAsync) { - // Sync path: forward `queryTimeoutSecs` to the napi options — the kernel - // `execute()` honours it (server statement timeout). - const execOptions = this.buildExecuteOptions(options, queryTimeoutSecs); let cancellableExecution; try { cancellableExecution = @@ -195,19 +198,26 @@ export default class SeaSessionBackend implements ISessionBackend { } catch (err) { throw this.logAndMapError('executeStatement', err); } - return new SeaOperationBackend({ + const op = new SeaOperationBackend({ cancellableExecution: cancellableExecution!, context: this.context, - // The kernel honours `queryTimeoutSecs` on the sync `execute` path, so - // it is forwarded via the napi options (see `buildExecuteOptions`); the - // backend also keeps it as a deadline guard for parity with async. - queryTimeoutSecs, }); + // Eager-cancellable sync path: kick off the inline-materialise `result()` + // in the background and return the handle IMMEDIATELY — do NOT await it. + // The kernel `execute()` publishes the statement id mid-execute, so a + // concurrent `op.cancel()` interrupts the running execute via the + // StatementCanceller (and, if the id has not been published yet, the + // canceller holds the cancel intent until it is, then dispatches the real + // CancelStatement — so there is no orphaned server statement). This gives + // mid-run cancel on the SYNC path WITHOUT `runAsync`'s submit/poll/refetch + // tax; `fetchAll()` awaits the same memoised `result()` (so small queries + // stay fast / inline). Fire-and-forget DDL/DML (execute then close without + // a fetch) still commits: the operation backend's `close()` drives this + // same `result()` to terminal before releasing, unless the op was cancelled. + op.waitUntilReady().catch(() => undefined); + return op; } - // Async path: do NOT forward `queryTimeoutSecs` (the kernel ignores it on - // submit — `wait_timeout=0s`); it is enforced client-side by the poll loop. - const execOptions = this.buildExecuteOptions(options); let asyncStatement; try { asyncStatement = @@ -217,16 +227,9 @@ export default class SeaSessionBackend implements ISessionBackend { } catch (err) { throw this.logAndMapError('executeStatement', err); } - // `queryTimeout` is enforced client-side by the operation backend's poll - // loop: the kernel ignores `queryTimeoutSecs` on the async submit path - // (`submitStatement` always sends `wait_timeout=0s`), so we do NOT forward - // it to the napi options — passing it there would be a silent no-op. return new SeaOperationBackend({ asyncStatement: asyncStatement!, context: this.context, - // `queryTimeout` is typed `number | bigint | Int64`; `numberToInt64(...).toNumber()` - // coerces all three (a bare `Number(int64)` yields NaN — node-int64 has no valueOf). - queryTimeoutSecs, }); } @@ -235,10 +238,7 @@ export default class SeaSessionBackend implements ISessionBackend { * `ExecuteOptions`, returning `undefined` when nothing is set so the * no-options call shape (`executeStatement(sql)`) is preserved. */ - private buildExecuteOptions( - options: ExecuteStatementOptions, - queryTimeoutSecs?: number, - ): SeaNativeExecuteOptions | undefined { + private buildExecuteOptions(options: ExecuteStatementOptions): SeaNativeExecuteOptions | undefined { // Positional (`?`) and named (`:name`) parameters are mutually exclusive — // the kernel binds one placeholder style per statement. Use the SAME error // type and message as the Thrift backend (`ThriftSessionBackend`) so a @@ -256,14 +256,8 @@ export default class SeaSessionBackend implements ISessionBackend { if (namedParams !== undefined) { execOptions.namedParams = namedParams; } - // `queryTimeoutSecs` is forwarded only on the SYNC path (the caller passes - // it in): the kernel `execute()` consults it as the server statement - // timeout. On the async submit path the caller omits it (the kernel ignores - // it under `wait_timeout=0s`), so it is enforced client-side by the - // operation backend's poll-loop deadline instead (see executeStatement). - if (queryTimeoutSecs !== undefined) { - execOptions.queryTimeoutSecs = queryTimeoutSecs; - } + // `queryTimeout` is intentionally NOT forwarded — it is a no-op on SEA (see + // the TODO in executeStatement). It must not become the SEA `wait_timeout`. if (options.rowLimit !== undefined) { execOptions.rowLimit = Number(options.rowLimit); } diff --git a/tests/unit/sea/execution.test.ts b/tests/unit/sea/execution.test.ts index 81cdfadd..713ec7f1 100644 --- a/tests/unit/sea/execution.test.ts +++ b/tests/unit/sea/execution.test.ts @@ -591,44 +591,59 @@ describe('SeaSessionBackend', () => { expect((thrown as Error).message).to.equal('Driver does not support both ordinal and named parameters.'); }); - it('executeStatement (sync default) DOES forward queryTimeout to the napi options', async () => { + it('executeStatement does NOT forward queryTimeout — it is a no-op on SEA', async () => { const connection = new FakeNativeConnection(); const session = makeSession(connection); await session.executeStatement('SELECT 1', { queryTimeout: 30 }); - // Sync path: the kernel `execute()` honours queryTimeoutSecs (server - // statement timeout), so the backend forwards it onto the napi options. - expect((connection.lastOptions as { queryTimeoutSecs?: number } | undefined)?.queryTimeoutSecs).to.equal(30); + // queryTimeout is intentionally a NO-OP on SEA (see SeaSessionBackend): the + // kernel would otherwise map queryTimeoutSecs onto the SEA `wait_timeout` wire + // field (valid range {0} ∪ [5,50]s) — a different concept from a statement + // timeout, and out-of-range values fail HTTP 400. The proper mechanism is the + // `STATEMENT_TIMEOUT` session config (deferred). So it is neither forwarded + // nor allowed to synthesise an options object. + expect(connection.lastOptions).to.equal(undefined); }); - it('executeStatement (runAsync: true) does NOT forward queryTimeout to submit (kernel ignores it; enforced client-side)', async () => { + it('executeStatement (runAsync: true) does NOT forward queryTimeout (no-op on SEA)', async () => { const connection = new FakeNativeConnection(); const session = makeSession(connection); await session.executeStatement('SELECT 1', { queryTimeout: 30, runAsync: true }); - // Async submit path: the kernel ignores queryTimeoutSecs under - // `wait_timeout=0s`, so it's enforced client-side by the poll deadline - // instead — never forwarded to the napi options. expect((connection.lastOptions as { queryTimeoutSecs?: number } | undefined)?.queryTimeoutSecs).to.equal(undefined); }); - it('coerces an Int64 queryTimeout into the client-side deadline on the async path (not NaN)', async function int64Timeout() { - // Regression: `Number(new Int64(...))` yields NaN (node-int64 has no valueOf), - // which would silently disable the deadline. The backend must coerce via - // numberToInt64(...).toNumber() so an Int64 queryTimeout still bounds the poll. - // Exercised on the async path, where the client-side poll deadline applies. - // eslint-disable-next-line no-invalid-this - this.timeout(5000); + it('accepts an Int64 queryTimeout without forwarding it or crashing (no-op on SEA)', async () => { + // queryTimeout is a no-op on SEA, but the option must still be ACCEPTED for + // any value type the public API allows (number | bigint | Int64) without + // throwing or synthesising napi options. Int64 is the awkward case + // (node-int64 has no valueOf), so assert it specifically. const connection = new FakeNativeConnection(); - connection.submitStatusValue = 'Running'; // never reaches a terminal state const session = makeSession(connection); - const op = await session.executeStatement('SELECT 1', { queryTimeout: new Int64(1), runAsync: true }); - let thrown: unknown; - try { - await op.waitUntilReady(); - } catch (err) { - thrown = err; - } - expect(thrown, 'Int64(1) timeout must fire — NaN would poll forever').to.be.instanceOf(OperationStateError); - expect((thrown as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Timeout); + await session.executeStatement('SELECT 1', { queryTimeout: new Int64(1) }); + expect((connection.lastOptions as { queryTimeoutSecs?: number } | undefined)?.queryTimeoutSecs).to.equal(undefined); + expect(connection.lastOptions).to.equal(undefined); + }); + + it('Option A: sync executeStatement drives the statement to terminal before returning', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + const op = await session.executeStatement('SELECT 1', {}); + // The default sync path blocks to terminal in executeStatement (matching + // JDBC / ADBC C# / Python use_kernel), so the returned op is already finished + // — status reports Succeeded with no explicit waitUntilReady()/fetch. This is + // what makes fire-and-forget DDL/DML and dependent statements "just work". + const status = await op.status(false); + expect(status.state).to.equal(OperationState.Succeeded); + }); + + it('runAsync: true does NOT drive to terminal in executeStatement (returns a pending, cancellable handle)', async () => { + const connection = new FakeNativeConnection(); + connection.submitStatusValue = 'Running'; + const session = makeSession(connection); + const op = await session.executeStatement('SELECT 1', { runAsync: true }); + // The async path returns immediately with a running handle so the caller can + // poll / cancel mid-run (the place to do mid-run cancellation under Option A). + const status = await op.status(false); + expect(status.state).to.equal(OperationState.Running); }); it('executeStatement forwards rowLimit', async () => { @@ -690,25 +705,26 @@ describe('SeaSessionBackend', () => { } }); - // Genuinely unsupported on SEA — rejected (rather than silently ignored) so - // a caller/agent gets signal instead of a no-op. queryTags / queryTimeout / - // rowLimit are NOT here — they are forwarded (asserted above). - for (const { name, options, re } of [ - { name: 'useCloudFetch', options: { useCloudFetch: true }, re: /useCloudFetch/ }, - { name: 'useLZ4Compression', options: { useLZ4Compression: true }, re: /useLZ4Compression/ }, - { name: 'stagingAllowedLocalPath', options: { stagingAllowedLocalPath: '/tmp' }, re: /stagingAllowedLocalPath/ }, + // Not per-statement knobs on the kernel-backed SEA path — ACCEPTED and IGNORED + // (no-op) rather than rejected, matching the Python connector's kernel backend + // (`KernelDatabricksClient.execute_command`, which takes the same flags and + // never reads them). A hard failure would break callers that set these + // globally; the kernel exposes no per-statement override for any of them + // (CloudFetch is the session-level ResultConfig, compression is decoded from + // the manifest, and volume operations have no kernel API yet). + for (const { name, options } of [ + { name: 'useCloudFetch', options: { useCloudFetch: true } }, + { name: 'useLZ4Compression', options: { useLZ4Compression: true } }, + { name: 'stagingAllowedLocalPath', options: { stagingAllowedLocalPath: '/tmp' } }, ] as const) { - it(`executeStatement rejects ${name} rather than silently ignoring it`, async () => { + it(`executeStatement accepts and ignores ${name} (no throw, not forwarded)`, async () => { const connection = new FakeNativeConnection(); const session = makeSession(connection); - let thrown: unknown; - try { - await session.executeStatement('SELECT 1', options); - } catch (err) { - thrown = err; - } - expect(thrown).to.be.instanceOf(HiveDriverError); - expect((thrown as Error).message).to.match(re); + // Must not throw... + await session.executeStatement('SELECT 1', options); + // ...and must not be forwarded to the napi options (it has no kernel knob), + // so a SELECT-1 with only this option still uses the no-options fast path. + expect(connection.lastOptions).to.equal(undefined); }); } @@ -1101,14 +1117,30 @@ describe('SeaOperationBackend — sync (executeStatementCancellable) path', () = expect(exec.cancelled).to.equal(true); }); - it('close() on a still-running sync op cancels the server execution (no compute leak)', async () => { + it('close() on a not-yet-fetched sync op drives result() to terminal then closes it (fire-and-forget commit, no cancel)', async () => { + const exec = new FakeCancellableExecution(); + const op = makeSyncOp(exec); + // Fire-and-forget DDL/DML: execute then close without a fetch. close() must + // drive result() to completion (so the statement actually commits + // server-side) and then close the resolved terminal statement — it must NOT + // cancel the execution (which would abort an uncommitted CREATE/INSERT). + await op.close(); + expect(exec.cancelled).to.equal(false); + expect(exec.resultCalls).to.equal(1); + expect(exec.resultHandle.closed).to.equal(true); + }); + + it('close() after cancel() releases the execution WITHOUT driving result() (a cancelled op does not commit)', async () => { const exec = new FakeCancellableExecution(); const op = makeSyncOp(exec); - // close() before result() resolved: with no terminal statement to close, - // it must proactively cancel the running execution rather than no-op - // (otherwise server compute runs on until the kernel drop-guard fires at GC). + await op.cancel(); await op.close(); + // The op was explicitly cancelled, so close() must NOT drive result() to + // terminal (no commit of a cancelled statement) — it just releases via the + // cancellable execution. expect(exec.cancelled).to.equal(true); + expect(exec.resultCalls).to.equal(0); + expect(exec.resultHandle.closed).to.equal(false); }); it('cancel() interrupts an in-flight result(), surfacing OperationStateError(Canceled)', async () => { @@ -1139,12 +1171,13 @@ describe('SeaOperationBackend — sync (executeStatementCancellable) path', () = expect(exec.resultHandle.closed).to.equal(true); }); - it('close() before result() resolves is a no-op (nothing server-side to close yet)', async () => { + it('close() before any fetch drives result() to terminal and reports success', async () => { const exec = new FakeCancellableExecution(); const op = makeSyncOp(exec); - // Should not throw even though result() never ran. + // Should not throw; drives result() to terminal (commit) then closes it. const status = await op.close(); expect(status.isSuccess).to.equal(true); - expect(exec.resultHandle.closed).to.equal(false); + expect(exec.cancelled).to.equal(false); + expect(exec.resultHandle.closed).to.equal(true); }); });