Skip to content

Commit d4c09fa

Browse files
authored
Merge pull request #157 from gregolsky/subscriptions
Subscriptions & bug fixes
2 parents 800c915 + 2046f68 commit d4c09fa

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+3689
-458
lines changed

package-lock.json

Lines changed: 258 additions & 274 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,10 @@
6565
"typescript": "^2.9.2"
6666
},
6767
"dependencies": {
68-
"@types/change-case": "^2.3.1",
6968
"@types/pluralize": "0.0.27",
7069
"@types/qs": "^6.5.1",
7170
"@types/request": "^2.0.3",
7271
"@types/semaphore": "^1.1.0",
73-
"@types/through2": "^2.0.33",
7472
"@types/uuid": "^2.0.29",
7573
"@types/verror": "^1.10.3",
7674
"bluebird": "^3.4.7",
@@ -79,7 +77,7 @@
7977
"moment": "^2.22.0",
8078
"pluralize": "^4.0.0",
8179
"qs": "^6.5.2",
82-
"readable-stream": "^3.0.3",
80+
"readable-stream": "^3.0.6",
8381
"request": "^2.88.0",
8482
"safe-memory-cache": "^1.5.1",
8583
"semaphore": "^1.1.0",

src/Auth/Certificate.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,11 @@ export abstract class Certificate implements ICertificate {
6262
}
6363

6464
public toWebSocketOptions(): WebSocket.ClientOptions {
65-
return {
66-
passphrase: this._passphrase
67-
};
65+
if (this._passphrase) {
66+
return { passphrase: this._passphrase };
67+
}
68+
69+
return {};
6870
}
6971
}
7072

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import { DocumentConventions, ServerNode } from "../..";
2+
import { RavenCommand } from "../../Http/RavenCommand";
3+
import { CreateSubscriptionResult } from "../Subscriptions/CreateSubscriptionResult";
4+
import { SubscriptionCreationOptions } from "../Subscriptions/SubscriptionCreationOptions";
5+
import { HttpRequestParameters } from "../../Primitives/Http";
6+
import * as stream from "readable-stream";
7+
8+
export class CreateSubscriptionCommand extends RavenCommand<CreateSubscriptionResult> {
9+
private readonly _conventions: DocumentConventions;
10+
private readonly _options: SubscriptionCreationOptions;
11+
private readonly _id: string;
12+
13+
public constructor(conventions: DocumentConventions, options: SubscriptionCreationOptions, id?: string) {
14+
super();
15+
this._conventions = conventions;
16+
this._options = options;
17+
this._id = id;
18+
}
19+
20+
public createRequest(node: ServerNode): HttpRequestParameters {
21+
let uri = node.url + "/databases/" + node.database + "/subscriptions";
22+
23+
if (this._id) {
24+
uri += "?id=" + this._id;
25+
}
26+
27+
const body = this._serializer.serialize(this._options);
28+
29+
return {
30+
uri,
31+
method: "PUT",
32+
body
33+
};
34+
}
35+
36+
public async setResponseAsync(bodyStream: stream.Stream, fromCache: boolean): Promise<string> {
37+
return this._parseResponseDefaultAsync(bodyStream);
38+
}
39+
40+
public get isReadRequest() {
41+
return false;
42+
}
43+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import { ServerNode } from "../..";
2+
import { RavenCommand } from "../../Http/RavenCommand";
3+
import { HttpRequestParameters } from "../../Primitives/Http";
4+
5+
export class DeleteSubscriptionCommand extends RavenCommand<void> {
6+
private readonly _name: string;
7+
8+
public constructor(name: string) {
9+
super();
10+
this._name = name;
11+
}
12+
13+
public createRequest(node: ServerNode): HttpRequestParameters {
14+
const uri = node.url + "/databases/" + node.database + "/subscriptions?taskName=" + this._name;
15+
return {
16+
uri,
17+
method: "DELETE"
18+
};
19+
}
20+
21+
public get isReadRequest() {
22+
return false;
23+
}
24+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import { ServerNode } from "../..";
2+
import { RavenCommand } from "../../Http/RavenCommand";
3+
import { HttpRequestParameters } from "../../Primitives/Http";
4+
5+
export class DropSubscriptionConnectionCommand extends RavenCommand<void> {
6+
7+
private readonly _name: string;
8+
9+
public constructor(name: string) {
10+
super();
11+
this._name = name;
12+
}
13+
14+
public createRequest(node: ServerNode): HttpRequestParameters {
15+
const uri = node.url + "/databases/" + node.database + "/subscriptions/drop?name=" + this._name;
16+
17+
return {
18+
method: "POST",
19+
uri
20+
};
21+
}
22+
23+
public get isReadRequest() {
24+
return false;
25+
}
26+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { ServerNode } from "../..";
2+
import { RavenCommand } from "../../Http/RavenCommand";
3+
import { SubscriptionState } from "../Subscriptions/SubscriptionState";
4+
import { HttpRequestParameters } from "../../Primitives/Http";
5+
import * as stream from "readable-stream";
6+
7+
export class GetSubscriptionStateCommand extends RavenCommand<SubscriptionState> {
8+
9+
private readonly _subscriptionName: string;
10+
11+
public constructor(subscriptionName: string) {
12+
super();
13+
this._subscriptionName = subscriptionName;
14+
}
15+
16+
public createRequest(node: ServerNode): HttpRequestParameters {
17+
const uri = node.url + "/databases/" + node.database + "/subscriptions/state?name=" + this._subscriptionName;
18+
19+
return {
20+
uri
21+
};
22+
}
23+
24+
public async setResponseAsync(bodyStream: stream.Stream, fromCache: boolean): Promise<string> {
25+
return this._parseResponseDefaultAsync(bodyStream);
26+
}
27+
28+
public get isReadRequest() {
29+
return true;
30+
}
31+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { ServerNode } from "../..";
2+
import { RavenCommand } from "../../Http/RavenCommand";
3+
import { SubscriptionState } from "../Subscriptions/SubscriptionState";
4+
import { HttpRequestParameters } from "../../Primitives/Http";
5+
import * as stream from "readable-stream";
6+
7+
export class GetSubscriptionsCommand extends RavenCommand<SubscriptionState[]> {
8+
9+
private readonly _start: number;
10+
private readonly _pageSize: number;
11+
12+
public constructor(start: number, pageSize: number) {
13+
super();
14+
15+
this._start = start;
16+
this._pageSize = pageSize;
17+
}
18+
19+
public createRequest(node: ServerNode): HttpRequestParameters {
20+
const uri = node.url + "/databases/" + node.database
21+
+ "/subscriptions?start=" + this._start + "&pageSize=" + this._pageSize;
22+
23+
return {
24+
uri
25+
};
26+
}
27+
28+
public async setResponseAsync(bodyStream: stream.Stream, fromCache: boolean): Promise<string> {
29+
if (!bodyStream) {
30+
this.result = null;
31+
return;
32+
}
33+
34+
let body: string = null;
35+
await this._defaultPipeline(_ => body = _)
36+
.process(bodyStream)
37+
.then(data => {
38+
const results = data["results"] as SubscriptionState[];
39+
if (!results) {
40+
this._throwInvalidResponse();
41+
return;
42+
}
43+
44+
this.result = results;
45+
});
46+
47+
return body;
48+
}
49+
50+
public get isReadRequest() {
51+
return true;
52+
}
53+
}

src/Documents/DocumentStore.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ export class DocumentStore extends DocumentStoreBase {
110110
})
111111
.then(() => {
112112
this._disposed = true;
113-
// TBD: Subscriptions?.Dispose();
113+
this.subscriptions.dispose();
114114

115115
return new BluebirdPromise((resolve, reject) => {
116116
let listenersExecCallbacksCount = 0;

src/Documents/DocumentStoreBase.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import { IndexCreation } from "../Documents/Indexes/IndexCreation";
2222
import { PutIndexesOperation } from "./Operations/Indexes/PutIndexesOperation";
2323
import { BulkInsertOperation } from "./BulkInsertOperation";
2424
import { IDatabaseChanges } from "./Changes/IDatabaseChanges";
25+
import { DocumentSubscriptions } from "./Subscriptions/DocumentSubscriptions";
26+
import { DocumentStore } from "./DocumentStore";
2527

2628
export abstract class DocumentStoreBase
2729
extends EventEmitter
@@ -34,7 +36,7 @@ export abstract class DocumentStoreBase
3436

3537
protected constructor() {
3638
super();
37-
// TBD: Subscriptions = new DocumentSubscriptions(this);
39+
this._subscriptions = new DocumentSubscriptions(this as any as DocumentStore);
3840
}
3941

4042
public abstract dispose(): void;
@@ -140,7 +142,11 @@ export abstract class DocumentStoreBase
140142

141143
public abstract bulkInsert(database?: string): BulkInsertOperation;
142144

143-
// TBD: public IReliableSubscriptions Subscriptions { get; }
145+
private readonly _subscriptions: DocumentSubscriptions;
146+
147+
public get subscriptions(): DocumentSubscriptions {
148+
return this._subscriptions;
149+
}
144150

145151
protected _ensureNotDisposed(): void {
146152
if (this._disposed) {

0 commit comments

Comments
 (0)