|
| 1 | +============== |
| 2 | +Change Streams |
| 3 | +============== |
| 4 | + |
| 5 | +.. default-domain:: mongodb |
| 6 | + |
| 7 | +.. contents:: On this page |
| 8 | + :local: |
| 9 | + :backlinks: none |
| 10 | + :depth: 1 |
| 11 | + :class: singlecol |
| 12 | + |
| 13 | +As of version 3.6 of the MongoDB server, a new ``$changeStream`` pipeline stage is supported in the aggregation |
| 14 | +framework. The Ruby driver provides an API for receiving notifications for changes to a particular collection using this |
| 15 | +new pipeline stage. Although you can create a change stream using the pipeline operator and aggregation framework |
| 16 | +directly, it is recommended to use the driver API described below as the driver resumes the change stream if there is |
| 17 | +timeout or network error. |
| 18 | + |
| 19 | +Change streams on the server requires a ``"majority"`` read concern or no read concern. |
| 20 | + |
| 21 | +Change streams do not work properly with JRuby because of the issue documented here_. |
| 22 | + |
| 23 | +.. _here: https://github.com/jruby/jruby/issues/4212 |
| 24 | + |
| 25 | +Namely, JRuby eagerly evaluates ```#next`` on an Enumerator in a background green thread. |
| 26 | +So calling ```#next`` on the change stream will cause getmores to be called in a loop in the background. |
| 27 | + |
| 28 | +Watching for changes on a particular collection |
| 29 | +----------------------------------------------- |
| 30 | + |
| 31 | +A change stream is created by calling the ``#watch`` method on a collection: |
| 32 | + |
| 33 | +.. code-block:: ruby |
| 34 | + |
| 35 | + stream = collection.watch |
| 36 | + collection.insert_one(a: 1) |
| 37 | + doc = stream.to_enum.next |
| 38 | + process(doc) |
| 39 | + |
| 40 | + |
| 41 | +You can also receive the notifications as they are available: |
| 42 | + |
| 43 | +.. code-block:: ruby |
| 44 | + |
| 45 | + stream = client[:test].watch |
| 46 | + enum = stream.to_enum |
| 47 | + while doc = enum.next |
| 48 | + process(doc) |
| 49 | + end |
| 50 | + |
| 51 | + |
| 52 | +The change stream can take filters in the aggregation framework pipeline operator format: |
| 53 | + |
| 54 | +.. code-block:: ruby |
| 55 | + |
| 56 | + stream = collection.watch([{'$match' => { 'operationType' => {'$in' => ['insert', 'replace'] } } }, |
| 57 | + {'$match' => { 'fullDocument.n' => { '$gte' => 1 } } } |
| 58 | + ]) |
| 59 | + enum = stream.to_enum |
| 60 | + while doc = enum.next |
| 61 | + process(doc) |
| 62 | + end |
| 63 | + |
| 64 | +Close a Change Stream |
| 65 | +--------------------- |
| 66 | + |
| 67 | +You can close a Change Stream by calling the ``#close`` method: |
| 68 | + |
| 69 | +.. code-block:: ruby |
| 70 | + |
| 71 | + stream = collection.watch |
| 72 | + collection.insert_one(a: 1) |
| 73 | + doc = stream.to_enum.next |
| 74 | + process(doc) |
| 75 | + stream.close |
0 commit comments