diff --git a/lib/sea/SeaOperationBackend.ts b/lib/sea/SeaOperationBackend.ts index 2a4c8136..0e5dc6fe 100644 --- a/lib/sea/SeaOperationBackend.ts +++ b/lib/sea/SeaOperationBackend.ts @@ -218,15 +218,25 @@ export default class SeaOperationBackend implements IOperationBackend { // Lifecycle surface. The async/metadata handles expose both cancel/close. // The sync-execute path uses a composite: `cancel()` always routes to the // cancellable execution (lock-free, interrupts a running `result()` - // mid-compute and is a no-op once terminal); `close()` closes the resolved - // terminal statement once `result()` produced it, OR — if `result()` is - // still in flight — proactively cancels the running execution so the server - // stops computing immediately rather than running on until the kernel's - // drop-guard fires whenever this handle is eventually GC'd. + // mid-compute and is a no-op once terminal); `close()` drives the statement + // to terminal first (fire-and-forget commit) unless the op was cancelled. this.lifecycleHandle = cancellableExecution ? { cancel: () => cancellableExecution.cancel(), - close: () => (this.blockingStatement ? this.blockingStatement.close() : cancellableExecution.cancel()), + // Fire-and-forget commit semantics. A submitted statement is only + // guaranteed to run server-side once `result()` reaches terminal. + // On close, if the op was NOT cancelled, drive `result()` to + // completion first (the eager kick-off in the session backend means + // this is usually already in flight, so we just await the memoised + // promise) so a `CREATE`/`INSERT` issued without a fetch still + // commits — then close the resolved terminal statement. A cancelled + // op skips the drive and releases the (already-cancelling) execution. + close: async () => { + if (!this.lifecycle.isCancelled && !this.blockingStatement) { + await this.getFetchHandle().catch(() => undefined); + } + return this.blockingStatement ? 
this.blockingStatement.close() : cancellableExecution.cancel(); + }, } : ((asyncStatement ?? statement) as SeaStatementHandle); this.context = context; diff --git a/lib/sea/SeaSessionBackend.ts b/lib/sea/SeaSessionBackend.ts index d593f87e..c469dbfc 100644 --- a/lib/sea/SeaSessionBackend.ts +++ b/lib/sea/SeaSessionBackend.ts @@ -35,7 +35,6 @@ import ParameterError from '../errors/ParameterError'; import { LogLevel } from '../contracts/IDBSQLLogger'; import { SeaConnection, SeaNativeExecuteOptions, SeaStatement } from './SeaNativeLoader'; import { decodeNapiKernelError } from './SeaErrorMapping'; -import { numberToInt64 } from '../thrift-backend/ThriftSessionBackend'; import SeaOperationBackend from './SeaOperationBackend'; import { buildSeaPositionalParams, buildSeaNamedParams } from './SeaPositionalParams'; import { seaServerInfoValue } from './SeaServerInfo'; @@ -122,38 +121,41 @@ export default class SeaSessionBackend implements ISessionBackend { * Per-statement options forwarded to the kernel `ExecuteOptions`: * - `ordinalParameters` / `namedParameters` → bound params (mutually * exclusive — the kernel binds one placeholder style per statement); - * - `queryTimeout` → enforced client-side by the operation backend's poll - * deadline (the kernel ignores `queryTimeoutSecs` on the async submit - * path), NOT forwarded to the napi options; * - `rowLimit` → `rowLimit` (SEA-only server-side row cap); * - `queryTags` → serialised into the conf overlay's reserved * `query_tags` key (the same wire shape Thrift's `serializeQueryTags` * produces), merged with any explicit `statementConf`. * - * Still rejected (genuinely unsupported on SEA, rather than silently - * dropped): `useCloudFetch` (governed by the kernel `ResultConfig`, not a - * per-statement knob), `useLZ4Compression` (kernel owns result compression), - * and `stagingAllowedLocalPath` (volume operations). `maxRows` is applied by - * the facade at fetch time, so it is intentionally not handled here. 
+ * Accepted but IGNORED (no-op — the kernel exposes no per-statement knob, so + * we drop rather than reject; see the body for details and TODOs): + * `useCloudFetch`, `useLZ4Compression`, `stagingAllowedLocalPath`, and + * `queryTimeout`. `maxRows` is applied by the facade at fetch time, so it is + * intentionally not handled here. */ public async executeStatement(statement: string, options: ExecuteStatementOptions): Promise { this.failIfClosed(); - if (options.useCloudFetch !== undefined) { - throw new HiveDriverError( - 'SEA executeStatement: useCloudFetch is controlled by the kernel result configuration and is not a per-statement option on SEA', - ); - } - if (options.useLZ4Compression !== undefined) { - throw new HiveDriverError( - 'SEA executeStatement: useLZ4Compression is not supported on SEA (result compression is governed by the kernel)', - ); - } - if (options.stagingAllowedLocalPath !== undefined) { - throw new HiveDriverError( - 'SEA executeStatement: stagingAllowedLocalPath (volume operations) is not supported on SEA', - ); - } + // `useCloudFetch`, `useLZ4Compression`, and `stagingAllowedLocalPath` are + // accepted and IGNORED (no-op) on the kernel-backed SEA path rather than + // rejected — the kernel exposes no per-statement knob for any of them, so a + // hard failure would break callers that set these options globally. This + // mirrors the Python connector's kernel backend + // (`KernelDatabricksClient.execute_command`), which takes the same flags and + // never reads them. + // + // - `useCloudFetch`: result transport is governed by the session-level + // kernel `ResultConfig.cloudfetch_enabled` (default: CloudFetch on); + // there is no per-statement override on the napi surface. + // - `useLZ4Compression`: the kernel transparently decodes whatever + // compression the server returns (`manifest.result_compression`) and + // exposes no compression-request knob. 
+ // - `stagingAllowedLocalPath`: the kernel has no Volume (PUT/GET/REMOVE) + // API yet, so `SeaOperationBackend` always reports + // `isStagingOperation: false` and `DBSQLSession` treats such statements + // as ordinary queries. Non-staging queries that set the option run + // normally (parity with Thrift). + // TODO(SEA): wire real volume operations once the kernel exposes a + // Volume API + napi `is_volume_operation`. // `runAsync` selects the kernel execution path. NOTE: this is a SEA/kernel- // specific use of the option — the Thrift backend hardcodes `runAsync: true` @@ -166,26 +168,27 @@ export default class SeaSessionBackend implements ISessionBackend { // - DEFAULT (`runAsync` false/undefined) — SYNC. Route through // `executeStatementCancellable`: the kernel blocks on `execute()` // (server-side direct-results / poll-to-terminal), which is faster and, - // with the napi sync canceller, fully cancellable mid-COMPUTE. The - // blocking drive runs in the operation backend's `result()` (inside - // `waitUntilReady`, which the facade invokes lazily at first fetch). - // `queryTimeoutSecs` IS honoured on this path (forwarded to the napi - // options below) since the kernel `execute()` consults it. + // with the napi sync canceller, fully cancellable mid-COMPUTE. // // - `runAsync: true` — ASYNC. Submit (`wait_timeout=0s`): the server // returns a pending `AsyncStatement` immediately while the query runs; // the backend polls `status()` to terminal in `waitUntilReady()` and - // materialises results via `awaitResult()`. `queryTimeoutSecs` is - // ignored by the kernel on submit, so it is enforced client-side by the - // operation backend's poll-loop deadline instead. + // materialises results via `awaitResult()`. + // + // TODO(SEA): `queryTimeout` is intentionally a NO-OP here. 
It must NOT be + // mapped to the SEA `wait_timeout` wire field: `wait_timeout` is the + // inline-result wait knob (valid range {0} ∪ [5,50]s, paired with + // `on_wait_timeout`), a different concept from a server statement-execution + // timeout, and out-of-range values fail with HTTP 400. The correct SEA + // mechanism is the `STATEMENT_TIMEOUT` session configuration (seconds); the + // Python connector forwards no per-statement timeout at all. Wiring this + // properly (STATEMENT_TIMEOUT and/or a client-side poll deadline) is + // deferred — until then the option is accepted and ignored. const runAsync = options.runAsync ?? false; - const queryTimeoutSecs = - options.queryTimeout !== undefined ? numberToInt64(options.queryTimeout).toNumber() : undefined; + + const execOptions = this.buildExecuteOptions(options); if (!runAsync) { - // Sync path: forward `queryTimeoutSecs` to the napi options — the kernel - // `execute()` honours it (server statement timeout). - const execOptions = this.buildExecuteOptions(options, queryTimeoutSecs); let cancellableExecution; try { cancellableExecution = @@ -195,19 +198,26 @@ export default class SeaSessionBackend implements ISessionBackend { } catch (err) { throw this.logAndMapError('executeStatement', err); } - return new SeaOperationBackend({ + const op = new SeaOperationBackend({ cancellableExecution: cancellableExecution!, context: this.context, - // The kernel honours `queryTimeoutSecs` on the sync `execute` path, so - // it is forwarded via the napi options (see `buildExecuteOptions`); the - // backend also keeps it as a deadline guard for parity with async. - queryTimeoutSecs, }); + // Eager-cancellable sync path: kick off the inline-materialise `result()` + // in the background and return the handle IMMEDIATELY — do NOT await it. 
+ // The kernel `execute()` publishes the statement id mid-execute, so a + // concurrent `op.cancel()` interrupts the running execute via the + // StatementCanceller (and, if the id has not been published yet, the + // canceller holds the cancel intent until it is, then dispatches the real + // CancelStatement — so there is no orphaned server statement). This gives + // mid-run cancel on the SYNC path WITHOUT `runAsync`'s submit/poll/refetch + // tax; `fetchAll()` awaits the same memoised `result()` (so small queries + // stay fast / inline). Fire-and-forget DDL/DML (execute then close without + // a fetch) still commits: the operation backend's `close()` drives this + // same `result()` to terminal before releasing, unless the op was cancelled. + op.waitUntilReady().catch(() => undefined); + return op; } - // Async path: do NOT forward `queryTimeoutSecs` (the kernel ignores it on - // submit — `wait_timeout=0s`); it is enforced client-side by the poll loop. - const execOptions = this.buildExecuteOptions(options); let asyncStatement; try { asyncStatement = @@ -217,16 +227,9 @@ export default class SeaSessionBackend implements ISessionBackend { } catch (err) { throw this.logAndMapError('executeStatement', err); } - // `queryTimeout` is enforced client-side by the operation backend's poll - // loop: the kernel ignores `queryTimeoutSecs` on the async submit path - // (`submitStatement` always sends `wait_timeout=0s`), so we do NOT forward - // it to the napi options — passing it there would be a silent no-op. return new SeaOperationBackend({ asyncStatement: asyncStatement!, context: this.context, - // `queryTimeout` is typed `number | bigint | Int64`; `numberToInt64(...).toNumber()` - // coerces all three (a bare `Number(int64)` yields NaN — node-int64 has no valueOf). 
- queryTimeoutSecs, }); } @@ -235,10 +238,7 @@ export default class SeaSessionBackend implements ISessionBackend { * `ExecuteOptions`, returning `undefined` when nothing is set so the * no-options call shape (`executeStatement(sql)`) is preserved. */ - private buildExecuteOptions( - options: ExecuteStatementOptions, - queryTimeoutSecs?: number, - ): SeaNativeExecuteOptions | undefined { + private buildExecuteOptions(options: ExecuteStatementOptions): SeaNativeExecuteOptions | undefined { // Positional (`?`) and named (`:name`) parameters are mutually exclusive — // the kernel binds one placeholder style per statement. Use the SAME error // type and message as the Thrift backend (`ThriftSessionBackend`) so a @@ -256,14 +256,8 @@ export default class SeaSessionBackend implements ISessionBackend { if (namedParams !== undefined) { execOptions.namedParams = namedParams; } - // `queryTimeoutSecs` is forwarded only on the SYNC path (the caller passes - // it in): the kernel `execute()` consults it as the server statement - // timeout. On the async submit path the caller omits it (the kernel ignores - // it under `wait_timeout=0s`), so it is enforced client-side by the - // operation backend's poll-loop deadline instead (see executeStatement). - if (queryTimeoutSecs !== undefined) { - execOptions.queryTimeoutSecs = queryTimeoutSecs; - } + // `queryTimeout` is intentionally NOT forwarded — it is a no-op on SEA (see + // the TODO in executeStatement). It must not become the SEA `wait_timeout`. 
if (options.rowLimit !== undefined) { execOptions.rowLimit = Number(options.rowLimit); } diff --git a/tests/unit/sea/execution.test.ts b/tests/unit/sea/execution.test.ts index 81cdfadd..713ec7f1 100644 --- a/tests/unit/sea/execution.test.ts +++ b/tests/unit/sea/execution.test.ts @@ -591,44 +591,59 @@ describe('SeaSessionBackend', () => { expect((thrown as Error).message).to.equal('Driver does not support both ordinal and named parameters.'); }); - it('executeStatement (sync default) DOES forward queryTimeout to the napi options', async () => { + it('executeStatement does NOT forward queryTimeout — it is a no-op on SEA', async () => { const connection = new FakeNativeConnection(); const session = makeSession(connection); await session.executeStatement('SELECT 1', { queryTimeout: 30 }); - // Sync path: the kernel `execute()` honours queryTimeoutSecs (server - // statement timeout), so the backend forwards it onto the napi options. - expect((connection.lastOptions as { queryTimeoutSecs?: number } | undefined)?.queryTimeoutSecs).to.equal(30); + // queryTimeout is intentionally a NO-OP on SEA (see SeaSessionBackend): the + // kernel would otherwise map queryTimeoutSecs onto the SEA `wait_timeout` wire + // field (valid range {0} ∪ [5,50]s) — a different concept from a statement + // timeout, and out-of-range values fail HTTP 400. The proper mechanism is the + // `STATEMENT_TIMEOUT` session config (deferred). So it is neither forwarded + // nor allowed to synthesise an options object. 
+ expect(connection.lastOptions).to.equal(undefined); }); - it('executeStatement (runAsync: true) does NOT forward queryTimeout to submit (kernel ignores it; enforced client-side)', async () => { + it('executeStatement (runAsync: true) does NOT forward queryTimeout (no-op on SEA)', async () => { const connection = new FakeNativeConnection(); const session = makeSession(connection); await session.executeStatement('SELECT 1', { queryTimeout: 30, runAsync: true }); - // Async submit path: the kernel ignores queryTimeoutSecs under - // `wait_timeout=0s`, so it's enforced client-side by the poll deadline - // instead — never forwarded to the napi options. expect((connection.lastOptions as { queryTimeoutSecs?: number } | undefined)?.queryTimeoutSecs).to.equal(undefined); }); - it('coerces an Int64 queryTimeout into the client-side deadline on the async path (not NaN)', async function int64Timeout() { - // Regression: `Number(new Int64(...))` yields NaN (node-int64 has no valueOf), - // which would silently disable the deadline. The backend must coerce via - // numberToInt64(...).toNumber() so an Int64 queryTimeout still bounds the poll. - // Exercised on the async path, where the client-side poll deadline applies. - // eslint-disable-next-line no-invalid-this - this.timeout(5000); + it('accepts an Int64 queryTimeout without forwarding it or crashing (no-op on SEA)', async () => { + // queryTimeout is a no-op on SEA, but the option must still be ACCEPTED for + // any value type the public API allows (number | bigint | Int64) without + // throwing or synthesising napi options. Int64 is the awkward case + // (node-int64 has no valueOf), so assert it specifically. 
+ const connection = new FakeNativeConnection(); - connection.submitStatusValue = 'Running'; // never reaches a terminal state const session = makeSession(connection); - const op = await session.executeStatement('SELECT 1', { queryTimeout: new Int64(1), runAsync: true }); - let thrown: unknown; - try { - await op.waitUntilReady(); - } catch (err) { - thrown = err; - } - expect(thrown, 'Int64(1) timeout must fire — NaN would poll forever').to.be.instanceOf(OperationStateError); - expect((thrown as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Timeout); + await session.executeStatement('SELECT 1', { queryTimeout: new Int64(1) }); + expect((connection.lastOptions as { queryTimeoutSecs?: number } | undefined)?.queryTimeoutSecs).to.equal(undefined); + expect(connection.lastOptions).to.equal(undefined); + }); + + it('Option A: sync executeStatement drives the statement to terminal before returning', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + const op = await session.executeStatement('SELECT 1', {}); + // The default sync path kicks off the inline `result()` eagerly inside + // executeStatement and returns the handle WITHOUT awaiting it, so nothing here + // calls waitUntilReady()/fetch. NOTE(review): the Succeeded assertion presumably + // relies on the fake settling before status() is polled — confirm. 
+ const status = await op.status(false); + expect(status.state).to.equal(OperationState.Succeeded); + }); + + it('runAsync: true does NOT drive to terminal in executeStatement (returns a pending, cancellable handle)', async () => { + const connection = new FakeNativeConnection(); + connection.submitStatusValue = 'Running'; + const session = makeSession(connection); + const op = await session.executeStatement('SELECT 1', { runAsync: true }); + // The async path returns immediately with a running handle so the caller can + // poll / cancel mid-run; status must still report Running at this point. + const status = await op.status(false); + expect(status.state).to.equal(OperationState.Running); }); it('executeStatement forwards rowLimit', async () => { @@ -690,25 +705,26 @@ describe('SeaSessionBackend', () => { } }); - // Genuinely unsupported on SEA — rejected (rather than silently ignored) so - // a caller/agent gets signal instead of a no-op. queryTags / queryTimeout / - // rowLimit are NOT here — they are forwarded (asserted above). - for (const { name, options, re } of [ - { name: 'useCloudFetch', options: { useCloudFetch: true }, re: /useCloudFetch/ }, - { name: 'useLZ4Compression', options: { useLZ4Compression: true }, re: /useLZ4Compression/ }, - { name: 'stagingAllowedLocalPath', options: { stagingAllowedLocalPath: '/tmp' }, re: /stagingAllowedLocalPath/ }, + // Not per-statement knobs on the kernel-backed SEA path — ACCEPTED and IGNORED + // (no-op) rather than rejected, matching the Python connector's kernel backend + // (`KernelDatabricksClient.execute_command`, which takes the same flags and + // never reads them). A hard failure would break callers that set these + // globally; the kernel exposes no per-statement override for any of them + // (CloudFetch is the session-level ResultConfig, compression is decoded from + // the manifest, and volume operations have no kernel API yet). 
+ for (const { name, options } of [ + { name: 'useCloudFetch', options: { useCloudFetch: true } }, + { name: 'useLZ4Compression', options: { useLZ4Compression: true } }, + { name: 'stagingAllowedLocalPath', options: { stagingAllowedLocalPath: '/tmp' } }, ] as const) { - it(`executeStatement rejects ${name} rather than silently ignoring it`, async () => { + it(`executeStatement accepts and ignores ${name} (no throw, not forwarded)`, async () => { const connection = new FakeNativeConnection(); const session = makeSession(connection); - let thrown: unknown; - try { - await session.executeStatement('SELECT 1', options); - } catch (err) { - thrown = err; - } - expect(thrown).to.be.instanceOf(HiveDriverError); - expect((thrown as Error).message).to.match(re); + // Must not throw... + await session.executeStatement('SELECT 1', options); + // ...and must not be forwarded to the napi options (it has no kernel knob), + // so a SELECT-1 with only this option still uses the no-options fast path. + expect(connection.lastOptions).to.equal(undefined); }); } @@ -1101,14 +1117,30 @@ describe('SeaOperationBackend — sync (executeStatementCancellable) path', () = expect(exec.cancelled).to.equal(true); }); - it('close() on a still-running sync op cancels the server execution (no compute leak)', async () => { + it('close() on a not-yet-fetched sync op drives result() to terminal then closes it (fire-and-forget commit, no cancel)', async () => { + const exec = new FakeCancellableExecution(); + const op = makeSyncOp(exec); + // Fire-and-forget DDL/DML: execute then close without a fetch. close() must + // drive result() to completion (so the statement actually commits + // server-side) and then close the resolved terminal statement — it must NOT + // cancel the execution (which would abort an uncommitted CREATE/INSERT). 
+ await op.close(); + expect(exec.cancelled).to.equal(false); + expect(exec.resultCalls).to.equal(1); + expect(exec.resultHandle.closed).to.equal(true); + }); + + it('close() after cancel() releases the execution WITHOUT driving result() (a cancelled op does not commit)', async () => { const exec = new FakeCancellableExecution(); const op = makeSyncOp(exec); - // close() before result() resolved: with no terminal statement to close, - // it must proactively cancel the running execution rather than no-op - // (otherwise server compute runs on until the kernel drop-guard fires at GC). + await op.cancel(); await op.close(); + // The op was explicitly cancelled, so close() must NOT drive result() to + // terminal (no commit of a cancelled statement) — it just releases via the + // cancellable execution. expect(exec.cancelled).to.equal(true); + expect(exec.resultCalls).to.equal(0); + expect(exec.resultHandle.closed).to.equal(false); }); it('cancel() interrupts an in-flight result(), surfacing OperationStateError(Canceled)', async () => { @@ -1139,12 +1171,13 @@ describe('SeaOperationBackend — sync (executeStatementCancellable) path', () = expect(exec.resultHandle.closed).to.equal(true); }); - it('close() before result() resolves is a no-op (nothing server-side to close yet)', async () => { + it('close() before any fetch drives result() to terminal and reports success', async () => { const exec = new FakeCancellableExecution(); const op = makeSyncOp(exec); - // Should not throw even though result() never ran. + // Should not throw; drives result() to terminal (commit) then closes it. const status = await op.close(); expect(status.isSuccess).to.equal(true); - expect(exec.resultHandle.closed).to.equal(false); + expect(exec.cancelled).to.equal(false); + expect(exec.resultHandle.closed).to.equal(true); }); });