Skip to content

Commit cd694a9

Browse files
committed
RDBC-859 OOM on bulk insert - node.js client
1 parent 8359570 commit cd694a9

File tree

2 files changed

+70
-24
lines changed

2 files changed

+70
-24
lines changed

src/Documents/BulkInsertOperation.ts

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ import { BulkInsertOnProgressEventArgs } from "./Session/SessionEvents";
3131
import * as semaphore from "semaphore";
3232
import { acquireSemaphore } from "../Utility/SemaphoreUtil";
3333
import { Buffer } from "node:buffer";
34+
import { createGzip, Gzip } from "node:zlib";
35+
import { pipeline } from "readable-stream";
36+
import { promisify } from "node:util";
3437

3538
class BulkInsertStream {
3639

@@ -105,7 +108,7 @@ export class BulkInsertOperation {
105108
private _bulkInsertExecuteTask: Promise<any>;
106109
private _bulkInsertExecuteTaskErrored = false;
107110

108-
private _stream: RequestBodyStream;
111+
private _stream: stream.Readable & { flush: (callback?: () => void) => void; };
109112

110113
private _first: boolean = true;
111114
private _inProgressCommand: CommandType;
@@ -129,8 +132,8 @@ export class BulkInsertOperation {
129132
private readonly _streamLock: semaphore.Semaphore;
130133
private _heartbeatCheckInterval = 40_000;
131134

132-
//TODO: private GZipStream _compressedStream;
133-
private _requestBodyStream: RequestBodyStream; //TODO: raw reableable or wrapped with compressed
135+
private _compressedStream: Gzip;
136+
private _requestBodyStream: RequestBodyStream;
134137
private _requestBodyStreamFinished: boolean = false;
135138
private _currentWriter: BulkInsertStream;
136139
private _backgroundWriter: BulkInsertStream;
@@ -460,6 +463,12 @@ export class BulkInsertOperation {
460463
this._isInitialWrite = false;
461464

462465
await this._requestBodyStream.flush();
466+
467+
if (this._compressedStream) {
468+
const flush = promisify(this._compressedStream.flush);
469+
await flush.call(this._compressedStream);
470+
}
471+
463472
this._lastWriteToStream = new Date();
464473
}
465474
} finally {
@@ -535,29 +544,23 @@ export class BulkInsertOperation {
535544
}
536545

537546
private async _ensureStream() {
538-
//TODO: sync with c#
539-
//TODO if (CompressionLevel != CompressionLevel.NoCompression)
540-
// _streamExposerContent.Headers.ContentEncoding.Add("gzip");
541-
542547
try {
543548
this._requestBodyStream = new RequestBodyStream();
544-
this._stream = this._requestBodyStream; //TODO:
549+
this._stream = this._requestBodyStream;
550+
551+
if (this.useCompression) {
552+
this._compressedStream = createGzip();
553+
pipeline(this._requestBodyStream, this._compressedStream);
554+
}
555+
545556
const bulkCommand =
546-
new BulkInsertCommand(this._operationId, this._requestBodyStream, this._nodeTag, this._options.skipOverwriteIfUnchanged);
557+
new BulkInsertCommand(this._operationId, this._compressedStream ?? this._requestBodyStream, this._nodeTag, this._options.skipOverwriteIfUnchanged);
547558
bulkCommand.useCompression = this._useCompression;
548559

549560
this._bulkInsertExecuteTask = this._requestExecutor.execute(bulkCommand);
550561

551562
this._currentWriter.push("[");
552563

553-
/* TODO
554-
if (CompressionLevel != CompressionLevel.NoCompression)
555-
{
556-
_compressedStream = new GZipStream(_stream, CompressionLevel, leaveOpen: true);
557-
_requestBodyStream = _compressedStream;
558-
}
559-
*/
560-
561564
this._bulkInsertExecuteTask
562565
.catch(() => this._bulkInsertExecuteTaskErrored = true);
563566

@@ -598,8 +601,7 @@ export class BulkInsertOperation {
598601
try {
599602
this._currentWriter.push("]");
600603
await this._asyncWrite;
601-
this._requestBodyStream.write(this._currentWriter.toBuffer());
602-
//TODO: _compressedStream?.Dispose();
604+
this._requestBodyStream.push(this._currentWriter.toBuffer());
603605
await this._stream.flush();
604606
} finally {
605607
context.dispose();
@@ -1017,7 +1019,7 @@ export class BulkInsertOperation {
10171019

10181020
await this._operation.flushIfNeeded();
10191021

1020-
this._operation._currentWriter.push(bytes); //TODO: do we want to stream here?
1022+
this._operation._currentWriter.push(bytes);
10211023

10221024
await this._operation.flushIfNeeded();
10231025
} catch (e) {
@@ -1078,8 +1080,12 @@ export class BulkInsertCommand extends RavenCommand<void> {
10781080
+ "/bulk_insert?id=" + this._id
10791081
+ "&skipOverwriteIfUnchanged=" + (this._skipOverwriteIfUnchanged ? "true" : "false");
10801082

1081-
const headers = this._headers().typeAppJson().build();
1082-
// TODO: useCompression ? new GzipCompressingEntity(_stream) : _stream);
1083+
const headersBuilder = this._headers().typeAppJson();
1084+
if (this.useCompression) {
1085+
headersBuilder.with("Content-Encoding", "gzip");
1086+
}
1087+
1088+
const headers = headersBuilder.build();
10831089
return {
10841090
method: "POST",
10851091
uri,

test/Ported/BulkInsert/BulkInsertsTest.ts

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import { createMetadataDictionary } from "../../../src/Mapping/MetadataAsDiction
88
import { CONSTANTS } from "../../../src/Constants";
99
import { DateUtil } from "../../../src/Utility/DateUtil";
1010
import { delay } from "../../../src/Utility/PromiseUtil";
11+
import { assertThat } from "../../Utils/AssertExtensions";
12+
import { Timer } from "../../../src/Primitives/Timer";
1113

1214
describe("bulk insert", function () {
1315

@@ -20,7 +22,7 @@ describe("bulk insert", function () {
2022
afterEach(async () =>
2123
await disposeTestDocumentStore(store));
2224

23-
it("simple bulk insert should work", async () => {
25+
const bulkInsertTest = async (compressed: boolean) => {
2426
const fooBar1 = new FooBar();
2527
fooBar1.name = "John Doe";
2628

@@ -33,7 +35,9 @@ describe("bulk insert", function () {
3335
const fooBar4 = new FooBar();
3436
fooBar4.name = "Mega Jane";
3537

36-
const bulkInsert = store.bulkInsert();
38+
const bulkInsert = store.bulkInsert({
39+
useCompression: compressed
40+
});
3741

3842
await bulkInsert.store(fooBar1);
3943
await bulkInsert.store(fooBar2);
@@ -65,8 +69,44 @@ describe("bulk insert", function () {
6569
} finally {
6670
session.dispose();
6771
}
72+
}
73+
74+
it("simple bulk insert should work - no compressed", async () => {
75+
await bulkInsertTest(false);
6876
});
6977

78+
it("simple bulk insert should work - compressed", async () => {
79+
await bulkInsertTest(true);
80+
});
81+
82+
it ("can send heartbeats", async () => {
83+
84+
const bulkInsert = store.bulkInsert();
85+
try {
86+
87+
// access private variable
88+
const timer = (bulkInsert as any)._timer as Timer;
89+
const checkInterval = (bulkInsert as any)._heartbeatCheckInterval;
90+
91+
assertThat(timer)
92+
.isNotNull();
93+
assertThat(checkInterval)
94+
.isEqualTo(40_000);
95+
96+
timer.change(20, 20);
97+
(bulkInsert as any)._heartbeatCheckInterval = 20;
98+
99+
await bulkInsert.store(new FooBar());
100+
101+
await delay(250); //it should send heartbeats
102+
103+
await bulkInsert.store(new FooBar());
104+
105+
} finally {
106+
await bulkInsert.finish();
107+
}
108+
})
109+
70110
it("can be killed early before making connection", async () => {
71111
const bulkInsert = store.bulkInsert();
72112
try {

0 commit comments

Comments
 (0)