@@ -37,14 +37,8 @@ import { isRetryableError } from '@ydbjs/retry'
3737import { assign , enqueueActions , sendTo , setup } from 'xstate'
3838import { defaultCodecMap } from '../codec.js'
3939import { WriterStream , type WriterStreamReceiveEvent } from './stream.js'
40- import type {
41- SeqNoShiftEvent ,
42- TopicWriterOptions ,
43- WriterContext ,
44- WriterEmitted ,
45- WriterEvents ,
46- WriterInput ,
47- } from './types.js'
40+ import { SeqNoShiftBuilder } from './seqno-shift-builder.js'
41+ import type { TopicWriterOptions , WriterContext , WriterEmitted , WriterEvents , WriterInput } from './types.js'
4842import { loggers } from '@ydbjs/debug'
4943
5044// ============================================================================
@@ -267,15 +261,15 @@ let writerMachineFactory = setup({
267261
268262 /**
269263 * Updates the writer context after receiving STREAM_WRITE_SESSION_INIT.
270- * Common steps for both seqNo modes:
271- * - Determine the new `inflightStart` using `serverLastSeqNo`
272- * - Leave all unsent messages in place while keeping their original order
273264 *
274- * Mode specific logic:
275- * - manual: perform a single pass over `[inflight, buffer)`; drop messages with `seqNo <= serverLastSeqNo`
276- * (already persisted on the server), compact the window, and update counters without changing seqNo values.
277- * - auto: after the same pass, renumber every message whose seqNo may shift and emit `SeqNoShiftEvent`
278- * segments so `TopicWriter.resolveSeqNo()` can map initial numbers to the final ones.
265+ * Common groundwork for both seqNo modes:
266+ * - Walk the `[inflight, buffer)` window once while keeping message order
267+ * - Trim acked messages (seqNo <= `lastSeqNo`) and update sliding-window pointers
268+ *
269+ * Mode-specific behaviour:
270+ * - Manual seqNo: compact the window, update bookkeeping, keep user-provided seqNo as-is
271+ * - Auto seqNo: compact the window, renumber remaining messages, emit `SeqNoShift` segments so
272+ * `TopicWriter.resolveSeqNo()` can translate temporary numbers into the final ones
279273 *
280274 * @param enqueue - XState enqueue helper for scheduling actions
281275 * @param event - init response with session metadata
@@ -288,57 +282,50 @@ let writerMachineFactory = setup({
288282 let lastSeqNo = event . data . lastSeqNo || 0n
289283 let nextSeqNo = lastSeqNo + 1n
290284
291- // --------------------------------------------------------------------
292- // 1. Подсчитываем подтверждённые сообщения и новое начало окна inflight
293- // Это позволяет дальше просто сдвигать указатели без пересоздания массивов
294- // --------------------------------------------------------------------
285+ // Count acknowledged messages and identify the new inflight window start so we can slide pointers in place.
295286 let inflightStartIndex = context . inflightStart
296287 let inflightEndIndex = context . inflightStart + context . inflightLength
297288 let bufferEndIndex = context . bufferStart + context . bufferLength
298289
290+ // Manual seqNo mode: drop acked entries and slide the window, seqNo stay untouched.
299291 if ( context . seqNoMode === 'manual' ) {
300- let writeIndex = inflightStartIndex
301292 let acknowledgedSize = 0n
302- let pendingCount = 0
303293 let pendingSize = 0n
304- let bufferKeptCount = 0
305- let skippedSize = 0n
306294 let bufferSize = context . bufferSize
295+ let firstKeptIndex : number | null = null
307296
297+ // Single pass over [inflight, buffer): skip acknowledged items and record the first live message.
308298 for ( let i = inflightStartIndex ; i < bufferEndIndex ; i ++ ) {
309299 let message = context . messages [ i ]
310300 if ( ! message ) continue
311301
312302 let messageSize = BigInt ( message . data . length )
313303
314- if ( i < inflightEndIndex ) {
315- if ( message . seqNo <= lastSeqNo ) {
316- acknowledgedSize += messageSize
317- continue
318- }
319-
320- pendingCount ++
321- pendingSize += messageSize
322- } else {
323- if ( message . seqNo <= lastSeqNo ) {
324- skippedSize += messageSize
304+ // Messages already acknowledged by the server can be dropped from the sliding window.
305+ if ( message . seqNo <= lastSeqNo ) {
306+ acknowledgedSize += messageSize
307+ if ( i >= inflightEndIndex ) {
308+ // They came from buffer, so shrink buffer accounting as well.
325309 bufferSize -= messageSize
326- continue
327310 }
311+ continue
312+ }
328313
329- bufferKeptCount ++
314+ // Remember the first index that still contains a message we need to keep.
315+ if ( firstKeptIndex === null ) {
316+ firstKeptIndex = i
330317 }
331318
332- if ( writeIndex !== i ) {
333- context . messages [ writeIndex ] = message
319+ if ( i < inflightEndIndex ) {
320+ // Anything left in inflight becomes pending work that must be resent.
321+ pendingSize += messageSize
334322 }
335- writeIndex ++
336323 }
337324
338- let newBufferStart = inflightStartIndex
339- let bufferLength = pendingCount + bufferKeptCount
325+ let newBufferStart = firstKeptIndex ?? bufferEndIndex
326+ let bufferLength = bufferEndIndex - newBufferStart
340327 let inflightSize = context . inflightSize - ( acknowledgedSize + pendingSize )
341- let garbageSize = context . garbageSize + acknowledgedSize + skippedSize
328+ let garbageSize = context . garbageSize + acknowledgedSize
342329 let newBufferSize = bufferSize + pendingSize
343330
344331 enqueue . assign ( {
@@ -362,11 +349,14 @@ let writerMachineFactory = setup({
362349 return
363350 }
364351
352+ // Auto seqNo mode: compact window then reassign seqNo for the remaining messages.
365353 let firstPendingIndex = inflightEndIndex
366354 let acknowledgedSize = 0n
367355 let pendingCount = 0
368356 let pendingSize = 0n
369357
358+ // Scan inflight messages to find the first one that still needs server confirmation and to measure how much
359+ // data must move back into the buffer before we renumber everything.
370360 for ( let i = inflightStartIndex ; i < inflightEndIndex ; i ++ ) {
371361 let message = context . messages [ i ]
372362 if ( ! message ) continue
@@ -385,24 +375,9 @@ let writerMachineFactory = setup({
385375
386376 let newBufferStart = firstPendingIndex
387377
388- let seqNoShifts : SeqNoShiftEvent [ ] = [ ]
389- let currentShiftStart : bigint | null = null
390- let currentShiftDelta : bigint | null = null
391- let currentShiftCount = 0
392-
393- let flushCurrentShift = ( ) => {
394- if ( currentShiftStart !== null && currentShiftDelta !== null && currentShiftCount > 0 ) {
395- seqNoShifts . push ( {
396- startOld : currentShiftStart ,
397- count : currentShiftCount ,
398- delta : currentShiftDelta ,
399- } )
400- }
401- currentShiftStart = null
402- currentShiftDelta = null
403- currentShiftCount = 0
404- }
378+ let shiftBuilder = new SeqNoShiftBuilder ( )
405379
380+ // Renumber the remaining messages sequentially so we continue where the server left off.
406381 for ( let i = firstPendingIndex ; i < bufferEndIndex ; i ++ ) {
407382 let message = context . messages [ i ]
408383 if ( ! message ) continue
@@ -411,28 +386,11 @@ let writerMachineFactory = setup({
411386 let newSeqNo = nextSeqNo
412387 nextSeqNo ++
413388
414- if ( oldSeqNo !== newSeqNo ) {
415- let delta = newSeqNo - oldSeqNo
416- if (
417- currentShiftStart !== null &&
418- currentShiftDelta === delta &&
419- oldSeqNo === currentShiftStart + BigInt ( currentShiftCount )
420- ) {
421- currentShiftCount ++
422- } else {
423- flushCurrentShift ( )
424- currentShiftStart = oldSeqNo
425- currentShiftDelta = delta
426- currentShiftCount = 1
427- }
428- } else {
429- flushCurrentShift ( )
430- }
431-
389+ shiftBuilder . addShift ( oldSeqNo , newSeqNo )
432390 message . seqNo = newSeqNo
433391 }
434392
435- flushCurrentShift ( )
393+ let seqNoShifts = shiftBuilder . build ( )
436394
437395 let inflightSize = context . inflightSize - acknowledgedSize - pendingSize
438396 let bufferSize = context . bufferSize + pendingSize
@@ -573,49 +531,46 @@ let writerMachineFactory = setup({
573531 // Update inflight and garbage metrics based on acknowledgments
574532 enqueue . assign ( ( { context } ) => {
575533 let removedSize = 0n
576- let removedCount = 0
534+ let removedLength = 0
577535
578536 // Move acknowledged messages to garbage
579537 for ( let i = context . inflightStart ; i < context . inflightStart + context . inflightLength ; i ++ ) {
580538 let message = context . messages [ i ]
581539
582540 if ( message && acks . has ( message . seqNo ) ) {
583541 removedSize += BigInt ( message . data . length )
584- removedCount ++
542+ removedLength ++
585543 }
586544 }
587545
588546 // Update context pointers
589547 return {
590548 garbageSize : context . garbageSize + removedSize ,
591549 inflightSize : context . inflightSize - removedSize ,
592- inflightStart : context . inflightStart + removedCount ,
593- inflightLength : context . inflightLength - removedCount ,
550+ inflightStart : context . inflightStart + removedLength ,
551+ inflightLength : context . inflightLength - removedLength ,
594552 }
595553 } )
596554
597555 // @ts -ignore
598556 if ( check ( { type : 'shouldReclaimMemory' } ) ) {
599557 enqueue . assign ( ( { context } ) => {
600- let removed = context . messages . splice ( 0 , context . inflightStart )
601- let bufferStart = context . bufferStart - removed . length
602-
603- // Recalculate bufferSize using sliding window approach:
604- // buffer region is messages from bufferStart to bufferStart + bufferLength
605- let bufferSize = 0n
606- for ( let i = bufferStart ; i < bufferStart + context . bufferLength ; i ++ ) {
607- let message = context . messages [ i ]
608- if ( message ) {
609- bufferSize += BigInt ( message . data . length )
558+ let garbageLength = context . inflightStart
559+ if ( ! garbageLength ) {
560+ return {
561+ garbageSize : 0n ,
610562 }
611563 }
612564
613- // Update context pointers
565+ context . messages . splice ( 0 , garbageLength )
566+ let bufferStart = context . bufferStart - garbageLength
567+
568+ assert . ok ( bufferStart >= 0 )
569+
614570 return {
615571 messages : context . messages ,
616572 garbageSize : 0n ,
617573 inflightStart : 0 ,
618- bufferSize,
619574 bufferStart,
620575 }
621576 } )
@@ -647,11 +602,11 @@ let writerMachineFactory = setup({
647602 }
648603
649604 let createdAt = timestampFromDate ( event . message . createdAt ?? new Date ( ) )
605+ let uncompressedSize = BigInt ( event . message . data . length )
650606 let metadataItems = Object . entries ( event . message . metadataItems || { } ) . map ( ( [ key , value ] ) => ( {
651607 key,
652608 value,
653609 } ) )
654- let uncompressedSize = BigInt ( event . message . data . length )
655610
656611 // Track seqNo mode (set once on first message, then remains constant)
657612 // Mode is passed from TopicWriter which knows it from SeqNoManager
0 commit comments