From efa8b02f549da8fbb5f9f5d706e6b1c6212e1690 Mon Sep 17 00:00:00 2001 From: Andrii Vasyliev Date: Mon, 21 Jul 2025 14:06:53 +0300 Subject: [PATCH] feat: add support of node_redis 5 --- lib/adapter.ts | 8 ++++++-- lib/util.ts | 39 ++++++++++++++++++++++++--------------- 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/lib/adapter.ts b/lib/adapter.ts index 17f082a..5727f60 100644 --- a/lib/adapter.ts +++ b/lib/adapter.ts @@ -55,6 +55,10 @@ export function createAdapter( redisClient: any, opts?: RedisStreamsAdapterOptions & ClusterAdapterOptions ) { + const internalRedisClient = + typeof redisClient.createPool === "function" + ? redisClient.createPool() + : redisClient; const namespaceToAdapters = new Map(); const options = Object.assign( { @@ -74,7 +78,7 @@ export function createAdapter( async function poll() { try { let response = await XREAD( - redisClient, + internalRedisClient, options.streamName, offset, options.readCount @@ -106,7 +110,7 @@ export function createAdapter( } return function (nsp) { - const adapter = new RedisStreamsAdapter(nsp, redisClient, options); + const adapter = new RedisStreamsAdapter(nsp, internalRedisClient, options); namespaceToAdapters.set(nsp.name, adapter); if (!polling) { diff --git a/lib/util.ts b/lib/util.ts index 590fa2e..a7d0b0d 100644 --- a/lib/util.ts +++ b/lib/util.ts @@ -37,7 +37,10 @@ export function hasBinary(obj: any, toJSON?: boolean): boolean { * @see https://github.com/redis/node-redis */ function isRedisV4Client(redisClient: any) { - return typeof redisClient.sSubscribe === "function"; + return ( + typeof redisClient.sSubscribe === "function" || + typeof redisClient.totalClients === "number" + ); } /** @@ -68,21 +71,27 @@ export function XREAD( ) { if (isRedisV4Client(redisClient)) { return import("redis").then((redisPackage) => { - return redisClient.xRead( - redisPackage.commandOptions({ - isolated: true, - }), - [ - { - key: streamName, - id: offset, - }, - ], + const streams = [ { - COUNT: readCount, - BLOCK: 5000, - } - ); + key: streamName, + id: offset, + }, + ]; + const options = { + COUNT: readCount, + BLOCK: 5000, + }; + if (redisPackage.commandOptions) { + return redisClient.xRead( + redisPackage.commandOptions({ + isolated: true, + }), + streams, + options + ); + } else { + return redisClient.xRead(streams, options); + } }); } else { return redisClient