From 791611283aad91e96e3bcf3b2518935e9910df59 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Wed, 18 Mar 2026 16:20:03 -0400 Subject: [PATCH 1/3] feat(pubsub): add support for streaming pull keepalives from the server --- handwritten/pubsub/src/message-stream.ts | 32 +++++++++++ handwritten/pubsub/test/message-stream.ts | 67 +++++++++++++++++++++++ 2 files changed, 99 insertions(+) diff --git a/handwritten/pubsub/src/message-stream.ts b/handwritten/pubsub/src/message-stream.ts index 60c404bcb85..cdfad590331 100644 --- a/handwritten/pubsub/src/message-stream.ts +++ b/handwritten/pubsub/src/message-stream.ts @@ -139,6 +139,7 @@ export class ChannelError extends Error implements grpc.ServiceError { interface StreamTracked { stream?: PullStream; receivedStatus?: boolean; + pingTimeout?: NodeJS.Timeout; } /** @@ -227,6 +228,9 @@ export class MessageStream extends PassThrough { for (let i = 0; i < this._streams.length; i++) { const tracker = this._streams[i]; + if (tracker.pingTimeout) { + clearTimeout(tracker.pingTimeout); + } if (tracker.stream) { this._removeStream(i, 'overall message stream destroyed', 'n/a'); } @@ -254,6 +258,8 @@ export class MessageStream extends PassThrough { tracker.stream = stream; tracker.receivedStatus = false; + this._resetPingTimer(index); + stream .on('error', err => this._onError(index, err)) .once('status', status => this._onStatus(index, status)) @@ -264,10 +270,30 @@ export class MessageStream extends PassThrough { // Mark this stream as alive again. (reset backoff) const tracker = this._streams[index]; this._retrier.reset(tracker); + this._resetPingTimer(index); this.emit('data', data); } + private _resetPingTimer(index: number): void { + const tracker = this._streams[index]; + if (tracker.pingTimeout) { + clearTimeout(tracker.pingTimeout); + } + // We expect a packet from the server at least once every 30 seconds. + // Give it a 1-second grace period. + tracker.pingTimeout = setTimeout(() => { + this._removeStream( + index, + 'stream inactive for longer than 30 seconds', + 'will be retried', + ); + this._retrier.retryLater(tracker, () => + this._fillOne(index, undefined, 'retry'), + ); + }, 31000); + } + /** * Attempts to create and cache the desired number of StreamingPull requests. * gRPC does not supply a way to confirm that a stream is connected, so our @@ -347,6 +373,8 @@ export class MessageStream extends PassThrough { maxOutstandingBytes: this._subscriber.useLegacyFlowControl ? 0 : this._subscriber.maxBytes, + clientId: 'node-pubsub', + protocolVersion: 1, // Set protocol version to fulfill keepalive capabilities }; const otherArgs = { headers: { @@ -511,6 +539,10 @@ export class MessageStream extends PassThrough { whatNext?: string, ): void { const tracker = this._streams[index]; + if (tracker.pingTimeout) { + clearTimeout(tracker.pingTimeout); + tracker.pingTimeout = undefined; + } if (tracker.stream) { logs.subscriberStreams.info( 'closing stream %i; why: %s; next: %s', diff --git a/handwritten/pubsub/test/message-stream.ts b/handwritten/pubsub/test/message-stream.ts index 2463d17eb50..21945b19e27 100644 --- a/handwritten/pubsub/test/message-stream.ts +++ b/handwritten/pubsub/test/message-stream.ts @@ -523,6 +523,24 @@ describe('MessageStream', () => { }); describe('keeping streams alive', () => { + it('should set protocolVersion in the initial packet', async () => { + // The special handling for messageStream and the spy below are + // so that we can test the initial message. + messageStream.destroy(); + + const spy = sandbox.spy(FakeGrpcStream.prototype, 'write'); + const ms = new MessageStream(subscriber); + await ms.start(); + + assert.strictEqual(spy.callCount, 5); + const {args} = spy.firstCall; + const request = args[0] as any; + + assert.strictEqual(String(request.protocolVersion), '1'); + + ms.destroy(); + }); + it('should keep the streams alive', () => { const frequency = 30000; const stubs = client.streams.map(stream => { @@ -536,6 +554,55 @@ describe('MessageStream', () => { assert.deepStrictEqual(data, {}); }); }); + + it('should close stream if no data received for 30 seconds', async () => { + messageStream.destroy(); + client.streams.length = 0; + + const ms = new MessageStream(subscriber); + await ms.start(); + + const streamCount = client.streams.length; + const cancelSpies = client.streams.map(s => sandbox.spy(s, 'cancel')); + + sandbox.clock.tick(32000); + + cancelSpies.forEach(spy => { + assert.strictEqual(spy.callCount, 1); + }); + + // The retry minimum backoff is 100ms. + sandbox.clock.tick(150); + + // The streams are restarted, wait for next tick for fill stream pool + await promisify(process.nextTick)(); + assert.ok(client.streams.length > streamCount); + + ms.destroy(); + }); + + it('should not close stream if data received within 30 seconds', async () => { + messageStream.destroy(); + // client.streams.length = 0; + + const ms = new MessageStream(subscriber); + await ms.start(); + + const cancelSpies = client.streams.map(s => sandbox.spy(s, 'cancel')); + + sandbox.clock.tick(20000); + + // Simulating data prevents timeout + client.streams.forEach(s => s.emit('data', {})); + + sandbox.clock.tick(20000); + + cancelSpies.forEach(spy => { + assert.strictEqual(spy.callCount, 0); + }); + + ms.destroy(); + }); }); it('should allow updating the ack deadline', async () => { From 7d6cecb3aa04864e9b56c60418700a505f402761 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Fri, 20 Mar 2026 18:01:57 -0400 Subject: [PATCH 2/3] feat(pubsub): update algorithm to match Java impl --- handwritten/pubsub/src/message-stream.ts | 62 ++++++++++++++++------- handwritten/pubsub/test/message-stream.ts | 21 +++----- 2 files changed, 50 insertions(+), 33 deletions(-) diff --git a/handwritten/pubsub/src/message-stream.ts b/handwritten/pubsub/src/message-stream.ts index cdfad590331..4d5de066c51 100644 --- a/handwritten/pubsub/src/message-stream.ts +++ b/handwritten/pubsub/src/message-stream.ts @@ -139,7 +139,9 @@ export class ChannelError extends Error implements grpc.ServiceError { interface StreamTracked { stream?: PullStream; receivedStatus?: boolean; - pingTimeout?: NodeJS.Timeout; + lastPingTime?: number; + lastResponseTime?: number; + aliveTimer?: NodeJS.Timeout; } /** @@ -201,7 +203,7 @@ export class MessageStream extends PassThrough { */ setStreamAckDeadline(deadline: Duration) { const request: StreamingPullRequest = { - streamAckDeadlineSeconds: deadline.totalOf('second'), + streamAckDeadlineSeconds: deadline.seconds, }; for (const tracker of this._streams) { @@ -228,8 +230,8 @@ export class MessageStream extends PassThrough { for (let i = 0; i < this._streams.length; i++) { const tracker = this._streams[i]; - if (tracker.pingTimeout) { - clearTimeout(tracker.pingTimeout); + if (tracker.aliveTimer) { + clearInterval(tracker.aliveTimer); } if (tracker.stream) { this._removeStream(i, 'overall message stream destroyed', 'n/a'); @@ -257,8 +259,8 @@ export class MessageStream extends PassThrough { const tracker = this._streams[index]; tracker.stream = stream; tracker.receivedStatus = false; - - this._resetPingTimer(index); + tracker.lastResponseTime = Date.now(); + this._setAliveTimer(index); stream .on('error', err => this._onError(index, err)) @@ -269,29 +271,49 @@ export class MessageStream extends PassThrough { private _onData(index: number, data: PullResponse): void { // Mark this stream as alive again. (reset backoff) const tracker = this._streams[index]; + tracker.lastResponseTime = Date.now(); this._retrier.reset(tracker); - this._resetPingTimer(index); this.emit('data', data); } - private _resetPingTimer(index: number): void { + private _clearAliveTimer(tracker: StreamTracked): void { + if (tracker.aliveTimer) { + clearInterval(tracker.aliveTimer); + tracker.aliveTimer = undefined; + } + } + + private _checkAliveTimer(index: number): void { const tracker = this._streams[index]; - if (tracker.pingTimeout) { - clearTimeout(tracker.pingTimeout); + const now = Date.now(); + const lastPingTime = tracker.lastPingTime ?? -1; + const lastResponseTime = tracker.lastResponseTime ?? 0; + if (lastPingTime <= lastResponseTime) { + return; } - // We expect a packet from the server at least once every 30 seconds. - // Give it a 1-second grace period. - tracker.pingTimeout = setTimeout(() => { + + const elapsedSincePing = now - lastPingTime; + + if (elapsedSincePing > 15000) { this._removeStream( index, - 'stream inactive for longer than 30 seconds', + 'no keepalive response from server within 15 seconds', 'will be retried', ); this._retrier.retryLater(tracker, () => this._fillOne(index, undefined, 'retry'), ); - }, 31000); + } + } + + private _setAliveTimer(index: number): void { + const tracker = this._streams[index]; + this._clearAliveTimer(tracker); + + tracker.aliveTimer = setInterval(() => { + this._checkAliveTimer(index); + }, 10000); } /** @@ -414,12 +436,14 @@ export class MessageStream extends PassThrough { 'sending keepAlive to %i streams', this._streams.length, ); - this._streams.forEach(tracker => { + this._streams.forEach((tracker, index) => { // It's possible that a status event fires off (signaling the rpc being // closed) but the stream hasn't drained yet. Writing to such a stream will // result in a `write after end` error. if (!tracker.receivedStatus && tracker.stream) { tracker.stream.write({}); + tracker.lastPingTime = Date.now(); + this._setAliveTimer(index); } }); } @@ -539,9 +563,9 @@ export class MessageStream extends PassThrough { whatNext?: string, ): void { const tracker = this._streams[index]; - if (tracker.pingTimeout) { - clearTimeout(tracker.pingTimeout); - tracker.pingTimeout = undefined; + if (tracker.aliveTimer) { + clearInterval(tracker.aliveTimer); + tracker.aliveTimer = undefined; } if (tracker.stream) { logs.subscriberStreams.info( diff --git a/handwritten/pubsub/test/message-stream.ts b/handwritten/pubsub/test/message-stream.ts index 21945b19e27..8ae58d7d9d4 100644 --- a/handwritten/pubsub/test/message-stream.ts +++ b/handwritten/pubsub/test/message-stream.ts @@ -555,47 +555,40 @@ describe('MessageStream', () => { }); }); - it('should close stream if no data received for 30 seconds', async () => { + it('should close stream if no data received for 15 seconds after keepalive', async () => { messageStream.destroy(); client.streams.length = 0; const ms = new MessageStream(subscriber); await ms.start(); - const streamCount = client.streams.length; const cancelSpies = client.streams.map(s => sandbox.spy(s, 'cancel')); - sandbox.clock.tick(32000); + // wait for keepalive ping (30s) + 21s timeout to pass two 10s polling intervals + sandbox.clock.tick(51000); cancelSpies.forEach(spy => { assert.strictEqual(spy.callCount, 1); }); - // The retry minimum backoff is 100ms. - sandbox.clock.tick(150); - - // The streams are restarted, wait for next tick for fill stream pool - await promisify(process.nextTick)(); - assert.ok(client.streams.length > streamCount); - ms.destroy(); }); - it('should not close stream if data received within 30 seconds', async () => { + it('should not close stream if data received within 15 seconds of keepalive', async () => { messageStream.destroy(); - // client.streams.length = 0; const ms = new MessageStream(subscriber); await ms.start(); const cancelSpies = client.streams.map(s => sandbox.spy(s, 'cancel')); - sandbox.clock.tick(20000); + sandbox.clock.tick(30000); // Simulating data prevents timeout client.streams.forEach(s => s.emit('data', {})); - sandbox.clock.tick(20000); + // Wait for two 10s polling intervals to pass + sandbox.clock.tick(21000); cancelSpies.forEach(spy => { assert.strictEqual(spy.callCount, 0); From 2e7b1a80cab3341bd0ee892f1db70fa3daa05c95 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Fri, 20 Mar 2026 18:04:47 -0400 Subject: [PATCH 3/3] fix(pubsub): correct clientId value --- handwritten/pubsub/src/message-stream.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/handwritten/pubsub/src/message-stream.ts b/handwritten/pubsub/src/message-stream.ts index 4d5de066c51..7ae79e7e37c 100644 --- a/handwritten/pubsub/src/message-stream.ts +++ b/handwritten/pubsub/src/message-stream.ts @@ -27,6 +27,7 @@ import {Duration} from './temporal'; import {ExponentialRetry} from './exponential-retry'; import {DebugMessage} from './debug'; import {logs as baseLogs} from './logs'; +import {randomUUID} from 'crypto'; /** * Loggers. Exported for unit tests. @@ -395,7 +396,7 @@ export class MessageStream extends PassThrough { maxOutstandingBytes: this._subscriber.useLegacyFlowControl ? 0 : this._subscriber.maxBytes, - clientId: 'node-pubsub', + clientId: randomUUID().toString(), protocolVersion: 1, // Set protocol version to fulfill keepalive capabilities }; const otherArgs = {