Skip to content

Commit c95a159

Browse files
authored
Merge pull request #331 from share/readSnapshots-specific-errors
Add ability for "readSnapshots" middleware to reject reads of specific snapshots for bulk fetch/subscribe
2 parents ca35a5c + 87041ae commit c95a159

File tree

9 files changed

+510
-39
lines changed

9 files changed

+510
-39
lines changed

README.md

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ Register a new middleware.
172172
* `stream`: The duplex Stream provided to `share.listen` (for 'connect')
173173
* `query`: The query object being handled (for 'query')
174174
* `snapshots`: Array of retrieved snapshots (for 'readSnapshots')
175+
* `rejectSnapshotRead(snapshot, error)`: Reject a specific snapshot read (for 'readSnapshots')
176+
- `rejectSnapshotReadSilent(snapshot, errorMessage)`: As above, but causes the ShareDB client to treat it as a silent rejection, not passing the error back to user code.
175177
* `data`: Received client message (for 'receive')
176178
* `request`: Client message being replied to (for 'reply')
177179
* `reply`: Reply to be sent to the client (for 'reply')
@@ -601,21 +603,28 @@ An `Agent` is the representation of a client's `Connection` state on the server.
601603
The `Agent` will be made available in all [middleware](#middlewares) requests. The `agent.custom` field is an object that can be used for storing arbitrary information for use in middleware. For example:
602604

603605
```javascript
604-
backend.useMiddleware('connect', function (request, callback) {
606+
backend.useMiddleware('connect', (request, callback) => {
605607
// Best practice to clone to prevent mutating the object after connection.
606608
// You may also want to consider a deep clone, depending on the shape of request.req.
607609
Object.assign(request.agent.custom, request.req);
608610
callback();
609611
});
610612

611-
backend.useMiddleware('readSnapshots', function (request, callback) {
612-
var connectionInfo = request.agent.custom;
613-
var snapshots = request.snapshots;
613+
backend.useMiddleware('readSnapshots', (request, callback) => {
614+
const connectionInfo = request.agent.custom;
615+
const snapshots = request.snapshots;
614616

615-
// Use the information provided at connection to determine if a user can access snapshots.
617+
// Use the information provided at connection to determine if a user can access the snapshots.
616618
// This should also be checked when fetching and submitting ops.
617-
if (!userCanAccessSnapshots(connectionInfo, snapshots)) {
618-
return callback(new Error('Authentication error'));
619+
if (!userCanAccessCollection(connectionInfo, request.collection)) {
620+
return callback(new Error('Not allowed to access collection ' + request.collection));
621+
}
622+
// Check each snapshot individually.
623+
for (const snapshot of snapshots) {
624+
if (!userCanAccessSnapshot(connectionInfo, request.collection, snapshot)) {
625+
request.rejectSnapshotRead(snapshot,
626+
new Error('Not allowed to access snapshot in ' request.collection));
627+
}
619628
}
620629

621630
callback();
@@ -625,10 +634,10 @@ backend.useMiddleware('readSnapshots', function (request, callback) {
625634
// potentially making some database request to check which documents they can access, or which
626635
// roles they have, etc. If doing this asynchronously, make sure you call backend.connect
627636
// after the permissions have been fetched.
628-
var connectionInfo = getUserPermissions();
637+
const connectionInfo = getUserPermissions();
629638
// Pass info in as the second argument. This will be made available as request.req in the
630639
// 'connection' middleware.
631-
var connection = backend.connect(null, connectionInfo);
640+
const connection = backend.connect(null, connectionInfo);
632641
```
633642

634643
### Logging

lib/agent.js

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ Agent.prototype._querySubscribe = function(queryId, collection, query, options,
424424
wait++;
425425
this.backend.fetchBulk(this, collection, options.fetch, function(err, snapshotMap) {
426426
if (err) return finish(err);
427-
message = {data: getMapData(snapshotMap)};
427+
message = getMapResult(snapshotMap);
428428
finish();
429429
});
430430
}
@@ -463,12 +463,20 @@ function getResultsData(results) {
463463
return items;
464464
}
465465

466-
function getMapData(snapshotMap) {
466+
function getMapResult(snapshotMap) {
467467
var data = {};
468468
for (var id in snapshotMap) {
469-
data[id] = getSnapshotData(snapshotMap[id]);
469+
var mapValue = snapshotMap[id];
470+
// fetchBulk / subscribeBulk map data can have either a Snapshot or an object
471+
// `{error: Error | string}` as a value.
472+
if (mapValue.error) {
473+
// Transform errors to serialization-friendly objects.
474+
data[id] = {error: getReplyErrorObject(mapValue.error)};
475+
} else {
476+
data[id] = getSnapshotData(mapValue);
477+
}
470478
}
471-
return data;
479+
return {data: data};
472480
}
473481

474482
function getSnapshotData(snapshot) {
@@ -517,8 +525,15 @@ Agent.prototype._fetchOps = function(collection, id, version, callback) {
517525
Agent.prototype._fetchBulk = function(collection, versions, callback) {
518526
if (Array.isArray(versions)) {
519527
this.backend.fetchBulk(this, collection, versions, function(err, snapshotMap) {
520-
if (err) return callback(err);
521-
callback(null, {data: getMapData(snapshotMap)});
528+
if (err) {
529+
return callback(err);
530+
}
531+
if (snapshotMap) {
532+
var result = getMapResult(snapshotMap);
533+
callback(null, result);
534+
} else {
535+
callback();
536+
}
522537
});
523538
} else {
524539
this._fetchBulkOps(collection, versions, callback);
@@ -565,15 +580,18 @@ Agent.prototype._subscribeBulk = function(collection, versions, callback) {
565580
// See _subscribe() above. This function's logic should match but in bulk
566581
var agent = this;
567582
this.backend.subscribeBulk(this, collection, versions, function(err, streams, snapshotMap, opsMap) {
568-
if (err) return callback(err);
583+
if (err) {
584+
return callback(err);
585+
}
569586
if (opsMap) {
570587
agent._sendOpsBulk(collection, opsMap);
571588
}
572589
for (var id in streams) {
573590
agent._subscribeToStream(collection, id, streams[id]);
574591
}
575592
if (snapshotMap) {
576-
callback(null, {data: getMapData(snapshotMap)});
593+
var result = getMapResult(snapshotMap);
594+
callback(null, result);
577595
} else {
578596
callback();
579597
}

lib/backend.js

Lines changed: 70 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ var ShareDBError = require('./error');
1212
var Snapshot = require('./snapshot');
1313
var StreamSocket = require('./stream-socket');
1414
var SubmitRequest = require('./submit-request');
15+
var ReadSnapshotsRequest = require('./read-snapshots-request');
1516

1617
var ERROR_CODE = ShareDBError.CODES;
1718

@@ -252,13 +253,21 @@ Backend.prototype._sanitizeSnapshots = function(agent, projection, collection, s
252253
}
253254
}
254255

255-
var request = {
256-
collection: collection,
257-
snapshots: snapshots,
258-
snapshotType: snapshotType
259-
};
256+
var request = new ReadSnapshotsRequest(collection, snapshots, snapshotType);
260257

261-
this.trigger(this.MIDDLEWARE_ACTIONS.readSnapshots, agent, request, callback);
258+
this.trigger(this.MIDDLEWARE_ACTIONS.readSnapshots, agent, request, function(err) {
259+
if (err) return callback(err);
260+
// Handle "partial rejection" - "readSnapshots" middleware functions can use
261+
// `request.rejectSnapshotRead(snapshot, error)` to reject the read of a specific snapshot.
262+
if (request.hasSnapshotRejection()) {
263+
err = request.getReadSnapshotsError();
264+
}
265+
if (err) {
266+
callback(err);
267+
} else {
268+
callback();
269+
}
270+
});
262271
};
263272

264273
Backend.prototype._getSnapshotProjection = function(db, projection) {
@@ -382,6 +391,18 @@ Backend.prototype.fetch = function(agent, index, id, options, callback) {
382391
});
383392
};
384393

394+
/**
395+
* Map of document id to Snapshot or error object.
396+
* @typedef {{ [id: string]: Snapshot | { error: Error | string } }} SnapshotMap
397+
*/
398+
399+
/**
400+
* @param {Agent} agent
401+
* @param {string} index
402+
* @param {string[]} ids
403+
* @param {*} options
404+
* @param {(err?: Error | string, snapshotMap?: SnapshotMap) => void} callback
405+
*/
385406
Backend.prototype.fetchBulk = function(agent, index, ids, options, callback) {
386407
if (typeof options === 'function') {
387408
callback = options;
@@ -410,9 +431,18 @@ Backend.prototype.fetchBulk = function(agent, index, ids, options, callback) {
410431
snapshots,
411432
backend.SNAPSHOT_TYPES.current,
412433
function(err) {
413-
if (err) return callback(err);
434+
if (err) {
435+
if (err.code === ERROR_CODE.ERR_SNAPSHOT_READS_REJECTED) {
436+
for (var docId in err.idToError) {
437+
snapshotMap[docId] = {error: err.idToError[docId]};
438+
}
439+
err = undefined;
440+
} else {
441+
snapshotMap = undefined;
442+
}
443+
}
414444
backend.emit('timing', 'fetchBulk', Date.now() - start, request);
415-
callback(null, snapshotMap);
445+
callback(err, snapshotMap);
416446
});
417447
});
418448
};
@@ -471,6 +501,26 @@ Backend.prototype.subscribe = function(agent, index, id, version, options, callb
471501
});
472502
};
473503

504+
/**
505+
* Map of document id to pubsub stream.
506+
* @typedef {{ [id: string]: Stream }} StreamMap
507+
*/
508+
/**
509+
* Map of document id to array of ops for the doc.
510+
* @typedef {{ [id: string]: Op[] }} OpsMap
511+
*/
512+
513+
/**
514+
* @param {Agent} agent
515+
* @param {string} index
516+
* @param {string[]} versions
517+
* @param {(
518+
* err?: Error | string | null,
519+
* streams?: StreamMap,
520+
* snapshotMap?: SnapshotMap | null
521+
* opsMap?: OpsMap
522+
* ) => void} callback
523+
*/
474524
Backend.prototype.subscribeBulk = function(agent, index, versions, callback) {
475525
var start = Date.now();
476526
var projection = this.projections[index];
@@ -501,11 +551,21 @@ Backend.prototype.subscribeBulk = function(agent, index, versions, callback) {
501551
// If an array of ids, get current snapshots
502552
backend.fetchBulk(agent, index, ids, function(err, snapshotMap) {
503553
if (err) {
554+
// Full error, destroy all streams.
504555
destroyStreams(streams);
505-
return callback(err);
556+
streams = undefined;
557+
snapshotMap = undefined;
558+
}
559+
for (var docId in snapshotMap) {
560+
// The doc id could map to an object `{error: Error | string}`, which indicates that
561+
// particular snapshot's read was rejected. Destroy the streams fur such docs.
562+
if (snapshotMap[docId].error) {
563+
streams[docId].destroy();
564+
delete streams[docId];
565+
}
506566
}
507567
backend.emit('timing', 'subscribeBulk.snapshot', Date.now() - start, request);
508-
callback(null, streams, snapshotMap);
568+
callback(err, streams, snapshotMap);
509569
});
510570
} else {
511571
// If a versions map, get ops since requested versions

lib/client/connection.js

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -175,11 +175,7 @@ Connection.prototype.bindToSocket = function(socket) {
175175
Connection.prototype.handleMessage = function(message) {
176176
var err = null;
177177
if (message.error) {
178-
// wrap in Error object so can be passed through event emitters
179-
err = new Error(message.error.message);
180-
err.code = message.error.code;
181-
// Add the message data to the error object for more context
182-
err.data = message;
178+
err = wrapErrorData(message.error, message);
183179
delete message.error;
184180
}
185181
// Switch on the message action. Most messages are for documents and are
@@ -233,11 +229,11 @@ Connection.prototype.handleMessage = function(message) {
233229
return;
234230

235231
case 'bf':
236-
return this._handleBulkMessage(message, '_handleFetch');
232+
return this._handleBulkMessage(err, message, '_handleFetch');
237233
case 'bs':
238-
return this._handleBulkMessage(message, '_handleSubscribe');
234+
return this._handleBulkMessage(err, message, '_handleSubscribe');
239235
case 'bu':
240-
return this._handleBulkMessage(message, '_handleUnsubscribe');
236+
return this._handleBulkMessage(err, message, '_handleUnsubscribe');
241237

242238
case 'nf':
243239
case 'nt':
@@ -265,22 +261,43 @@ Connection.prototype.handleMessage = function(message) {
265261
}
266262
};
267263

268-
Connection.prototype._handleBulkMessage = function(message, method) {
264+
function wrapErrorData(errorData, fullMessage) {
265+
// wrap in Error object so can be passed through event emitters
266+
var err = new Error(errorData.message);
267+
err.code = errorData.code;
268+
if (fullMessage) {
269+
// Add the message data to the error object for more context
270+
err.data = fullMessage;
271+
}
272+
return err;
273+
}
274+
275+
Connection.prototype._handleBulkMessage = function(err, message, method) {
269276
if (message.data) {
270277
for (var id in message.data) {
278+
var dataForId = message.data[id];
271279
var doc = this.getExisting(message.c, id);
272-
if (doc) doc[method](message.error, message.data[id]);
280+
if (doc) {
281+
if (err) {
282+
doc[method](err);
283+
} else if (dataForId.error) {
284+
// Bulk reply snapshot-specific errorr - see agent.js getMapResult
285+
doc[method](wrapErrorData(dataForId.error));
286+
} else {
287+
doc[method](null, dataForId);
288+
}
289+
}
273290
}
274291
} else if (Array.isArray(message.b)) {
275292
for (var i = 0; i < message.b.length; i++) {
276293
var id = message.b[i];
277294
var doc = this.getExisting(message.c, id);
278-
if (doc) doc[method](message.error);
295+
if (doc) doc[method](err);
279296
}
280297
} else if (message.b) {
281298
for (var id in message.b) {
282299
var doc = this.getExisting(message.c, id);
283-
if (doc) doc[method](message.error);
300+
if (doc) doc[method](err);
284301
}
285302
} else {
286303
logger.error('Invalid bulk message', message);

lib/client/doc.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,14 @@ Doc.prototype._emitNothingPending = function() {
246246
// **** Helpers for network messages
247247

248248
Doc.prototype._emitResponseError = function(err, callback) {
249+
if (err && err.code === ERROR_CODE.ERR_SNAPSHOT_READ_SILENT_REJECTION) {
250+
this.wantSubscribe = false;
251+
if (callback) {
252+
callback();
253+
}
254+
this._emitNothingPending();
255+
return;
256+
}
249257
if (callback) {
250258
callback(err);
251259
this._emitNothingPending();

lib/error.js

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,25 @@ ShareDBError.CODES = {
4141
ERR_OT_OP_NOT_PROVIDED: 'ERR_OT_OP_NOT_PROVIDED',
4242
ERR_PROTOCOL_VERSION_NOT_SUPPORTED: 'ERR_PROTOCOL_VERSION_NOT_SUPPORTED',
4343
ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED: 'ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED',
44+
/**
45+
* A special error that a "readSnapshots" middleware implementation can use to indicate that it
46+
* wishes for the ShareDB client to treat it as a silent rejection, not passing the error back to
47+
* user code.
48+
*
49+
* For subscribes, the ShareDB client will still cancel the document subscription.
50+
*/
51+
ERR_SNAPSHOT_READ_SILENT_REJECTION: 'ERR_SNAPSHOT_READ_SILENT_REJECTION',
52+
/**
53+
* A "readSnapshots" middleware rejected the reads of specific snapshots.
54+
*
55+
* This error code is mostly for server use and generally will not be encountered on the client.
56+
* Instead, each specific doc that encountered an error will receive its specific error.
57+
*
58+
* The one exception is for queries, where a "readSnapshots" rejection of specific snapshots will
59+
* cause the client to receive this error for the whole query, since queries don't support
60+
* doc-specific errors.
61+
*/
62+
ERR_SNAPSHOT_READS_REJECTED: 'ERR_SNAPSHOT_READS_REJECTED',
4463
ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND: 'ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND',
4564
ERR_TYPE_CANNOT_BE_PROJECTED: 'ERR_TYPE_CANNOT_BE_PROJECTED',
4665
ERR_UNKNOWN_ERROR: 'ERR_UNKNOWN_ERROR'

0 commit comments

Comments
 (0)