Skip to content

Commit 778a7d3

Browse files
feat: allow to use the adapter within a Node.js cluster
1 parent a5b266f commit 778a7d3

File tree

9 files changed

+607
-320
lines changed

9 files changed

+607
-320
lines changed

lib/adapter.ts

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import { type Pool } from "pg";
2+
import { ClusterAdapterWithHeartbeat } from "socket.io-adapter";
3+
import type {
4+
ClusterAdapterOptions,
5+
ClusterMessage,
6+
ClusterResponse,
7+
Offset,
8+
ServerId,
9+
} from "socket.io-adapter";
10+
import debugModule from "debug";
11+
import { PubSubClient } from "./util";
12+
13+
const debug = debugModule("socket.io-postgres-adapter");
14+
15+
export interface PostgresAdapterOptions {
16+
/**
17+
* The prefix of the notification channel
18+
* @default "socket.io"
19+
*/
20+
channelPrefix?: string;
21+
/**
22+
* The name of the table for payloads over the 8000 bytes limit or containing binary data
23+
* @default "socket_io_attachments"
24+
*/
25+
tableName?: string;
26+
/**
27+
* The threshold for the payload size in bytes (see https://www.postgresql.org/docs/current/sql-notify.html)
28+
* @default 8000
29+
*/
30+
payloadThreshold?: number;
31+
/**
32+
* Number of ms between two cleanup queries
33+
* @default 30000
34+
*/
35+
cleanupInterval?: number;
36+
/**
37+
* Handler for errors. If undefined, the errors will be simply logged.
38+
*
39+
* @default undefined
40+
*/
41+
errorHandler?: (err: Error) => void;
42+
}
43+
44+
/**
45+
* Returns a function that will create a PostgresAdapter instance.
46+
*
47+
* @param pool - a pg.Pool instance
48+
* @param opts - additional options
49+
*
50+
* @public
51+
*/
52+
export function createAdapter(
53+
pool: Pool,
54+
opts: PostgresAdapterOptions & ClusterAdapterOptions = {}
55+
) {
56+
const options = Object.assign(
57+
{
58+
channelPrefix: "socket.io",
59+
tableName: "socket_io_attachments",
60+
payloadThreshold: 8_000,
61+
cleanupInterval: 30_000,
62+
errorHandler: (err: Error) => debug(err),
63+
},
64+
opts
65+
);
66+
67+
const namespaces = new Map<string, PostgresAdapter>();
68+
const client = new PubSubClient(
69+
pool,
70+
options,
71+
(msg) => {
72+
// @ts-expect-error uid is protected
73+
return namespaces.get(msg.nsp)?.uid === msg.uid;
74+
},
75+
(msg) => {
76+
namespaces.get(msg.nsp)?.onMessage(msg);
77+
}
78+
);
79+
80+
return function (nsp: any) {
81+
let adapter = new PostgresAdapter(nsp, opts, client);
82+
83+
namespaces.set(nsp.name, adapter);
84+
client.addNamespace(nsp.name);
85+
86+
const defaultClose = adapter.close;
87+
88+
adapter.close = () => {
89+
namespaces.delete(nsp.name);
90+
91+
if (namespaces.size === 0) {
92+
client.close();
93+
}
94+
95+
defaultClose.call(adapter);
96+
};
97+
98+
return adapter;
99+
};
100+
}
101+
102+
export class PostgresAdapter extends ClusterAdapterWithHeartbeat {
103+
/**
104+
* Adapter constructor.
105+
*
106+
* @param nsp - the namespace
107+
* @param opts - additional options
108+
* @param client
109+
*
110+
* @public
111+
*/
112+
constructor(
113+
nsp: any,
114+
opts: ClusterAdapterOptions,
115+
private readonly client: PubSubClient
116+
) {
117+
super(nsp, opts);
118+
}
119+
120+
protected override doPublish(message: ClusterMessage): Promise<Offset> {
121+
return this.client.publish(message).then(() => {
122+
// connection state recovery is not currently supported
123+
return "";
124+
});
125+
}
126+
127+
protected override doPublishResponse(
128+
_requesterUid: ServerId,
129+
response: ClusterResponse
130+
) {
131+
return this.client.publish(response);
132+
}
133+
}

0 commit comments

Comments
 (0)