diff --git a/.changeset/remove-write-seqno-return.md b/.changeset/remove-write-seqno-return.md new file mode 100644 index 00000000..162591cb --- /dev/null +++ b/.changeset/remove-write-seqno-return.md @@ -0,0 +1,29 @@ +--- +'@ydbjs/topic': minor +--- + +Fix seqNo renumbering bug in both writer implementations and simplify TopicWriter API. + +**Bug fix:** + +- Fixed issue where messages written before session initialization were not renumbered after receiving `lastSeqNo` from server. Previously, auto-generated seqNo values started from 0 and were not updated when server provided actual `lastSeqNo`, causing seqNo conflicts. Now messages are properly renumbered to continue from server's `lastSeqNo + 1`. +- Fixed in both `writer` (legacy) and `writer2` implementations + +**API changes:** + +- `TopicWriter.write()` no longer returns sequence number (now returns `void`) to simplify API and prevent confusion about temporary vs final seqNo values + +**Migration guide:** + +- If you were storing seqNo from `write()` return value, use `flush()` instead to get final seqNo: + + ```typescript + // Before + let seqNo = writer.write(data) + + // After + writer.write(data) + let lastSeqNo = await writer.flush() // Get final seqNo + ``` + +- User-provided seqNo values (via `extra.seqNo`) remain final and unchanged - no migration needed for this case. 
diff --git a/packages/topic/package.json b/packages/topic/package.json index 3bc75faf..abcb676a 100644 --- a/packages/topic/package.json +++ b/packages/topic/package.json @@ -30,9 +30,11 @@ "types": "dist/index.d.ts", "exports": { ".": "./dist/index.js", + "./codec": "./dist/codec.js", "./reader": "./dist/reader/index.js", "./writer": "./dist/writer/index.js", - "./writer2": "./dist/writer2/index.js" + "./writer2": "./dist/writer2/index.js", + "./message": "./dist/message.js" }, "engines": { "node": ">=20.19.0", diff --git a/packages/topic/src/index.ts b/packages/topic/src/index.ts index 20d4a0d2..631019ef 100644 --- a/packages/topic/src/index.ts +++ b/packages/topic/src/index.ts @@ -1,32 +1,41 @@ -import { Driver } from "@ydbjs/core"; +import { Driver } from '@ydbjs/core' -import { type TopicReader, type TopicReaderOptions, type TopicTxReader, createTopicReader, createTopicTxReader } from "./reader/index.js"; -import type { TX } from "./tx.js"; -import { type TopicTxWriter, type TopicWriter, type TopicWriterOptions, createTopicTxWriter, createTopicWriter } from "./writer/index.js"; +import { + type TopicReader, + type TopicReaderOptions, + type TopicTxReader, + createTopicReader, + createTopicTxReader, +} from './reader/index.js' +import type { TX } from './tx.js' +import { + type TopicTxWriter, + type TopicWriter, + type TopicWriterOptions, + createTopicTxWriter, + createTopicWriter, +} from './writer/index.js' export interface TopicClient { - createReader(options: TopicReaderOptions): TopicReader; - createTxReader(tx: TX, options: TopicReaderOptions): TopicTxReader; - createWriter(options: TopicWriterOptions): TopicWriter; - createTxWriter(tx: TX, options: TopicWriterOptions): TopicTxWriter; + createReader(options: TopicReaderOptions): TopicReader + createTxReader(tx: TX, options: TopicReaderOptions): TopicTxReader + createWriter(options: TopicWriterOptions): TopicWriter + createTxWriter(tx: TX, options: TopicWriterOptions): TopicTxWriter } export function 
topic(driver: Driver): TopicClient { return { createReader(options) { - return createTopicReader(driver, options); + return createTopicReader(driver, options) }, createTxReader(tx: TX, options: TopicReaderOptions) { - return createTopicTxReader(tx, driver, options); + return createTopicTxReader(tx, driver, options) }, createWriter(options: TopicWriterOptions) { - return createTopicWriter(driver, options); + return createTopicWriter(driver, options) }, createTxWriter(tx: TX, options: Omit) { - return createTopicTxWriter(tx, driver, options); + return createTopicTxWriter(tx, driver, options) }, } as TopicClient } - -export type { TopicTxReader } from './reader/index.js'; -export type { TopicTxWriter } from './writer/index.js'; diff --git a/packages/topic/src/writer/_flush.ts b/packages/topic/src/writer/_flush.ts index 43020e59..ecc855e0 100644 --- a/packages/topic/src/writer/_flush.ts +++ b/packages/topic/src/writer/_flush.ts @@ -1,37 +1,37 @@ -import type { StreamWriteMessage_FromClient, StreamWriteMessage_WriteRequest_MessageData } from "@ydbjs/api/topic"; -import type { CompressionCodec } from "../codec.js"; -import type { AsyncPriorityQueue } from "../queue.js"; -import type { TX } from "../tx.js"; -import { _batch_messages } from "./_batch_messages.js"; -import { _emit_write_request } from "./_write_request.js"; -import type { ThroughputSettings } from "./types.js"; +import type { StreamWriteMessage_FromClient, StreamWriteMessage_WriteRequest_MessageData } from '@ydbjs/api/topic' +import type { CompressionCodec } from '../codec.js' +import type { AsyncPriorityQueue } from '../queue.js' +import type { TX } from '../tx.js' +import { _batch_messages } from './_batch_messages.js' +import { _emit_write_request } from './_write_request.js' +import type { ThroughputSettings } from './types.js' export const _flush = function flush(ctx: { readonly tx?: TX - readonly queue: AsyncPriorityQueue, - readonly codec: CompressionCodec, // Codec to use for compression - readonly 
buffer: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages in the buffer - readonly inflight: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages that are currently in-flight - readonly throughputSettings: ThroughputSettings; - updateBufferSize: (bytes: bigint) => void; // Function to update the buffer size + readonly queue: AsyncPriorityQueue + readonly codec: CompressionCodec // Codec to use for compression + readonly buffer: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages in the buffer + readonly inflight: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages that are currently in-flight + readonly throughputSettings: ThroughputSettings + updateBufferSize: (bytes: bigint) => void // Function to update the buffer size }) { if (!ctx.buffer.length) { - return; // Nothing to flush + return // Nothing to flush } - let messagesToSend: StreamWriteMessage_WriteRequest_MessageData[] = []; + let messagesToSend: StreamWriteMessage_WriteRequest_MessageData[] = [] while (ctx.inflight.length < ctx.throughputSettings.maxInflightCount) { - let message = ctx.buffer.shift(); + let message = ctx.buffer.shift() if (!message) { - break; // No more messages to send + break // No more messages to send } - ctx.inflight.push(message); - messagesToSend.push(message); + ctx.inflight.push(message) + messagesToSend.push(message) } for (let batch of _batch_messages(messagesToSend)) { - _emit_write_request(ctx, batch); // Emit the write request with the batch of messages + _emit_write_request(ctx, batch) // Emit the write request with the batch of messages } } diff --git a/packages/topic/src/writer/_init_reponse.ts b/packages/topic/src/writer/_init_reponse.ts index 97d2876c..ecea78e3 100644 --- a/packages/topic/src/writer/_init_reponse.ts +++ b/packages/topic/src/writer/_init_reponse.ts @@ -1,35 +1,84 @@ -import type { StreamWriteMessage_FromClient, StreamWriteMessage_InitResponse, StreamWriteMessage_WriteRequest_MessageData } 
from "@ydbjs/api/topic"; -import type { CompressionCodec } from "../codec.js"; -import type { AsyncPriorityQueue } from "../queue.js"; -import type { TX } from "../tx.js"; -import { _flush } from "./_flush.js"; -import type { ThroughputSettings } from "./types.js"; +import type { + StreamWriteMessage_FromClient, + StreamWriteMessage_InitResponse, + StreamWriteMessage_WriteRequest_MessageData, +} from '@ydbjs/api/topic' +import type { CompressionCodec } from '../codec.js' +import type { AsyncPriorityQueue } from '../queue.js' +import type { TX } from '../tx.js' +import { _flush } from './_flush.js' +import type { ThroughputSettings } from './types.js' -export const _on_init_response = function on_init_response(ctx: { - readonly tx?: TX - readonly queue: AsyncPriorityQueue, - readonly codec: CompressionCodec, // Codec to use for compression - readonly buffer: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages in the buffer - readonly inflight: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages that are currently in-flight - readonly lastSeqNo?: bigint; // The last sequence number acknowledged by the server - readonly throughputSettings: ThroughputSettings; // Current throughput settings for the writer - updateLastSeqNo: (seqNo: bigint) => void; - updateBufferSize: (bytes: bigint) => void; // Function to update the buffer size -}, input: StreamWriteMessage_InitResponse) { - if (!ctx.lastSeqNo) { - // Store the last sequence number from the server. 
- ctx.updateLastSeqNo(input.lastSeqNo); - } +export const _on_init_response = function on_init_response( + ctx: { + readonly tx?: TX + readonly queue: AsyncPriorityQueue + readonly codec: CompressionCodec // Codec to use for compression + readonly buffer: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages in the buffer + readonly inflight: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages that are currently in-flight + readonly lastSeqNo?: bigint // The last sequence number acknowledged by the server + readonly throughputSettings: ThroughputSettings // Current throughput settings for the writer + readonly isSeqNoProvided?: boolean // Whether user provided seqNo (manual mode) + updateLastSeqNo: (seqNo: bigint) => void + updateBufferSize: (bytes: bigint) => void // Function to update the buffer size + }, + input: StreamWriteMessage_InitResponse +) { + let serverLastSeqNo = input.lastSeqNo || 0n + let currentLastSeqNo = ctx.lastSeqNo + let isFirstInit = currentLastSeqNo === undefined + let lastSeqNoChanged = isFirstInit || currentLastSeqNo !== serverLastSeqNo + // Return inflight messages to buffer while (ctx.inflight.length > 0) { - const message = ctx.inflight.pop(); + const message = ctx.inflight.pop() if (!message) { - continue; + continue } - ctx.buffer.unshift(message); - ctx.updateBufferSize(BigInt(message.data.length)); + ctx.buffer.unshift(message) + ctx.updateBufferSize(BigInt(message.data.length)) + } + + // If this is the first initialization or server provided a new lastSeqNo, and we're in auto seqNo mode, + // renumber all messages in buffer to continue from serverLastSeqNo + 1 + // Always renumber on first init, even if currentLastSeqNo === serverLastSeqNo (messages written before init) + // Also renumber if there are messages in buffer that were written before init (their seqNo start from 1, not serverLastSeqNo + 1) + let finalLastSeqNo = serverLastSeqNo + let shouldRenumber = false + // Only renumber in auto mode (when 
user didn't provide seqNo) + if (!ctx.isSeqNoProvided && ctx.buffer.length > 0) { + if (isFirstInit) { + // First initialization: always renumber messages written before init + shouldRenumber = true + } else if (lastSeqNoChanged) { + // Reconnection: renumber if server's lastSeqNo changed + shouldRenumber = true + } else if (ctx.buffer.length > 0) { + // Check if messages in buffer were written before init (seqNo start from 1, not serverLastSeqNo + 1) + // If first message's seqNo is <= serverLastSeqNo, it was written before init and needs renumbering + let firstMessageSeqNo = ctx.buffer[0]?.seqNo + if (firstMessageSeqNo !== undefined && firstMessageSeqNo <= serverLastSeqNo) { + shouldRenumber = true + } + } + } + + if (shouldRenumber) { + let nextSeqNo = serverLastSeqNo + 1n + // Renumber all messages in buffer sequentially starting from serverLastSeqNo + 1 + for (let message of ctx.buffer) { + message.seqNo = nextSeqNo + nextSeqNo++ + } + // Update lastSeqNo to the last renumbered seqNo so flush() returns correct value + finalLastSeqNo = nextSeqNo - 1n + ctx.updateLastSeqNo(finalLastSeqNo) + } else if (lastSeqNoChanged) { + // Store the last sequence number from the server if we didn't renumber + ctx.updateLastSeqNo(serverLastSeqNo) } - _flush(ctx); // Flush the buffer to send any pending messages. 
+ // Flush the buffer to send any pending messages + _flush(ctx) } diff --git a/packages/topic/src/writer/_write.ts b/packages/topic/src/writer/_write.ts index ec16dc8b..835c20fe 100644 --- a/packages/topic/src/writer/_write.ts +++ b/packages/topic/src/writer/_write.ts @@ -1,35 +1,41 @@ -import { create } from "@bufbuild/protobuf"; -import { timestampFromDate } from "@bufbuild/protobuf/wkt"; -import { type StreamWriteMessage_WriteRequest_MessageData, StreamWriteMessage_WriteRequest_MessageDataSchema } from "@ydbjs/api/topic"; -import type { CompressionCodec } from "../codec.js"; -import { _flush } from "./_flush.js"; -import { MAX_PAYLOAD_SIZE } from "./constants.js"; +import { create } from '@bufbuild/protobuf' +import { timestampFromDate } from '@bufbuild/protobuf/wkt' +import { + type StreamWriteMessage_WriteRequest_MessageData, + StreamWriteMessage_WriteRequest_MessageDataSchema, +} from '@ydbjs/api/topic' +import type { CompressionCodec } from '../codec.js' +import { _flush } from './_flush.js' +import { MAX_PAYLOAD_SIZE } from './constants.js' -export function _write(ctx: { - readonly codec: CompressionCodec, // Codec to use for compression - readonly buffer: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages in the buffer - readonly inflight: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages that are currently in-flight - readonly lastSeqNo: bigint, // Last sequence number used - updateLastSeqNo: (seqNo: bigint) => void; - updateBufferSize: (bytes: bigint) => void; // Function to update the buffer size -}, msg: { - data: Uint8Array, - seqNo?: bigint, - createdAt?: Date, - metadataItems?: Record -}): bigint { - let data = ctx.codec ? 
ctx.codec.compress(msg.data) : msg.data; +export function _write( + ctx: { + readonly codec: CompressionCodec // Codec to use for compression + readonly buffer: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages in the buffer + readonly inflight: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages that are currently in-flight + readonly lastSeqNo: bigint // Last sequence number used + updateLastSeqNo: (seqNo: bigint) => void + updateBufferSize: (bytes: bigint) => void // Function to update the buffer size + }, + msg: { + data: Uint8Array + seqNo?: bigint + createdAt?: Date + metadataItems?: Record + } +): bigint { + let data = ctx.codec ? ctx.codec.compress(msg.data) : msg.data // Validate the payload size, it should not exceed MAX_PAYLOAD_SIZE // This is a YDB limitation for single message size. if (data.length > MAX_PAYLOAD_SIZE) { - throw new Error(`Payload size exceeds ${Number(MAX_PAYLOAD_SIZE / (1024n * 1024n))}MiB limit.`); + throw new Error(`Payload size exceeds ${Number(MAX_PAYLOAD_SIZE / (1024n * 1024n))}MiB limit.`) } - let seqNo = msg.seqNo ?? ((ctx.lastSeqNo ?? 0n) + 1n); - let createdAt = timestampFromDate(msg.createdAt ?? new Date()); - let metadataItems = Object.entries(msg.metadataItems || {}).map(([key, value]) => ({ key, value })); - let uncompressedSize = BigInt(data.length); + let seqNo = msg.seqNo ?? (ctx.lastSeqNo ?? 0n) + 1n + let createdAt = timestampFromDate(msg.createdAt ?? 
new Date()) + let metadataItems = Object.entries(msg.metadataItems || {}).map(([key, value]) => ({ key, value })) + let uncompressedSize = BigInt(data.length) let message = create(StreamWriteMessage_WriteRequest_MessageDataSchema, { data, @@ -37,11 +43,17 @@ export function _write(ctx: { createdAt, metadataItems, uncompressedSize, - }); + }) + + ctx.buffer.push(message) // Store the message in the buffer + ctx.updateBufferSize(BigInt(data.length)) // Update the buffer size - ctx.buffer.push(message); // Store the message in the buffer - ctx.updateBufferSize(BigInt(data.length)); // Update the buffer size - ctx.updateLastSeqNo(seqNo); // Update the last sequence number + // Only update lastSeqNo if session is initialized (lastSeqNo is defined) + // For messages written before session initialization, lastSeqNo will be updated + // after renumbering in _on_init_response + if (ctx.lastSeqNo !== undefined) { + ctx.updateLastSeqNo(seqNo) + } - return seqNo; + return seqNo } diff --git a/packages/topic/src/writer/_write_response.ts b/packages/topic/src/writer/_write_response.ts index 2432cc28..ec78b873 100644 --- a/packages/topic/src/writer/_write_response.ts +++ b/packages/topic/src/writer/_write_response.ts @@ -1,38 +1,45 @@ -import type { StreamWriteMessage_FromClient, StreamWriteMessage_WriteRequest_MessageData, StreamWriteMessage_WriteResponse } from "@ydbjs/api/topic"; -import type { CompressionCodec } from "../codec.js"; -import type { AsyncPriorityQueue } from "../queue.js"; -import type { TX } from "../tx.js"; -import { _flush } from "./_flush.js"; -import type { ThroughputSettings } from "./types.js"; +import type { + StreamWriteMessage_FromClient, + StreamWriteMessage_WriteRequest_MessageData, + StreamWriteMessage_WriteResponse, +} from '@ydbjs/api/topic' +import type { CompressionCodec } from '../codec.js' +import type { AsyncPriorityQueue } from '../queue.js' +import type { TX } from '../tx.js' +import { _flush } from './_flush.js' +import type { 
ThroughputSettings } from './types.js' -export const _on_write_response = function on_write_response(ctx: { - readonly tx?: TX - readonly queue: AsyncPriorityQueue, - readonly codec: CompressionCodec, // Codec to use for compression - readonly buffer: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages that are currently in-flight - readonly inflight: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages that are currently in-flight - readonly throughputSettings: ThroughputSettings; // Current throughput settings for the writer - onAck?: (seqNo: bigint, status?: 'skipped' | 'written' | 'writtenInTx') => void // Callback for handling acknowledgments - updateBufferSize: (bytes: bigint) => void; // Function to update the buffer size -}, input: StreamWriteMessage_WriteResponse) { +export const _on_write_response = function on_write_response( + ctx: { + readonly tx?: TX + readonly queue: AsyncPriorityQueue + readonly codec: CompressionCodec // Codec to use for compression + readonly buffer: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages that are currently in-flight + readonly inflight: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages that are currently in-flight + readonly throughputSettings: ThroughputSettings // Current throughput settings for the writer + onAck?: (seqNo: bigint, status?: 'skipped' | 'written' | 'writtenInTx') => void // Callback for handling acknowledgments + updateBufferSize: (bytes: bigint) => void // Function to update the buffer size + }, + input: StreamWriteMessage_WriteResponse +) { // Process each acknowledgment in the response. - let acks = new Map(); + let acks = new Map() for (let ack of input.acks) { - acks.set(ack.seqNo, ack.messageWriteStatus.case!); + acks.set(ack.seqNo, ack.messageWriteStatus.case!) } // Acknowledge messages that have been processed. 
for (let i = ctx.inflight.length - 1; i >= 0; i--) { - const message = ctx.inflight[i]!; + const message = ctx.inflight[i]! if (acks.has(message.seqNo)) { - ctx.onAck?.(message.seqNo, acks.get(message.seqNo)); - ctx.inflight.splice(i, 1); + ctx.onAck?.(message.seqNo, acks.get(message.seqNo)) + ctx.inflight.splice(i, 1) } } // Clear the acknowledgment map. - acks.clear(); + acks.clear() // If there are still messages in the buffer, flush them. _flush(ctx) diff --git a/packages/topic/src/writer/index.ts b/packages/topic/src/writer/index.ts index e6df1054..1b158b97 100644 --- a/packages/topic/src/writer/index.ts +++ b/packages/topic/src/writer/index.ts @@ -205,8 +205,9 @@ export const createTopicWriter = function createTopicWriter(driver: Driver, opti throughputSettings, updateLastSeqNo, updateBufferSize, + isSeqNoProvided, ...(options.tx && { tx: options.tx }), - ...(lastSeqNo && { lastSeqNo }) + ...(lastSeqNo && { lastSeqNo }), }, chunk.serverMessage.value ) @@ -503,4 +504,4 @@ export const createTopicTxWriter = function createTopicTxWriter( } // Re-export types for compatibility -export type { TopicTxWriter, TopicWriter, TopicWriterOptions } from "./types.js" +export type { TopicTxWriter, TopicWriter, TopicWriterOptions } from './types.js' diff --git a/packages/topic/src/writer2/machine.ts b/packages/topic/src/writer2/machine.ts index ae897b2b..1c096a03 100644 --- a/packages/topic/src/writer2/machine.ts +++ b/packages/topic/src/writer2/machine.ts @@ -259,24 +259,150 @@ let writerMachineFactory = setup({ // ==================================================================== /** - * Updates session state after receiving an init response. - * Emits a session event with session ID and last sequence number. + * Updates the writer context after receiving STREAM_WRITE_SESSION_INIT. 
* - * @param enqueue - Enqueue function for scheduling actions - * @param event - Init response event containing session details + * Common groundwork for both seqNo modes: + * - Walk the `[inflight, buffer)` window once while keeping message order + * - Trim acked messages (seqNo <= `lastSeqNo`) and update sliding-window pointers + * + * Mode-specific behaviour: + * - Manual seqNo: compact the window, update bookkeeping, keep user-provided seqNo as-is + * - Auto seqNo: compact the window, renumber remaining messages sequentially + * + * @param enqueue - XState enqueue helper for scheduling actions + * @param event - init response with session metadata + * @param context - current state machine context */ - updateWriteSession: enqueueActions(({ enqueue, event }) => { + updateWriteSession: enqueueActions(({ enqueue, event, context }) => { assert.ok(event.type === 'writer.stream.response.init') assert.ok(event.data) + let lastSeqNo = event.data.lastSeqNo || 0n + let nextSeqNo = lastSeqNo + 1n + + // Count acknowledged messages and identify the new inflight window start so we can slide pointers in place. + let inflightStartIndex = context.inflightStart + let inflightEndIndex = context.inflightStart + context.inflightLength + let bufferEndIndex = context.bufferStart + context.bufferLength + + // Manual seqNo mode: drop acked entries and slide the window, seqNo stay untouched. + if (context.seqNoMode === 'manual') { + let acknowledgedSize = 0n + let pendingSize = 0n + let bufferSize = context.bufferSize + let firstKeptIndex: number | null = null + + // Single pass over [inflight, buffer): skip acknowledged items and record the first live message. + for (let i = inflightStartIndex; i < bufferEndIndex; i++) { + let message = context.messages[i] + if (!message) continue + + let messageSize = BigInt(message.data.length) + + // Messages already acknowledged by the server can be dropped from the sliding window. 
+ if (message.seqNo <= lastSeqNo) { + acknowledgedSize += messageSize + if (i >= inflightEndIndex) { + // They came from buffer, so shrink buffer accounting as well. + bufferSize -= messageSize + } + continue + } + + // Remember the first index that still contains a message we need to keep. + if (firstKeptIndex === null) { + firstKeptIndex = i + } + + if (i < inflightEndIndex) { + // Anything left in inflight becomes pending work that must be resent. + pendingSize += messageSize + } + } + + let newBufferStart = firstKeptIndex ?? bufferEndIndex + let bufferLength = bufferEndIndex - newBufferStart + let inflightSize = context.inflightSize - (acknowledgedSize + pendingSize) + let garbageSize = context.garbageSize + acknowledgedSize + let newBufferSize = bufferSize + pendingSize + + enqueue.assign({ + sessionId: event.data.sessionId, + inflightStart: newBufferStart, + inflightLength: 0, + inflightSize, + bufferStart: newBufferStart, + bufferLength, + bufferSize: newBufferSize, + garbageSize, + }) + + enqueue.emit(() => ({ + type: 'writer.session', + sessionId: event.data.sessionId, + lastSeqNo, + nextSeqNo: lastSeqNo + 1n, + })) + + return + } + + // Auto seqNo mode: compact window then reassign seqNo for the remaining messages. + let firstPendingIndex = inflightEndIndex + let acknowledgedSize = 0n + let pendingCount = 0 + let pendingSize = 0n + + // Scan inflight messages to find the first one that still needs server confirmation and to measure how much + // data must move back into the buffer before we renumber everything. 
+ for (let i = inflightStartIndex; i < inflightEndIndex; i++) { + let message = context.messages[i] + if (!message) continue + + if (firstPendingIndex === inflightEndIndex && message.seqNo > lastSeqNo) { + firstPendingIndex = i + } + + if (i < firstPendingIndex) { + acknowledgedSize += BigInt(message.data.length) + } else { + pendingCount++ + pendingSize += BigInt(message.data.length) + } + } + + let newBufferStart = firstPendingIndex + + // Renumber the remaining messages sequentially so we continue where the server left off. + for (let i = firstPendingIndex; i < bufferEndIndex; i++) { + let message = context.messages[i] + if (!message) continue + + message.seqNo = nextSeqNo + nextSeqNo++ + } + + let inflightSize = context.inflightSize - acknowledgedSize - pendingSize + let bufferSize = context.bufferSize + pendingSize + let garbageSize = context.garbageSize + acknowledgedSize + let bufferLength = pendingCount + context.bufferLength + enqueue.assign({ sessionId: event.data.sessionId, + inflightStart: newBufferStart, + inflightLength: 0, + inflightSize, + bufferStart: newBufferStart, + bufferLength, + bufferSize, + garbageSize, }) enqueue.emit(() => ({ type: 'writer.session', sessionId: event.data.sessionId, - lastSeqNo: event.data.lastSeqNo || 0n, + lastSeqNo: lastSeqNo, + nextSeqNo, })) }), @@ -304,7 +430,9 @@ let writerMachineFactory = setup({ if (context.inflightLength >= context.options.maxInflightCount!) { enqueue.emit(() => ({ type: 'writer.error', - error: new Error('Internal Error: Max inflight messages limit reached. If you see this error, please report it.'), + error: new Error( + 'Internal Error: Max inflight messages limit reached. If you see this error, please report it.' 
+ ), })) return @@ -392,7 +520,7 @@ let writerMachineFactory = setup({ // Update inflight and garbage metrics based on acknowledgments enqueue.assign(({ context }) => { let removedSize = 0n - let removedCount = 0 + let removedLength = 0 // Move acknowledged messages to garbage for (let i = context.inflightStart; i < context.inflightStart + context.inflightLength; i++) { @@ -400,7 +528,7 @@ let writerMachineFactory = setup({ if (message && acks.has(message.seqNo)) { removedSize += BigInt(message.data.length) - removedCount++ + removedLength++ } } @@ -408,39 +536,35 @@ let writerMachineFactory = setup({ return { garbageSize: context.garbageSize + removedSize, inflightSize: context.inflightSize - removedSize, - inflightStart: context.inflightStart + removedCount, - inflightLength: context.inflightLength - removedCount, + inflightStart: context.inflightStart + removedLength, + inflightLength: context.inflightLength - removedLength, } }) // @ts-ignore if (check({ type: 'shouldReclaimMemory' })) { enqueue.assign(({ context }) => { - let removed = context.messages.splice(0, context.inflightStart) - let bufferStart = context.bufferStart - removed.length - - // Recalculate bufferSize using sliding window approach: - // buffer region is messages from bufferStart to bufferStart + bufferLength - let bufferSize = 0n - for (let i = bufferStart; i < bufferStart + context.bufferLength; i++) { - let message = context.messages[i] - if (message) { - bufferSize += BigInt(message.data.length) + let garbageLength = context.inflightStart + if (!garbageLength) { + return { + garbageSize: 0n, } } - // Update context pointers + context.messages.splice(0, garbageLength) + let bufferStart = context.bufferStart - garbageLength + + assert.ok(bufferStart >= 0) + return { messages: context.messages, garbageSize: 0n, inflightStart: 0, - bufferSize, bufferStart, } }) } - // @ts-ignore enqueue({ type: 'log', params: { message: 'ACK | {stats}' } }) }), @@ -458,18 +582,24 @@ let writerMachineFactory = 
setup({ if (event.message.data.length > MAX_PAYLOAD_SIZE) { enqueue.emit(() => ({ type: 'writer.error', - error: new Error('Internal Error: Payload size exceeds 48MiB limit. If you see this error, please report it.'), + error: new Error( + 'Internal Error: Payload size exceeds 48MiB limit. If you see this error, please report it.' + ), })) return } let createdAt = timestampFromDate(event.message.createdAt ?? new Date()) + let uncompressedSize = BigInt(event.message.data.length) let metadataItems = Object.entries(event.message.metadataItems || {}).map(([key, value]) => ({ key, value, })) - let uncompressedSize = BigInt(event.message.data.length) + + // Track seqNo mode (set once on first message, then remains constant) + // Mode is passed from TopicWriter which knows it from SeqNoManager + let seqNoMode: 'auto' | 'manual' | null = context.seqNoMode ?? event.seqNoMode ?? null let message = create(StreamWriteMessage_WriteRequest_MessageDataSchema, { data: event.message.data, @@ -483,8 +613,9 @@ let writerMachineFactory = setup({ context.messages.push(message) enqueue.assign(({ context }) => ({ + seqNoMode, bufferSize: context.bufferSize + BigInt(event.message.data.length), - bufferLength: context.bufferLength + 1 + bufferLength: context.bufferLength + 1, })) //@ts-ignore @@ -531,6 +662,7 @@ let writerMachineFactory = setup({ releaseResources: assign(() => { return { messages: [], + seqNoMode: null, bufferStart: 0, bufferLength: 0, inflightStart: 0, @@ -694,6 +826,7 @@ export const WriterMachine = writerMachineFactory.createMachine({ // Single array approach with sliding window messages: [], + seqNoMode: null, bufferStart: 0, bufferLength: 0, inflightStart: 0, @@ -716,12 +849,12 @@ export const WriterMachine = writerMachineFactory.createMachine({ on: { 'writer.close': { target: '.closing', - actions: [log('CLS | {topicPath}')] + actions: [log('CLS | {topicPath}')], }, 'writer.destroy': { // Force close, skip graceful shutdown target: '.closed', - actions: [log('DST | 
{topicPath}')] + actions: [log('DST | {topicPath}')], }, 'writer.stream.error': { // Enter error state on stream error @@ -738,8 +871,8 @@ export const WriterMachine = writerMachineFactory.createMachine({ idle: { always: { target: 'connecting', - actions: [log('INT | {topicPath}')] - } + actions: [log('INT | {topicPath}')], + }, }, /** * Connecting state: Establishes connection to the topic stream. @@ -941,11 +1074,7 @@ export const WriterMachine = writerMachineFactory.createMachine({ closed: { // All resources are released in this final state type: 'final', - entry: [ - 'closeConnection', - 'releaseResources', - log('FIN | {stats}'), - ], - } + entry: ['closeConnection', 'releaseResources', log('FIN | {stats}')], + }, }, }) diff --git a/packages/topic/src/writer2/seqno-manager.test.ts b/packages/topic/src/writer2/seqno-manager.test.ts new file mode 100644 index 00000000..1b836577 --- /dev/null +++ b/packages/topic/src/writer2/seqno-manager.test.ts @@ -0,0 +1,66 @@ +import { expect, test } from 'vitest' +import { SeqNoManager } from './seqno-manager.js' + +test('auto mode generates sequential numbers starting from initial + 1', () => { + let manager = new SeqNoManager(5n) + + expect(manager.getNext()).toBe(6n) + expect(manager.getNext()).toBe(7n) + + let state = manager.getState() + expect(state.mode).toBe('auto') + expect(state.nextSeqNo).toBe(8n) + expect(state.lastSeqNo).toBe(7n) +}) + +test('auto mode rejects manual seqNo once started', () => { + let manager = new SeqNoManager() + + manager.getNext() + + expect(() => manager.getNext(10n)).toThrowError( + /Cannot mix auto and manual seqNo modes/ + ) +}) + +test('initialize adjusts next seqNo in auto mode', () => { + let manager = new SeqNoManager() + manager.initialize(42n) + + expect(manager.getNext()).toBe(43n) + let state = manager.getState() + expect(state.lastSeqNo).toBe(43n) + expect(state.nextSeqNo).toBe(44n) +}) + +test('manual mode accepts strictly increasing user seqNo', () => { + let manager = new 
SeqNoManager() + + expect(manager.getNext(100n)).toBe(100n) + expect(manager.getNext(101n)).toBe(101n) + + let state = manager.getState() + expect(state.mode).toBe('manual') + expect(state.highestUserSeqNo).toBe(101n) + expect(state.lastSeqNo).toBe(101n) +}) + +test('manual mode rejects missing seqNo once started', () => { + let manager = new SeqNoManager() + + manager.getNext(10n) + + expect(() => manager.getNext()).toThrowError( + /Cannot mix manual and auto seqNo modes/ + ) +}) + +test('manual mode enforces strictly increasing seqNo', () => { + let manager = new SeqNoManager() + + manager.getNext(10n) + + expect(() => manager.getNext(10n)).toThrowError( + /SeqNo must be strictly increasing/ + ) +}) diff --git a/packages/topic/src/writer2/seqno-manager.ts b/packages/topic/src/writer2/seqno-manager.ts index 2a63388d..9e78d4ab 100644 --- a/packages/topic/src/writer2/seqno-manager.ts +++ b/packages/topic/src/writer2/seqno-manager.ts @@ -7,8 +7,8 @@ */ export class SeqNoManager { #mode: 'auto' | 'manual' | null = null - #nextSeqNo: bigint = 1n #lastSeqNo: bigint = 0n + #nextSeqNo: bigint = 1n #highestUserSeqNo: bigint = 0n constructor(initialSeqNo: bigint = 0n) { @@ -37,7 +37,9 @@ export class SeqNoManager { if (this.#mode === 'auto') { if (userSeqNo !== undefined) { - throw new Error('Cannot mix auto and manual seqNo modes. Once auto mode is started, all messages must use auto seqNo.') + throw new Error( + 'Cannot mix auto and manual seqNo modes. Once auto mode is started, all messages must use auto seqNo.' + ) } let seqNo = this.#nextSeqNo @@ -47,12 +49,16 @@ export class SeqNoManager { } else { // Manual mode if (userSeqNo === undefined) { - throw new Error('Cannot mix manual and auto seqNo modes. Once manual mode is started, all messages must provide seqNo.') + throw new Error( + 'Cannot mix manual and auto seqNo modes. Once manual mode is started, all messages must provide seqNo.' 
+ ) } // Validate strictly increasing if (userSeqNo <= this.#highestUserSeqNo) { - throw new Error(`SeqNo must be strictly increasing. Provided: ${userSeqNo}, highest seen: ${this.#highestUserSeqNo}`) + throw new Error( + `SeqNo must be strictly increasing. Provided: ${userSeqNo}, highest seen: ${this.#highestUserSeqNo}` + ) } this.#highestUserSeqNo = userSeqNo diff --git a/packages/topic/src/writer2/types.ts b/packages/topic/src/writer2/types.ts index ae9ed3ae..a2462515 100644 --- a/packages/topic/src/writer2/types.ts +++ b/packages/topic/src/writer2/types.ts @@ -1,9 +1,9 @@ -import type { Driver } from "@ydbjs/core" -import type { ActorRef, CallbackSnapshot } from "xstate" -import type { CompressionCodec } from "../codec.js" -import type { TX } from "../tx.js" -import type { WriterStreamEmittedEvent, WriterStreamInput, WriterStreamReceiveEvent } from "./stream.ts" -import type { YDBDebugLogger } from "@ydbjs/debug" +import type { Driver } from '@ydbjs/core' +import type { ActorRef, CallbackSnapshot } from 'xstate' +import type { CompressionCodec } from '../codec.js' +import type { TX } from '../tx.js' +import type { WriterStreamEmittedEvent, WriterStreamInput, WriterStreamReceiveEvent } from './stream.ts' +import type { YDBDebugLogger } from '@ydbjs/debug' export type TopicWriterOptions = { // Transaction identity. 
@@ -79,7 +79,11 @@ export type WriterContext = { readonly sessionId?: string // Message buffers - single array with sliding window approach - readonly messages: import("@ydbjs/api/topic").StreamWriteMessage_WriteRequest_MessageData[] + readonly messages: import('@ydbjs/api/topic').StreamWriteMessage_WriteRequest_MessageData[] + + // Track seqNo mode: 'auto' means all seqno are auto-generated, 'manual' means all are user-provided + // Updated when first message is written, then remains constant (SeqNoManager enforces mode consistency) + readonly seqNoMode: 'auto' | 'manual' | null // Buffer window: [bufferStart, bufferStart + bufferLength) readonly bufferStart: number @@ -97,12 +101,14 @@ export type WriterContext = { readonly lastError?: unknown // Reference to the stream actor - readonly streamRef?: ActorRef, WriterStreamReceiveEvent, WriterStreamEmittedEvent> | undefined + readonly streamRef?: + | ActorRef, WriterStreamReceiveEvent, WriterStreamEmittedEvent> + | undefined } export type MessageToSend = { data: Uint8Array - seqNo: bigint // Now required - TopicWriter always provides it + seqNo: bigint // Always provided by TopicWriter (either auto-generated or user-provided) createdAt?: Date metadataItems?: Record } @@ -110,7 +116,7 @@ export type MessageToSend = { // Events for the state machine export type WriterEvents = // User-initiated events - | { type: 'writer.write'; message: MessageToSend } + | { type: 'writer.write'; message: MessageToSend; seqNoMode?: 'auto' | 'manual' } | { type: 'writer.flush' } | { type: 'writer.close' } | { type: 'writer.destroy'; reason?: unknown } @@ -121,20 +127,18 @@ export type WriterEvents = export type WriterEmitted = | { type: 'writer.error'; error: unknown } | { type: 'writer.close'; reason?: unknown } - | { type: 'writer.session'; sessionId: string; lastSeqNo: bigint } + | { + type: 'writer.session' + sessionId: string + lastSeqNo: bigint + nextSeqNo: bigint + } | { type: 'writer.acknowledgments'; acknowledgments: Map } 
export type WriterInput = { - driver: Driver; + driver: Driver options: TopicWriterOptions } // State machine states -export type WriterStates = - | 'connecting' - | 'connected' - | 'errored' - | 'writing' - | 'flushing' - | 'closing' - | 'destroyed' +export type WriterStates = 'connecting' | 'connected' | 'errored' | 'writing' | 'flushing' | 'closing' | 'destroyed' diff --git a/packages/topic/src/writer2/writer.ts b/packages/topic/src/writer2/writer.ts index 9c9088d9..afa5ce2f 100644 --- a/packages/topic/src/writer2/writer.ts +++ b/packages/topic/src/writer2/writer.ts @@ -10,6 +10,7 @@ export class TopicWriter implements AsyncDisposable { #promise: ReturnType> | null = null #subscription: Subscription #seqNoManager: SeqNoManager + #isSessionInitialized = false constructor(driver: Driver, options: TopicWriterOptions) { this.#seqNoManager = new SeqNoManager() @@ -27,7 +28,12 @@ export class TopicWriter implements AsyncDisposable { // Subscribe to emitted events for seqNo management this.#actor.on('writer.session', (event) => { - this.#seqNoManager.initialize(event.lastSeqNo) + // State machine already recalculated seqno for all buffered messages + // event.nextSeqNo is the next seqno that should be used for new messages + // So lastSeqNo for SeqNoManager should be nextSeqNo - 1 + let lastSeqNo = event.nextSeqNo - 1n + this.#seqNoManager.initialize(lastSeqNo) + this.#isSessionInitialized = true }) // Subscribe to error events @@ -48,15 +54,21 @@ export class TopicWriter implements AsyncDisposable { * Write a message to the topic * @param data Message payload * @param extra Optional message metadata - * @returns Sequence number of the message */ - write(data: Uint8Array, extra?: { - seqNo?: bigint - createdAt?: Date - metadataItems?: Record - }): bigint { + write( + data: Uint8Array, + extra?: { + seqNo?: bigint + createdAt?: Date + metadataItems?: Record + } + ): void { // Get seqNo from SeqNoManager (handles auto/manual modes) let seqNo = 
this.#seqNoManager.getNext(extra?.seqNo) + let seqNoState = this.#seqNoManager.getState() + // Determine mode for state machine (if not yet determined, determine it now) + let seqNoMode: 'auto' | 'manual' | undefined = + seqNoState.mode ?? (extra?.seqNo !== undefined ? 'manual' : 'auto') this.#actor.send({ type: 'writer.write', @@ -64,17 +76,26 @@ export class TopicWriter implements AsyncDisposable { data, seqNo, ...(extra?.createdAt && { createdAt: extra.createdAt }), - ...(extra?.metadataItems && { metadataItems: extra.metadataItems }) - } + ...(extra?.metadataItems && { metadataItems: extra.metadataItems }), + }, + seqNoMode, }) - - return seqNo } /** * Flush all buffered messages and wait for acknowledgment * @param signal Optional AbortSignal to cancel the flush operation * @returns Promise that resolves with the last acknowledged sequence number + * + * **Important:** After `flush()` completes, all messages written before this call + * have been sent to the server with their final seqNo values. This is the safe way + * to ensure seqNo accuracy for critical operations like deduplication or tracking. + * + * **Getting final seqNo for messages:** + * After `flush()` completes, all seqNo values up to the returned `lastSeqNo` are final. 
+ * If you need to track individual messages, you can: + * - Use the order of `write()` calls to determine final seqNo (sequential after flush) + * - Use user-provided seqNo (always final, never recalculated) */ async flush(signal?: AbortSignal): Promise { // If there's already a flush in progress, return the same promise @@ -110,7 +131,8 @@ export class TopicWriter implements AsyncDisposable { /** * Get current writer statistics - */ get stats() { + */ + get stats() { let snapshot = this.#actor.getSnapshot() let seqNoState = this.#seqNoManager.getState() @@ -123,26 +145,53 @@ export class TopicWriter implements AsyncDisposable { bufferLength: snapshot.context.bufferLength, inflightSize: snapshot.context.inflightSize, inflightLength: snapshot.context.inflightLength, + isSessionInitialized: this.#isSessionInitialized, } } + /** + * Check if the writer session is initialized + * @returns true if session is initialized, false otherwise + */ + get isSessionInitialized(): boolean { + return this.#isSessionInitialized + } + /** * Close the writer gracefully, waiting for all messages to be sent */ async close(signal?: AbortSignal): Promise { + let snapshot = this.#actor.getSnapshot() + + // If already closed, return immediately + if (snapshot.value === 'closed') { + return + } + + // If actor is stopped, return immediately + if (snapshot.status === 'stopped') { + return + } + let { promise, resolve } = Promise.withResolvers() let subscription = this.#actor.subscribe((snapshot) => { - if (snapshot.value === 'closed') { + if (snapshot.value === 'closed' || snapshot.status === 'stopped') { resolve() } }) + // Check again before sending (actor might have stopped between checks) + snapshot = this.#actor.getSnapshot() + if (snapshot.value === 'closed' || snapshot.status === 'stopped') { + subscription.unsubscribe() + return + } + this.#actor.send({ type: 'writer.close' }) - return (signal ? 
abortable(signal, promise) : promise) - .finally(() => { - subscription.unsubscribe() - }) + return (signal ? abortable(signal, promise) : promise).finally(() => { + subscription.unsubscribe() + }) } /** @@ -167,8 +216,16 @@ export class TopicWriter implements AsyncDisposable { * AsyncDisposable implementation - graceful close with resource cleanup */ async [Symbol.asyncDispose](): Promise { - await this.close() - this.destroy() + // Try graceful close first + try { + await this.close() + } catch (error) { + // If close fails, force destroy + this.destroy(error as Error) + throw error + } + // After successful close, the actor is already stopped in closed state + // Just clean up subscription + this.#subscription.unsubscribe() } - } diff --git a/packages/topic/tests/writer.test.ts b/packages/topic/tests/writer.test.ts index c720c5e6..d6a95075 100644 --- a/packages/topic/tests/writer.test.ts +++ b/packages/topic/tests/writer.test.ts @@ -4,6 +4,7 @@ import { create } from '@bufbuild/protobuf' import { CreateTopicRequestSchema, DropTopicRequestSchema, TopicServiceDefinition } from '@ydbjs/api/topic' import { Driver } from '@ydbjs/core' +import { createTopicReader } from '../src/reader/index.js' import { createTopicWriter } from '../src/writer/index.js' let driver = new Driver(inject('connectionString'), { @@ -56,3 +57,71 @@ test('writes single message to topic', async () => { expect(seqNo).toBe(lastSeqNo) }) + +test('messages written before initialization are properly renumbered', async () => { + let producerId = `test-producer-${Date.now()}` + + // First writer: write messages to establish a sequence + await using writer1 = createTopicWriter(driver, { + topic: testTopicName, + producer: producerId, + }) + + writer1.write(new TextEncoder().encode('Writer1 Message 1')) + writer1.write(new TextEncoder().encode('Writer1 Message 2')) + let writer1LastSeqNo = (await writer1.flush())! 
+ + expect(writer1LastSeqNo).toBeGreaterThan(0n) + + // Wait a bit to ensure messages are committed on server + await new Promise((resolve) => setTimeout(resolve, 500)) + + writer1.destroy() + + // Create new writer with same producerId - should continue seqno sequence + await using writer2 = createTopicWriter(driver, { + topic: testTopicName, + producer: producerId, + }) + + // Write messages immediately (before session initialization) + // These should get seqno starting from writer1LastSeqNo + 1 after initialization + writer2.write(new TextEncoder().encode('Writer2 Message 1')) + writer2.write(new TextEncoder().encode('Writer2 Message 2')) + let writer2LastSeqNo = (await writer2.flush())! + + // Verify seqno are sequential and continue from writer1 + // writer2 wrote 2 messages, so lastSeqNo should be writer1LastSeqNo + 2 + expect(writer2LastSeqNo).toBe(writer1LastSeqNo + 2n) + + // Verify messages were written correctly by reading them + await using reader = createTopicReader(driver, { + topic: testTopicName, + consumer: testConsumerName, + }) + + let messagesRead = 0 + let foundSeqNos: bigint[] = [] + + for await (let batch of reader.read({ limit: 10, waitMs: 2000 })) { + for (let msg of batch) { + foundSeqNos.push(msg.seqNo) + messagesRead++ + } + + await reader.commit(batch) + + if (messagesRead >= 4) { + break + } + } + + expect(messagesRead).toBeGreaterThanOrEqual(4) + // Verify seqno are sequential - should start from 1 and continue + foundSeqNos.sort((a, b) => Number(a - b)) + expect(foundSeqNos[0]!).toBe(1n) + expect(foundSeqNos[foundSeqNos.length - 1]!).toBe(writer2LastSeqNo) + // Verify writer2's messages continue from writer1 + expect(foundSeqNos).toContain(writer1LastSeqNo + 1n) + expect(foundSeqNos).toContain(writer2LastSeqNo) +}) diff --git a/packages/topic/tests/writer2.test.ts b/packages/topic/tests/writer2.test.ts index e8d61375..fa019bb2 100644 --- a/packages/topic/tests/writer2.test.ts +++ b/packages/topic/tests/writer2.test.ts @@ -406,20 
+406,16 @@ test('messages written before initialization get correct seqno', async () => { }) // Write messages immediately after creating writer (before session initialization) - // These messages will get temporary seqno (1, 2, 3...) initially - let seqNo1 = writer.write(new TextEncoder().encode('Message 1')) - let seqNo2 = writer.write(new TextEncoder().encode('Message 2')) - let seqNo3 = writer.write(new TextEncoder().encode('Message 3')) + writer.write(new TextEncoder().encode('Message 1')) + writer.write(new TextEncoder().encode('Message 2')) + writer.write(new TextEncoder().encode('Message 3')) // Wait for initialization and flush let lastSeqNo = await writer.flush() // After initialization, seqno should be recalculated based on server's lastSeqNo // So final seqno should be sequential and start from serverLastSeqNo + 1 - expect(seqNo1).toBeGreaterThan(0n) - expect(seqNo2).toBe(seqNo1 + 1n) - expect(seqNo3).toBe(seqNo2 + 1n) - expect(lastSeqNo).toBe(seqNo3) + expect(lastSeqNo).toBeGreaterThan(0n) // Verify messages were written by reading them await using reader = createTopicReader(driver, { @@ -428,11 +424,11 @@ test('messages written before initialization get correct seqno', async () => { }) let messagesRead = 0 - let seqNos = new Set() + let seqNos: bigint[] = [] for await (let batch of reader.read({ limit: 10, waitMs: 2000 })) { for (let msg of batch) { - seqNos.add(msg.seqNo) + seqNos.push(msg.seqNo) let content = new TextDecoder().decode(msg.payload) expect(['Message 1', 'Message 2', 'Message 3']).toContain(content) messagesRead++ @@ -446,10 +442,11 @@ test('messages written before initialization get correct seqno', async () => { } expect(messagesRead).toBe(3) - // Verify all seqno are present (messages were not dropped) - expect(seqNos.has(seqNo1)).toBe(true) - expect(seqNos.has(seqNo2)).toBe(true) - expect(seqNos.has(seqNo3)).toBe(true) + // Verify seqno are sequential + expect(seqNos.length).toBe(3) + expect(seqNos[0]! 
+ 1n).toBe(seqNos[1]) + expect(seqNos[1]! + 1n).toBe(seqNos[2]) + expect(seqNos[2]).toBe(lastSeqNo) }) test('messages written before initialization are not dropped', async () => { @@ -458,8 +455,8 @@ test('messages written before initialization are not dropped', async () => { producerId: `test-producer-${Date.now()}`, }) - // Write message immediately (will get temporary seqno = 1) - let seqNo = writer.write(new TextEncoder().encode('Test message')) + // Write message immediately + writer.write(new TextEncoder().encode('Test message')) // Flush to ensure message is sent await writer.flush() @@ -474,8 +471,8 @@ test('messages written before initialization are not dropped', async () => { let messagePayload: string | null = null for await (let batch of reader.read({ limit: 1, waitMs: 2000 })) { for (let msg of batch) { - if (msg.seqNo === seqNo) { - messagePayload = new TextDecoder().decode(msg.payload) + messagePayload = new TextDecoder().decode(msg.payload) + if (messagePayload === 'Test message') { messageFound = true break } @@ -496,17 +493,16 @@ test('new writer continues seqno sequence after previous writer', { timeout: 15_ let producerId = `test-producer-${Date.now()}` // First writer: write messages - await using writer1 = await new TopicWriter(driver, { + await using writer1 = new TopicWriter(driver, { topic: testTopicName, producerId, }) - let writer1SeqNo1 = writer1.write(new TextEncoder().encode('Writer1 Message 1')) - let writer1SeqNo2 = writer1.write(new TextEncoder().encode('Writer1 Message 2')) + writer1.write(new TextEncoder().encode('Writer1 Message 1')) + writer1.write(new TextEncoder().encode('Writer1 Message 2')) let writer1LastSeqNo = await writer1.flush() - expect(writer1SeqNo2).toBe(writer1LastSeqNo) - expect(writer1SeqNo2).toBe(writer1SeqNo1 + 1n) + expect(writer1LastSeqNo).toBeGreaterThan(0n) // Wait a bit to ensure messages are committed on server await new Promise((resolve) => setTimeout(resolve, 500)) @@ -526,9 +522,6 @@ test('new writer 
continues seqno sequence after previous writer', { timeout: 15_ // Verify seqno are sequential and continue from writer1 // This is the key test: writer2 should get lastSeqNo from server and continue sequence // After initialization, seqno for writer2's messages should be recalculated from serverLastSeqNo - // Note: write() returns temporary seqno (1, 2, 3...) before initialization - // After initialization, seqno are recalculated, but write() return values don't change - // We verify correctness through flush() which returns the actual lastSeqNo after recalculation // writer2 wrote 2 messages, so lastSeqNo should be writer1LastSeqNo + 2 expect(writer2LastSeqNo).toBe(writer1LastSeqNo + 2n) diff --git a/packages/topic/vitest.config.ts b/packages/topic/vitest.config.ts index d38da172..98cf2c9e 100644 --- a/packages/topic/vitest.config.ts +++ b/packages/topic/vitest.config.ts @@ -2,8 +2,34 @@ import { defineProject } from 'vitest/config' export default defineProject({ test: { - name: 'topic', - include: ['src/**/*.test.ts'], - environment: 'node', + projects: [ + { + test: { + name: { + label: 'uni', + color: 'yellow', + }, + include: ['./src/**/*.test.ts'], + environment: 'node', + benchmark: { + include: ['./src/**/*.bench.ts'], + }, + }, + }, + { + test: { + name: { + label: 'int', + color: 'blue', + }, + include: ['./tests/**/*.test.ts'], + environment: 'node', + globalSetup: '../../vitest.setup.ydb.ts', + benchmark: { + include: ['./tests/**/*.bench.ts'], + }, + }, + }, + ], }, }) diff --git a/vitest.setup.ydb.ts b/vitest.setup.ydb.ts index 368491b0..1c054261 100644 --- a/vitest.setup.ydb.ts +++ b/vitest.setup.ydb.ts @@ -32,10 +32,8 @@ export async function setup(project: TestProject) { return } - let ports = ['2135', '2136', '8765', '9092'].map((port) => `--publish ${port}`).join(' ') - // prettier-ignore - let container = await $`docker run --rm --detach --hostname localhost --platform linux/amd64 ${ports} ydbplatform/local-ydb:25.2`.text() + let container = 
await $`docker run --rm --detach --hostname localhost --platform linux/amd64 --publish 2135 --publish 2136 --publish 8765 --publish 9092 ydbplatform/local-ydb:25.2`.text() containerID = container.trim() let signal = AbortSignal.timeout(30 * 1000)