From 75dfec7118689f082b8286a70209981463465813 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Sat, 7 Feb 2026 11:46:15 -0800 Subject: [PATCH 1/4] fix(rooms): cleanup edge case for 1hr ttl --- apps/sim/lib/core/config/feature-flags.ts | 6 +- apps/sim/socket/handlers/connection.ts | 3 +- apps/sim/socket/handlers/workflow.ts | 60 +++++++++++--- apps/sim/socket/rooms/memory-manager.ts | 2 +- apps/sim/socket/rooms/redis-manager.ts | 97 ++++++++++++++++++++--- apps/sim/socket/rooms/types.ts | 5 +- 6 files changed, 144 insertions(+), 29 deletions(-) diff --git a/apps/sim/lib/core/config/feature-flags.ts b/apps/sim/lib/core/config/feature-flags.ts index 9f746c5b12..7f6ececcbb 100644 --- a/apps/sim/lib/core/config/feature-flags.ts +++ b/apps/sim/lib/core/config/feature-flags.ts @@ -1,7 +1,7 @@ /** * Environment utility functions for consistent environment detection across the application */ -import { env, getEnv, isFalsy, isTruthy } from './env' +import { env, isFalsy, isTruthy } from './env' /** * Is the application running in production mode @@ -21,9 +21,7 @@ export const isTest = env.NODE_ENV === 'test' /** * Is this the hosted version of the application */ -export const isHosted = - getEnv('NEXT_PUBLIC_APP_URL') === 'https://www.sim.ai' || - getEnv('NEXT_PUBLIC_APP_URL') === 'https://www.staging.sim.ai' +export const isHosted = true /** * Is billing enforcement enabled diff --git a/apps/sim/socket/handlers/connection.ts b/apps/sim/socket/handlers/connection.ts index 5444c9a830..ee7a9a7743 100644 --- a/apps/sim/socket/handlers/connection.ts +++ b/apps/sim/socket/handlers/connection.ts @@ -21,7 +21,8 @@ export function setupConnectionHandlers(socket: AuthenticatedSocket, roomManager cleanupPendingSubblocksForSocket(socket.id) cleanupPendingVariablesForSocket(socket.id) - const workflowId = await roomManager.removeUserFromRoom(socket.id) + const workflowIdHint = [...socket.rooms].find((roomId) => roomId !== socket.id) + const workflowId = await roomManager.removeUserFromRoom(socket.id, workflowIdHint) if (workflowId) { await roomManager.broadcastPresenceUpdate(workflowId) diff --git a/apps/sim/socket/handlers/workflow.ts b/apps/sim/socket/handlers/workflow.ts index c59316d1e7..f6da5edda1 100644 --- a/apps/sim/socket/handlers/workflow.ts +++ b/apps/sim/socket/handlers/workflow.ts @@ -51,26 +51,62 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager: const currentWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id) if (currentWorkflowId) { socket.leave(currentWorkflowId) - await roomManager.removeUserFromRoom(socket.id) + await roomManager.removeUserFromRoom(socket.id, currentWorkflowId) await roomManager.broadcastPresenceUpdate(currentWorkflowId) } - const STALE_THRESHOLD_MS = 60_000 + // Keep this above Redis socket key TTL (1h) so a normal idle user is not evicted too aggressively. + const STALE_THRESHOLD_MS = 75 * 60 * 1000 const now = Date.now() const existingUsers = await roomManager.getWorkflowUsers(workflowId) + let liveSocketIds = new Set() + let canCheckLiveness = false + + try { + const liveSockets = await roomManager.io.in(workflowId).fetchSockets() + liveSocketIds = new Set(liveSockets.map((liveSocket) => liveSocket.id)) + canCheckLiveness = true + } catch (error) { + logger.warn( + `Skipping stale cleanup for ${workflowId} due to live socket lookup failure`, + error + ) + } + for (const existingUser of existingUsers) { - if (existingUser.userId === userId && existingUser.socketId !== socket.id) { - const isSameTab = tabSessionId && existingUser.tabSessionId === tabSessionId - const isStale = - now - (existingUser.lastActivity || existingUser.joinedAt || 0) > STALE_THRESHOLD_MS + try { + if (existingUser.socketId === socket.id) { + continue + } - if (isSameTab || isStale) { + const isSameTab = Boolean(tabSessionId && existingUser.tabSessionId === tabSessionId) + + if (isSameTab) { logger.info( - `Cleaning up socket ${existingUser.socketId} for user ${userId} (${isSameTab ? 'same tab' : 'stale'})` + `Cleaning up socket ${existingUser.socketId} for user ${existingUser.userId} (same tab)` ) - await roomManager.removeUserFromRoom(existingUser.socketId) - roomManager.io.in(existingUser.socketId).socketsLeave(workflowId) + await roomManager.removeUserFromRoom(existingUser.socketId, workflowId) + await roomManager.io.in(existingUser.socketId).socketsLeave(workflowId) + continue + } + + if (!canCheckLiveness || liveSocketIds.has(existingUser.socketId)) { + continue + } + + const isStaleByActivity = + now - (existingUser.lastActivity || existingUser.joinedAt || 0) > STALE_THRESHOLD_MS + if (!isStaleByActivity) { + continue } + + logger.info( + `Cleaning up socket ${existingUser.socketId} for user ${existingUser.userId} (stale activity)` + ) + await roomManager.removeUserFromRoom(existingUser.socketId, workflowId) + await roomManager.io.in(existingUser.socketId).socketsLeave(workflowId) + } catch (error) { + logger.warn(`Best-effort cleanup failed for socket ${existingUser.socketId}`, error) } } @@ -136,7 +172,7 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager: logger.error('Error joining workflow:', error) // Undo socket.join and room manager entry if any operation failed socket.leave(workflowId) - await roomManager.removeUserFromRoom(socket.id) + await roomManager.removeUserFromRoom(socket.id, workflowId) const isReady = roomManager.isReady() socket.emit('join-workflow-error', { error: isReady ? 'Failed to join workflow' : 'Realtime unavailable', @@ -156,7 +192,7 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager: if (workflowId && session) { socket.leave(workflowId) - await roomManager.removeUserFromRoom(socket.id) + await roomManager.removeUserFromRoom(socket.id, workflowId) await roomManager.broadcastPresenceUpdate(workflowId) logger.info(`User ${session.userId} (${session.userName}) left workflow ${workflowId}`) diff --git a/apps/sim/socket/rooms/memory-manager.ts b/apps/sim/socket/rooms/memory-manager.ts index 908ee13f71..fa631ff689 100644 --- a/apps/sim/socket/rooms/memory-manager.ts +++ b/apps/sim/socket/rooms/memory-manager.ts @@ -66,7 +66,7 @@ export class MemoryRoomManager implements IRoomManager { logger.debug(`Added user ${presence.userId} to workflow ${workflowId} (socket: ${socketId})`) } - async removeUserFromRoom(socketId: string): Promise { + async removeUserFromRoom(socketId: string, _workflowIdHint?: string): Promise { const workflowId = this.socketToWorkflow.get(socketId) if (!workflowId) { diff --git a/apps/sim/socket/rooms/redis-manager.ts b/apps/sim/socket/rooms/redis-manager.ts index 9288a47628..87131b62e1 100644 --- a/apps/sim/socket/rooms/redis-manager.ts +++ b/apps/sim/socket/rooms/redis-manager.ts @@ -10,9 +10,11 @@ const KEYS = { workflowMeta: (wfId: string) => `workflow:${wfId}:meta`, socketWorkflow: (socketId: string) => `socket:${socketId}:workflow`, socketSession: (socketId: string) => `socket:${socketId}:session`, + socketPresenceWorkflow: (socketId: string) => `socket:${socketId}:presence-workflow`, } as const const SOCKET_KEY_TTL = 3600 +const SOCKET_PRESENCE_WORKFLOW_KEY_TTL = 24 * 60 * 60 /** * Lua script for atomic user removal from room. @@ -22,20 +24,24 @@ const SOCKET_KEY_TTL = 3600 const REMOVE_USER_SCRIPT = ` local socketWorkflowKey = KEYS[1] local socketSessionKey = KEYS[2] +local socketPresenceWorkflowKey = KEYS[3] local workflowUsersPrefix = ARGV[1] local workflowMetaPrefix = ARGV[2] local socketId = ARGV[3] local workflowId = redis.call('GET', socketWorkflowKey) if not workflowId then - return nil + workflowId = redis.call('GET', socketPresenceWorkflowKey) + if not workflowId then + return nil + end end local workflowUsersKey = workflowUsersPrefix .. workflowId .. ':users' local workflowMetaKey = workflowMetaPrefix .. workflowId .. ':meta' redis.call('HDEL', workflowUsersKey, socketId) -redis.call('DEL', socketWorkflowKey, socketSessionKey) +redis.call('DEL', socketWorkflowKey, socketSessionKey, socketPresenceWorkflowKey) local remaining = redis.call('HLEN', workflowUsersKey) if remaining == 0 then @@ -54,11 +60,13 @@ const UPDATE_ACTIVITY_SCRIPT = ` local workflowUsersKey = KEYS[1] local socketWorkflowKey = KEYS[2] local socketSessionKey = KEYS[3] +local socketPresenceWorkflowKey = KEYS[4] local socketId = ARGV[1] local cursorJson = ARGV[2] local selectionJson = ARGV[3] local lastActivity = ARGV[4] local ttl = tonumber(ARGV[5]) +local presenceWorkflowTtl = tonumber(ARGV[6]) local existingJson = redis.call('HGET', workflowUsersKey, socketId) if not existingJson then @@ -78,6 +86,7 @@ existing.lastActivity = tonumber(lastActivity) redis.call('HSET', workflowUsersKey, socketId, cjson.encode(existing)) redis.call('EXPIRE', socketWorkflowKey, ttl) redis.call('EXPIRE', socketSessionKey, ttl) +redis.call('EXPIRE', socketPresenceWorkflowKey, presenceWorkflowTtl) return 1 ` @@ -164,6 +173,8 @@ export class RedisRoomManager implements IRoomManager { pipeline.hSet(KEYS.workflowMeta(workflowId), 'lastModified', Date.now().toString()) pipeline.set(KEYS.socketWorkflow(socketId), workflowId) pipeline.expire(KEYS.socketWorkflow(socketId), SOCKET_KEY_TTL) + pipeline.set(KEYS.socketPresenceWorkflow(socketId), workflowId) + pipeline.expire(KEYS.socketPresenceWorkflow(socketId), SOCKET_PRESENCE_WORKFLOW_KEY_TTL) pipeline.hSet(KEYS.socketSession(socketId), { userId: presence.userId, userName: presence.userName, @@ -187,7 +198,11 @@ export class RedisRoomManager implements IRoomManager { } } - async removeUserFromRoom(socketId: string, retried = false): Promise { + async removeUserFromRoom( + socketId: string, + workflowIdHint?: string, + retried = false + ): Promise { if (!this.removeUserScriptSha) { logger.error('removeUserFromRoom called before initialize()') return null @@ -195,19 +210,30 @@ export class RedisRoomManager implements IRoomManager { try { const workflowId = await this.redis.evalSha(this.removeUserScriptSha, { - keys: [KEYS.socketWorkflow(socketId), KEYS.socketSession(socketId)], + keys: [ + KEYS.socketWorkflow(socketId), + KEYS.socketSession(socketId), + KEYS.socketPresenceWorkflow(socketId), + ], arguments: ['workflow:', 'workflow:', socketId], }) - if (workflowId) { + if (typeof workflowId === 'string' && workflowId.length > 0) { logger.debug(`Removed socket ${socketId} from workflow ${workflowId}`) + return workflowId + } + + // Fallback without global SCAN: direct cleanup using workflow hint from socket rooms / join context. + if (workflowIdHint) { + return this.removeUserFromWorkflowHint(socketId, workflowIdHint) } - return workflowId as string | null + + return null } catch (error) { if ((error as Error).message?.includes('NOSCRIPT') && !retried) { logger.warn('Lua script not found, reloading...') this.removeUserScriptSha = await this.redis.scriptLoad(REMOVE_USER_SCRIPT) - return this.removeUserFromRoom(socketId, true) + return this.removeUserFromRoom(socketId, workflowIdHint, true) } logger.error(`Failed to remove user from room: ${socketId}`, error) return null @@ -215,7 +241,12 @@ export class RedisRoomManager implements IRoomManager { } async getWorkflowIdForSocket(socketId: string): Promise { - return this.redis.get(KEYS.socketWorkflow(socketId)) + const workflowId = await this.redis.get(KEYS.socketWorkflow(socketId)) + if (workflowId) { + return workflowId + } + + return this.redis.get(KEYS.socketPresenceWorkflow(socketId)) } async getUserSession(socketId: string): Promise { @@ -261,6 +292,52 @@ export class RedisRoomManager implements IRoomManager { return exists > 0 } + private async removeUserFromWorkflowHint( + socketId: string, + workflowIdHint: string + ): Promise { + try { + const pipeline = this.redis.multi() + pipeline.hDel(KEYS.workflowUsers(workflowIdHint), socketId) + pipeline.del(KEYS.socketWorkflow(socketId)) + pipeline.del(KEYS.socketSession(socketId)) + pipeline.del(KEYS.socketPresenceWorkflow(socketId)) + + const results = await pipeline.exec() + if (results.some((result) => result instanceof Error)) { + logger.error('Pipeline partially failed during hinted fallback cleanup', { + socketId, + workflowIdHint, + }) + return null + } + + const hDelResult = results[0] + const removedCount = + typeof hDelResult === 'number' + ? hDelResult + : typeof hDelResult === 'string' + ? Number.parseInt(hDelResult, 10) || 0 + : 0 + + if (removedCount <= 0) { + return null + } + + await this.redis.hSet( + KEYS.workflowMeta(workflowIdHint), + 'lastModified', + Date.now().toString() + ) + + logger.warn(`Removed socket ${socketId} from workflow ${workflowIdHint} via hinted fallback`) + return workflowIdHint + } catch (error) { + logger.error('Failed hinted fallback cleanup', { socketId, workflowIdHint, error }) + return null + } + } + async updateUserActivity( workflowId: string, socketId: string, @@ -278,6 +355,7 @@ export class RedisRoomManager implements IRoomManager { KEYS.workflowUsers(workflowId), KEYS.socketWorkflow(socketId), KEYS.socketSession(socketId), + KEYS.socketPresenceWorkflow(socketId), ], arguments: [ socketId, @@ -285,6 +363,7 @@ export class RedisRoomManager implements IRoomManager { updates.selection !== undefined ? JSON.stringify(updates.selection) : '', (updates.lastActivity ?? Date.now()).toString(), SOCKET_KEY_TTL.toString(), + SOCKET_PRESENCE_WORKFLOW_KEY_TTL.toString(), ], }) } catch (error) { @@ -348,7 +427,7 @@ export class RedisRoomManager implements IRoomManager { // Remove all users from Redis state for (const user of users) { - await this.removeUserFromRoom(user.socketId) + await this.removeUserFromRoom(user.socketId, workflowId) } // Clean up room data diff --git a/apps/sim/socket/rooms/types.ts b/apps/sim/socket/rooms/types.ts index b294646f60..5c755a739e 100644 --- a/apps/sim/socket/rooms/types.ts +++ b/apps/sim/socket/rooms/types.ts @@ -65,9 +65,10 @@ export interface IRoomManager { /** * Remove a user from their current room - * Returns the workflowId they were in, or null if not in any room + * Optional workflowIdHint is used when socket mapping keys are missing/expired. + * Returns the workflowId they were in, or null if not in any room. */ - removeUserFromRoom(socketId: string): Promise + removeUserFromRoom(socketId: string, workflowIdHint?: string): Promise /** * Get the workflow ID for a socket From 163ee8089dc952ba84b12e6b5c86a6b492d9dca9 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Sat, 7 Feb 2026 11:48:26 -0800 Subject: [PATCH 2/4] revert feature flags --- apps/sim/lib/core/config/feature-flags.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/sim/lib/core/config/feature-flags.ts b/apps/sim/lib/core/config/feature-flags.ts index 7f6ececcbb..9f746c5b12 100644 --- a/apps/sim/lib/core/config/feature-flags.ts +++ b/apps/sim/lib/core/config/feature-flags.ts @@ -1,7 +1,7 @@ /** * Environment utility functions for consistent environment detection across the application */ -import { env, isFalsy, isTruthy } from './env' +import { env, getEnv, isFalsy, isTruthy } from './env' /** * Is the application running in production mode @@ -21,7 +21,9 @@ export const isTest = env.NODE_ENV === 'test' /** * Is this the hosted version of the application */ -export const isHosted = true +export const isHosted = + getEnv('NEXT_PUBLIC_APP_URL') === 'https://www.sim.ai' || + getEnv('NEXT_PUBLIC_APP_URL') === 'https://www.staging.sim.ai' /** * Is billing enforcement enabled From fce4e92377f242e348a89120ee2d551edac2a634 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Sat, 7 Feb 2026 12:03:31 -0800 Subject: [PATCH 3/4] address comments --- apps/sim/socket/handlers/workflow.ts | 6 ++- apps/sim/socket/rooms/redis-manager.ts | 65 ++++---------------------- 2 files changed, 15 insertions(+), 56 deletions(-) diff --git a/apps/sim/socket/handlers/workflow.ts b/apps/sim/socket/handlers/workflow.ts index f6da5edda1..8353f0a388 100644 --- a/apps/sim/socket/handlers/workflow.ts +++ b/apps/sim/socket/handlers/workflow.ts @@ -79,7 +79,11 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager: continue } - const isSameTab = Boolean(tabSessionId && existingUser.tabSessionId === tabSessionId) + const isSameTab = Boolean( + existingUser.userId === userId && + tabSessionId && + existingUser.tabSessionId === tabSessionId + ) if (isSameTab) { logger.info( diff --git a/apps/sim/socket/rooms/redis-manager.ts b/apps/sim/socket/rooms/redis-manager.ts index 87131b62e1..fb0d0d1042 100644 --- a/apps/sim/socket/rooms/redis-manager.ts +++ b/apps/sim/socket/rooms/redis-manager.ts @@ -28,13 +28,19 @@ local socketPresenceWorkflowKey = KEYS[3] local workflowUsersPrefix = ARGV[1] local workflowMetaPrefix = ARGV[2] local socketId = ARGV[3] +local workflowIdHint = ARGV[4] local workflowId = redis.call('GET', socketWorkflowKey) if not workflowId then workflowId = redis.call('GET', socketPresenceWorkflowKey) - if not workflowId then - return nil - end +end + +if not workflowId and workflowIdHint ~= '' then + workflowId = workflowIdHint +end + +if not workflowId then + return nil end local workflowUsersKey = workflowUsersPrefix .. workflowId .. ':users' @@ -215,7 +221,7 @@ export class RedisRoomManager implements IRoomManager { KEYS.socketSession(socketId), KEYS.socketPresenceWorkflow(socketId), ], - arguments: ['workflow:', 'workflow:', socketId], + arguments: ['workflow:', 'workflow:', socketId, workflowIdHint ?? ''], }) if (typeof workflowId === 'string' && workflowId.length > 0) { @@ -223,11 +229,6 @@ export class RedisRoomManager implements IRoomManager { return workflowId } - // Fallback without global SCAN: direct cleanup using workflow hint from socket rooms / join context. - if (workflowIdHint) { - return this.removeUserFromWorkflowHint(socketId, workflowIdHint) - } - return null } catch (error) { if ((error as Error).message?.includes('NOSCRIPT') && !retried) { @@ -292,52 +293,6 @@ export class RedisRoomManager implements IRoomManager { return exists > 0 } - private async removeUserFromWorkflowHint( - socketId: string, - workflowIdHint: string - ): Promise { - try { - const pipeline = this.redis.multi() - pipeline.hDel(KEYS.workflowUsers(workflowIdHint), socketId) - pipeline.del(KEYS.socketWorkflow(socketId)) - pipeline.del(KEYS.socketSession(socketId)) - pipeline.del(KEYS.socketPresenceWorkflow(socketId)) - - const results = await pipeline.exec() - if (results.some((result) => result instanceof Error)) { - logger.error('Pipeline partially failed during hinted fallback cleanup', { - socketId, - workflowIdHint, - }) - return null - } - - const hDelResult = results[0] - const removedCount = - typeof hDelResult === 'number' - ? hDelResult - : typeof hDelResult === 'string' - ? Number.parseInt(hDelResult, 10) || 0 - : 0 - - if (removedCount <= 0) { - return null - } - - await this.redis.hSet( - KEYS.workflowMeta(workflowIdHint), - 'lastModified', - Date.now().toString() - ) - - logger.warn(`Removed socket ${socketId} from workflow ${workflowIdHint} via hinted fallback`) - return workflowIdHint - } catch (error) { - logger.error('Failed hinted fallback cleanup', { socketId, workflowIdHint, error }) - return null - } - } - async updateUserActivity( workflowId: string, socketId: string, From efb46d4a54db2f07ca0449fd131f5b0568a6f2bf Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Sat, 7 Feb 2026 12:17:15 -0800 Subject: [PATCH 4/4] remove console log --- apps/sim/serializer/index.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/sim/serializer/index.ts b/apps/sim/serializer/index.ts index 66f4568a4c..622667d9fc 100644 --- a/apps/sim/serializer/index.ts +++ b/apps/sim/serializer/index.ts @@ -70,7 +70,6 @@ function shouldSerializeSubBlock( : group.basicId === subBlockConfig.id return matchesMode && evaluateSubBlockCondition(subBlockConfig.condition, values) } - console.log('[FUCK] subBlockConfig.condition', subBlockConfig.condition, values) return evaluateSubBlockCondition(subBlockConfig.condition, values) }