Skip to content

Commit 5bddc19

Browse files
authored
Merge pull request #391 from ml054/jsonl-support
JSONL support
2 parents 3eece5a + 93f11d8 commit 5bddc19

File tree

10 files changed

+204
-92
lines changed

10 files changed

+204
-92
lines changed

src/Documents/Commands/QueryStreamCommand.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@ export class QueryStreamCommand extends RavenCommand<StreamResultResponse> {
3030
}
3131

3232
public createRequest(node: ServerNode): HttpRequestParameters {
33+
const format = this._conventions.useJsonlStreaming ? 'jsonl' : 'json';
34+
3335
return {
3436
method: "POST",
35-
uri: `${node.url}/databases/${node.database}/streams/queries`,
37+
uri: `${node.url}/databases/${node.database}/streams/queries?format=${format}`,
3638
body: writeIndexQuery(this._conventions, this._indexQuery),
3739
headers: this._headers().typeAppJson().build()
3840
};

src/Documents/Conventions/DocumentConventions.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,21 @@ export class DocumentConventions {
102102

103103
private readonly _bulkInsert: BulkInsertConventions;
104104

105+
private _useJsonlStreaming = false;
106+
105107
public get bulkInsert() {
106108
return this._bulkInsert;
107109
}
108110

111+
public get useJsonlStreaming() {
112+
return this._useJsonlStreaming;
113+
}
114+
115+
public set useJsonlStreaming(value) {
116+
this._assertNotFrozen();
117+
this._useJsonlStreaming = value;
118+
}
119+
109120
public constructor() {
110121
this._readBalanceBehavior = "None";
111122
this._identityPartsSeparator = "/";

src/Documents/Session/Operations/StreamOperation.ts

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,12 @@ import { StartingWithOptions } from "../IDocumentSession";
77
import { StreamCommand } from "../../Commands/StreamCommand";
88
import { TypeUtil } from "../../../Utility/TypeUtil";
99
import { StreamResultResponse } from "../../Commands/StreamResultResponse";
10-
import { streamValues } from "stream-json/streamers/StreamValues";
11-
import { ignore } from "stream-json/filters/Ignore";
12-
import { RavenCommandResponsePipeline } from "../../../Http/RavenCommandResponsePipeline";
1310
import { getDocumentResultsAsObjects } from "../../../Mapping/Json/Streams/Pipelines";
14-
import { TransformKeysJsonStream } from "../../../Mapping/Json/Streams/TransformKeysJsonStream";
15-
import { getTransformJsonKeysProfile } from "../../../Mapping/Json/Streams/TransformJsonKeysProfiles";
1611
import { StringBuilder } from "../../../Utility/StringBuilder";
12+
import { ObjectUtil } from "../../../Utility/ObjectUtil";
13+
import { RavenCommandResponsePipeline } from "../../../Http/RavenCommandResponsePipeline";
14+
import { ignore } from "stream-json/filters/Ignore";
15+
import { streamValues } from "stream-json/streamers/StreamValues";
1716

1817
export class StreamOperation {
1918
private readonly _session: InMemoryDocumentSessionOperations;
@@ -50,7 +49,9 @@ export class StreamOperation {
5049
}
5150

5251
private _createRequestForIdPrefix(idPrefix: string, opts: StartingWithOptions): StreamCommand {
53-
const sb = new StringBuilder("streams/docs?");
52+
const format = this._session.conventions.useJsonlStreaming ? 'jsonl' : 'json';
53+
54+
const sb = new StringBuilder(`streams/docs?format=${format}&`);
5455
if (idPrefix) {
5556
sb.append("startsWith=")
5657
.append(encodeURIComponent(idPrefix)).append("&");
@@ -89,25 +90,34 @@ export class StreamOperation {
8990
throwError("IndexDoesNotExistException", "The index does not exists, failed to stream results.");
9091
}
9192

92-
const result = getDocumentResultsAsObjects(this._session.conventions).stream(response.stream);
93-
93+
const result = getDocumentResultsAsObjects(this._session.conventions, !!this._isQueryStream)
94+
.stream(response.stream);
95+
9496
if (this._isQueryStream) {
95-
RavenCommandResponsePipeline.create()
96-
.parseJsonAsync([
97+
const pipeline = RavenCommandResponsePipeline.create<object[]>();
98+
99+
this._session.conventions.useJsonlStreaming
100+
? pipeline.parseJsonlAsync('Stats')
101+
: pipeline.parseJsonAsync([
97102
ignore({ filter: /^Results|Includes$/ }),
98-
new TransformKeysJsonStream(getTransformJsonKeysProfile("CommandResponsePayload")),
99103
streamValues()
100-
])
101-
.stream(response.stream)
104+
]);
105+
106+
pipeline.stream(response.stream)
102107
.on("error", err => result.emit("error", err))
103108
.on("data", data => {
109+
const rawWithCamel = ObjectUtil.transformObjectKeys(data["value"], {
110+
defaultTransform: "camel"
111+
});
112+
104113
const statsResult =
105114
this._session.conventions.objectMapper
106-
.fromObjectLiteral(data["value"], {
115+
.fromObjectLiteral(rawWithCamel, {
107116
nestedTypes: {
108117
indexTimestamp: "date"
109118
}
110119
});
120+
111121
result.emit("stats", statsResult);
112122
});
113123
}

src/Http/RavenCommandResponsePipeline.ts

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,27 @@ import {
1818
} from "../Mapping/Json/Streams/CollectResultStream";
1919
import { throwError, getError } from "../Exceptions";
2020
import {
21-
TransformJsonKeysStreamOptions,
22-
TransformKeysJsonStream } from "../Mapping/Json/Streams/TransformKeysJsonStream";
23-
import {
24-
TransformJsonKeysProfile,
25-
getTransformJsonKeysProfile } from "../Mapping/Json/Streams/TransformJsonKeysProfiles";
21+
TransformJsonKeysStreamOptions,
22+
TransformKeysJsonStream
23+
} from "../Mapping/Json/Streams/TransformKeysJsonStream";
24+
import {
25+
getTransformJsonKeysProfile,
26+
TransformJsonKeysProfile
27+
} from "../Mapping/Json/Streams/TransformJsonKeysProfiles";
2628
import { TypeUtil } from "../Utility/TypeUtil";
2729
import * as Asm from "stream-json/Assembler";
2830
import { DocumentConventions } from "../Documents/Conventions/DocumentConventions";
2931
import { ErrorFirstCallback } from "../Types/Callbacks";
3032
import { StringBuilder } from "../Utility/StringBuilder";
33+
import { parser as jsonlParser } from "stream-json/jsonl/Parser";
3134

3235
export interface RavenCommandResponsePipelineOptions<TResult> {
3336
collectBody?: boolean | ((body: string) => void);
3437
jsonAsync?: {
35-
filters: any[]
38+
filters: any[];
39+
};
40+
jsonlAsync?: {
41+
transforms: stream.Transform[];
3642
};
3743
jsonSync?: boolean;
3844
streamKeyCaseTransform?: ObjectKeyCaseTransformStreamOptions;
@@ -65,6 +71,34 @@ export class RavenCommandResponsePipeline<TStreamResult> extends EventEmitter {
6571
return this;
6672
}
6773

74+
/**
75+
* @param type Type of object to extract from objects stream - use Raw to skip extraction.
76+
* @param options
77+
*/
78+
public parseJsonlAsync(type: "Item" | "Stats" | "Raw", options: { transforms?: stream.Transform[] } = {}) {
79+
const transforms = options?.transforms ?? [];
80+
if (type !== "Raw") {
81+
const extractItemTransform = new stream.Transform({
82+
objectMode: true,
83+
transform(chunk, encoding, callback) {
84+
const value = chunk["value"][type];
85+
if (!value) {
86+
return callback();
87+
}
88+
89+
callback(null, {...chunk, value});
90+
}
91+
});
92+
93+
transforms.push(extractItemTransform);
94+
}
95+
this._opts.jsonlAsync = {
96+
transforms
97+
};
98+
99+
return this;
100+
}
101+
68102
public collectBody(callback?: (body: string) => void) {
69103
this._opts.collectBody = callback || true;
70104
return this;
@@ -171,6 +205,12 @@ export class RavenCommandResponsePipeline<TStreamResult> extends EventEmitter {
171205
if (opts.jsonAsync.filters && opts.jsonAsync.filters.length) {
172206
streams.push(...opts.jsonAsync.filters);
173207
}
208+
} else if (opts.jsonlAsync) {
209+
streams.push(jsonlParser());
210+
211+
if (opts.jsonlAsync.transforms) {
212+
streams.push(...opts.jsonlAsync.transforms);
213+
}
174214
} else if (opts.jsonSync) {
175215
const bytesChunks = [];
176216
const parseJsonSyncTransform = new stream.Transform({

src/Mapping/Json/Streams/Pipelines.ts

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,45 @@
11
import * as stream from "readable-stream";
22
import { RavenCommandResponsePipeline } from "../../../Http/RavenCommandResponsePipeline";
3-
import { pick } from "stream-json/filters/Pick";
4-
import { streamArray } from "stream-json/streamers/StreamArray";
5-
import { stringer } from "stream-json/Stringer";
63
import { DocumentConventions } from "../../../Documents/Conventions/DocumentConventions";
4+
import { stringer as jsonlStringer } from "stream-json/jsonl/Stringer";
5+
import { stringer } from "stream-json/Stringer";
76
import { TransformKeysJsonStream } from "./TransformKeysJsonStream";
87
import { getTransformJsonKeysProfile } from "./TransformJsonKeysProfiles";
8+
import { pick } from "stream-json/filters/Pick";
9+
import { streamArray } from "stream-json/streamers/StreamArray";
910

1011
export function getDocumentResultsAsObjects(
11-
conventions: DocumentConventions): RavenCommandResponsePipeline<object[]> {
12+
conventions: DocumentConventions,
13+
queryStream: boolean
14+
): RavenCommandResponsePipeline<object[]> {
15+
const pipeline = RavenCommandResponsePipeline.create<object[]>();
1216

13-
return RavenCommandResponsePipeline.create<object[]>()
14-
.parseJsonAsync([
17+
return conventions.useJsonlStreaming
18+
? pipeline.parseJsonlAsync(queryStream ? "Item" : "Raw")
19+
: pipeline.parseJsonAsync([
1520
new TransformKeysJsonStream(getTransformJsonKeysProfile("DocumentLoad", conventions)),
1621
pick({ filter: "results" }),
1722
streamArray()
1823
]);
1924
}
2025

21-
export function getDocumentResultsPipeline(
22-
conventions: DocumentConventions): RavenCommandResponsePipeline<object[]> {
23-
return RavenCommandResponsePipeline.create<object[]>()
24-
.parseJsonAsync([
25-
new TransformKeysJsonStream(getTransformJsonKeysProfile("DocumentLoad", conventions)),
26-
stringer({ useValues: true })
27-
]);
26+
export function getDocumentStreamResultsIntoStreamPipeline(
27+
conventions: DocumentConventions
28+
): RavenCommandResponsePipeline<object[]> {
29+
const pipeline = RavenCommandResponsePipeline.create<object[]>();
30+
31+
return conventions.useJsonlStreaming
32+
? pipeline.parseJsonlAsync('Item', {
33+
transforms: [
34+
// TODO: conventions? + what's the purpose?
35+
jsonlStringer({ replacer: (key, value) => key === '' ? value.value : value }),
36+
]
37+
})
38+
: pipeline
39+
.parseJsonAsync([
40+
new TransformKeysJsonStream(getTransformJsonKeysProfile("DocumentLoad", conventions)),
41+
stringer({ useValues: true })
42+
]);
2843
}
2944

3045
export async function streamResultsIntoStream(
@@ -33,7 +48,7 @@ export async function streamResultsIntoStream(
3348
writable: stream.Writable): Promise<void> {
3449

3550
return new Promise<void>((resolve, reject) => {
36-
getDocumentResultsPipeline(conventions)
51+
getDocumentStreamResultsIntoStreamPipeline(conventions)
3752
.stream(bodyStream, writable, (err) => {
3853
err ? reject(err) : resolve();
3954
});

src/Mapping/ObjectMapper.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ export class TypesAwareObjectMapper implements ITypesAwareObjectMapper {
3333
public constructor(opts?: TypesAwareJsonObjectMapperOptions) {
3434
if (opts) {
3535
this._dateFormat = opts.dateFormat;
36-
36+
3737
if (!opts.documentConventions) {
3838
throwError("InvalidArgumentException", "Document conventions cannot be empty.");
3939
}
@@ -415,7 +415,7 @@ export class TypesAwareObjectMapper implements ITypesAwareObjectMapper {
415415
return Array.from(map.entries()).reduce((result, [ name, value ]) => {
416416
return [
417417
...result,
418-
[
418+
[
419419
this._makeObjectLiteral(name, valuePathPrefix + "KEY", typeInfoCallback, knownTypes),
420420
this._makeObjectLiteral(value, valuePathPrefix, typeInfoCallback, knownTypes)
421421
]
@@ -430,7 +430,7 @@ export class TypesAwareObjectMapper implements ITypesAwareObjectMapper {
430430
if (TypeUtil.isObject(obj)) {
431431
if (objPathPrefix) { // if it's non-root object
432432
const matchedType = TypeUtil.findType(obj, knownTypes);
433-
if (!skipTypes
433+
if (!skipTypes
434434
&& matchedType
435435
&& matchedType.name !== "Function") {
436436
typeInfoCallback({ [objPathPrefix]: matchedType.name });

test/Ported/Core/Streaming/DocumentStreaming.ts

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { testContext, disposeTestDocumentStore } from "../../../Utils/TestUtil";
22

33
import {
4+
DocumentStore,
45
IDocumentStore,
56
StreamResult,
67
} from "../../../../src";
@@ -36,30 +37,46 @@ describe("document streaming", function () {
3637
await session.saveChanges();
3738
}
3839

39-
it("can stream documents starting with", async () => {
40-
await prepareData(200);
40+
async function streamDocuments(format: "json" | "jsonl") {
41+
const newStore = new DocumentStore(store.urls, store.database);
42+
newStore.conventions.useJsonlStreaming = format === "jsonl";
43+
newStore.initialize();
44+
try {
4145

42-
{
43-
const session = store.openSession();
46+
await prepareData(200);
4447

45-
const queryStream = await session.advanced.stream<User>("users/");
48+
{
49+
const session = newStore.openSession();
4650

47-
const items = [];
48-
queryStream.on("data", item => {
49-
items.push(item);
50-
});
51+
const queryStream = await session.advanced.stream<User>("users/");
5152

52-
await StreamUtil.finishedAsync(queryStream);
53+
const items = [];
54+
queryStream.on("data", item => {
55+
items.push(item);
56+
});
5357

54-
assert.strictEqual(items.length, 200);
55-
items.forEach(item => {
56-
assertStreamResultEntry(item, (doc: any) => {
57-
assert.ok(doc);
58-
assert.ok(doc.name);
59-
assert.ok(doc.lastName);
58+
await StreamUtil.finishedAsync(queryStream);
59+
60+
assert.strictEqual(items.length, 200);
61+
items.forEach(item => {
62+
assertStreamResultEntry(item, (doc: any) => {
63+
assert.ok(doc);
64+
assert.ok(doc.name);
65+
assert.ok(doc.lastName);
66+
});
6067
});
61-
});
68+
}
69+
} finally {
70+
newStore.dispose();
6271
}
72+
}
73+
74+
it("can stream documents starting with - json", async () => {
75+
await streamDocuments("json");
76+
});
77+
78+
it("can stream documents starting with - jsonl", async () => {
79+
await streamDocuments("jsonl");
6380
});
6481

6582
it("[TODO] can stream starting with prefix using opts");

0 commit comments

Comments
 (0)