@@ -30,6 +30,8 @@ const QUERY_FAILURE = 2;
3030const SHARD_CONFIG_STALE = 4 ;
3131const AWAIT_CAPABLE = 8 ;
3232
33+ const encodeUTF8Into = BSON . BSON . onDemand . ByteUtils . encodeUTF8Into ;
34+
3335/** @internal */
3436export type WriteProtocolMessageType = OpQueryRequest | OpMsgRequest ;
3537
@@ -411,6 +413,15 @@ export interface OpMsgOptions {
411413 readPreference : ReadPreference ;
412414}
413415
416+ /** @internal */
417+ export class DocumentSequence {
418+ documents : Document [ ] ;
419+
420+ constructor ( documents : Document [ ] ) {
421+ this . documents = documents ;
422+ }
423+ }
424+
414425/** @internal */
415426export class OpMsgRequest {
416427 requestId : number ;
@@ -480,7 +491,7 @@ export class OpMsgRequest {
480491
481492 let totalLength = header . length ;
482493 const command = this . command ;
483- totalLength += this . makeDocumentSegment ( buffers , command ) ;
494+ totalLength += this . makeSections ( buffers , command ) ;
484495
485496 header . writeInt32LE ( totalLength , 0 ) ; // messageLength
486497 header . writeInt32LE ( this . requestId , 4 ) ; // requestID
@@ -490,15 +501,65 @@ export class OpMsgRequest {
490501 return buffers ;
491502 }
492503
493- makeDocumentSegment ( buffers : Uint8Array [ ] , document : Document ) : number {
494- const payloadTypeBuffer = Buffer . alloc ( 1 ) ;
504+ /**
505+ * Add the sections to the OP_MSG request's buffers and returns the length.
506+ */
507+ makeSections ( buffers : Uint8Array [ ] , document : Document ) : number {
508+ const sequencesBuffer = this . extractDocumentSequences ( document ) ;
509+ const payloadTypeBuffer = Buffer . allocUnsafe ( 1 ) ;
495510 payloadTypeBuffer [ 0 ] = 0 ;
496511
497512 const documentBuffer = this . serializeBson ( document ) ;
513+ // First section, type 0
498514 buffers . push ( payloadTypeBuffer ) ;
499515 buffers . push ( documentBuffer ) ;
516+ // Subsequent sections, type 1
517+ buffers . push ( sequencesBuffer ) ;
500518
501- return payloadTypeBuffer . length + documentBuffer . length ;
519+ return payloadTypeBuffer . length + documentBuffer . length + sequencesBuffer . length ;
520+ }
521+
522+ /**
523+ * Extracts the document sequences from the command document and returns
524+ * a buffer to be added as multiple sections after the initial type 0
525+ * section in the message.
526+ */
527+ extractDocumentSequences ( document : Document ) : Uint8Array {
528+ // Pull out any field in the command document that's value is a document sequence.
529+ const chunks = [ ] ;
530+ for ( const [ key , value ] of Object . entries ( document ) ) {
531+ if ( value instanceof DocumentSequence ) {
532+ // Document sequences starts with type 1 at the first byte.
533+ const buffer = Buffer . allocUnsafe ( 1 + 4 + key . length ) ;
534+ buffer [ 0 ] = 1 ;
535+ // Third part is the field name at offset 5.
536+ encodeUTF8Into ( buffer , key , 5 ) ;
537+ chunks . push ( buffer ) ;
538+ // Fourth part are the documents' bytes.
539+ let docsLength = 0 ;
540+ for ( const doc of value . documents ) {
541+ const docBson = this . serializeBson ( doc ) ;
542+ docsLength += docBson . length ;
543+ chunks . push ( docBson ) ;
544+ }
545+ // Second part of the sequence is the length at offset 1;
546+ buffer . writeInt32LE ( key . length + docsLength , 1 ) ;
547+ // Why are we removing the field from the command? This is because it needs to be
548+ // removed in the OP_MSG request first section, and DocumentSequence is not a
549+ // BSON type and is specific to the MongoDB wire protocol so there's nothing
550+ // our BSON serializer can do about this. Since DocumentSequence is not exposed
551+ // in the public API and only used internally, we are never mutating an original
552+ // command provided by the user, just our own, and it's cheaper to delete from
553+ // our own command than copying it.
554+ delete document [ key ] ;
555+ }
556+ }
557+ if ( chunks . length > 0 ) {
558+ return Buffer . concat ( chunks ) ;
559+ }
560+ // If we have no document sequences we return an empty buffer for nothing to add
561+ // to the payload.
562+ return Buffer . alloc ( 0 ) ;
502563 }
503564
504565 serializeBson ( document : Document ) : Uint8Array {
0 commit comments