Skip to content

Commit 590e82f

Browse files
authored
Merge pull request #393 from ml054/jsonl-support
RDBC-622 Consider adding JSON streaming support for streaming queries
2 parents 5bddc19 + 4f87f3c commit 590e82f

28 files changed

+369
-690
lines changed

src/Documents/Commands/FacetQueryCommand.ts

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ import { DocumentConventions } from "../Conventions/DocumentConventions";
33
import * as stream from "readable-stream";
44
import { QueryCommand } from "./QueryCommand";
55
import { RavenCommandResponsePipeline } from "../../Http/RavenCommandResponsePipeline";
6-
import { ServerResponse } from "../../Types";
6+
import { ServerCasing, ServerResponse } from "../../Types";
7+
import { ObjectUtil } from "../../Utility/ObjectUtil";
78

89
export class FacetQueryCommand extends QueryCommand {
910

@@ -26,23 +27,33 @@ export class FacetQueryCommand extends QueryCommand {
2627
fromCache: boolean,
2728
bodyCallback?: (body: string) => void): Promise<QueryResult> {
2829

29-
const rawResult = await RavenCommandResponsePipeline.create<ServerResponse<QueryResult>>()
30+
const rawResult = await RavenCommandResponsePipeline.create<ServerCasing<ServerResponse<QueryResult>>>()
3031
.collectBody(bodyCallback)
31-
.parseJsonAsync()
32-
.jsonKeysTransform("FacetQuery")
32+
.parseJsonSync()
3333
.process(bodyStream);
3434

35-
const overrides: Partial<QueryResult> = {
36-
indexTimestamp: conventions.dateUtil.parse(rawResult.indexTimestamp),
37-
lastQueryTime: conventions.dateUtil.parse(rawResult.lastQueryTime)
38-
};
39-
40-
const queryResult = Object.assign(new QueryResult(), rawResult, overrides) as QueryResult;
35+
const queryResult = FacetQueryCommand.mapToLocalObject(rawResult, conventions);
4136

4237
if (fromCache) {
4338
queryResult.durationInMs = -1;
4439
}
4540

4641
return queryResult;
4742
}
43+
44+
public static mapToLocalObject(json: ServerCasing<ServerResponse<QueryResult>>, conventions: DocumentConventions): QueryResult {
45+
const { Results, Includes, IndexTimestamp, LastQueryTime, ...rest } = json;
46+
47+
const restMapped = ObjectUtil.transformObjectKeys(rest, {
48+
defaultTransform: "camel"
49+
}) as any;
50+
51+
return {
52+
...restMapped,
53+
indexTimestamp: conventions.dateUtil.parse(IndexTimestamp),
54+
lastQueryTime: conventions.dateUtil.parse(LastQueryTime),
55+
results: Results.map(x => ObjectUtil.transformObjectKeys(x, { defaultTransform: "camel" })),
56+
includes: ObjectUtil.mapIncludesToLocalObject(json.Includes, conventions)
57+
};
58+
}
4859
}

src/Documents/Commands/GetDocumentsCommand.ts

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -346,16 +346,9 @@ export class GetDocumentsCommand extends RavenCommand<GetDocumentsResult> {
346346
}
347347

348348
private static _mapToLocalObject(json: any, conventions: DocumentConventions): GetDocumentsResult {
349-
const mappedIncludes: Record<string, any> = {};
350-
if (json.Includes) {
351-
for (const [key, value] of Object.entries(json.Includes)) {
352-
mappedIncludes[key] = ObjectUtil.transformDocumentKeys(value, conventions);
353-
}
354-
}
355-
356349
return {
357350
results: json.Results.map(x => ObjectUtil.transformDocumentKeys(x, conventions)),
358-
includes: mappedIncludes,
351+
includes: ObjectUtil.mapIncludesToLocalObject(json.Includes, conventions),
359352
compareExchangeValueIncludes: ObjectUtil.mapCompareExchangeToLocalObject(json.CompareExchangeValueIncludes),
360353
timeSeriesIncludes: ObjectUtil.mapTimeSeriesIncludesToLocalObject(json.TimeSeriesIncludes),
361354
counterIncludes: ObjectUtil.mapCounterIncludesToLocalObject(json.CounterIncludes),

src/Documents/Commands/MultiGet/MultiGetCommand.ts

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ import { IDisposable } from "../../../Types/Contracts";
1414
import { RequestExecutor } from "../../../Http/RequestExecutor";
1515
import { AggressiveCacheOptions } from "../../../Http/AggressiveCacheOptions";
1616
import { HEADERS } from "../../../Constants";
17+
import { ServerCasing, ServerResponse } from "../../../Types";
18+
import { ConditionalGetResult } from "../ConditionalGetDocumentsCommand";
19+
import { ObjectUtil } from "../../../Utility/ObjectUtil";
20+
import { camelCase } from "change-case";
1721

1822
export class MultiGetCommand extends RavenCommand<GetResponse[]> implements IDisposable {
1923
private readonly _requestExecutor: RequestExecutor;
@@ -144,27 +148,11 @@ export class MultiGetCommand extends RavenCommand<GetResponse[]> implements IDis
144148
this._throwInvalidResponse();
145149
}
146150
try {
147-
const result = await this._pipeline<GetResponse[]>()
148-
.parseJsonAsync()
149-
.jsonKeysTransform({
150-
getCurrentTransform(key, stack) {
151-
if (stack.length === 1
152-
|| stack.length === 2
153-
|| stack.length === 3) {
154-
// results.0.result
155-
return "camel";
156-
}
157-
158-
return null;
159-
}
160-
})
151+
const result = await this._pipeline<ServerCasing<ServerResponse<{ Results: GetResponse[] }>>>()
152+
.parseJsonSync()
161153
.process(bodyStream);
162154

163-
const responses = result["results"].reduce((result: GetResponse[], next) => {
164-
// TODO try to get it directly from parser
165-
next.result = TypeUtil.isNullOrUndefined(next.result) ? next.result : JSON.stringify(next.result);
166-
return [...result, next];
167-
}, []);
155+
const responses = result.Results.map(item => MultiGetCommand._mapToLocalObject(item));
168156

169157
this.result = [];
170158

@@ -243,6 +231,17 @@ export class MultiGetCommand extends RavenCommand<GetResponse[]> implements IDis
243231
}
244232
}
245233
}
234+
235+
private static _mapToLocalObject(json: ServerCasing<ServerResponse<GetResponse>>): any {
236+
// convert from Pascal to camel on top level only
237+
const item: any = {};
238+
for (const [key, value] of Object.entries(json)) {
239+
item[camelCase(key)] = value;
240+
}
241+
242+
item.result = item.result ? JSON.stringify(item.result) : null;
243+
return item;
244+
}
246245
}
247246

248247
class Cached implements IDisposable {

src/Documents/Commands/QueryCommand.ts

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { JsonSerializer } from "../../Mapping/Json/Serializer";
99
import * as stream from "readable-stream";
1010
import { RavenCommandResponsePipeline } from "../../Http/RavenCommandResponsePipeline";
1111
import { StringBuilder } from "../../Utility/StringBuilder";
12-
import { ServerResponse } from "../../Types";
12+
import { ServerCasing, ServerResponse } from "../../Types";
1313
import { QueryTimings } from "../Queries/Timings/QueryTimings";
1414
import { StringUtil } from "../../Utility/StringUtil";
1515
import { readToEnd, stringToReadable } from "../../Utility/StreamUtil";
@@ -136,8 +136,7 @@ export class QueryCommand extends RavenCommand<QueryResult> {
136136
return queryResult;
137137
}
138138

139-
//TODO: use ServerCasing<QueryTimings> instead of any, after upgrading to TS 4.2
140-
private static _mapTimingsToLocalObject(timings: any) {
139+
private static _mapTimingsToLocalObject(timings: ServerCasing<ServerResponse<QueryTimings>>) {
141140
if (!timings) {
142141
return undefined;
143142
}
@@ -154,18 +153,10 @@ export class QueryCommand extends RavenCommand<QueryResult> {
154153
}
155154

156155

157-
//TODO: use ServerCasing<ServerResponse<QueryResult>> instead of any, after upgrading to TS 4.2
158156
private static _mapToLocalObject(json: any, conventions: DocumentConventions): QueryResult {
159-
const mappedIncludes: Record<string, any> = {};
160-
if (json.Includes) {
161-
for (const [key, value] of Object.entries(json.Includes)) {
162-
mappedIncludes[key] = ObjectUtil.transformDocumentKeys(value, conventions);
163-
}
164-
}
165-
166157
const props: Omit<QueryResult, "scoreExplanations" | "cappedMaxResults" | "createSnapshot" | "resultSize"> = {
167158
results: json.Results.map(x => ObjectUtil.transformDocumentKeys(x, conventions)),
168-
includes: mappedIncludes,
159+
includes: ObjectUtil.mapIncludesToLocalObject(json.Includes, conventions),
169160
indexName: json.IndexName,
170161
indexTimestamp: conventions.dateUtil.parse(json.IndexTimestamp),
171162
includedPaths: json.IncludedPaths,

src/Documents/Conventions/DocumentConventions.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ export class DocumentConventions {
102102

103103
private readonly _bulkInsert: BulkInsertConventions;
104104

105-
private _useJsonlStreaming = false;
105+
private _useJsonlStreaming = true;
106106

107107
public get bulkInsert() {
108108
return this._bulkInsert;

src/Documents/Operations/CompareExchange/CompareExchangeResult.ts

Lines changed: 17 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import { DocumentConventions } from "../../Conventions/DocumentConventions";
22
import { throwError } from "../../../Exceptions";
33
import { TypeUtil } from "../../../Utility/TypeUtil";
4-
import { CompareExchangeResultClass } from "../../../Types";
4+
import { CompareExchangeResultClass, ServerCasing, ServerResponse } from "../../../Types";
55
import { COMPARE_EXCHANGE } from "../../../Constants";
66
import { DocumentType } from "../../DocumentAbstractions";
7+
import { ObjectUtil } from "../../../Utility/ObjectUtil";
78

89
export interface CompareExchangeResultResponse {
910
index: number;
@@ -20,39 +21,15 @@ export class CompareExchangeResult<T> {
2021
public successful: boolean;
2122

2223
public static parseFromObject<T>(
23-
{ index, value, successful }: CompareExchangeResultResponse,
24+
response: ServerCasing<ServerResponse<CompareExchangeResultResponse>>,
2425
conventions: DocumentConventions,
2526
clazz?: CompareExchangeResultClass<T>): CompareExchangeResult<T> {
26-
if (!index) {
27+
if (!response.Index) {
2728
throwError("InvalidOperationException", "Response is invalid. Index is missing");
2829
}
2930

30-
const val = value.object || null;
31-
return CompareExchangeResult._create(val, index, successful, conventions, clazz);
32-
}
33-
34-
public static parseFromString<T>(
35-
responseString: string,
36-
conventions: DocumentConventions,
37-
clazz?: CompareExchangeResultClass<T>): CompareExchangeResult<T> {
38-
39-
const response = JSON.parse(responseString);
40-
41-
const index = response["Index"];
42-
if (!index) {
43-
throwError("InvalidOperationException", "Response is invalid. Index is missing");
44-
}
45-
46-
const successful = response["Successful"];
47-
const raw = response["Value"];
48-
49-
let val = null;
50-
51-
if (raw) {
52-
val = raw[COMPARE_EXCHANGE.OBJECT_FIELD_NAME];
53-
}
54-
55-
return CompareExchangeResult._create(val, index, successful, conventions, clazz);
31+
const val = response.Value.Object || null;
32+
return CompareExchangeResult._create(val, response.Index, response.Successful, conventions, clazz);
5633
}
5734

5835
private static _create<T>(
@@ -74,13 +51,22 @@ export class CompareExchangeResult<T> {
7451
return emptyExchangeResult;
7552
}
7653

77-
let result: T;
54+
let result: any = null;
7855
if (TypeUtil.isPrimitive(val)) {
7956
result = val as any as T;
8057
} else {
58+
let rawValue = val;
8159
// val comes here with proper key case already
8260
const entityType = conventions.getJsTypeByDocumentType(clazz as DocumentType);
83-
result = conventions.deserializeEntityFromJson(entityType, val) as any as T;
61+
if (conventions.entityFieldNameConvention) {
62+
rawValue = ObjectUtil.transformObjectKeys(
63+
rawValue, {
64+
defaultTransform: conventions.entityFieldNameConvention,
65+
recursive: true,
66+
arrayRecursive: true
67+
});
68+
}
69+
result = conventions.deserializeEntityFromJson(entityType, rawValue) as any as T;
8470
}
8571

8672
const exchangeResult = new CompareExchangeResult<T>();

src/Documents/Operations/CompareExchange/CompareExchangeValueResultParser.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { CompareExchangeResultClass, EntityConstructor } from "../../../Types";
1010
export interface CompareExchangeResultItem {
1111
index: number;
1212
key: string;
13-
value: { object: object };
13+
value: { object: object, "@metadata"?: any };
1414
}
1515

1616
export interface GetCompareExchangeValuesResponse {
@@ -91,7 +91,7 @@ export class CompareExchangeValueResultParser {
9191
}
9292

9393
let rawValue = raw.object;
94-
if (TypeUtil.isPrimitiveType(clazz) || !clazz) {
94+
if (clazz && TypeUtil.isPrimitiveType(clazz) || TypeUtil.isPrimitive(rawValue)) {
9595
return new CompareExchangeValue(key, index, rawValue, metadata);
9696
} else {
9797
if (!rawValue) {

src/Documents/Operations/CompareExchange/DeleteCompareExchangeValueOperation.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { HttpRequestParameters } from "../../../Primitives/Http";
22
import { IOperation, OperationResultType } from "../OperationAbstractions";
33
import { CompareExchangeResult, CompareExchangeResultResponse } from "./CompareExchangeResult";
4-
import { CompareExchangeResultClass } from "../../../Types";
4+
import { CompareExchangeResultClass, ServerCasing, ServerResponse } from "../../../Types";
55
import { IDocumentStore } from "../../IDocumentStore";
66
import { DocumentConventions } from "../../Conventions/DocumentConventions";
77
import { HttpCache } from "../../../Http/HttpCache";
@@ -59,7 +59,7 @@ export class RemoveCompareExchangeCommand<T> extends RavenCommand<CompareExchang
5959
}
6060

6161
public createRequest(node: ServerNode): HttpRequestParameters {
62-
const uri = node.url + "/databases/" + node.database + "/cmpxchg?key=" + encodeURIComponent(this._key)
62+
const uri = node.url + "/databases/" + node.database + "/cmpxchg?key=" + encodeURIComponent(this._key)
6363
+ "&index=" + this._index;
6464
return {
6565
method: "DELETE",
@@ -69,11 +69,11 @@ export class RemoveCompareExchangeCommand<T> extends RavenCommand<CompareExchang
6969

7070
public async setResponseAsync(bodyStream: stream.Stream, fromCache: boolean): Promise<string> {
7171
let body: string = null;
72-
const resObj = await this._pipeline<CompareExchangeResultResponse>()
72+
const resObj = await this._pipeline<ServerCasing<ServerResponse<CompareExchangeResultResponse>>>()
7373
.collectBody(_ => body = _)
74-
.parseJsonAsync()
75-
.jsonKeysTransform("CompareExchangeValue", this._conventions)
74+
.parseJsonSync()
7675
.process(bodyStream);
76+
7777
this.result = CompareExchangeResult.parseFromObject(resObj, this._conventions, this._clazz);
7878
return body;
7979
}

src/Documents/Operations/CompareExchange/GetCompareExchangeValueOperation.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ import { DocumentConventions } from "../../Conventions/DocumentConventions";
77
import { IDocumentStore } from "../../IDocumentStore";
88
import { throwError } from "../../../Exceptions";
99
import { ServerNode } from "../../../Http/ServerNode";
10-
import { CompareExchangeResultClass } from "../../../Types";
10+
import { CompareExchangeResultClass, ServerCasing, ServerResponse } from "../../../Types";
1111
import { CompareExchangeValueResultParser, GetCompareExchangeValuesResponse } from "./CompareExchangeValueResultParser";
1212
import * as stream from "readable-stream";
13+
import { GetCompareExchangeValuesCommand } from "./GetCompareExchangeValuesOperation";
1314

1415
export class GetCompareExchangeValueOperation<T> implements IOperation<CompareExchangeValue<T>> {
1516

@@ -60,7 +61,7 @@ export class GetCompareExchangeValueCommand<T> extends RavenCommand<CompareExcha
6061

6162
public createRequest(node: ServerNode): HttpRequestParameters {
6263
const uri = node.url + "/databases/" + node.database + "/cmpxchg?key=" + encodeURIComponent(this._key);
63-
return { uri };
64+
return {uri};
6465
}
6566

6667
public async setResponseAsync(bodyStream: stream.Stream, fromCache: boolean): Promise<string> {
@@ -69,14 +70,14 @@ export class GetCompareExchangeValueCommand<T> extends RavenCommand<CompareExcha
6970
}
7071

7172
let body: string = null;
72-
const results = await this._pipeline<GetCompareExchangeValuesResponse>()
73+
const results = await this._pipeline<ServerCasing<ServerResponse<GetCompareExchangeValuesResponse>>>()
7374
.collectBody(x => body = x)
74-
.parseJsonAsync()
75-
.jsonKeysTransform("GetCompareExchangeValue", this._conventions)
75+
.parseJsonSync()
7676
.process(bodyStream);
7777

78-
this.result = CompareExchangeValueResultParser.getValue(
79-
results as GetCompareExchangeValuesResponse, this._materializeMetadata, this._conventions, this._clazz);
78+
const localObject = GetCompareExchangeValuesCommand.mapToLocalObject(results);
79+
80+
this.result = CompareExchangeValueResultParser.getValue(localObject, this._materializeMetadata, this._conventions, this._clazz);
8081
return body;
8182
}
8283
}

0 commit comments

Comments
 (0)