Skip to content

Commit e870c73

Browse files
committed
Encode messages before storing in send buffer
1 parent 9b4912d commit e870c73

File tree

7 files changed

+49
-27
lines changed

7 files changed

+49
-27
lines changed

testUtil/fixtures/cleanup.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { expect, vi } from 'vitest';
1+
import { assert, expect, vi } from 'vitest';
22
import {
33
ClientTransport,
44
Connection,
@@ -68,9 +68,15 @@ export async function ensureTransportBuffersAreEventuallyEmpty(
6868
[...t.sessions]
6969
.map(([client, sess]) => {
7070
// get all messages that are not heartbeats
71-
const buff = sess.sendBuffer.filter((msg) => {
72-
return !Value.Check(ControlMessageAckSchema, msg.payload);
73-
});
71+
const buff = sess.sendBuffer
72+
.map((encodedMsg) => {
73+
const msg = sess.parseMsg(encodedMsg.data);
74+
assert(msg);
75+
return msg;
76+
})
77+
.filter((msg) => {
78+
return !Value.Check(ControlMessageAckSchema, msg.payload);
79+
});
7480

7581
return [client, buff] as [
7682
string,

transport/message.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,17 @@ export function cancelMessage(
280280
export type OpaqueTransportMessage = TransportMessage;
281281
export type TransportClientId = string;
282282

283+
/**
284+
* An encoded message that is ready to be send over the transport.
285+
* The seq number is kept to keep track of which messages have been
286+
* acked by the peer.
287+
*/
288+
export interface EncodedTransportMessage {
289+
id: string;
290+
seq: number;
291+
data: Uint8Array;
292+
}
293+
283294
/**
284295
* Checks if the given control flag (usually found in msg.controlFlag) is an ack message.
285296
* @param controlFlag - The control flag to check.

transport/sessionStateMachine/SessionConnected.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,10 @@ export class SessionConnected<
4949
}
5050

5151
send(msg: PartialTransportMessage): string {
52-
const constructedMsg = this.constructMsg(msg);
53-
this.sendBuffer.push(constructedMsg);
54-
this.conn.send(this.options.codec.toBuffer(constructedMsg));
55-
56-
return constructedMsg.id;
52+
const encodedMsg = this.encodeMsg(msg);
53+
this.sendBuffer.push(encodedMsg);
54+
this.conn.send(encodedMsg.data);
55+
return encodedMsg.id;
5756
}
5857

5958
constructor(props: SessionConnectedProps<ConnType>) {
@@ -75,7 +74,7 @@ export class SessionConnected<
7574
);
7675

7776
for (const msg of this.sendBuffer) {
78-
this.conn.send(this.options.codec.toBuffer(msg));
77+
this.conn.send(msg.data);
7978
}
8079
}
8180

transport/sessionStateMachine/common.ts

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import { Logger, MessageMetadata } from '../../logging';
22
import { TelemetryInfo } from '../../tracing';
33
import {
4+
EncodedTransportMessage,
45
OpaqueTransportMessage,
56
OpaqueTransportMessageSchema,
67
PartialTransportMessage,
78
ProtocolVersion,
89
TransportClientId,
9-
TransportMessage,
1010
} from '../message';
1111
import { Value } from '@sinclair/typebox/value';
1212
import { Codec } from '../../codec';
@@ -204,7 +204,7 @@ export interface IdentifiedSessionProps extends CommonSessionProps {
204204
to: TransportClientId;
205205
seq: number;
206206
ack: number;
207-
sendBuffer: Array<OpaqueTransportMessage>;
207+
sendBuffer: Array<EncodedTransportMessage>;
208208
telemetry: TelemetryInfo;
209209
protocolVersion: ProtocolVersion;
210210
}
@@ -224,7 +224,7 @@ export abstract class IdentifiedSession extends CommonSession {
224224
* Number of unique messages we've received this session (excluding handshake)
225225
*/
226226
ack: number;
227-
sendBuffer: Array<OpaqueTransportMessage>;
227+
sendBuffer: Array<EncodedTransportMessage>;
228228

229229
constructor(props: IdentifiedSessionProps) {
230230
const { id, to, seq, ack, sendBuffer, telemetry, log, protocolVersion } =
@@ -258,9 +258,9 @@ export abstract class IdentifiedSession extends CommonSession {
258258
return metadata;
259259
}
260260

261-
constructMsg<Payload>(
261+
encodeMsg<Payload>(
262262
partialMsg: PartialTransportMessage<Payload>,
263-
): TransportMessage<Payload> {
263+
): EncodedTransportMessage {
264264
const msg = {
265265
...partialMsg,
266266
id: generateId(),
@@ -270,19 +270,24 @@ export abstract class IdentifiedSession extends CommonSession {
270270
ack: this.ack,
271271
};
272272

273+
const encodedMsg = {
274+
id: msg.id,
275+
seq: msg.seq,
276+
data: this.options.codec.toBuffer(msg),
277+
};
278+
273279
this.seq++;
274280

275-
return msg;
281+
return encodedMsg;
276282
}
277283

278284
nextSeq(): number {
279285
return this.sendBuffer.length > 0 ? this.sendBuffer[0].seq : this.seq;
280286
}
281287

282288
send(msg: PartialTransportMessage): string {
283-
const constructedMsg = this.constructMsg(msg);
289+
const constructedMsg = this.encodeMsg(msg);
284290
this.sendBuffer.push(constructedMsg);
285-
286291
return constructedMsg.id;
287292
}
288293

transport/sessionStateMachine/stateMachine.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1891,7 +1891,7 @@ describe('session state machine', () => {
18911891
expect(onConnectionClosed).not.toHaveBeenCalled();
18921892
expect(onConnectionErrored).not.toHaveBeenCalled();
18931893

1894-
const msg = session.constructMsg(payloadToTransportMessage('hello'));
1894+
const msg = session.encodeMsg(payloadToTransportMessage('hello'));
18951895
session.conn.emitData(session.options.codec.toBuffer(msg));
18961896

18971897
await waitFor(async () => {
@@ -1941,7 +1941,7 @@ describe('session state machine', () => {
19411941
// send a heartbeat
19421942
conn.emitData(
19431943
session.options.codec.toBuffer(
1944-
session.constructMsg({
1944+
session.encodeMsg({
19451945
streamId: 'heartbeat',
19461946
controlFlags: ControlFlags.AckBit,
19471947
payload: {
@@ -1963,7 +1963,7 @@ describe('session state machine', () => {
19631963
// send a heartbeat
19641964
conn.emitData(
19651965
session.options.codec.toBuffer(
1966-
session.constructMsg({
1966+
session.encodeMsg({
19671967
streamId: 'heartbeat',
19681968
controlFlags: ControlFlags.AckBit,
19691969
payload: {

transport/sessionStateMachine/transitions.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { OpaqueTransportMessage, TransportClientId } from '..';
21
import {
32
SessionConnecting,
43
SessionConnectingListeners,
@@ -38,7 +37,11 @@ import {
3837
SessionBackingOff,
3938
SessionBackingOffListeners,
4039
} from './SessionBackingOff';
41-
import { ProtocolVersion } from '../message';
40+
import {
41+
EncodedTransportMessage,
42+
ProtocolVersion,
43+
TransportClientId,
44+
} from '../message';
4245

4346
function inheritSharedSession(
4447
session: IdentifiedSession,
@@ -78,7 +81,7 @@ export const SessionStateGraph = {
7881
) => {
7982
const id = `session-${generateId()}`;
8083
const telemetry = createSessionTelemetryInfo(id, to, from);
81-
const sendBuffer: Array<OpaqueTransportMessage> = [];
84+
const sendBuffer: Array<EncodedTransportMessage> = [];
8285

8386
const session = new SessionNoConnection({
8487
listeners,

transport/transport.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,7 @@ export interface DeleteSessionOptions {
3838
unhealthy: boolean;
3939
}
4040

41-
export type SessionBoundSendFn = (
42-
msg: PartialTransportMessage,
43-
) => string | undefined;
41+
export type SessionBoundSendFn = (msg: PartialTransportMessage) => string;
4442

4543
/**
4644
* Transports manage the lifecycle (creation/deletion) of sessions

0 commit comments

Comments
 (0)