Skip to content

Commit e7caacb

Browse files
committed
sync with java: 4c961f143020b34aec8edd04963b599b3475c365
1 parent e23bb2a commit e7caacb

File tree

71 files changed

+714
-171
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+714
-171
lines changed

src/Documents/BulkInsertOperation.ts

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import { TimeSeriesOperations } from "./TimeSeries/TimeSeriesOperations";
2828
import { TimeSeriesValuesHelper } from "./Session/TimeSeries/TimeSeriesValuesHelper";
2929

3030
export class BulkInsertOperation {
31+
private _options: BulkInsertOptions;
3132
private readonly _generateEntityIdOnTheClient: GenerateEntityIdOnTheClient;
3233

3334
private readonly _requestExecutor: RequestExecutor;
@@ -45,6 +46,7 @@ export class BulkInsertOperation {
4546
private readonly _timeSeriesBatchSize: number;
4647

4748
private _concurrentCheck: number = 0;
49+
private _isInitialWrite: boolean = true;
4850

4951
private _bulkInsertAborted: Promise<void>;
5052
private _abortReject: Function;
@@ -53,12 +55,15 @@ export class BulkInsertOperation {
5355
private _requestBodyStream: stream.PassThrough;
5456
private _pipelineFinished: Promise<void>;
5557

56-
public constructor(database: string, store: IDocumentStore) {
58+
public constructor(database: string, store: IDocumentStore, options?: BulkInsertOptions) {
5759
this._conventions = store.conventions;
5860
if (StringUtil.isNullOrEmpty(database)) {
5961
this._throwNoDatabase();
6062
}
6163
this._requestExecutor = store.getRequestExecutor(database);
64+
this._useCompression = options ? options.useCompression : false;
65+
66+
this._options = options ?? {};
6267

6368
this._timeSeriesBatchSize = this._conventions.bulkInsert.timeSeriesBatchSize;
6469

@@ -310,7 +315,7 @@ export class BulkInsertOperation {
310315

311316
this._requestBodyStream = new stream.PassThrough();
312317
const bulkCommand =
313-
new BulkInsertCommand(this._operationId, this._requestBodyStream, this._nodeTag);
318+
new BulkInsertCommand(this._operationId, this._requestBodyStream, this._nodeTag, this._options.skipOverwriteIfUnchanged);
314319
bulkCommand.useCompression = this._useCompression;
315320

316321
const bulkCommandPromise = this._requestExecutor.execute(bulkCommand);
@@ -787,19 +792,25 @@ export class BulkInsertCommand extends RavenCommand<void> {
787792
}
788793

789794
private readonly _stream: stream.Readable;
795+
private _skipOverwriteIfUnchanged: boolean;
790796
private readonly _id: number;
791797
public useCompression: boolean;
792798

793-
public constructor(id: number, stream: stream.Readable, nodeTag: string) {
799+
public constructor(id: number, stream: stream.Readable, nodeTag: string, skipOverwriteIfUnchanged: boolean) {
794800
super();
795801

796802
this._stream = stream;
797803
this._id = id;
798804
this._selectedNodeTag = nodeTag;
805+
this._skipOverwriteIfUnchanged = skipOverwriteIfUnchanged;
799806
}
800807

801808
public createRequest(node: ServerNode): HttpRequestParameters {
802-
const uri = node.url + "/databases/" + node.database + "/bulk_insert?id=" + this._id;
809+
const uri = node.url
810+
+ "/databases/" + node.database
811+
+ "/bulk_insert?id=" + this._id
812+
+ "&skipOverwriteIfUnchanged=" + (this._skipOverwriteIfUnchanged ? "true" : "false");
813+
803814
const headers = this._headers().typeAppJson().build();
804815
// TODO: useCompression ? new GzipCompressingEntity(_stream) : _stream);
805816
return {

src/Documents/Commands/ConditionalGetDocumentsCommand.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { DocumentConventions } from "../Conventions/DocumentConventions";
88
import { readToEnd, stringToReadable } from "../../Utility/StreamUtil";
99
import { RavenCommandResponsePipeline } from "../../Http/RavenCommandResponsePipeline";
1010
import { ObjectUtil } from "../../Utility/ObjectUtil";
11+
import { CONSTANTS, HEADERS } from "../../Constants";
1112

1213
export class ConditionalGetDocumentsCommand extends RavenCommand<ConditionalGetResult> {
1314

@@ -31,7 +32,7 @@ export class ConditionalGetDocumentsCommand extends RavenCommand<ConditionalGetR
3132
uri,
3233
method: "GET",
3334
headers: {
34-
"If-None-Match": `"${this._changeVector}"`
35+
[HEADERS.IF_NONE_MATCH]: `"${this._changeVector}"`
3536
}
3637
}
3738
}
@@ -101,4 +102,4 @@ export class ConditionalGetDocumentsCommand extends RavenCommand<ConditionalGetR
101102
export interface ConditionalGetResult {
102103
results: any[];
103104
changeVector: string;
104-
}
105+
}

src/Documents/Commands/GetDocumentsCommand.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ export interface GetDocumentsByIdsCommandOptions
4343
includes?: string[];
4444
metadataOnly?: boolean;
4545
timeSeriesIncludes?: AbstractTimeSeriesRange[];
46+
revisionsIncludesByChangeVector?: string[];
47+
revisionIncludeByDateTimeBefore?: Date;
4648
compareExchangeValueIncludes?: string[];
4749
}
4850

@@ -61,6 +63,7 @@ export interface GetDocumentsResult {
6163
includes: IRavenObject;
6264
results: any[];
6365
counterIncludes: IRavenObject;
66+
revisionIncludes: any[];
6467
timeSeriesIncludes: IRavenObject;
6568
compareExchangeValueIncludes: IRavenObject;
6669
nextPageStart: number;
@@ -77,6 +80,8 @@ export class GetDocumentsCommand extends RavenCommand<GetDocumentsResult> {
7780
private _includeAllCounters: boolean;
7881

7982
private _timeSeriesIncludes: AbstractTimeSeriesRange[];
83+
private _revisionsIncludeByChangeVector: string[];
84+
private _revisionsIncludeByDateTime: Date;
8085
private _compareExchangeValueIncludes: string[];
8186

8287
private readonly _metadataOnly: boolean;
@@ -114,6 +119,8 @@ export class GetDocumentsCommand extends RavenCommand<GetDocumentsResult> {
114119
this._metadataOnly = opts.metadataOnly;
115120
this._timeSeriesIncludes = opts.timeSeriesIncludes;
116121
this._compareExchangeValueIncludes = opts.compareExchangeValueIncludes;
122+
this._revisionsIncludeByDateTime = opts.revisionIncludeByDateTimeBefore;
123+
this._revisionsIncludeByChangeVector = opts.revisionsIncludesByChangeVector;
117124
} else if (opts.hasOwnProperty("start") && opts.hasOwnProperty("pageSize")) {
118125
opts = opts as GetDocumentsStartingWithOptions;
119126
this._start = opts.start;
@@ -224,6 +231,16 @@ export class GetDocumentsCommand extends RavenCommand<GetDocumentsResult> {
224231
}
225232
}
226233

234+
if (this._revisionsIncludeByChangeVector) {
235+
for (const changeVector of this._revisionsIncludeByChangeVector) {
236+
query += "&revisions=" + this._urlEncode(changeVector);
237+
}
238+
}
239+
240+
if (this._revisionsIncludeByDateTime) {
241+
query += "&revisionsBefore=" + DateUtil.utc.stringify(this._revisionsIncludeByDateTime);
242+
}
243+
227244
if (this._compareExchangeValueIncludes) {
228245
for (const compareExchangeValue of this._compareExchangeValueIncludes) {
229246
query += "&cmpxchg=" + this._urlEncode(compareExchangeValue);

src/Documents/Commands/GetRevisionsCommand.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,18 @@ export class GetRevisionsCommand extends RavenCommand<IRavenArrayResult> {
5959
this._conventions = conventions;
6060
}
6161

62+
public get id(): string {
63+
return this._id;
64+
}
65+
66+
public get before(): Date {
67+
return this._before;
68+
}
69+
70+
public get changeVector(): string {
71+
return this._changeVector;
72+
}
73+
6274
public get changeVectors() {
6375
return this._changeVectors;
6476
}

src/Documents/Commands/HeadAttachmentCommand.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { ServerNode } from "../../Http/ServerNode";
77
import { StatusCodes } from "./../../Http/StatusCode";
88
import * as stream from "readable-stream";
99
import { getRequiredEtagHeader } from "../../Utility/HttpUtil";
10+
import { HEADERS } from "../../Constants";
1011

1112
export class HeadAttachmentCommand extends RavenCommand<string> {
1213

@@ -41,12 +42,14 @@ export class HeadAttachmentCommand extends RavenCommand<string> {
4142
+ "/attachments?id=" + encodeURIComponent(this._documentId)
4243
+ "&name=" + encodeURIComponent(this._name);
4344

44-
const req = {
45+
const req: HttpRequestParameters = {
4546
method: "HEAD",
4647
uri
4748
};
4849

49-
this._addChangeVectorIfNotNull(this._changeVector, req);
50+
if (this._changeVector) {
51+
req.headers[HEADERS.IF_NONE_MATCH] = `"${this._changeVector}"`;
52+
}
5053

5154
return req;
5255
}

src/Documents/Commands/HeadDocumentCommand.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { HttpCache } from "../../Http/HttpCache";
66
import { getRequiredEtagHeader } from "../../Utility/HttpUtil";
77
import { ServerNode } from "../../Http/ServerNode";
88
import * as stream from "readable-stream";
9+
import { HEADERS } from "../../Constants";
910

1011
export class HeadDocumentCommand extends RavenCommand<string> {
1112

@@ -34,7 +35,7 @@ export class HeadDocumentCommand extends RavenCommand<string> {
3435
const headers = this._headers()
3536
.typeAppJson();
3637
if (this._changeVector) {
37-
headers.with("If-None-Match", this._changeVector);
38+
headers.with(HEADERS.IF_NONE_MATCH, this._changeVector);
3839
}
3940

4041
return {

src/Documents/Commands/MultiGet/MultiGetCommand.ts

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ export class MultiGetCommand extends RavenCommand<GetResponse[]> implements IDis
171171
for (let i = 0; i < responses.length; i++) {
172172
const res = responses[i];
173173
const command = this._commands[i];
174-
this._maybeSetCache(res, command);
174+
this._maybeSetCache(res, command, i);
175175

176176
if (this._cached && res.statusCode === StatusCodes.NotModified) {
177177
const clonedResponse = new GetResponse();
@@ -191,8 +191,12 @@ export class MultiGetCommand extends RavenCommand<GetResponse[]> implements IDis
191191
}
192192
}
193193

194-
private _maybeSetCache(getResponse: GetResponse, command: GetRequest): void {
194+
private _maybeSetCache(getResponse: GetResponse, command: GetRequest, cachedIndex: number): void {
195195
if (getResponse.statusCode === StatusCodes.NotModified) {
196+
// if not modified - update age
197+
if (this._cached) {
198+
this._cached.values[cachedIndex][0].notModified();
199+
}
196200
return;
197201
}
198202

@@ -220,11 +224,24 @@ export class MultiGetCommand extends RavenCommand<GetResponse[]> implements IDis
220224
}
221225

222226
public closeCache() {
227+
//If _cached is not null - it means that the client approached with this multitask request to node and the request failed.
228+
//and now client tries to send it to another node.
223229
if (this._cached) {
224230
this._cached.dispose();
225-
}
226231

227-
this._cached = null;
232+
this._cached = null;
233+
234+
// The client sends the commands.
235+
// Some of which could be saved in cache with a response
236+
// that includes the change vector that received from the old fallen node.
237+
// The client can't use those responses because their URLs are different
238+
// (include the IP and port of the old node), because of that the client
239+
// needs to get those docs again from the new node.
240+
241+
for (let command of this._commands) {
242+
delete command.headers[HEADERS.IF_NONE_MATCH];
243+
}
244+
}
228245
}
229246
}
230247

@@ -245,4 +262,4 @@ class Cached implements IDisposable {
245262

246263
this.values = null;
247264
}
248-
}
265+
}

src/Documents/DocumentStore.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@ import { IDocumentSession } from "./Session/IDocumentSession";
1212
import { SessionOptions } from "./Session/SessionOptions";
1313
import { DocumentSession } from "./Session/DocumentSession";
1414
import { IAuthOptions } from "../Auth/AuthOptions";
15-
import { BulkInsertOperation } from "./BulkInsertOperation";
15+
import { BulkInsertOperation, BulkInsertOptions } from "./BulkInsertOperation";
1616
import { IDatabaseChanges } from "./Changes/IDatabaseChanges";
1717
import { DatabaseChanges } from "./Changes/DatabaseChanges";
1818
import { DatabaseSmuggler } from "./Smuggler/DatabaseSmuggler";
1919
import { DatabaseChangesOptions } from "./Changes/DatabaseChangesOptions";
2020
import { IDisposable } from "../Types/Contracts";
2121
import { MultiDatabaseHiLoIdGenerator } from "./Identity/MultiDatabaseHiLoIdGenerator";
22+
import { TypeUtil } from "../Utility/TypeUtil";
2223

2324
const log = getLogger({ module: "DocumentStore" });
2425

@@ -84,6 +85,10 @@ export class DocumentStore extends DocumentStoreBase {
8485
this._identifier = identifier;
8586
}
8687

88+
public get hiLoIdGenerator() {
89+
return this._multiDbHiLo;
90+
}
91+
8792
/**
8893
* Disposes the document store
8994
*/
@@ -420,9 +425,14 @@ export class DocumentStore extends DocumentStoreBase {
420425

421426
public bulkInsert(): BulkInsertOperation;
422427
public bulkInsert(database: string): BulkInsertOperation;
423-
public bulkInsert(database?: string): BulkInsertOperation {
428+
public bulkInsert(options: BulkInsertOptions): BulkInsertOperation;
429+
public bulkInsert(database: string, options: BulkInsertOptions): BulkInsertOperation;
430+
public bulkInsert(databaseOrOptions?: string | BulkInsertOptions, optionalOptions?: BulkInsertOptions): BulkInsertOperation {
424431
this.assertInitialized();
425432

426-
return new BulkInsertOperation(this.getEffectiveDatabase(database), this);
433+
const database = TypeUtil.isString(databaseOrOptions) ? this.getEffectiveDatabase(databaseOrOptions) : this.getEffectiveDatabase(null);
434+
const options: BulkInsertOptions = TypeUtil.isString(databaseOrOptions) ? optionalOptions : databaseOrOptions;
435+
436+
return new BulkInsertOperation(database, this, options);
427437
}
428438
}

src/Documents/DocumentStoreBase.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import { DocumentConventions } from "./Conventions/DocumentConventions";
2525
import { RequestExecutor } from "../Http/RequestExecutor";
2626
import { IndexCreation } from "../Documents/Indexes/IndexCreation";
2727
import { PutIndexesOperation } from "./Operations/Indexes/PutIndexesOperation";
28-
import { BulkInsertOperation } from "./BulkInsertOperation";
28+
import { BulkInsertOperation, BulkInsertOptions } from "./BulkInsertOperation";
2929
import { IDatabaseChanges } from "./Changes/IDatabaseChanges";
3030
import { DocumentSubscriptions } from "./Subscriptions/DocumentSubscriptions";
3131
import { DocumentStore } from "./DocumentStore";
@@ -37,6 +37,7 @@ import { IDisposable } from "../Types/Contracts";
3737
import { TimeSeriesOperations } from "./TimeSeries/TimeSeriesOperations";
3838
import { IAbstractIndexCreationTask } from "./Indexes/IAbstractIndexCreationTask";
3939
import { StringUtil } from "../Utility/StringUtil";
40+
import { IHiLoIdGenerator } from "./Identity/IHiLoIdGenerator";
4041

4142
export abstract class DocumentStoreBase
4243
extends EventEmitter
@@ -70,6 +71,8 @@ export abstract class DocumentStoreBase
7071

7172
public abstract identifier: string;
7273

74+
public abstract hiLoIdGenerator: IHiLoIdGenerator;
75+
7376
public abstract initialize(): IDocumentStore;
7477

7578
public abstract openSession(): IDocumentSession;
@@ -153,7 +156,10 @@ export abstract class DocumentStoreBase
153156

154157
private _authOptions: IAuthOptions;
155158

156-
public abstract bulkInsert(database?: string): BulkInsertOperation;
159+
public abstract bulkInsert(): BulkInsertOperation;
160+
public abstract bulkInsert(database: string): BulkInsertOperation;
161+
public abstract bulkInsert(database: string, options: BulkInsertOptions): BulkInsertOperation;
162+
public abstract bulkInsert(options: BulkInsertOptions): BulkInsertOperation;
157163

158164
private readonly _subscriptions: DocumentSubscriptions;
159165

0 commit comments

Comments
 (0)