From bec8a521c9bcab30613ac504ed069851d399749e Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Wed, 10 Jun 2026 17:18:38 +0000 Subject: [PATCH 1/2] fix(fetch): serialize result consumption per operation to prevent concurrent-fetch row loss MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two concurrent fetchAll()/fetchChunk() calls on a single operation could silently drop rows. The result pipeline behind every backend is a single stateful cursor; the kernel (SEA) path in particular threads shared, non-atomic prefetch state through KernelResultsProvider → ArrowResultConverter → ResultSlicer, so overlapping consumers corrupt the cursor and lose batches (observed ~99788/100000 rows on a 100k-row SELECT). Thrift avoided visible loss only because it delivers this result set in one drainable unit. Serialize all result consumption on an operation via a per-operation fetch lock. fetchAll holds it across its entire drain loop; fetchChunk/hasMoreRows hold it per call. Holding it across the whole drain makes two concurrent fetchAll() calls behave identically on both backends: the first drains the complete result set, the second observes an exhausted cursor and returns [] — Thrift parity by construction, no kernel change required. The single-consumer hot path is uncontended (the chain is an already-resolved promise). Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore --- lib/DBSQLOperation.ts | 101 ++++++++++++++++++++++++++++++------------ 1 file changed, 72 insertions(+), 29 deletions(-) diff --git a/lib/DBSQLOperation.ts b/lib/DBSQLOperation.ts index 1ec1922c..3a8d40dd 100644 --- a/lib/DBSQLOperation.ts +++ b/lib/DBSQLOperation.ts @@ -65,6 +65,20 @@ export default class DBSQLOperation implements IOperation { private sessionId?: string; + // Serialises all result consumption on THIS operation. `fetchAll` holds it + // across its entire drain loop; `fetchChunk` / `hasMoreRows` hold it per + // call. 
The result pipeline behind every backend is a single stateful cursor + // (the kernel path in particular threads shared, non-atomic prefetch state + // through `KernelResultsProvider` → `ArrowResultConverter` → `ResultSlicer`), + // so concurrent consumers on one operation must be serialised or they corrupt + // that cursor and silently drop rows. Holding the lock across the WHOLE + // `fetchAll` drain (not per chunk) is what makes two concurrent `fetchAll()` + // calls behave like the Thrift backend: the first drains the complete result + // set, the second observes an exhausted cursor and returns `[]` — rather than + // splitting the rows between them. Uncontended on the normal single-consumer + // path (the chain is an already-resolved promise). + private fetchChain: Promise<void> = Promise.resolve(); + constructor(options: DBSQLOperationConstructorOptions) { this.context = options.context; this.backend = options.backend; @@ -115,21 +129,30 @@ export default class DBSQLOperation implements IOperation { * const result = await queryOperation.fetchAll(); */ public async fetchAll(options?: FetchOptions): Promise<Array<object>> { - const data: Array<Array<object>> = []; - - const fetchChunkOptions = { - ...options, - disableBuffering: true, - }; - - do { - // eslint-disable-next-line no-await-in-loop - const chunk = await this.fetchChunk(fetchChunkOptions); - data.push(chunk); - } while (await this.hasMoreRows()); // eslint-disable-line no-await-in-loop - this.context.getLogger().log(LogLevel.debug, `Fetched all data from operation with id: ${this.id}`); - - return data.flat(); + // Hold the fetch lock across the ENTIRE drain (see `fetchChain`): a + // concurrent fetchAll()/fetchChunk() on the same operation queues behind + // this loop instead of interleaving with it. The loop calls the + // *Internal (non-locking) primitives to avoid self-deadlock on the lock we + // already hold. Error telemetry wraps the whole drain. 
+ return this.runFetchExclusive(() => + this.withErrorTelemetry(async () => { + const data: Array<Array<object>> = []; + + const fetchChunkOptions = { + ...options, + disableBuffering: true, + }; + + do { + // eslint-disable-next-line no-await-in-loop + const chunk = await this.fetchChunkInternal(fetchChunkOptions); + data.push(chunk); + } while (await this.hasMoreRowsInternal()); // eslint-disable-line no-await-in-loop + this.context.getLogger().log(LogLevel.debug, `Fetched all data from operation with id: ${this.id}`); + + return data.flat(); + }), + ); } /** @@ -142,7 +165,7 @@ export default class DBSQLOperation implements IOperation { * const result = await queryOperation.fetchChunk({maxRows: 1000}); */ public async fetchChunk(options?: FetchOptions): Promise<Array<object>> { - return this.withErrorTelemetry(() => this.fetchChunkInternal(options)); + return this.runFetchExclusive(() => this.withErrorTelemetry(() => this.fetchChunkInternal(options))); } private async fetchChunkInternal(options?: FetchOptions): Promise<Array<object>> { @@ -241,21 +264,26 @@ export default class DBSQLOperation implements IOperation { } public async hasMoreRows(): Promise<boolean> { - return this.withErrorTelemetry(async () => { - // If operation is closed or cancelled - we should not try to get data from it - if (this.closed || this.cancelled) { - return false; - } + return this.runFetchExclusive(() => this.withErrorTelemetry(() => this.hasMoreRowsInternal())); + } - // Wait for operation to finish before checking for more rows - // This ensures metadata can be fetched successfully - if (this.backend.hasResultSet()) { - await this.waitUntilReadyThroughBackend(); - } + // Non-locking body of `hasMoreRows`. Called directly by `fetchAll`'s drain + // loop (which already holds `fetchChain`) and by the public `hasMoreRows` + // wrapper (which acquires it). Must never acquire the lock itself. 
+ private async hasMoreRowsInternal(): Promise<boolean> { + // If operation is closed or cancelled - we should not try to get data from it + if (this.closed || this.cancelled) { + return false; + } - // If we fetched all the data from server - check if there's anything buffered in result handler - return this.backend.hasMore(); - }); + // Wait for operation to finish before checking for more rows + // This ensures metadata can be fetched successfully + if (this.backend.hasResultSet()) { + await this.waitUntilReadyThroughBackend(); + } + + // If we fetched all the data from server - check if there's anything buffered in result handler + return this.backend.hasMore(); } public async getSchema(options?: GetSchemaOptions): Promise { @@ -338,6 +366,21 @@ export default class DBSQLOperation implements IOperation { } } + // Run `fn` with exclusive access to this operation's result cursor by + // chaining it onto `fetchChain`. The next caller waits for this one to settle + // (success OR failure) before starting, so the single stateful fetch pipeline + // is only ever driven by one in-flight consumer. A rejection is delivered to + // THIS caller but not propagated to the next waiter (the chain swallows it), + // so one failed fetch never poisons subsequent fetches. 
+ private runFetchExclusive<T>(fn: () => Promise<T>): Promise<T> { + const run = this.fetchChain.then(fn, fn); + this.fetchChain = run.then( + () => undefined, + () => undefined, + ); + return run; + } + private async failIfClosed(): Promise { if (this.closed) { throw new OperationStateError(OperationStateErrorCode.Closed); From df3e568b38aa290691c79878c9f5228df3a2e350 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Thu, 11 Jun 2026 07:06:47 +0000 Subject: [PATCH 2/2] test(fetch): stub internal drain primitives in fetchAll unit test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The fetchAll() drain holds the per-operation fetch lock across the whole loop and therefore calls the non-locking `fetchChunkInternal`/`hasMoreRowsInternal` primitives (calling the public `fetchChunk`/`hasMoreRows`, which re-acquire the same lock, would self-deadlock). The existing unit test stubbed the *public* methods, which the refactored drain no longer calls — so the stubs were bypassed, the real internals ran with no data source, and the test timed out (2000ms). Stub the internal primitives the drain actually invokes. Behavior asserted is unchanged (fetchAll drains all chunks and returns all rows). Test is explicitly implementation-specific (see its comment). 
Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore --- tests/unit/DBSQLOperation.test.ts | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/tests/unit/DBSQLOperation.test.ts b/tests/unit/DBSQLOperation.test.ts index f9195105..1fbde86a 100644 --- a/tests/unit/DBSQLOperation.test.ts +++ b/tests/unit/DBSQLOperation.test.ts @@ -1008,20 +1008,26 @@ describe('DBSQLOperation', () => { const originalData = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]; const tempData = [...originalData]; - const fetchChunkStub = sinon.stub(operation, 'fetchChunk').callsFake(async (): Promise<Array<any>> => { - return tempData.splice(0, 3); - }); - const hasMoreRowsStub = sinon.stub(operation, 'hasMoreRows').callsFake(async () => { + // Warning: this check is implementation-specific. + // `fetchAll` holds the per-operation fetch lock across the entire drain, so + // it calls the *non-locking* internal primitives (`fetchChunkInternal` / + // `hasMoreRowsInternal`) rather than the public `fetchChunk` / `hasMoreRows` + // (which re-acquire the same lock and would self-deadlock). Stub the + // internals that the drain loop actually invokes. + const fetchChunkStub = sinon + .stub(operation as any, 'fetchChunkInternal') + .callsFake(async (): Promise<Array<any>> => { + return tempData.splice(0, 3); + }); + const hasMoreRowsStub = sinon.stub(operation as any, 'hasMoreRowsInternal').callsFake(async () => { return tempData.length > 0; }); const fetchedData = await operation.fetchAll(); - // Warning: this check is implementation-specific - // `fetchAll` should wait for operation to complete. In current implementation - // it does so by calling `fetchChunk` at least once, which internally does - // all the job. 
But since here we stub `fetchChunk` it won't really wait, - // therefore here we ensure it was called at least once + // `fetchAll` should wait for the operation to complete; in the current + // implementation it does so by draining via `fetchChunkInternal` at least + // once, which internally does all the work. expect(fetchChunkStub.callCount).to.be.gte(1); expect(fetchChunkStub.called).to.be.true;