From 8e35315792bc65daeae9fbf0102cfe7630d83710 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Mon, 3 Nov 2025 22:09:40 +0200 Subject: [PATCH] fix(socket): prevent false-ready state when socket errors during handshake Fixes race condition where async socket errors during connection handshake don't trigger reconnection. Validates socket state after initiator completes to catch errors swallowed by command handlers. fixes: #3108 --- packages/client/lib/client/index.spec.ts | 95 +++++++++++++++++++++++- packages/client/lib/client/socket.ts | 10 +++ 2 files changed, 104 insertions(+), 1 deletion(-) diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index d7ce00f38a..b06b0d5079 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -4,12 +4,16 @@ import RedisClient, { RedisClientOptions, RedisClientType } from '.'; import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, ErrorReply, MultiErrorReply, TimeoutError, WatchError } from '../errors'; import { defineScript } from '../lua-script'; import { spy, stub } from 'sinon'; -import { once } from 'node:events'; +import EventEmitter, { once } from 'node:events'; import { MATH_FUNCTION, loadMathFunction } from '../commands/FUNCTION_LOAD.spec'; import { RESP_TYPES } from '../RESP/decoder'; import { BlobStringReply, NumberReply } from '../RESP/types'; import { SortedSetMember } from '../commands/generic-transformers'; import { CommandParser } from './parser'; +import { RedisSocketOptions } from './socket'; +import { getFreePortNumber } from '@redis/test-utils/lib/proxy/redis-proxy'; +import { createClient } from '../../'; +import net from 'node:net' export const SQUARE_SCRIPT = defineScript({ SCRIPT: @@ -1008,6 +1012,95 @@ describe('Client', () => { } }, GLOBAL.SERVERS.OPEN); }); + + describe("socket errors during handshake", () => { + + it("should successfully connect when server accepts connection immediately", async () => { + const { log, client, teardown } = await setup({}, 0); + await client.connect(); + assert.deepEqual(["connect", "ready"], log); + teardown(); + }); + + it("should reconnect after multiple connection drops during handshake", async () => { + const { log, client, teardown } = await setup({}, 2); + await client.connect(); + assert.deepEqual( + [ + "connect", + "error", + "reconnecting", + "connect", + "error", + "reconnecting", + "connect", + "ready", + ], + log, + ); + teardown(); + }); + + //utils + + async function setup( + socketOptions: Partial, + dropCount: number, + ) { + const port = await getFreePortNumber(); + const server = setupMockServer(dropCount); + const options = { + ...{ + socket: { + host: "localhost", + port, + }, + ...socketOptions, + }, + }; + const client = createClient(options); + const log = setupLog(client); + await once(server.listen(port), "listening"); + return { + log, + client, + server, + teardown: async function () { + client.destroy(); + server.close(); + }, + }; + } + + function setupLog(client: EventEmitter): string[] { + const log: string[] = []; + client.on("connect", () => log.push("connect")); + client.on("ready", () => log.push("ready")); + client.on("reconnecting", () => log.push("reconnecting")); + client.on("error", () => log.push("error")); + return log; + } + + // Create a TCP server that accepts connections but immediately drops them times + // This simulates what happens when Docker container is stopped: + // - TCP connection succeeds (OS accepts it) + // - But socket is immediately destroyed, causing ECONNRESET during handshake + function setupMockServer(dropImmediately: number) { + const server = net.createServer(async (socket) => { + // socket.on("data", (data) => + // console.log(data.toString().replaceAll("\r\n", " ")), + // ); + if (dropImmediately > 0) { + dropImmediately--; + socket.destroy(); + } + socket.write("+OK\r\n+OK\r\n"); + // socket.write("*7\r\n$6\r\nserver\r\n$5\r\nredis\r\n$7\r\nversion\r\n$5\r\n7.2.0\r\n$5\r\nproto\r\n:3\r\n$2\r\nid\r\n:1\r\n") + }); + return server; + } + + }); }); /** diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index c5569e8654..ab8e9992b6 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -220,6 +220,15 @@ export default class RedisSocket extends EventEmitter { try { await this.#initiator(); + + // Check if socket was closed/destroyed during initiator execution + if (!this.#socket || this.#socket.destroyed || !this.#socket.readable || !this.#socket.writable) { + const retryIn = this.#shouldReconnect(retries++, new SocketClosedUnexpectedlyError()); + if (typeof retryIn !== 'number') { throw retryIn; } + await setTimeout(retryIn); + this.emit('reconnecting'); + continue; + } } catch (err) { this.#socket.destroy(); this.#socket = undefined; @@ -312,6 +321,7 @@ export default class RedisSocket extends EventEmitter { }); } + write(iterable: Iterable>) { if (!this.#socket) return;