Skip to content

Commit d82ac26

Browse files
author
Alexey Zorkaltsev
authored
Merge pull request #400 from ydb-platform/topic
Add topic write retrier, test reader and continues reauth
2 parents b09086f + bc22f6d commit d82ac26

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1506
-786
lines changed

.env.dev.sample

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
YDB_ANONYMOUS_CREDENTIALS=1
22
YDB_SSL_ROOT_CERTIFICATES_FILE=../slo-tests/playground/data/ydb_certs/ca.pem
3-
YDB_ENDPOINT=grpc://<localhost / fqdn / ip-address>:<2135 / 2136>
3+
YDB_ENDPOINT=grpc://localhost:2136
44
YDB_LOG_LEVEL=debug
5+
YDB_DETAILED_TRACE_STACK=true
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Due to a problem with a reference to json - TEMPORARY example is as md-file
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import {Driver as YDB} from '../../src';
2+
import {AnonymousAuthService} from "../../src/credentials/anonymous-auth-service";
3+
import {Ydb} from "ydb-sdk-proto";
4+
import {SimpleLogger} from "../../src/logger/simple-logger";
5+
import {Context} from "../../src/context";
6+
7+
require('dotenv').config();
8+
9+
const DATABASE = '/local';
10+
const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136';
11+
12+
async function main() {
13+
const db = new YDB({
14+
endpoint: ENDPOINT,
15+
database: DATABASE,
16+
authService: new AnonymousAuthService(),
17+
logger: new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}),
18+
});
19+
if (!(await db.ready(3000))) throw new Error('Driver is not ready!');
20+
await db.topic.createTopic({
21+
path: 'demoTopic',
22+
consumers: [{
23+
name: 'demo',
24+
}],
25+
});
26+
const writer = await db.topic.createWriter({
27+
path: 'demoTopic',
28+
// producerId: '...', // will be genereted automatically
29+
// messageGroupId: '...' // will be the same as producerId
30+
getLastSeqNo: true, // seqNo will be assigned automatically
31+
});
32+
await writer.sendMessages({
33+
codec: Ydb.Topic.Codec.CODEC_RAW,
34+
messages: [{
35+
data: Buffer.from('Hello, world'),
36+
uncompressedSize: 'Hello, world'.length,
37+
}],
38+
});
39+
const promises = [];
40+
for (let n = 0; n < 4; n++) {
41+
// ((writer as any).innerWriteStream as TopicWriteStreamWithEvents).close(Context.createNew().ctx, new Error('Fake error'));
42+
43+
// await sleep(3000); // TODO:
44+
45+
promises.push(writer.sendMessages({
46+
codec: Ydb.Topic.Codec.CODEC_RAW,
47+
messages: [{
48+
data: Buffer.from(`Message N${n}`),
49+
uncompressedSize: `Message N${n}`.length,
50+
}],
51+
}));
52+
}
53+
await Promise.all(promises);
54+
const reader = await db.topic.createReader(Context.createNew({
55+
timeout: 3000,
56+
}).ctx, {
57+
topicsReadSettings: [{
58+
path: 'demoTopic',
59+
}],
60+
consumer: 'demo',
61+
receiveBufferSizeInBytes: 10_000_000,
62+
});
63+
for await (const message of reader.messages) {
64+
console.info(`Message: ${message.data!.toString()}`);
65+
await message.commit();
66+
}
67+
await reader.close(); // graceful close() - complete when all messages are commited
68+
}
69+
70+
main();

jest.config.dev.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ module.exports = {
88
},
99
testRegex: '(/__tests__/.*|(\\.|/)(test|spec))\\.tsx?$',
1010
moduleFileExtensions: ['ts', 'tsx', 'js', 'jsx', 'json', 'node'],
11+
noStackTrace: true,
1112
}

slo-workload/DEV.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,15 @@ in the _slo-workload_ folder
3030

3131
### Create the test database
3232

33-
`npx ts-node src/index.ts create grpcs://localhost:2135 local`
33+
`npx ts-node src/index.ts.md create grpcs://localhost:2135 local`
3434

3535
### Run the test - for 5 min
3636

37-
`npx ts-node src/index.ts run grpcs://localhost:2135 local`
37+
`npx ts-node src/index.ts.md run grpcs://localhost:2135 local`
3838

3939
### Clean the baseClean the base
4040

41-
`npx ts-node src/index.ts cleanup grpcs://localhost:2135 local`
41+
`npx ts-node src/index.ts.md cleanup grpcs://localhost:2135 local`
4242

4343
### What to do in case of problems
4444

src/__tests__/e2e/connection.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
12
import {initDriver, destroyDriver} from "../../utils/test";
23

34
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

src/__tests__/e2e/query-service/method-execute.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
12
import DiscoveryService from "../../../discovery/discovery-service";
23
import {ENDPOINT_DISCOVERY_PERIOD} from "../../../constants";
34
import {AnonymousAuthService} from "../../../credentials/anonymous-auth-service";
@@ -10,6 +11,8 @@ import {Context} from "../../../context";
1011
import {ctxSymbol} from "../../../query/symbols";
1112
import StatsMode = Ydb.Query.StatsMode;
1213
import ExecMode = Ydb.Query.ExecMode;
14+
import {RetryParameters} from "../../../retries/retryParameters";
15+
import {RetryStrategy} from "../../../retries/retryStrategy";
1316

1417
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
1518

@@ -238,6 +241,7 @@ describe('Query.execute()', () => {
238241
database: DATABASE,
239242
authService,
240243
discoveryPeriod: ENDPOINT_DISCOVERY_PERIOD,
244+
retrier: new RetryStrategy(new RetryParameters(), logger),
241245
logger,
242246
});
243247
await discoveryService.ready(ENDPOINT_DISCOVERY_PERIOD);

src/__tests__/e2e/query-service/query-service-client.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
12
import Driver from "../../../driver";
23
import {AnonymousAuthService} from "../../../credentials/anonymous-auth-service";
34
import * as errors from "../../../errors";

src/__tests__/e2e/query-service/rows-conversion.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
12
import DiscoveryService from "../../../discovery/discovery-service";
23
import {QuerySession, RowType} from "../../../query";
34
import {AnonymousAuthService} from "../../../credentials/anonymous-auth-service";
@@ -8,6 +9,8 @@ import {Ydb} from "ydb-sdk-proto";
89
import {getDefaultLogger} from "../../../logger/get-default-logger";
910
import {ctxSymbol} from "../../../query/symbols";
1011
import {Context} from "../../../context";
12+
import {RetryParameters} from "../../../retries/retryParameters";
13+
import {RetryStrategy} from "../../../retries/retryStrategy";
1114

1215
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
1316

@@ -155,6 +158,7 @@ describe('Rows conversion', () => {
155158
database: DATABASE,
156159
authService,
157160
discoveryPeriod: ENDPOINT_DISCOVERY_PERIOD,
161+
retrier: new RetryStrategy(new RetryParameters(), logger),
158162
logger,
159163
});
160164

src/__tests__/e2e/query-service/transactions.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
12
import {AnonymousAuthService} from "../../../credentials/anonymous-auth-service";
23
import DiscoveryService from "../../../discovery/discovery-service";
34
import {ENDPOINT_DISCOVERY_PERIOD} from "../../../constants";
@@ -7,6 +8,8 @@ import * as symbols from "../../../query/symbols";
78
import {getDefaultLogger} from "../../../logger/get-default-logger";
89
import {ctxSymbol} from "../../../query/symbols";
910
import {Context} from "../../../context";
11+
import {RetryParameters} from "../../../retries/retryParameters";
12+
import {RetryStrategy} from "../../../retries/retryStrategy";
1013

1114
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
1215

@@ -122,8 +125,8 @@ describe('Query service transactions', () => {
122125
database: DATABASE,
123126
authService,
124127
discoveryPeriod: ENDPOINT_DISCOVERY_PERIOD,
128+
retrier: new RetryStrategy(new RetryParameters(), logger),
125129
logger,
126-
127130
});
128131

129132
await discoveryService.ready(ENDPOINT_DISCOVERY_PERIOD);

0 commit comments

Comments
 (0)