diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-paragraph.interface.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-paragraph.interface.ts index 689104668db..2ea3916ea1e 100644 --- a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-paragraph.interface.ts +++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-paragraph.interface.ts @@ -77,10 +77,10 @@ export interface ParagraphConfig { } export interface ParagraphResults { + [index: number]: Record; + code?: string; msg?: ParagraphIResultsMsgItem[]; - - [index: number]: Record; } export enum DatasetType { diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts index 29a05ddf8ca..401a7bcf637 100644 --- a/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts +++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts @@ -40,6 +40,9 @@ export type ReceiveArgumentsType = MessageReceiveDataTypeMap[K] extends undefined ? () => void : (data: MessageReceiveDataTypeMap[K]) => void; export class Message { + /** Prevent unbounded growth of the short-circuit tracker */ + private static readonly MAX_SHORT_CIRCUIT_SIZE = 100; + public connectedStatus = false; public connectedStatus$ = new Subject(); private ws: WebSocketSubject> | null = null; @@ -54,6 +57,13 @@ export class Message { private uniqueClientId = Math.random().toString(36).substring(2, 7); private lastMsgIdSeqSent = 0; private readonly normalCloseCode = 1000; + /** + * Track which PARAGRAPH message seq IDs were explicitly short-circuited. + * Only these should be filtered out — not all messages where + * lastMsgIdSeqSent > receivedSeq (which can happen legitimately when + * unrelated requests like EDITOR_SETTING are interleaved). + */ + private shortCircuitedParagraphMsgIds = new Set(); constructor() { this.open$.subscribe(() => { @@ -185,8 +195,11 @@ export class Message { const isResponseForRequestFromThisClient = uniqueClientId === this.uniqueClientId; if (message.op === OP.PARAGRAPH) { - if (isResponseForRequestFromThisClient && this.lastMsgIdSeqSent > msgIdSeqReceived) { + if (isResponseForRequestFromThisClient && + this.shortCircuitedParagraphMsgIds.has(msgIdSeqReceived) + ) { console.log('PARAPGRAPH is already updated by shortcircuit'); + this.shortCircuitedParagraphMsgIds.delete(msgIdSeqReceived); return false; } else { return true; @@ -200,6 +213,20 @@ export class Message { } shortCircuit(message: WebSocketMessage) { + // Track which PARAGRAPH responses were explicitly short-circuited + // so the receive filter can correctly identify and skip them + if (message.op === OP.PARAGRAPH && message.msgId) { + const msgIdSeq = parseInt(message.msgId.split('-')[1], 10); + this.shortCircuitedParagraphMsgIds.add(msgIdSeq); + // Prevent unbounded growth: evict the oldest (smallest seq) entries + if (this.shortCircuitedParagraphMsgIds.size > Message.MAX_SHORT_CIRCUIT_SIZE) { + const sorted = [...this.shortCircuitedParagraphMsgIds].sort((a, b) => a - b); + const toDelete = sorted.slice(0, sorted.length - Message.MAX_SHORT_CIRCUIT_SIZE / 2); + for (const id of toDelete) { + this.shortCircuitedParagraphMsgIds.delete(id); + } + } + } this.received$.next(this.interceptReceived(message)); }