Skip to content

Commit 4df092f

Browse files
authored
Merge pull request #447 from ml054/RDBC-858
RDBC-858 unify http compression/decompression API
2 parents 503bc45 + db0d90b commit 4df092f

File tree

9 files changed

+78
-36
lines changed

9 files changed

+78
-36
lines changed

src/Documents/BulkInsert/BulkInsertWriterBase.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { pipeline, Readable } from "node:stream";
44
import type { Gzip } from "node:zlib";
55
import { promisify } from "node:util";
66
import { TypeUtil } from "../../Utility/TypeUtil.js";
7+
import { HttpCompressionAlgorithm } from "../../Http/HttpCompressionAlgorithm.js";
78

89
export class BulkInsertWriterBase implements IDisposable {
910
private readonly _maxSizeInBuffer = 1024 * 1024;
@@ -97,8 +98,8 @@ export class BulkInsertWriterBase implements IDisposable {
9798
}
9899
}
99100

100-
public async ensureStream(compression: boolean) {
101-
if (compression) {
101+
public async ensureStream(compression: HttpCompressionAlgorithm) {
102+
if (compression === "Gzip") {
102103
const { createGzip } = await import("node:zlib");
103104
this.compressedStream = createGzip();
104105
pipeline(this.requestBodyStream, this.compressedStream, TypeUtil.NOOP);

src/Documents/BulkInsertOperation.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import { Semaphore } from "../Utility/Semaphore.js";
3434
import { BulkInsertOperationBase } from "./BulkInsert/BulkInsertOperationBase.js";
3535
import { BulkInsertOptions } from "./BulkInsert/BulkInsertOptions.js";
3636
import { BulkInsertWriter } from "./BulkInsert/BulkInsertWriter.js";
37+
import { HttpCompressionAlgorithm } from "../Http/HttpCompressionAlgorithm.js";
3738

3839
export class BulkInsertOperation extends BulkInsertOperationBase<object> {
3940

@@ -750,11 +751,11 @@ export class BulkInsertOperation extends BulkInsertOperationBase<object> {
750751
}
751752

752753
protected async _ensureStream() {
753-
await this._writer.ensureStream(this.useCompression);
754+
const compressionAlgorithm = this._useCompression ? this._conventions.httpCompressionAlgorithm : null;
755+
await this._writer.ensureStream(compressionAlgorithm);
754756

755757
const bulkCommand =
756-
new BulkInsertCommand(this._operationId, this._writer.compressedStream ?? this._writer.requestBodyStream, this._nodeTag, this._options.skipOverwriteIfUnchanged);
757-
bulkCommand.useCompression = this._useCompression;
758+
new BulkInsertCommand(this._operationId, compressionAlgorithm, this._writer.compressedStream ?? this._writer.requestBodyStream, this._nodeTag, this._options.skipOverwriteIfUnchanged);
758759

759760
this._bulkInsertExecuteTask = this._requestExecutor.execute(bulkCommand);
760761
this._bulkInsertExecuteTask
@@ -955,11 +956,12 @@ export class BulkInsertCommand extends RavenCommand<void> {
955956
private readonly _stream: Readable;
956957
private _skipOverwriteIfUnchanged: boolean;
957958
private readonly _id: number;
958-
public useCompression: boolean;
959+
private _compressionAlgorithm: HttpCompressionAlgorithm;
959960

960-
public constructor(id: number, stream: Readable, nodeTag: string, skipOverwriteIfUnchanged: boolean) {
961+
public constructor(id: number, compressionAlgorithm: HttpCompressionAlgorithm, stream: Readable, nodeTag: string, skipOverwriteIfUnchanged: boolean) {
961962
super();
962963

964+
this._compressionAlgorithm = compressionAlgorithm;
963965
this._stream = stream;
964966
this._id = id;
965967
this._selectedNodeTag = nodeTag;
@@ -973,7 +975,7 @@ export class BulkInsertCommand extends RavenCommand<void> {
973975
+ "&skipOverwriteIfUnchanged=" + (this._skipOverwriteIfUnchanged ? "true" : "false");
974976

975977
const headersBuilder = this._headers().typeAppJson();
976-
if (this.useCompression) {
978+
if (this._compressionAlgorithm === "Gzip") {
977979
headersBuilder.with("Content-Encoding", "gzip");
978980
}
979981

src/Documents/Conventions/DocumentConventions.ts

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { BulkInsertConventions } from "./BulkInsertConventions.js";
1919
import { InMemoryDocumentSessionOperations } from "../Session/InMemoryDocumentSessionOperations.js";
2020
import { ShardingConventions } from "./ShardingConventions.js";
2121
import { plural } from "../../ext/pluralize/pluralize.js";
22+
import { HttpCompressionAlgorithm } from "../../Http/HttpCompressionAlgorithm.js";
2223

2324
export type IdConvention = (databaseName: string, entity: object) => Promise<string>;
2425
export type IValueForQueryConverter<T> =
@@ -98,7 +99,9 @@ export class DocumentConventions {
9899
private _customFetch: any;
99100
private _dateUtil: DateUtil;
100101

101-
private _useCompression: boolean;
102+
private _useHttpDecompression: boolean | null = null;
103+
private _httpCompressionAlgorithm: HttpCompressionAlgorithm = "Gzip";
104+
102105
private _sendApplicationIdentifier: boolean;
103106

104107
private readonly _bulkInsert: BulkInsertConventions;
@@ -156,8 +159,6 @@ export class DocumentConventions {
156159
documentConventions: this
157160
});
158161

159-
this._useCompression = null;
160-
161162
this._dateUtilOpts = {};
162163
this._dateUtil = new DateUtil(this._dateUtilOpts);
163164

@@ -407,10 +408,6 @@ export class DocumentConventions {
407408
this._maxHttpCacheSize = value;
408409
}
409410

410-
public get hasExplicitlySetCompressionUsage() {
411-
return this._useCompression !== null;
412-
}
413-
414411
public get waitForIndexesAfterSaveChangesTimeout() {
415412
return this._waitForIndexesAfterSaveChangesTimeout;
416413
}
@@ -438,17 +435,26 @@ export class DocumentConventions {
438435
this._waitForReplicationAfterSaveChangesTimeout = value;
439436
}
440437

441-
public get useCompression() {
442-
if (this._useCompression === null) {
438+
/**
439+
* Can accept compressed HTTP response content and will use decompression methods
440+
*/
441+
public get useHttpDecompression() {
442+
if (this._useHttpDecompression === null) {
443443
return true;
444444
}
445-
446-
return this._useCompression;
445+
return this._useHttpDecompression;
447446
}
448447

449-
public set useCompression(value) {
448+
/**
449+
* Can accept compressed HTTP response content and will use decompression methods
450+
*/
451+
public set useHttpDecompression(value: boolean) {
450452
this._assertNotFrozen();
451-
this._useCompression = value;
453+
this._useHttpDecompression = value;
454+
}
455+
456+
public get httpCompressionAlgorithm() {
457+
return this._httpCompressionAlgorithm;
452458
}
453459

454460
private _dateUtilOpts: DateUtilOpts;
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
2+
3+
export type HttpCompressionAlgorithm = "Gzip";

src/Http/RavenCommand.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,10 +168,10 @@ export abstract class RavenCommand<TResult> {
168168
const fetchFn = fetcher ?? fetch; // support for custom fetcher
169169
const response = await fetchFn(uri, optionsToUse);
170170

171-
const effectiveStream: Readable =
171+
const effectiveStream: Stream =
172172
response.body
173173
? Readable.fromWeb(response.body)
174-
: (response.body ?? new Stream());
174+
: new Stream();
175175

176176
effectiveStream
177177
.pipe(passthrough);

src/Http/RequestExecutor.ts

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,19 @@ export class RequestExecutor implements IDisposable {
365365
}
366366
}
367367

368+
private compressionHeaders(params: HttpRequestParameters) {
369+
if (this._conventions.useHttpDecompression) {
370+
// do nothing - node.js sends 'accept-encoding: gzip, deflate' by default
371+
} else {
372+
// disable response compression
373+
let { headers} = params;
374+
if (!headers) {
375+
params.headers = headers = {};
376+
}
377+
headers["Accept-Encoding"] = "identity";
378+
}
379+
}
380+
368381
private static async createAgent(options: Agent.Options) {
369382
try {
370383
const { Agent: AgentInstance } = await import(importFix("undici"));
@@ -1162,6 +1175,7 @@ export class RequestExecutor implements IDisposable {
11621175
if (this._shouldExecuteOnAll(chosenNode, command)) {
11631176
responseAndStream = await this._executeOnAllToFigureOutTheFastest(chosenNode, command);
11641177
} else {
1178+
this.compressionHeaders(request);
11651179
responseAndStream = await command.send(await this.getHttpAgent(), request);
11661180
}
11671181

@@ -1331,6 +1345,7 @@ export class RequestExecutor implements IDisposable {
13311345
return;
13321346
}
13331347
this._setRequestHeaders(null, null, req);
1348+
this.compressionHeaders(req);
13341349
return command.send(agent, req);
13351350
})
13361351
.then(commandResult => new IndexAndResponse(taskNumber, commandResult.response, commandResult.bodyStream))
@@ -1918,14 +1933,8 @@ export class RequestExecutor implements IDisposable {
19181933

19191934

19201935
private _setDefaultRequestOptions(): void {
1921-
// add property only if compression enabled - having fixed properly makes custom fetch (like in next.js) fail
1922-
const compressOpts = !this._conventions.hasExplicitlySetCompressionUsage || this._conventions.useCompression ? {
1923-
compress: true
1924-
} : {};
1925-
19261936
this._defaultRequestOptions = Object.assign(
19271937
DEFAULT_REQUEST_OPTIONS,
1928-
compressOpts,
19291938
this._customHttpRequestOptions);
19301939
}
19311940

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ export * from "./Http/IBroadcast.js";
1313
export * from "./Http/IRaftCommand.js";
1414
export * from "./Http/NodeSelector.js";
1515
export * from "./Http/LoadBalanceBehavior.js";
16+
export * from "./Http/HttpCompressionAlgorithm.js";
1617
export * from "./Http/RavenCommand.js";
1718
export * from "./Http/ReadBalanceBehavior.js";
1819
export * from "./Http/RequestExecutor.js";

test/Executor/CompressionTests.ts

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import DocumentStore, {
66
IDocumentStore,
77
GetDatabaseNamesCommand,
88
} from "../../src/index.js";
9+
import { assertThat } from "../Utils/AssertExtensions.js";
910

1011
describe("Compression", function () {
1112

@@ -25,16 +26,35 @@ describe("Compression", function () {
2526

2627
await exec.execute(cmd);
2728
const reqParams = createReqSpy.lastCall.returnValue;
28-
//TODO: assert.ok(reqParams.compress);
29+
assertThat(reqParams.headers["Accept-Encoding"])
30+
.isNull();
2931
});
3032

31-
it("is turned off on demand", async () => {
33+
it("can be turned on on demand", async () => {
3234
const store2 = new DocumentStore(store.urls, store.database);
3335
try {
34-
store2.conventions.useCompression = false;
35-
assert.ok(store2.conventions.hasExplicitlySetCompressionUsage);
36-
assert.ok(!store2.conventions.useCompression);
36+
store2.conventions.useHttpDecompression = true;
37+
store2.initialize();
38+
39+
const exec = store2.getRequestExecutor();
40+
const cmd = new GetDatabaseNamesCommand(0, 5);
41+
const createReqSpy = exec["_createRequest"] = sinon.spy(exec["_createRequest"]);
42+
43+
await exec.execute(cmd);
44+
const reqParams = createReqSpy.lastCall.returnValue;
45+
assertThat(reqParams.headers["Accept-Encoding"])
46+
.isNull();
47+
} finally {
48+
if (store2) {
49+
store2.dispose();
50+
}
51+
}
52+
});
3753

54+
it("can be turned off on demand", async () => {
55+
const store2 = new DocumentStore(store.urls, store.database);
56+
try {
57+
store2.conventions.useHttpDecompression = false;
3858
store2.initialize();
3959

4060
const exec = store2.getRequestExecutor();
@@ -43,7 +63,8 @@ describe("Compression", function () {
4363

4464
await exec.execute(cmd);
4565
const reqParams = createReqSpy.lastCall.returnValue;
46-
//TODO: assert.ok(!reqParams.compress);
66+
assertThat(reqParams.headers["Accept-Encoding"])
67+
.isEqualTo("identity");
4768
} finally {
4869
if (store2) {
4970
store2.dispose();

tsconfig.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
"lib": [ "ES2022" ],
1717
"typeRoots": [
1818
"node_modules/@types",
19-
"node_modules/moment",
2019
"src/Types"
2120
],
2221
"types": [

0 commit comments

Comments
 (0)