From 419ee660ca74f8176999ada896983bf013064268 Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Fri, 7 Nov 2025 01:37:10 +0300 Subject: [PATCH 1/4] Add resolveSeqNo method and track temporary seqNo shifts in TopicWriter - Add resolveSeqNo() method to get final seqNo for messages written before session initialization - Track seqNo shifts through SeqNoShiftEvent segments when session reconnects - Update write() JSDoc to warn about temporary seqNo values before session initialization - Implement efficient seqNo shift tracking using range merging and inversion algorithms - Update flush() documentation to clarify when seqNo values become final Breaking changes: None (backward compatible) --- packages/topic/package.json | 4 +- packages/topic/src/index.ts | 39 ++- packages/topic/src/writer2/machine.ts | 227 +++++++++++-- packages/topic/src/writer2/seqno-manager.ts | 14 +- packages/topic/src/writer2/types.ts | 51 +-- packages/topic/src/writer2/writer.ts | 357 +++++++++++++++++++- packages/topic/vitest.config.ts | 32 +- 7 files changed, 642 insertions(+), 82 deletions(-) diff --git a/packages/topic/package.json b/packages/topic/package.json index 3bc75faf..abcb676a 100644 --- a/packages/topic/package.json +++ b/packages/topic/package.json @@ -30,9 +30,11 @@ "types": "dist/index.d.ts", "exports": { ".": "./dist/index.js", + "./codec": "./dist/codec.js", "./reader": "./dist/reader/index.js", "./writer": "./dist/writer/index.js", - "./writer2": "./dist/writer2/index.js" + "./writer2": "./dist/writer2/index.js", + "./message": "./dist/message.js" }, "engines": { "node": ">=20.19.0", diff --git a/packages/topic/src/index.ts b/packages/topic/src/index.ts index 20d4a0d2..631019ef 100644 --- a/packages/topic/src/index.ts +++ b/packages/topic/src/index.ts @@ -1,32 +1,41 @@ -import { Driver } from "@ydbjs/core"; +import { Driver } from '@ydbjs/core' -import { type TopicReader, type TopicReaderOptions, type TopicTxReader, createTopicReader, createTopicTxReader } from "./reader/index.js"; -import type { TX } from "./tx.js"; -import { type TopicTxWriter, type TopicWriter, type TopicWriterOptions, createTopicTxWriter, createTopicWriter } from "./writer/index.js"; +import { + type TopicReader, + type TopicReaderOptions, + type TopicTxReader, + createTopicReader, + createTopicTxReader, +} from './reader/index.js' +import type { TX } from './tx.js' +import { + type TopicTxWriter, + type TopicWriter, + type TopicWriterOptions, + createTopicTxWriter, + createTopicWriter, +} from './writer/index.js' export interface TopicClient { - createReader(options: TopicReaderOptions): TopicReader; - createTxReader(tx: TX, options: TopicReaderOptions): TopicTxReader; - createWriter(options: TopicWriterOptions): TopicWriter; - createTxWriter(tx: TX, options: TopicWriterOptions): TopicTxWriter; + createReader(options: TopicReaderOptions): TopicReader + createTxReader(tx: TX, options: TopicReaderOptions): TopicTxReader + createWriter(options: TopicWriterOptions): TopicWriter + createTxWriter(tx: TX, options: TopicWriterOptions): TopicTxWriter } export function topic(driver: Driver): TopicClient { return { createReader(options) { - return createTopicReader(driver, options); + return createTopicReader(driver, options) }, createTxReader(tx: TX, options: TopicReaderOptions) { - return createTopicTxReader(tx, driver, options); + return createTopicTxReader(tx, driver, options) }, createWriter(options: TopicWriterOptions) { - return createTopicWriter(driver, options); + return createTopicWriter(driver, options) }, createTxWriter(tx: TX, options: Omit) { - return createTopicTxWriter(tx, driver, options); + return createTopicTxWriter(tx, driver, options) }, } as TopicClient } - -export type { TopicTxReader } from './reader/index.js'; -export type { TopicTxWriter } from './writer/index.js'; diff --git a/packages/topic/src/writer2/machine.ts b/packages/topic/src/writer2/machine.ts index ae897b2b..c64b89db 100644 --- a/packages/topic/src/writer2/machine.ts +++ b/packages/topic/src/writer2/machine.ts @@ -37,7 +37,14 @@ import { isRetryableError } from '@ydbjs/retry' import { assign, enqueueActions, sendTo, setup } from 'xstate' import { defaultCodecMap } from '../codec.js' import { WriterStream, type WriterStreamReceiveEvent } from './stream.js' -import type { TopicWriterOptions, WriterContext, WriterEmitted, WriterEvents, WriterInput } from './types.js' +import type { + SeqNoShiftEvent, + TopicWriterOptions, + WriterContext, + WriterEmitted, + WriterEvents, + WriterInput, +} from './types.js' import { loggers } from '@ydbjs/debug' // ============================================================================ @@ -259,24 +266,196 @@ let writerMachineFactory = setup({ // ==================================================================== /** - * Updates session state after receiving an init response. - * Emits a session event with session ID and last sequence number. + * Updates the writer context after receiving STREAM_WRITE_SESSION_INIT. + * Common steps for both seqNo modes: + * - Determine the new `inflightStart` using `serverLastSeqNo` + * - Leave all unsent messages in place while keeping their original order * - * @param enqueue - Enqueue function for scheduling actions - * @param event - Init response event containing session details + * Mode specific logic: + * - manual: perform a single pass over `[inflight, buffer)`; drop messages with `seqNo <= serverLastSeqNo` + * (already persisted on the server), compact the window, and update counters without changing seqNo values. + * - auto: after the same pass, renumber every message whose seqNo may shift and emit `SeqNoShiftEvent` + * segments so `TopicWriter.resolveSeqNo()` can map initial numbers to the final ones. + * + * @param enqueue - XState enqueue helper for scheduling actions + * @param event - init response with session metadata + * @param context - current state machine context */ - updateWriteSession: enqueueActions(({ enqueue, event }) => { + updateWriteSession: enqueueActions(({ enqueue, event, context }) => { assert.ok(event.type === 'writer.stream.response.init') assert.ok(event.data) + let lastSeqNo = event.data.lastSeqNo || 0n + let nextSeqNo = lastSeqNo + 1n + + // -------------------------------------------------------------------- + // 1. Подсчитываем подтверждённые сообщения и новое начало окна inflight + // Это позволяет дальше просто сдвигать указатели без пересоздания массивов + // -------------------------------------------------------------------- + let inflightStartIndex = context.inflightStart + let inflightEndIndex = context.inflightStart + context.inflightLength + let bufferEndIndex = context.bufferStart + context.bufferLength + + if (context.seqNoMode === 'manual') { + let writeIndex = inflightStartIndex + let acknowledgedSize = 0n + let pendingCount = 0 + let pendingSize = 0n + let bufferKeptCount = 0 + let skippedSize = 0n + let bufferSize = context.bufferSize + + for (let i = inflightStartIndex; i < bufferEndIndex; i++) { + let message = context.messages[i] + if (!message) continue + + let messageSize = BigInt(message.data.length) + + if (i < inflightEndIndex) { + if (message.seqNo <= lastSeqNo) { + acknowledgedSize += messageSize + continue + } + + pendingCount++ + pendingSize += messageSize + } else { + if (message.seqNo <= lastSeqNo) { + skippedSize += messageSize + bufferSize -= messageSize + continue + } + + bufferKeptCount++ + } + + if (writeIndex !== i) { + context.messages[writeIndex] = message + } + writeIndex++ + } + + let newBufferStart = inflightStartIndex + let bufferLength = pendingCount + bufferKeptCount + let inflightSize = context.inflightSize - (acknowledgedSize + pendingSize) + let garbageSize = context.garbageSize + acknowledgedSize + skippedSize + let newBufferSize = bufferSize + pendingSize + + enqueue.assign({ + sessionId: event.data.sessionId, + inflightStart: newBufferStart, + inflightLength: 0, + inflightSize, + bufferStart: newBufferStart, + bufferLength, + bufferSize: newBufferSize, + garbageSize, + }) + + enqueue.emit(() => ({ + type: 'writer.session', + sessionId: event.data.sessionId, + lastSeqNo, + nextSeqNo: lastSeqNo + 1n, + })) + + return + } + + let firstPendingIndex = inflightEndIndex + let acknowledgedSize = 0n + let pendingCount = 0 + let pendingSize = 0n + + for (let i = inflightStartIndex; i < inflightEndIndex; i++) { + let message = context.messages[i] + if (!message) continue + + if (firstPendingIndex === inflightEndIndex && message.seqNo > lastSeqNo) { + firstPendingIndex = i + } + + if (i < firstPendingIndex) { + acknowledgedSize += BigInt(message.data.length) + } else { + pendingCount++ + pendingSize += BigInt(message.data.length) + } + } + + let newBufferStart = firstPendingIndex + + let seqNoShifts: SeqNoShiftEvent[] = [] + let currentShiftStart: bigint | null = null + let currentShiftDelta: bigint | null = null + let currentShiftCount = 0 + + let flushCurrentShift = () => { + if (currentShiftStart !== null && currentShiftDelta !== null && currentShiftCount > 0) { + seqNoShifts.push({ + startOld: currentShiftStart, + count: currentShiftCount, + delta: currentShiftDelta, + }) + } + currentShiftStart = null + currentShiftDelta = null + currentShiftCount = 0 + } + + for (let i = firstPendingIndex; i < bufferEndIndex; i++) { + let message = context.messages[i] + if (!message) continue + + let oldSeqNo = message.seqNo + let newSeqNo = nextSeqNo + nextSeqNo++ + + if (oldSeqNo !== newSeqNo) { + let delta = newSeqNo - oldSeqNo + if ( + currentShiftStart !== null && + currentShiftDelta === delta && + oldSeqNo === currentShiftStart + BigInt(currentShiftCount) + ) { + currentShiftCount++ + } else { + flushCurrentShift() + currentShiftStart = oldSeqNo + currentShiftDelta = delta + currentShiftCount = 1 + } + } else { + flushCurrentShift() + } + + message.seqNo = newSeqNo + } + + flushCurrentShift() + + let inflightSize = context.inflightSize - acknowledgedSize - pendingSize + let bufferSize = context.bufferSize + pendingSize + let garbageSize = context.garbageSize + acknowledgedSize + let bufferLength = pendingCount + context.bufferLength + enqueue.assign({ sessionId: event.data.sessionId, + inflightStart: newBufferStart, + inflightLength: 0, + inflightSize, + bufferStart: newBufferStart, + bufferLength, + bufferSize, + garbageSize, }) enqueue.emit(() => ({ type: 'writer.session', sessionId: event.data.sessionId, - lastSeqNo: event.data.lastSeqNo || 0n, + lastSeqNo: lastSeqNo, + nextSeqNo, + ...(seqNoShifts.length ? { seqNoShifts } : {}), })) }), @@ -304,7 +483,9 @@ let writerMachineFactory = setup({ if (context.inflightLength >= context.options.maxInflightCount!) { enqueue.emit(() => ({ type: 'writer.error', - error: new Error('Internal Error: Max inflight messages limit reached. If you see this error, please report it.'), + error: new Error( + 'Internal Error: Max inflight messages limit reached. If you see this error, please report it.' + ), })) return @@ -440,7 +621,6 @@ let writerMachineFactory = setup({ }) } - // @ts-ignore enqueue({ type: 'log', params: { message: 'ACK | {stats}' } }) }), @@ -458,7 +638,9 @@ let writerMachineFactory = setup({ if (event.message.data.length > MAX_PAYLOAD_SIZE) { enqueue.emit(() => ({ type: 'writer.error', - error: new Error('Internal Error: Payload size exceeds 48MiB limit. If you see this error, please report it.'), + error: new Error( + 'Internal Error: Payload size exceeds 48MiB limit. If you see this error, please report it.' + ), })) return @@ -471,6 +653,10 @@ let writerMachineFactory = setup({ })) let uncompressedSize = BigInt(event.message.data.length) + // Track seqNo mode (set once on first message, then remains constant) + // Mode is passed from TopicWriter which knows it from SeqNoManager + let seqNoMode: 'auto' | 'manual' | null = context.seqNoMode ?? event.seqNoMode ?? null + let message = create(StreamWriteMessage_WriteRequest_MessageDataSchema, { data: event.message.data, seqNo: event.message.seqNo, @@ -483,8 +669,9 @@ let writerMachineFactory = setup({ context.messages.push(message) enqueue.assign(({ context }) => ({ + seqNoMode, bufferSize: context.bufferSize + BigInt(event.message.data.length), - bufferLength: context.bufferLength + 1 + bufferLength: context.bufferLength + 1, })) //@ts-ignore @@ -531,6 +718,7 @@ let writerMachineFactory = setup({ releaseResources: assign(() => { return { messages: [], + seqNoMode: null, bufferStart: 0, bufferLength: 0, inflightStart: 0, @@ -694,6 +882,7 @@ export const WriterMachine = writerMachineFactory.createMachine({ // Single array approach with sliding window messages: [], + seqNoMode: null, bufferStart: 0, bufferLength: 0, inflightStart: 0, @@ -716,12 +905,12 @@ export const WriterMachine = writerMachineFactory.createMachine({ on: { 'writer.close': { target: '.closing', - actions: [log('CLS | {topicPath}')] + actions: [log('CLS | {topicPath}')], }, 'writer.destroy': { // Force close, skip graceful shutdown target: '.closed', - actions: [log('DST | {topicPath}')] + actions: [log('DST | {topicPath}')], }, 'writer.stream.error': { // Enter error state on stream error @@ -738,8 +927,8 @@ export const WriterMachine = writerMachineFactory.createMachine({ idle: { always: { target: 'connecting', - actions: [log('INT | {topicPath}')] - } + actions: [log('INT | {topicPath}')], + }, }, /** * Connecting state: Establishes connection to the topic stream. @@ -941,11 +1130,7 @@ export const WriterMachine = writerMachineFactory.createMachine({ closed: { // All resources are released in this final state type: 'final', - entry: [ - 'closeConnection', - 'releaseResources', - log('FIN | {stats}'), - ], - } + entry: ['closeConnection', 'releaseResources', log('FIN | {stats}')], + }, }, }) diff --git a/packages/topic/src/writer2/seqno-manager.ts b/packages/topic/src/writer2/seqno-manager.ts index 2a63388d..9e78d4ab 100644 --- a/packages/topic/src/writer2/seqno-manager.ts +++ b/packages/topic/src/writer2/seqno-manager.ts @@ -7,8 +7,8 @@ */ export class SeqNoManager { #mode: 'auto' | 'manual' | null = null - #nextSeqNo: bigint = 1n #lastSeqNo: bigint = 0n + #nextSeqNo: bigint = 1n #highestUserSeqNo: bigint = 0n constructor(initialSeqNo: bigint = 0n) { @@ -37,7 +37,9 @@ export class SeqNoManager { if (this.#mode === 'auto') { if (userSeqNo !== undefined) { - throw new Error('Cannot mix auto and manual seqNo modes. Once auto mode is started, all messages must use auto seqNo.') + throw new Error( + 'Cannot mix auto and manual seqNo modes. Once auto mode is started, all messages must use auto seqNo.' + ) } let seqNo = this.#nextSeqNo @@ -47,12 +49,16 @@ export class SeqNoManager { } else { // Manual mode if (userSeqNo === undefined) { - throw new Error('Cannot mix manual and auto seqNo modes. Once manual mode is started, all messages must provide seqNo.') + throw new Error( + 'Cannot mix manual and auto seqNo modes. Once manual mode is started, all messages must provide seqNo.' + ) } // Validate strictly increasing if (userSeqNo <= this.#highestUserSeqNo) { - throw new Error(`SeqNo must be strictly increasing. Provided: ${userSeqNo}, highest seen: ${this.#highestUserSeqNo}`) + throw new Error( + `SeqNo must be strictly increasing. Provided: ${userSeqNo}, highest seen: ${this.#highestUserSeqNo}` + ) } this.#highestUserSeqNo = userSeqNo diff --git a/packages/topic/src/writer2/types.ts b/packages/topic/src/writer2/types.ts index ae9ed3ae..2230eeb6 100644 --- a/packages/topic/src/writer2/types.ts +++ b/packages/topic/src/writer2/types.ts @@ -1,9 +1,9 @@ -import type { Driver } from "@ydbjs/core" -import type { ActorRef, CallbackSnapshot } from "xstate" -import type { CompressionCodec } from "../codec.js" -import type { TX } from "../tx.js" -import type { WriterStreamEmittedEvent, WriterStreamInput, WriterStreamReceiveEvent } from "./stream.ts" -import type { YDBDebugLogger } from "@ydbjs/debug" +import type { Driver } from '@ydbjs/core' +import type { ActorRef, CallbackSnapshot } from 'xstate' +import type { CompressionCodec } from '../codec.js' +import type { TX } from '../tx.js' +import type { WriterStreamEmittedEvent, WriterStreamInput, WriterStreamReceiveEvent } from './stream.ts' +import type { YDBDebugLogger } from '@ydbjs/debug' export type TopicWriterOptions = { // Transaction identity. @@ -79,7 +79,11 @@ export type WriterContext = { readonly sessionId?: string // Message buffers - single array with sliding window approach - readonly messages: import("@ydbjs/api/topic").StreamWriteMessage_WriteRequest_MessageData[] + readonly messages: import('@ydbjs/api/topic').StreamWriteMessage_WriteRequest_MessageData[] + + // Track seqNo mode: 'auto' means all seqno are auto-generated, 'manual' means all are user-provided + // Updated when first message is written, then remains constant (SeqNoManager enforces mode consistency) + readonly seqNoMode: 'auto' | 'manual' | null // Buffer window: [bufferStart, bufferStart + bufferLength) readonly bufferStart: number @@ -97,20 +101,28 @@ export type WriterContext = { readonly lastError?: unknown // Reference to the stream actor - readonly streamRef?: ActorRef, WriterStreamReceiveEvent, WriterStreamEmittedEvent> | undefined + readonly streamRef?: + | ActorRef, WriterStreamReceiveEvent, WriterStreamEmittedEvent> + | undefined } export type MessageToSend = { data: Uint8Array - seqNo: bigint // Now required - TopicWriter always provides it + seqNo: bigint // Always provided by TopicWriter (either auto-generated or user-provided) createdAt?: Date metadataItems?: Record } +export type SeqNoShiftEvent = { + startOld: bigint + count: number + delta: bigint +} + // Events for the state machine export type WriterEvents = // User-initiated events - | { type: 'writer.write'; message: MessageToSend } + | { type: 'writer.write'; message: MessageToSend; seqNoMode?: 'auto' | 'manual' } | { type: 'writer.flush' } | { type: 'writer.close' } | { type: 'writer.destroy'; reason?: unknown } @@ -121,20 +133,19 @@ export type WriterEvents = export type WriterEmitted = | { type: 'writer.error'; error: unknown } | { type: 'writer.close'; reason?: unknown } - | { type: 'writer.session'; sessionId: string; lastSeqNo: bigint } + | { + type: 'writer.session' + sessionId: string + lastSeqNo: bigint + nextSeqNo: bigint + seqNoShifts?: SeqNoShiftEvent[] + } | { type: 'writer.acknowledgments'; acknowledgments: Map } export type WriterInput = { - driver: Driver; + driver: Driver options: TopicWriterOptions } // State machine states -export type WriterStates = - | 'connecting' - | 'connected' - | 'errored' - | 'writing' - | 'flushing' - | 'closing' - | 'destroyed' +export type WriterStates = 'connecting' | 'connected' | 'errored' | 'writing' | 'flushing' | 'closing' | 'destroyed' diff --git a/packages/topic/src/writer2/writer.ts b/packages/topic/src/writer2/writer.ts index 9c9088d9..cccfec0c 100644 --- a/packages/topic/src/writer2/writer.ts +++ b/packages/topic/src/writer2/writer.ts @@ -3,13 +3,21 @@ import type { Driver } from '@ydbjs/core' import { abortable } from '@ydbjs/abortable' import { WriterMachine } from './machine.js' import { SeqNoManager } from './seqno-manager.js' -import type { TopicWriterOptions } from './types.js' +import type { SeqNoShiftEvent, TopicWriterOptions } from './types.js' + +type SeqNoShiftSegment = { + start: bigint + end: bigint + delta: bigint +} export class TopicWriter implements AsyncDisposable { #actor: ActorRefFrom #promise: ReturnType> | null = null #subscription: Subscription #seqNoManager: SeqNoManager + #isSessionInitialized = false + #seqNoShifts: SeqNoShiftSegment[] = [] constructor(driver: Driver, options: TopicWriterOptions) { this.#seqNoManager = new SeqNoManager() @@ -27,7 +35,15 @@ export class TopicWriter implements AsyncDisposable { // Subscribe to emitted events for seqNo management this.#actor.on('writer.session', (event) => { - this.#seqNoManager.initialize(event.lastSeqNo) + // State machine already recalculated seqno for all buffered messages + // event.nextSeqNo is the next seqno that should be used for new messages + // So lastSeqNo for SeqNoManager should be nextSeqNo - 1 + let lastSeqNo = event.nextSeqNo - 1n + if (event.seqNoShifts?.length) { + this.#applySeqNoShifts(event.seqNoShifts) + } + this.#seqNoManager.initialize(lastSeqNo) + this.#isSessionInitialized = true }) // Subscribe to error events @@ -44,19 +60,249 @@ export class TopicWriter implements AsyncDisposable { this.#actor.start() } + #applySeqNoShifts(shifts: SeqNoShiftEvent[]): void { + for (let shift of shifts) { + this.#applySeqNoShift(shift.startOld, shift.count, shift.delta) + } + } + + #applySeqNoShift(startOld: bigint, count: number, delta: bigint): void { + if (!count || delta === 0n) { + return + } + + let oldEnd = startOld + BigInt(count) + let updatedSegments: SeqNoShiftSegment[] = [] + let coveredOldRanges: Array<{ start: bigint; end: bigint }> = [] + + for (let segment of this.#seqNoShifts) { + let segStart = segment.start + let segEnd = segment.end + let segDelta = segment.delta + + let intersectionStart = segStart > startOld - segDelta ? segStart : startOld - segDelta + let intersectionEnd = segEnd < oldEnd - segDelta ? segEnd : oldEnd - segDelta + + if (intersectionStart < intersectionEnd) { + if (segStart < intersectionStart) { + updatedSegments.push({ start: segStart, end: intersectionStart, delta: segDelta }) + } + + updatedSegments.push({ start: intersectionStart, end: intersectionEnd, delta: segDelta + delta }) + + if (intersectionEnd < segEnd) { + updatedSegments.push({ start: intersectionEnd, end: segEnd, delta: segDelta }) + } + + let coveredStartOld = intersectionStart + segDelta + let coveredEndOld = intersectionEnd + segDelta + coveredOldRanges.push({ start: coveredStartOld, end: coveredEndOld }) + } else { + updatedSegments.push({ ...segment }) + } + } + + this.#seqNoShifts = this.#mergeShiftSegments(updatedSegments) + + let mergedCovered = this.#mergeRanges(coveredOldRanges, startOld, oldEnd) + let uncovered = this.#invertRanges(mergedCovered, startOld, oldEnd) + for (let range of uncovered) { + this.#seqNoShifts.push({ start: range.start, end: range.end, delta }) + } + + this.#seqNoShifts = this.#mergeShiftSegments(this.#seqNoShifts) + } + + #mergeShiftSegments(segments: SeqNoShiftSegment[]): SeqNoShiftSegment[] { + if (!segments.length) { + return [] + } + + let sorted: SeqNoShiftSegment[] = [] + for (let segment of segments) { + let entry = { ...segment } + let insertIndex = 0 + while (insertIndex < sorted.length && sorted[insertIndex]!.start <= entry.start) { + insertIndex++ + } + sorted.splice(insertIndex, 0, entry) + } + + let merged: SeqNoShiftSegment[] = [] + + for (let segment of sorted) { + let last = merged[merged.length - 1] + if (!last) { + merged.push({ ...segment }) + continue + } + + if (segment.start <= last.end) { + if (segment.delta === last.delta) { + if (segment.end > last.end) { + last.end = segment.end + } + continue + } + + if (segment.start < last.end) { + let adjustedStart = last.end + if (adjustedStart < segment.end) { + merged.push({ start: adjustedStart, end: segment.end, delta: segment.delta }) + } + continue + } + } + + if (segment.start === last.end && segment.delta === last.delta) { + last.end = segment.end + } else { + merged.push({ ...segment }) + } + } + + return merged + } + + #mergeRanges( + ranges: Array<{ start: bigint; end: bigint }>, + boundStart: bigint, + boundEnd: bigint + ): Array<{ start: bigint; end: bigint }> { + if (!ranges.length) { + return [] + } + + let filtered = ranges + .map((range) => { + let start = range.start > boundStart ? range.start : boundStart + let end = range.end < boundEnd ? range.end : boundEnd + return start < end ? { start, end } : null + }) + .filter((range): range is { start: bigint; end: bigint } => Boolean(range)) + + if (!filtered.length) { + return [] + } + + let sorted: Array<{ start: bigint; end: bigint }> = [] + for (let range of filtered) { + let entry = { ...range } + let insertIndex = 0 + while (insertIndex < sorted.length && sorted[insertIndex]!.start <= entry.start) { + insertIndex++ + } + sorted.splice(insertIndex, 0, entry) + } + + let merged: Array<{ start: bigint; end: bigint }> = [] + + for (let range of sorted) { + let last = merged[merged.length - 1] + if (!last) { + merged.push({ ...range }) + continue + } + + if (range.start <= last.end) { + if (range.end > last.end) { + last.end = range.end + } + } else { + merged.push({ ...range }) + } + } + + return merged + } + + #invertRanges( + ranges: Array<{ start: bigint; end: bigint }>, + boundStart: bigint, + boundEnd: bigint + ): Array<{ start: bigint; end: bigint }> { + if (boundStart >= boundEnd) { + return [] + } + + if (!ranges.length) { + return [{ start: boundStart, end: boundEnd }] + } + + let result: Array<{ start: bigint; end: bigint }> = [] + let cursor = boundStart + + for (let range of ranges) { + if (cursor < range.start) { + result.push({ start: cursor, end: range.start }) + } + if (cursor < range.end) { + cursor = range.end + } + } + + if (cursor < boundEnd) { + result.push({ start: cursor, end: boundEnd }) + } + + return result + } + /** * Write a message to the topic * @param data Message payload * @param extra Optional message metadata * @returns Sequence number of the message + * + * **⚠️ WARNING: Do NOT rely on returned seqNo for critical operations!** + * + * The returned seqNo may be a temporary value that gets recalculated after session initialization. + * This can lead to incorrect behavior if used for: + * - Message deduplication + * - Tracking message delivery + * - Database lookups by seqNo + * - Any operation that requires accurate seqNo values + * + * **When seqNo is temporary:** + * - Session is not yet initialized (first messages written before connection) + * - Writer reconnected after network issues + * - Auto-generated seqNo mode (not user-provided) + * + * **When seqNo is final:** + * - User-provided seqNo (via `extra.seqNo`) - always final, never recalculated + * - After `flush()` completes - all messages have been sent with final seqNo + * + * **Recommended usage:** + * ```typescript + * // ❌ BAD: Storing seqNo immediately + * let seqNo = writer.write(data) + * await saveToDatabase(seqNo) // May be wrong! + * + * // ✅ GOOD: Wait for flush to ensure seqNo is final + * writer.write(data) + * let lastSeqNo = await writer.flush() // All messages up to this seqNo are final + * await saveToDatabase(lastSeqNo) + * + * // ✅ GOOD: Use user-provided seqNo (always final) + * let mySeqNo = 100n + * writer.write(data, { seqNo: mySeqNo }) + * // mySeqNo is guaranteed to be final + * ``` */ - write(data: Uint8Array, extra?: { - seqNo?: bigint - createdAt?: Date - metadataItems?: Record - }): bigint { + write( + data: Uint8Array, + extra?: { + seqNo?: bigint + createdAt?: Date + metadataItems?: Record + } + ): bigint { // Get seqNo from SeqNoManager (handles auto/manual modes) let seqNo = this.#seqNoManager.getNext(extra?.seqNo) + let seqNoState = this.#seqNoManager.getState() + // Determine mode for state machine (if not yet determined, determine it now) + let seqNoMode: 'auto' | 'manual' | undefined = + seqNoState.mode ?? (extra?.seqNo !== undefined ? 'manual' : 'auto') this.#actor.send({ type: 'writer.write', @@ -64,17 +310,46 @@ export class TopicWriter implements AsyncDisposable { data, seqNo, ...(extra?.createdAt && { createdAt: extra.createdAt }), - ...(extra?.metadataItems && { metadataItems: extra.metadataItems }) - } + ...(extra?.metadataItems && { metadataItems: extra.metadataItems }), + }, + seqNoMode, }) return seqNo } + /** + * Resolve final seqNo for a message that was written before session initialization + * or was retried after reconnection. + * + * @param initialSeqNo Temporary seqNo returned by write() + * @returns Final seqNo assigned after session re-initialization + */ + resolveSeqNo(initialSeqNo: bigint): bigint { + for (let segment of this.#seqNoShifts) { + if (initialSeqNo >= segment.start && initialSeqNo < segment.end) { + return initialSeqNo + segment.delta + } + } + + return initialSeqNo + } + /** * Flush all buffered messages and wait for acknowledgment * @param signal Optional AbortSignal to cancel the flush operation * @returns Promise that resolves with the last acknowledged sequence number + * + * **Important:** After `flush()` completes, all messages written before this call + * have been sent to the server with their final seqNo values. This is the safe way + * to ensure seqNo accuracy for critical operations like deduplication or tracking. + * + * **Getting final seqNo for specific messages:** + * After `flush()` completes, all seqNo values up to the returned `lastSeqNo` are final. + * If you need to track individual messages, you can: + * - Call `writer.resolveSeqNo(initialSeqNo)` to translate temporary numbers into final ones + * - Use the order of `write()` calls to determine final seqNo (sequential after flush) + * - Use user-provided seqNo (always final, never recalculated) */ async flush(signal?: AbortSignal): Promise { // If there's already a flush in progress, return the same promise @@ -110,7 +385,8 @@ export class TopicWriter implements AsyncDisposable { /** * Get current writer statistics - */ get stats() { + */ + get stats() { let snapshot = this.#actor.getSnapshot() let seqNoState = this.#seqNoManager.getState() @@ -123,26 +399,62 @@ export class TopicWriter implements AsyncDisposable { bufferLength: snapshot.context.bufferLength, inflightSize: snapshot.context.inflightSize, inflightLength: snapshot.context.inflightLength, + isSessionInitialized: this.#isSessionInitialized, } } + /** + * Check if the writer session is initialized + * @returns true if session is initialized and seqNo values are final, false if they may be temporary + * + * **Usage:** + * ```typescript + * let seqNo = writer.write(data) + * if (!writer.isSessionInitialized) { + * // seqNo may be temporary, wait for flush before using it + * await writer.flush() + * } + * ``` + */ + get isSessionInitialized(): boolean { + return this.#isSessionInitialized + } + /** * Close the writer gracefully, waiting for all messages to be sent */ async close(signal?: AbortSignal): Promise { + let snapshot = this.#actor.getSnapshot() + + // If already closed, return immediately + if (snapshot.value === 'closed') { + return + } + + // If actor is stopped, return immediately + if (snapshot.status === 'stopped') { + return + } + let { promise, resolve } = Promise.withResolvers() let subscription = this.#actor.subscribe((snapshot) => { - if (snapshot.value === 'closed') { + if (snapshot.value === 'closed' || snapshot.status === 'stopped') { resolve() } }) + // Check again before sending (actor might have stopped between checks) + snapshot = this.#actor.getSnapshot() + if (snapshot.value === 'closed' || snapshot.status === 'stopped') { + subscription.unsubscribe() + return + } + this.#actor.send({ type: 'writer.close' }) - return (signal ? abortable(signal, promise) : promise) - .finally(() => { - subscription.unsubscribe() - }) + return (signal ? abortable(signal, promise) : promise).finally(() => { + subscription.unsubscribe() + }) } /** @@ -152,6 +464,7 @@ export class TopicWriter implements AsyncDisposable { // Reject any pending flush this.#promise?.reject(new Error('Writer was destroyed')) this.#promise = null + this.#seqNoShifts = [] // Send destroy event (optional - for cleanup logic) this.#actor.send({ type: 'writer.destroy', ...(reason && { reason }) }) @@ -167,8 +480,16 @@ export class TopicWriter implements AsyncDisposable { * AsyncDisposable implementation - graceful close with resource cleanup */ async [Symbol.asyncDispose](): Promise { - await this.close() - this.destroy() + // Try graceful close first + try { + await this.close() + } catch (error) { + // If close fails, force destroy + this.destroy(error as Error) + throw error + } + // After successful close, the actor is already stopped in closed state + // Just clean up subscription + this.#subscription.unsubscribe() } - } diff --git a/packages/topic/vitest.config.ts b/packages/topic/vitest.config.ts index d38da172..98cf2c9e 100644 --- a/packages/topic/vitest.config.ts +++ b/packages/topic/vitest.config.ts @@ -2,8 +2,34 @@ import { defineProject } from 'vitest/config' export default defineProject({ test: { - name: 'topic', - include: ['src/**/*.test.ts'], - environment: 'node', + projects: [ + { + test: { + name: { + label: 'uni', + color: 'yellow', + }, + include: ['./src/**/*.test.ts'], + environment: 'node', + benchmark: { + include: ['./src/**/*.bench.ts'], + }, + }, + }, + { + test: { + name: { + label: 'int', + color: 'blue', + }, + include: ['./tests/**/*.test.ts'], + environment: 'node', + globalSetup: '../../vitest.setup.ydb.ts', + benchmark: { + include: ['./tests/**/*.bench.ts'], + }, + }, + }, + ], }, }) From 3873268ce8f5872ac075693ceb458681ead49ad2 Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Fri, 7 Nov 2025 12:13:44 +0300 Subject: [PATCH 2/4] Refactor seqNo resolution and simplify writer2 updateWriteSession - Extract seqNo mapping logic into SeqNoResolver class for better testability - Add SeqNoShiftBuilder to accumulate shifts during message renumbering - Simplify manual seqNo mode: only adjust pointers, no array mutations - Add comprehensive unit tests for SeqNoResolver, SeqNoShiftBuilder, SeqNoManager - Improve code documentation and comments throughout updateWriteSession - Add integration test verifying resolveSeqNo works correctly --- .changeset/heavy-facts-happen.md | 22 ++ packages/topic/src/writer2/machine.ts | 147 +++++------- .../topic/src/writer2/seqno-manager.test.ts | 66 ++++++ .../topic/src/writer2/seqno-resolver.test.ts | 87 +++++++ packages/topic/src/writer2/seqno-resolver.ts | 100 +++++++++ .../src/writer2/seqno-shift-builder.test.ts | 144 ++++++++++++ .../topic/src/writer2/seqno-shift-builder.ts | 82 +++++++ packages/topic/src/writer2/types.ts | 8 +- packages/topic/src/writer2/writer.ts | 212 +----------------- packages/topic/tests/writer2.test.ts | 8 +- 10 files changed, 570 insertions(+), 306 deletions(-) create mode 100644 .changeset/heavy-facts-happen.md create mode 100644 packages/topic/src/writer2/seqno-manager.test.ts create mode 100644 packages/topic/src/writer2/seqno-resolver.test.ts create mode 100644 packages/topic/src/writer2/seqno-resolver.ts create mode 100644 packages/topic/src/writer2/seqno-shift-builder.test.ts create mode 100644 packages/topic/src/writer2/seqno-shift-builder.ts diff --git a/.changeset/heavy-facts-happen.md b/.changeset/heavy-facts-happen.md new file mode 100644 index 00000000..919ae77d --- /dev/null +++ b/.changeset/heavy-facts-happen.md @@ -0,0 +1,22 @@ +--- +'@ydbjs/topic': minor +--- + +- Add `resolveSeqNo()` method to `TopicWriter` to resolve temporary sequence numbers to final values after session re-initialization. Refactored seqNo tracking logic into `SeqNoResolver` and `SeqNoShiftBuilder` classes for better testability and maintainability. + +**Important:** The `write()` method now returns **temporary** seqNo values that may be recalculated after session initialization or reconnection. To get the final seqNo assigned by the server, use `resolveSeqNo()` after `flush()` completes. + +**New API:** + +- `TopicWriter.resolveSeqNo(initialSeqNo: bigint): bigint` - resolves temporary seqNo returned by `write()` to final seqNo assigned by server + +**Behavior changes:** + +- `write()` returns temporary seqNo (may change after session re-initialization) +- User-provided seqNo (via `extra.seqNo`) remain final and unchanged +- After `flush()` completes, all seqNo up to returned `lastSeqNo` are final + +**Migration guide:** + +- If you store seqNo immediately after `write()`, consider using `resolveSeqNo()` after `flush()` to get final values +- User-provided seqNo are always final and don't require `resolveSeqNo()` diff --git a/packages/topic/src/writer2/machine.ts b/packages/topic/src/writer2/machine.ts index c64b89db..e8c62c22 100644 --- a/packages/topic/src/writer2/machine.ts +++ b/packages/topic/src/writer2/machine.ts @@ -37,14 +37,8 @@ import { isRetryableError } from '@ydbjs/retry' import { assign, enqueueActions, sendTo, setup } from 'xstate' import { defaultCodecMap } from '../codec.js' import { WriterStream, type WriterStreamReceiveEvent } from './stream.js' -import type { - SeqNoShiftEvent, - TopicWriterOptions, - WriterContext, - WriterEmitted, - WriterEvents, - WriterInput, -} from './types.js' +import { SeqNoShiftBuilder } from './seqno-shift-builder.js' +import type { TopicWriterOptions, WriterContext, WriterEmitted, WriterEvents, WriterInput } from './types.js' import { loggers } from '@ydbjs/debug' // ============================================================================ @@ -267,15 +261,15 @@ let writerMachineFactory = setup({ /** * Updates the writer context after receiving STREAM_WRITE_SESSION_INIT. - * Common steps for both seqNo modes: - * - Determine the new `inflightStart` using `serverLastSeqNo` - * - Leave all unsent messages in place while keeping their original order * - * Mode specific logic: - * - manual: perform a single pass over `[inflight, buffer)`; drop messages with `seqNo <= serverLastSeqNo` - * (already persisted on the server), compact the window, and update counters without changing seqNo values. - * - auto: after the same pass, renumber every message whose seqNo may shift and emit `SeqNoShiftEvent` - * segments so `TopicWriter.resolveSeqNo()` can map initial numbers to the final ones. + * Common groundwork for both seqNo modes: + * - Walk the `[inflight, buffer)` window once while keeping message order + * - Trim acked messages (seqNo <= `lastSeqNo`) and update sliding-window pointers + * + * Mode-specific behaviour: + * - Manual seqNo: compact the window, update bookkeeping, keep user-provided seqNo as-is + * - Auto seqNo: compact the window, renumber remaining messages, emit `SeqNoShift` segments so + * `TopicWriter.resolveSeqNo()` can translate temporary numbers into the final ones * * @param enqueue - XState enqueue helper for scheduling actions * @param event - init response with session metadata @@ -288,57 +282,50 @@ let writerMachineFactory = setup({ let lastSeqNo = event.data.lastSeqNo || 0n let nextSeqNo = lastSeqNo + 1n - // -------------------------------------------------------------------- - // 1. Подсчитываем подтверждённые сообщения и новое начало окна inflight - // Это позволяет дальше просто сдвигать указатели без пересоздания массивов - // -------------------------------------------------------------------- + // Count acknowledged messages and identify the new inflight window start so we can slide pointers in place. let inflightStartIndex = context.inflightStart let inflightEndIndex = context.inflightStart + context.inflightLength let bufferEndIndex = context.bufferStart + context.bufferLength + // Manual seqNo mode: drop acked entries and slide the window, seqNo stay untouched. if (context.seqNoMode === 'manual') { - let writeIndex = inflightStartIndex let acknowledgedSize = 0n - let pendingCount = 0 let pendingSize = 0n - let bufferKeptCount = 0 - let skippedSize = 0n let bufferSize = context.bufferSize + let firstKeptIndex: number | null = null + // Single pass over [inflight, buffer): skip acknowledged items and record the first live message. for (let i = inflightStartIndex; i < bufferEndIndex; i++) { let message = context.messages[i] if (!message) continue let messageSize = BigInt(message.data.length) - if (i < inflightEndIndex) { - if (message.seqNo <= lastSeqNo) { - acknowledgedSize += messageSize - continue - } - - pendingCount++ - pendingSize += messageSize - } else { - if (message.seqNo <= lastSeqNo) { - skippedSize += messageSize + // Messages already acknowledged by the server can be dropped from the sliding window. + if (message.seqNo <= lastSeqNo) { + acknowledgedSize += messageSize + if (i >= inflightEndIndex) { + // They came from buffer, so shrink buffer accounting as well. bufferSize -= messageSize - continue } + continue + } - bufferKeptCount++ + // Remember the first index that still contains a message we need to keep. + if (firstKeptIndex === null) { + firstKeptIndex = i } - if (writeIndex !== i) { - context.messages[writeIndex] = message + if (i < inflightEndIndex) { + // Anything left in inflight becomes pending work that must be resent. + pendingSize += messageSize } - writeIndex++ } - let newBufferStart = inflightStartIndex - let bufferLength = pendingCount + bufferKeptCount + let newBufferStart = firstKeptIndex ?? bufferEndIndex + let bufferLength = bufferEndIndex - newBufferStart let inflightSize = context.inflightSize - (acknowledgedSize + pendingSize) - let garbageSize = context.garbageSize + acknowledgedSize + skippedSize + let garbageSize = context.garbageSize + acknowledgedSize let newBufferSize = bufferSize + pendingSize enqueue.assign({ @@ -362,11 +349,14 @@ let writerMachineFactory = setup({ return } + // Auto seqNo mode: compact window then reassign seqNo for the remaining messages. let firstPendingIndex = inflightEndIndex let acknowledgedSize = 0n let pendingCount = 0 let pendingSize = 0n + // Scan inflight messages to find the first one that still needs server confirmation and to measure how much + // data must move back into the buffer before we renumber everything. for (let i = inflightStartIndex; i < inflightEndIndex; i++) { let message = context.messages[i] if (!message) continue @@ -385,24 +375,9 @@ let writerMachineFactory = setup({ let newBufferStart = firstPendingIndex - let seqNoShifts: SeqNoShiftEvent[] = [] - let currentShiftStart: bigint | null = null - let currentShiftDelta: bigint | null = null - let currentShiftCount = 0 - - let flushCurrentShift = () => { - if (currentShiftStart !== null && currentShiftDelta !== null && currentShiftCount > 0) { - seqNoShifts.push({ - startOld: currentShiftStart, - count: currentShiftCount, - delta: currentShiftDelta, - }) - } - currentShiftStart = null - currentShiftDelta = null - currentShiftCount = 0 - } + let shiftBuilder = new SeqNoShiftBuilder() + // Renumber the remaining messages sequentially so we continue where the server left off. for (let i = firstPendingIndex; i < bufferEndIndex; i++) { let message = context.messages[i] if (!message) continue @@ -411,28 +386,11 @@ let writerMachineFactory = setup({ let newSeqNo = nextSeqNo nextSeqNo++ - if (oldSeqNo !== newSeqNo) { - let delta = newSeqNo - oldSeqNo - if ( - currentShiftStart !== null && - currentShiftDelta === delta && - oldSeqNo === currentShiftStart + BigInt(currentShiftCount) - ) { - currentShiftCount++ - } else { - flushCurrentShift() - currentShiftStart = oldSeqNo - currentShiftDelta = delta - currentShiftCount = 1 - } - } else { - flushCurrentShift() - } - + shiftBuilder.addShift(oldSeqNo, newSeqNo) message.seqNo = newSeqNo } - flushCurrentShift() + let seqNoShifts = shiftBuilder.build() let inflightSize = context.inflightSize - acknowledgedSize - pendingSize let bufferSize = context.bufferSize + pendingSize @@ -573,7 +531,7 @@ let writerMachineFactory = setup({ // Update inflight and garbage metrics based on acknowledgments enqueue.assign(({ context }) => { let removedSize = 0n - let removedCount = 0 + let removedLength = 0 // Move acknowledged messages to garbage for (let i = context.inflightStart; i < context.inflightStart + context.inflightLength; i++) { @@ -581,7 +539,7 @@ let writerMachineFactory = setup({ if (message && acks.has(message.seqNo)) { removedSize += BigInt(message.data.length) - removedCount++ + removedLength++ } } @@ -589,33 +547,30 @@ let writerMachineFactory = setup({ return { garbageSize: context.garbageSize + removedSize, inflightSize: context.inflightSize - removedSize, - inflightStart: context.inflightStart + removedCount, - inflightLength: context.inflightLength - removedCount, + inflightStart: context.inflightStart + removedLength, + inflightLength: context.inflightLength - removedLength, } }) // @ts-ignore if (check({ type: 'shouldReclaimMemory' })) { enqueue.assign(({ context }) => { - let removed = context.messages.splice(0, context.inflightStart) - let bufferStart = context.bufferStart - removed.length - - // Recalculate bufferSize using sliding window approach: - // buffer region is messages from bufferStart to bufferStart + bufferLength - let bufferSize = 0n - for (let i = bufferStart; i < bufferStart + context.bufferLength; i++) { - let message = context.messages[i] - if (message) { - bufferSize += BigInt(message.data.length) + let garbageLength = context.inflightStart + if (!garbageLength) { + return { + garbageSize: 0n, } } - // Update context pointers + context.messages.splice(0, garbageLength) + let bufferStart = context.bufferStart - garbageLength + + assert.ok(bufferStart >= 0) + return { messages: context.messages, garbageSize: 0n, inflightStart: 0, - bufferSize, bufferStart, } }) @@ -647,11 +602,11 @@ let writerMachineFactory = setup({ } let createdAt = timestampFromDate(event.message.createdAt ?? new Date()) + let uncompressedSize = BigInt(event.message.data.length) let metadataItems = Object.entries(event.message.metadataItems || {}).map(([key, value]) => ({ key, value, })) - let uncompressedSize = BigInt(event.message.data.length) // Track seqNo mode (set once on first message, then remains constant) // Mode is passed from TopicWriter which knows it from SeqNoManager diff --git a/packages/topic/src/writer2/seqno-manager.test.ts b/packages/topic/src/writer2/seqno-manager.test.ts new file mode 100644 index 00000000..1b836577 --- /dev/null +++ b/packages/topic/src/writer2/seqno-manager.test.ts @@ -0,0 +1,66 @@ +import { expect, test } from 'vitest' +import { SeqNoManager } from './seqno-manager.js' + +test('auto mode generates sequential numbers starting from initial + 1', () => { + let manager = new SeqNoManager(5n) + + expect(manager.getNext()).toBe(6n) + expect(manager.getNext()).toBe(7n) + + let state = manager.getState() + expect(state.mode).toBe('auto') + expect(state.nextSeqNo).toBe(8n) + expect(state.lastSeqNo).toBe(7n) +}) + +test('auto mode rejects manual seqNo once started', () => { + let manager = new SeqNoManager() + + manager.getNext() + + expect(() => manager.getNext(10n)).toThrowError( + /Cannot mix auto and manual seqNo modes/ + ) +}) + +test('initialize adjusts next seqNo in auto mode', () => { + let manager = new SeqNoManager() + manager.initialize(42n) + + expect(manager.getNext()).toBe(43n) + let state = manager.getState() + expect(state.lastSeqNo).toBe(43n) + expect(state.nextSeqNo).toBe(44n) +}) + +test('manual mode accepts strictly increasing user seqNo', () => { + let manager = new SeqNoManager() + + expect(manager.getNext(100n)).toBe(100n) + expect(manager.getNext(101n)).toBe(101n) + + let state = manager.getState() + expect(state.mode).toBe('manual') + expect(state.highestUserSeqNo).toBe(101n) + expect(state.lastSeqNo).toBe(101n) +}) + +test('manual mode rejects missing seqNo once started', () => { + let manager = new SeqNoManager() + + manager.getNext(10n) + + expect(() => manager.getNext()).toThrowError( + /Cannot mix manual and auto seqNo modes/ + ) +}) + +test('manual mode enforces strictly increasing seqNo', () => { + let manager = new SeqNoManager() + + manager.getNext(10n) + + expect(() => manager.getNext(10n)).toThrowError( + /SeqNo must be strictly increasing/ + ) +}) diff --git a/packages/topic/src/writer2/seqno-resolver.test.ts b/packages/topic/src/writer2/seqno-resolver.test.ts new file mode 100644 index 00000000..17f5360d --- /dev/null +++ b/packages/topic/src/writer2/seqno-resolver.test.ts @@ -0,0 +1,87 @@ +import { expect, test } from 'vitest' +import { SeqNoResolver } from './seqno-resolver.js' + +test('returns original seqNo when no shifts recorded', () => { + let resolver = new SeqNoResolver() + + expect(resolver.resolveSeqNo(1n)).toBe(1n) + expect(resolver.resolveSeqNo(123n)).toBe(123n) +}) + +test('resolves seqNo inside a single shift', () => { + let resolver = new SeqNoResolver() + resolver.applyShift({ start: 10n, end: 13n, delta: 100n }) + + expect(resolver.resolveSeqNo(10n)).toBe(110n) + expect(resolver.resolveSeqNo(12n)).toBe(112n) + expect(resolver.resolveSeqNo(13n)).toBe(13n) +}) + +test('ignores zero-length and zero-delta shifts', () => { + let resolver = new SeqNoResolver() + resolver.applyShift({ start: 5n, end: 5n, delta: 99n }) + resolver.applyShift({ start: 7n, end: 9n, delta: 0n }) + + expect(resolver.getShifts().length).toBe(0) +}) + +test('merges adjacent shifts with identical delta', () => { + let resolver = new SeqNoResolver() + resolver.applyShift({ start: 1n, end: 3n, delta: 100n }) + resolver.applyShift({ start: 3n, end: 6n, delta: 100n }) + + let shifts = resolver.getShifts() + expect(shifts.length).toBe(1) + expect(shifts[0]).toEqual({ start: 1n, end: 6n, delta: 100n }) +}) + +test('adds new segment when delta changes', () => { + let resolver = new SeqNoResolver() + resolver.applyShift({ start: 1n, end: 4n, delta: 100n }) + resolver.applyShift({ start: 4n, end: 6n, delta: 50n }) + + let shifts = resolver.getShifts() + expect(shifts.length).toBe(2) + expect(shifts[0]).toEqual({ start: 1n, end: 4n, delta: 100n }) + expect(shifts[1]).toEqual({ start: 4n, end: 6n, delta: 50n }) +}) + +test('composes sequential shifts to final seqNo', () => { + let resolver = new SeqNoResolver() + resolver.applyShift({ start: 1n, end: 4n, delta: 100n }) + resolver.applyShift({ start: 101n, end: 104n, delta: 100n }) + + expect(resolver.resolveSeqNo(1n)).toBe(201n) + expect(resolver.resolveSeqNo(3n)).toBe(203n) +}) + +test('applyShifts helper applies list in order', () => { + let resolver = new SeqNoResolver() + resolver.applyShifts([ + { start: 1n, end: 3n, delta: 100n }, + { start: 3n, end: 5n, delta: 150n }, + ]) + + expect(resolver.getShifts()).toEqual([ + { start: 1n, end: 3n, delta: 100n }, + { start: 3n, end: 5n, delta: 150n }, + ]) +}) + +test('throws when overlapping shift is applied', () => { + let resolver = new SeqNoResolver() + resolver.applyShift({ start: 1n, end: 5n, delta: 100n }) + + expect(() => resolver.applyShift({ start: 4n, end: 7n, delta: 50n })).toThrowError( + /Internal error: overlapping seqNo shifts detected/ + ) +}) + +test('reset clears all recorded shifts', () => { + let resolver = new SeqNoResolver() + resolver.applyShift({ start: 1n, end: 5n, delta: 100n }) + resolver.reset() + + expect(resolver.getShifts().length).toBe(0) + expect(resolver.resolveSeqNo(2n)).toBe(2n) +}) diff --git a/packages/topic/src/writer2/seqno-resolver.ts b/packages/topic/src/writer2/seqno-resolver.ts new file mode 100644 index 00000000..50d6f96b --- /dev/null +++ b/packages/topic/src/writer2/seqno-resolver.ts @@ -0,0 +1,100 @@ +import type { SeqNoShift } from './types.js' + +/** + * Resolves temporary sequence numbers to final sequence numbers. + * + * When messages are written before session initialization or after reconnection, + * their seqNo values may be temporary and get recalculated. This resolver maintains + * a collection of shifts that describe how temporary seqNo map to final seqNo. + * + * Example: + * shift { start: 10n, end: 13n, delta: 100n } + * means temporary seqNo 10, 11, 12 were finally stored as 110, 111, 112 respectively. + */ +export class SeqNoResolver { + #shifts: SeqNoShift[] = [] + + /** + * Apply new shifts from state machine. + * Merges new shifts with existing ones, handling overlaps and intersections. + */ + applyShifts(newShifts: SeqNoShift[]): void { + for (let shift of newShifts) { + this.applyShift(shift) + } + } + + /** + * Apply a single shift. + * + * Invariant: shifts are emitted strictly in order of old seqNo, without overlaps. + */ + applyShift(shift: SeqNoShift): void { + let { start, end, delta } = shift + if (start >= end || delta === 0n) { + return + } + + let last = this.#shifts[this.#shifts.length - 1] + if (!last) { + this.#shifts.push({ start, end, delta }) + return + } + + if (start < last.end) { + throw new Error('Internal error: overlapping seqNo shifts detected') + } + + if (start === last.end && delta === last.delta) { + last.end = end + return + } + + this.#shifts.push({ start, end, delta }) + } + + /** + * Resolve final seqNo for a temporary seqNo. + * @param initialSeqNo Temporary seqNo returned by write() + * @returns Final seqNo assigned after session re-initialization + */ + resolveSeqNo(initialSeqNo: bigint): bigint { + let result = initialSeqNo + + while (true) { + let matched = false + + for (let segment of this.#shifts) { + if (result >= segment.start && result < segment.end) { + let next = result + segment.delta + if (next === result) { + return result + } + result = next + matched = true + break + } + } + + if (!matched) { + break + } + } + + return result + } + + /** + * Clear all shifts. + */ + reset(): void { + this.#shifts = [] + } + + /** + * Get current shifts (for testing/debugging). + */ + getShifts(): readonly SeqNoShift[] { + return this.#shifts + } +} diff --git a/packages/topic/src/writer2/seqno-shift-builder.test.ts b/packages/topic/src/writer2/seqno-shift-builder.test.ts new file mode 100644 index 00000000..c502a915 --- /dev/null +++ b/packages/topic/src/writer2/seqno-shift-builder.test.ts @@ -0,0 +1,144 @@ +import { expect, test } from 'vitest' +import { SeqNoShiftBuilder } from './seqno-shift-builder.js' + +test('builds empty shifts array when no shifts added', () => { + let builder = new SeqNoShiftBuilder() + let shifts = builder.build() + + expect(shifts.length).toBe(0) +}) + +test('builds single shift for one message', () => { + let builder = new SeqNoShiftBuilder() + builder.addShift(1n, 101n) + let shifts = builder.build() + + expect(shifts.length).toBe(1) + expect(shifts[0]?.start).toBe(1n) + expect(shifts[0]?.end).toBe(2n) + expect(shifts[0]?.delta).toBe(100n) +}) + +test('merges consecutive shifts with same delta', () => { + let builder = new SeqNoShiftBuilder() + builder.addShift(1n, 101n) + builder.addShift(2n, 102n) + builder.addShift(3n, 103n) + let shifts = builder.build() + + expect(shifts.length).toBe(1) + expect(shifts[0]?.start).toBe(1n) + expect(shifts[0]?.end).toBe(4n) + expect(shifts[0]?.delta).toBe(100n) +}) + +test('creates separate shifts for different deltas', () => { + let builder = new SeqNoShiftBuilder() + builder.addShift(1n, 101n) // delta = 100 + builder.addShift(2n, 102n) // delta = 100 + builder.addShift(5n, 210n) // delta = 205 (different!) + let shifts = builder.build() + + expect(shifts.length).toBe(2) + expect(shifts[0]?.start).toBe(1n) + expect(shifts[0]?.end).toBe(3n) + expect(shifts[0]?.delta).toBe(100n) + expect(shifts[1]?.start).toBe(5n) + expect(shifts[1]?.end).toBe(6n) + expect(shifts[1]?.delta).toBe(205n) +}) + +test('creates separate shifts for non-consecutive seqNo', () => { + let builder = new SeqNoShiftBuilder() + builder.addShift(1n, 101n) // delta = 100 + builder.addShift(2n, 102n) // delta = 100 + builder.addShift(5n, 105n) // delta = 100, but gap in seqNo + let shifts = builder.build() + + expect(shifts.length).toBe(2) + expect(shifts[0]?.start).toBe(1n) + expect(shifts[0]?.end).toBe(3n) + expect(shifts[0]?.delta).toBe(100n) + expect(shifts[1]?.start).toBe(5n) + expect(shifts[1]?.end).toBe(6n) + expect(shifts[1]?.delta).toBe(100n) +}) + +test('ignores shifts where oldSeqNo equals newSeqNo', () => { + let builder = new SeqNoShiftBuilder() + builder.addShift(1n, 101n) + builder.addShift(2n, 2n) // No shift + builder.addShift(3n, 103n) + let shifts = builder.build() + + expect(shifts.length).toBe(2) + expect(shifts[0]?.start).toBe(1n) + expect(shifts[0]?.end).toBe(2n) + expect(shifts[1]?.start).toBe(3n) + expect(shifts[1]?.end).toBe(4n) +}) + +test('handles negative deltas', () => { + let builder = new SeqNoShiftBuilder() + builder.addShift(100n, 50n) // delta = -50 + builder.addShift(101n, 51n) // delta = -50 + let shifts = builder.build() + + expect(shifts.length).toBe(1) + expect(shifts[0]?.start).toBe(100n) + expect(shifts[0]?.end).toBe(102n) + expect(shifts[0]?.delta).toBe(-50n) +}) + +test('flush can be called multiple times safely', () => { + let builder = new SeqNoShiftBuilder() + builder.addShift(1n, 101n) + builder.flush() + builder.flush() + builder.addShift(2n, 102n) + builder.flush() + let shifts = builder.build() + + expect(shifts.length).toBe(2) + expect(shifts[0]?.start).toBe(1n) + expect(shifts[1]?.start).toBe(2n) +}) + +test('reset clears all state', () => { + let builder = new SeqNoShiftBuilder() + builder.addShift(1n, 101n) + builder.addShift(2n, 102n) + builder.reset() + + expect(builder.build().length).toBe(0) + + builder.addShift(5n, 105n) + let shifts = builder.build() + expect(shifts.length).toBe(1) + expect(shifts[0]?.start).toBe(5n) +}) + +test('build can be called multiple times', () => { + let builder = new SeqNoShiftBuilder() + builder.addShift(1n, 101n) + + let shifts1 = builder.build() + let shifts2 = builder.build() + + expect(shifts1.length).toBe(1) + expect(shifts2.length).toBe(1) + expect(shifts1[0]?.start).toBe(shifts2[0]?.start) +}) + +test('handles large sequences', () => { + let builder = new SeqNoShiftBuilder() + for (let i = 0; i < 1000; i++) { + builder.addShift(BigInt(i + 1), BigInt(i + 1001)) + } + let shifts = builder.build() + + expect(shifts.length).toBe(1) + expect(shifts[0]?.start).toBe(1n) + expect(shifts[0]?.end).toBe(1001n) + expect(shifts[0]?.delta).toBe(1000n) +}) diff --git a/packages/topic/src/writer2/seqno-shift-builder.ts b/packages/topic/src/writer2/seqno-shift-builder.ts new file mode 100644 index 00000000..2bd98f31 --- /dev/null +++ b/packages/topic/src/writer2/seqno-shift-builder.ts @@ -0,0 +1,82 @@ +import type { SeqNoShift } from './types.js' + +/** + * Builder for collecting sequence number shifts during message renumbering. + * + * Accumulates consecutive shifts with the same delta into compact ranges, + * reducing the number of shift segments that need to be tracked. + */ +export class SeqNoShiftBuilder { + #shifts: SeqNoShift[] = [] + #currentStart: bigint | null = null + #currentDelta: bigint | null = null + #currentCount = 0 + + /** + * Add a shift for a single message. + * Automatically merges consecutive shifts with the same delta. + * + * @param oldSeqNo Original sequence number + * @param newSeqNo New sequence number after recalculation + */ + addShift(oldSeqNo: bigint, newSeqNo: bigint): void { + if (oldSeqNo === newSeqNo) { + this.flush() + return + } + + let delta = newSeqNo - oldSeqNo + if ( + this.#currentStart !== null && + this.#currentDelta === delta && + oldSeqNo === this.#currentStart + BigInt(this.#currentCount) + ) { + // Continue current shift range + this.#currentCount++ + } else { + // Start new shift range + this.flush() + this.#currentStart = oldSeqNo + this.#currentDelta = delta + this.#currentCount = 1 + } + } + + /** + * Flush current shift range to the shifts array. + * Called automatically when starting a new range or when build() is called. + */ + flush(): void { + if (this.#currentStart !== null && this.#currentDelta !== null && this.#currentCount > 0) { + this.#shifts.push({ + start: this.#currentStart, + end: this.#currentStart + BigInt(this.#currentCount), + delta: this.#currentDelta, + }) + } + this.#currentStart = null + this.#currentDelta = null + this.#currentCount = 0 + } + + /** + * Build final array of shifts. + * Flushes any pending shift range before returning. + * + * @returns Array of shift ranges + */ + build(): SeqNoShift[] { + this.flush() + return this.#shifts + } + + /** + * Reset builder to initial state. + */ + reset(): void { + this.#shifts = [] + this.#currentStart = null + this.#currentDelta = null + this.#currentCount = 0 + } +} diff --git a/packages/topic/src/writer2/types.ts b/packages/topic/src/writer2/types.ts index 2230eeb6..9f00adea 100644 --- a/packages/topic/src/writer2/types.ts +++ b/packages/topic/src/writer2/types.ts @@ -113,9 +113,9 @@ export type MessageToSend = { metadataItems?: Record } -export type SeqNoShiftEvent = { - startOld: bigint - count: number +export type SeqNoShift = { + start: bigint + end: bigint delta: bigint } @@ -138,7 +138,7 @@ export type WriterEmitted = sessionId: string lastSeqNo: bigint nextSeqNo: bigint - seqNoShifts?: SeqNoShiftEvent[] + seqNoShifts?: SeqNoShift[] } | { type: 'writer.acknowledgments'; acknowledgments: Map } diff --git a/packages/topic/src/writer2/writer.ts b/packages/topic/src/writer2/writer.ts index cccfec0c..07a63b53 100644 --- a/packages/topic/src/writer2/writer.ts +++ b/packages/topic/src/writer2/writer.ts @@ -3,13 +3,8 @@ import type { Driver } from '@ydbjs/core' import { abortable } from '@ydbjs/abortable' import { WriterMachine } from './machine.js' import { SeqNoManager } from './seqno-manager.js' -import type { SeqNoShiftEvent, TopicWriterOptions } from './types.js' - -type SeqNoShiftSegment = { - start: bigint - end: bigint - delta: bigint -} +import { SeqNoResolver } from './seqno-resolver.js' +import type { TopicWriterOptions } from './types.js' export class TopicWriter implements AsyncDisposable { #actor: ActorRefFrom @@ -17,10 +12,11 @@ export class TopicWriter implements AsyncDisposable { #subscription: Subscription #seqNoManager: SeqNoManager #isSessionInitialized = false - #seqNoShifts: SeqNoShiftSegment[] = [] + #seqNoResolver: SeqNoResolver constructor(driver: Driver, options: TopicWriterOptions) { this.#seqNoManager = new SeqNoManager() + this.#seqNoResolver = new SeqNoResolver() this.#actor = createActor(WriterMachine, { input: { driver, options } }) // Subscribe to state changes for flush completions @@ -40,7 +36,7 @@ export class TopicWriter implements AsyncDisposable { // So lastSeqNo for SeqNoManager should be nextSeqNo - 1 let lastSeqNo = event.nextSeqNo - 1n if (event.seqNoShifts?.length) { - this.#applySeqNoShifts(event.seqNoShifts) + this.#seqNoResolver.applyShifts(event.seqNoShifts) } this.#seqNoManager.initialize(lastSeqNo) this.#isSessionInitialized = true @@ -60,194 +56,6 @@ export class TopicWriter implements AsyncDisposable { this.#actor.start() } - #applySeqNoShifts(shifts: SeqNoShiftEvent[]): void { - for (let shift of shifts) { - this.#applySeqNoShift(shift.startOld, shift.count, shift.delta) - } - } - - #applySeqNoShift(startOld: bigint, count: number, delta: bigint): void { - if (!count || delta === 0n) { - return - } - - let oldEnd = startOld + BigInt(count) - let updatedSegments: SeqNoShiftSegment[] = [] - let coveredOldRanges: Array<{ start: bigint; end: bigint }> = [] - - for (let segment of this.#seqNoShifts) { - let segStart = segment.start - let segEnd = segment.end - let segDelta = segment.delta - - let intersectionStart = segStart > startOld - segDelta ? segStart : startOld - segDelta - let intersectionEnd = segEnd < oldEnd - segDelta ? segEnd : oldEnd - segDelta - - if (intersectionStart < intersectionEnd) { - if (segStart < intersectionStart) { - updatedSegments.push({ start: segStart, end: intersectionStart, delta: segDelta }) - } - - updatedSegments.push({ start: intersectionStart, end: intersectionEnd, delta: segDelta + delta }) - - if (intersectionEnd < segEnd) { - updatedSegments.push({ start: intersectionEnd, end: segEnd, delta: segDelta }) - } - - let coveredStartOld = intersectionStart + segDelta - let coveredEndOld = intersectionEnd + segDelta - coveredOldRanges.push({ start: coveredStartOld, end: coveredEndOld }) - } else { - updatedSegments.push({ ...segment }) - } - } - - this.#seqNoShifts = this.#mergeShiftSegments(updatedSegments) - - let mergedCovered = this.#mergeRanges(coveredOldRanges, startOld, oldEnd) - let uncovered = this.#invertRanges(mergedCovered, startOld, oldEnd) - for (let range of uncovered) { - this.#seqNoShifts.push({ start: range.start, end: range.end, delta }) - } - - this.#seqNoShifts = this.#mergeShiftSegments(this.#seqNoShifts) - } - - #mergeShiftSegments(segments: SeqNoShiftSegment[]): SeqNoShiftSegment[] { - if (!segments.length) { - return [] - } - - let sorted: SeqNoShiftSegment[] = [] - for (let segment of segments) { - let entry = { ...segment } - let insertIndex = 0 - while (insertIndex < sorted.length && sorted[insertIndex]!.start <= entry.start) { - insertIndex++ - } - sorted.splice(insertIndex, 0, entry) - } - - let merged: SeqNoShiftSegment[] = [] - - for (let segment of sorted) { - let last = merged[merged.length - 1] - if (!last) { - merged.push({ ...segment }) - continue - } - - if (segment.start <= last.end) { - if (segment.delta === last.delta) { - if (segment.end > last.end) { - last.end = segment.end - } - continue - } - - if (segment.start < last.end) { - let adjustedStart = last.end - if (adjustedStart < segment.end) { - merged.push({ start: adjustedStart, end: segment.end, delta: segment.delta }) - } - continue - } - } - - if (segment.start === last.end && segment.delta === last.delta) { - last.end = segment.end - } else { - merged.push({ ...segment }) - } - } - - return merged - } - - #mergeRanges( - ranges: Array<{ start: bigint; end: bigint }>, - boundStart: bigint, - boundEnd: bigint - ): Array<{ start: bigint; end: bigint }> { - if (!ranges.length) { - return [] - } - - let filtered = ranges - .map((range) => { - let start = range.start > boundStart ? range.start : boundStart - let end = range.end < boundEnd ? range.end : boundEnd - return start < end ? { start, end } : null - }) - .filter((range): range is { start: bigint; end: bigint } => Boolean(range)) - - if (!filtered.length) { - return [] - } - - let sorted: Array<{ start: bigint; end: bigint }> = [] - for (let range of filtered) { - let entry = { ...range } - let insertIndex = 0 - while (insertIndex < sorted.length && sorted[insertIndex]!.start <= entry.start) { - insertIndex++ - } - sorted.splice(insertIndex, 0, entry) - } - - let merged: Array<{ start: bigint; end: bigint }> = [] - - for (let range of sorted) { - let last = merged[merged.length - 1] - if (!last) { - merged.push({ ...range }) - continue - } - - if (range.start <= last.end) { - if (range.end > last.end) { - last.end = range.end - } - } else { - merged.push({ ...range }) - } - } - - return merged - } - - #invertRanges( - ranges: Array<{ start: bigint; end: bigint }>, - boundStart: bigint, - boundEnd: bigint - ): Array<{ start: bigint; end: bigint }> { - if (boundStart >= boundEnd) { - return [] - } - - if (!ranges.length) { - return [{ start: boundStart, end: boundEnd }] - } - - let result: Array<{ start: bigint; end: bigint }> = [] - let cursor = boundStart - - for (let range of ranges) { - if (cursor < range.start) { - result.push({ start: cursor, end: range.start }) - } - if (cursor < range.end) { - cursor = range.end - } - } - - if (cursor < boundEnd) { - result.push({ start: cursor, end: boundEnd }) - } - - return result - } - /** * Write a message to the topic * @param data Message payload @@ -326,13 +134,7 @@ export class TopicWriter implements AsyncDisposable { * @returns Final seqNo assigned after session re-initialization */ resolveSeqNo(initialSeqNo: bigint): bigint { - for (let segment of this.#seqNoShifts) { - if (initialSeqNo >= segment.start && initialSeqNo < segment.end) { - return initialSeqNo + segment.delta - } - } - - return initialSeqNo + return this.#seqNoResolver.resolveSeqNo(initialSeqNo) } /** @@ -464,7 +266,7 @@ export class TopicWriter implements AsyncDisposable { // Reject any pending flush this.#promise?.reject(new Error('Writer was destroyed')) this.#promise = null - this.#seqNoShifts = [] + this.#seqNoResolver.reset() // Send destroy event (optional - for cleanup logic) this.#actor.send({ type: 'writer.destroy', ...(reason && { reason }) }) diff --git a/packages/topic/tests/writer2.test.ts b/packages/topic/tests/writer2.test.ts index e8d61375..e4fdbc0c 100644 --- a/packages/topic/tests/writer2.test.ts +++ b/packages/topic/tests/writer2.test.ts @@ -496,7 +496,7 @@ test('new writer continues seqno sequence after previous writer', { timeout: 15_ let producerId = `test-producer-${Date.now()}` // First writer: write messages - await using writer1 = await new TopicWriter(driver, { + await using writer1 = new TopicWriter(driver, { topic: testTopicName, producerId, }) @@ -532,6 +532,12 @@ test('new writer continues seqno sequence after previous writer', { timeout: 15_ // writer2 wrote 2 messages, so lastSeqNo should be writer1LastSeqNo + 2 expect(writer2LastSeqNo).toBe(writer1LastSeqNo + 2n) + // Ensure resolveSeqNo returns authoritative values for both messages + let firstSeqNo = writer2.resolveSeqNo(writer1SeqNo1) + let secondSeqNo = writer2.resolveSeqNo(writer1SeqNo2) + expect(firstSeqNo).toBe(writer1LastSeqNo + 1n) + expect(secondSeqNo).toBe(writer1LastSeqNo + 2n) + // Verify sequentiality: lastSeqNo should be exactly 2 more than writer1's lastSeqNo // (because writer2 wrote 2 messages) expect(writer2LastSeqNo).toBeGreaterThan(writer1LastSeqNo) From e9c3f78052404a79fa1e149f24f7b3b17e982590 Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Fri, 7 Nov 2025 21:11:31 +0300 Subject: [PATCH 3/4] Remove write() return value and simplify TopicWriter API - Fix seqNo renumbering bug: messages written before session initialization are now properly renumbered after receiving lastSeqNo from server - Remove return value from write() method (now returns void) to simplify API - Remove resolveSeqNo() method and related seqNo shift tracking infrastructure - Update tests to remove assertions on write() return values - Add changeset describing bug fix and API simplification --- .changeset/heavy-facts-happen.md | 22 --- .changeset/remove-write-seqno-return.md | 28 ++++ packages/topic/src/writer2/machine.ts | 15 +- .../topic/src/writer2/seqno-resolver.test.ts | 87 ----------- packages/topic/src/writer2/seqno-resolver.ts | 100 ------------ .../src/writer2/seqno-shift-builder.test.ts | 144 ------------------ .../topic/src/writer2/seqno-shift-builder.ts | 82 ---------- packages/topic/src/writer2/types.ts | 7 - packages/topic/src/writer2/writer.ts | 72 +-------- packages/topic/tests/writer2.test.ts | 49 +++--- vitest.setup.ydb.ts | 4 +- 11 files changed, 52 insertions(+), 558 deletions(-) delete mode 100644 .changeset/heavy-facts-happen.md create mode 100644 .changeset/remove-write-seqno-return.md delete mode 100644 packages/topic/src/writer2/seqno-resolver.test.ts delete mode 100644 packages/topic/src/writer2/seqno-resolver.ts delete mode 100644 packages/topic/src/writer2/seqno-shift-builder.test.ts delete mode 100644 packages/topic/src/writer2/seqno-shift-builder.ts diff --git a/.changeset/heavy-facts-happen.md b/.changeset/heavy-facts-happen.md deleted file mode 100644 index 919ae77d..00000000 --- a/.changeset/heavy-facts-happen.md +++ /dev/null @@ -1,22 +0,0 @@ ---- -'@ydbjs/topic': minor ---- - -- Add `resolveSeqNo()` method to `TopicWriter` to resolve temporary sequence numbers to final values after session re-initialization. Refactored seqNo tracking logic into `SeqNoResolver` and `SeqNoShiftBuilder` classes for better testability and maintainability. - -**Important:** The `write()` method now returns **temporary** seqNo values that may be recalculated after session initialization or reconnection. To get the final seqNo assigned by the server, use `resolveSeqNo()` after `flush()` completes. - -**New API:** - -- `TopicWriter.resolveSeqNo(initialSeqNo: bigint): bigint` - resolves temporary seqNo returned by `write()` to final seqNo assigned by server - -**Behavior changes:** - -- `write()` returns temporary seqNo (may change after session re-initialization) -- User-provided seqNo (via `extra.seqNo`) remain final and unchanged -- After `flush()` completes, all seqNo up to returned `lastSeqNo` are final - -**Migration guide:** - -- If you store seqNo immediately after `write()`, consider using `resolveSeqNo()` after `flush()` to get final values -- User-provided seqNo are always final and don't require `resolveSeqNo()` diff --git a/.changeset/remove-write-seqno-return.md b/.changeset/remove-write-seqno-return.md new file mode 100644 index 00000000..7a34175c --- /dev/null +++ b/.changeset/remove-write-seqno-return.md @@ -0,0 +1,28 @@ +--- +'@ydbjs/topic': minor +--- + +Fix seqNo renumbering bug and simplify TopicWriter API. + +**Bug fix:** + +- Fixed issue where messages written before session initialization were not renumbered after receiving `lastSeqNo` from server. Previously, auto-generated seqNo started from 0 and were not updated when server provided actual `lastSeqNo`, causing seqNo conflicts. Now messages are properly renumbered to continue from server's `lastSeqNo + 1`. + +**API changes:** + +- `TopicWriter.write()` no longer returns sequence number (now returns `void`) to simplify API and prevent confusion about temporary vs final seqNo values + +**Migration guide:** + +- If you were storing seqNo from `write()` return value, use `flush()` instead to get final seqNo: + + ```typescript + // Before + let seqNo = writer.write(data) + + // After + writer.write(data) + let lastSeqNo = await writer.flush() // Get final seqNo + ``` + +- User-provided seqNo (via `extra.seqNo`) remain final and unchanged - no migration needed for this case. diff --git a/packages/topic/src/writer2/machine.ts b/packages/topic/src/writer2/machine.ts index e8c62c22..1c096a03 100644 --- a/packages/topic/src/writer2/machine.ts +++ b/packages/topic/src/writer2/machine.ts @@ -37,7 +37,6 @@ import { isRetryableError } from '@ydbjs/retry' import { assign, enqueueActions, sendTo, setup } from 'xstate' import { defaultCodecMap } from '../codec.js' import { WriterStream, type WriterStreamReceiveEvent } from './stream.js' -import { SeqNoShiftBuilder } from './seqno-shift-builder.js' import type { TopicWriterOptions, WriterContext, WriterEmitted, WriterEvents, WriterInput } from './types.js' import { loggers } from '@ydbjs/debug' @@ -268,8 +267,7 @@ let writerMachineFactory = setup({ * * Mode-specific behaviour: * - Manual seqNo: compact the window, update bookkeeping, keep user-provided seqNo as-is - * - Auto seqNo: compact the window, renumber remaining messages, emit `SeqNoShift` segments so - * `TopicWriter.resolveSeqNo()` can translate temporary numbers into the final ones + * - Auto seqNo: compact the window, renumber remaining messages sequentially * * @param enqueue - XState enqueue helper for scheduling actions * @param event - init response with session metadata @@ -375,23 +373,15 @@ let writerMachineFactory = setup({ let newBufferStart = firstPendingIndex - let shiftBuilder = new SeqNoShiftBuilder() - // Renumber the remaining messages sequentially so we continue where the server left off. for (let i = firstPendingIndex; i < bufferEndIndex; i++) { let message = context.messages[i] if (!message) continue - let oldSeqNo = message.seqNo - let newSeqNo = nextSeqNo + message.seqNo = nextSeqNo nextSeqNo++ - - shiftBuilder.addShift(oldSeqNo, newSeqNo) - message.seqNo = newSeqNo } - let seqNoShifts = shiftBuilder.build() - let inflightSize = context.inflightSize - acknowledgedSize - pendingSize let bufferSize = context.bufferSize + pendingSize let garbageSize = context.garbageSize + acknowledgedSize @@ -413,7 +403,6 @@ let writerMachineFactory = setup({ sessionId: event.data.sessionId, lastSeqNo: lastSeqNo, nextSeqNo, - ...(seqNoShifts.length ? { seqNoShifts } : {}), })) }), diff --git a/packages/topic/src/writer2/seqno-resolver.test.ts b/packages/topic/src/writer2/seqno-resolver.test.ts deleted file mode 100644 index 17f5360d..00000000 --- a/packages/topic/src/writer2/seqno-resolver.test.ts +++ /dev/null @@ -1,87 +0,0 @@ -import { expect, test } from 'vitest' -import { SeqNoResolver } from './seqno-resolver.js' - -test('returns original seqNo when no shifts recorded', () => { - let resolver = new SeqNoResolver() - - expect(resolver.resolveSeqNo(1n)).toBe(1n) - expect(resolver.resolveSeqNo(123n)).toBe(123n) -}) - -test('resolves seqNo inside a single shift', () => { - let resolver = new SeqNoResolver() - resolver.applyShift({ start: 10n, end: 13n, delta: 100n }) - - expect(resolver.resolveSeqNo(10n)).toBe(110n) - expect(resolver.resolveSeqNo(12n)).toBe(112n) - expect(resolver.resolveSeqNo(13n)).toBe(13n) -}) - -test('ignores zero-length and zero-delta shifts', () => { - let resolver = new SeqNoResolver() - resolver.applyShift({ start: 5n, end: 5n, delta: 99n }) - resolver.applyShift({ start: 7n, end: 9n, delta: 0n }) - - expect(resolver.getShifts().length).toBe(0) -}) - -test('merges adjacent shifts with identical delta', () => { - let resolver = new SeqNoResolver() - resolver.applyShift({ start: 1n, end: 3n, delta: 100n }) - resolver.applyShift({ start: 3n, end: 6n, delta: 100n }) - - let shifts = resolver.getShifts() - expect(shifts.length).toBe(1) - expect(shifts[0]).toEqual({ start: 1n, end: 6n, delta: 100n }) -}) - -test('adds new segment when delta changes', () => { - let resolver = new SeqNoResolver() - resolver.applyShift({ start: 1n, end: 4n, delta: 100n }) - resolver.applyShift({ start: 4n, end: 6n, delta: 50n }) - - let shifts = resolver.getShifts() - expect(shifts.length).toBe(2) - expect(shifts[0]).toEqual({ start: 1n, end: 4n, delta: 100n }) - expect(shifts[1]).toEqual({ start: 4n, end: 6n, delta: 50n }) -}) - -test('composes sequential shifts to final seqNo', () => { - let resolver = new SeqNoResolver() - resolver.applyShift({ start: 1n, end: 4n, delta: 100n }) - resolver.applyShift({ start: 101n, end: 104n, delta: 100n }) - - expect(resolver.resolveSeqNo(1n)).toBe(201n) - expect(resolver.resolveSeqNo(3n)).toBe(203n) -}) - -test('applyShifts helper applies list in order', () => { - let resolver = new SeqNoResolver() - resolver.applyShifts([ - { start: 1n, end: 3n, delta: 100n }, - { start: 3n, end: 5n, delta: 150n }, - ]) - - expect(resolver.getShifts()).toEqual([ - { start: 1n, end: 3n, delta: 100n }, - { start: 3n, end: 5n, delta: 150n }, - ]) -}) - -test('throws when overlapping shift is applied', () => { - let resolver = new SeqNoResolver() - resolver.applyShift({ start: 1n, end: 5n, delta: 100n }) - - expect(() => resolver.applyShift({ start: 4n, end: 7n, delta: 50n })).toThrowError( - /Internal error: overlapping seqNo shifts detected/ - ) -}) - -test('reset clears all recorded shifts', () => { - let resolver = new SeqNoResolver() - resolver.applyShift({ start: 1n, end: 5n, delta: 100n }) - resolver.reset() - - expect(resolver.getShifts().length).toBe(0) - expect(resolver.resolveSeqNo(2n)).toBe(2n) -}) diff --git a/packages/topic/src/writer2/seqno-resolver.ts b/packages/topic/src/writer2/seqno-resolver.ts deleted file mode 100644 index 50d6f96b..00000000 --- a/packages/topic/src/writer2/seqno-resolver.ts +++ /dev/null @@ -1,100 +0,0 @@ -import type { SeqNoShift } from './types.js' - -/** - * Resolves temporary sequence numbers to final sequence numbers. - * - * When messages are written before session initialization or after reconnection, - * their seqNo values may be temporary and get recalculated. This resolver maintains - * a collection of shifts that describe how temporary seqNo map to final seqNo. - * - * Example: - * shift { start: 10n, end: 13n, delta: 100n } - * means temporary seqNo 10, 11, 12 were finally stored as 110, 111, 112 respectively. - */ -export class SeqNoResolver { - #shifts: SeqNoShift[] = [] - - /** - * Apply new shifts from state machine. - * Merges new shifts with existing ones, handling overlaps and intersections. - */ - applyShifts(newShifts: SeqNoShift[]): void { - for (let shift of newShifts) { - this.applyShift(shift) - } - } - - /** - * Apply a single shift. - * - * Invariant: shifts are emitted strictly in order of old seqNo, without overlaps. - */ - applyShift(shift: SeqNoShift): void { - let { start, end, delta } = shift - if (start >= end || delta === 0n) { - return - } - - let last = this.#shifts[this.#shifts.length - 1] - if (!last) { - this.#shifts.push({ start, end, delta }) - return - } - - if (start < last.end) { - throw new Error('Internal error: overlapping seqNo shifts detected') - } - - if (start === last.end && delta === last.delta) { - last.end = end - return - } - - this.#shifts.push({ start, end, delta }) - } - - /** - * Resolve final seqNo for a temporary seqNo. - * @param initialSeqNo Temporary seqNo returned by write() - * @returns Final seqNo assigned after session re-initialization - */ - resolveSeqNo(initialSeqNo: bigint): bigint { - let result = initialSeqNo - - while (true) { - let matched = false - - for (let segment of this.#shifts) { - if (result >= segment.start && result < segment.end) { - let next = result + segment.delta - if (next === result) { - return result - } - result = next - matched = true - break - } - } - - if (!matched) { - break - } - } - - return result - } - - /** - * Clear all shifts. - */ - reset(): void { - this.#shifts = [] - } - - /** - * Get current shifts (for testing/debugging). - */ - getShifts(): readonly SeqNoShift[] { - return this.#shifts - } -} diff --git a/packages/topic/src/writer2/seqno-shift-builder.test.ts b/packages/topic/src/writer2/seqno-shift-builder.test.ts deleted file mode 100644 index c502a915..00000000 --- a/packages/topic/src/writer2/seqno-shift-builder.test.ts +++ /dev/null @@ -1,144 +0,0 @@ -import { expect, test } from 'vitest' -import { SeqNoShiftBuilder } from './seqno-shift-builder.js' - -test('builds empty shifts array when no shifts added', () => { - let builder = new SeqNoShiftBuilder() - let shifts = builder.build() - - expect(shifts.length).toBe(0) -}) - -test('builds single shift for one message', () => { - let builder = new SeqNoShiftBuilder() - builder.addShift(1n, 101n) - let shifts = builder.build() - - expect(shifts.length).toBe(1) - expect(shifts[0]?.start).toBe(1n) - expect(shifts[0]?.end).toBe(2n) - expect(shifts[0]?.delta).toBe(100n) -}) - -test('merges consecutive shifts with same delta', () => { - let builder = new SeqNoShiftBuilder() - builder.addShift(1n, 101n) - builder.addShift(2n, 102n) - builder.addShift(3n, 103n) - let shifts = builder.build() - - expect(shifts.length).toBe(1) - expect(shifts[0]?.start).toBe(1n) - expect(shifts[0]?.end).toBe(4n) - expect(shifts[0]?.delta).toBe(100n) -}) - -test('creates separate shifts for different deltas', () => { - let builder = new SeqNoShiftBuilder() - builder.addShift(1n, 101n) // delta = 100 - builder.addShift(2n, 102n) // delta = 100 - builder.addShift(5n, 210n) // delta = 205 (different!) - let shifts = builder.build() - - expect(shifts.length).toBe(2) - expect(shifts[0]?.start).toBe(1n) - expect(shifts[0]?.end).toBe(3n) - expect(shifts[0]?.delta).toBe(100n) - expect(shifts[1]?.start).toBe(5n) - expect(shifts[1]?.end).toBe(6n) - expect(shifts[1]?.delta).toBe(205n) -}) - -test('creates separate shifts for non-consecutive seqNo', () => { - let builder = new SeqNoShiftBuilder() - builder.addShift(1n, 101n) // delta = 100 - builder.addShift(2n, 102n) // delta = 100 - builder.addShift(5n, 105n) // delta = 100, but gap in seqNo - let shifts = builder.build() - - expect(shifts.length).toBe(2) - expect(shifts[0]?.start).toBe(1n) - expect(shifts[0]?.end).toBe(3n) - expect(shifts[0]?.delta).toBe(100n) - expect(shifts[1]?.start).toBe(5n) - expect(shifts[1]?.end).toBe(6n) - expect(shifts[1]?.delta).toBe(100n) -}) - -test('ignores shifts where oldSeqNo equals newSeqNo', () => { - let builder = new SeqNoShiftBuilder() - builder.addShift(1n, 101n) - builder.addShift(2n, 2n) // No shift - builder.addShift(3n, 103n) - let shifts = builder.build() - - expect(shifts.length).toBe(2) - expect(shifts[0]?.start).toBe(1n) - expect(shifts[0]?.end).toBe(2n) - expect(shifts[1]?.start).toBe(3n) - expect(shifts[1]?.end).toBe(4n) -}) - -test('handles negative deltas', () => { - let builder = new SeqNoShiftBuilder() - builder.addShift(100n, 50n) // delta = -50 - builder.addShift(101n, 51n) // delta = -50 - let shifts = builder.build() - - expect(shifts.length).toBe(1) - expect(shifts[0]?.start).toBe(100n) - expect(shifts[0]?.end).toBe(102n) - expect(shifts[0]?.delta).toBe(-50n) -}) - -test('flush can be called multiple times safely', () => { - let builder = new SeqNoShiftBuilder() - builder.addShift(1n, 101n) - builder.flush() - builder.flush() - builder.addShift(2n, 102n) - builder.flush() - let shifts = builder.build() - - expect(shifts.length).toBe(2) - expect(shifts[0]?.start).toBe(1n) - expect(shifts[1]?.start).toBe(2n) -}) - -test('reset clears all state', () => { - let builder = new SeqNoShiftBuilder() - builder.addShift(1n, 101n) - builder.addShift(2n, 102n) - builder.reset() - - expect(builder.build().length).toBe(0) - - builder.addShift(5n, 105n) - let shifts = builder.build() - expect(shifts.length).toBe(1) - expect(shifts[0]?.start).toBe(5n) -}) - -test('build can be called multiple times', () => { - let builder = new SeqNoShiftBuilder() - builder.addShift(1n, 101n) - - let shifts1 = builder.build() - let shifts2 = builder.build() - - expect(shifts1.length).toBe(1) - expect(shifts2.length).toBe(1) - expect(shifts1[0]?.start).toBe(shifts2[0]?.start) -}) - -test('handles large sequences', () => { - let builder = new SeqNoShiftBuilder() - for (let i = 0; i < 1000; i++) { - builder.addShift(BigInt(i + 1), BigInt(i + 1001)) - } - let shifts = builder.build() - - expect(shifts.length).toBe(1) - expect(shifts[0]?.start).toBe(1n) - expect(shifts[0]?.end).toBe(1001n) - expect(shifts[0]?.delta).toBe(1000n) -}) diff --git a/packages/topic/src/writer2/seqno-shift-builder.ts b/packages/topic/src/writer2/seqno-shift-builder.ts deleted file mode 100644 index 2bd98f31..00000000 --- a/packages/topic/src/writer2/seqno-shift-builder.ts +++ /dev/null @@ -1,82 +0,0 @@ -import type { SeqNoShift } from './types.js' - -/** - * Builder for collecting sequence number shifts during message renumbering. - * - * Accumulates consecutive shifts with the same delta into compact ranges, - * reducing the number of shift segments that need to be tracked. - */ -export class SeqNoShiftBuilder { - #shifts: SeqNoShift[] = [] - #currentStart: bigint | null = null - #currentDelta: bigint | null = null - #currentCount = 0 - - /** - * Add a shift for a single message. - * Automatically merges consecutive shifts with the same delta. - * - * @param oldSeqNo Original sequence number - * @param newSeqNo New sequence number after recalculation - */ - addShift(oldSeqNo: bigint, newSeqNo: bigint): void { - if (oldSeqNo === newSeqNo) { - this.flush() - return - } - - let delta = newSeqNo - oldSeqNo - if ( - this.#currentStart !== null && - this.#currentDelta === delta && - oldSeqNo === this.#currentStart + BigInt(this.#currentCount) - ) { - // Continue current shift range - this.#currentCount++ - } else { - // Start new shift range - this.flush() - this.#currentStart = oldSeqNo - this.#currentDelta = delta - this.#currentCount = 1 - } - } - - /** - * Flush current shift range to the shifts array. - * Called automatically when starting a new range or when build() is called. - */ - flush(): void { - if (this.#currentStart !== null && this.#currentDelta !== null && this.#currentCount > 0) { - this.#shifts.push({ - start: this.#currentStart, - end: this.#currentStart + BigInt(this.#currentCount), - delta: this.#currentDelta, - }) - } - this.#currentStart = null - this.#currentDelta = null - this.#currentCount = 0 - } - - /** - * Build final array of shifts. - * Flushes any pending shift range before returning. - * - * @returns Array of shift ranges - */ - build(): SeqNoShift[] { - this.flush() - return this.#shifts - } - - /** - * Reset builder to initial state. - */ - reset(): void { - this.#shifts = [] - this.#currentStart = null - this.#currentDelta = null - this.#currentCount = 0 - } -} diff --git a/packages/topic/src/writer2/types.ts b/packages/topic/src/writer2/types.ts index 9f00adea..a2462515 100644 --- a/packages/topic/src/writer2/types.ts +++ b/packages/topic/src/writer2/types.ts @@ -113,12 +113,6 @@ export type MessageToSend = { metadataItems?: Record } -export type SeqNoShift = { - start: bigint - end: bigint - delta: bigint -} - // Events for the state machine export type WriterEvents = // User-initiated events @@ -138,7 +132,6 @@ export type WriterEmitted = sessionId: string lastSeqNo: bigint nextSeqNo: bigint - seqNoShifts?: SeqNoShift[] } | { type: 'writer.acknowledgments'; acknowledgments: Map } diff --git a/packages/topic/src/writer2/writer.ts b/packages/topic/src/writer2/writer.ts index 07a63b53..afa5ce2f 100644 --- a/packages/topic/src/writer2/writer.ts +++ b/packages/topic/src/writer2/writer.ts @@ -3,7 +3,6 @@ import type { Driver } from '@ydbjs/core' import { abortable } from '@ydbjs/abortable' import { WriterMachine } from './machine.js' import { SeqNoManager } from './seqno-manager.js' -import { SeqNoResolver } from './seqno-resolver.js' import type { TopicWriterOptions } from './types.js' export class TopicWriter implements AsyncDisposable { @@ -12,11 +11,9 @@ export class TopicWriter implements AsyncDisposable { #subscription: Subscription #seqNoManager: SeqNoManager #isSessionInitialized = false - #seqNoResolver: SeqNoResolver constructor(driver: Driver, options: TopicWriterOptions) { this.#seqNoManager = new SeqNoManager() - this.#seqNoResolver = new SeqNoResolver() this.#actor = createActor(WriterMachine, { input: { driver, options } }) // Subscribe to state changes for flush completions @@ -35,9 +32,6 @@ export class TopicWriter implements AsyncDisposable { // event.nextSeqNo is the next seqno that should be used for new messages // So lastSeqNo for SeqNoManager should be nextSeqNo - 1 let lastSeqNo = event.nextSeqNo - 1n - if (event.seqNoShifts?.length) { - this.#seqNoResolver.applyShifts(event.seqNoShifts) - } this.#seqNoManager.initialize(lastSeqNo) this.#isSessionInitialized = true }) @@ -60,42 +54,6 @@ export class TopicWriter implements AsyncDisposable { * Write a message to the topic * @param data Message payload * @param extra Optional message metadata - * @returns Sequence number of the message - * - * **⚠️ WARNING: Do NOT rely on returned seqNo for critical operations!** - * - * The returned seqNo may be a temporary value that gets recalculated after session initialization. - * This can lead to incorrect behavior if used for: - * - Message deduplication - * - Tracking message delivery - * - Database lookups by seqNo - * - Any operation that requires accurate seqNo values - * - * **When seqNo is temporary:** - * - Session is not yet initialized (first messages written before connection) - * - Writer reconnected after network issues - * - Auto-generated seqNo mode (not user-provided) - * - * **When seqNo is final:** - * - User-provided seqNo (via `extra.seqNo`) - always final, never recalculated - * - After `flush()` completes - all messages have been sent with final seqNo - * - * **Recommended usage:** - * ```typescript - * // ❌ BAD: Storing seqNo immediately - * let seqNo = writer.write(data) - * await saveToDatabase(seqNo) // May be wrong! - * - * // ✅ GOOD: Wait for flush to ensure seqNo is final - * writer.write(data) - * let lastSeqNo = await writer.flush() // All messages up to this seqNo are final - * await saveToDatabase(lastSeqNo) - * - * // ✅ GOOD: Use user-provided seqNo (always final) - * let mySeqNo = 100n - * writer.write(data, { seqNo: mySeqNo }) - * // mySeqNo is guaranteed to be final - * ``` */ write( data: Uint8Array, @@ -104,7 +62,7 @@ export class TopicWriter implements AsyncDisposable { createdAt?: Date metadataItems?: Record } - ): bigint { + ): void { // Get seqNo from SeqNoManager (handles auto/manual modes) let seqNo = this.#seqNoManager.getNext(extra?.seqNo) let seqNoState = this.#seqNoManager.getState() @@ -122,19 +80,6 @@ export class TopicWriter implements AsyncDisposable { }, seqNoMode, }) - - return seqNo - } - - /** - * Resolve final seqNo for a message that was written before session initialization - * or was retried after reconnection. - * - * @param initialSeqNo Temporary seqNo returned by write() - * @returns Final seqNo assigned after session re-initialization - */ - resolveSeqNo(initialSeqNo: bigint): bigint { - return this.#seqNoResolver.resolveSeqNo(initialSeqNo) } /** @@ -146,10 +91,9 @@ export class TopicWriter implements AsyncDisposable { * have been sent to the server with their final seqNo values. This is the safe way * to ensure seqNo accuracy for critical operations like deduplication or tracking. * - * **Getting final seqNo for specific messages:** + * **Getting final seqNo for messages:** * After `flush()` completes, all seqNo values up to the returned `lastSeqNo` are final. * If you need to track individual messages, you can: - * - Call `writer.resolveSeqNo(initialSeqNo)` to translate temporary numbers into final ones * - Use the order of `write()` calls to determine final seqNo (sequential after flush) * - Use user-provided seqNo (always final, never recalculated) */ @@ -207,16 +151,7 @@ export class TopicWriter implements AsyncDisposable { /** * Check if the writer session is initialized - * @returns true if session is initialized and seqNo values are final, false if they may be temporary - * - * **Usage:** - * ```typescript - * let seqNo = writer.write(data) - * if (!writer.isSessionInitialized) { - * // seqNo may be temporary, wait for flush before using it - * await writer.flush() - * } - * ``` + * @returns true if session is initialized, false otherwise */ get isSessionInitialized(): boolean { return this.#isSessionInitialized @@ -266,7 +201,6 @@ export class TopicWriter implements AsyncDisposable { // Reject any pending flush this.#promise?.reject(new Error('Writer was destroyed')) this.#promise = null - this.#seqNoResolver.reset() // Send destroy event (optional - for cleanup logic) this.#actor.send({ type: 'writer.destroy', ...(reason && { reason }) }) diff --git a/packages/topic/tests/writer2.test.ts b/packages/topic/tests/writer2.test.ts index e4fdbc0c..fa019bb2 100644 --- a/packages/topic/tests/writer2.test.ts +++ b/packages/topic/tests/writer2.test.ts @@ -406,20 +406,16 @@ test('messages written before initialization get correct seqno', async () => { }) // Write messages immediately after creating writer (before session initialization) - // These messages will get temporary seqno (1, 2, 3...) initially - let seqNo1 = writer.write(new TextEncoder().encode('Message 1')) - let seqNo2 = writer.write(new TextEncoder().encode('Message 2')) - let seqNo3 = writer.write(new TextEncoder().encode('Message 3')) + writer.write(new TextEncoder().encode('Message 1')) + writer.write(new TextEncoder().encode('Message 2')) + writer.write(new TextEncoder().encode('Message 3')) // Wait for initialization and flush let lastSeqNo = await writer.flush() // After initialization, seqno should be recalculated based on server's lastSeqNo // So final seqno should be sequential and start from serverLastSeqNo + 1 - expect(seqNo1).toBeGreaterThan(0n) - expect(seqNo2).toBe(seqNo1 + 1n) - expect(seqNo3).toBe(seqNo2 + 1n) - expect(lastSeqNo).toBe(seqNo3) + expect(lastSeqNo).toBeGreaterThan(0n) // Verify messages were written by reading them await using reader = createTopicReader(driver, { @@ -428,11 +424,11 @@ test('messages written before initialization get correct seqno', async () => { }) let messagesRead = 0 - let seqNos = new Set() + let seqNos: bigint[] = [] for await (let batch of reader.read({ limit: 10, waitMs: 2000 })) { for (let msg of batch) { - seqNos.add(msg.seqNo) + seqNos.push(msg.seqNo) let content = new TextDecoder().decode(msg.payload) expect(['Message 1', 'Message 2', 'Message 3']).toContain(content) messagesRead++ @@ -446,10 +442,11 @@ test('messages written before initialization get correct seqno', async () => { } expect(messagesRead).toBe(3) - // Verify all seqno are present (messages were not dropped) - expect(seqNos.has(seqNo1)).toBe(true) - expect(seqNos.has(seqNo2)).toBe(true) - expect(seqNos.has(seqNo3)).toBe(true) + // Verify seqno are sequential + expect(seqNos.length).toBe(3) + expect(seqNos[0]! + 1n).toBe(seqNos[1]) + expect(seqNos[1]! + 1n).toBe(seqNos[2]) + expect(seqNos[2]).toBe(lastSeqNo) }) test('messages written before initialization are not dropped', async () => { @@ -458,8 +455,8 @@ test('messages written before initialization are not dropped', async () => { producerId: `test-producer-${Date.now()}`, }) - // Write message immediately (will get temporary seqno = 1) - let seqNo = writer.write(new TextEncoder().encode('Test message')) + // Write message immediately + writer.write(new TextEncoder().encode('Test message')) // Flush to ensure message is sent await writer.flush() @@ -474,8 +471,8 @@ test('messages written before initialization are not dropped', async () => { let messagePayload: string | null = null for await (let batch of reader.read({ limit: 1, waitMs: 2000 })) { for (let msg of batch) { - if (msg.seqNo === seqNo) { - messagePayload = new TextDecoder().decode(msg.payload) + messagePayload = new TextDecoder().decode(msg.payload) + if (messagePayload === 'Test message') { messageFound = true break } @@ -501,12 +498,11 @@ test('new writer continues seqno sequence after previous writer', { timeout: 15_ producerId, }) - let writer1SeqNo1 = writer1.write(new TextEncoder().encode('Writer1 Message 1')) - let writer1SeqNo2 = writer1.write(new TextEncoder().encode('Writer1 Message 2')) + writer1.write(new TextEncoder().encode('Writer1 Message 1')) + writer1.write(new TextEncoder().encode('Writer1 Message 2')) let writer1LastSeqNo = await writer1.flush() - expect(writer1SeqNo2).toBe(writer1LastSeqNo) - expect(writer1SeqNo2).toBe(writer1SeqNo1 + 1n) + expect(writer1LastSeqNo).toBeGreaterThan(0n) // Wait a bit to ensure messages are committed on server await new Promise((resolve) => setTimeout(resolve, 500)) @@ -526,18 +522,9 @@ test('new writer continues seqno sequence after previous writer', { timeout: 15_ // Verify seqno are sequential and continue from writer1 // This is the key test: writer2 should get lastSeqNo from server and continue sequence // After initialization, seqno for writer2's messages should be recalculated from serverLastSeqNo - // Note: write() returns temporary seqno (1, 2, 3...) before initialization - // After initialization, seqno are recalculated, but write() return values don't change - // We verify correctness through flush() which returns the actual lastSeqNo after recalculation // writer2 wrote 2 messages, so lastSeqNo should be writer1LastSeqNo + 2 expect(writer2LastSeqNo).toBe(writer1LastSeqNo + 2n) - // Ensure resolveSeqNo returns authoritative values for both messages - let firstSeqNo = writer2.resolveSeqNo(writer1SeqNo1) - let secondSeqNo = writer2.resolveSeqNo(writer1SeqNo2) - expect(firstSeqNo).toBe(writer1LastSeqNo + 1n) - expect(secondSeqNo).toBe(writer1LastSeqNo + 2n) - // Verify sequentiality: lastSeqNo should be exactly 2 more than writer1's lastSeqNo // (because writer2 wrote 2 messages) expect(writer2LastSeqNo).toBeGreaterThan(writer1LastSeqNo) diff --git a/vitest.setup.ydb.ts b/vitest.setup.ydb.ts index 368491b0..1c054261 100644 --- a/vitest.setup.ydb.ts +++ b/vitest.setup.ydb.ts @@ -32,10 +32,8 @@ export async function setup(project: TestProject) { return } - let ports = ['2135', '2136', '8765', '9092'].map((port) => `--publish ${port}`).join(' ') - // prettier-ignore - let container = await $`docker run --rm --detach --hostname localhost --platform linux/amd64 ${ports} ydbplatform/local-ydb:25.2`.text() + let container = await $`docker run --rm --detach --hostname localhost --platform linux/amd64 --publish 2135 --publish 2136 --publish 8765 --publish 9092 ydbplatform/local-ydb:25.2`.text() containerID = container.trim() let signal = AbortSignal.timeout(30 * 1000) From a33d8b1d6f9e07bfe6f964887e7e3bc75f1aa8dc Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Fri, 7 Nov 2025 21:19:57 +0300 Subject: [PATCH 4/4] Fix seqNo renumbering when messages written before session initialization - Remove lastSeqNo update from _flush() - it was updating before messages were actually sent to server - Fix renumbering logic in _on_init_response to properly handle messages written before init - Check if messages in buffer need renumbering by comparing their seqNo with serverLastSeqNo - Never renumber messages if user provided seqNo (manual mode) - Update lastSeqNo only when session is initialized or new message is written, not on ACKs --- .changeset/remove-write-seqno-return.md | 3 +- packages/topic/src/writer/_flush.ts | 40 ++++---- packages/topic/src/writer/_init_reponse.ts | 101 ++++++++++++++----- packages/topic/src/writer/_write.ts | 72 +++++++------ packages/topic/src/writer/_write_response.ts | 51 ++++++---- packages/topic/src/writer/index.ts | 5 +- packages/topic/tests/writer.test.ts | 69 +++++++++++++ 7 files changed, 240 insertions(+), 101 deletions(-) diff --git a/.changeset/remove-write-seqno-return.md b/.changeset/remove-write-seqno-return.md index 7a34175c..162591cb 100644 --- a/.changeset/remove-write-seqno-return.md +++ b/.changeset/remove-write-seqno-return.md @@ -2,11 +2,12 @@ '@ydbjs/topic': minor --- -Fix seqNo renumbering bug and simplify TopicWriter API. +Fix seqNo renumbering bug in both writer implementations and simplify TopicWriter API. **Bug fix:** - Fixed issue where messages written before session initialization were not renumbered after receiving `lastSeqNo` from server. Previously, auto-generated seqNo started from 0 and were not updated when server provided actual `lastSeqNo`, causing seqNo conflicts. Now messages are properly renumbered to continue from server's `lastSeqNo + 1`. +- Fixed in both `writer` (legacy) and `writer2` implementations **API changes:** diff --git a/packages/topic/src/writer/_flush.ts b/packages/topic/src/writer/_flush.ts index 43020e59..ecc855e0 100644 --- a/packages/topic/src/writer/_flush.ts +++ b/packages/topic/src/writer/_flush.ts @@ -1,37 +1,37 @@ -import type { StreamWriteMessage_FromClient, StreamWriteMessage_WriteRequest_MessageData } from "@ydbjs/api/topic"; -import type { CompressionCodec } from "../codec.js"; -import type { AsyncPriorityQueue } from "../queue.js"; -import type { TX } from "../tx.js"; -import { _batch_messages } from "./_batch_messages.js"; -import { _emit_write_request } from "./_write_request.js"; -import type { ThroughputSettings } from "./types.js"; +import type { StreamWriteMessage_FromClient, StreamWriteMessage_WriteRequest_MessageData } from '@ydbjs/api/topic' +import type { CompressionCodec } from '../codec.js' +import type { AsyncPriorityQueue } from '../queue.js' +import type { TX } from '../tx.js' +import { _batch_messages } from './_batch_messages.js' +import { _emit_write_request } from './_write_request.js' +import type { ThroughputSettings } from './types.js' export const _flush = function flush(ctx: { readonly tx?: TX - readonly queue: AsyncPriorityQueue, - readonly codec: CompressionCodec, // Codec to use for compression - readonly buffer: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages in the buffer - readonly inflight: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages that are currently in-flight - readonly throughputSettings: ThroughputSettings; - updateBufferSize: (bytes: bigint) => void; // Function to update the buffer size + readonly queue: AsyncPriorityQueue + readonly codec: CompressionCodec // Codec to use for compression + readonly buffer: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages in the buffer + readonly inflight: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages that are currently in-flight + readonly throughputSettings: ThroughputSettings + updateBufferSize: (bytes: bigint) => void // Function to update the buffer size }) { if (!ctx.buffer.length) { - return; // Nothing to flush + return // Nothing to flush } - let messagesToSend: StreamWriteMessage_WriteRequest_MessageData[] = []; + let messagesToSend: StreamWriteMessage_WriteRequest_MessageData[] = [] while (ctx.inflight.length < ctx.throughputSettings.maxInflightCount) { - let message = ctx.buffer.shift(); + let message = ctx.buffer.shift() if (!message) { - break; // No more messages to send + break // No more messages to send } - ctx.inflight.push(message); - messagesToSend.push(message); + ctx.inflight.push(message) + messagesToSend.push(message) } for (let batch of _batch_messages(messagesToSend)) { - _emit_write_request(ctx, batch); // Emit the write request with the batch of messages + _emit_write_request(ctx, batch) // Emit the write request with the batch of messages } } diff --git a/packages/topic/src/writer/_init_reponse.ts b/packages/topic/src/writer/_init_reponse.ts index 97d2876c..ecea78e3 100644 --- a/packages/topic/src/writer/_init_reponse.ts +++ b/packages/topic/src/writer/_init_reponse.ts @@ -1,35 +1,84 @@ -import type { StreamWriteMessage_FromClient, StreamWriteMessage_InitResponse, StreamWriteMessage_WriteRequest_MessageData } from "@ydbjs/api/topic"; -import type { CompressionCodec } from "../codec.js"; -import type { AsyncPriorityQueue } from "../queue.js"; -import type { TX } from "../tx.js"; -import { _flush } from "./_flush.js"; -import type { ThroughputSettings } from "./types.js"; +import type { + StreamWriteMessage_FromClient, + StreamWriteMessage_InitResponse, + StreamWriteMessage_WriteRequest_MessageData, +} from '@ydbjs/api/topic' +import type { CompressionCodec } from '../codec.js' +import type { AsyncPriorityQueue } from '../queue.js' +import type { TX } from '../tx.js' +import { _flush } from './_flush.js' +import type { ThroughputSettings } from './types.js' -export const _on_init_response = function on_init_response(ctx: { - readonly tx?: TX - readonly queue: AsyncPriorityQueue, - readonly codec: CompressionCodec, // Codec to use for compression - readonly buffer: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages in the buffer - readonly inflight: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages that are currently in-flight - readonly lastSeqNo?: bigint; // The last sequence number acknowledged by the server - readonly throughputSettings: ThroughputSettings; // Current throughput settings for the writer - updateLastSeqNo: (seqNo: bigint) => void; - updateBufferSize: (bytes: bigint) => void; // Function to update the buffer size -}, input: StreamWriteMessage_InitResponse) { - if (!ctx.lastSeqNo) { - // Store the last sequence number from the server. - ctx.updateLastSeqNo(input.lastSeqNo); - } +export const _on_init_response = function on_init_response( + ctx: { + readonly tx?: TX + readonly queue: AsyncPriorityQueue + readonly codec: CompressionCodec // Codec to use for compression + readonly buffer: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages in the buffer + readonly inflight: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages that are currently in-flight + readonly lastSeqNo?: bigint // The last sequence number acknowledged by the server + readonly throughputSettings: ThroughputSettings // Current throughput settings for the writer + readonly isSeqNoProvided?: boolean // Whether user provided seqNo (manual mode) + updateLastSeqNo: (seqNo: bigint) => void + updateBufferSize: (bytes: bigint) => void // Function to update the buffer size + }, + input: StreamWriteMessage_InitResponse +) { + let serverLastSeqNo = input.lastSeqNo || 0n + let currentLastSeqNo = ctx.lastSeqNo + let isFirstInit = currentLastSeqNo === undefined + let lastSeqNoChanged = isFirstInit || currentLastSeqNo !== serverLastSeqNo + // Return inflight messages to buffer while (ctx.inflight.length > 0) { - const message = ctx.inflight.pop(); + const message = ctx.inflight.pop() if (!message) { - continue; + continue } - ctx.buffer.unshift(message); - ctx.updateBufferSize(BigInt(message.data.length)); + ctx.buffer.unshift(message) + ctx.updateBufferSize(BigInt(message.data.length)) + } + + // If this is the first initialization or server provided a new lastSeqNo, and we're in auto seqNo mode, + // renumber all messages in buffer to continue from serverLastSeqNo + 1 + // Always renumber on first init, even if currentLastSeqNo === serverLastSeqNo (messages written before init) + // Also renumber if there are messages in buffer that were written before init (their seqNo start from 1, not serverLastSeqNo + 1) + let finalLastSeqNo = serverLastSeqNo + let shouldRenumber = false + // Only renumber in auto mode (when user didn't provide seqNo) + if (!ctx.isSeqNoProvided && ctx.buffer.length > 0) { + if (isFirstInit) { + // First initialization: always renumber messages written before init + shouldRenumber = true + } else if (lastSeqNoChanged) { + // Reconnection: renumber if server's lastSeqNo changed + shouldRenumber = true + } else if (ctx.buffer.length > 0) { + // Check if messages in buffer were written before init (seqNo start from 1, not serverLastSeqNo + 1) + // If first message's seqNo is <= serverLastSeqNo, it was written before init and needs renumbering + let firstMessageSeqNo = ctx.buffer[0]?.seqNo + if (firstMessageSeqNo !== undefined && firstMessageSeqNo <= serverLastSeqNo) { + shouldRenumber = true + } + } + } + + if (shouldRenumber) { + let nextSeqNo = serverLastSeqNo + 1n + // Renumber all messages in buffer sequentially starting from serverLastSeqNo + 1 + for (let message of ctx.buffer) { + message.seqNo = nextSeqNo + nextSeqNo++ + } + // Update lastSeqNo to the last renumbered seqNo so flush() returns correct value + finalLastSeqNo = nextSeqNo - 1n + ctx.updateLastSeqNo(finalLastSeqNo) + } else if (lastSeqNoChanged) { + // Store the last sequence number from the server if we didn't renumber + ctx.updateLastSeqNo(serverLastSeqNo) } - _flush(ctx); // Flush the buffer to send any pending messages. + // Flush the buffer to send any pending messages + _flush(ctx) } diff --git a/packages/topic/src/writer/_write.ts b/packages/topic/src/writer/_write.ts index ec16dc8b..835c20fe 100644 --- a/packages/topic/src/writer/_write.ts +++ b/packages/topic/src/writer/_write.ts @@ -1,35 +1,41 @@ -import { create } from "@bufbuild/protobuf"; -import { timestampFromDate } from "@bufbuild/protobuf/wkt"; -import { type StreamWriteMessage_WriteRequest_MessageData, StreamWriteMessage_WriteRequest_MessageDataSchema } from "@ydbjs/api/topic"; -import type { CompressionCodec } from "../codec.js"; -import { _flush } from "./_flush.js"; -import { MAX_PAYLOAD_SIZE } from "./constants.js"; +import { create } from '@bufbuild/protobuf' +import { timestampFromDate } from '@bufbuild/protobuf/wkt' +import { + type StreamWriteMessage_WriteRequest_MessageData, + StreamWriteMessage_WriteRequest_MessageDataSchema, +} from '@ydbjs/api/topic' +import type { CompressionCodec } from '../codec.js' +import { _flush } from './_flush.js' +import { MAX_PAYLOAD_SIZE } from './constants.js' -export function _write(ctx: { - readonly codec: CompressionCodec, // Codec to use for compression - readonly buffer: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages in the buffer - readonly inflight: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages that are currently in-flight - readonly lastSeqNo: bigint, // Last sequence number used - updateLastSeqNo: (seqNo: bigint) => void; - updateBufferSize: (bytes: bigint) => void; // Function to update the buffer size -}, msg: { - data: Uint8Array, - seqNo?: bigint, - createdAt?: Date, - metadataItems?: Record -}): bigint { - let data = ctx.codec ? ctx.codec.compress(msg.data) : msg.data; +export function _write( + ctx: { + readonly codec: CompressionCodec // Codec to use for compression + readonly buffer: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages in the buffer + readonly inflight: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages that are currently in-flight + readonly lastSeqNo: bigint // Last sequence number used + updateLastSeqNo: (seqNo: bigint) => void + updateBufferSize: (bytes: bigint) => void // Function to update the buffer size + }, + msg: { + data: Uint8Array + seqNo?: bigint + createdAt?: Date + metadataItems?: Record + } +): bigint { + let data = ctx.codec ? ctx.codec.compress(msg.data) : msg.data // Validate the payload size, it should not exceed MAX_PAYLOAD_SIZE // This is a YDB limitation for single message size. if (data.length > MAX_PAYLOAD_SIZE) { - throw new Error(`Payload size exceeds ${Number(MAX_PAYLOAD_SIZE / (1024n * 1024n))}MiB limit.`); + throw new Error(`Payload size exceeds ${Number(MAX_PAYLOAD_SIZE / (1024n * 1024n))}MiB limit.`) } - let seqNo = msg.seqNo ?? ((ctx.lastSeqNo ?? 0n) + 1n); - let createdAt = timestampFromDate(msg.createdAt ?? new Date()); - let metadataItems = Object.entries(msg.metadataItems || {}).map(([key, value]) => ({ key, value })); - let uncompressedSize = BigInt(data.length); + let seqNo = msg.seqNo ?? (ctx.lastSeqNo ?? 0n) + 1n + let createdAt = timestampFromDate(msg.createdAt ?? new Date()) + let metadataItems = Object.entries(msg.metadataItems || {}).map(([key, value]) => ({ key, value })) + let uncompressedSize = BigInt(data.length) let message = create(StreamWriteMessage_WriteRequest_MessageDataSchema, { data, @@ -37,11 +43,17 @@ export function _write(ctx: { createdAt, metadataItems, uncompressedSize, - }); + }) + + ctx.buffer.push(message) // Store the message in the buffer + ctx.updateBufferSize(BigInt(data.length)) // Update the buffer size - ctx.buffer.push(message); // Store the message in the buffer - ctx.updateBufferSize(BigInt(data.length)); // Update the buffer size - ctx.updateLastSeqNo(seqNo); // Update the last sequence number + // Only update lastSeqNo if session is initialized (lastSeqNo is defined) + // For messages written before session initialization, lastSeqNo will be updated + // after renumbering in _on_init_response + if (ctx.lastSeqNo !== undefined) { + ctx.updateLastSeqNo(seqNo) + } - return seqNo; + return seqNo } diff --git a/packages/topic/src/writer/_write_response.ts b/packages/topic/src/writer/_write_response.ts index 2432cc28..ec78b873 100644 --- a/packages/topic/src/writer/_write_response.ts +++ b/packages/topic/src/writer/_write_response.ts @@ -1,38 +1,45 @@ -import type { StreamWriteMessage_FromClient, StreamWriteMessage_WriteRequest_MessageData, StreamWriteMessage_WriteResponse } from "@ydbjs/api/topic"; -import type { CompressionCodec } from "../codec.js"; -import type { AsyncPriorityQueue } from "../queue.js"; -import type { TX } from "../tx.js"; -import { _flush } from "./_flush.js"; -import type { ThroughputSettings } from "./types.js"; +import type { + StreamWriteMessage_FromClient, + StreamWriteMessage_WriteRequest_MessageData, + StreamWriteMessage_WriteResponse, +} from '@ydbjs/api/topic' +import type { CompressionCodec } from '../codec.js' +import type { AsyncPriorityQueue } from '../queue.js' +import type { TX } from '../tx.js' +import { _flush } from './_flush.js' +import type { ThroughputSettings } from './types.js' -export const _on_write_response = function on_write_response(ctx: { - readonly tx?: TX - readonly queue: AsyncPriorityQueue, - readonly codec: CompressionCodec, // Codec to use for compression - readonly buffer: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages that are currently in-flight - readonly inflight: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages that are currently in-flight - readonly throughputSettings: ThroughputSettings; // Current throughput settings for the writer - onAck?: (seqNo: bigint, status?: 'skipped' | 'written' | 'writtenInTx') => void // Callback for handling acknowledgments - updateBufferSize: (bytes: bigint) => void; // Function to update the buffer size -}, input: StreamWriteMessage_WriteResponse) { +export const _on_write_response = function on_write_response( + ctx: { + readonly tx?: TX + readonly queue: AsyncPriorityQueue + readonly codec: CompressionCodec // Codec to use for compression + readonly buffer: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages that are currently in-flight + readonly inflight: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages that are currently in-flight + readonly throughputSettings: ThroughputSettings // Current throughput settings for the writer + onAck?: (seqNo: bigint, status?: 'skipped' | 'written' | 'writtenInTx') => void // Callback for handling acknowledgments + updateBufferSize: (bytes: bigint) => void // Function to update the buffer size + }, + input: StreamWriteMessage_WriteResponse +) { // Process each acknowledgment in the response. - let acks = new Map(); + let acks = new Map() for (let ack of input.acks) { - acks.set(ack.seqNo, ack.messageWriteStatus.case!); + acks.set(ack.seqNo, ack.messageWriteStatus.case!) } // Acknowledge messages that have been processed. for (let i = ctx.inflight.length - 1; i >= 0; i--) { - const message = ctx.inflight[i]!; + const message = ctx.inflight[i]! if (acks.has(message.seqNo)) { - ctx.onAck?.(message.seqNo, acks.get(message.seqNo)); - ctx.inflight.splice(i, 1); + ctx.onAck?.(message.seqNo, acks.get(message.seqNo)) + ctx.inflight.splice(i, 1) } } // Clear the acknowledgment map. - acks.clear(); + acks.clear() // If there are still messages in the buffer, flush them. _flush(ctx) diff --git a/packages/topic/src/writer/index.ts b/packages/topic/src/writer/index.ts index e6df1054..1b158b97 100644 --- a/packages/topic/src/writer/index.ts +++ b/packages/topic/src/writer/index.ts @@ -205,8 +205,9 @@ export const createTopicWriter = function createTopicWriter(driver: Driver, opti throughputSettings, updateLastSeqNo, updateBufferSize, + isSeqNoProvided, ...(options.tx && { tx: options.tx }), - ...(lastSeqNo && { lastSeqNo }) + ...(lastSeqNo && { lastSeqNo }), }, chunk.serverMessage.value ) @@ -503,4 +504,4 @@ export const createTopicTxWriter = function createTopicTxWriter( } // Re-export types for compatibility -export type { TopicTxWriter, TopicWriter, TopicWriterOptions } from "./types.js" +export type { TopicTxWriter, TopicWriter, TopicWriterOptions } from './types.js' diff --git a/packages/topic/tests/writer.test.ts b/packages/topic/tests/writer.test.ts index c720c5e6..d6a95075 100644 --- a/packages/topic/tests/writer.test.ts +++ b/packages/topic/tests/writer.test.ts @@ -4,6 +4,7 @@ import { create } from '@bufbuild/protobuf' import { CreateTopicRequestSchema, DropTopicRequestSchema, TopicServiceDefinition } from '@ydbjs/api/topic' import { Driver } from '@ydbjs/core' +import { createTopicReader } from '../src/reader/index.js' import { createTopicWriter } from '../src/writer/index.js' let driver = new Driver(inject('connectionString'), { @@ -56,3 +57,71 @@ test('writes single message to topic', async () => { expect(seqNo).toBe(lastSeqNo) }) + +test('messages written before initialization are properly renumbered', async () => { + let producerId = `test-producer-${Date.now()}` + + // First writer: write messages to establish a sequence + await using writer1 = createTopicWriter(driver, { + topic: testTopicName, + producer: producerId, + }) + + writer1.write(new TextEncoder().encode('Writer1 Message 1')) + writer1.write(new TextEncoder().encode('Writer1 Message 2')) + let writer1LastSeqNo = (await writer1.flush())! + + expect(writer1LastSeqNo).toBeGreaterThan(0n) + + // Wait a bit to ensure messages are committed on server + await new Promise((resolve) => setTimeout(resolve, 500)) + + writer1.destroy() + + // Create new writer with same producerId - should continue seqno sequence + await using writer2 = createTopicWriter(driver, { + topic: testTopicName, + producer: producerId, + }) + + // Write messages immediately (before session initialization) + // These should get seqno starting from writer1LastSeqNo + 1 after initialization + writer2.write(new TextEncoder().encode('Writer2 Message 1')) + writer2.write(new TextEncoder().encode('Writer2 Message 2')) + let writer2LastSeqNo = (await writer2.flush())! + + // Verify seqno are sequential and continue from writer1 + // writer2 wrote 2 messages, so lastSeqNo should be writer1LastSeqNo + 2 + expect(writer2LastSeqNo).toBe(writer1LastSeqNo + 2n) + + // Verify messages were written correctly by reading them + await using reader = createTopicReader(driver, { + topic: testTopicName, + consumer: testConsumerName, + }) + + let messagesRead = 0 + let foundSeqNos: bigint[] = [] + + for await (let batch of reader.read({ limit: 10, waitMs: 2000 })) { + for (let msg of batch) { + foundSeqNos.push(msg.seqNo) + messagesRead++ + } + + await reader.commit(batch) + + if (messagesRead >= 4) { + break + } + } + + expect(messagesRead).toBeGreaterThanOrEqual(4) + // Verify seqno are sequential - should start from 1 and continue + foundSeqNos.sort((a, b) => Number(a - b)) + expect(foundSeqNos[0]!).toBe(1n) + expect(foundSeqNos[foundSeqNos.length - 1]!).toBe(writer2LastSeqNo) + // Verify writer2's messages continue from writer1 + expect(foundSeqNos).toContain(writer1LastSeqNo + 1n) + expect(foundSeqNos).toContain(writer2LastSeqNo) +})