diff --git a/apps/sim/app/api/copilot/chat/route.ts b/apps/sim/app/api/copilot/chat/route.ts index e14b3d715eb..c1938b5f06c 100644 --- a/apps/sim/app/api/copilot/chat/route.ts +++ b/apps/sim/app/api/copilot/chat/route.ts @@ -15,7 +15,6 @@ import { requestChatTitle, SSE_RESPONSE_HEADERS, } from '@/lib/copilot/chat-streaming' -import { appendCopilotLogContext } from '@/lib/copilot/logging' import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models' import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator' import { getStreamMeta, readStreamEvents } from '@/lib/copilot/orchestrator/stream/buffer' @@ -184,36 +183,31 @@ export async function POST(req: NextRequest) { const wf = await getWorkflowById(workflowId) resolvedWorkspaceId = wf?.workspaceId ?? undefined } catch { - logger.warn( - appendCopilotLogContext('Failed to resolve workspaceId from workflow', { - requestId: tracker.requestId, - messageId: userMessageId, - }) - ) + logger + .withMetadata({ requestId: tracker.requestId, messageId: userMessageId }) + .warn('Failed to resolve workspaceId from workflow') } const userMessageIdToUse = userMessageId || crypto.randomUUID() + const reqLogger = logger.withMetadata({ + requestId: tracker.requestId, + messageId: userMessageIdToUse, + }) try { - logger.error( - appendCopilotLogContext('Received chat POST', { - requestId: tracker.requestId, - messageId: userMessageIdToUse, - }), - { - workflowId, - hasContexts: Array.isArray(normalizedContexts), - contextsCount: Array.isArray(normalizedContexts) ? normalizedContexts.length : 0, - contextsPreview: Array.isArray(normalizedContexts) - ? normalizedContexts.map((c: any) => ({ - kind: c?.kind, - chatId: c?.chatId, - workflowId: c?.workflowId, - executionId: (c as any)?.executionId, - label: c?.label, - })) - : undefined, - } - ) + reqLogger.info('Received chat POST', { + workflowId, + hasContexts: Array.isArray(normalizedContexts), + contextsCount: Array.isArray(normalizedContexts) ? 
normalizedContexts.length : 0, + contextsPreview: Array.isArray(normalizedContexts) + ? normalizedContexts.map((c: any) => ({ + kind: c?.kind, + chatId: c?.chatId, + workflowId: c?.workflowId, + executionId: (c as any)?.executionId, + label: c?.label, + })) + : undefined, + }) } catch {} let currentChat: any = null @@ -251,40 +245,22 @@ export async function POST(req: NextRequest) { actualChatId ) agentContexts = processed - logger.error( - appendCopilotLogContext('Contexts processed for request', { - requestId: tracker.requestId, - messageId: userMessageIdToUse, - }), - { - processedCount: agentContexts.length, - kinds: agentContexts.map((c) => c.type), - lengthPreview: agentContexts.map((c) => c.content?.length ?? 0), - } - ) + reqLogger.info('Contexts processed for request', { + processedCount: agentContexts.length, + kinds: agentContexts.map((c) => c.type), + lengthPreview: agentContexts.map((c) => c.content?.length ?? 0), + }) if ( Array.isArray(normalizedContexts) && normalizedContexts.length > 0 && agentContexts.length === 0 ) { - logger.warn( - appendCopilotLogContext( - 'Contexts provided but none processed. Check executionId for logs contexts.', - { - requestId: tracker.requestId, - messageId: userMessageIdToUse, - } - ) + reqLogger.warn( + 'Contexts provided but none processed. Check executionId for logs contexts.' 
) } } catch (e) { - logger.error( - appendCopilotLogContext('Failed to process contexts', { - requestId: tracker.requestId, - messageId: userMessageIdToUse, - }), - e - ) + reqLogger.error('Failed to process contexts', e) } } @@ -313,13 +289,7 @@ export async function POST(req: NextRequest) { if (result.status === 'fulfilled' && result.value) { agentContexts.push(result.value) } else if (result.status === 'rejected') { - logger.error( - appendCopilotLogContext('Failed to resolve resource attachment', { - requestId: tracker.requestId, - messageId: userMessageIdToUse, - }), - result.reason - ) + reqLogger.error('Failed to resolve resource attachment', result.reason) } } } @@ -358,26 +328,20 @@ export async function POST(req: NextRequest) { ) try { - logger.error( - appendCopilotLogContext('About to call Sim Agent', { - requestId: tracker.requestId, - messageId: userMessageIdToUse, - }), - { - hasContext: agentContexts.length > 0, - contextCount: agentContexts.length, - hasFileAttachments: Array.isArray(requestPayload.fileAttachments), - messageLength: message.length, - mode: effectiveMode, - hasTools: Array.isArray(requestPayload.tools), - toolCount: Array.isArray(requestPayload.tools) ? requestPayload.tools.length : 0, - hasBaseTools: Array.isArray(requestPayload.baseTools), - baseToolCount: Array.isArray(requestPayload.baseTools) - ? requestPayload.baseTools.length - : 0, - hasCredentials: !!requestPayload.credentials, - } - ) + reqLogger.info('About to call Sim Agent', { + hasContext: agentContexts.length > 0, + contextCount: agentContexts.length, + hasFileAttachments: Array.isArray(requestPayload.fileAttachments), + messageLength: message.length, + mode: effectiveMode, + hasTools: Array.isArray(requestPayload.tools), + toolCount: Array.isArray(requestPayload.tools) ? requestPayload.tools.length : 0, + hasBaseTools: Array.isArray(requestPayload.baseTools), + baseToolCount: Array.isArray(requestPayload.baseTools) + ? 
requestPayload.baseTools.length + : 0, + hasCredentials: !!requestPayload.credentials, + }) } catch {} if (stream && actualChatId) { @@ -521,16 +485,10 @@ export async function POST(req: NextRequest) { .where(eq(copilotChats.id, actualChatId)) } } catch (error) { - logger.error( - appendCopilotLogContext('Failed to persist chat messages', { - requestId: tracker.requestId, - messageId: userMessageIdToUse, - }), - { - chatId: actualChatId, - error: error instanceof Error ? error.message : 'Unknown error', - } - ) + reqLogger.error('Failed to persist chat messages', { + chatId: actualChatId, + error: error instanceof Error ? error.message : 'Unknown error', + }) } }, }, @@ -572,19 +530,13 @@ export async function POST(req: NextRequest) { provider: typeof requestPayload?.provider === 'string' ? requestPayload.provider : undefined, } - logger.error( - appendCopilotLogContext('Non-streaming response from orchestrator', { - requestId: tracker.requestId, - messageId: userMessageIdToUse, - }), - { - hasContent: !!responseData.content, - contentLength: responseData.content?.length || 0, - model: responseData.model, - provider: responseData.provider, - toolCallsCount: responseData.toolCalls?.length || 0, - } - ) + reqLogger.info('Non-streaming response from orchestrator', { + hasContent: !!responseData.content, + contentLength: responseData.content?.length || 0, + model: responseData.model, + provider: responseData.provider, + toolCallsCount: responseData.toolCalls?.length || 0, + }) // Save messages if we have a chat if (currentChat && responseData.content) { @@ -617,12 +569,7 @@ export async function POST(req: NextRequest) { // Start title generation in parallel if this is first message (non-streaming) if (actualChatId && !currentChat.title && conversationHistory.length === 0) { - logger.error( - appendCopilotLogContext('Starting title generation for non-streaming response', { - requestId: tracker.requestId, - messageId: userMessageIdToUse, - }) - ) + 
reqLogger.info('Starting title generation for non-streaming response') requestChatTitle({ message, model: selectedModel, provider, messageId: userMessageIdToUse }) .then(async (title) => { if (title) { @@ -633,22 +580,11 @@ export async function POST(req: NextRequest) { updatedAt: new Date(), }) .where(eq(copilotChats.id, actualChatId!)) - logger.error( - appendCopilotLogContext(`Generated and saved title: ${title}`, { - requestId: tracker.requestId, - messageId: userMessageIdToUse, - }) - ) + reqLogger.info(`Generated and saved title: ${title}`) } }) .catch((error) => { - logger.error( - appendCopilotLogContext('Title generation failed', { - requestId: tracker.requestId, - messageId: userMessageIdToUse, - }), - error - ) + reqLogger.error('Title generation failed', error) }) } @@ -662,17 +598,11 @@ export async function POST(req: NextRequest) { .where(eq(copilotChats.id, actualChatId!)) } - logger.error( - appendCopilotLogContext('Returning non-streaming response', { - requestId: tracker.requestId, - messageId: userMessageIdToUse, - }), - { - duration: tracker.getDuration(), - chatId: actualChatId, - responseLength: responseData.content?.length || 0, - } - ) + reqLogger.info('Returning non-streaming response', { + duration: tracker.getDuration(), + chatId: actualChatId, + responseLength: responseData.content?.length || 0, + }) return NextResponse.json({ success: true, @@ -696,33 +626,25 @@ export async function POST(req: NextRequest) { const duration = tracker.getDuration() if (error instanceof z.ZodError) { - logger.error( - appendCopilotLogContext('Validation error', { - requestId: tracker.requestId, - messageId: pendingChatStreamID ?? undefined, - }), - { + logger + .withMetadata({ requestId: tracker.requestId, messageId: pendingChatStreamID ?? 
undefined }) + .error('Validation error', { duration, errors: error.errors, - } - ) + }) return NextResponse.json( { error: 'Invalid request data', details: error.errors }, { status: 400 } ) } - logger.error( - appendCopilotLogContext('Error handling copilot chat', { - requestId: tracker.requestId, - messageId: pendingChatStreamID ?? undefined, - }), - { + logger + .withMetadata({ requestId: tracker.requestId, messageId: pendingChatStreamID ?? undefined }) + .error('Error handling copilot chat', { duration, error: error instanceof Error ? error.message : 'Unknown error', stack: error instanceof Error ? error.stack : undefined, - } - ) + }) return NextResponse.json( { error: error instanceof Error ? error.message : 'Internal server error' }, @@ -767,16 +689,13 @@ export async function GET(req: NextRequest) { status: meta?.status || 'unknown', } } catch (err) { - logger.warn( - appendCopilotLogContext('Failed to read stream snapshot for chat', { - messageId: chat.conversationId || undefined, - }), - { + logger + .withMetadata({ messageId: chat.conversationId || undefined }) + .warn('Failed to read stream snapshot for chat', { chatId, conversationId: chat.conversationId, error: err instanceof Error ? err.message : String(err), - } - ) + }) } } @@ -795,11 +714,9 @@ export async function GET(req: NextRequest) { ...(streamSnapshot ? 
{ streamSnapshot } : {}), } - logger.error( - appendCopilotLogContext(`Retrieved chat ${chatId}`, { - messageId: chat.conversationId || undefined, - }) - ) + logger + .withMetadata({ messageId: chat.conversationId || undefined }) + .info(`Retrieved chat ${chatId}`) return NextResponse.json({ success: true, chat: transformedChat }) } diff --git a/apps/sim/app/api/copilot/chat/stream/route.ts b/apps/sim/app/api/copilot/chat/stream/route.ts index c442f72ed18..b56d9471817 100644 --- a/apps/sim/app/api/copilot/chat/stream/route.ts +++ b/apps/sim/app/api/copilot/chat/stream/route.ts @@ -1,6 +1,5 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' -import { appendCopilotLogContext } from '@/lib/copilot/logging' import { getStreamMeta, readStreamEvents, @@ -36,24 +35,21 @@ export async function GET(request: NextRequest) { const toParam = url.searchParams.get('to') const toEventId = toParam ? Number(toParam) : undefined - logger.error( - appendCopilotLogContext('[Resume] Received resume request', { - messageId: streamId || undefined, - }), - { - streamId: streamId || undefined, - fromEventId, - toEventId, - batchMode, - } - ) + const reqLogger = logger.withMetadata({ messageId: streamId || undefined }) + + reqLogger.info('[Resume] Received resume request', { + streamId: streamId || undefined, + fromEventId, + toEventId, + batchMode, + }) if (!streamId) { return NextResponse.json({ error: 'streamId is required' }, { status: 400 }) } const meta = (await getStreamMeta(streamId)) as StreamMeta | null - logger.error(appendCopilotLogContext('[Resume] Stream lookup', { messageId: streamId }), { + reqLogger.info('[Resume] Stream lookup', { streamId, fromEventId, toEventId, @@ -72,7 +68,7 @@ export async function GET(request: NextRequest) { if (batchMode) { const events = await readStreamEvents(streamId, fromEventId) const filteredEvents = toEventId ? 
events.filter((e) => e.eventId <= toEventId) : events - logger.error(appendCopilotLogContext('[Resume] Batch response', { messageId: streamId }), { + reqLogger.info('[Resume] Batch response', { streamId, fromEventId, toEventId, @@ -124,14 +120,11 @@ export async function GET(request: NextRequest) { const flushEvents = async () => { const events = await readStreamEvents(streamId, lastEventId) if (events.length > 0) { - logger.error( - appendCopilotLogContext('[Resume] Flushing events', { messageId: streamId }), - { - streamId, - fromEventId: lastEventId, - eventCount: events.length, - } - ) + reqLogger.info('[Resume] Flushing events', { + streamId, + fromEventId: lastEventId, + eventCount: events.length, + }) } for (const entry of events) { lastEventId = entry.eventId @@ -178,7 +171,7 @@ export async function GET(request: NextRequest) { } } catch (error) { if (!controllerClosed && !request.signal.aborted) { - logger.warn(appendCopilotLogContext('Stream replay failed', { messageId: streamId }), { + reqLogger.warn('Stream replay failed', { streamId, error: error instanceof Error ? 
error.message : String(error), }) diff --git a/apps/sim/app/api/mothership/chat/route.ts b/apps/sim/app/api/mothership/chat/route.ts index 5e51f4aa4c9..6accb899a1a 100644 --- a/apps/sim/app/api/mothership/chat/route.ts +++ b/apps/sim/app/api/mothership/chat/route.ts @@ -12,7 +12,6 @@ import { createSSEStream, SSE_RESPONSE_HEADERS, } from '@/lib/copilot/chat-streaming' -import { appendCopilotLogContext } from '@/lib/copilot/logging' import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types' import { processContextsServer, resolveActiveResourceContext } from '@/lib/copilot/process-contents' import { createRequestTracker, createUnauthorizedResponse } from '@/lib/copilot/request-helpers' @@ -112,27 +111,22 @@ export async function POST(req: NextRequest) { const userMessageId = providedMessageId || crypto.randomUUID() userMessageIdForLogs = userMessageId + const reqLogger = logger.withMetadata({ + requestId: tracker.requestId, + messageId: userMessageId, + }) - logger.error( - appendCopilotLogContext('Received mothership chat start request', { - requestId: tracker.requestId, - messageId: userMessageId, - }), - { - workspaceId, - chatId, - createNewChat, - hasContexts: Array.isArray(contexts) && contexts.length > 0, - contextsCount: Array.isArray(contexts) ? contexts.length : 0, - hasResourceAttachments: - Array.isArray(resourceAttachments) && resourceAttachments.length > 0, - resourceAttachmentCount: Array.isArray(resourceAttachments) - ? resourceAttachments.length - : 0, - hasFileAttachments: Array.isArray(fileAttachments) && fileAttachments.length > 0, - fileAttachmentCount: Array.isArray(fileAttachments) ? fileAttachments.length : 0, - } - ) + reqLogger.info('Received mothership chat start request', { + workspaceId, + chatId, + createNewChat, + hasContexts: Array.isArray(contexts) && contexts.length > 0, + contextsCount: Array.isArray(contexts) ? 
contexts.length : 0, + hasResourceAttachments: Array.isArray(resourceAttachments) && resourceAttachments.length > 0, + resourceAttachmentCount: Array.isArray(resourceAttachments) ? resourceAttachments.length : 0, + hasFileAttachments: Array.isArray(fileAttachments) && fileAttachments.length > 0, + fileAttachmentCount: Array.isArray(fileAttachments) ? fileAttachments.length : 0, + }) try { await assertActiveWorkspaceAccess(workspaceId, authenticatedUserId) @@ -174,13 +168,7 @@ export async function POST(req: NextRequest) { actualChatId ) } catch (e) { - logger.error( - appendCopilotLogContext('Failed to process contexts', { - requestId: tracker.requestId, - messageId: userMessageId, - }), - e - ) + reqLogger.error('Failed to process contexts', e) } } @@ -205,13 +193,7 @@ export async function POST(req: NextRequest) { if (result.status === 'fulfilled' && result.value) { agentContexts.push(result.value) } else if (result.status === 'rejected') { - logger.error( - appendCopilotLogContext('Failed to resolve resource attachment', { - requestId: tracker.requestId, - messageId: userMessageId, - }), - result.reason - ) + reqLogger.error('Failed to resolve resource attachment', result.reason) } } } @@ -399,16 +381,10 @@ export async function POST(req: NextRequest) { }) } } catch (error) { - logger.error( - appendCopilotLogContext('Failed to persist chat messages', { - requestId: tracker.requestId, - messageId: userMessageId, - }), - { - chatId: actualChatId, - error: error instanceof Error ? error.message : 'Unknown error', - } - ) + reqLogger.error('Failed to persist chat messages', { + chatId: actualChatId, + error: error instanceof Error ? 
error.message : 'Unknown error', + }) } }, }, @@ -423,15 +399,11 @@ export async function POST(req: NextRequest) { ) } - logger.error( - appendCopilotLogContext('Error handling mothership chat', { - requestId: tracker.requestId, - messageId: userMessageIdForLogs, - }), - { + logger + .withMetadata({ requestId: tracker.requestId, messageId: userMessageIdForLogs }) + .error('Error handling mothership chat', { error: error instanceof Error ? error.message : 'Unknown error', - } - ) + }) return NextResponse.json( { error: error instanceof Error ? error.message : 'Internal server error' }, diff --git a/apps/sim/app/api/mothership/chats/[chatId]/route.ts b/apps/sim/app/api/mothership/chats/[chatId]/route.ts index b51b24a4ace..e41a7c713d6 100644 --- a/apps/sim/app/api/mothership/chats/[chatId]/route.ts +++ b/apps/sim/app/api/mothership/chats/[chatId]/route.ts @@ -5,7 +5,6 @@ import { and, eq, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { getAccessibleCopilotChat } from '@/lib/copilot/chat-lifecycle' -import { appendCopilotLogContext } from '@/lib/copilot/logging' import { getStreamMeta, readStreamEvents } from '@/lib/copilot/orchestrator/stream/buffer' import { authenticateCopilotRequestSessionOnly, @@ -63,16 +62,13 @@ export async function GET( status: meta?.status || 'unknown', } } catch (error) { - logger.warn( - appendCopilotLogContext('Failed to read stream snapshot for mothership chat', { - messageId: chat.conversationId || undefined, - }), - { + logger + .withMetadata({ messageId: chat.conversationId || undefined }) + .warn('Failed to read stream snapshot for mothership chat', { chatId, conversationId: chat.conversationId, error: error instanceof Error ? 
error.message : String(error), - } - ) + }) } } diff --git a/apps/sim/app/api/mothership/execute/route.ts b/apps/sim/app/api/mothership/execute/route.ts index b0fc3a82d08..0570a808e45 100644 --- a/apps/sim/app/api/mothership/execute/route.ts +++ b/apps/sim/app/api/mothership/execute/route.ts @@ -4,7 +4,6 @@ import { z } from 'zod' import { checkInternalAuth } from '@/lib/auth/hybrid' import { createRunSegment } from '@/lib/copilot/async-runs/repository' import { buildIntegrationToolSchemas } from '@/lib/copilot/chat-payload' -import { appendCopilotLogContext } from '@/lib/copilot/logging' import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator' import { generateWorkspaceContext } from '@/lib/copilot/workspace-context' import { @@ -53,6 +52,7 @@ export async function POST(req: NextRequest) { const effectiveChatId = chatId || crypto.randomUUID() messageId = crypto.randomUUID() + const reqLogger = logger.withMetadata({ messageId }) const [workspaceContext, integrationTools, userPermission] = await Promise.all([ generateWorkspaceContext(workspaceId, userId), buildIntegrationToolSchemas(userId, messageId), @@ -96,7 +96,7 @@ export async function POST(req: NextRequest) { }) if (!result.success) { - logger.error(appendCopilotLogContext('Mothership execute failed', { messageId }), { + reqLogger.error('Mothership execute failed', { error: result.error, errors: result.errors, }) @@ -135,7 +135,7 @@ export async function POST(req: NextRequest) { ) } - logger.error(appendCopilotLogContext('Mothership execute error', { messageId }), { + logger.withMetadata({ messageId }).error('Mothership execute error', { error: error instanceof Error ? 
error.message : 'Unknown error', }) diff --git a/apps/sim/app/api/v1/copilot/chat/route.ts b/apps/sim/app/api/v1/copilot/chat/route.ts index dafb1baf0e4..09a5a70f3e1 100644 --- a/apps/sim/app/api/v1/copilot/chat/route.ts +++ b/apps/sim/app/api/v1/copilot/chat/route.ts @@ -2,7 +2,6 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { createRunSegment } from '@/lib/copilot/async-runs/repository' -import { appendCopilotLogContext } from '@/lib/copilot/logging' import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models' import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator' import { getWorkflowById, resolveWorkflowIdForUser } from '@/lib/workflows/utils' @@ -84,17 +83,15 @@ export async function POST(req: NextRequest) { const chatId = parsed.chatId || crypto.randomUUID() messageId = crypto.randomUUID() - logger.error( - appendCopilotLogContext('Received headless copilot chat start request', { messageId }), - { - workflowId: resolved.workflowId, - workflowName: parsed.workflowName, - chatId, - mode: transportMode, - autoExecuteTools: parsed.autoExecuteTools, - timeout: parsed.timeout, - } - ) + const reqLogger = logger.withMetadata({ messageId }) + reqLogger.info('Received headless copilot chat start request', { + workflowId: resolved.workflowId, + workflowName: parsed.workflowName, + chatId, + mode: transportMode, + autoExecuteTools: parsed.autoExecuteTools, + timeout: parsed.timeout, + }) const requestPayload = { message: parsed.message, workflowId: resolved.workflowId, @@ -144,7 +141,7 @@ export async function POST(req: NextRequest) { ) } - logger.error(appendCopilotLogContext('Headless copilot request failed', { messageId }), { + logger.withMetadata({ messageId }).error('Headless copilot request failed', { error: error instanceof Error ? 
error.message : String(error), }) return NextResponse.json({ success: false, error: 'Internal server error' }, { status: 500 }) diff --git a/apps/sim/app/api/workflows/[id]/execute/route.async.test.ts b/apps/sim/app/api/workflows/[id]/execute/route.async.test.ts index 355ae6ddf06..a800994c004 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.async.test.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.async.test.ts @@ -82,14 +82,16 @@ vi.mock('@/background/workflow-execution', () => ({ executeWorkflowJob: vi.fn(), })) -vi.mock('@sim/logger', () => ({ - createLogger: vi.fn().mockReturnValue({ +vi.mock('@sim/logger', () => { + const createMockLogger = (): Record => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn(), - }), -})) + withMetadata: vi.fn(() => createMockLogger()), + }) + return { createLogger: vi.fn(() => createMockLogger()) } +}) vi.mock('uuid', () => ({ validate: vi.fn().mockReturnValue(true), diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index df3fc41d434..86522989004 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -187,6 +187,13 @@ type AsyncExecutionParams = { async function handleAsyncExecution(params: AsyncExecutionParams): Promise { const { requestId, workflowId, userId, workspaceId, input, triggerType, executionId, callChain } = params + const asyncLogger = logger.withMetadata({ + requestId, + workflowId, + workspaceId, + userId, + executionId, + }) const correlation = { executionId, @@ -233,10 +240,7 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise { const requestId = generateRequestId() const { id: workflowId } = await params + let reqLogger = logger.withMetadata({ requestId, workflowId }) const incomingCallChain = parseCallChain(req.headers.get(SIM_VIA_HEADER)) const callChainError = validateCallChain(incomingCallChain) if (callChainError) { - 
logger.warn(`[${requestId}] Call chain rejected for workflow ${workflowId}: ${callChainError}`) + reqLogger.warn(`Call chain rejected: ${callChainError}`) return NextResponse.json({ error: callChainError }, { status: 409 }) } const callChain = buildNextCallChain(incomingCallChain, workflowId) @@ -414,12 +419,12 @@ async function handleExecutePost( body = JSON.parse(text) } } catch (error) { - logger.warn(`[${requestId}] Failed to parse request body, using defaults`) + reqLogger.warn('Failed to parse request body, using defaults') } const validation = ExecuteWorkflowSchema.safeParse(body) if (!validation.success) { - logger.warn(`[${requestId}] Invalid request body:`, validation.error.errors) + reqLogger.warn('Invalid request body:', validation.error.errors) return NextResponse.json( { error: 'Invalid request body', @@ -589,9 +594,10 @@ async function handleExecutePost( ) } - logger.info(`[${requestId}] Starting server-side execution`, { - workflowId, - userId, + const executionId = uuidv4() + reqLogger = reqLogger.withMetadata({ userId, executionId }) + + reqLogger.info('Starting server-side execution', { hasInput: !!input, triggerType, authType: auth.authType, @@ -600,8 +606,6 @@ async function handleExecutePost( enableSSE, isAsyncMode, }) - - const executionId = uuidv4() let loggingTriggerType: CoreTriggerType = 'manual' if (CORE_TRIGGER_TYPES.includes(triggerType as CoreTriggerType)) { loggingTriggerType = triggerType as CoreTriggerType @@ -657,10 +661,11 @@ async function handleExecutePost( const workflow = preprocessResult.workflowRecord! 
if (!workflow.workspaceId) { - logger.error(`[${requestId}] Workflow ${workflowId} has no workspaceId`) + reqLogger.error('Workflow has no workspaceId') return NextResponse.json({ error: 'Workflow has no associated workspace' }, { status: 500 }) } const workspaceId = workflow.workspaceId + reqLogger = reqLogger.withMetadata({ workspaceId, userId: actorUserId }) if (auth.apiKeyType === 'workspace' && auth.workspaceId !== workspaceId) { return NextResponse.json( @@ -669,11 +674,7 @@ async function handleExecutePost( ) } - logger.info(`[${requestId}] Preprocessing passed`, { - workflowId, - actorUserId, - workspaceId, - }) + reqLogger.info('Preprocessing passed') if (isAsyncMode) { return handleAsyncExecution({ @@ -744,7 +745,7 @@ async function handleExecutePost( ) } } catch (fileError) { - logger.error(`[${requestId}] Failed to process input file fields:`, fileError) + reqLogger.error('Failed to process input file fields:', fileError) await loggingSession.safeStart({ userId: actorUserId, @@ -772,7 +773,7 @@ async function handleExecutePost( sanitizedWorkflowStateOverride || cachedWorkflowData || undefined if (!enableSSE) { - logger.info(`[${requestId}] Using non-SSE execution (direct JSON response)`) + reqLogger.info('Using non-SSE execution (direct JSON response)') const metadata: ExecutionMetadata = { requestId, executionId, @@ -866,7 +867,7 @@ async function handleExecutePost( const errorMessage = error instanceof Error ? 
error.message : 'Unknown error' - logger.error(`[${requestId}] Queued non-SSE execution failed: ${errorMessage}`) + reqLogger.error(`Queued non-SSE execution failed: ${errorMessage}`) return NextResponse.json( { @@ -908,7 +909,7 @@ async function handleExecutePost( timeoutController.timeoutMs ) { const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) - logger.info(`[${requestId}] Non-SSE execution timed out`, { + reqLogger.info('Non-SSE execution timed out', { timeoutMs: timeoutController.timeoutMs, }) await loggingSession.markAsFailed(timeoutErrorMessage) @@ -962,7 +963,7 @@ async function handleExecutePost( } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : 'Unknown error' - logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`) + reqLogger.error(`Non-SSE execution failed: ${errorMessage}`) const executionResult = hasExecutionResult(error) ? error.executionResult : undefined @@ -985,7 +986,7 @@ async function handleExecutePost( timeoutController.cleanup() if (executionId) { void cleanupExecutionBase64Cache(executionId).catch((error) => { - logger.error(`[${requestId}] Failed to cleanup base64 cache`, { error }) + reqLogger.error('Failed to cleanup base64 cache', { error }) }) } } @@ -1039,9 +1040,9 @@ async function handleExecutePost( }) } - logger.info(`[${requestId}] Using SSE console log streaming (manual execution)`) + reqLogger.info('Using SSE console log streaming (manual execution)') } else { - logger.info(`[${requestId}] Using streaming API response`) + reqLogger.info('Using streaming API response') const resolvedSelectedOutputs = resolveOutputIds( selectedOutputs, @@ -1135,7 +1136,7 @@ async function handleExecutePost( iterationContext?: IterationContext, childWorkflowContext?: ChildWorkflowContext ) => { - logger.info(`[${requestId}] 🔷 onBlockStart called:`, { blockId, blockName, blockType }) + reqLogger.info('onBlockStart called', { blockId, blockName, blockType }) 
sendEvent({ type: 'block:started', timestamp: new Date().toISOString(), @@ -1184,7 +1185,7 @@ async function handleExecutePost( : {} if (hasError) { - logger.info(`[${requestId}] ✗ onBlockComplete (error) called:`, { + reqLogger.info('onBlockComplete (error) called', { blockId, blockName, blockType, @@ -1219,7 +1220,7 @@ async function handleExecutePost( }, }) } else { - logger.info(`[${requestId}] ✓ onBlockComplete called:`, { + reqLogger.info('onBlockComplete called', { blockId, blockName, blockType, @@ -1284,7 +1285,7 @@ async function handleExecutePost( data: { blockId }, }) } catch (error) { - logger.error(`[${requestId}] Error streaming block content:`, error) + reqLogger.error('Error streaming block content:', error) } finally { try { await reader.cancel().catch(() => {}) @@ -1360,9 +1361,7 @@ async function handleExecutePost( if (result.status === 'paused') { if (!result.snapshotSeed) { - logger.error(`[${requestId}] Missing snapshot seed for paused execution`, { - executionId, - }) + reqLogger.error('Missing snapshot seed for paused execution') await loggingSession.markAsFailed('Missing snapshot seed for paused execution') } else { try { @@ -1374,8 +1373,7 @@ async function handleExecutePost( executorUserId: result.metadata?.userId, }) } catch (pauseError) { - logger.error(`[${requestId}] Failed to persist pause result`, { - executionId, + reqLogger.error('Failed to persist pause result', { error: pauseError instanceof Error ? 
pauseError.message : String(pauseError), }) await loggingSession.markAsFailed( @@ -1390,7 +1388,7 @@ async function handleExecutePost( if (result.status === 'cancelled') { if (timeoutController.isTimedOut() && timeoutController.timeoutMs) { const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) - logger.info(`[${requestId}] Workflow execution timed out`, { + reqLogger.info('Workflow execution timed out', { timeoutMs: timeoutController.timeoutMs, }) @@ -1408,7 +1406,7 @@ async function handleExecutePost( }) finalMetaStatus = 'error' } else { - logger.info(`[${requestId}] Workflow execution was cancelled`) + reqLogger.info('Workflow execution was cancelled') sendEvent({ type: 'execution:cancelled', @@ -1452,7 +1450,7 @@ async function handleExecutePost( ? error.message : 'Unknown error' - logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`, { isTimeout }) + reqLogger.error(`SSE execution failed: ${errorMessage}`, { isTimeout }) const executionResult = hasExecutionResult(error) ? error.executionResult : undefined @@ -1475,7 +1473,7 @@ async function handleExecutePost( try { await eventWriter.close() } catch (closeError) { - logger.warn(`[${requestId}] Failed to close event writer`, { + reqLogger.warn('Failed to close event writer', { error: closeError instanceof Error ? 
closeError.message : String(closeError), }) } @@ -1496,7 +1494,7 @@ async function handleExecutePost( }, cancel() { isStreamClosed = true - logger.info(`[${requestId}] Client disconnected from SSE stream`) + reqLogger.info('Client disconnected from SSE stream') }, }) @@ -1518,7 +1516,7 @@ async function handleExecutePost( ) } - logger.error(`[${requestId}] Failed to start workflow execution:`, error) + reqLogger.error('Failed to start workflow execution:', error) return NextResponse.json( { error: error.message || 'Failed to start workflow execution' }, { status: 500 } diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index 5680ee4e3cf..5044eab5639 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -1,4 +1,4 @@ -import { createLogger } from '@sim/logger' +import { createLogger, type Logger } from '@sim/logger' import { redactApiKeys } from '@/lib/core/security/redaction' import { getBaseUrl } from '@/lib/core/utils/urls' import { @@ -49,12 +49,22 @@ import { SYSTEM_SUBBLOCK_IDS } from '@/triggers/constants' const logger = createLogger('BlockExecutor') export class BlockExecutor { + private execLogger: Logger + constructor( private blockHandlers: BlockHandler[], private resolver: VariableResolver, private contextExtensions: ContextExtensions, private state: BlockStateWriter - ) {} + ) { + this.execLogger = logger.withMetadata({ + workflowId: this.contextExtensions.metadata?.workflowId, + workspaceId: this.contextExtensions.workspaceId, + executionId: this.contextExtensions.executionId, + userId: this.contextExtensions.userId, + requestId: this.contextExtensions.metadata?.requestId, + }) + } async execute( ctx: ExecutionContext, @@ -273,7 +283,7 @@ export class BlockExecutor { } } - logger.error( + this.execLogger.error( phase === 'input_resolution' ? 
'Failed to resolve block inputs' : 'Block execution failed', { blockId: node.id, @@ -306,7 +316,7 @@ export class BlockExecutor { if (blockLog) { blockLog.errorHandled = true } - logger.info('Block has error port - returning error output instead of throwing', { + this.execLogger.info('Block has error port - returning error output instead of throwing', { blockId: node.id, error: errorMessage, }) @@ -358,7 +368,7 @@ export class BlockExecutor { blockName = `${blockName} (iteration ${loopScope.iteration})` iterationIndex = loopScope.iteration } else { - logger.warn('Loop scope not found for block', { blockId, loopId }) + this.execLogger.warn('Loop scope not found for block', { blockId, loopId }) } } } @@ -462,7 +472,7 @@ export class BlockExecutor { ctx.childWorkflowContext ) } catch (error) { - logger.warn('Block start callback failed', { + this.execLogger.warn('Block start callback failed', { blockId, blockType, error: error instanceof Error ? error.message : String(error), @@ -508,7 +518,7 @@ export class BlockExecutor { ctx.childWorkflowContext ) } catch (error) { - logger.warn('Block completion callback failed', { + this.execLogger.warn('Block completion callback failed', { blockId, blockType, error: error instanceof Error ? 
error.message : String(error), @@ -633,7 +643,7 @@ export class BlockExecutor { try { await ctx.onStream?.(clientStreamingExec) } catch (error) { - logger.error('Error in onStream callback', { blockId, error }) + this.execLogger.error('Error in onStream callback', { blockId, error }) // Cancel the client stream to release the tee'd buffer await processedClientStream.cancel().catch(() => {}) } @@ -663,7 +673,7 @@ export class BlockExecutor { stream: processedStream, }) } catch (error) { - logger.error('Error in onStream callback', { blockId, error }) + this.execLogger.error('Error in onStream callback', { blockId, error }) await processedStream.cancel().catch(() => {}) } } @@ -687,7 +697,7 @@ export class BlockExecutor { const tail = decoder.decode() if (tail) chunks.push(tail) } catch (error) { - logger.error('Error reading executor stream for block', { blockId, error }) + this.execLogger.error('Error reading executor stream for block', { blockId, error }) } finally { try { await reader.cancel().catch(() => {}) @@ -718,7 +728,10 @@ export class BlockExecutor { } return } catch (error) { - logger.warn('Failed to parse streamed content for response format', { blockId, error }) + this.execLogger.warn('Failed to parse streamed content for response format', { + blockId, + error, + }) } } diff --git a/apps/sim/executor/execution/engine.ts b/apps/sim/executor/execution/engine.ts index 2f479125282..a420c5df7dd 100644 --- a/apps/sim/executor/execution/engine.ts +++ b/apps/sim/executor/execution/engine.ts @@ -1,4 +1,4 @@ -import { createLogger } from '@sim/logger' +import { createLogger, type Logger } from '@sim/logger' import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation' import { BlockType } from '@/executor/constants' import type { DAG } from '@/executor/dag/builder' @@ -34,6 +34,7 @@ export class ExecutionEngine { private readonly CANCELLATION_CHECK_INTERVAL_MS = 500 private abortPromise: Promise | null = null private 
abortResolve: (() => void) | null = null + private execLogger: Logger constructor( private context: ExecutionContext, @@ -43,6 +44,13 @@ export class ExecutionEngine { ) { this.allowResumeTriggers = this.context.metadata.resumeFromSnapshot === true this.useRedisCancellation = isRedisCancellationEnabled() && !!this.context.executionId + this.execLogger = logger.withMetadata({ + workflowId: this.context.workflowId, + workspaceId: this.context.workspaceId, + executionId: this.context.executionId, + userId: this.context.userId, + requestId: this.context.metadata.requestId, + }) this.initializeAbortHandler() } @@ -88,7 +96,9 @@ export class ExecutionEngine { const cancelled = await isExecutionCancelled(this.context.executionId!) if (cancelled) { this.cancelledFlag = true - logger.info('Execution cancelled via Redis', { executionId: this.context.executionId }) + this.execLogger.info('Execution cancelled via Redis', { + executionId: this.context.executionId, + }) } return cancelled } @@ -169,7 +179,7 @@ export class ExecutionEngine { this.finalizeIncompleteLogs() const errorMessage = normalizeError(error) - logger.error('Execution failed', { error: errorMessage }) + this.execLogger.error('Execution failed', { error: errorMessage }) const executionResult: ExecutionResult = { success: false, @@ -270,7 +280,7 @@ export class ExecutionEngine { private initializeQueue(triggerBlockId?: string): void { if (this.context.runFromBlockContext) { const { startBlockId } = this.context.runFromBlockContext - logger.info('Initializing queue for run-from-block mode', { + this.execLogger.info('Initializing queue for run-from-block mode', { startBlockId, dirtySetSize: this.context.runFromBlockContext.dirtySet.size, }) @@ -282,7 +292,7 @@ export class ExecutionEngine { const remainingEdges = (this.context.metadata as any).remainingEdges if (remainingEdges && Array.isArray(remainingEdges) && remainingEdges.length > 0) { - logger.info('Removing edges from resumed pause blocks', { + 
this.execLogger.info('Removing edges from resumed pause blocks', { edgeCount: remainingEdges.length, edges: remainingEdges, }) @@ -294,13 +304,13 @@ export class ExecutionEngine { targetNode.incomingEdges.delete(edge.source) if (this.edgeManager.isNodeReady(targetNode)) { - logger.info('Node became ready after edge removal', { nodeId: targetNode.id }) + this.execLogger.info('Node became ready after edge removal', { nodeId: targetNode.id }) this.addToQueue(targetNode.id) } } } - logger.info('Edge removal complete, queued ready nodes', { + this.execLogger.info('Edge removal complete, queued ready nodes', { queueLength: this.readyQueue.length, queuedNodes: this.readyQueue, }) @@ -309,7 +319,7 @@ export class ExecutionEngine { } if (pendingBlocks && pendingBlocks.length > 0) { - logger.info('Initializing queue from pending blocks (resume mode)', { + this.execLogger.info('Initializing queue from pending blocks (resume mode)', { pendingBlocks, allowResumeTriggers: this.allowResumeTriggers, dagNodeCount: this.dag.nodes.size, @@ -319,7 +329,7 @@ export class ExecutionEngine { this.addToQueue(nodeId) } - logger.info('Pending blocks queued', { + this.execLogger.info('Pending blocks queued', { queueLength: this.readyQueue.length, queuedNodes: this.readyQueue, }) @@ -341,7 +351,7 @@ export class ExecutionEngine { if (startNode) { this.addToQueue(startNode.id) } else { - logger.warn('No start node found in DAG') + this.execLogger.warn('No start node found in DAG') } } @@ -373,7 +383,7 @@ export class ExecutionEngine { } } catch (error) { const errorMessage = normalizeError(error) - logger.error('Node execution failed', { nodeId, error: errorMessage }) + this.execLogger.error('Node execution failed', { nodeId, error: errorMessage }) throw error } } @@ -385,7 +395,7 @@ export class ExecutionEngine { ): Promise { const node = this.dag.nodes.get(nodeId) if (!node) { - logger.error('Node not found during completion', { nodeId }) + this.execLogger.error('Node not found during 
completion', { nodeId }) return } @@ -409,7 +419,7 @@ export class ExecutionEngine { // shouldContinue: true means more iterations, shouldExit: true means loop is done const shouldContinueLoop = output.shouldContinue === true if (!shouldContinueLoop) { - logger.info('Stopping execution after target block', { nodeId }) + this.execLogger.info('Stopping execution after target block', { nodeId }) this.stoppedEarlyFlag = true return } @@ -417,7 +427,7 @@ export class ExecutionEngine { const readyNodes = this.edgeManager.processOutgoingEdges(node, output, false) - logger.info('Processing outgoing edges', { + this.execLogger.info('Processing outgoing edges', { nodeId, outgoingEdgesCount: node.outgoingEdges.size, outgoingEdges: Array.from(node.outgoingEdges.entries()).map(([id, e]) => ({ @@ -435,7 +445,7 @@ export class ExecutionEngine { if (this.context.pendingDynamicNodes && this.context.pendingDynamicNodes.length > 0) { const dynamicNodes = this.context.pendingDynamicNodes this.context.pendingDynamicNodes = [] - logger.info('Adding dynamically expanded parallel nodes', { dynamicNodes }) + this.execLogger.info('Adding dynamically expanded parallel nodes', { dynamicNodes }) this.addMultipleToQueue(dynamicNodes) } } @@ -482,7 +492,7 @@ export class ExecutionEngine { } return parsedSnapshot.state } catch (error) { - logger.warn('Failed to serialize execution state', { + this.execLogger.warn('Failed to serialize execution state', { error: error instanceof Error ? 
error.message : String(error), }) return undefined diff --git a/apps/sim/executor/execution/executor.ts b/apps/sim/executor/execution/executor.ts index 67d3b4c24b8..8e3a8c8c8c9 100644 --- a/apps/sim/executor/execution/executor.ts +++ b/apps/sim/executor/execution/executor.ts @@ -1,4 +1,4 @@ -import { createLogger } from '@sim/logger' +import { createLogger, type Logger } from '@sim/logger' import { StartBlockPath } from '@/lib/workflows/triggers/triggers' import type { DAG } from '@/executor/dag/builder' import { DAGBuilder } from '@/executor/dag/builder' @@ -52,6 +52,7 @@ export class DAGExecutor { private workflowVariables: Record private contextExtensions: ContextExtensions private dagBuilder: DAGBuilder + private execLogger: Logger constructor(options: DAGExecutorOptions) { this.workflow = options.workflow @@ -60,6 +61,13 @@ export class DAGExecutor { this.workflowVariables = options.workflowVariables ?? {} this.contextExtensions = options.contextExtensions ?? {} this.dagBuilder = new DAGBuilder() + this.execLogger = logger.withMetadata({ + workflowId: this.contextExtensions.metadata?.workflowId, + workspaceId: this.contextExtensions.workspaceId, + executionId: this.contextExtensions.executionId, + userId: this.contextExtensions.userId, + requestId: this.contextExtensions.metadata?.requestId, + }) } async execute(workflowId: string, triggerBlockId?: string): Promise { @@ -79,7 +87,9 @@ export class DAGExecutor { _pendingBlocks: string[], context: ExecutionContext ): Promise { - logger.warn('Debug mode (continueExecution) is not yet implemented in the refactored executor') + this.execLogger.warn( + 'Debug mode (continueExecution) is not yet implemented in the refactored executor' + ) return { success: false, output: {}, @@ -163,7 +173,7 @@ export class DAGExecutor { parallelExecutions: filteredParallelExecutions, } - logger.info('Executing from block', { + this.execLogger.info('Executing from block', { workflowId, startBlockId, effectiveStartBlockId, @@ -247,7 
+257,7 @@ export class DAGExecutor { if (overrides?.runFromBlockContext) { const { dirtySet } = overrides.runFromBlockContext executedBlocks = new Set([...executedBlocks].filter((id) => !dirtySet.has(id))) - logger.info('Cleared executed status for dirty blocks', { + this.execLogger.info('Cleared executed status for dirty blocks', { dirtySetSize: dirtySet.size, remainingExecutedBlocks: executedBlocks.size, }) @@ -332,7 +342,7 @@ export class DAGExecutor { if (this.contextExtensions.resumeFromSnapshot) { context.metadata.resumeFromSnapshot = true - logger.info('Resume from snapshot enabled', { + this.execLogger.info('Resume from snapshot enabled', { resumePendingQueue: this.contextExtensions.resumePendingQueue, remainingEdges: this.contextExtensions.remainingEdges, triggerBlockId, @@ -341,14 +351,14 @@ export class DAGExecutor { if (this.contextExtensions.remainingEdges) { ;(context.metadata as any).remainingEdges = this.contextExtensions.remainingEdges - logger.info('Set remaining edges for resume', { + this.execLogger.info('Set remaining edges for resume', { edgeCount: this.contextExtensions.remainingEdges.length, }) } if (this.contextExtensions.resumePendingQueue?.length) { context.metadata.pendingBlocks = [...this.contextExtensions.resumePendingQueue] - logger.info('Set pending blocks from resume queue', { + this.execLogger.info('Set pending blocks from resume queue', { pendingBlocks: context.metadata.pendingBlocks, skipStarterBlockInit: true, }) @@ -409,7 +419,7 @@ export class DAGExecutor { if (triggerBlockId) { const triggerBlock = this.workflow.blocks.find((b) => b.id === triggerBlockId) if (!triggerBlock) { - logger.error('Specified trigger block not found in workflow', { + this.execLogger.error('Specified trigger block not found in workflow', { triggerBlockId, }) throw new Error(`Trigger block not found: ${triggerBlockId}`) @@ -431,7 +441,7 @@ export class DAGExecutor { }) if (!startResolution?.block) { - logger.warn('No start block found in workflow') + 
this.execLogger.warn('No start block found in workflow') return } } diff --git a/apps/sim/lib/copilot/chat-payload.test.ts b/apps/sim/lib/copilot/chat-payload.test.ts index 1447e32882f..0c7b187e7fd 100644 --- a/apps/sim/lib/copilot/chat-payload.test.ts +++ b/apps/sim/lib/copilot/chat-payload.test.ts @@ -3,13 +3,15 @@ */ import { beforeEach, describe, expect, it, vi } from 'vitest' -vi.mock('@sim/logger', () => ({ - createLogger: vi.fn(() => ({ +vi.mock('@sim/logger', () => { + const createMockLogger = (): Record => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn(), - })), -})) + withMetadata: vi.fn(() => createMockLogger()), + }) + return { createLogger: vi.fn(() => createMockLogger()) } +}) vi.mock('@/lib/billing/core/subscription', () => ({ getUserSubscriptionState: vi.fn(), diff --git a/apps/sim/lib/copilot/chat-payload.ts b/apps/sim/lib/copilot/chat-payload.ts index 81731cf9dff..69b1d342f17 100644 --- a/apps/sim/lib/copilot/chat-payload.ts +++ b/apps/sim/lib/copilot/chat-payload.ts @@ -1,6 +1,5 @@ import { createLogger } from '@sim/logger' import { getUserSubscriptionState } from '@/lib/billing/core/subscription' -import { appendCopilotLogContext } from '@/lib/copilot/logging' import { getCopilotToolDescription } from '@/lib/copilot/tool-descriptions' import { isHosted } from '@/lib/core/config/feature-flags' import { createMcpToolId } from '@/lib/mcp/utils' @@ -50,6 +49,7 @@ export async function buildIntegrationToolSchemas( userId: string, messageId?: string ): Promise { + const reqLogger = logger.withMetadata({ messageId }) const integrationTools: ToolSchema[] = [] try { const { createUserToolSchema } = await import('@/tools/params') @@ -60,15 +60,10 @@ export async function buildIntegrationToolSchemas( const subscriptionState = await getUserSubscriptionState(userId) shouldAppendEmailTagline = subscriptionState.isFree } catch (error) { - logger.warn( - appendCopilotLogContext('Failed to load subscription state for copilot tool descriptions', { - messageId, - 
}), - { - userId, - error: error instanceof Error ? error.message : String(error), - } - ) + reqLogger.warn('Failed to load subscription state for copilot tool descriptions', { + userId, + error: error instanceof Error ? error.message : String(error), + }) } for (const [toolId, toolConfig] of Object.entries(latestTools)) { @@ -92,17 +87,14 @@ export async function buildIntegrationToolSchemas( }), }) } catch (toolError) { - logger.warn( - appendCopilotLogContext('Failed to build schema for tool, skipping', { messageId }), - { - toolId, - error: toolError instanceof Error ? toolError.message : String(toolError), - } - ) + reqLogger.warn('Failed to build schema for tool, skipping', { + toolId, + error: toolError instanceof Error ? toolError.message : String(toolError), + }) } } } catch (error) { - logger.warn(appendCopilotLogContext('Failed to build tool schemas', { messageId }), { + reqLogger.warn('Failed to build tool schemas', { error: error instanceof Error ? error.message : String(error), }) } @@ -182,6 +174,8 @@ export async function buildCopilotRequestPayload( let integrationTools: ToolSchema[] = [] + const payloadLogger = logger.withMetadata({ messageId: userMessageId }) + if (effectiveMode === 'build') { integrationTools = await buildIntegrationToolSchemas(userId, userMessageId) @@ -201,23 +195,13 @@ export async function buildCopilotRequestPayload( }) } if (mcpTools.length > 0) { - logger.error( - appendCopilotLogContext('Added MCP tools to copilot payload', { - messageId: userMessageId, - }), - { count: mcpTools.length } - ) + payloadLogger.info('Added MCP tools to copilot payload', { count: mcpTools.length }) } } } catch (error) { - logger.warn( - appendCopilotLogContext('Failed to discover MCP tools for copilot', { - messageId: userMessageId, - }), - { - error: error instanceof Error ? error.message : String(error), - } - ) + payloadLogger.warn('Failed to discover MCP tools for copilot', { + error: error instanceof Error ? 
error.message : String(error), + }) } } } diff --git a/apps/sim/lib/copilot/chat-streaming.ts b/apps/sim/lib/copilot/chat-streaming.ts index 76ae305a9f8..5779d20f65f 100644 --- a/apps/sim/lib/copilot/chat-streaming.ts +++ b/apps/sim/lib/copilot/chat-streaming.ts @@ -4,7 +4,6 @@ import { createLogger } from '@sim/logger' import { eq } from 'drizzle-orm' import { createRunSegment, updateRunStatus } from '@/lib/copilot/async-runs/repository' import { SIM_AGENT_API_URL } from '@/lib/copilot/constants' -import { appendCopilotLogContext } from '@/lib/copilot/logging' import type { OrchestrateStreamOptions } from '@/lib/copilot/orchestrator' import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator' import { @@ -229,20 +228,17 @@ export async function requestChatTitle(params: { const payload = await response.json().catch(() => ({})) if (!response.ok) { - logger.warn( - appendCopilotLogContext('Failed to generate chat title via copilot backend', { messageId }), - { - status: response.status, - error: payload, - } - ) + logger.withMetadata({ messageId }).warn('Failed to generate chat title via copilot backend', { + status: response.status, + error: payload, + }) return null } const title = typeof payload?.title === 'string' ? payload.title.trim() : '' return title || null } catch (error) { - logger.error(appendCopilotLogContext('Error generating chat title', { messageId }), error) + logger.withMetadata({ messageId }).error('Error generating chat title', error) return null } } @@ -285,6 +281,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS } = params const messageId = typeof requestPayload.messageId === 'string' ? 
requestPayload.messageId : streamId + const reqLogger = logger.withMetadata({ requestId, messageId }) let eventWriter: ReturnType | null = null let clientDisconnected = false @@ -306,17 +303,11 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS if (!clientDisconnectedController.signal.aborted) { clientDisconnectedController.abort() } - logger.info( - appendCopilotLogContext('Client disconnected from live SSE stream', { - requestId, - messageId, - }), - { - streamId, - runId, - reason, - } - ) + reqLogger.info('Client disconnected from live SSE stream', { + streamId, + runId, + reason, + }) } await resetStreamBuffer(streamId) @@ -334,15 +325,9 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS provider: (requestPayload.provider as string | undefined) || null, requestContext: { requestId }, }).catch((error) => { - logger.warn( - appendCopilotLogContext('Failed to create copilot run segment', { - requestId, - messageId, - }), - { - error: error instanceof Error ? error.message : String(error), - } - ) + reqLogger.warn('Failed to create copilot run segment', { + error: error instanceof Error ? error.message : String(error), + }) }) } eventWriter = createStreamEventWriter(streamId) @@ -362,16 +347,10 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS await redis.del(getStreamAbortKey(streamId)) } } catch (error) { - logger.warn( - appendCopilotLogContext('Failed to poll distributed stream abort', { - requestId, - messageId, - }), - { - streamId, - error: error instanceof Error ? error.message : String(error), - } - ) + reqLogger.warn('Failed to poll distributed stream abort', { + streamId, + error: error instanceof Error ? 
error.message : String(error), + }) } })() }, STREAM_ABORT_POLL_MS) @@ -388,14 +367,11 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS await eventWriter.flush() } } catch (error) { - logger.error( - appendCopilotLogContext('Failed to persist stream event', { requestId, messageId }), - { - eventType: event.type, - eventId, - error: error instanceof Error ? error.message : String(error), - } - ) + reqLogger.error('Failed to persist stream event', { + eventType: event.type, + eventId, + error: error instanceof Error ? error.message : String(error), + }) // Keep the live SSE stream going even if durable buffering hiccups. } @@ -414,7 +390,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS try { await pushEvent(event) } catch (error) { - logger.error(appendCopilotLogContext('Failed to push event', { requestId, messageId }), { + reqLogger.error('Failed to push event', { eventType: event.type, error: error instanceof Error ? 
error.message : String(error), }) @@ -437,10 +413,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS } }) .catch((error) => { - logger.error( - appendCopilotLogContext('Title generation failed', { requestId, messageId }), - error - ) + reqLogger.error('Title generation failed', error) }) } @@ -467,9 +440,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS }) if (abortController.signal.aborted) { - logger.error( - appendCopilotLogContext('Stream aborted by explicit stop', { requestId, messageId }) - ) + reqLogger.info('Stream aborted by explicit stop') await eventWriter.close().catch(() => {}) await setStreamMeta(streamId, { status: 'cancelled', userId, executionId, runId }) await updateRunStatus(runId, 'cancelled', { completedAt: new Date() }).catch(() => {}) @@ -483,23 +454,14 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS 'An unexpected error occurred while processing the response.' 
if (clientDisconnected) { - logger.error( - appendCopilotLogContext('Stream failed after client disconnect', { - requestId, - messageId, - }), - { - error: errorMessage, - } - ) + reqLogger.info('Stream failed after client disconnect', { + error: errorMessage, + }) } - logger.error( - appendCopilotLogContext('Orchestration returned failure', { requestId, messageId }), - { - error: errorMessage, - } - ) + reqLogger.error('Orchestration returned failure', { + error: errorMessage, + }) await pushEventBestEffort({ type: 'error', error: errorMessage, @@ -526,42 +488,25 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS await setStreamMeta(streamId, { status: 'complete', userId, executionId, runId }) await updateRunStatus(runId, 'complete', { completedAt: new Date() }).catch(() => {}) if (clientDisconnected) { - logger.info( - appendCopilotLogContext('Orchestration completed after client disconnect', { - requestId, - messageId, - }), - { - streamId, - runId, - } - ) + reqLogger.info('Orchestration completed after client disconnect', { + streamId, + runId, + }) } } catch (error) { if (abortController.signal.aborted) { - logger.error( - appendCopilotLogContext('Stream aborted by explicit stop', { requestId, messageId }) - ) + reqLogger.info('Stream aborted by explicit stop') await eventWriter.close().catch(() => {}) await setStreamMeta(streamId, { status: 'cancelled', userId, executionId, runId }) await updateRunStatus(runId, 'cancelled', { completedAt: new Date() }).catch(() => {}) return } if (clientDisconnected) { - logger.error( - appendCopilotLogContext('Stream errored after client disconnect', { - requestId, - messageId, - }), - { - error: error instanceof Error ? error.message : 'Stream error', - } - ) + reqLogger.info('Stream errored after client disconnect', { + error: error instanceof Error ? 
error.message : 'Stream error', + }) } - logger.error( - appendCopilotLogContext('Orchestration error', { requestId, messageId }), - error - ) + reqLogger.error('Orchestration error', error) const errorMessage = error instanceof Error ? error.message : 'Stream error' await pushEventBestEffort({ type: 'error', @@ -583,7 +528,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS error: errorMessage, }).catch(() => {}) } finally { - logger.info(appendCopilotLogContext('Closing live SSE stream', { requestId, messageId }), { + reqLogger.info('Closing live SSE stream', { streamId, runId, clientDisconnected, @@ -611,16 +556,10 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS } }, cancel() { - logger.info( - appendCopilotLogContext('ReadableStream cancel received from client', { - requestId, - messageId, - }), - { - streamId, - runId, - } - ) + reqLogger.info('ReadableStream cancel received from client', { + streamId, + runId, + }) if (!clientDisconnected) { clientDisconnected = true if (!clientDisconnectedController.signal.aborted) { diff --git a/apps/sim/lib/copilot/orchestrator/index.ts b/apps/sim/lib/copilot/orchestrator/index.ts index d07553ca645..3b03aa8c982 100644 --- a/apps/sim/lib/copilot/orchestrator/index.ts +++ b/apps/sim/lib/copilot/orchestrator/index.ts @@ -14,7 +14,6 @@ import { updateRunStatus, } from '@/lib/copilot/async-runs/repository' import { SIM_AGENT_API_URL, SIM_AGENT_VERSION } from '@/lib/copilot/constants' -import { appendCopilotLogContext } from '@/lib/copilot/logging' import { isToolAvailableOnSimSide, prepareExecutionContext, @@ -128,15 +127,11 @@ export async function orchestrateCopilotStream( messageId, }) const continuationWorkerId = `sim-resume:${crypto.randomUUID()}` - const withLogContext = (message: string) => - appendCopilotLogContext(message, { - requestId: context.requestId, - messageId, - }) + const reqLogger = logger.withMetadata({ requestId: context.requestId, 
messageId }) let claimedToolCallIds: string[] = [] let claimedByWorkerId: string | null = null - logger.error(withLogContext('Starting copilot orchestration'), { + reqLogger.info('Starting copilot orchestration', { goRoute, workflowId, workspaceId, @@ -155,7 +150,7 @@ export async function orchestrateCopilotStream( for (;;) { context.streamComplete = false - logger.error(withLogContext('Starting orchestration loop iteration'), { + reqLogger.info('Starting orchestration loop iteration', { route, hasPendingAsyncContinuation: Boolean(context.awaitingAsyncContinuation), claimedToolCallCount: claimedToolCallIds.length, @@ -168,7 +163,7 @@ export async function orchestrateCopilotStream( const d = (event.data ?? {}) as Record const response = (d.response ?? {}) as Record if (response.async_pause) { - logger.error(withLogContext('Detected async pause from copilot backend'), { + reqLogger.info('Detected async pause from copilot backend', { route, checkpointId: typeof (response.async_pause as Record)?.checkpointId === @@ -201,7 +196,7 @@ export async function orchestrateCopilotStream( loopOptions ) - logger.error(withLogContext('Completed orchestration loop iteration'), { + reqLogger.info('Completed orchestration loop iteration', { route, streamComplete: context.streamComplete, wasAborted: context.wasAborted, @@ -210,7 +205,7 @@ export async function orchestrateCopilotStream( }) if (claimedToolCallIds.length > 0) { - logger.error(withLogContext('Marking async tool calls as delivered'), { + reqLogger.info('Marking async tool calls as delivered', { toolCallIds: claimedToolCallIds, }) await Promise.all( @@ -223,7 +218,7 @@ export async function orchestrateCopilotStream( } if (options.abortSignal?.aborted || context.wasAborted) { - logger.error(withLogContext('Stopping orchestration because request was aborted'), { + reqLogger.info('Stopping orchestration because request was aborted', { pendingToolCallCount: Array.from(context.toolCalls.values()).filter( (toolCall) => 
toolCall.status === 'pending' || toolCall.status === 'executing' ).length, @@ -241,13 +236,13 @@ export async function orchestrateCopilotStream( const continuation = context.awaitingAsyncContinuation if (!continuation) { - logger.error(withLogContext('No async continuation pending; finishing orchestration')) + reqLogger.info('No async continuation pending; finishing orchestration') break } let resumeReady = false let resumeRetries = 0 - logger.error(withLogContext('Processing async continuation'), { + reqLogger.info('Processing async continuation', { checkpointId: continuation.checkpointId, runId: continuation.runId, pendingToolCallIds: continuation.pendingToolCallIds, @@ -267,26 +262,19 @@ export async function orchestrateCopilotStream( if (localPendingPromise) { localPendingPromises.push(localPendingPromise) - logger.info( - withLogContext( - 'Waiting for local async tool completion before retrying resume claim' - ), - { - toolCallId, - runId: continuation.runId, - workerId: resumeWorkerId, - } - ) + reqLogger.info('Waiting for local async tool completion before retrying resume claim', { + toolCallId, + runId: continuation.runId, + workerId: resumeWorkerId, + }) continue } if (durableRow && isTerminalAsyncStatus(durableRow.status)) { if (durableRow.claimedBy && durableRow.claimedBy !== resumeWorkerId) { missingToolCallIds.push(toolCallId) - logger.warn( - withLogContext( - 'Async tool continuation is waiting on a claim held by another worker' - ), + reqLogger.warn( + 'Async tool continuation is waiting on a claim held by another worker', { toolCallId, runId: continuation.runId, @@ -312,15 +300,12 @@ export async function orchestrateCopilotStream( isTerminalToolCallStatus(toolState.status) && !isToolAvailableOnSimSide(toolState.name) ) { - logger.info( - withLogContext('Including Go-handled tool in resume payload (no Sim-side row)'), - { - toolCallId, - toolName: toolState.name, - status: toolState.status, - runId: continuation.runId, - } - ) + 
reqLogger.info('Including Go-handled tool in resume payload (no Sim-side row)', { + toolCallId, + toolName: toolState.name, + status: toolState.status, + runId: continuation.runId, + }) readyTools.push({ toolCallId, toolState, @@ -330,7 +315,7 @@ export async function orchestrateCopilotStream( continue } - logger.warn(withLogContext('Skipping already-claimed or missing async tool resume'), { + reqLogger.warn('Skipping already-claimed or missing async tool resume', { toolCallId, runId: continuation.runId, durableStatus: durableRow?.status, @@ -340,13 +325,10 @@ export async function orchestrateCopilotStream( } if (localPendingPromises.length > 0) { - logger.info( - withLogContext('Waiting for local pending async tools before resuming continuation'), - { - checkpointId: continuation.checkpointId, - pendingPromiseCount: localPendingPromises.length, - } - ) + reqLogger.info('Waiting for local pending async tools before resuming continuation', { + checkpointId: continuation.checkpointId, + pendingPromiseCount: localPendingPromises.length, + }) await Promise.allSettled(localPendingPromises) continue } @@ -354,23 +336,18 @@ export async function orchestrateCopilotStream( if (missingToolCallIds.length > 0) { if (resumeRetries < 3) { resumeRetries++ - logger.info( - withLogContext('Retrying async resume after some tool calls were not yet ready'), - { - checkpointId: continuation.checkpointId, - runId: continuation.runId, - workerId: resumeWorkerId, - retry: resumeRetries, - missingToolCallIds, - } - ) + reqLogger.info('Retrying async resume after some tool calls were not yet ready', { + checkpointId: continuation.checkpointId, + runId: continuation.runId, + workerId: resumeWorkerId, + retry: resumeRetries, + missingToolCallIds, + }) await new Promise((resolve) => setTimeout(resolve, 250 * resumeRetries)) continue } - logger.error( - withLogContext( - 'Async continuation failed because pending tool calls never became ready' - ), + reqLogger.error( + 'Async continuation 
failed because pending tool calls never became ready', { checkpointId: continuation.checkpointId, runId: continuation.runId, @@ -385,26 +362,20 @@ export async function orchestrateCopilotStream( if (readyTools.length === 0) { if (resumeRetries < 3 && continuation.pendingToolCallIds.length > 0) { resumeRetries++ - logger.info( - withLogContext('Retrying async resume because no tool calls were ready yet'), - { - checkpointId: continuation.checkpointId, - runId: continuation.runId, - workerId: resumeWorkerId, - retry: resumeRetries, - } - ) + reqLogger.info('Retrying async resume because no tool calls were ready yet', { + checkpointId: continuation.checkpointId, + runId: continuation.runId, + workerId: resumeWorkerId, + retry: resumeRetries, + }) await new Promise((resolve) => setTimeout(resolve, 250 * resumeRetries)) continue } - logger.error( - withLogContext('Async continuation failed because no tool calls were ready'), - { - checkpointId: continuation.checkpointId, - runId: continuation.runId, - requestedToolCallIds: continuation.pendingToolCallIds, - } - ) + reqLogger.error('Async continuation failed because no tool calls were ready', { + checkpointId: continuation.checkpointId, + runId: continuation.runId, + requestedToolCallIds: continuation.pendingToolCallIds, + }) throw new Error('Failed to resume async tool continuation: no tool calls were ready') } @@ -425,16 +396,13 @@ export async function orchestrateCopilotStream( if (claimFailures.length > 0) { if (newlyClaimedToolCallIds.length > 0) { - logger.info( - withLogContext('Releasing async tool claims after claim contention during resume'), - { - checkpointId: continuation.checkpointId, - runId: continuation.runId, - workerId: resumeWorkerId, - newlyClaimedToolCallIds, - claimFailures, - } - ) + reqLogger.info('Releasing async tool claims after claim contention during resume', { + checkpointId: continuation.checkpointId, + runId: continuation.runId, + workerId: resumeWorkerId, + newlyClaimedToolCallIds, + 
claimFailures, + }) await Promise.all( newlyClaimedToolCallIds.map((toolCallId) => releaseCompletedAsyncToolClaim(toolCallId, resumeWorkerId).catch(() => null) @@ -443,7 +411,7 @@ export async function orchestrateCopilotStream( } if (resumeRetries < 3) { resumeRetries++ - logger.error(withLogContext('Retrying async resume after claim contention'), { + reqLogger.info('Retrying async resume after claim contention', { checkpointId: continuation.checkpointId, runId: continuation.runId, workerId: resumeWorkerId, @@ -453,14 +421,11 @@ export async function orchestrateCopilotStream( await new Promise((resolve) => setTimeout(resolve, 250 * resumeRetries)) continue } - logger.error( - withLogContext('Async continuation failed because tool claims could not be acquired'), - { - checkpointId: continuation.checkpointId, - runId: continuation.runId, - claimFailures, - } - ) + reqLogger.error('Async continuation failed because tool claims could not be acquired', { + checkpointId: continuation.checkpointId, + runId: continuation.runId, + claimFailures, + }) throw new Error( `Failed to resume async tool continuation: unable to claim tool calls (${claimFailures.join(', ')})` ) @@ -474,7 +439,7 @@ export async function orchestrateCopilotStream( ] claimedByWorkerId = claimedToolCallIds.length > 0 ? 
resumeWorkerId : null - logger.error(withLogContext('Resuming async tool continuation'), { + reqLogger.info('Resuming async tool continuation', { checkpointId: continuation.checkpointId, runId: continuation.runId, workerId: resumeWorkerId, @@ -514,10 +479,8 @@ export async function orchestrateCopilotStream( !isTerminalAsyncStatus(durableStatus) && !isDeliveredAsyncStatus(durableStatus) ) { - logger.warn( - withLogContext( - 'Async tool row was claimed for resume without terminal durable state' - ), + reqLogger.warn( + 'Async tool row was claimed for resume without terminal durable state', { toolCallId: tool.toolCallId, status: durableStatus, @@ -540,7 +503,7 @@ export async function orchestrateCopilotStream( checkpointId: continuation.checkpointId, results, } - logger.error(withLogContext('Prepared async continuation payload for resume endpoint'), { + reqLogger.info('Prepared async continuation payload for resume endpoint', { route, checkpointId: continuation.checkpointId, resultCount: results.length, @@ -550,7 +513,7 @@ export async function orchestrateCopilotStream( } if (!resumeReady) { - logger.warn(withLogContext('Async continuation loop exited without resume payload'), { + reqLogger.warn('Async continuation loop exited without resume payload', { checkpointId: continuation.checkpointId, runId: continuation.runId, }) @@ -569,7 +532,7 @@ export async function orchestrateCopilotStream( usage: context.usage, cost: context.cost, } - logger.error(withLogContext('Completing copilot orchestration'), { + reqLogger.info('Completing copilot orchestration', { success: result.success, chatId: result.chatId, hasRequestId: Boolean(result.requestId), @@ -581,7 +544,7 @@ export async function orchestrateCopilotStream( } catch (error) { const err = error instanceof Error ? 
error : new Error('Copilot orchestration failed') if (claimedToolCallIds.length > 0 && claimedByWorkerId) { - logger.warn(withLogContext('Releasing async tool claims after delivery failure'), { + reqLogger.warn('Releasing async tool claims after delivery failure', { toolCallIds: claimedToolCallIds, workerId: claimedByWorkerId, }) @@ -591,7 +554,7 @@ export async function orchestrateCopilotStream( ) ) } - logger.error(withLogContext('Copilot orchestration failed'), { + reqLogger.error('Copilot orchestration failed', { error: err.message, }) await options.onError?.(err) diff --git a/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts b/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts index 3732fed983e..e3f1cd829df 100644 --- a/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts +++ b/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts @@ -1,7 +1,6 @@ import { createLogger } from '@sim/logger' import { upsertAsyncToolCall } from '@/lib/copilot/async-runs/repository' import { STREAM_TIMEOUT_MS } from '@/lib/copilot/constants' -import { appendCopilotLogContext } from '@/lib/copilot/logging' import { asRecord, getEventData, @@ -83,15 +82,12 @@ function abortPendingToolIfStreamDead( }, context.messageId ).catch((err) => { - logger.error( - appendCopilotLogContext('markToolComplete fire-and-forget failed (stream aborted)', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .error('markToolComplete fire-and-forget failed (stream aborted)', { toolCallId: toolCall.id, error: err instanceof Error ? 
err.message : String(err), - } - ) + }) }) return true } @@ -136,15 +132,12 @@ function handleClientCompletion( { background: true }, context.messageId ).catch((err) => { - logger.error( - appendCopilotLogContext('markToolComplete fire-and-forget failed (client background)', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .error('markToolComplete fire-and-forget failed (client background)', { toolCallId: toolCall.id, error: err instanceof Error ? err.message : String(err), - } - ) + }) }) markToolResultSeen(toolCallId) return @@ -160,15 +153,12 @@ function handleClientCompletion( undefined, context.messageId ).catch((err) => { - logger.error( - appendCopilotLogContext('markToolComplete fire-and-forget failed (client rejected)', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .error('markToolComplete fire-and-forget failed (client rejected)', { toolCallId: toolCall.id, error: err instanceof Error ? err.message : String(err), - } - ) + }) }) markToolResultSeen(toolCallId) return @@ -184,15 +174,12 @@ function handleClientCompletion( completion.data, context.messageId ).catch((err) => { - logger.error( - appendCopilotLogContext('markToolComplete fire-and-forget failed (client cancelled)', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .error('markToolComplete fire-and-forget failed (client cancelled)', { toolCallId: toolCall.id, error: err instanceof Error ? 
err.message : String(err), - } - ) + }) }) markToolResultSeen(toolCallId) return @@ -209,16 +196,13 @@ function handleClientCompletion( completion?.data, context.messageId ).catch((err) => { - logger.error( - appendCopilotLogContext('markToolComplete fire-and-forget failed (client completion)', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .error('markToolComplete fire-and-forget failed (client completion)', { toolCallId: toolCall.id, toolName: toolCall.name, error: err instanceof Error ? err.message : String(err), - } - ) + }) }) markToolResultSeen(toolCallId) } @@ -252,16 +236,13 @@ async function emitSyntheticToolResult( error: !success ? completion?.message : undefined, } as SSEEvent) } catch (error) { - logger.warn( - appendCopilotLogContext('Failed to emit synthetic tool_result', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .warn('Failed to emit synthetic tool_result', { toolCallId, toolName, error: error instanceof Error ? error.message : String(error), - } - ) + }) } } @@ -328,17 +309,14 @@ export const sseHandlers: Record = { const rid = typeof event.data === 'string' ? 
event.data : undefined if (rid) { context.requestId = rid - logger.error( - appendCopilotLogContext('Mapped copilot message to Go trace ID', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .info('Mapped copilot message to Go trace ID', { goTraceId: rid, chatId: context.chatId, executionId: context.executionId, runId: context.runId, - } - ) + }) } }, title_updated: () => {}, @@ -485,29 +463,23 @@ export const sseHandlers: Record = { args, }) } catch (err) { - logger.warn( - appendCopilotLogContext('Failed to persist async tool row before execution', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .warn('Failed to persist async tool row before execution', { toolCallId, toolName, error: err instanceof Error ? err.message : String(err), - } - ) + }) } return executeToolAndReport(toolCallId, context, execContext, options) })().catch((err) => { - logger.error( - appendCopilotLogContext('Parallel tool execution failed', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .error('Parallel tool execution failed', { toolCallId, toolName, error: err instanceof Error ? err.message : String(err), - } - ) + }) return { status: 'error', message: err instanceof Error ? err.message : String(err), @@ -546,16 +518,13 @@ export const sseHandlers: Record = { args, status: 'running', }).catch((err) => { - logger.warn( - appendCopilotLogContext('Failed to persist async tool row for client-executable tool', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .warn('Failed to persist async tool row for client-executable tool', { toolCallId, toolName, error: err instanceof Error ? 
err.message : String(err), - } - ) + }) }) const clientWaitSignal = buildClientToolAbortSignal(options) const completion = await waitForToolCompletion( @@ -746,29 +715,23 @@ export const subAgentHandlers: Record = { args, }) } catch (err) { - logger.warn( - appendCopilotLogContext('Failed to persist async subagent tool row before execution', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .warn('Failed to persist async subagent tool row before execution', { toolCallId, toolName, error: err instanceof Error ? err.message : String(err), - } - ) + }) } return executeToolAndReport(toolCallId, context, execContext, options) })().catch((err) => { - logger.error( - appendCopilotLogContext('Parallel subagent tool execution failed', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .error('Parallel subagent tool execution failed', { toolCallId, toolName, error: err instanceof Error ? err.message : String(err), - } - ) + }) return { status: 'error', message: err instanceof Error ? err.message : String(err), @@ -802,17 +765,13 @@ export const subAgentHandlers: Record = { args, status: 'running', }).catch((err) => { - logger.warn( - appendCopilotLogContext( - 'Failed to persist async tool row for client-executable subagent tool', - { messageId: context.messageId } - ), - { + logger + .withMetadata({ messageId: context.messageId }) + .warn('Failed to persist async tool row for client-executable subagent tool', { toolCallId, toolName, error: err instanceof Error ? 
err.message : String(err), - } - ) + }) }) const clientWaitSignal = buildClientToolAbortSignal(options) const completion = await waitForToolCompletion( @@ -881,15 +840,12 @@ export const subAgentHandlers: Record = { export function handleSubagentRouting(event: SSEEvent, context: StreamingContext): boolean { if (!event.subagent) return false if (!context.subAgentParentToolCallId) { - logger.warn( - appendCopilotLogContext('Subagent event missing parent tool call', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .warn('Subagent event missing parent tool call', { type: event.type, subagent: event.subagent, - } - ) + }) return false } return true diff --git a/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts b/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts index 445075ef992..e0296f8b525 100644 --- a/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts +++ b/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts @@ -3,7 +3,6 @@ import { userTableRows } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { eq } from 'drizzle-orm' import { completeAsyncToolCall, markAsyncToolRunning } from '@/lib/copilot/async-runs/repository' -import { appendCopilotLogContext } from '@/lib/copilot/logging' import { waitForToolConfirmation } from '@/lib/copilot/orchestrator/persistence' import { asRecord, markToolResultSeen } from '@/lib/copilot/orchestrator/sse/utils' import { executeToolServerSide, markToolComplete } from '@/lib/copilot/orchestrator/tool-executor' @@ -187,15 +186,12 @@ async function maybeWriteOutputToFile( contentType ) - logger.error( - appendCopilotLogContext('Tool output written to file', { messageId: context.messageId }), - { - toolName, - fileName, - size: buffer.length, - fileId: uploaded.id, - } - ) + logger.withMetadata({ messageId: context.messageId }).info('Tool output written to file', { + toolName, + fileName, + size: 
buffer.length, + fileId: uploaded.id, + }) return { success: true, @@ -209,16 +205,13 @@ async function maybeWriteOutputToFile( } } catch (err) { const message = err instanceof Error ? err.message : String(err) - logger.warn( - appendCopilotLogContext('Failed to write tool output to file', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .warn('Failed to write tool output to file', { toolName, outputPath, error: message, - } - ) + }) return { success: false, error: `Failed to write output file: ${message}`, @@ -306,7 +299,7 @@ function reportCancelledTool( data: Record = { cancelled: true } ): void { markToolComplete(toolCall.id, toolCall.name, 499, message, data, messageId).catch((err) => { - logger.error(appendCopilotLogContext('markToolComplete failed (cancelled)', { messageId }), { + logger.withMetadata({ messageId }).error('markToolComplete failed (cancelled)', { toolCallId: toolCall.id, toolName: toolCall.name, error: err instanceof Error ? err.message : String(err), @@ -401,14 +394,11 @@ async function maybeWriteOutputToTable( } }) - logger.error( - appendCopilotLogContext('Tool output written to table', { messageId: context.messageId }), - { - toolName, - tableId: outputTable, - rowCount: rows.length, - } - ) + logger.withMetadata({ messageId: context.messageId }).info('Tool output written to table', { + toolName, + tableId: outputTable, + rowCount: rows.length, + }) return { success: true, @@ -419,16 +409,13 @@ async function maybeWriteOutputToTable( }, } } catch (err) { - logger.warn( - appendCopilotLogContext('Failed to write tool output to table', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .warn('Failed to write tool output to table', { toolName, outputTable, error: err instanceof Error ? err.message : String(err), - } - ) + }) return { success: false, error: `Failed to write to table: ${err instanceof Error ? 
err.message : String(err)}`, @@ -528,16 +515,13 @@ async function maybeWriteReadCsvToTable( } }) - logger.error( - appendCopilotLogContext('Read output written to table', { messageId: context.messageId }), - { - toolName, - tableId: outputTable, - tableName: table.name, - rowCount: rows.length, - filePath, - } - ) + logger.withMetadata({ messageId: context.messageId }).info('Read output written to table', { + toolName, + tableId: outputTable, + tableName: table.name, + rowCount: rows.length, + filePath, + }) return { success: true, @@ -549,16 +533,13 @@ async function maybeWriteReadCsvToTable( }, } } catch (err) { - logger.warn( - appendCopilotLogContext('Failed to write read output to table', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .warn('Failed to write read output to table', { toolName, outputTable, error: err instanceof Error ? err.message : String(err), - } - ) + }) return { success: false, error: `Failed to import into table: ${err instanceof Error ? err.message : String(err)}`, @@ -599,14 +580,11 @@ export async function executeToolAndReport( toolCall.status = 'executing' await markAsyncToolRunning(toolCall.id, 'sim-stream').catch(() => {}) - logger.error( - appendCopilotLogContext('Tool execution started', { messageId: context.messageId }), - { - toolCallId: toolCall.id, - toolName: toolCall.name, - params: toolCall.params, - } - ) + logger.withMetadata({ messageId: context.messageId }).info('Tool execution started', { + toolCallId: toolCall.id, + toolName: toolCall.name, + params: toolCall.params, + }) try { let result = await executeToolServerSide(toolCall, execContext) @@ -693,24 +671,18 @@ export async function executeToolAndReport( : raw && typeof raw === 'object' ? 
JSON.stringify(raw).slice(0, 200) : undefined - logger.error( - appendCopilotLogContext('Tool execution succeeded', { messageId: context.messageId }), - { - toolCallId: toolCall.id, - toolName: toolCall.name, - outputPreview: preview, - } - ) + logger.withMetadata({ messageId: context.messageId }).info('Tool execution succeeded', { + toolCallId: toolCall.id, + toolName: toolCall.name, + outputPreview: preview, + }) } else { - logger.warn( - appendCopilotLogContext('Tool execution failed', { messageId: context.messageId }), - { - toolCallId: toolCall.id, - toolName: toolCall.name, - error: result.error, - params: toolCall.params, - } - ) + logger.withMetadata({ messageId: context.messageId }).warn('Tool execution failed', { + toolCallId: toolCall.id, + toolName: toolCall.name, + error: result.error, + params: toolCall.params, + }) } // If create_workflow was successful, update the execution context with the new workflowId. @@ -760,16 +732,13 @@ export async function executeToolAndReport( result.output, context.messageId ).catch((err) => { - logger.error( - appendCopilotLogContext('markToolComplete fire-and-forget failed', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .error('markToolComplete fire-and-forget failed', { toolCallId: toolCall.id, toolName: toolCall.name, error: err instanceof Error ? err.message : String(err), - } - ) + }) }) const resultEvent: SSEEvent = { @@ -804,15 +773,12 @@ export async function executeToolAndReport( if (deleted.length > 0) { isDeleteOp = true removeChatResources(execContext.chatId, deleted).catch((err) => { - logger.warn( - appendCopilotLogContext('Failed to remove chat resources after deletion', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .warn('Failed to remove chat resources after deletion', { chatId: execContext.chatId, error: err instanceof Error ? 
err.message : String(err), - } - ) + }) }) for (const resource of deleted) { @@ -835,15 +801,12 @@ export async function executeToolAndReport( if (resources.length > 0) { persistChatResources(execContext.chatId, resources).catch((err) => { - logger.warn( - appendCopilotLogContext('Failed to persist chat resources', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .warn('Failed to persist chat resources', { chatId: execContext.chatId, error: err instanceof Error ? err.message : String(err), - } - ) + }) }) for (const resource of resources) { @@ -879,15 +842,12 @@ export async function executeToolAndReport( toolCall.error = error instanceof Error ? error.message : String(error) toolCall.endTime = Date.now() - logger.error( - appendCopilotLogContext('Tool execution threw', { messageId: context.messageId }), - { - toolCallId: toolCall.id, - toolName: toolCall.name, - error: toolCall.error, - params: toolCall.params, - } - ) + logger.withMetadata({ messageId: context.messageId }).error('Tool execution threw', { + toolCallId: toolCall.id, + toolName: toolCall.name, + error: toolCall.error, + params: toolCall.params, + }) markToolResultSeen(toolCall.id) await completeAsyncToolCall({ @@ -909,16 +869,13 @@ export async function executeToolAndReport( }, context.messageId ).catch((err) => { - logger.error( - appendCopilotLogContext('markToolComplete fire-and-forget failed', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .error('markToolComplete fire-and-forget failed', { toolCallId: toolCall.id, toolName: toolCall.name, error: err instanceof Error ? 
err.message : String(err), - } - ) + }) }) const errorEvent: SSEEvent = { diff --git a/apps/sim/lib/copilot/orchestrator/stream/core.ts b/apps/sim/lib/copilot/orchestrator/stream/core.ts index 79f5facb47d..1dccfa2700c 100644 --- a/apps/sim/lib/copilot/orchestrator/stream/core.ts +++ b/apps/sim/lib/copilot/orchestrator/stream/core.ts @@ -2,7 +2,6 @@ import { createLogger } from '@sim/logger' import { getHighestPrioritySubscription } from '@/lib/billing/core/plan' import { isPaid } from '@/lib/billing/plan-helpers' import { ORCHESTRATION_TIMEOUT_MS } from '@/lib/copilot/constants' -import { appendCopilotLogContext } from '@/lib/copilot/logging' import { handleSubagentRouting, sseHandlers, @@ -165,13 +164,10 @@ export async function runStreamLoop( try { await options.onEvent?.(normalizedEvent) } catch (error) { - logger.warn( - appendCopilotLogContext('Failed to forward SSE event', { messageId: context.messageId }), - { - type: normalizedEvent.type, - error: error instanceof Error ? error.message : String(error), - } - ) + logger.withMetadata({ messageId: context.messageId }).warn('Failed to forward SSE event', { + type: normalizedEvent.type, + error: error instanceof Error ? error.message : String(error), + }) } // Let the caller intercept before standard dispatch. 
@@ -205,11 +201,9 @@ export async function runStreamLoop( if (context.subAgentParentStack.length > 0) { context.subAgentParentStack.pop() } else { - logger.warn( - appendCopilotLogContext('subagent_end without matching subagent_start', { - messageId: context.messageId, - }) - ) + logger + .withMetadata({ messageId: context.messageId }) + .warn('subagent_end without matching subagent_start') } context.subAgentParentToolCallId = context.subAgentParentStack.length > 0 diff --git a/apps/sim/lib/copilot/orchestrator/tool-executor/index.ts b/apps/sim/lib/copilot/orchestrator/tool-executor/index.ts index c4fb11fac3e..e824f25a064 100644 --- a/apps/sim/lib/copilot/orchestrator/tool-executor/index.ts +++ b/apps/sim/lib/copilot/orchestrator/tool-executor/index.ts @@ -3,7 +3,6 @@ import { credential, mcpServers, pendingCredentialDraft, user } from '@sim/db/sc import { createLogger } from '@sim/logger' import { and, eq, isNull, lt } from 'drizzle-orm' import { SIM_AGENT_API_URL } from '@/lib/copilot/constants' -import { appendCopilotLogContext } from '@/lib/copilot/logging' import type { ExecutionContext, ToolCallResult, @@ -322,17 +321,14 @@ async function executeManageCustomTool( error: `Unsupported operation for manage_custom_tool: ${operation}`, } } catch (error) { - logger.error( - appendCopilotLogContext('manage_custom_tool execution failed', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .error('manage_custom_tool execution failed', { operation, workspaceId, userId: context.userId, error: error instanceof Error ? error.message : String(error), - } - ) + }) return { success: false, error: error instanceof Error ? 
error.message : 'Failed to manage custom tool', @@ -559,16 +555,13 @@ async function executeManageMcpTool( return { success: false, error: `Unsupported operation for manage_mcp_tool: ${operation}` } } catch (error) { - logger.error( - appendCopilotLogContext('manage_mcp_tool execution failed', { - messageId: context.messageId, - }), - { + logger + .withMetadata({ messageId: context.messageId }) + .error('manage_mcp_tool execution failed', { operation, workspaceId, error: error instanceof Error ? error.message : String(error), - } - ) + }) return { success: false, error: error instanceof Error ? error.message : 'Failed to manage MCP server', @@ -727,16 +720,11 @@ async function executeManageSkill( return { success: false, error: `Unsupported operation for manage_skill: ${operation}` } } catch (error) { - logger.error( - appendCopilotLogContext('manage_skill execution failed', { - messageId: context.messageId, - }), - { - operation, - workspaceId, - error: error instanceof Error ? error.message : String(error), - } - ) + logger.withMetadata({ messageId: context.messageId }).error('manage_skill execution failed', { + operation, + workspaceId, + error: error instanceof Error ? error.message : String(error), + }) return { success: false, error: error instanceof Error ? error.message : 'Failed to manage skill', @@ -1007,15 +995,12 @@ const SIM_WORKFLOW_TOOL_HANDLERS: Record< }, } } catch (err) { - logger.warn( - appendCopilotLogContext('Failed to generate OAuth link, falling back to generic URL', { - messageId: c.messageId, - }), - { + logger + .withMetadata({ messageId: c.messageId }) + .warn('Failed to generate OAuth link, falling back to generic URL', { providerName, error: err instanceof Error ? err.message : String(err), - } - ) + }) const workspaceUrl = c.workspaceId ? 
`${baseUrl}/workspace/${c.workspaceId}` : `${baseUrl}/workspace` @@ -1199,12 +1184,9 @@ export async function executeToolServerSide( const toolConfig = getTool(resolvedToolName) if (!toolConfig) { - logger.warn( - appendCopilotLogContext('Tool not found in registry', { - messageId: context.messageId, - }), - { toolName, resolvedToolName } - ) + logger + .withMetadata({ messageId: context.messageId }) + .warn('Tool not found in registry', { toolName, resolvedToolName }) return { success: false, error: `Tool not found: ${toolName}`, @@ -1293,15 +1275,10 @@ async function executeServerToolDirect( return { success: true, output: result } } catch (error) { - logger.error( - appendCopilotLogContext('Server tool execution failed', { - messageId: context.messageId, - }), - { - toolName, - error: error instanceof Error ? error.message : String(error), - } - ) + logger.withMetadata({ messageId: context.messageId }).error('Server tool execution failed', { + toolName, + error: error instanceof Error ? error.message : String(error), + }) return { success: false, error: error instanceof Error ? 
error.message : 'Server tool execution failed', @@ -1377,7 +1354,7 @@ export async function markToolComplete( }) if (!response.ok) { - logger.warn(appendCopilotLogContext('Mark-complete call failed', { messageId }), { + logger.withMetadata({ messageId }).warn('Mark-complete call failed', { toolCallId, toolName, status: response.status, @@ -1391,7 +1368,7 @@ export async function markToolComplete( } } catch (error) { const isTimeout = error instanceof DOMException && error.name === 'AbortError' - logger.error(appendCopilotLogContext('Mark-complete call failed', { messageId }), { + logger.withMetadata({ messageId }).error('Mark-complete call failed', { toolCallId, toolName, timedOut: isTimeout, diff --git a/apps/sim/lib/copilot/tools/server/files/download-to-workspace-file.ts b/apps/sim/lib/copilot/tools/server/files/download-to-workspace-file.ts index fbae8b4dcc0..845aac805b6 100644 --- a/apps/sim/lib/copilot/tools/server/files/download-to-workspace-file.ts +++ b/apps/sim/lib/copilot/tools/server/files/download-to-workspace-file.ts @@ -1,6 +1,5 @@ import { createLogger } from '@sim/logger' import { z } from 'zod' -import { appendCopilotLogContext } from '@/lib/copilot/logging' import { assertServerToolNotAborted, type BaseServerTool, @@ -125,8 +124,7 @@ export const downloadToWorkspaceFileServerTool: BaseServerTool< params: DownloadToWorkspaceFileArgs, context?: ServerToolContext ): Promise { - const withMessageId = (message: string) => - appendCopilotLogContext(message, { messageId: context?.messageId }) + const reqLogger = logger.withMetadata({ messageId: context?.messageId }) if (!context?.userId) { throw new Error('Authentication required') @@ -178,7 +176,7 @@ export const downloadToWorkspaceFileServerTool: BaseServerTool< mimeType ) - logger.info(withMessageId('Downloaded remote file to workspace'), { + reqLogger.info('Downloaded remote file to workspace', { sourceUrl: params.url, fileId: uploaded.id, fileName: uploaded.name, @@ -195,7 +193,7 @@ export const 
downloadToWorkspaceFileServerTool: BaseServerTool< } } catch (error) { const msg = error instanceof Error ? error.message : 'Unknown error' - logger.error(withMessageId('Failed to download file to workspace'), { + reqLogger.error('Failed to download file to workspace', { url: params.url, error: msg, }) diff --git a/apps/sim/lib/copilot/tools/server/files/workspace-file.ts b/apps/sim/lib/copilot/tools/server/files/workspace-file.ts index 538ea4f52fa..76a0d1fdafe 100644 --- a/apps/sim/lib/copilot/tools/server/files/workspace-file.ts +++ b/apps/sim/lib/copilot/tools/server/files/workspace-file.ts @@ -1,5 +1,4 @@ import { createLogger } from '@sim/logger' -import { appendCopilotLogContext } from '@/lib/copilot/logging' import { assertServerToolNotAborted, type BaseServerTool, @@ -51,11 +50,10 @@ export const workspaceFileServerTool: BaseServerTool { - const withMessageId = (message: string) => - appendCopilotLogContext(message, { messageId: context?.messageId }) + const reqLogger = logger.withMetadata({ messageId: context?.messageId }) if (!context?.userId) { - logger.error(withMessageId('Unauthorized attempt to access workspace files')) + reqLogger.error('Unauthorized attempt to access workspace files') throw new Error('Authentication required') } @@ -94,7 +92,7 @@ export const workspaceFileServerTool: BaseServerTool { - const withMessageId = (message: string) => - appendCopilotLogContext(message, { messageId: context?.messageId }) + const reqLogger = logger.withMetadata({ messageId: context?.messageId }) if (!context?.userId) { throw new Error('Authentication required') @@ -97,17 +95,17 @@ export const generateImageServerTool: BaseServerTool = { name: 'get_job_logs', async execute(rawArgs: GetJobLogsArgs, context?: ServerToolContext): Promise { - const withMessageId = (message: string) => - appendCopilotLogContext(message, { messageId: context?.messageId }) + const reqLogger = logger.withMetadata({ messageId: context?.messageId }) const { jobId, @@ -114,7 +112,7 @@ 
export const getJobLogsServerTool: BaseServerTool const clampedLimit = Math.min(Math.max(1, limit), 5) - logger.info(withMessageId('Fetching job logs'), { + reqLogger.info('Fetching job logs', { jobId, executionId, limit: clampedLimit, @@ -173,7 +171,7 @@ export const getJobLogsServerTool: BaseServerTool return entry }) - logger.info(withMessageId('Job logs prepared'), { + reqLogger.info('Job logs prepared', { jobId, count: entries.length, resultSizeKB: Math.round(JSON.stringify(entries).length / 1024), diff --git a/apps/sim/lib/copilot/tools/server/knowledge/knowledge-base.ts b/apps/sim/lib/copilot/tools/server/knowledge/knowledge-base.ts index 2eae12f37ef..d0115015699 100644 --- a/apps/sim/lib/copilot/tools/server/knowledge/knowledge-base.ts +++ b/apps/sim/lib/copilot/tools/server/knowledge/knowledge-base.ts @@ -3,7 +3,6 @@ import { knowledgeConnector } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq, isNull } from 'drizzle-orm' import { generateInternalToken } from '@/lib/auth/internal' -import { appendCopilotLogContext } from '@/lib/copilot/logging' import { assertServerToolNotAborted, type BaseServerTool, @@ -48,14 +47,11 @@ export const knowledgeBaseServerTool: BaseServerTool { - const withMessageId = (message: string) => - appendCopilotLogContext(message, { messageId: context?.messageId }) + const reqLogger = logger.withMetadata({ messageId: context?.messageId }) if (!context?.userId) { - logger.error( - withMessageId( - 'Unauthorized attempt to access knowledge base - no authenticated user context' - ) + reqLogger.error( + 'Unauthorized attempt to access knowledge base - no authenticated user context' ) throw new Error('Authentication required') } @@ -105,7 +101,7 @@ export const knowledgeBaseServerTool: BaseServerTool { - logger.error(withMessageId('Background document processing failed'), { + reqLogger.error('Background document processing failed', { documentId: doc.id, error: err instanceof Error ? 
err.message : String(err), }) }) - logger.info(withMessageId('Workspace file added to knowledge base via copilot'), { + reqLogger.info('Workspace file added to knowledge base via copilot', { knowledgeBaseId: args.knowledgeBaseId, documentId: doc.id, fileName: fileRecord.name, @@ -352,7 +348,7 @@ export const knowledgeBaseServerTool: BaseServerTool = { name: 'user_table', async execute(params: UserTableArgs, context?: ServerToolContext): Promise { - const withMessageId = (message: string) => - appendCopilotLogContext(message, { messageId: context?.messageId }) + const reqLogger = logger.withMetadata({ messageId: context?.messageId }) if (!context?.userId) { - logger.error( - withMessageId('Unauthorized attempt to access user table - no authenticated user context') - ) + reqLogger.error('Unauthorized attempt to access user table - no authenticated user context') throw new Error('Authentication required') } @@ -729,7 +725,7 @@ export const userTableServerTool: BaseServerTool const coerced = coerceRows(rows, columns, columnMap) const inserted = await batchInsertAll(table.id, coerced, table, workspaceId, context) - logger.info(withMessageId('Table created from file'), { + reqLogger.info('Table created from file', { tableId: table.id, fileName: file.name, columns: columns.length, @@ -805,7 +801,7 @@ export const userTableServerTool: BaseServerTool const coerced = coerceRows(rows, matchedColumns, columnMap) const inserted = await batchInsertAll(table.id, coerced, table, workspaceId, context) - logger.info(withMessageId('Rows imported from file'), { + reqLogger.info('Rows imported from file', { tableId: table.id, fileName: file.name, matchedColumns: mappedHeaders.length, @@ -1003,7 +999,7 @@ export const userTableServerTool: BaseServerTool ?
error.cause.message : String(error.cause) : undefined - logger.error(withMessageId('Table operation failed'), { + reqLogger.error('Table operation failed', { operation, error: errorMessage, cause, diff --git a/apps/sim/lib/copilot/tools/server/visualization/generate-visualization.ts b/apps/sim/lib/copilot/tools/server/visualization/generate-visualization.ts index abceb9d9d50..f58c40c4a9f 100644 --- a/apps/sim/lib/copilot/tools/server/visualization/generate-visualization.ts +++ b/apps/sim/lib/copilot/tools/server/visualization/generate-visualization.ts @@ -1,5 +1,4 @@ import { createLogger } from '@sim/logger' -import { appendCopilotLogContext } from '@/lib/copilot/logging' import { assertServerToolNotAborted, type BaseServerTool, @@ -66,7 +65,7 @@ async function collectSandboxFiles( inputTables?: string[], messageId?: string ): Promise { - const withMessageId = (message: string) => appendCopilotLogContext(message, { messageId }) + const reqLogger = logger.withMetadata({ messageId }) const sandboxFiles: SandboxFile[] = [] let totalSize = 0 @@ -75,12 +74,12 @@ async function collectSandboxFiles( for (const fileRef of inputFiles) { const record = findWorkspaceFileRecord(allFiles, fileRef) if (!record) { - logger.warn(withMessageId('Sandbox input file not found'), { fileRef }) + reqLogger.warn('Sandbox input file not found', { fileRef }) continue } const ext = record.name.split('.').pop()?.toLowerCase() ?? 
'' if (!TEXT_EXTENSIONS.has(ext)) { - logger.warn(withMessageId('Skipping non-text sandbox input file'), { + reqLogger.warn('Skipping non-text sandbox input file', { fileId: record.id, fileName: record.name, ext, @@ -88,7 +87,7 @@ async function collectSandboxFiles( continue } if (record.size > MAX_FILE_SIZE) { - logger.warn(withMessageId('Sandbox input file exceeds size limit'), { + reqLogger.warn('Sandbox input file exceeds size limit', { fileId: record.id, fileName: record.name, size: record.size, @@ -96,9 +95,7 @@ async function collectSandboxFiles( continue } if (totalSize + record.size > MAX_TOTAL_SIZE) { - logger.warn( - withMessageId('Sandbox input total size limit reached, skipping remaining files') - ) + reqLogger.warn('Sandbox input total size limit reached, skipping remaining files') break } const buffer = await downloadWorkspaceFile(record) @@ -119,7 +116,7 @@ async function collectSandboxFiles( for (const tableId of inputTables) { const table = await getTableById(tableId) if (!table) { - logger.warn(withMessageId('Sandbox input table not found'), { tableId }) + reqLogger.warn('Sandbox input table not found', { tableId }) continue } const { rows } = await queryRows(tableId, workspaceId, { limit: 10000 }, 'sandbox-input') @@ -134,9 +131,7 @@ async function collectSandboxFiles( } const csvContent = csvLines.join('\n') if (totalSize + csvContent.length > MAX_TOTAL_SIZE) { - logger.warn( - withMessageId('Sandbox input total size limit reached, skipping remaining tables') - ) + reqLogger.warn('Sandbox input total size limit reached, skipping remaining tables') break } totalSize += csvContent.length @@ -157,8 +152,7 @@ export const generateVisualizationServerTool: BaseServerTool< params: VisualizationArgs, context?: ServerToolContext ): Promise { - const withMessageId = (message: string) => - appendCopilotLogContext(message, { messageId: context?.messageId }) + const reqLogger = logger.withMetadata({ messageId: context?.messageId }) if (!context?.userId) { throw
new Error('Authentication required') @@ -243,7 +237,7 @@ export const generateVisualizationServerTool: BaseServerTool< imageBuffer, 'image/png' ) - logger.info(withMessageId('Chart image overwritten'), { + reqLogger.info('Chart image overwritten', { fileId: updated.id, fileName: updated.name, size: imageBuffer.length, @@ -267,7 +261,7 @@ export const generateVisualizationServerTool: BaseServerTool< 'image/png' ) - logger.info(withMessageId('Chart image saved'), { + reqLogger.info('Chart image saved', { fileId: uploaded.id, fileName: uploaded.name, size: imageBuffer.length, @@ -282,7 +276,7 @@ export const generateVisualizationServerTool: BaseServerTool< } } catch (error) { const msg = error instanceof Error ? error.message : 'Unknown error' - logger.error(withMessageId('Visualization generation failed'), { error: msg }) + reqLogger.error('Visualization generation failed', { error: msg }) return { success: false, message: `Failed to generate visualization: ${msg}` } } }, diff --git a/apps/sim/lib/logs/execution/logger.ts b/apps/sim/lib/logs/execution/logger.ts index b21a8c9eaad..83f97e6cd58 100644 --- a/apps/sim/lib/logs/execution/logger.ts +++ b/apps/sim/lib/logs/execution/logger.ts @@ -153,8 +153,9 @@ export class ExecutionLogger implements IExecutionLoggerService { workflowState, deploymentVersionId, } = params + const execLog = logger.withMetadata({ workflowId, workspaceId, executionId }) - logger.debug(`Starting workflow execution ${executionId} for workflow ${workflowId}`) + execLog.debug('Starting workflow execution') // Check if execution log already exists (idempotency check) const existingLog = await db @@ -164,9 +165,7 @@ export class ExecutionLogger implements IExecutionLoggerService { .limit(1) if (existingLog.length > 0) { - logger.debug( - `Execution log already exists for ${executionId}, skipping duplicate INSERT (idempotent)` - ) + execLog.debug('Execution log already exists, skipping duplicate INSERT (idempotent)') const snapshot = await 
snapshotService.getSnapshot(existingLog[0].stateSnapshotId) if (!snapshot) { throw new Error(`Snapshot ${existingLog[0].stateSnapshotId} not found for existing log`) @@ -228,7 +227,7 @@ export class ExecutionLogger implements IExecutionLoggerService { }) .returning() - logger.debug(`Created workflow log ${workflowLog.id} for execution ${executionId}`) + execLog.debug('Created workflow log', { logId: workflowLog.id }) return { workflowLog: { @@ -298,13 +297,20 @@ export class ExecutionLogger implements IExecutionLoggerService { status: statusOverride, } = params - logger.debug(`Completing workflow execution ${executionId}`, { isResume }) + let execLog = logger.withMetadata({ executionId }) + execLog.debug('Completing workflow execution', { isResume }) const [existingLog] = await db .select() .from(workflowExecutionLogs) .where(eq(workflowExecutionLogs.executionId, executionId)) .limit(1) + if (existingLog) { + execLog = execLog.withMetadata({ + workflowId: existingLog.workflowId ?? undefined, + workspaceId: existingLog.workspaceId ?? 
undefined, + }) + } const billingUserId = this.extractBillingUserId(existingLog?.executionData) const existingExecutionData = existingLog?.executionData as | WorkflowExecutionLog['executionData'] @@ -507,10 +513,10 @@ export class ExecutionLogger implements IExecutionLoggerService { billingUserId ) } catch {} - logger.warn('Usage threshold notification check failed (non-fatal)', { error: e }) + execLog.warn('Usage threshold notification check failed (non-fatal)', { error: e }) } - logger.debug(`Completed workflow execution ${executionId}`) + execLog.debug('Completed workflow execution') const completedLog: WorkflowExecutionLog = { id: updatedLog.id, @@ -528,10 +534,7 @@ export class ExecutionLogger implements IExecutionLoggerService { } emitWorkflowExecutionCompleted(completedLog).catch((error) => { - logger.error('Failed to emit workflow execution completed event', { - error, - executionId, - }) + execLog.error('Failed to emit workflow execution completed event', { error }) }) return completedLog @@ -608,18 +611,20 @@ export class ExecutionLogger implements IExecutionLoggerService { executionId?: string, billingUserId?: string | null ): Promise { + const statsLog = logger.withMetadata({ workflowId: workflowId ?? 
undefined, executionId }) + if (!isBillingEnabled) { - logger.debug('Billing is disabled, skipping user stats cost update') + statsLog.debug('Billing is disabled, skipping user stats cost update') return } if (costSummary.totalCost <= 0) { - logger.debug('No cost to update in user stats') + statsLog.debug('No cost to update in user stats') return } if (!workflowId) { - logger.debug('Workflow was deleted, skipping user stats update') + statsLog.debug('Workflow was deleted, skipping user stats update') return } @@ -631,16 +636,14 @@ export class ExecutionLogger implements IExecutionLoggerService { .limit(1) if (!workflowRecord) { - logger.error(`Workflow ${workflowId} not found for user stats update`) + statsLog.error('Workflow not found for user stats update') return } const userId = billingUserId?.trim() || null if (!userId) { - logger.error('Missing billing actor in execution context; skipping stats update', { - workflowId, + statsLog.error('Missing billing actor in execution context; skipping stats update', { trigger, - executionId, }) return } @@ -702,8 +705,7 @@ export class ExecutionLogger implements IExecutionLoggerService { // Check if user has hit overage threshold and bill incrementally await checkAndBillOverageThreshold(userId) } catch (error) { - logger.error('Error updating user stats with cost information', { - workflowId, + statsLog.error('Error updating user stats with cost information', { error, costSummary, }) diff --git a/packages/logger/src/index.test.ts b/packages/logger/src/index.test.ts index 48652a34e95..db14f191603 100644 --- a/packages/logger/src/index.test.ts +++ b/packages/logger/src/index.test.ts @@ -154,4 +154,63 @@ describe('Logger', () => { }).not.toThrow() }) }) + + describe('withMetadata', () => { + const createEnabledLogger = () => + new Logger('Test', { enabled: true, colorize: false, logLevel: LogLevel.DEBUG }) + + test('should return a new Logger instance', () => { + const logger = createEnabledLogger() + const child = 
logger.withMetadata({ workflowId: 'wf_1' }) + expect(child).toBeInstanceOf(Logger) + expect(child).not.toBe(logger) + }) + + test('should include metadata in log output', () => { + const child = createEnabledLogger().withMetadata({ workflowId: 'wf_1' }) + child.info('hello') + expect(consoleLogSpy).toHaveBeenCalledTimes(1) + const prefix = consoleLogSpy.mock.calls[0][0] as string + expect(prefix).toContain('{workflowId=wf_1}') + }) + + test('should not affect original logger output', () => { + const logger = createEnabledLogger() + logger.withMetadata({ workflowId: 'wf_1' }) + logger.info('hello') + const prefix = consoleLogSpy.mock.calls[0][0] as string + expect(prefix).not.toContain('workflowId') + }) + + test('should merge metadata across chained calls', () => { + const child = createEnabledLogger().withMetadata({ a: '1' }).withMetadata({ b: '2' }) + child.info('hello') + const prefix = consoleLogSpy.mock.calls[0][0] as string + expect(prefix).toContain('{a=1 b=2}') + }) + + test('should override parent metadata for same key', () => { + const child = createEnabledLogger().withMetadata({ a: '1' }).withMetadata({ a: '2' }) + child.info('hello') + const prefix = consoleLogSpy.mock.calls[0][0] as string + expect(prefix).toContain('{a=2}') + expect(prefix).not.toContain('a=1') + }) + + test('should exclude undefined values from output', () => { + const child = createEnabledLogger().withMetadata({ a: '1', b: undefined }) + child.info('hello') + const prefix = consoleLogSpy.mock.calls[0][0] as string + expect(prefix).toContain('{a=1}') + expect(prefix).not.toContain('b=') + }) + + test('should produce no metadata segment when metadata is empty', () => { + const child = createEnabledLogger().withMetadata({}) + child.info('hello') + const prefix = consoleLogSpy.mock.calls[0][0] as string + expect(prefix).not.toContain('{') + expect(prefix).not.toContain('}') + }) + }) }) diff --git a/packages/logger/src/index.ts b/packages/logger/src/index.ts index 
ab848051222..eb28fec3fcf 100644 --- a/packages/logger/src/index.ts +++ b/packages/logger/src/index.ts @@ -33,6 +33,12 @@ export interface LoggerConfig { enabled?: boolean } +/** + * Metadata key-value pairs attached to a logger instance. + * Included automatically in every log line produced by that logger. + */ +export type LoggerMetadata = Record + const getNodeEnv = (): string => { if (typeof process !== 'undefined' && process.env) { return process.env.NODE_ENV || 'development' @@ -141,6 +147,7 @@ export class Logger { private module: string private config: ReturnType private isDev: boolean + private metadata: LoggerMetadata = {} /** * Create a new logger for a specific module @@ -172,6 +179,20 @@ export class Logger { } } + /** + * Creates a child logger with additional metadata merged in. + * The child inherits this logger's module name, config, and existing metadata. + * New metadata keys override existing ones with the same name. + */ + withMetadata(metadata: LoggerMetadata): Logger { + const child = Object.create(Logger.prototype) as Logger + child.module = this.module + child.config = this.config + child.isDev = this.isDev + child.metadata = { ...this.metadata, ...metadata } + return child + } + /** * Determines if a log at the given level should be displayed */ @@ -209,6 +230,12 @@ export class Logger { const timestamp = new Date().toISOString() const formattedArgs = this.formatArgs(args) + const metadataEntries = Object.entries(this.metadata).filter(([_, v]) => v !== undefined) + const metadataStr = + metadataEntries.length > 0 + ? ` {${metadataEntries.map(([k, v]) => `${k}=${v}`).join(' ')}}` + : '' + if (this.config.colorize) { let levelColor: (text: string) => string const moduleColor = chalk.cyan @@ -229,7 +256,8 @@ export class Logger { break } - const coloredPrefix = `${timestampColor(`[${timestamp}]`)} ${levelColor(`[${level}]`)} ${moduleColor(`[${this.module}]`)}` + const coloredMeta = metadataStr ? 
` ${chalk.magenta(metadataStr.trim())}` : '' + const coloredPrefix = `${timestampColor(`[${timestamp}]`)} ${levelColor(`[${level}]`)} ${moduleColor(`[${this.module}]`)}${coloredMeta}` if (level === LogLevel.ERROR) { console.error(coloredPrefix, message, ...formattedArgs) @@ -237,7 +265,7 @@ export class Logger { console.log(coloredPrefix, message, ...formattedArgs) } } else { - const prefix = `[${timestamp}] [${level}] [${this.module}]` + const prefix = `[${timestamp}] [${level}] [${this.module}]${metadataStr}` if (level === LogLevel.ERROR) { console.error(prefix, message, ...formattedArgs) diff --git a/packages/testing/src/mocks/logger.mock.ts b/packages/testing/src/mocks/logger.mock.ts index 50c25122b3a..a71eedb1ecc 100644 --- a/packages/testing/src/mocks/logger.mock.ts +++ b/packages/testing/src/mocks/logger.mock.ts @@ -21,6 +21,7 @@ export function createMockLogger() { trace: vi.fn(), fatal: vi.fn(), child: vi.fn(() => createMockLogger()), + withMetadata: vi.fn(() => createMockLogger()), } } @@ -60,4 +61,5 @@ export function clearLoggerMocks(logger: ReturnType) { logger.debug.mockClear() logger.trace.mockClear() logger.fatal.mockClear() + logger.withMetadata.mockClear() }