Skip to content

Commit 864ca38

Browse files
committed
RDBC-622 parse patch in sync way
1 parent 20619a4 commit 864ca38

File tree

7 files changed

+63
-219
lines changed

7 files changed

+63
-219
lines changed

src/Documents/Commands/MultiGet/MultiGetCommand.ts

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ import { IDisposable } from "../../../Types/Contracts";
1414
import { RequestExecutor } from "../../../Http/RequestExecutor";
1515
import { AggressiveCacheOptions } from "../../../Http/AggressiveCacheOptions";
1616
import { HEADERS } from "../../../Constants";
17+
import { ServerCasing, ServerResponse } from "../../../Types";
18+
import { ConditionalGetResult } from "../ConditionalGetDocumentsCommand";
19+
import { ObjectUtil } from "../../../Utility/ObjectUtil";
20+
import { camelCase } from "change-case";
1721

1822
export class MultiGetCommand extends RavenCommand<GetResponse[]> implements IDisposable {
1923
private readonly _requestExecutor: RequestExecutor;
@@ -144,27 +148,11 @@ export class MultiGetCommand extends RavenCommand<GetResponse[]> implements IDis
144148
this._throwInvalidResponse();
145149
}
146150
try {
147-
const result = await this._pipeline<GetResponse[]>()
148-
.parseJsonAsync()
149-
.jsonKeysTransform({
150-
getCurrentTransform(key, stack) {
151-
if (stack.length === 1
152-
|| stack.length === 2
153-
|| stack.length === 3) {
154-
// results.0.result
155-
return "camel";
156-
}
157-
158-
return null;
159-
}
160-
})
151+
const result = await this._pipeline<ServerCasing<ServerResponse<{ Results: GetResponse[] }>>>()
152+
.parseJsonSync()
161153
.process(bodyStream);
162154

163-
const responses = result["results"].reduce((result: GetResponse[], next) => {
164-
// TODO try to get it directly from parser
165-
next.result = TypeUtil.isNullOrUndefined(next.result) ? next.result : JSON.stringify(next.result);
166-
return [...result, next];
167-
}, []);
155+
const responses = result.Results.map(item => MultiGetCommand._mapToLocalObject(item));
168156

169157
this.result = [];
170158

@@ -243,6 +231,17 @@ export class MultiGetCommand extends RavenCommand<GetResponse[]> implements IDis
243231
}
244232
}
245233
}
234+
235+
private static _mapToLocalObject(json: ServerCasing<ServerResponse<GetResponse>>): any {
236+
// convert from Pascal to camel on top level only
237+
const item: any = {};
238+
for (const [key, value] of Object.entries(json)) {
239+
item[camelCase(key)] = value;
240+
}
241+
242+
item.result = item.result ? JSON.stringify(item.result) : null;
243+
return item;
244+
}
246245
}
247246

248247
class Cached implements IDisposable {

src/Documents/Operations/PatchOperation.ts

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { ServerNode } from "../../Http/ServerNode";
1111
import { PatchResult } from "./PatchResult";
1212
import * as stream from "readable-stream";
1313
import { ObjectUtil } from "../../Utility/ObjectUtil";
14+
import { ServerCasing, ServerResponse } from "../../Types";
1415

1516
export interface Payload {
1617
patch: PatchRequest;
@@ -175,11 +176,25 @@ export class PatchCommand extends RavenCommand<PatchResult> {
175176
}
176177

177178
let body;
178-
this.result = await this._pipeline<PatchResult>()
179+
const results= await this._pipeline<ServerCasing<ServerResponse<PatchResult>>>()
179180
.collectBody(_ => body = _)
180-
.parseJsonAsync()
181-
.jsonKeysTransform("Patch", this._conventions)
181+
.parseJsonSync()
182182
.process(bodyStream);
183+
184+
this.result = PatchCommand._mapToLocalObject(results, this._conventions);
185+
183186
return body;
184187
}
188+
189+
private static _mapToLocalObject(json: ServerCasing<ServerResponse<PatchResult>>, conventions: DocumentConventions): PatchResult {
190+
return {
191+
changeVector: json.ChangeVector,
192+
collection: json.Collection,
193+
debug: json.Debug,
194+
lastModified: conventions.dateUtil.parse(json.LastModified),
195+
status: json.Status,
196+
modifiedDocument: ObjectUtil.transformDocumentKeys(json.ModifiedDocument, conventions),
197+
originalDocument: ObjectUtil.transformDocumentKeys(json.OriginalDocument, conventions)
198+
}
199+
}
185200
}

src/Http/RavenCommandResponsePipeline.ts

Lines changed: 2 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,6 @@ import {
1717
lastChunk
1818
} from "../Mapping/Json/Streams/CollectResultStream";
1919
import { throwError, getError } from "../Exceptions";
20-
import {
21-
TransformJsonKeysStreamOptions,
22-
TransformKeysJsonStream
23-
} from "../Mapping/Json/Streams/TransformKeysJsonStream";
24-
import {
25-
getTransformJsonKeysProfile,
26-
TransformJsonKeysProfile
27-
} from "../Mapping/Json/Streams/TransformJsonKeysProfiles";
2820
import { TypeUtil } from "../Utility/TypeUtil";
2921
import * as Asm from "stream-json/Assembler";
3022
import { DocumentConventions } from "../Documents/Conventions/DocumentConventions";
@@ -44,7 +36,6 @@ export interface RavenCommandResponsePipelineOptions<TResult> {
4436
streamKeyCaseTransform?: ObjectKeyCaseTransformStreamOptions;
4537
collectResult: CollectResultStreamOptions<TResult>;
4638
transform?: stream.Stream;
47-
transformKeys?: TransformJsonKeysStreamOptions;
4839
}
4940

5041
export class RavenCommandResponsePipeline<TStreamResult> extends EventEmitter {
@@ -75,20 +66,16 @@ export class RavenCommandResponsePipeline<TStreamResult> extends EventEmitter {
7566
* @param type Type of object to extract from objects stream - use Raw to skip extraction.
7667
* @param options
7768
*/
78-
public parseJsonlAsync(valueExtractor: (obj: any) => any, conventions?: DocumentConventions, options: { transforms?: stream.Transform[] } = {}) {
69+
public parseJsonlAsync(valueExtractor: (obj: any) => any, options: { transforms?: stream.Transform[] } = {}) {
7970
const transforms = options?.transforms ?? [];
8071
const extractItemTransform = new stream.Transform({
8172
objectMode: true,
8273
transform(chunk, encoding, callback) {
83-
let value = valueExtractor(chunk["value"]);
74+
const value = valueExtractor(chunk["value"]);
8475
if (!value) {
8576
return callback();
8677
}
8778

88-
if (conventions) {
89-
value = ObjectUtil.transformDocumentKeys(value, conventions);
90-
}
91-
9279
callback(null, {...chunk, value});
9380
}
9481
});
@@ -107,32 +94,6 @@ export class RavenCommandResponsePipeline<TStreamResult> extends EventEmitter {
10794
return this;
10895
}
10996

110-
public jsonKeysTransform(): this;
111-
public jsonKeysTransform(profile: TransformJsonKeysProfile, conventions: DocumentConventions): this;
112-
public jsonKeysTransform(profile: TransformJsonKeysProfile): this;
113-
public jsonKeysTransform(opts: TransformJsonKeysStreamOptions): this;
114-
public jsonKeysTransform(
115-
optsOrProfile?: TransformJsonKeysStreamOptions | TransformJsonKeysProfile,
116-
conventions?: DocumentConventions): this {
117-
118-
if (!this._opts.jsonAsync) {
119-
throwError("InvalidOperationException",
120-
"Cannot use transformKeys without doing parseJsonAsync() first.");
121-
}
122-
123-
if (!optsOrProfile) {
124-
throwError("InvalidArgumentException", "Must provide transform opts or profile name.");
125-
}
126-
127-
if (TypeUtil.isString(optsOrProfile)) {
128-
this._opts.transformKeys = getTransformJsonKeysProfile(optsOrProfile, conventions);
129-
} else {
130-
this._opts.transformKeys = optsOrProfile;
131-
}
132-
133-
return this;
134-
}
135-
13697
public objectKeysTransform(defaultTransform: CasingConvention, profile?: ObjectKeyCaseTransformProfile): this;
13798
public objectKeysTransform(opts: ObjectKeyCaseTransformStreamOptions): this;
13899
public objectKeysTransform(
@@ -248,10 +209,6 @@ export class RavenCommandResponsePipeline<TStreamResult> extends EventEmitter {
248209
streams.push(new ObjectKeyCaseTransformStream(keyCaseOpts));
249210
}
250211

251-
if (opts.transformKeys) {
252-
streams.push(new TransformKeysJsonStream(opts.transformKeys) as any);
253-
}
254-
255212
return streams;
256213
}
257214

src/Mapping/Json/Streams/Pipelines.ts

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,40 @@ import { RavenCommandResponsePipeline } from "../../../Http/RavenCommandResponse
33
import { DocumentConventions } from "../../../Documents/Conventions/DocumentConventions";
44
import { stringer as jsonlStringer } from "stream-json/jsonl/Stringer";
55
import { stringer } from "stream-json/Stringer";
6-
import { TransformKeysJsonStream } from "./TransformKeysJsonStream";
7-
import { getTransformJsonKeysProfile } from "./TransformJsonKeysProfiles";
86
import { pick } from "stream-json/filters/Pick";
97
import { streamArray } from "stream-json/streamers/StreamArray";
8+
import { ObjectUtil } from "../../../Utility/ObjectUtil";
109

1110
export function getDocumentResultsAsObjects(
1211
conventions: DocumentConventions,
1312
queryStream: boolean
1413
): RavenCommandResponsePipeline<object[]> {
1514
const pipeline = RavenCommandResponsePipeline.create<object[]>();
1615

16+
const keysTransform = new stream.Transform({
17+
objectMode: true,
18+
transform(chunk, encoding, callback) {
19+
let value = chunk["value"];
20+
if (!value) {
21+
return callback();
22+
}
23+
24+
if (conventions) {
25+
value = ObjectUtil.transformDocumentKeys(value, conventions);
26+
}
27+
28+
callback(null, {...chunk, value});
29+
}
30+
});
31+
1732
return conventions.useJsonlStreaming
18-
? pipeline.parseJsonlAsync(queryStream ? x => x["Item"] : x => x, conventions)
33+
? pipeline.parseJsonlAsync(queryStream ? x => x["Item"] : x => x, {
34+
transforms: [keysTransform]
35+
})
1936
: pipeline.parseJsonAsync([
20-
new TransformKeysJsonStream(getTransformJsonKeysProfile("DocumentLoad", conventions)),
21-
pick({ filter: "results" }),
22-
streamArray()
37+
pick({ filter: "Results" }),
38+
streamArray(),
39+
keysTransform
2340
]);
2441
}
2542

@@ -29,14 +46,13 @@ export function getDocumentStreamResultsIntoStreamPipeline(
2946
const pipeline = RavenCommandResponsePipeline.create<object[]>();
3047

3148
return conventions.useJsonlStreaming
32-
? pipeline.parseJsonlAsync(x => x["Item"], conventions, {
49+
? pipeline.parseJsonlAsync(x => x["Item"], {
3350
transforms: [
3451
jsonlStringer({ replacer: (key, value) => key === '' ? value.value : value }),
3552
]
3653
})
3754
: pipeline
3855
.parseJsonAsync([
39-
new TransformKeysJsonStream(getTransformJsonKeysProfile("DocumentLoad", conventions)),
4056
stringer({ useValues: true })
4157
]);
4258
}

0 commit comments

Comments
 (0)