Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions console/src/api/materialize/MaterializeWebsocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ export class MaterializeWebsocket implements Connectable {
onMessage: MessageCallback | undefined;
onClose: CloseCallback | undefined;
onOpen: OpenCallback | undefined;
/** Optional diagnostic label for console logging. */
debugLabel: string | undefined;

constructor(options: {
httpAddress: string;
Expand All @@ -47,19 +49,28 @@ export class MaterializeWebsocket implements Connectable {
onMessage?: MessageCallback;
onClose?: CloseCallback;
onOpen?: OpenCallback;
debugLabel?: string;
}) {
this.httpAddress = options.httpAddress;
this.sessionVariables = options.sessionVariables ?? {};
this.onReadyForQuery = options.onReadyForQuery;
this.onMessage = options.onMessage;
this.onClose = options.onClose;
this.onOpen = options.onOpen;
this.debugLabel = options.debugLabel;
}

connect(httpAddress?: string, sessionVariables?: SessionVariables) {
this.httpAddress = httpAddress ?? this.httpAddress;
this.sessionVariables = sessionVariables ?? this.sessionVariables;
const priorState = this.socket?.readyState;
this.disconnect();
if (this.debugLabel) {
// eslint-disable-next-line no-console
console.log(
`[ws:${this.debugLabel}] connect (priorReadyState=${priorState}) -> ${this.httpAddress}`,
);
}
this.socket = new WebSocket(
`${apiClient.mzWebsocketUrlScheme}://${this.httpAddress}/api/experimental/sql`,
);
Expand All @@ -70,6 +81,12 @@ export class MaterializeWebsocket implements Connectable {
}

disconnect() {
if (this.debugLabel && this.socket) {
// eslint-disable-next-line no-console
console.log(
`[ws:${this.debugLabel}] disconnect (readyState=${this.socket.readyState})`,
);
}
this.setState({
readyForQuery: false,
error: undefined,
Expand Down Expand Up @@ -159,6 +176,10 @@ export class MaterializeWebsocket implements Connectable {
}

private handleOpen = (event: Event) => {
if (this.debugLabel) {
// eslint-disable-next-line no-console
console.log(`[ws:${this.debugLabel}] open`);
}
this.onOpen?.(event);
// Notify open listeners (used by WebsocketConnectionManager to update state)
for (const callback of this.openListeners) {
Expand Down Expand Up @@ -187,6 +208,10 @@ export class MaterializeWebsocket implements Connectable {
};

private handleError = () => {
if (this.debugLabel) {
// eslint-disable-next-line no-console
console.log(`[ws:${this.debugLabel}] error event (readyState=${this.socket?.readyState})`);
}
this.setState({
error: "Socket error",
});
Expand All @@ -196,6 +221,12 @@ export class MaterializeWebsocket implements Connectable {
};

private handleClose = (event: CloseEvent) => {
if (this.debugLabel) {
// eslint-disable-next-line no-console
console.log(
`[ws:${this.debugLabel}] close code=${event.code} wasClean=${event.wasClean} reason=${event.reason || "(none)"}`,
);
}
this.setState({
readyForQuery: false,
error: "Connection closed unexpectedly",
Expand Down
165 changes: 164 additions & 1 deletion console/src/api/materialize/SubscribeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ export interface SubscribeManagerOptions<
flushInterval?: number;
upsert?: UpsertSubscribeOptions<T>;
select?: SelectFunction<T, R>;
/**
* If set, every raw WebSocket message and every buffer flush is logged to
* the console, prefixed with this label. Intended for diagnostics only.
*/
debugLabel?: string;
}

export type SelectFunction<T extends object, R> = (row: SubscribeRow<T>) => R;
Expand Down Expand Up @@ -104,8 +109,18 @@ export class SubscribeManager<T extends object, R> implements Connectable {

private flushIntervalHandle: NodeJS.Timeout | undefined;
private flushInterval: number = 16;
/** Optional diagnostic label exposed so external hooks (e.g.
* useAutomaticallyConnectSocket) can correlate logs. */
debugLabel: string | undefined;

constructor(options: SubscribeManagerOptions<T, R>) {
this.debugLabel = options.debugLabel;
if (this.debugLabel) {
// eslint-disable-next-line no-console
console.log(
`[subscribe:${this.debugLabel}] constructed (hasRequest=${!!options.request})`,
);
}
this.socket = new MaterializeWebsocket({
httpAddress: options.httpAddress,
sessionVariables: buildSessionVariables({
Expand All @@ -115,6 +130,7 @@ export class SubscribeManager<T extends object, R> implements Connectable {
onMessage: this.onMessage,
onClose: this.onClose,
onOpen: this.onOpen,
debugLabel: options.debugLabel,
});
this.upsert = options.upsert;
this.closeSocketOnComplete = options.closeSocketOnComplete ?? false;
Expand Down Expand Up @@ -182,6 +198,12 @@ export class SubscribeManager<T extends object, R> implements Connectable {
}

reset = () => {
if (this.debugLabel) {
// eslint-disable-next-line no-console
console.log(
`[subscribe:${this.debugLabel}] reset (snapshotComplete -> false)`,
);
}
this.columns = [];
this.querySent = false;
this.closedTimestampBuffer = [];
Expand Down Expand Up @@ -216,10 +238,31 @@ export class SubscribeManager<T extends object, R> implements Connectable {
};

private setState(update: Partial<SubscribeState<SubscribeRow<T>>>) {
const prevSnapshotComplete = this.currentState.snapshotComplete;
const prevDataLen = this.currentState.data.length;
this.currentState = {
...this.currentState,
...update,
};
if (
this.debugLabel &&
prevSnapshotComplete !== this.currentState.snapshotComplete
) {
// eslint-disable-next-line no-console
console.log(
`[subscribe:${this.debugLabel}] snapshotComplete: ${prevSnapshotComplete} -> ${this.currentState.snapshotComplete} (data.length=${this.currentState.data.length}, listeners=${this.listeners.size})`,
);
}
if (
this.debugLabel &&
prevDataLen !== this.currentState.data.length &&
this.currentState.snapshotComplete
) {
// eslint-disable-next-line no-console
console.log(
`[subscribe:${this.debugLabel}] data.length: ${prevDataLen} -> ${this.currentState.data.length}`,
);
}
// We need to have the snapshot state as a separate object and not just a derived copy of this.currentState because
// React.useSyncExternalStore requires a stable reference of the snapshot state, otherwise it will enter an infinite render loop.
this.snapshotState = {
Expand All @@ -235,16 +278,34 @@ export class SubscribeManager<T extends object, R> implements Connectable {

private onReadyForQuery = () => {
if (this.querySent && this.closeSocketOnComplete) {
if (this.debugLabel) {
// eslint-disable-next-line no-console
console.log(
`[subscribe:${this.debugLabel}] onReadyForQuery: closing (closeSocketOnComplete)`,
);
}
this.socket.disconnect();
return;
}
if (this.sqlRequest) {
if (this.debugLabel) {
// eslint-disable-next-line no-console
console.log(
`[subscribe:${this.debugLabel}] onReadyForQuery: sending SUBSCRIBE (querySent->true)`,
);
}
this.socket.send(this.sqlRequest);
this.querySent = true;
}
};

private onClose = (event: CloseEvent) => {
if (this.debugLabel) {
// eslint-disable-next-line no-console
console.log(
`[subscribe:${this.debugLabel}] onClose code=${event.code}`,
);
}
this.setState({
error: {
code: SUBSCRIBE_ERROR_CODE.CONNECTION_CLOSED,
Expand All @@ -254,6 +315,16 @@ export class SubscribeManager<T extends object, R> implements Connectable {
};

private onMessage = (message: WebSocketResult) => {
if (this.debugLabel && message.type !== "Row") {
// Row messages get logged in detail inside onRow as state-machine steps;
// everything else (Error, Rows-with-columns, ReadyForQuery, etc.) is logged here.
// eslint-disable-next-line no-console
console.log(
`[subscribe:${this.debugLabel}] msg`,
message.type,
message.payload,
);
}
if (message.type === "Error") {
captureException(
new Error(`Subscribe error: ${JSON.stringify(message.payload)}`),
Expand All @@ -276,31 +347,69 @@ export class SubscribeManager<T extends object, R> implements Connectable {
private onRow = (payload: unknown[]) => {
// If querySent is false, it means we are still getting results from a previous
// query, ignore the data.
if (!this.querySent) return;
if (!this.querySent) {
if (this.debugLabel) {
// eslint-disable-next-line no-console
console.log(
`[subscribe:${this.debugLabel}] onRow skipped (querySent=false)`,
);
}
return;
}

const meta = extractSubscribeMetadata(payload, this.columns);
const before = {
currentTimestamp: this.currentTimestamp,
currentTimestampType: typeof this.currentTimestamp,
snapshotComplete: this.currentState.snapshotComplete,
};
const trace: string[] = [];

if (this.currentTimestamp && meta.mzTimestamp > this.currentTimestamp) {
// this timestamp is complete, flush it
const updates = this.currentTimestampBuffer.get(this.currentTimestamp);
if (updates) {
this.closedTimestampBuffer.push(...updates);
this.currentTimestampBuffer.delete(this.currentTimestamp);
trace.push(
`advance(${this.currentTimestamp}->${meta.mzTimestamp}): flushed ${updates.length} to closed`,
);
} else {
trace.push(
`advance(${this.currentTimestamp}->${meta.mzTimestamp}): no buffer for prev`,
);
}
} else if (this.debugLabel && this.currentTimestamp) {
// Helpful when timestamps are equal or compare wrong (e.g. string-compare bug):
trace.push(
`noadvance: cmp(${this.currentTimestamp} ${typeof this.currentTimestamp}, ${meta.mzTimestamp} ${typeof meta.mzTimestamp}) -> ${meta.mzTimestamp > this.currentTimestamp}`,
);
} else if (this.debugLabel) {
trace.push(`noadvance: currentTimestamp not set`);
}

// Once we've received a second progress message, we know we've received
// the initial snapshot.
const snapshotGateChecked =
meta.mzProgressed && !this.currentState.snapshotComplete;
if (
meta.mzProgressed &&
this.currentTimestamp &&
!this.currentState.snapshotComplete
) {
// Eagerly flush the buffer to make the snapshot available.
trace.push(`snapshotGate: FIRING (flushing+setting snapshotComplete)`);
this.flushSocketBuffer();
this.setState({
snapshotComplete: true,
});
} else if (snapshotGateChecked) {
// mzProgressed && !snapshotComplete but currentTimestamp falsy -> gate blocked
trace.push(
`snapshotGate: BLOCKED (currentTimestamp=${String(this.currentTimestamp)})`,
);
}

// Track the new currently open timestamp.
this.currentTimestamp = meta.mzTimestamp;
const row = mapRowToObject<T>(payload, this.columns, [
Expand All @@ -314,11 +423,65 @@ export class SubscribeManager<T extends object, R> implements Connectable {
data: row,
});
this.currentTimestampBuffer.set(meta.mzTimestamp, updates);

if (this.debugLabel) {
// Only log onRow at interesting transitions, not for every progress tick:
// - before snapshot completes (every row is potentially relevant)
// - when mzState is non-null (real data: upsert/delete/key_violation)
// - when the snapshot-completion gate is firing or blocked
const interesting =
!before.snapshotComplete ||
meta.mzState !== null ||
trace.some((t) => t.startsWith("snapshotGate"));
if (interesting) {
// eslint-disable-next-line no-console
console.log(
`[subscribe:${this.debugLabel}] onRow`,
{
mzTimestamp: meta.mzTimestamp,
mzProgressed: meta.mzProgressed,
mzState: meta.mzState,
},
"before=",
before,
"after=",
{
currentTimestamp: this.currentTimestamp,
snapshotComplete: this.currentState.snapshotComplete,
},
trace.length ? trace : undefined,
);
}
}
};

private flushSocketBuffer = () => {
if (this.closedTimestampBuffer.length === 0) return;

if (this.debugLabel) {
const stateCounts = this.closedTimestampBuffer.reduce<
Record<string, number>
>((acc, r) => {
const k = String(r.mzState);
acc[k] = (acc[k] ?? 0) + 1;
return acc;
}, {});
const hasRealData = this.closedTimestampBuffer.some(
(r) => r.mzState !== null,
);
if (hasRealData || !this.currentState.snapshotComplete) {
// eslint-disable-next-line no-console
console.log(
`[subscribe:${this.debugLabel}] flush`,
this.closedTimestampBuffer.length,
"rows by mz_state=",
stateCounts,
"snapshotComplete=",
this.currentState.snapshotComplete,
);
}
}

const clearBuffer = () => {
this.closedTimestampBuffer = [];
};
Expand Down
Loading
Loading