29 changes: 29 additions & 0 deletions .changeset/remove-write-seqno-return.md
@@ -0,0 +1,29 @@
---
'@ydbjs/topic': minor
---

Fix seqNo renumbering bug in both writer implementations and simplify TopicWriter API.

**Bug fix:**

- Fixed an issue where messages written before session initialization were not renumbered after the server returned `lastSeqNo`. Previously, auto-generated seqNo values started from a default base of 0 and were never updated when the server provided the actual `lastSeqNo`, causing seqNo conflicts. Messages are now renumbered to continue from the server's `lastSeqNo + 1`.
- The fix applies to both the `writer` (legacy) and `writer2` implementations.

**API changes:**

- `TopicWriter.write()` no longer returns a sequence number (it now returns `void`), which simplifies the API and avoids confusion between temporary and final seqNo values.

**Migration guide:**

- If you were storing the seqNo returned by `write()`, use `flush()` instead to obtain the final seqNo:

```typescript
// Before
let seqNo = writer.write(data)

// After
writer.write(data)
let lastSeqNo = await writer.flush() // Get final seqNo
```

- User-provided seqNo values (passed via `extra.seqNo`) remain final and unchanged; no migration is needed for this case (see the sketch below).
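
To make the auto vs. manual modes concrete, here is a minimal sketch. It assumes a writer created via `topic(driver).createWriter(...)` and an `extra.seqNo` call shape as described in the note above; the declared writer type and payload are illustrative, not taken from this changeset.

```typescript
// Assumed call shape for the sketch only (see the `extra.seqNo` note above).
declare const writer: {
  write(data: Uint8Array, extra?: { seqNo?: bigint }): void
  flush(): Promise<bigint | undefined>
}

const payload = new TextEncoder().encode('hello')

// Auto mode: no seqNo is passed; the final seqNo comes from flush().
writer.write(payload)
const lastSeqNo = await writer.flush()
console.log('final seqNo:', lastSeqNo)

// Manual mode: an explicit seqNo is used as-is and is never renumbered.
writer.write(payload, { seqNo: 42n })
await writer.flush()
```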
4 changes: 3 additions & 1 deletion packages/topic/package.json
@@ -30,9 +30,11 @@
"types": "dist/index.d.ts",
"exports": {
".": "./dist/index.js",
"./codec": "./dist/codec.js",
"./reader": "./dist/reader/index.js",
"./writer": "./dist/writer/index.js",
"./writer2": "./dist/writer2/index.js"
"./writer2": "./dist/writer2/index.js",
"./message": "./dist/message.js"
},
"engines": {
"node": ">=20.19.0",
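The exports map now exposes `./codec` and `./message` subpaths. A minimal import sketch; namespace imports are used because the members exported from those modules are not shown in this diff.

```typescript
// Subpath imports enabled by the exports map above.
import * as codec from '@ydbjs/topic/codec'
import * as message from '@ydbjs/topic/message'

console.log(Object.keys(codec), Object.keys(message))
```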
39 changes: 24 additions & 15 deletions packages/topic/src/index.ts
@@ -1,32 +1,41 @@
import { Driver } from "@ydbjs/core";
import { Driver } from '@ydbjs/core'

import { type TopicReader, type TopicReaderOptions, type TopicTxReader, createTopicReader, createTopicTxReader } from "./reader/index.js";
import type { TX } from "./tx.js";
import { type TopicTxWriter, type TopicWriter, type TopicWriterOptions, createTopicTxWriter, createTopicWriter } from "./writer/index.js";
import {
type TopicReader,
type TopicReaderOptions,
type TopicTxReader,
createTopicReader,
createTopicTxReader,
} from './reader/index.js'
import type { TX } from './tx.js'
import {
type TopicTxWriter,
type TopicWriter,
type TopicWriterOptions,
createTopicTxWriter,
createTopicWriter,
} from './writer/index.js'

export interface TopicClient {
createReader(options: TopicReaderOptions): TopicReader;
createTxReader(tx: TX, options: TopicReaderOptions): TopicTxReader;
createWriter(options: TopicWriterOptions): TopicWriter;
createTxWriter(tx: TX, options: TopicWriterOptions): TopicTxWriter;
createReader(options: TopicReaderOptions): TopicReader
createTxReader(tx: TX, options: TopicReaderOptions): TopicTxReader
createWriter(options: TopicWriterOptions): TopicWriter
createTxWriter(tx: TX, options: TopicWriterOptions): TopicTxWriter
}

export function topic(driver: Driver): TopicClient {
return {
createReader(options) {
return createTopicReader(driver, options);
return createTopicReader(driver, options)
},
createTxReader(tx: TX, options: TopicReaderOptions) {
return createTopicTxReader(tx, driver, options);
return createTopicTxReader(tx, driver, options)
},
createWriter(options: TopicWriterOptions) {
return createTopicWriter(driver, options);
return createTopicWriter(driver, options)
},
createTxWriter(tx: TX, options: Omit<TopicWriterOptions, 'tx'>) {
return createTopicTxWriter(tx, driver, options);
return createTopicTxWriter(tx, driver, options)
},
} as TopicClient
}

export type { TopicTxReader } from './reader/index.js';
export type { TopicTxWriter } from './writer/index.js';
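For context, a minimal usage sketch of the `topic()` factory above. The driver and the reader/writer options are placeholders (their concrete fields are not part of this diff); only the factory methods shown in `TopicClient` are used.

```typescript
import type { Driver } from '@ydbjs/core'
import { topic } from '@ydbjs/topic'

// Placeholders: concrete option fields are not visible in this diff.
declare const driver: Driver
declare const readerOptions: Parameters<ReturnType<typeof topic>['createReader']>[0]
declare const writerOptions: Parameters<ReturnType<typeof topic>['createWriter']>[0]

const client = topic(driver)
const reader = client.createReader(readerOptions)
const writer = client.createWriter(writerOptions)

// Transaction-bound variants take a TX as the first argument; the tx object is
// assumed to come from the transaction API and is not shown here.
// const txReader = client.createTxReader(tx, readerOptions)
// const txWriter = client.createTxWriter(tx, writerOptions)

writer.write(new TextEncoder().encode('event'))
const lastSeqNo = await writer.flush()
console.log('last seqNo after flush:', lastSeqNo, 'reader created:', reader !== undefined)
```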
40 changes: 20 additions & 20 deletions packages/topic/src/writer/_flush.ts
@@ -1,37 +1,37 @@
import type { StreamWriteMessage_FromClient, StreamWriteMessage_WriteRequest_MessageData } from "@ydbjs/api/topic";
import type { CompressionCodec } from "../codec.js";
import type { AsyncPriorityQueue } from "../queue.js";
import type { TX } from "../tx.js";
import { _batch_messages } from "./_batch_messages.js";
import { _emit_write_request } from "./_write_request.js";
import type { ThroughputSettings } from "./types.js";
import type { StreamWriteMessage_FromClient, StreamWriteMessage_WriteRequest_MessageData } from '@ydbjs/api/topic'
import type { CompressionCodec } from '../codec.js'
import type { AsyncPriorityQueue } from '../queue.js'
import type { TX } from '../tx.js'
import { _batch_messages } from './_batch_messages.js'
import { _emit_write_request } from './_write_request.js'
import type { ThroughputSettings } from './types.js'

export const _flush = function flush(ctx: {
readonly tx?: TX
readonly queue: AsyncPriorityQueue<StreamWriteMessage_FromClient>,
readonly codec: CompressionCodec, // Codec to use for compression
readonly buffer: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages in the buffer
readonly inflight: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages that are currently in-flight
readonly throughputSettings: ThroughputSettings;
updateBufferSize: (bytes: bigint) => void; // Function to update the buffer size
readonly queue: AsyncPriorityQueue<StreamWriteMessage_FromClient>
readonly codec: CompressionCodec // Codec to use for compression
readonly buffer: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages in the buffer
readonly inflight: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages that are currently in-flight
readonly throughputSettings: ThroughputSettings
updateBufferSize: (bytes: bigint) => void // Function to update the buffer size
}) {
if (!ctx.buffer.length) {
return; // Nothing to flush
return // Nothing to flush
}

let messagesToSend: StreamWriteMessage_WriteRequest_MessageData[] = [];
let messagesToSend: StreamWriteMessage_WriteRequest_MessageData[] = []

while (ctx.inflight.length < ctx.throughputSettings.maxInflightCount) {
let message = ctx.buffer.shift();
let message = ctx.buffer.shift()
if (!message) {
break; // No more messages to send
break // No more messages to send
}

ctx.inflight.push(message);
messagesToSend.push(message);
ctx.inflight.push(message)
messagesToSend.push(message)
}

for (let batch of _batch_messages(messagesToSend)) {
_emit_write_request(ctx, batch); // Emit the write request with the batch of messages
_emit_write_request(ctx, batch) // Emit the write request with the batch of messages
}
}
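The flush loop above drains the buffer into the in-flight list up to `maxInflightCount` and emits the drained messages in batches. A simplified, self-contained sketch of that policy, using plain objects instead of the real protobuf message type:

```typescript
// Simplified model of the flush policy in _flush (illustrative types only).
type Msg = { seqNo: bigint; data: Uint8Array }

function flushSketch(
  buffer: Msg[],
  inflight: Msg[],
  maxInflightCount: number,
  emitBatch: (batch: Msg[]) => void
): void {
  if (buffer.length === 0) return // nothing to flush

  const toSend: Msg[] = []
  // Move messages from the buffer into the in-flight list until the cap is reached.
  while (inflight.length < maxInflightCount) {
    const message = buffer.shift()
    if (!message) break // no more messages to send
    inflight.push(message)
    toSend.push(message)
  }

  // The real implementation groups messages via _batch_messages and emits one
  // write request per batch; a single emit stands in for that here.
  if (toSend.length > 0) emitBatch(toSend)
}
```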
101 changes: 75 additions & 26 deletions packages/topic/src/writer/_init_reponse.ts
@@ -1,35 +1,84 @@
import type { StreamWriteMessage_FromClient, StreamWriteMessage_InitResponse, StreamWriteMessage_WriteRequest_MessageData } from "@ydbjs/api/topic";
import type { CompressionCodec } from "../codec.js";
import type { AsyncPriorityQueue } from "../queue.js";
import type { TX } from "../tx.js";
import { _flush } from "./_flush.js";
import type { ThroughputSettings } from "./types.js";
import type {
StreamWriteMessage_FromClient,
StreamWriteMessage_InitResponse,
StreamWriteMessage_WriteRequest_MessageData,
} from '@ydbjs/api/topic'
import type { CompressionCodec } from '../codec.js'
import type { AsyncPriorityQueue } from '../queue.js'
import type { TX } from '../tx.js'
import { _flush } from './_flush.js'
import type { ThroughputSettings } from './types.js'

export const _on_init_response = function on_init_response(ctx: {
readonly tx?: TX
readonly queue: AsyncPriorityQueue<StreamWriteMessage_FromClient>,
readonly codec: CompressionCodec, // Codec to use for compression
readonly buffer: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages in the buffer
readonly inflight: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages that are currently in-flight
readonly lastSeqNo?: bigint; // The last sequence number acknowledged by the server
readonly throughputSettings: ThroughputSettings; // Current throughput settings for the writer
updateLastSeqNo: (seqNo: bigint) => void;
updateBufferSize: (bytes: bigint) => void; // Function to update the buffer size
}, input: StreamWriteMessage_InitResponse) {
if (!ctx.lastSeqNo) {
// Store the last sequence number from the server.
ctx.updateLastSeqNo(input.lastSeqNo);
}
export const _on_init_response = function on_init_response(
ctx: {
readonly tx?: TX
readonly queue: AsyncPriorityQueue<StreamWriteMessage_FromClient>
readonly codec: CompressionCodec // Codec to use for compression
readonly buffer: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages in the buffer
readonly inflight: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages that are currently in-flight
readonly lastSeqNo?: bigint // The last sequence number acknowledged by the server
readonly throughputSettings: ThroughputSettings // Current throughput settings for the writer
readonly isSeqNoProvided?: boolean // Whether user provided seqNo (manual mode)
updateLastSeqNo: (seqNo: bigint) => void
updateBufferSize: (bytes: bigint) => void // Function to update the buffer size
},
input: StreamWriteMessage_InitResponse
) {
let serverLastSeqNo = input.lastSeqNo || 0n
let currentLastSeqNo = ctx.lastSeqNo
let isFirstInit = currentLastSeqNo === undefined
let lastSeqNoChanged = isFirstInit || currentLastSeqNo !== serverLastSeqNo

// Return inflight messages to buffer
while (ctx.inflight.length > 0) {
const message = ctx.inflight.pop();
const message = ctx.inflight.pop()
if (!message) {
continue;
continue
}

ctx.buffer.unshift(message);
ctx.updateBufferSize(BigInt(message.data.length));
ctx.buffer.unshift(message)
ctx.updateBufferSize(BigInt(message.data.length))
}

// If this is the first initialization or server provided a new lastSeqNo, and we're in auto seqNo mode,
// renumber all messages in buffer to continue from serverLastSeqNo + 1
// Always renumber on first init, even if currentLastSeqNo === serverLastSeqNo (messages written before init)
// Also renumber if there are messages in buffer that were written before init (their seqNo start from 1, not serverLastSeqNo + 1)
let finalLastSeqNo = serverLastSeqNo
let shouldRenumber = false
// Only renumber in auto mode (when user didn't provide seqNo)
if (!ctx.isSeqNoProvided && ctx.buffer.length > 0) {
if (isFirstInit) {
// First initialization: always renumber messages written before init
shouldRenumber = true
} else if (lastSeqNoChanged) {
// Reconnection: renumber if server's lastSeqNo changed
shouldRenumber = true
} else if (ctx.buffer.length > 0) {
// Check if messages in buffer were written before init (seqNo start from 1, not serverLastSeqNo + 1)
// If first message's seqNo is <= serverLastSeqNo, it was written before init and needs renumbering
let firstMessageSeqNo = ctx.buffer[0]?.seqNo
if (firstMessageSeqNo !== undefined && firstMessageSeqNo <= serverLastSeqNo) {
shouldRenumber = true
}
}
}

if (shouldRenumber) {
let nextSeqNo = serverLastSeqNo + 1n
// Renumber all messages in buffer sequentially starting from serverLastSeqNo + 1
for (let message of ctx.buffer) {
message.seqNo = nextSeqNo
nextSeqNo++
}
// Update lastSeqNo to the last renumbered seqNo so flush() returns correct value
finalLastSeqNo = nextSeqNo - 1n
ctx.updateLastSeqNo(finalLastSeqNo)
} else if (lastSeqNoChanged) {
// Store the last sequence number from the server if we didn't renumber
ctx.updateLastSeqNo(serverLastSeqNo)
}

_flush(ctx); // Flush the buffer to send any pending messages.
// Flush the buffer to send any pending messages
_flush(ctx)
}
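The renumbering decision above can be read in isolation: in auto mode (no user-provided seqNo), buffered messages are renumbered on the first init, on a reconnect where the server's `lastSeqNo` changed, or when the first buffered seqNo is not ahead of the server's `lastSeqNo`. A self-contained sketch of that logic, with simplified types in place of the real protobuf messages:

```typescript
// Simplified model of the renumbering performed in _on_init_response.
type BufferedMsg = { seqNo: bigint }

function renumberSketch(
  buffer: BufferedMsg[],
  serverLastSeqNo: bigint,
  currentLastSeqNo: bigint | undefined,
  isSeqNoProvided: boolean
): bigint {
  const isFirstInit = currentLastSeqNo === undefined
  const lastSeqNoChanged = isFirstInit || currentLastSeqNo !== serverLastSeqNo

  let shouldRenumber = false
  if (!isSeqNoProvided && buffer.length > 0) {
    if (isFirstInit || lastSeqNoChanged) {
      shouldRenumber = true // first init, or the server reported a different lastSeqNo
    } else if ((buffer[0]?.seqNo ?? 0n) <= serverLastSeqNo) {
      shouldRenumber = true // buffered messages were numbered before init
    }
  }

  if (!shouldRenumber) return serverLastSeqNo

  // Renumber sequentially so the buffer continues from serverLastSeqNo + 1.
  let nextSeqNo = serverLastSeqNo + 1n
  for (const message of buffer) {
    message.seqNo = nextSeqNo
    nextSeqNo++
  }
  return nextSeqNo - 1n // the value flush() should eventually report
}
```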
72 changes: 42 additions & 30 deletions packages/topic/src/writer/_write.ts
@@ -1,47 +1,59 @@
import { create } from "@bufbuild/protobuf";
import { timestampFromDate } from "@bufbuild/protobuf/wkt";
import { type StreamWriteMessage_WriteRequest_MessageData, StreamWriteMessage_WriteRequest_MessageDataSchema } from "@ydbjs/api/topic";
import type { CompressionCodec } from "../codec.js";
import { _flush } from "./_flush.js";
import { MAX_PAYLOAD_SIZE } from "./constants.js";
import { create } from '@bufbuild/protobuf'
import { timestampFromDate } from '@bufbuild/protobuf/wkt'
import {
type StreamWriteMessage_WriteRequest_MessageData,
StreamWriteMessage_WriteRequest_MessageDataSchema,
} from '@ydbjs/api/topic'
import type { CompressionCodec } from '../codec.js'
import { _flush } from './_flush.js'
import { MAX_PAYLOAD_SIZE } from './constants.js'

export function _write(ctx: {
readonly codec: CompressionCodec, // Codec to use for compression
readonly buffer: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages in the buffer
readonly inflight: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages that are currently in-flight
readonly lastSeqNo: bigint, // Last sequence number used
updateLastSeqNo: (seqNo: bigint) => void;
updateBufferSize: (bytes: bigint) => void; // Function to update the buffer size
}, msg: {
data: Uint8Array,
seqNo?: bigint,
createdAt?: Date,
metadataItems?: Record<string, Uint8Array>
}): bigint {
let data = ctx.codec ? ctx.codec.compress(msg.data) : msg.data;
export function _write(
ctx: {
readonly codec: CompressionCodec // Codec to use for compression
readonly buffer: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages in the buffer
readonly inflight: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages that are currently in-flight
readonly lastSeqNo: bigint // Last sequence number used
updateLastSeqNo: (seqNo: bigint) => void
updateBufferSize: (bytes: bigint) => void // Function to update the buffer size
},
msg: {
data: Uint8Array
seqNo?: bigint
createdAt?: Date
metadataItems?: Record<string, Uint8Array>
}
): bigint {
let data = ctx.codec ? ctx.codec.compress(msg.data) : msg.data

// Validate the payload size, it should not exceed MAX_PAYLOAD_SIZE
// This is a YDB limitation for single message size.
if (data.length > MAX_PAYLOAD_SIZE) {
throw new Error(`Payload size exceeds ${Number(MAX_PAYLOAD_SIZE / (1024n * 1024n))}MiB limit.`);
throw new Error(`Payload size exceeds ${Number(MAX_PAYLOAD_SIZE / (1024n * 1024n))}MiB limit.`)
}

let seqNo = msg.seqNo ?? ((ctx.lastSeqNo ?? 0n) + 1n);
let createdAt = timestampFromDate(msg.createdAt ?? new Date());
let metadataItems = Object.entries(msg.metadataItems || {}).map(([key, value]) => ({ key, value }));
let uncompressedSize = BigInt(data.length);
let seqNo = msg.seqNo ?? (ctx.lastSeqNo ?? 0n) + 1n
let createdAt = timestampFromDate(msg.createdAt ?? new Date())
let metadataItems = Object.entries(msg.metadataItems || {}).map(([key, value]) => ({ key, value }))
let uncompressedSize = BigInt(data.length)

let message = create(StreamWriteMessage_WriteRequest_MessageDataSchema, {
data,
seqNo,
createdAt,
metadataItems,
uncompressedSize,
});
})

ctx.buffer.push(message) // Store the message in the buffer
ctx.updateBufferSize(BigInt(data.length)) // Update the buffer size

ctx.buffer.push(message); // Store the message in the buffer
ctx.updateBufferSize(BigInt(data.length)); // Update the buffer size
ctx.updateLastSeqNo(seqNo); // Update the last sequence number
// Only update lastSeqNo if session is initialized (lastSeqNo is defined)
// For messages written before session initialization, lastSeqNo will be updated
// after renumbering in _on_init_response
if (ctx.lastSeqNo !== undefined) {
ctx.updateLastSeqNo(seqNo)
}

return seqNo;
return seqNo
}
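The key change in `_write` is that `lastSeqNo` is only advanced once the session is initialized; messages written earlier keep provisional numbers until `_on_init_response` renumbers them. A minimal sketch of the assignment rule, with a simplified context and illustrative names:

```typescript
// Simplified model of the seqNo assignment in _write.
interface WriteCtxSketch {
  lastSeqNo?: bigint // undefined until the init response arrives
  updateLastSeqNo: (seqNo: bigint) => void
}

function assignSeqNoSketch(ctx: WriteCtxSketch, explicitSeqNo?: bigint): bigint {
  // An explicit seqNo wins; otherwise continue from lastSeqNo (default base of 0).
  const seqNo = explicitSeqNo ?? (ctx.lastSeqNo ?? 0n) + 1n

  // Only advance lastSeqNo once the session is initialized; pre-init messages
  // are renumbered later in _on_init_response.
  if (ctx.lastSeqNo !== undefined) {
    ctx.updateLastSeqNo(seqNo)
  }
  return seqNo
}
```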