Skip to content

Commit 20ad257

Browse files
author
Alexey Zorkaltsev
authored
Merge pull request #409 from ydb-platform/call-cancelled-bug3
feat: add idempotent option to tableClient session.executeQuery
2 parents f4ce0c4 + 2bf0cd3 commit 20ad257

File tree

4 files changed

+40
-13
lines changed

4 files changed

+40
-13
lines changed

src/errors.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,9 +315,9 @@ export class ClientResourceExhausted extends TransportError {
315315
public readonly [RetryPolicySymbol] = retryPolicy(Backoff.Slow, false, true, true);
316316
}
317317

318-
export class ClientCancelled extends TransportError {
318+
export class ClientCancelled extends TransportError { // TODO: "Call cancelled" error appears also when connection string is wrong - would be right to avoid such dead lock retrying
319319
static status = StatusCode.CLIENT_CANCELED;
320-
public readonly [RetryPolicySymbol] = retryPolicy(Backoff.No, false, false, false);
320+
public readonly [RetryPolicySymbol] = retryPolicy(Backoff.Fast, false, true, false);
321321
}
322322

323323
const TRANSPORT_ERROR_CODES = new Map([

src/retries_obsoleted.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ const RETRYABLE_ERRORS_FAST = [
5959
];
6060
const RETRYABLE_ERRORS_SLOW = [errors.Overloaded, errors.ClientResourceExhausted];
6161

62-
class RetryStrategy {
62+
export class RetryStrategy {
6363
// private logger: Logger;
6464
constructor(
6565
public methodName = 'UnknownClass::UnknownMethod',

src/table/table-session.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import * as grpc from "@grpc/grpc-js";
2121
import EventEmitter from "events";
2222
import {ICreateSessionResult, SessionEvent, TableService} from "./table-session-pool";
2323
import {Endpoint} from "../discovery";
24-
import {retryable} from "../retries_obsoleted";
24+
import {retryable, RetryParameters, RetryStrategy} from "../retries_obsoleted";
2525
import {MissingStatus, MissingValue, SchemeError, YdbError} from "../errors";
2626
import {ResponseMetadataKeys} from "../constants";
2727
import {pessimizable} from "../utils";
@@ -171,15 +171,21 @@ export class PrepareQuerySettings extends OperationParamsSettings {
171171
}
172172

173173
export class ExecuteQuerySettings extends OperationParamsSettings {
174-
keepInCache: boolean = false;
174+
keepInCache?: boolean = false;
175175
collectStats?: Ydb.Table.QueryStatsCollection.Mode;
176176
onResponseMetadata?: (metadata: grpc.Metadata) => void;
177+
idempotent: boolean = false;
177178

178179
withKeepInCache(keepInCache: boolean) {
179180
this.keepInCache = keepInCache;
180181
return this;
181182
}
182183

184+
withIdempotent(idempotent: boolean) {
185+
this.idempotent = idempotent;
186+
return this;
187+
}
188+
183189
withCollectStats(collectStats: Ydb.Table.QueryStatsCollection.Mode) {
184190
this.collectStats = collectStats;
185191
return this;
@@ -258,6 +264,8 @@ export class ExecuteScanQuerySettings {
258264
}
259265
}
260266

267+
let executeQueryRetryer: RetryStrategy;
268+
261269
export class TableSession extends EventEmitter implements ICreateSessionResult {
262270
private beingDeleted = false;
263271
private free = true;
@@ -518,7 +526,13 @@ export class TableSession extends EventEmitter implements ICreateSessionResult {
518526
if (keepInCache) {
519527
request.queryCachePolicy = {keepInCache};
520528
}
521-
const response = await this.api.executeDataQuery(request);
529+
530+
if (!executeQueryRetryer) executeQueryRetryer = new RetryStrategy('TableSession:executeQuery', new RetryParameters(), this.logger);
531+
532+
const response =
533+
settings?.idempotent
534+
? await executeQueryRetryer.retry(() => this.api.executeDataQuery(request))
535+
: await this.api.executeDataQuery(request);
522536
const payload = getOperationPayload(this.processResponseMetadata(request, response, settings?.onResponseMetadata));
523537
return ExecuteQueryResult.decode(payload);
524538
}

src/utils/test/create-table.ts

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import {Column, TableDescription, TableSession} from "../../table";
2-
import {withRetries} from "../../retries_obsoleted";
1+
import {AUTO_TX, Column, ExecuteQuerySettings, TableDescription, TableSession} from "../../table";
2+
// import {withRetries} from "../../retries_obsoleted";
33
import {Types} from "../../types";
44
import {Row} from "./row";
55

@@ -29,10 +29,23 @@ DECLARE $data AS List<Struct<id: Uint64, title: Utf8>>;
2929
REPLACE INTO ${TABLE}
3030
SELECT * FROM AS_TABLE($data);`;
3131

32-
await withRetries(async () => {
33-
const preparedQuery = await session.prepareQuery(query);
34-
await session.executeQuery(preparedQuery, {
32+
// Now we can specify that the operation should be repeated in case of an error by specifying that it is idempotent
33+
34+
// Old code:
35+
36+
// await withRetries(async () => {
37+
// const preparedQuery = await session.prepareQuery(query);
38+
// await session.executeQuery(preparedQuery, {
39+
// '$data': Row.asTypedCollection(rows),
40+
// });
41+
// });
42+
43+
// New code variant:
44+
45+
const preparedQuery = await session.prepareQuery(query);
46+
await session.executeQuery(preparedQuery, {
3547
'$data': Row.asTypedCollection(rows),
36-
});
37-
});
48+
},
49+
AUTO_TX,
50+
new ExecuteQuerySettings().withIdempotent(true));
3851
}

0 commit comments

Comments
 (0)