diff --git a/apps/realtime/src/handlers/variables.ts b/apps/realtime/src/handlers/variables.ts index 98dc3a5b7af..7a4303e70b3 100644 --- a/apps/realtime/src/handlers/variables.ts +++ b/apps/realtime/src/handlers/variables.ts @@ -1,3 +1,4 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db } from '@sim/db' import { workflow } from '@sim/db/schema' import { createLogger } from '@sim/logger' @@ -18,6 +19,9 @@ type PendingVariable = { latest: { variableId: string; field: string; value: any; timestamp: number } timeout: NodeJS.Timeout opToSocket: Map + /** Most recent writer, used as the audit actor when the coalesced update is persisted. */ + actorId: string + actorName?: string } // Keyed by `${workflowId}:${variableId}:${field}` @@ -177,6 +181,8 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: if (existing) { clearTimeout(existing.timeout) existing.latest = { variableId, field, value, timestamp } + existing.actorId = session.userId + existing.actorName = session.userName if (operationId) existing.opToSocket.set(operationId, socket.id) existing.timeout = setTimeout(async () => { await flushVariableUpdate(workflowId, existing, roomManager) @@ -196,6 +202,8 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: latest: { variableId, field, value, timestamp }, timeout, opToSocket, + actorId: session.userId, + actorName: session.userName, }) } } catch (error) { @@ -231,7 +239,11 @@ async function flushVariableUpdate( try { const workflowExists = await db - .select({ id: workflow.id }) + .select({ + id: workflow.id, + name: workflow.name, + workspaceId: workflow.workspaceId, + }) .from(workflow) .where(eq(workflow.id, workflowId)) .limit(1) @@ -294,6 +306,19 @@ async function flushVariableUpdate( }) if (updateSuccessful) { + const workflowRow = workflowExists[0] + recordAudit({ + workspaceId: workflowRow.workspaceId ?? null, + actorId: pending.actorId, + actorName: pending.actorName, + action: AuditAction.WORKFLOW_VARIABLES_UPDATED, + resourceType: AuditResourceType.WORKFLOW, + resourceId: workflowId, + resourceName: workflowRow.name ?? undefined, + description: `Updated workflow variables`, + metadata: { variableId, field }, + }) + // Broadcast to room excluding all senders (works cross-pod via Redis adapter) const senderSocketIds = [...pending.opToSocket.values()] const broadcastPayload = { diff --git a/apps/sim/app/api/auth/oauth/token/route.ts b/apps/sim/app/api/auth/oauth/token/route.ts index d7d1b9cd9f3..91b4d55ea4b 100644 --- a/apps/sim/app/api/auth/oauth/token/route.ts +++ b/apps/sim/app/api/auth/oauth/token/route.ts @@ -1,3 +1,4 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { createLogger } from '@sim/logger' import { getErrorMessage } from '@sim/utils/errors' import { type NextRequest, NextResponse } from 'next/server' @@ -11,6 +12,7 @@ import { AuthType, checkSessionOrInternalAuth } from '@/lib/auth/hybrid' import { generateRequestId } from '@/lib/core/utils/request' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { ATLASSIAN_SERVICE_ACCOUNT_PROVIDER_ID } from '@/lib/oauth/types' +import { captureServerEvent } from '@/lib/posthog/server' import { getAtlassianServiceAccountSecret, getCredential, @@ -96,6 +98,24 @@ export const POST = withRouteHandler(async (request: NextRequest) => { ) } + recordAudit({ + actorId: auth.userId, + action: AuditAction.CREDENTIAL_ACCESSED, + resourceType: AuditResourceType.CREDENTIAL, + resourceId: providerId, + description: `Accessed OAuth credential for provider ${providerId}`, + metadata: { + provider: providerId, + credentialType: 'oauth', + credentialAccountUserId, + }, + request, + }) + captureServerEvent(auth.userId, 'credential_used', { + credential_type: 'oauth', + provider_id: providerId, + }) + return NextResponse.json({ accessToken }, { status: 200 }) } catch (error) { const message = getErrorMessage(error, 'Failed to get OAuth token') @@ -120,9 +140,39 @@ export const POST = withRouteHandler(async (request: NextRequest) => { return NextResponse.json({ error: authz.error || 'Unauthorized' }, { status: 403 }) } + const saActorId = authz.requesterUserId + const saWorkspaceId = resolved.workspaceId ?? authz.workspaceId ?? null + const emitServiceAccountAccess = () => { + if (!saActorId) return + recordAudit({ + workspaceId: saWorkspaceId, + actorId: saActorId, + action: AuditAction.CREDENTIAL_ACCESSED, + resourceType: AuditResourceType.CREDENTIAL, + resourceId: resolved.credentialId ?? credentialId, + description: `Accessed service account credential for provider ${resolved.providerId ?? 'unknown'}`, + metadata: { + provider: resolved.providerId, + credentialType: 'service_account', + }, + request, + }) + captureServerEvent( + saActorId, + 'credential_used', + { + credential_type: 'service_account', + provider_id: resolved.providerId ?? 'unknown', + ...(saWorkspaceId ? { workspace_id: saWorkspaceId } : {}), + }, + saWorkspaceId ? { groups: { workspace: saWorkspaceId } } : undefined + ) + } + try { if (resolved.providerId === ATLASSIAN_SERVICE_ACCOUNT_PROVIDER_ID) { const secret = await getAtlassianServiceAccountSecret(resolved.credentialId) + emitServiceAccountAccess() return NextResponse.json( { accessToken: secret.apiToken, @@ -137,6 +187,7 @@ export const POST = withRouteHandler(async (request: NextRequest) => { scopes ?? [], impersonateEmail ) + emitServiceAccountAccess() return NextResponse.json({ accessToken }, { status: 200 }) } catch (error) { logger.error(`[${requestId}] Service account token error:`, error) @@ -165,6 +216,9 @@ export const POST = withRouteHandler(async (request: NextRequest) => { return NextResponse.json({ error: 'Credential not found' }, { status: 404 }) } + const oauthActorId = authz.requesterUserId + const oauthWorkspaceId = authz.workspaceId ?? null + try { const { accessToken } = await refreshTokenIfNeeded( requestId, @@ -172,6 +226,32 @@ export const POST = withRouteHandler(async (request: NextRequest) => { resolvedCredentialId ) + if (oauthActorId) { + recordAudit({ + workspaceId: oauthWorkspaceId, + actorId: oauthActorId, + action: AuditAction.CREDENTIAL_ACCESSED, + resourceType: AuditResourceType.CREDENTIAL, + resourceId: resolvedCredentialId, + description: `Accessed OAuth credential for provider ${credential.providerId}`, + metadata: { + provider: credential.providerId, + credentialType: 'oauth', + }, + request, + }) + captureServerEvent( + oauthActorId, + 'credential_used', + { + credential_type: 'oauth', + provider_id: credential.providerId, + ...(oauthWorkspaceId ? { workspace_id: oauthWorkspaceId } : {}), + }, + oauthWorkspaceId ? { groups: { workspace: oauthWorkspaceId } } : undefined + ) + } + let instanceUrl: string | undefined if (credential.providerId === 'salesforce' && credential.scope) { const instanceMatch = credential.scope.match(SALESFORCE_INSTANCE_URL_REGEX) @@ -247,6 +327,9 @@ export const GET = withRouteHandler(async (request: NextRequest) => { return NextResponse.json({ error: 'No access token available' }, { status: 400 }) } + const actorId = authz.requesterUserId + const workspaceId = authz.workspaceId ?? null + try { const { accessToken } = await refreshTokenIfNeeded( requestId, @@ -254,6 +337,32 @@ export const GET = withRouteHandler(async (request: NextRequest) => { resolvedCredentialId ) + if (actorId) { + recordAudit({ + workspaceId, + actorId, + action: AuditAction.CREDENTIAL_ACCESSED, + resourceType: AuditResourceType.CREDENTIAL, + resourceId: resolvedCredentialId, + description: `Accessed OAuth credential for provider ${credential.providerId}`, + metadata: { + provider: credential.providerId, + credentialType: 'oauth', + }, + request, + }) + captureServerEvent( + actorId, + 'credential_used', + { + credential_type: 'oauth', + provider_id: credential.providerId, + ...(workspaceId ? { workspace_id: workspaceId } : {}), + }, + workspaceId ? { groups: { workspace: workspaceId } } : undefined + ) + } + // For Salesforce, extract instanceUrl from the scope field let instanceUrl: string | undefined if (credential.providerId === 'salesforce' && credential.scope) { diff --git a/apps/sim/app/api/billing/switch-plan/route.ts b/apps/sim/app/api/billing/switch-plan/route.ts index c066afbedb0..7e56b9cba49 100644 --- a/apps/sim/app/api/billing/switch-plan/route.ts +++ b/apps/sim/app/api/billing/switch-plan/route.ts @@ -1,3 +1,4 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db } from '@sim/db' import { subscription as subscriptionTable } from '@sim/db/schema' import { createLogger } from '@sim/logger' @@ -174,6 +175,29 @@ export const POST = withRouteHandler(async (request: NextRequest) => { { set: { plan: targetPlanName } } ) + if (isOrgScopedSubscription(sub, userId)) { + recordAudit({ + actorId: userId, + action: AuditAction.ORG_PLAN_CONVERTED, + resourceType: AuditResourceType.ORGANIZATION, + resourceId: sub.referenceId, + description: `Plan converted from ${sub.plan ?? 'unknown'} to ${targetPlanName}`, + metadata: { + organizationId: sub.referenceId, + subscriptionId: sub.id, + fromPlan: sub.plan, + toPlan: targetPlanName, + interval: targetInterval, + }, + request, + }) + captureServerEvent(userId, 'plan_converted', { + organization_id: sub.referenceId, + from_plan: sub.plan ?? 'unknown', + to_plan: targetPlanName, + }) + } + return NextResponse.json({ success: true, plan: targetPlanName, interval: targetInterval }) } catch (error) { logger.error('Failed to switch subscription', { diff --git a/apps/sim/app/api/environment/route.ts b/apps/sim/app/api/environment/route.ts index a976da6b6ee..75162c53e0b 100644 --- a/apps/sim/app/api/environment/route.ts +++ b/apps/sim/app/api/environment/route.ts @@ -14,6 +14,7 @@ import { generateRequestId } from '@/lib/core/utils/request' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { syncPersonalEnvCredentialsForUser } from '@/lib/credentials/environment' import type { EnvironmentVariable } from '@/lib/environment/api' +import { captureServerEvent } from '@/lib/posthog/server' const logger = createLogger('EnvironmentAPI') @@ -89,6 +90,11 @@ export const POST = withRouteHandler(async (req: NextRequest) => { request: req, }) + captureServerEvent(session.user.id, 'environment_updated', { + key_count: Object.keys(variables).length, + scope: 'personal', + }) + return NextResponse.json({ success: true }) } catch (error) { logger.error(`[${requestId}] Error updating environment variables`, error) diff --git a/apps/sim/app/api/files/download/route.ts b/apps/sim/app/api/files/download/route.ts index 33f1ce61146..793b4a55d44 100644 --- a/apps/sim/app/api/files/download/route.ts +++ b/apps/sim/app/api/files/download/route.ts @@ -1,9 +1,11 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { fileDownloadContract } from '@/lib/api/contracts/storage-transfer' import { getValidationErrorMessage, parseRequest } from '@/lib/api/server' import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { captureServerEvent } from '@/lib/posthog/server' import type { StorageContext } from '@/lib/uploads/config' import { hasCloudStorage } from '@/lib/uploads/core/storage-service' import { verifyFileAccess } from '@/app/api/files/authorization' @@ -84,10 +86,26 @@ export const POST = withRouteHandler(async (request: NextRequest) => { logger.info(`Generated download URL for ${storageContext ?? 'inferred'} file: ${key}`) + const downloadName = name || key.split('/').pop() || 'download' + recordAudit({ + workspaceId: null, + actorId: userId, + action: AuditAction.FILE_DOWNLOADED, + resourceType: AuditResourceType.FILE, + resourceName: downloadName, + description: `Downloaded file "${downloadName}"`, + metadata: { key, fileName: downloadName, context: storageContext }, + request, + }) + captureServerEvent(userId, 'file_downloaded', { + is_bulk: false, + file_count: 1, + }) + return NextResponse.json({ downloadUrl, expiresIn: null, - fileName: name || key.split('/').pop() || 'download', + fileName: downloadName, }) } catch (error) { logger.error('Error in file download endpoint:', error) diff --git a/apps/sim/app/api/files/export/[id]/route.ts b/apps/sim/app/api/files/export/[id]/route.ts index 26dc06abe0d..8a7e4b292e5 100644 --- a/apps/sim/app/api/files/export/[id]/route.ts +++ b/apps/sim/app/api/files/export/[id]/route.ts @@ -1,4 +1,5 @@ import path from 'node:path' +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { createLogger } from '@sim/logger' import { toError } from '@sim/utils/errors' import JSZip from 'jszip' @@ -9,6 +10,7 @@ import { parseRequest } from '@/lib/api/server' import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid' import { extractEmbeddedImageIds } from '@/lib/copilot/tools/server/files/embedded-image-refs' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { captureServerEvent } from '@/lib/posthog/server' import type { StorageContext } from '@/lib/uploads/config' import { USE_BLOB_STORAGE } from '@/lib/uploads/config' import { downloadFile } from '@/lib/uploads/core/storage-service' @@ -68,9 +70,45 @@ export const GET = withRouteHandler( return NextResponse.json({ error: 'Forbidden' }, { status: 403 }) } + /** + * Records the egress only at a real success exit (serve redirect, plain + * markdown, or bundled zip) so a mid-export failure never logs a download + * that never happened. + */ + const auditExport = (format: 'file' | 'markdown' | 'zip', assetCount: number) => { + recordAudit({ + workspaceId: record.workspaceId ?? null, + actorId: userId, + action: AuditAction.FILE_DOWNLOADED, + resourceType: AuditResourceType.FILE, + resourceId: record.id, + resourceName: record.originalName, + description: `Exported file "${record.originalName}"`, + metadata: { + fileId: record.id, + fileName: record.originalName, + bytes: record.size, + format, + assetCount, + }, + request, + }) + captureServerEvent( + userId, + 'file_downloaded', + { + ...(record.workspaceId ? { workspace_id: record.workspaceId } : {}), + is_bulk: assetCount > 0, + file_count: 1 + assetCount, + }, + record.workspaceId ? { groups: { workspace: record.workspaceId } } : undefined + ) + } + if (!isMarkdown(record.originalName, record.contentType)) { const storagePrefix = USE_BLOB_STORAGE ? 'blob' : 's3' const servePath = `/api/files/serve/${storagePrefix}/${encodeURIComponent(record.key)}` + auditExport('file', 0) return NextResponse.redirect(new URL(servePath, request.url), { status: 302 }) } @@ -87,6 +125,7 @@ export const GET = withRouteHandler( if (imageIds.length === 0) { const mdName = safeFilename(record.originalName) const mdBytes = Buffer.from(mdContent, 'utf-8') + auditExport('markdown', 0) return new NextResponse(new Uint8Array(mdBytes), { status: 200, headers: { @@ -151,6 +190,7 @@ export const GET = withRouteHandler( const zipBuffer = await zip.generateAsync({ type: 'nodebuffer', compression: 'DEFLATE' }) const zipName = safeFilename(`${record.originalName.replace(/\.[^.]+$/, '')}.zip`) + auditExport('zip', assetMap.size) return new NextResponse(new Uint8Array(zipBuffer), { status: 200, headers: { diff --git a/apps/sim/app/api/files/public/[token]/content/route.ts b/apps/sim/app/api/files/public/[token]/content/route.ts index 8db42e412bd..87282d86f4d 100644 --- a/apps/sim/app/api/files/public/[token]/content/route.ts +++ b/apps/sim/app/api/files/public/[token]/content/route.ts @@ -1,3 +1,4 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { createLogger } from '@sim/logger' import type { NextRequest } from 'next/server' import { NextResponse } from 'next/server' @@ -76,6 +77,25 @@ export const GET = withRouteHandler( logger.info('Public shared file served', { token, key: file.key, size: buffer.length }) + // Anonymous access: null actor (owner-as-actor would misread as a self-download). + recordAudit({ + workspaceId: file.workspaceId ?? null, + actorId: null, + action: AuditAction.FILE_DOWNLOADED, + resourceType: AuditResourceType.FILE, + resourceId: file.id, + resourceName: file.originalName, + description: `Public share download of "${file.originalName}"`, + metadata: { + access: 'public_share', + anonymous: true, + sharedByUserId: file.userId, + fileName: file.originalName, + bytes: buffer.length, + }, + request, + }) + // Revalidate every request: a shared file can be unshared, edited, or deleted, // so the fixed token URL must never serve stale bytes from a long-lived cache. return createFileResponse({ diff --git a/apps/sim/app/api/files/public/[token]/inline/route.ts b/apps/sim/app/api/files/public/[token]/inline/route.ts index 87c343a26a8..a777b953ba5 100644 --- a/apps/sim/app/api/files/public/[token]/inline/route.ts +++ b/apps/sim/app/api/files/public/[token]/inline/route.ts @@ -1,3 +1,4 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { createLogger } from '@sim/logger' import type { NextRequest } from 'next/server' import { NextResponse } from 'next/server' @@ -86,8 +87,27 @@ export const GET = withRouteHandler( throw new FileNotFoundError('Not found') } - // Content-truth gate (`sniff`): render only genuine raster image bytes. - return await serveInlineImage(image, { sniff: true }) + // Content-truth gate (`sniff`): render only genuine raster image bytes; audit after. + const response = await serveInlineImage(image, { sniff: true }) + + // Anonymous access: null actor (owner-as-actor would misread as a self-download). + recordAudit({ + workspaceId: doc.workspaceId, + actorId: null, + action: AuditAction.FILE_DOWNLOADED, + resourceType: AuditResourceType.FILE, + resourceName: image.filename, + description: `Public share inline image "${image.filename}"`, + metadata: { + access: 'public_share', + anonymous: true, + inline: true, + sharedByUserId: doc.userId, + }, + request, + }) + + return response } catch (error) { if (error instanceof FileNotFoundError) { return createErrorResponse(error) diff --git a/apps/sim/app/api/organizations/[id]/members/[memberId]/route.ts b/apps/sim/app/api/organizations/[id]/members/[memberId]/route.ts index 69433ca0b7d..38e2e831c8e 100644 --- a/apps/sim/app/api/organizations/[id]/members/[memberId]/route.ts +++ b/apps/sim/app/api/organizations/[id]/members/[memberId]/route.ts @@ -18,6 +18,7 @@ import { } from '@/lib/billing/organizations/membership' import { reconcileOrganizationSeats } from '@/lib/billing/organizations/seats' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { captureServerEvent } from '@/lib/posthog/server' const logger = createLogger('OrganizationMemberAPI') @@ -253,6 +254,13 @@ export const PUT = withRouteHandler( request, }) + captureServerEvent( + session.user.id, + 'org_member_role_changed', + { organization_id: organizationId, new_role: role }, + { groups: { organization: organizationId } } + ) + return NextResponse.json({ success: true, message: 'Member role updated successfully', @@ -382,6 +390,13 @@ export const DELETE = withRouteHandler( request, }) + captureServerEvent( + session.user.id, + 'org_member_removed', + { organization_id: organizationId, is_self_removal: session.user.id === targetUserId }, + { groups: { organization: organizationId } } + ) + return NextResponse.json({ success: true, message: 'External member removed successfully', @@ -422,6 +437,7 @@ export const DELETE = withRouteHandler( seatReduction = await reconcileOrganizationSeats({ organizationId, reason: 'member-removed', + actorId: session.user.id, }) } catch (seatError) { logger.error('Failed to reduce seats after member removal', { @@ -479,6 +495,13 @@ export const DELETE = withRouteHandler( request, }) + captureServerEvent( + session.user.id, + 'org_member_removed', + { organization_id: organizationId, is_self_removal: session.user.id === targetUserId }, + { groups: { organization: organizationId } } + ) + return NextResponse.json({ success: true, message: diff --git a/apps/sim/app/api/organizations/route.ts b/apps/sim/app/api/organizations/route.ts index ba1fece2f3d..839ff3c7617 100644 --- a/apps/sim/app/api/organizations/route.ts +++ b/apps/sim/app/api/organizations/route.ts @@ -23,6 +23,7 @@ import { import { isOrgPlan } from '@/lib/billing/plan-helpers' import { ENTITLED_SUBSCRIPTION_STATUSES } from '@/lib/billing/subscriptions/utils' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { captureServerEvent } from '@/lib/posthog/server' import { attachOwnedWorkspacesToOrganization, WorkspaceOrganizationMembershipConflictError, @@ -256,6 +257,15 @@ export const POST = withRouteHandler(async (request: Request) => { metadata: { organizationSlug }, request, }) + captureServerEvent( + user.id, + 'organization_created', + { + organization_id: organizationId, + ...(organizationName ? { name: organizationName } : {}), + }, + { groups: { organization: organizationId } } + ) } return NextResponse.json({ diff --git a/apps/sim/app/api/table/[tableId]/export-async/route.ts b/apps/sim/app/api/table/[tableId]/export-async/route.ts index 9208808e61e..7ad5cdb251d 100644 --- a/apps/sim/app/api/table/[tableId]/export-async/route.ts +++ b/apps/sim/app/api/table/[tableId]/export-async/route.ts @@ -1,3 +1,4 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { createLogger } from '@sim/logger' import { generateId } from '@sim/utils/id' import { type NextRequest, NextResponse } from 'next/server' @@ -8,6 +9,7 @@ import { isTriggerDevEnabled } from '@/lib/core/config/env-flags' import { runDetached } from '@/lib/core/utils/background' import { generateRequestId } from '@/lib/core/utils/request' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { captureServerEvent } from '@/lib/posthog/server' import { runTableExport, type TableExportPayload } from '@/lib/table/export-runner' import { markTableJobRunning, releaseJobClaim } from '@/lib/table/jobs/service' import type { TableExportJobPayload } from '@/lib/table/types' @@ -58,7 +60,12 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro return NextResponse.json({ error: 'Failed to start export' }, { status: 409 }) } - const payload: TableExportPayload = { jobId, tableId, workspaceId, format } + const payload: TableExportPayload = { + jobId, + tableId, + workspaceId, + format, + } if (isTriggerDevEnabled) { try { const [{ tableExportTask }, { tasks }, { resolveTriggerRegion }] = await Promise.all([ @@ -80,6 +87,27 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro runDetached('table-export', () => runTableExport(payload)) } + // Audit at authorization (like the sync route) so a failed/abandoned job still records the export. + recordAudit({ + workspaceId, + actorId: authResult.userId, + action: AuditAction.TABLE_EXPORTED, + resourceType: AuditResourceType.TABLE, + resourceId: tableId, + resourceName: access.table.name, + description: `Exported table "${access.table.name}" as ${format.toUpperCase()}`, + metadata: { format, rowCount: access.table.rowCount, async: true }, + request, + }) + if (access.table.workspaceId) { + captureServerEvent( + authResult.userId, + 'table_exported', + { table_id: tableId, workspace_id: workspaceId }, + { groups: { workspace: workspaceId } } + ) + } + logger.info(`[${requestId}] Async export started`, { tableId, jobId, format }) return NextResponse.json({ success: true, data: { tableId, jobId } }) }) diff --git a/apps/sim/app/api/table/[tableId]/export/route.ts b/apps/sim/app/api/table/[tableId]/export/route.ts index 75208e3d982..df217ec7d31 100644 --- a/apps/sim/app/api/table/[tableId]/export/route.ts +++ b/apps/sim/app/api/table/[tableId]/export/route.ts @@ -1,3 +1,4 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { tableExportFormatSchema, tableIdParamsSchema } from '@/lib/api/contracts/tables' @@ -6,6 +7,7 @@ import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid' import { neutralizeCsvFormula } from '@/lib/core/utils/csv' import { generateRequestId } from '@/lib/core/utils/request' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { captureServerEvent } from '@/lib/posthog/server' import { buildNameById, getColumnId, rowDataIdToName } from '@/lib/table/column-keys' import { queryRows } from '@/lib/table/rows/service' import { accessError, checkAccess } from '@/app/api/table/utils' @@ -29,6 +31,7 @@ export const GET = withRouteHandler(async (request: NextRequest, { params }: Rou if (!auth.success || !auth.userId) { return NextResponse.json({ error: 'Authentication required' }, { status: 401 }) } + const userId = auth.userId const { searchParams } = new URL(request.url) const formatValidation = tableExportFormatSchema.safeParse( @@ -53,6 +56,27 @@ export const GET = withRouteHandler(async (request: NextRequest, { params }: Rou const safeName = sanitizeFilename(table.name) const filename = `${safeName}.${format}` + // Audit before streaming: rows leave incrementally, so a mid-stream failure still exfiltrates partial data. + recordAudit({ + workspaceId: table.workspaceId ?? null, + actorId: userId, + action: AuditAction.TABLE_EXPORTED, + resourceType: AuditResourceType.TABLE, + resourceId: tableId, + resourceName: table.name, + description: `Exported table "${table.name}" as ${format.toUpperCase()}`, + metadata: { format, rowCount: table.rowCount }, + request, + }) + if (table.workspaceId) { + captureServerEvent( + userId, + 'table_exported', + { table_id: tableId, workspace_id: table.workspaceId }, + { groups: { workspace: table.workspaceId } } + ) + } + const stream = new ReadableStream({ async start(controller) { const encoder = new TextEncoder() diff --git a/apps/sim/app/api/table/[tableId]/route.ts b/apps/sim/app/api/table/[tableId]/route.ts index bd398867eeb..5d0069fa570 100644 --- a/apps/sim/app/api/table/[tableId]/route.ts +++ b/apps/sim/app/api/table/[tableId]/route.ts @@ -125,7 +125,7 @@ export const PATCH = withRouteHandler( return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 }) } - const updated = await renameTable(tableId, validated.name, requestId) + const updated = await renameTable(tableId, validated.name, requestId, authResult.userId) return NextResponse.json({ success: true, @@ -172,7 +172,7 @@ export const DELETE = withRouteHandler( return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 }) } - await deleteTable(tableId, requestId) + await deleteTable(tableId, requestId, authResult.userId) captureServerEvent( authResult.userId, diff --git a/apps/sim/app/api/users/me/api-keys/[id]/route.ts b/apps/sim/app/api/users/me/api-keys/[id]/route.ts index 4ae92aff51d..514832d4a06 100644 --- a/apps/sim/app/api/users/me/api-keys/[id]/route.ts +++ b/apps/sim/app/api/users/me/api-keys/[id]/route.ts @@ -9,6 +9,7 @@ import { getValidationErrorMessage } from '@/lib/api/server' import { getSession } from '@/lib/auth' import { generateRequestId } from '@/lib/core/utils/request' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { captureServerEvent } from '@/lib/posthog/server' const logger = createLogger('ApiKeyAPI') @@ -59,6 +60,11 @@ export const DELETE = withRouteHandler( request, }) + captureServerEvent(userId, 'api_key_revoked', { + key_name: deletedKey.name, + scope: 'personal', + }) + return NextResponse.json({ success: true }) } catch (error) { logger.error('Failed to delete API key', { error }) diff --git a/apps/sim/app/api/users/me/api-keys/route.ts b/apps/sim/app/api/users/me/api-keys/route.ts index e14a00a8118..fb2411ace55 100644 --- a/apps/sim/app/api/users/me/api-keys/route.ts +++ b/apps/sim/app/api/users/me/api-keys/route.ts @@ -11,6 +11,7 @@ import { createApiKey, getApiKeyDisplayFormat } from '@/lib/api-key/auth' import { hashApiKey } from '@/lib/api-key/crypto' import { getSession } from '@/lib/auth' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { captureServerEvent } from '@/lib/posthog/server' const logger = createLogger('ApiKeysAPI') @@ -122,6 +123,11 @@ export const POST = withRouteHandler(async (request: NextRequest) => { request, }) + captureServerEvent(userId, 'api_key_created', { + key_name: name, + scope: 'personal', + }) + return NextResponse.json({ key: { ...newKey, diff --git a/apps/sim/app/api/users/me/subscription/[id]/transfer/route.ts b/apps/sim/app/api/users/me/subscription/[id]/transfer/route.ts index c1a64e93175..adce3c771ef 100644 --- a/apps/sim/app/api/users/me/subscription/[id]/transfer/route.ts +++ b/apps/sim/app/api/users/me/subscription/[id]/transfer/route.ts @@ -1,3 +1,4 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db } from '@sim/db' import { member, organization, subscription } from '@sim/db/schema' import { createLogger } from '@sim/logger' @@ -13,6 +14,7 @@ import { hasPaidSubscriptionStatus, } from '@/lib/billing/subscriptions/utils' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { captureServerEvent } from '@/lib/posthog/server' const logger = createLogger('SubscriptionTransferAPI') @@ -132,6 +134,26 @@ export const POST = withRouteHandler( organizationId, userId, }) + + recordAudit({ + actorId: userId, + action: AuditAction.SUBSCRIPTION_TRANSFERRED, + resourceType: AuditResourceType.SUBSCRIPTION, + resourceId: subscriptionId, + description: `Subscription transferred to organization ${organizationId}`, + metadata: { + subscriptionId, + organizationId, + fromEntity: 'user', + toEntity: 'organization', + }, + request, + }) + captureServerEvent(userId, 'subscription_transferred', { + subscription_id: subscriptionId, + from_entity: 'user', + to_entity: 'organization', + }) } return NextResponse.json({ success: true, message: outcome.message }) diff --git a/apps/sim/app/api/v1/admin/credits/route.ts b/apps/sim/app/api/v1/admin/credits/route.ts index 756f1efc304..e7e5ed81833 100644 --- a/apps/sim/app/api/v1/admin/credits/route.ts +++ b/apps/sim/app/api/v1/admin/credits/route.ts @@ -23,6 +23,7 @@ * Usage limits are updated accordingly to allow spending the credits. */ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db } from '@sim/db' import { organization, subscription, user, userStats } from '@sim/db/schema' import { createLogger } from '@sim/logger' @@ -209,6 +210,24 @@ export const POST = withRouteHandler( reason: reason || 'No reason provided', }) + recordAudit({ + actorId: 'admin-api', + action: AuditAction.CREDIT_ISSUED, + resourceType: AuditResourceType.BILLING, + resourceId: entityId, + description: `Admin API issued $${Number(amount).toFixed(2)} credits to ${entityType} ${entityId}`, + metadata: { + targetUserId: resolvedUserId, + ...(entityType === 'organization' ? { targetOrgId: entityId } : {}), + entityType, + amount, + currency: 'usd', + reason: reason || null, + newCreditBalance, + }, + request, + }) + return singleResponse({ success: true, userId: resolvedUserId, diff --git a/apps/sim/app/api/v1/admin/organizations/[id]/members/[memberId]/route.ts b/apps/sim/app/api/v1/admin/organizations/[id]/members/[memberId]/route.ts index 0f25618fc2c..68b79e3a78a 100644 --- a/apps/sim/app/api/v1/admin/organizations/[id]/members/[memberId]/route.ts +++ b/apps/sim/app/api/v1/admin/organizations/[id]/members/[memberId]/route.ts @@ -25,6 +25,7 @@ * Response: { success: true, memberId: string, billingActions: {...} } */ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db } from '@sim/db' import { member, organization, user, userStats } from '@sim/db/schema' import { createLogger } from '@sim/logger' @@ -205,6 +206,22 @@ export const PATCH = withRouteHandler( previousRole: existingMember.role, }) + recordAudit({ + workspaceId: null, + actorId: 'admin-api', + action: AuditAction.ORG_MEMBER_ROLE_CHANGED, + resourceType: AuditResourceType.ORGANIZATION, + resourceId: organizationId, + description: `Admin API changed organization member role to ${role}`, + metadata: { + memberId, + targetUserId: existingMember.userId, + previousRole: existingMember.role, + role, + }, + request, + }) + return singleResponse(data) } catch (error) { logger.error('Admin API: Failed to update member', { error, organizationId, memberId }) @@ -275,6 +292,17 @@ export const DELETE = withRouteHandler( billingActions: result.billingActions, }) + recordAudit({ + workspaceId: null, + actorId: 'admin-api', + action: AuditAction.ORG_MEMBER_REMOVED, + resourceType: AuditResourceType.ORGANIZATION, + resourceId: organizationId, + description: 'Admin API removed member from organization', + metadata: { memberId, targetUserId: userId }, + request, + }) + return singleResponse({ success: true, memberId, diff --git a/apps/sim/app/api/v1/admin/organizations/[id]/members/route.ts b/apps/sim/app/api/v1/admin/organizations/[id]/members/route.ts index 99e6d2c791c..5f77c3aa1e9 100644 --- a/apps/sim/app/api/v1/admin/organizations/[id]/members/route.ts +++ b/apps/sim/app/api/v1/admin/organizations/[id]/members/route.ts @@ -28,6 +28,7 @@ * }> */ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db } from '@sim/db' import { member, organization, user, userStats } from '@sim/db/schema' import { createLogger } from '@sim/logger' @@ -204,6 +205,17 @@ export const POST = withRouteHandler( } ) + recordAudit({ + workspaceId: null, + actorId: 'admin-api', + action: AuditAction.ORG_MEMBER_ROLE_CHANGED, + resourceType: AuditResourceType.ORGANIZATION, + resourceId: organizationId, + description: `Admin API changed organization member role to ${role}`, + metadata: { targetUserId: userId, previousRole: existingMember.role, role }, + request, + }) + return singleResponse({ id: existingMember.id, userId, @@ -268,6 +280,17 @@ export const POST = withRouteHandler( billingActions: result.billingActions, }) + recordAudit({ + workspaceId: null, + actorId: 'admin-api', + action: AuditAction.ORG_MEMBER_ADDED, + resourceType: AuditResourceType.ORGANIZATION, + resourceId: organizationId, + description: `Admin API added member to organization as ${role}`, + metadata: { targetUserId: userId, role, memberId: result.memberId }, + request, + }) + return singleResponse({ ...data, action: 'created' as const, diff --git a/apps/sim/app/api/v1/admin/organizations/[id]/route.ts b/apps/sim/app/api/v1/admin/organizations/[id]/route.ts index 5f8881dc739..a1ef18a9cd0 100644 --- a/apps/sim/app/api/v1/admin/organizations/[id]/route.ts +++ b/apps/sim/app/api/v1/admin/organizations/[id]/route.ts @@ -16,6 +16,7 @@ * Response: AdminSingleResponse */ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db } from '@sim/db' import { member, organization, subscription } from '@sim/db/schema' import { createLogger } from '@sim/logger' @@ -161,6 +162,18 @@ export const PATCH = withRouteHandler( fields: Object.keys(updateData).filter((k) => k !== 'updatedAt'), }) + recordAudit({ + workspaceId: null, + actorId: 'admin-api', + action: AuditAction.ORGANIZATION_UPDATED, + resourceType: AuditResourceType.ORGANIZATION, + resourceId: organizationId, + resourceName: updated.name, + description: `Admin API updated organization "${updated.name}"`, + metadata: { fields: Object.keys(updateData).filter((k) => k !== 'updatedAt') }, + request, + }) + return singleResponse(toAdminOrganization(updated)) } catch (error) { if (error instanceof OrganizationSlugInvalidError) { diff --git a/apps/sim/app/api/v1/admin/organizations/[id]/transfer-ownership/route.ts b/apps/sim/app/api/v1/admin/organizations/[id]/transfer-ownership/route.ts index 9da30b930d2..026d3458488 100644 --- a/apps/sim/app/api/v1/admin/organizations/[id]/transfer-ownership/route.ts +++ b/apps/sim/app/api/v1/admin/organizations/[id]/transfer-ownership/route.ts @@ -1,3 +1,4 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db } from '@sim/db' import { member, organization, user } from '@sim/db/schema' import { createLogger } from '@sim/logger' @@ -108,6 +109,22 @@ export const POST = withRouteHandler( billingBlockInherited: result.billingBlockInherited, }) + recordAudit({ + workspaceId: null, + actorId: 'admin-api', + action: AuditAction.ORG_MEMBER_ROLE_CHANGED, + resourceType: AuditResourceType.ORGANIZATION, + resourceId: organizationId, + description: 'Admin API transferred organization ownership', + metadata: { + currentOwnerUserId, + newOwnerUserId, + workspacesReassigned: result.workspacesReassigned, + billedAccountReassigned: result.billedAccountReassigned, + }, + request, + }) + return singleResponse({ organizationId, currentOwnerUserId, diff --git a/apps/sim/app/api/v1/admin/organizations/route.ts b/apps/sim/app/api/v1/admin/organizations/route.ts index 5799c2bf1ad..26a2a652868 100644 --- a/apps/sim/app/api/v1/admin/organizations/route.ts +++ b/apps/sim/app/api/v1/admin/organizations/route.ts @@ -21,6 +21,7 @@ * Response: AdminSingleResponse */ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db, dbReplica } from '@sim/db' import { member, organization, user } from '@sim/db/schema' import { createLogger } from '@sim/logger' @@ -167,6 +168,18 @@ export const POST = withRouteHandler( memberId, }) + recordAudit({ + workspaceId: null, + actorId: 'admin-api', + action: AuditAction.ORGANIZATION_CREATED, + resourceType: AuditResourceType.ORGANIZATION, + resourceId: organizationId, + resourceName: name, + description: `Admin API created organization "${name}"`, + metadata: { slug, ownerId, memberId }, + request, + }) + return singleResponse({ ...toAdminOrganization(createdOrg), memberId, diff --git a/apps/sim/app/api/v1/admin/workflows/export/route.ts b/apps/sim/app/api/v1/admin/workflows/export/route.ts index e5021518a3e..f3f90f968cd 100644 --- a/apps/sim/app/api/v1/admin/workflows/export/route.ts +++ b/apps/sim/app/api/v1/admin/workflows/export/route.ts @@ -14,6 +14,7 @@ * - JSON: AdminListResponse */ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db } from '@sim/db' import { workflow } from '@sim/db/schema' import { createLogger } from '@sim/logger' @@ -101,7 +102,25 @@ export const POST = withRouteHandler( logger.info(`Admin API: Exporting ${workflowExports.length} workflows`) + const auditExport = () => { + if (workflowExports.length === 0) return + recordAudit({ + actorId: 'admin-api', + action: AuditAction.WORKFLOW_EXPORTED, + resourceType: AuditResourceType.WORKFLOW, + description: `Admin API exported ${workflowExports.length} workflow(s)`, + metadata: { + format, + requestedCount: body.ids.length, + exportedCount: workflowExports.length, + requestedIds: body.ids, + }, + request, + }) + } + if (format === 'json') { + auditExport() return listResponse(workflowExports, { total: workflowExports.length, limit: workflowExports.length, @@ -122,6 +141,7 @@ export const POST = withRouteHandler( const filename = `workflows-export-${new Date().toISOString().split('T')[0]}.zip` + auditExport() return new NextResponse(arrayBuffer, { status: 200, headers: { diff --git a/apps/sim/app/api/v1/admin/workspaces/[id]/export/route.ts b/apps/sim/app/api/v1/admin/workspaces/[id]/export/route.ts index dcc1638c9b9..e8913c74e0f 100644 --- a/apps/sim/app/api/v1/admin/workspaces/[id]/export/route.ts +++ b/apps/sim/app/api/v1/admin/workspaces/[id]/export/route.ts @@ -11,6 +11,7 @@ * - JSON: WorkspaceExportPayload */ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db } from '@sim/db' import { workflow, workflowFolder, workspace } from '@sim/db/schema' import { createLogger } from '@sim/logger' @@ -124,7 +125,25 @@ export const GET = withRouteHandler( `Admin API: Exporting workspace ${workspaceId} with ${workflowExports.length} workflows and ${folderExports.length} folders` ) + const auditExport = () => + recordAudit({ + workspaceId, + actorId: 'admin-api', + action: AuditAction.WORKSPACE_EXPORTED, + resourceType: AuditResourceType.WORKSPACE, + resourceId: workspaceId, + resourceName: workspaceData.name, + description: `Admin API exported workspace "${workspaceData.name}"`, + metadata: { + format, + workflowCount: workflowExports.length, + folderCount: folderExports.length, + }, + request, + }) + if (format === 'json') { + auditExport() const exportPayload: WorkspaceExportPayload = { version: '1.0', exportedAt: new Date().toISOString(), @@ -156,6 +175,7 @@ export const GET = withRouteHandler( const sanitizedName = sanitizePathSegment(workspaceData.name) const filename = `${sanitizedName}-${new Date().toISOString().split('T')[0]}.zip` + auditExport() return new NextResponse(arrayBuffer, { status: 200, headers: { diff --git a/apps/sim/app/api/v1/admin/workspaces/[id]/members/[memberId]/route.ts b/apps/sim/app/api/v1/admin/workspaces/[id]/members/[memberId]/route.ts index c4c0dfe0324..fc3082427ab 100644 --- a/apps/sim/app/api/v1/admin/workspaces/[id]/members/[memberId]/route.ts +++ b/apps/sim/app/api/v1/admin/workspaces/[id]/members/[memberId]/route.ts @@ -21,6 +21,7 @@ * Response: AdminSingleResponse<{ removed: true, memberId: string, userId: string }> */ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db } from '@sim/db' import { permissions, user, workspace } from '@sim/db/schema' import { createLogger } from '@sim/logger' @@ -196,6 +197,22 @@ export const PATCH = withRouteHandler( previousPermissions: existingMember.permissionType, }) + recordAudit({ + workspaceId, + actorId: 'admin-api', + action: AuditAction.MEMBER_ROLE_CHANGED, + resourceType: AuditResourceType.WORKSPACE, + resourceId: workspaceId, + description: `Admin API changed workspace member permissions to ${permissionLevel}`, + metadata: { + memberId, + targetUserId: existingMember.userId, + previousPermissions: existingMember.permissionType, + permissions: permissionLevel, + }, + request, + }) + return singleResponse(data) } catch (error) { logger.error('Admin API: Failed to update workspace member', { error, workspaceId, memberId }) @@ -275,6 +292,17 @@ export const DELETE = withRouteHandler( userId: existingMember.userId, }) + recordAudit({ + workspaceId, + actorId: 'admin-api', + action: AuditAction.MEMBER_REMOVED, + resourceType: AuditResourceType.WORKSPACE, + resourceId: workspaceId, + description: 'Admin API removed member from workspace', + metadata: { memberId, targetUserId: existingMember.userId }, + request, + }) + return singleResponse({ removed: true, memberId, diff --git a/apps/sim/app/api/v1/admin/workspaces/[id]/members/route.ts b/apps/sim/app/api/v1/admin/workspaces/[id]/members/route.ts index 8d81b839099..0be8954b693 100644 --- a/apps/sim/app/api/v1/admin/workspaces/[id]/members/route.ts +++ b/apps/sim/app/api/v1/admin/workspaces/[id]/members/route.ts @@ -30,6 +30,7 @@ * Response: AdminSingleResponse<{ removed: true }> */ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db } from '@sim/db' import { permissions, user, workspace, workspaceEnvironment } from '@sim/db/schema' import { createLogger } from '@sim/logger' @@ -199,6 +200,21 @@ export const POST = withRouteHandler( newPermissions: permissionLevel, }) + recordAudit({ + workspaceId, + actorId: 'admin-api', + action: AuditAction.MEMBER_ROLE_CHANGED, + resourceType: AuditResourceType.WORKSPACE, + resourceId: workspaceId, + description: `Admin API changed workspace member permissions to ${permissionLevel}`, + metadata: { + targetUserId: userId, + previousPermissions: existingPermission.permissionType, + permissions: permissionLevel, + }, + request, + }) + return singleResponse({ id: existingPermission.id, workspaceId, @@ -245,6 +261,17 @@ export const POST = withRouteHandler( permissionId, }) + recordAudit({ + workspaceId, + actorId: 'admin-api', + action: AuditAction.MEMBER_ADDED, + resourceType: AuditResourceType.WORKSPACE, + resourceId: workspaceId, + description: `Admin API added member to workspace with ${permissionLevel} permissions`, + metadata: { targetUserId: userId, permissions: permissionLevel }, + request, + }) + const [wsEnvRow] = await db .select({ variables: workspaceEnvironment.variables }) .from(workspaceEnvironment) @@ -348,6 +375,17 @@ export const DELETE = withRouteHandler( logger.info(`Admin API: Removed user ${userId} from workspace ${workspaceId}`) + recordAudit({ + workspaceId, + actorId: 'admin-api', + action: AuditAction.MEMBER_REMOVED, + resourceType: AuditResourceType.WORKSPACE, + resourceId: workspaceId, + description: 'Admin API removed member from workspace', + metadata: { targetUserId: userId }, + request, + }) + return singleResponse({ removed: true, userId, workspaceId }) } catch (error) { if (error instanceof WorkspaceBillingAccountRemovalError) { diff --git a/apps/sim/app/api/v1/files/[fileId]/route.ts b/apps/sim/app/api/v1/files/[fileId]/route.ts index 5482a4d378f..258e36b9ffb 100644 --- a/apps/sim/app/api/v1/files/[fileId]/route.ts +++ b/apps/sim/app/api/v1/files/[fileId]/route.ts @@ -1,9 +1,11 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { v1DeleteFileContract, v1DownloadFileContract } from '@/lib/api/contracts/v1/files' import { parseRequest } from '@/lib/api/server' import { generateRequestId } from '@/lib/core/utils/request' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { captureServerEvent } from '@/lib/posthog/server' import { fetchWorkspaceFileBuffer, getWorkspaceFile } from '@/lib/uploads/contexts/workspace' import { performDeleteWorkspaceFileItems } from '@/lib/workspace-files/orchestration' import { @@ -48,6 +50,29 @@ export const GET = withRouteHandler(async (request: NextRequest, context: FileRo const buffer = await fetchWorkspaceFileBuffer(fileRecord) + recordAudit({ + workspaceId, + actorId: userId, + action: AuditAction.FILE_DOWNLOADED, + resourceType: AuditResourceType.FILE, + resourceId: fileRecord.id, + resourceName: fileRecord.name, + description: `Downloaded file "${fileRecord.name}" via API`, + metadata: { + fileId: fileRecord.id, + fileName: fileRecord.name, + bytes: buffer.length, + source: 'api_v1', + }, + request, + }) + captureServerEvent( + userId, + 'file_downloaded', + { workspace_id: workspaceId, is_bulk: false, file_count: 1 }, + { groups: { workspace: workspaceId } } + ) + return new Response(new Uint8Array(buffer), { status: 200, headers: { diff --git a/apps/sim/app/api/workflows/[id]/deploy/route.ts b/apps/sim/app/api/workflows/[id]/deploy/route.ts index 75eca4dc779..440636186e1 100644 --- a/apps/sim/app/api/workflows/[id]/deploy/route.ts +++ b/apps/sim/app/api/workflows/[id]/deploy/route.ts @@ -1,3 +1,4 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db, workflow } from '@sim/db' import { createLogger } from '@sim/logger' import { assertWorkflowMutable, WorkflowLockedError } from '@sim/platform-authz/workflow' @@ -186,6 +187,19 @@ export const PATCH = withRouteHandler( logger.info(`[${requestId}] Updated isPublicApi for workflow ${id} to ${isPublicApi}`) const wsId = workflowData?.workspaceId + + recordAudit({ + workspaceId: wsId ?? null, + actorId: session!.user.id, + action: AuditAction.WORKFLOW_PUBLIC_API_TOGGLED, + resourceType: AuditResourceType.WORKFLOW, + resourceId: id, + resourceName: workflowData?.name ?? undefined, + description: `${isPublicApi ? 'Enabled' : 'Disabled'} public API for workflow "${workflowData?.name ?? id}"`, + metadata: { isPublicApi }, + request, + }) + captureServerEvent( session!.user.id, 'workflow_public_api_toggled', diff --git a/apps/sim/app/api/workflows/[id]/route.ts b/apps/sim/app/api/workflows/[id]/route.ts index d2c1f607b2d..08867154aea 100644 --- a/apps/sim/app/api/workflows/[id]/route.ts +++ b/apps/sim/app/api/workflows/[id]/route.ts @@ -319,6 +319,7 @@ export const PUT = withRouteHandler( workspaceId: workflowData.workspaceId, currentName: workflowData.name, currentFolderId: workflowData.folderId, + currentLocked: workflowData.locked, ...updates, requestId, }) diff --git a/apps/sim/app/api/workspaces/[id]/files/[fileId]/download/route.ts b/apps/sim/app/api/workspaces/[id]/files/[fileId]/download/route.ts index d56e7ab6442..77c8900a718 100644 --- a/apps/sim/app/api/workspaces/[id]/files/[fileId]/download/route.ts +++ b/apps/sim/app/api/workspaces/[id]/files/[fileId]/download/route.ts @@ -1,3 +1,4 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { createLogger } from '@sim/logger' import { getErrorMessage } from '@sim/utils/errors' import { type NextRequest, NextResponse } from 'next/server' @@ -6,6 +7,7 @@ import { getValidationErrorMessage } from '@/lib/api/server' import { getSession } from '@/lib/auth' import { generateRequestId } from '@/lib/core/utils/request' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { captureServerEvent } from '@/lib/posthog/server' import { getWorkspaceFile } from '@/lib/uploads/contexts/workspace' import { verifyWorkspaceMembership } from '@/app/api/workflows/utils' @@ -55,6 +57,24 @@ export const POST = withRouteHandler( logger.info(`[${requestId}] Generated download URL for workspace file: ${fileRecord.name}`) + recordAudit({ + workspaceId, + actorId: session.user.id, + action: AuditAction.FILE_DOWNLOADED, + resourceType: AuditResourceType.FILE, + resourceId: fileId, + resourceName: fileRecord.name, + description: `Downloaded file "${fileRecord.name}"`, + metadata: { fileId, fileName: fileRecord.name, bytes: fileRecord.size }, + request, + }) + captureServerEvent( + session.user.id, + 'file_downloaded', + { workspace_id: workspaceId, is_bulk: false, file_count: 1 }, + { groups: { workspace: workspaceId } } + ) + return NextResponse.json({ success: true, downloadUrl: serveUrl, diff --git a/apps/sim/app/api/workspaces/[id]/files/download/route.ts b/apps/sim/app/api/workspaces/[id]/files/download/route.ts index c65b5438158..b6220308e55 100644 --- a/apps/sim/app/api/workspaces/[id]/files/download/route.ts +++ b/apps/sim/app/api/workspaces/[id]/files/download/route.ts @@ -1,3 +1,4 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { createLogger } from '@sim/logger' import JSZip from 'jszip' import { type NextRequest, NextResponse } from 'next/server' @@ -5,6 +6,7 @@ import { downloadWorkspaceFileItemsContract } from '@/lib/api/contracts/workspac import { parseRequest } from '@/lib/api/server' import { getSession } from '@/lib/auth' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { captureServerEvent } from '@/lib/posthog/server' import { buildWorkspaceFileFolderPathMap, fetchWorkspaceFileBuffer, @@ -135,6 +137,23 @@ export const GET = withRouteHandler( } const zipBuffer = await zip.generateAsync({ type: 'nodebuffer' }) + + recordAudit({ + workspaceId, + actorId: session.user.id, + action: AuditAction.FILE_DOWNLOADED, + resourceType: AuditResourceType.FILE, + description: `Downloaded ${filesToZip.length} file${filesToZip.length === 1 ? '' : 's'} as zip`, + metadata: { fileCount: filesToZip.length, totalBytes }, + request, + }) + captureServerEvent( + session.user.id, + 'file_downloaded', + { workspace_id: workspaceId, is_bulk: true, file_count: filesToZip.length }, + { groups: { workspace: workspaceId } } + ) + return new NextResponse(new Uint8Array(zipBuffer), { headers: { 'Content-Type': 'application/zip', diff --git a/apps/sim/app/api/workspaces/members/[id]/route.ts b/apps/sim/app/api/workspaces/members/[id]/route.ts index 54889076a58..51a1e038c63 100644 --- a/apps/sim/app/api/workspaces/members/[id]/route.ts +++ b/apps/sim/app/api/workspaces/members/[id]/route.ts @@ -188,6 +188,7 @@ export const DELETE = withRouteHandler( seatReduction = await reconcileOrganizationSeats({ organizationId, reason: 'member-removed', + actorId: session.user.id, }) } catch (seatError) { logger.error('Failed to reduce seats after workspace member removal', { diff --git a/apps/sim/app/workspace/[workspaceId]/providers/workspace-scope-sync.tsx b/apps/sim/app/workspace/[workspaceId]/providers/workspace-scope-sync.tsx index 72c859a4a56..7173e394dd2 100644 --- a/apps/sim/app/workspace/[workspaceId]/providers/workspace-scope-sync.tsx +++ b/apps/sim/app/workspace/[workspaceId]/providers/workspace-scope-sync.tsx @@ -3,6 +3,7 @@ import { useEffect } from 'react' import { useParams } from 'next/navigation' import { usePostHog } from 'posthog-js/react' +import { useWorkspacesWithMetadata } from '@/hooks/queries/workspace' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' /** @@ -13,11 +14,25 @@ export function WorkspaceScopeSync() { const hydrationWorkspaceId = useWorkflowRegistry((state) => state.hydration.workspaceId) const switchToWorkspace = useWorkflowRegistry((state) => state.switchToWorkspace) const posthog = usePostHog() + const { data: workspaceData } = useWorkspacesWithMetadata() + + const activeWorkspace = workspaceData?.workspaces.find((ws) => ws.id === workspaceId) + const workspaceName = activeWorkspace?.name + const organizationId = activeWorkspace?.organizationId ?? null useEffect(() => { - if (!workspaceId) return - posthog?.group('workspace', workspaceId) - }, [posthog, workspaceId]) + // Wait for metadata so the workspace and org groups switch together; acting + // mid-load (organizationId transiently null) would mismatch or strip them. + if (!workspaceId || !activeWorkspace) return + if (organizationId) { + posthog?.group('organization', organizationId) + } else { + // No org — clear any stale org group; resetGroups clears all, so the + // workspace group is re-applied immediately below. + posthog?.resetGroups() + } + posthog?.group('workspace', workspaceId, workspaceName ? { name: workspaceName } : undefined) + }, [posthog, workspaceId, workspaceName, organizationId, activeWorkspace]) useEffect(() => { if (!workspaceId || hydrationWorkspaceId === workspaceId) { diff --git a/apps/sim/lib/billing/organizations/seats.ts b/apps/sim/lib/billing/organizations/seats.ts index a956a38d444..bd87a02608d 100644 --- a/apps/sim/lib/billing/organizations/seats.ts +++ b/apps/sim/lib/billing/organizations/seats.ts @@ -1,3 +1,4 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db } from '@sim/db' import { member, subscription } from '@sim/db/schema' import { createLogger } from '@sim/logger' @@ -8,6 +9,7 @@ import { USABLE_SUBSCRIPTION_STATUSES } from '@/lib/billing/subscriptions/utils' import { OUTBOX_EVENT_TYPES } from '@/lib/billing/webhooks/outbox-handlers' import { isBillingEnabled } from '@/lib/core/config/env-flags' import { enqueueOutboxEvent } from '@/lib/core/outbox/service' +import { captureServerEvent } from '@/lib/posthog/server' const logger = createLogger('OrganizationSeats') @@ -22,6 +24,13 @@ export interface ReconcileOrganizationSeatsResult { interface ReconcileOrganizationSeatsParams { organizationId: string reason: string + /** + * Real `user.id` of the actor whose action triggered this reconcile, used to + * attribute the seat-change audit log and analytics event. Omit for system + * reconciles (e.g. the seat-drift sweep) that have no acting user; the audit + * is then skipped and analytics fall back to the organization id. + */ + actorId?: string } /** @@ -41,6 +50,7 @@ interface ReconcileOrganizationSeatsParams { export async function reconcileOrganizationSeats({ organizationId, reason, + actorId, }: ReconcileOrganizationSeatsParams): Promise { if (!isBillingEnabled) { return { changed: false, reason: 'Billing is not enabled' } @@ -142,6 +152,30 @@ export async function reconcileOrganizationSeats({ outboxEventId: outcome.outboxEventId, }) + const increased = outcome.seats > outcome.previousSeats + if (actorId) { + recordAudit({ + workspaceId: null, + actorId, + action: increased ? AuditAction.ORG_SEAT_PROVISIONED : AuditAction.ORG_SEAT_DEPROVISIONED, + resourceType: AuditResourceType.ORGANIZATION, + resourceId: organizationId, + description: `${increased ? 'Provisioned' : 'Deprovisioned'} organization seats: ${outcome.previousSeats} → ${outcome.seats}`, + metadata: { previousSeats: outcome.previousSeats, seats: outcome.seats, reason }, + }) + } + captureServerEvent( + actorId ?? organizationId, + increased ? 'seats_provisioned' : 'seats_deprovisioned', + { + organization_id: organizationId, + previous_seats: outcome.previousSeats, + seats: outcome.seats, + reason, + }, + { groups: { organization: organizationId } } + ) + return { changed: true, previousSeats: outcome.previousSeats, diff --git a/apps/sim/lib/billing/storage/index.ts b/apps/sim/lib/billing/storage/index.ts index 50e10480a3d..9203c24b455 100644 --- a/apps/sim/lib/billing/storage/index.ts +++ b/apps/sim/lib/billing/storage/index.ts @@ -1,2 +1,6 @@ export { checkStorageQuota, getUserStorageLimit, getUserStorageUsage } from './limits' -export { decrementStorageUsage, incrementStorageUsage } from './tracking' +export { + decrementStorageUsage, + decrementStorageUsageInTx, + incrementStorageUsage, +} from './tracking' diff --git a/apps/sim/lib/billing/storage/tracking.ts b/apps/sim/lib/billing/storage/tracking.ts index 8c29fbb8b75..6311a881846 100644 --- a/apps/sim/lib/billing/storage/tracking.ts +++ b/apps/sim/lib/billing/storage/tracking.ts @@ -165,3 +165,33 @@ export async function decrementStorageUsage( void maybeNotifyStorageLimit(userId, workspaceId, sub, true) } } + +type StorageTransaction = Parameters[0]>[0] + +/** + * Decrement a user's (or their org's) storage counter inside an existing + * transaction, using a pre-resolved subscription. This lets a caller make the + * counter update atomic with the DB rows it is deleting (e.g. hard-deleting + * documents), so a failure of either rolls back both — no inflated counter, no + * over-decrement. The caller resolves the subscription (a read) before opening + * the transaction. + */ +export async function decrementStorageUsageInTx( + tx: StorageTransaction, + sub: HighestPrioritySubscription | null, + userId: string, + bytes: number +): Promise { + if (!isBillingEnabled || bytes <= 0) return + if (isOrgScopedSubscription(sub, userId) && sub) { + await tx + .update(organization) + .set({ storageUsedBytes: sql`GREATEST(0, ${organization.storageUsedBytes} - ${bytes})` }) + .where(eq(organization.id, sub.referenceId)) + } else { + await tx + .update(userStats) + .set({ storageUsedBytes: sql`GREATEST(0, ${userStats.storageUsedBytes} - ${bytes})` }) + .where(eq(userStats.userId, userId)) + } +} diff --git a/apps/sim/lib/billing/threshold-billing.ts b/apps/sim/lib/billing/threshold-billing.ts index 1f5cfd67d4f..bb52ae62980 100644 --- a/apps/sim/lib/billing/threshold-billing.ts +++ b/apps/sim/lib/billing/threshold-billing.ts @@ -1,3 +1,4 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db } from '@sim/db' import { member, organization, subscription, userStats } from '@sim/db/schema' import { createLogger } from '@sim/logger' @@ -19,6 +20,7 @@ import { toDecimal, toNumber } from '@/lib/billing/utils/decimal' import { OUTBOX_EVENT_TYPES } from '@/lib/billing/webhooks/outbox-handlers' import { env, envNumber } from '@/lib/core/config/env' import { enqueueOutboxEvent } from '@/lib/core/outbox/service' +import { captureServerEvent } from '@/lib/posthog/server' const logger = createLogger('ThresholdBilling') @@ -118,73 +120,97 @@ export async function checkAndBillOverageThreshold(userId: string): Promise { - await tx.execute(sql.raw(`SET LOCAL lock_timeout = '${BILLING_LOCK_TIMEOUT_MS}ms'`)) - - const statsRecords = await tx - .select() - .from(userStats) - .where(eq(userStats.userId, userId)) - .for('update') - .limit(1) + const billedResult = await db.transaction( + async ( + tx + ): Promise<{ + amount: number + creditsApplied: number + settledVia: 'stripe' | 'credits' + } | null> => { + await tx.execute(sql.raw(`SET LOCAL lock_timeout = '${BILLING_LOCK_TIMEOUT_MS}ms'`)) + + const statsRecords = await tx + .select() + .from(userStats) + .where(eq(userStats.userId, userId)) + .for('update') + .limit(1) + + if (statsRecords.length === 0) { + logger.warn('User stats not found for threshold billing', { userId }) + return null + } + + const stats = statsRecords[0] + const lockedUsageSnapshot = personalUsageSnapshotFromStats(stats) + if (!personalUsageSnapshotMatches(usageSnapshot, lockedUsageSnapshot)) { + logger.debug('Personal usage changed during threshold billing check; retry later', { + userId, + usageSnapshot, + lockedUsageSnapshot, + }) + return null + } - if (statsRecords.length === 0) { - logger.warn('User stats not found for threshold billing', { userId }) - return - } + const billedOverageThisPeriod = toNumber(toDecimal(stats.billedOverageThisPeriod)) + const unbilledOverage = Math.max(0, currentOverage - billedOverageThisPeriod) - const stats = statsRecords[0] - const lockedUsageSnapshot = personalUsageSnapshotFromStats(stats) - if (!personalUsageSnapshotMatches(usageSnapshot, lockedUsageSnapshot)) { - logger.debug('Personal usage changed during threshold billing check; retry later', { + logger.debug('Threshold billing check', { userId, - usageSnapshot, - lockedUsageSnapshot, + plan: userSubscription.plan, + currentOverage, + billedOverageThisPeriod, + unbilledOverage, + threshold, }) - return - } - - const billedOverageThisPeriod = toNumber(toDecimal(stats.billedOverageThisPeriod)) - const unbilledOverage = Math.max(0, currentOverage - billedOverageThisPeriod) - - logger.debug('Threshold billing check', { - userId, - plan: userSubscription.plan, - currentOverage, - billedOverageThisPeriod, - unbilledOverage, - threshold, - }) - - if (unbilledOverage < threshold) { - return - } - // Apply credits to reduce the amount to bill (use stats from locked row) - let amountToBill = unbilledOverage - let creditsApplied = 0 - const creditBalance = toNumber(toDecimal(stats.creditBalance)) - - if (creditBalance > 0) { - creditsApplied = Math.min(creditBalance, amountToBill) - await tx - .update(userStats) - .set({ - creditBalance: sql`GREATEST(0, ${userStats.creditBalance} - ${creditsApplied})`, + if (unbilledOverage < threshold) { + return null + } + + // Apply credits to reduce the amount to bill (use stats from locked row) + let amountToBill = unbilledOverage + let creditsApplied = 0 + const creditBalance = toNumber(toDecimal(stats.creditBalance)) + + if (creditBalance > 0) { + creditsApplied = Math.min(creditBalance, amountToBill) + await tx + .update(userStats) + .set({ + creditBalance: sql`GREATEST(0, ${userStats.creditBalance} - ${creditsApplied})`, + }) + .where(eq(userStats.userId, userId)) + amountToBill = amountToBill - creditsApplied + + logger.info('Applied credits to reduce threshold overage', { + userId, + creditBalance, + creditsApplied, + remainingToBill: amountToBill, }) - .where(eq(userStats.userId, userId)) - amountToBill = amountToBill - creditsApplied + } + + // If credits covered everything, bump billed tracker but don't enqueue Stripe invoice. + if (amountToBill <= 0) { + await tx + .update(userStats) + .set({ + billedOverageThisPeriod: sql`${userStats.billedOverageThisPeriod} + ${unbilledOverage}`, + }) + .where(eq(userStats.userId, userId)) + + logger.info('Credits fully covered threshold overage', { + userId, + creditsApplied, + unbilledOverage, + }) + return { amount: unbilledOverage, creditsApplied, settledVia: 'credits' } + } - logger.info('Applied credits to reduce threshold overage', { - userId, - creditBalance, - creditsApplied, - remainingToBill: amountToBill, - }) - } + const amountCents = Math.round(amountToBill * 100) - // If credits covered everything, bump billed tracker but don't enqueue Stripe invoice. - if (amountToBill <= 0) { await tx .update(userStats) .set({ @@ -192,51 +218,66 @@ export async function checkAndBillOverageThreshold(userId: string): Promise { - await tx.execute(sql.raw(`SET LOCAL lock_timeout = '${BILLING_LOCK_TIMEOUT_MS}ms'`)) - - const lockedOwnerRows = await tx - .select({ userId: member.userId }) - .from(member) - .where(and(eq(member.organizationId, organizationId), eq(member.role, 'owner'))) - .for('update') - .limit(1) - const lockedOwnerId = lockedOwnerRows[0]?.userId - if (!lockedOwnerId) { - logger.error('Organization owner not found after locking organization', { organizationId }) - return - } + const orgBilledResult = await db.transaction( + async ( + tx + ): Promise<{ + amount: number + creditsApplied: number + ownerId: string + settledVia: 'stripe' | 'credits' + } | null> => { + await tx.execute(sql.raw(`SET LOCAL lock_timeout = '${BILLING_LOCK_TIMEOUT_MS}ms'`)) + + const lockedOwnerRows = await tx + .select({ userId: member.userId }) + .from(member) + .where(and(eq(member.organizationId, organizationId), eq(member.role, 'owner'))) + .for('update') + .limit(1) + const lockedOwnerId = lockedOwnerRows[0]?.userId + if (!lockedOwnerId) { + logger.error('Organization owner not found after locking organization', { + organizationId, + }) + return null + } - const ownerStatsLock = await tx - .select() - .from(userStats) - .where(eq(userStats.userId, lockedOwnerId)) - .for('update') - .limit(1) - if (ownerStatsLock.length === 0) { - logger.error('Owner stats not found', { organizationId, ownerId: lockedOwnerId }) - return - } + const ownerStatsLock = await tx + .select() + .from(userStats) + .where(eq(userStats.userId, lockedOwnerId)) + .for('update') + .limit(1) + if (ownerStatsLock.length === 0) { + logger.error('Owner stats not found', { organizationId, ownerId: lockedOwnerId }) + return null + } + + const orgLock = await tx + .select() + .from(organization) + .where(eq(organization.id, organizationId)) + .for('update') + .limit(1) + + if (orgLock.length === 0) { + logger.error('Organization not found', { organizationId }) + return null + } + + const lockedMemberUsageRows = await tx + .select({ + userId: member.userId, + role: member.role, + currentPeriodCost: userStats.currentPeriodCost, + departedMemberUsage: organization.departedMemberUsage, + }) + .from(member) + .leftJoin(userStats, eq(member.userId, userStats.userId)) + .innerJoin(organization, eq(organization.id, member.organizationId)) + .where(eq(member.organizationId, organizationId)) + + const lockedUsageSnapshot = buildOrganizationUsageSnapshot(lockedMemberUsageRows) + if ( + !lockedUsageSnapshot || + lockedOwnerId !== usageSnapshot.ownerId || + !organizationUsageSnapshotMatches(usageSnapshot, lockedUsageSnapshot) + ) { + logger.debug('Organization usage changed during threshold billing check; retry later', { + organizationId, + usageSnapshot, + lockedUsageSnapshot, + lockedOwnerId, + }) + return null + } - const orgLock = await tx - .select() - .from(organization) - .where(eq(organization.id, organizationId)) - .for('update') - .limit(1) + const totalBilledOverage = toNumber(toDecimal(ownerStatsLock[0].billedOverageThisPeriod)) + const orgCreditBalance = toNumber(toDecimal(orgLock[0].creditBalance)) - if (orgLock.length === 0) { - logger.error('Organization not found', { organizationId }) - return - } + const unbilledOverage = Math.max(0, currentOverage - totalBilledOverage) - const lockedMemberUsageRows = await tx - .select({ - userId: member.userId, - role: member.role, - currentPeriodCost: userStats.currentPeriodCost, - departedMemberUsage: organization.departedMemberUsage, - }) - .from(member) - .leftJoin(userStats, eq(member.userId, userStats.userId)) - .innerJoin(organization, eq(organization.id, member.organizationId)) - .where(eq(member.organizationId, organizationId)) - - const lockedUsageSnapshot = buildOrganizationUsageSnapshot(lockedMemberUsageRows) - if ( - !lockedUsageSnapshot || - lockedOwnerId !== usageSnapshot.ownerId || - !organizationUsageSnapshotMatches(usageSnapshot, lockedUsageSnapshot) - ) { - logger.debug('Organization usage changed during threshold billing check; retry later', { + logger.debug('Organization threshold billing check', { organizationId, - usageSnapshot, - lockedUsageSnapshot, - lockedOwnerId, + totalTeamUsage: + usageSnapshot.pooledCurrentPeriodCost + ledgerUsage + usageSnapshot.departedMemberUsage, + ledgerUsage, + effectiveTeamUsage, + basePrice, + currentOverage, + totalBilledOverage, + unbilledOverage, + threshold, }) - return - } - - const totalBilledOverage = toNumber(toDecimal(ownerStatsLock[0].billedOverageThisPeriod)) - const orgCreditBalance = toNumber(toDecimal(orgLock[0].creditBalance)) - const unbilledOverage = Math.max(0, currentOverage - totalBilledOverage) - - logger.debug('Organization threshold billing check', { - organizationId, - totalTeamUsage: - usageSnapshot.pooledCurrentPeriodCost + ledgerUsage + usageSnapshot.departedMemberUsage, - ledgerUsage, - effectiveTeamUsage, - basePrice, - currentOverage, - totalBilledOverage, - unbilledOverage, - threshold, - }) - - if (unbilledOverage < threshold) { - return - } - - let amountToBill = unbilledOverage - let creditsApplied = 0 - - if (orgCreditBalance > 0) { - creditsApplied = Math.min(orgCreditBalance, amountToBill) - await tx - .update(organization) - .set({ - creditBalance: sql`GREATEST(0, ${organization.creditBalance} - ${creditsApplied})`, + if (unbilledOverage < threshold) { + return null + } + + let amountToBill = unbilledOverage + let creditsApplied = 0 + + if (orgCreditBalance > 0) { + creditsApplied = Math.min(orgCreditBalance, amountToBill) + await tx + .update(organization) + .set({ + creditBalance: sql`GREATEST(0, ${organization.creditBalance} - ${creditsApplied})`, + }) + .where(eq(organization.id, organizationId)) + amountToBill = amountToBill - creditsApplied + + logger.info('Applied org credits to reduce threshold overage', { + organizationId, + creditBalance: orgCreditBalance, + creditsApplied, + remainingToBill: amountToBill, }) - .where(eq(organization.id, organizationId)) - amountToBill = amountToBill - creditsApplied - - logger.info('Applied org credits to reduce threshold overage', { - organizationId, - creditBalance: orgCreditBalance, - creditsApplied, - remainingToBill: amountToBill, - }) - } - - // If credits covered everything, bump billed tracker but don't enqueue Stripe invoice. - if (amountToBill <= 0) { + } + + // If credits covered everything, bump billed tracker but don't enqueue Stripe invoice. + if (amountToBill <= 0) { + await tx + .update(userStats) + .set({ + billedOverageThisPeriod: sql`${userStats.billedOverageThisPeriod} + ${unbilledOverage}`, + }) + .where(eq(userStats.userId, lockedOwnerId)) + + logger.info('Credits fully covered org threshold overage', { + organizationId, + creditsApplied, + unbilledOverage, + }) + return { + amount: unbilledOverage, + creditsApplied, + ownerId: lockedOwnerId, + settledVia: 'credits', + } + } + + const amountCents = Math.round(amountToBill * 100) + + // Bump billed tracker and enqueue Stripe invoice atomically. + // See user-path above for the full retry-invariant reasoning. await tx .update(userStats) .set({ @@ -489,52 +564,70 @@ async function checkAndBillOrganizationOverageThreshold(organizationId: string): }) .where(eq(userStats.userId, lockedOwnerId)) - logger.info('Credits fully covered org threshold overage', { + await enqueueOutboxEvent(tx, OUTBOX_EVENT_TYPES.STRIPE_THRESHOLD_OVERAGE_INVOICE, { + customerId, + stripeSubscriptionId, + amountCents, + description: `Team threshold overage billing – ${billingPeriod}`, + itemDescription: `Team usage overage ($${amountToBill.toFixed(2)})`, + billingPeriod, + invoiceIdemKeyStem: `threshold-overage-org-invoice:${customerId}:${stripeSubscriptionId}:${billingPeriod}:${totalOverageCents}:${amountCents}`, + itemIdemKeyStem: `threshold-overage-org-item:${customerId}:${stripeSubscriptionId}:${billingPeriod}:${totalOverageCents}:${amountCents}`, + metadata: { + type: 'overage_threshold_billing_org', + organizationId, + subscriptionId: stripeSubscriptionId, + billingPeriod, + totalOverageAtTimeOfBilling: currentOverage.toFixed(2), + }, + }) + + logger.info('Queued organization threshold overage invoice for Stripe', { organizationId, + ownerId: lockedOwnerId, creditsApplied, - unbilledOverage, + amountBilled: amountToBill, + totalProcessed: unbilledOverage, + billingPeriod, }) - return - } - - const amountCents = Math.round(amountToBill * 100) - // Bump billed tracker and enqueue Stripe invoice atomically. - // See user-path above for the full retry-invariant reasoning. - await tx - .update(userStats) - .set({ - billedOverageThisPeriod: sql`${userStats.billedOverageThisPeriod} + ${unbilledOverage}`, - }) - .where(eq(userStats.userId, lockedOwnerId)) - - await enqueueOutboxEvent(tx, OUTBOX_EVENT_TYPES.STRIPE_THRESHOLD_OVERAGE_INVOICE, { - customerId, - stripeSubscriptionId, - amountCents, - description: `Team threshold overage billing – ${billingPeriod}`, - itemDescription: `Team usage overage ($${amountToBill.toFixed(2)})`, - billingPeriod, - invoiceIdemKeyStem: `threshold-overage-org-invoice:${customerId}:${stripeSubscriptionId}:${billingPeriod}:${totalOverageCents}:${amountCents}`, - itemIdemKeyStem: `threshold-overage-org-item:${customerId}:${stripeSubscriptionId}:${billingPeriod}:${totalOverageCents}:${amountCents}`, + return { + amount: amountToBill, + creditsApplied, + ownerId: lockedOwnerId, + settledVia: 'stripe', + } + } + ) + + if (orgBilledResult) { + const { amount, creditsApplied, ownerId, settledVia } = orgBilledResult + const settledLabel = settledVia === 'credits' ? 'covered by credits' : 'billed' + recordAudit({ + actorId: ownerId, + action: AuditAction.OVERAGE_BILLED, + resourceType: AuditResourceType.BILLING, + resourceId: orgSubscription.id, + description: `Overage of $${amount.toFixed(2)} ${settledLabel} for organization ${organizationId}`, metadata: { - type: 'overage_threshold_billing_org', - organizationId, - subscriptionId: stripeSubscriptionId, + entityType: 'organization', + referenceId: organizationId, + plan: orgSubscription.plan, + amount, + currency: 'usd', + creditsApplied, + settledVia, billingPeriod, - totalOverageAtTimeOfBilling: currentOverage.toFixed(2), }, }) - - logger.info('Queued organization threshold overage invoice for Stripe', { - organizationId, - ownerId: lockedOwnerId, - creditsApplied, - amountBilled: amountToBill, - totalProcessed: unbilledOverage, - billingPeriod, + captureServerEvent(ownerId, 'overage_billed', { + amount, + currency: 'usd', + entity_type: 'organization', + reference_id: organizationId, + settled_via: settledVia, }) - }) + } } catch (error) { logger.error('Error in organization threshold billing', { organizationId, diff --git a/apps/sim/lib/billing/webhooks/disputes.ts b/apps/sim/lib/billing/webhooks/disputes.ts index 647ad8a9cae..ed0702c4a4b 100644 --- a/apps/sim/lib/billing/webhooks/disputes.ts +++ b/apps/sim/lib/billing/webhooks/disputes.ts @@ -1,10 +1,12 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db } from '@sim/db' -import { subscription, user, userStats } from '@sim/db/schema' +import { member, subscription, user, userStats } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq } from 'drizzle-orm' import type Stripe from 'stripe' import { blockOrgMembers, unblockOrgMembers } from '@/lib/billing' import { requireStripeClient } from '@/lib/billing/stripe-client' +import { captureServerEvent } from '@/lib/posthog/server' const logger = createLogger('DisputeWebhooks') @@ -17,6 +19,59 @@ async function getCustomerIdFromDispute(dispute: Stripe.Dispute): Promise { + try { + const rows = await db + .select({ userId: member.userId }) + .from(member) + .where(and(eq(member.organizationId, organizationId), eq(member.role, 'owner'))) + .limit(1) + return rows[0]?.userId ?? null + } catch (error) { + logger.warn('Failed to resolve organization owner for dispute audit', { organizationId, error }) + return null + } +} + +/** + * Record audit + PostHog instrumentation for a charge dispute money event. + * `actorId` must be the responsible user (org owner for org-scoped charges). + */ +function recordDisputeInstrumentation( + status: 'opened' | 'closed', + dispute: Stripe.Dispute, + customerId: string, + actorId: string, + entity: { type: 'user' | 'organization'; id: string } +): void { + const amount = dispute.amount / 100 + recordAudit({ + actorId, + action: + status === 'opened' ? AuditAction.CHARGE_DISPUTE_OPENED : AuditAction.CHARGE_DISPUTE_CLOSED, + resourceType: AuditResourceType.BILLING, + resourceId: dispute.id, + description: `Charge dispute ${status} for $${amount.toFixed(2)} (${dispute.reason})`, + metadata: { + entityType: entity.type, + entityId: entity.id, + customerId, + amount, + currency: dispute.currency, + reason: dispute.reason, + status: dispute.status, + }, + }) + captureServerEvent(actorId, 'charge_disputed', { + amount, + currency: dispute.currency, + reason: dispute.reason, + status, + entity_type: entity.type, + reference_id: entity.id, + }) +} + /** * Handles charge.dispute.created - blocks the responsible user */ @@ -46,6 +101,11 @@ export async function handleChargeDispute(event: Stripe.Event): Promise { disputeId: dispute.id, userId: users[0].id, }) + + recordDisputeInstrumentation('opened', dispute, customerId, users[0].id, { + type: 'user', + id: users[0].id, + }) return } @@ -67,6 +127,12 @@ export async function handleChargeDispute(event: Stripe.Event): Promise { memberCount, }) } + + const actorId = (await getOrganizationOwnerId(orgId)) ?? orgId + recordDisputeInstrumentation('opened', dispute, customerId, actorId, { + type: 'organization', + id: orgId, + }) } } @@ -81,23 +147,15 @@ export async function handleChargeDispute(event: Stripe.Event): Promise { export async function handleDisputeClosed(event: Stripe.Event): Promise { const dispute = event.data.object as Stripe.Dispute - // Only unblock if we won or the warning was closed without a full dispute - const shouldUnblock = dispute.status === 'won' || dispute.status === 'warning_closed' - - if (!shouldUnblock) { - logger.info('Dispute resolved against us, user remains blocked', { - disputeId: dispute.id, - status: dispute.status, - }) - return - } - const customerId = await getCustomerIdFromDispute(dispute) if (!customerId) { return } - // Find and unblock user (Pro plans) - only if blocked for dispute, not other reasons + // Unblock only on won/warning_closed; a 'lost' dispute stays blocked. The close + // is audited in every case (dispute.status in metadata distinguishes the outcome). + const shouldUnblock = dispute.status === 'won' || dispute.status === 'warning_closed' + const users = await db .select({ id: user.id }) .from(user) @@ -105,20 +163,28 @@ export async function handleDisputeClosed(event: Stripe.Event): Promise { .limit(1) if (users.length > 0) { - await db - .update(userStats) - .set({ billingBlocked: false, billingBlockedReason: null }) - .where(and(eq(userStats.userId, users[0].id), eq(userStats.billingBlockedReason, 'dispute'))) - - logger.info('Unblocked user after dispute resolved in our favor', { + if (shouldUnblock) { + await db + .update(userStats) + .set({ billingBlocked: false, billingBlockedReason: null }) + .where( + and(eq(userStats.userId, users[0].id), eq(userStats.billingBlockedReason, 'dispute')) + ) + } + logger.info('Dispute closed for user', { disputeId: dispute.id, userId: users[0].id, status: dispute.status, + unblocked: shouldUnblock, + }) + + recordDisputeInstrumentation('closed', dispute, customerId, users[0].id, { + type: 'user', + id: users[0].id, }) return } - // Find and unblock all org members (Team/Enterprise) - consistent with payment success const subs = await db .select({ referenceId: subscription.referenceId }) .from(subscription) @@ -127,13 +193,20 @@ export async function handleDisputeClosed(event: Stripe.Event): Promise { if (subs.length > 0) { const orgId = subs[0].referenceId - const memberCount = await unblockOrgMembers(orgId, 'dispute') - - logger.info('Unblocked all org members after dispute resolved in our favor', { + if (shouldUnblock) { + await unblockOrgMembers(orgId, 'dispute') + } + logger.info('Dispute closed for organization', { disputeId: dispute.id, organizationId: orgId, - memberCount, status: dispute.status, + unblocked: shouldUnblock, + }) + + const actorId = (await getOrganizationOwnerId(orgId)) ?? orgId + recordDisputeInstrumentation('closed', dispute, customerId, actorId, { + type: 'organization', + id: orgId, }) } } diff --git a/apps/sim/lib/billing/webhooks/enterprise.ts b/apps/sim/lib/billing/webhooks/enterprise.ts index c717024001e..13ca7d0bf8e 100644 --- a/apps/sim/lib/billing/webhooks/enterprise.ts +++ b/apps/sim/lib/billing/webhooks/enterprise.ts @@ -1,3 +1,4 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db } from '@sim/db' import { organization, subscription, user } from '@sim/db/schema' import { createLogger } from '@sim/logger' @@ -7,6 +8,7 @@ import type Stripe from 'stripe' import { getEmailSubject, renderEnterpriseSubscriptionEmail } from '@/components/emails' import { sendEmail } from '@/lib/messaging/email/mailer' import { getFromEmailAddress } from '@/lib/messaging/email/utils' +import { captureServerEvent } from '@/lib/posthog/server' import { parseEnterpriseSubscriptionMetadata } from '../types' const logger = createLogger('BillingEnterprise') @@ -145,6 +147,43 @@ export async function handleManualEnterpriseSubscription(event: Stripe.Event) { note: 'Seats from metadata, Stripe quantity set to 1', }) + let actorId = referenceId + try { + const [provisioningUser] = await db + .select({ id: user.id }) + .from(user) + .where(eq(user.stripeCustomerId, stripeCustomerId)) + .limit(1) + actorId = provisioningUser?.id ?? referenceId + } catch (error) { + logger.warn('Failed to resolve enterprise provisioning actor; falling back to reference id', { + referenceId, + error, + }) + } + + recordAudit({ + actorId, + action: AuditAction.ENTERPRISE_SUBSCRIPTION_PROVISIONED, + resourceType: AuditResourceType.SUBSCRIPTION, + resourceId: subscriptionRow.id, + description: `Enterprise subscription provisioned for organization ${referenceId} (${seats} seats)`, + metadata: { + organizationId: referenceId, + stripeCustomerId, + stripeSubscriptionId: stripeSubscription.id, + seats, + monthlyPrice, + currency: 'usd', + }, + }) + captureServerEvent(actorId, 'enterprise_subscription_created', { + reference_id: referenceId, + seats, + monthly_price: monthlyPrice, + currency: 'usd', + }) + try { const userDetails = await db .select({ diff --git a/apps/sim/lib/billing/webhooks/invoices.ts b/apps/sim/lib/billing/webhooks/invoices.ts index a3e026f2006..f72b39b9763 100644 --- a/apps/sim/lib/billing/webhooks/invoices.ts +++ b/apps/sim/lib/billing/webhooks/invoices.ts @@ -1,4 +1,5 @@ import { render } from '@react-email/render' +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db } from '@sim/db' import { member, @@ -31,9 +32,35 @@ import { getBaseUrl } from '@/lib/core/utils/urls' import { sendEmail } from '@/lib/messaging/email/mailer' import { getPersonalEmailFrom } from '@/lib/messaging/email/utils' import { quickValidateEmail } from '@/lib/messaging/email/validation' +import { captureServerEvent } from '@/lib/posthog/server' const logger = createLogger('StripeInvoiceWebhooks') +/** + * Resolve the audit actor for a billing event. For org-scoped subscriptions the + * actor is the org owner; for personal subscriptions it is the reference (user) + * id. The owner lookup is best-effort — a failure must never break webhook + * processing, so it falls back to the reference id (which the audit layer nulls + * to a system actor if it is not a real user id). + */ +async function resolveBillingActorId(isOrgScoped: boolean, referenceId: string): Promise { + if (!isOrgScoped) return referenceId + try { + const ownerRows = await db + .select({ userId: member.userId }) + .from(member) + .where(and(eq(member.organizationId, referenceId), eq(member.role, 'owner'))) + .limit(1) + return ownerRows[0]?.userId ?? referenceId + } catch (error) { + logger.warn('Failed to resolve billing actor; falling back to reference id', { + referenceId, + error, + }) + return referenceId + } +} + function getSubscriptionLinePeriod( invoice: Stripe.Invoice, stripeSubscriptionId: string @@ -711,6 +738,30 @@ async function handleCreditPurchaseSuccess(invoice: Stripe.Invoice): Promise = [] @@ -862,6 +913,33 @@ export async function handleInvoicePaymentSucceeded(event: Stripe.Event) { periodEnd: invoicePeriod?.periodEnd ?? null, }) } + + const entityType = subIsOrgScoped ? 'organization' : 'user' + const amountPaid = (invoice.amount_paid ?? 0) / 100 + const actorId = await resolveBillingActorId(subIsOrgScoped, sub.referenceId) + + recordAudit({ + actorId, + action: AuditAction.INVOICE_PAYMENT_SUCCEEDED, + resourceType: AuditResourceType.BILLING, + resourceId: invoice.id, + description: `Invoice payment of $${amountPaid.toFixed(2)} succeeded for ${entityType} ${sub.referenceId}`, + metadata: { + entityType, + referenceId: sub.referenceId, + plan: sub.plan, + amount: amountPaid, + currency: invoice.currency ?? 'usd', + invoiceId: invoice.id, + }, + }) + captureServerEvent(actorId, 'payment_succeeded', { + plan: sub.plan ?? 'unknown', + amount: amountPaid, + currency: invoice.currency ?? 'usd', + entity_type: entityType, + reference_id: sub.referenceId, + }) } ) } catch (error) { @@ -914,6 +992,42 @@ export async function handleInvoicePaymentFailed(event: Stripe.Event) { resolutionSource, }) + // Best-effort instrumentation; its DB reads must never abort the + // user-blocking that follows, so the whole block is guarded. + try { + const failureOrgScoped = await isSubscriptionOrgScoped(sub) + const failureEntityType = failureOrgScoped ? 'organization' : 'user' + const failureActorId = await resolveBillingActorId(failureOrgScoped, sub.referenceId) + + recordAudit({ + actorId: failureActorId, + action: AuditAction.INVOICE_PAYMENT_FAILED, + resourceType: AuditResourceType.BILLING, + resourceId: invoice.id, + description: `Invoice payment of $${failedAmount.toFixed(2)} failed for ${failureEntityType} ${sub.referenceId} (attempt ${attemptCount})`, + metadata: { + entityType: failureEntityType, + referenceId: sub.referenceId, + plan: sub.plan, + amount: failedAmount, + currency: invoice.currency ?? 'usd', + attemptCount, + invoiceType: invoiceType ?? 'subscription', + invoiceId: invoice.id, + }, + }) + captureServerEvent(failureActorId, 'payment_failed', { + plan: sub.plan ?? 'unknown', + amount: failedAmount, + currency: invoice.currency ?? 'usd', + entity_type: failureEntityType, + reference_id: sub.referenceId, + attempt_count: attemptCount, + }) + } catch (auditError) { + logger.warn('Failed to record payment_failed instrumentation', { auditError }) + } + if (attemptCount >= 1) { logger.error('Payment failure - blocking users', { customerId, diff --git a/apps/sim/lib/billing/webhooks/subscription.ts b/apps/sim/lib/billing/webhooks/subscription.ts index 13b6b0517c9..8a022d8624f 100644 --- a/apps/sim/lib/billing/webhooks/subscription.ts +++ b/apps/sim/lib/billing/webhooks/subscription.ts @@ -1,3 +1,4 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db } from '@sim/db' import { member, subscription } from '@sim/db/schema' import { createLogger } from '@sim/logger' @@ -18,6 +19,28 @@ import { detachOrganizationWorkspaces } from '@/lib/workspaces/organization-work const logger = createLogger('StripeSubscriptionWebhooks') +/** + * Resolve a real `user.id` to use as the audit actor for a subscription + * event. Org-scoped subscriptions resolve to the org owner; personally + * scoped subscriptions already reference a user. + */ +async function resolveSubscriptionActorId(referenceId: string): Promise { + try { + const rows = await db + .select({ userId: member.userId }) + .from(member) + .where(and(eq(member.organizationId, referenceId), eq(member.role, 'owner'))) + .limit(1) + return rows[0]?.userId ?? referenceId + } catch (error) { + logger.warn('Failed to resolve subscription actor; falling back to reference id', { + referenceId, + error, + }) + return referenceId + } +} + /** * Restore personal Pro subscriptions for every member of an organization * when the team/enterprise subscription ends. Errors propagate so the @@ -253,6 +276,19 @@ export async function handleSubscriptionCreated(subscriptionData: { } if (wasFreePreviously && isPaidPlan) { + const actorId = await resolveSubscriptionActorId(subscriptionData.referenceId) + recordAudit({ + actorId, + action: AuditAction.SUBSCRIPTION_CREATED, + resourceType: AuditResourceType.SUBSCRIPTION, + resourceId: subscriptionData.id, + description: `Subscription created on ${subscriptionData.plan ?? 'unknown'} plan for ${subscriptionData.referenceId}`, + metadata: { + plan: subscriptionData.plan, + status: subscriptionData.status, + referenceId: subscriptionData.referenceId, + }, + }) captureServerEvent(subscriptionData.referenceId, 'subscription_created', { plan: subscriptionData.plan ?? 'unknown', status: subscriptionData.status, @@ -329,6 +365,19 @@ export async function handleSubscriptionDeleted( ...dormantResult, }) + const enterpriseActorId = await resolveSubscriptionActorId(subscription.referenceId) + recordAudit({ + actorId: enterpriseActorId, + action: AuditAction.SUBSCRIPTION_CANCELLED, + resourceType: AuditResourceType.SUBSCRIPTION, + resourceId: subscription.id, + description: `Enterprise subscription cancelled for ${subscription.referenceId}`, + metadata: { + plan: subscription.plan, + referenceId: subscription.referenceId, + kind: 'enterprise', + }, + }) captureServerEvent(subscription.referenceId, 'subscription_cancelled', { plan: subscription.plan ?? 'unknown', reference_id: subscription.referenceId, @@ -451,6 +500,19 @@ export async function handleSubscriptionDeleted( workspacesDetached, }) + const cancelActorId = await resolveSubscriptionActorId(subscription.referenceId) + recordAudit({ + actorId: cancelActorId, + action: AuditAction.SUBSCRIPTION_CANCELLED, + resourceType: AuditResourceType.SUBSCRIPTION, + resourceId: subscription.id, + description: `Subscription cancelled on ${subscription.plan ?? 'unknown'} plan for ${subscription.referenceId}`, + metadata: { + plan: subscription.plan, + referenceId: subscription.referenceId, + totalOverage, + }, + }) captureServerEvent(subscription.referenceId, 'subscription_cancelled', { plan: subscription.plan ?? 'unknown', reference_id: subscription.referenceId, diff --git a/apps/sim/lib/copilot/chat/post.ts b/apps/sim/lib/copilot/chat/post.ts index ddd8ba36252..ca5943ae3b7 100644 --- a/apps/sim/lib/copilot/chat/post.ts +++ b/apps/sim/lib/copilot/chat/post.ts @@ -45,6 +45,7 @@ import type { ExecutionContext, OrchestratorResult } from '@/lib/copilot/request import { persistChatResources } from '@/lib/copilot/resources/persistence' import { prepareExecutionContext } from '@/lib/copilot/tools/handlers/context' import { getEffectiveDecryptedEnv } from '@/lib/environment/utils' +import { captureServerEvent } from '@/lib/posthog/server' import { resolveWorkflowIdForUser } from '@/lib/workflows/utils' import { getUserEntityPermissions, @@ -1070,6 +1071,19 @@ export async function handleUnifiedChatPost(req: NextRequest) { }, }) + captureServerEvent( + authenticatedUserId, + 'copilot_chat_sent', + { + ...(branch.kind === 'workflow' ? { workflow_id: branch.workflowId } : {}), + ...(workspaceId ? { workspace_id: workspaceId } : {}), + has_file_attachments: (body.fileAttachments?.length ?? 0) > 0, + has_contexts: normalizedContexts.length > 0, + mode: branch.kind === 'workflow' ? branch.mode : 'agent', + }, + workspaceId ? { groups: { workspace: workspaceId } } : undefined + ) + // Expose the root gen_ai.agent.execute span's trace identity to // the browser so subsequent HTTP calls (stop, abort, confirm, // SSE reconnect) can echo it back as `traceparent` — making diff --git a/apps/sim/lib/copilot/tools/handlers/management/manage-custom-tool.ts b/apps/sim/lib/copilot/tools/handlers/management/manage-custom-tool.ts index 9387e71f93e..1e22c7aabef 100644 --- a/apps/sim/lib/copilot/tools/handlers/management/manage-custom-tool.ts +++ b/apps/sim/lib/copilot/tools/handlers/management/manage-custom-tool.ts @@ -1,6 +1,8 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { createLogger } from '@sim/logger' import { getErrorMessage, toError } from '@sim/utils/errors' import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/request/types' +import { captureServerEvent } from '@/lib/posthog/server' import { deleteCustomTool, getCustomToolById, @@ -100,6 +102,30 @@ export async function executeManageCustomTool( }) const created = resultTools.find((tool) => tool.title === title) + recordAudit({ + workspaceId, + actorId: context.userId, + action: AuditAction.CUSTOM_TOOL_CREATED, + resourceType: AuditResourceType.CUSTOM_TOOL, + resourceId: created?.id, + resourceName: title, + description: `Created custom tool "${title}"`, + metadata: { source: 'tool_input' }, + }) + if (created?.id) { + captureServerEvent( + context.userId, + 'custom_tool_saved', + { + tool_id: created.id, + workspace_id: workspaceId, + tool_name: title, + source: 'tool_input', + }, + { groups: { workspace: workspaceId } } + ) + } + return { success: true, output: { @@ -148,6 +174,28 @@ export async function executeManageCustomTool( userId: context.userId, }) + recordAudit({ + workspaceId, + actorId: context.userId, + action: AuditAction.CUSTOM_TOOL_UPDATED, + resourceType: AuditResourceType.CUSTOM_TOOL, + resourceId: params.toolId, + resourceName: title, + description: `Updated custom tool "${title}"`, + metadata: { source: 'tool_input' }, + }) + captureServerEvent( + context.userId, + 'custom_tool_saved', + { + tool_id: params.toolId, + workspace_id: workspaceId, + tool_name: title, + source: 'tool_input', + }, + { groups: { workspace: workspaceId } } + ) + return { success: true, output: { @@ -182,6 +230,26 @@ export async function executeManageCustomTool( } } + for (const toolId of deleted) { + recordAudit({ + workspaceId: workspaceId ?? null, + actorId: context.userId, + action: AuditAction.CUSTOM_TOOL_DELETED, + resourceType: AuditResourceType.CUSTOM_TOOL, + resourceId: toolId, + description: 'Deleted custom tool', + metadata: { source: 'tool_input' }, + }) + if (workspaceId) { + captureServerEvent( + context.userId, + 'custom_tool_deleted', + { tool_id: toolId, workspace_id: workspaceId, source: 'tool_input' }, + { groups: { workspace: workspaceId } } + ) + } + } + return { success: deleted.length > 0, output: { diff --git a/apps/sim/lib/copilot/tools/handlers/management/manage-skill.ts b/apps/sim/lib/copilot/tools/handlers/management/manage-skill.ts index a9e0da7ece0..dc551e382e3 100644 --- a/apps/sim/lib/copilot/tools/handlers/management/manage-skill.ts +++ b/apps/sim/lib/copilot/tools/handlers/management/manage-skill.ts @@ -1,6 +1,8 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { createLogger } from '@sim/logger' import { getErrorMessage, toError } from '@sim/utils/errors' import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/request/types' +import { captureServerEvent } from '@/lib/posthog/server' import { deleteSkill, listSkills, upsertSkills } from '@/lib/workflows/skills/operations' const logger = createLogger('CopilotToolExecutor') @@ -79,6 +81,30 @@ export async function executeManageSkill( }) const created = resultSkills.find((s) => s.name === params.name) + recordAudit({ + workspaceId, + actorId: context.userId, + action: AuditAction.SKILL_CREATED, + resourceType: AuditResourceType.SKILL, + resourceId: created?.id, + resourceName: params.name, + description: `Created skill "${params.name}"`, + metadata: { source: 'tool_input' }, + }) + if (created?.id) { + captureServerEvent( + context.userId, + 'skill_created', + { + skill_id: created.id, + skill_name: params.name, + workspace_id: workspaceId, + source: 'tool_input', + }, + { groups: { workspace: workspaceId } } + ) + } + return { success: true, output: { @@ -121,6 +147,29 @@ export async function executeManageSkill( userId: context.userId, }) + const updatedName = params.name || found.name + recordAudit({ + workspaceId, + actorId: context.userId, + action: AuditAction.SKILL_UPDATED, + resourceType: AuditResourceType.SKILL, + resourceId: params.skillId, + resourceName: updatedName, + description: `Updated skill "${updatedName}"`, + metadata: { source: 'tool_input' }, + }) + captureServerEvent( + context.userId, + 'skill_updated', + { + skill_id: params.skillId, + skill_name: updatedName, + workspace_id: workspaceId, + source: 'tool_input', + }, + { groups: { workspace: workspaceId } } + ) + return { success: true, output: { @@ -143,6 +192,22 @@ export async function executeManageSkill( return { success: false, error: `Skill not found: ${params.skillId}` } } + recordAudit({ + workspaceId, + actorId: context.userId, + action: AuditAction.SKILL_DELETED, + resourceType: AuditResourceType.SKILL, + resourceId: params.skillId, + description: 'Deleted skill', + metadata: { source: 'tool_input' }, + }) + captureServerEvent( + context.userId, + 'skill_deleted', + { skill_id: params.skillId, workspace_id: workspaceId, source: 'tool_input' }, + { groups: { workspace: workspaceId } } + ) + return { success: true, output: { diff --git a/apps/sim/lib/copilot/tools/server/table/user-table.ts b/apps/sim/lib/copilot/tools/server/table/user-table.ts index ef1a8fed1b2..f23d2ec83f0 100644 --- a/apps/sim/lib/copilot/tools/server/table/user-table.ts +++ b/apps/sim/lib/copilot/tools/server/table/user-table.ts @@ -1,3 +1,4 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { createLogger } from '@sim/logger' import { toError } from '@sim/utils/errors' import { generateId } from '@sim/utils/id' @@ -9,6 +10,7 @@ import { } from '@/lib/copilot/tools/server/base-tool' import { isTriggerDevEnabled } from '@/lib/core/config/env-flags' import { runDetached } from '@/lib/core/utils/background' +import { captureServerEvent } from '@/lib/posthog/server' import { buildAutoMapping, COLUMN_TYPES, @@ -373,6 +375,17 @@ export const userTableServerTool: BaseServerTool requestId ) + recordAudit({ + workspaceId, + actorId: context.userId, + action: AuditAction.TABLE_CREATED, + resourceType: AuditResourceType.TABLE, + resourceId: table.id, + resourceName: table.name, + description: `Created table "${table.name}"`, + metadata: { source: 'tool_input' }, + }) + return { success: true, message: `Created table "${table.name}" (${table.id})`, @@ -445,7 +458,13 @@ export const userTableServerTool: BaseServerTool const requestId = generateId().slice(0, 8) assertNotAborted() - await deleteTable(tableId, requestId) + await deleteTable(tableId, requestId, context.userId) + captureServerEvent( + context.userId, + 'table_deleted', + { table_id: tableId, workspace_id: workspaceId }, + { groups: { workspace: workspaceId } } + ) deleted.push(tableId) } @@ -894,6 +913,21 @@ export const userTableServerTool: BaseServerTool await releaseJobClaim(table.id, inlineDeleteId).catch(() => {}) } + recordAudit({ + workspaceId, + actorId: context.userId, + action: AuditAction.TABLE_UPDATED, + resourceType: AuditResourceType.TABLE, + resourceId: table.id, + resourceName: table.name, + description: `Deleted ${result.affectedCount} row(s) from table "${table.name}"`, + metadata: { + op: 'bulk_delete', + rowsDeleted: result.affectedCount, + source: 'tool_input', + }, + }) + return { success: true, message: `Deleted ${result.affectedCount} rows`, @@ -996,6 +1030,16 @@ export const userTableServerTool: BaseServerTool requestId ) + recordAudit({ + workspaceId, + actorId: context.userId, + action: AuditAction.TABLE_UPDATED, + resourceType: AuditResourceType.TABLE, + resourceId: args.tableId, + description: `Deleted ${result.deletedCount} row(s)`, + metadata: { op: 'bulk_delete', rowsDeleted: result.deletedCount, source: 'tool_input' }, + }) + return { success: true, message: `Deleted ${result.deletedCount} rows`, @@ -1530,7 +1574,7 @@ export const userTableServerTool: BaseServerTool const requestId = generateId().slice(0, 8) assertNotAborted() - const renamed = await renameTable(args.tableId, newName, requestId) + const renamed = await renameTable(args.tableId, newName, requestId, context.userId) return { success: true, diff --git a/apps/sim/lib/invitations/core.test.ts b/apps/sim/lib/invitations/core.test.ts index 02cfaa0b2cc..6168529fced 100644 --- a/apps/sim/lib/invitations/core.test.ts +++ b/apps/sim/lib/invitations/core.test.ts @@ -316,6 +316,7 @@ describe('acceptInvitation', () => { expect(mockReconcileOrganizationSeats).toHaveBeenCalledWith({ organizationId: 'org-new', reason: 'member-accepted-invite', + actorId: 'invitee-user', }) expect(mockSetActiveOrganizationForCurrentSession).toHaveBeenCalledWith('org-new') }) diff --git a/apps/sim/lib/invitations/core.ts b/apps/sim/lib/invitations/core.ts index ae99f6f3b11..2e245865831 100644 --- a/apps/sim/lib/invitations/core.ts +++ b/apps/sim/lib/invitations/core.ts @@ -374,6 +374,25 @@ export async function acceptInvitation( } else { membershipAlreadyExists = membershipResult.alreadyMember + if (!membershipResult.alreadyMember) { + const memberRole = inv.role || 'member' + recordAudit({ + workspaceId: null, + actorId: input.userId, + action: AuditAction.ORG_MEMBER_ADDED, + resourceType: AuditResourceType.ORGANIZATION, + resourceId: targetOrganizationId, + description: `Joined organization as ${memberRole} via invite acceptance`, + metadata: { invitationId: inv.id, memberRole }, + }) + captureServerEvent( + input.userId, + 'org_member_added', + { organization_id: targetOrganizationId, member_role: memberRole }, + { groups: { organization: targetOrganizationId } } + ) + } + // Grow the paid seat count to match the new member and push the charge // to Stripe asynchronously (Team plans only; Enterprise seats are // fixed). Best-effort: the member is already in, and a transient @@ -381,35 +400,11 @@ export async function acceptInvitation( // removal path's seat accounting. if (billingManagesSeats && !membershipResult.alreadyMember) { try { - const seatResult = await reconcileOrganizationSeats({ + await reconcileOrganizationSeats({ organizationId: targetOrganizationId, reason: 'member-accepted-invite', + actorId: input.userId, }) - - if (seatResult.changed) { - const previousSeats = seatResult.previousSeats ?? 0 - const seats = seatResult.seats ?? 0 - recordAudit({ - workspaceId: null, - actorId: input.userId, - action: AuditAction.ORG_SEAT_PROVISIONED, - resourceType: AuditResourceType.ORGANIZATION, - resourceId: targetOrganizationId, - description: `Provisioned ${seats} seat(s) after invite acceptance`, - metadata: { - invitationId: inv.id, - previousSeats, - seats, - reason: 'member-accepted-invite', - }, - }) - captureServerEvent(input.userId, 'seats_provisioned', { - organization_id: targetOrganizationId, - previous_seats: previousSeats, - seats, - reason: 'member-accepted-invite', - }) - } } catch (seatError) { logger.error('Failed to reconcile organization seats after invite acceptance', { userId: input.userId, diff --git a/apps/sim/lib/knowledge/documents/service.ts b/apps/sim/lib/knowledge/documents/service.ts index 8accb7a6e85..4755d6f0a80 100644 --- a/apps/sim/lib/knowledge/documents/service.ts +++ b/apps/sim/lib/knowledge/documents/service.ts @@ -14,7 +14,14 @@ import { generateId } from '@sim/utils/id' import { tasks } from '@trigger.dev/sdk' import { and, asc, desc, eq, inArray, isNotNull, isNull, type SQL, sql } from 'drizzle-orm' import { checkActorUsageLimits } from '@/lib/billing/calculations/usage-monitor' +import type { HighestPrioritySubscription } from '@/lib/billing/core/plan' +import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' import { recordUsage } from '@/lib/billing/core/usage-log' +import { + checkStorageQuota, + decrementStorageUsageInTx, + incrementStorageUsage, +} from '@/lib/billing/storage' import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing' import type { ChunkingStrategy, StrategyOptions } from '@/lib/chunkers/types' import { resolveTriggerRegion } from '@/lib/core/async-jobs/region' @@ -870,7 +877,9 @@ export async function createDocumentRecords( requestId: string, uploadedBy: string | null = null ): Promise { - return await db.transaction(async (tx) => { + let storageBilling: { userId: string; workspaceId: string | null; bytes: number } | null = null + + const returnData = await db.transaction(async (tx) => { await tx.execute(sql`SELECT 1 FROM knowledge_base WHERE id = ${knowledgeBaseId} FOR UPDATE`) const kb = await tx @@ -896,6 +905,16 @@ export async function createDocumentRecords( tx ) + const billedUserId = uploadedBy ?? kb[0].userId + const totalBytes = documents.reduce((sum, docData) => sum + (docData.fileSize || 0), 0) + if (totalBytes > 0) { + const quotaCheck = await checkStorageQuota(billedUserId, totalBytes) + if (!quotaCheck.allowed) { + throw new Error(quotaCheck.error || 'Storage limit exceeded') + } + storageBilling = { userId: billedUserId, workspaceId: kbWorkspaceId, bytes: totalBytes } + } + // One load per batch (was N+1); skip entirely if no doc carries tags. const hasTaggedDocs = documents.some((d) => d.documentTagsData) const tagDefinitions = hasTaggedDocs @@ -984,6 +1003,17 @@ export async function createDocumentRecords( return returnData }) + + if (storageBilling) { + const billing: { userId: string; workspaceId: string | null; bytes: number } = storageBilling + try { + await incrementStorageUsage(billing.userId, billing.bytes, billing.workspaceId ?? undefined) + } catch (storageError) { + logger.error(`[${requestId}] Failed to update storage tracking:`, storageError) + } + } + + return returnData } export async function getDocuments( @@ -1302,6 +1332,8 @@ export async function createSingleDocument( ...processedTags, } + let storageBilling: { userId: string; workspaceId: string | null; bytes: number } | null = null + await db.transaction(async (tx) => { await tx.execute(sql`SELECT 1 FROM knowledge_base WHERE id = ${knowledgeBaseId} FOR UPDATE`) @@ -1327,6 +1359,19 @@ export async function createSingleDocument( tx ) + const billedUserId = uploadedBy ?? kb[0].userId + if (documentData.fileSize > 0) { + const quotaCheck = await checkStorageQuota(billedUserId, documentData.fileSize) + if (!quotaCheck.allowed) { + throw new Error(quotaCheck.error || 'Storage limit exceeded') + } + storageBilling = { + userId: billedUserId, + workspaceId: kb[0].workspaceId, + bytes: documentData.fileSize, + } + } + await tx.insert(document).values(newDocument) await tx @@ -1334,6 +1379,16 @@ export async function createSingleDocument( .set({ updatedAt: now }) .where(eq(knowledgeBase.id, knowledgeBaseId)) }) + + if (storageBilling) { + const billing: { userId: string; workspaceId: string | null; bytes: number } = storageBilling + try { + await incrementStorageUsage(billing.userId, billing.bytes, billing.workspaceId ?? undefined) + } catch (storageError) { + logger.error(`[${requestId}] Failed to update storage tracking:`, storageError) + } + } + logger.info(`[${requestId}] Document created: ${documentId} in knowledge base ${knowledgeBaseId}`) return newDocument as { @@ -1980,7 +2035,11 @@ export async function hardDeleteDocuments( .select({ id: document.id, fileUrl: document.fileUrl, + fileSize: document.fileSize, + uploadedBy: document.uploadedBy, + connectorId: document.connectorId, workspaceId: knowledgeBase.workspaceId, + kbUserId: knowledgeBase.userId, }) .from(document) .innerJoin(knowledgeBase, eq(document.knowledgeBaseId, knowledgeBase.id)) @@ -1992,18 +2051,51 @@ export async function hardDeleteDocuments( const existingIds = documentsToDelete.map((doc) => doc.id) + // Resolve owner subscriptions before the transaction (reads). Connector-synced + // documents are never metered at ingest, so they are excluded from billing here. + const candidateUserIds = new Set() + for (const doc of documentsToDelete) { + if (doc.connectorId || doc.fileSize <= 0) continue + const billedUserId = doc.uploadedBy ?? doc.kbUserId + if (billedUserId) candidateUserIds.add(billedUserId) + } + const subByUser = new Map() + for (const billedUserId of candidateUserIds) { + subByUser.set(billedUserId, await getHighestPrioritySubscription(billedUserId)) + } + + // Key everything off the rows this tx actually deleted (`returning()`) so a + // concurrent delete that claimed some ids first isn't double-counted here. + let deletedDocs: typeof documentsToDelete = [] await db.transaction(async (tx) => { await tx.delete(embedding).where(inArray(embedding.documentId, existingIds)) - await tx.delete(document).where(inArray(document.id, existingIds)) + const deletedRows = await tx + .delete(document) + .where(inArray(document.id, existingIds)) + .returning({ id: document.id }) + + const deletedIds = new Set(deletedRows.map((row) => row.id)) + deletedDocs = documentsToDelete.filter((doc) => deletedIds.has(doc.id)) + + const bytesByUser = new Map() + for (const doc of deletedDocs) { + if (doc.connectorId || doc.fileSize <= 0) continue + const billedUserId = doc.uploadedBy ?? doc.kbUserId + if (!billedUserId) continue + bytesByUser.set(billedUserId, (bytesByUser.get(billedUserId) ?? 0) + doc.fileSize) + } + for (const [billedUserId, bytes] of bytesByUser) { + await decrementStorageUsageInTx(tx, subByUser.get(billedUserId) ?? null, billedUserId, bytes) + } }) - await deleteDocumentStorageFiles(documentsToDelete, requestId) + await deleteDocumentStorageFiles(deletedDocs, requestId) - logger.info(`[${requestId}] Hard deleted ${existingIds.length} documents`, { - documentIds: existingIds, + logger.info(`[${requestId}] Hard deleted ${deletedDocs.length} documents`, { + documentIds: deletedDocs.map((doc) => doc.id), }) - return existingIds.length + return deletedDocs.length } export async function deleteDocument( diff --git a/apps/sim/lib/posthog/events.ts b/apps/sim/lib/posthog/events.ts index a408325000e..a3765d44eb0 100644 --- a/apps/sim/lib/posthog/events.ts +++ b/apps/sim/lib/posthog/events.ts @@ -292,14 +292,16 @@ export interface PostHogEventMap { } api_key_created: { - workspace_id: string + workspace_id?: string key_name: string + scope?: 'workspace' | 'personal' source?: 'settings' | 'deploy_modal' } api_key_revoked: { - workspace_id: string + workspace_id?: string key_name: string + scope?: 'workspace' | 'personal' } mcp_server_connected: { @@ -339,13 +341,15 @@ export interface PostHogEventMap { } environment_updated: { - workspace_id: string + workspace_id?: string key_count: number + scope?: 'workspace' | 'personal' } environment_deleted: { - workspace_id: string + workspace_id?: string key_count: number + scope?: 'workspace' | 'personal' } seats_provisioned: { @@ -356,8 +360,8 @@ export interface PostHogEventMap { } copilot_chat_sent: { - workflow_id: string - workspace_id: string + workflow_id?: string + workspace_id?: string has_file_attachments: boolean has_contexts: boolean mode: string @@ -443,7 +447,7 @@ export interface PostHogEventMap { } file_downloaded: { - workspace_id: string + workspace_id?: string is_bulk: boolean file_count: number } @@ -653,6 +657,119 @@ export interface PostHogEventMap { file_name: string file_size: number } + + organization_created: { + organization_id: string + name?: string + } + + /** Org membership lifecycle (distinct from workspace-level membership). */ + org_member_added: { + organization_id: string + member_role: string + } + + org_member_removed: { + organization_id: string + is_self_removal: boolean + } + + org_member_role_changed: { + organization_id: string + new_role: string + } + + /** Org seat count decreased (member removal / drift reconciliation). */ + seats_deprovisioned: { + organization_id: string + previous_seats: number + seats: number + reason: string + } + + /** A workflow's edit-lock was toggled on or off. */ + workflow_lock_toggled: { + workflow_id: string + workspace_id?: string + locked: boolean + } + + workflow_schedule_created: { + workflow_id: string + workspace_id: string + } + + workflow_schedule_deleted: { + workflow_id: string + workspace_id: string + } + + /** A stored credential's plaintext secret was deliberately retrieved via the token API. */ + credential_used: { + credential_type: 'oauth' | 'env_workspace' | 'env_personal' | 'service_account' + provider_id: string + workspace_id?: string + } + + payment_succeeded: { + plan: string + amount: number + currency: string + entity_type: 'user' | 'organization' + reference_id: string + } + + payment_failed: { + plan: string + amount: number + currency: string + entity_type: 'user' | 'organization' + reference_id: string + attempt_count: number + } + + overage_billed: { + amount: number + currency: string + entity_type: 'user' | 'organization' + reference_id: string + settled_via: 'stripe' | 'credits' + } + + credits_purchased: { + amount: number + currency: string + entity_type: 'user' | 'organization' + reference_id: string + } + + charge_disputed: { + amount: number + currency: string + reason: string + status: 'opened' | 'closed' + entity_type: 'user' | 'organization' + reference_id: string + } + + plan_converted: { + organization_id: string + from_plan: string + to_plan: string + } + + enterprise_subscription_created: { + reference_id: string + seats: number + monthly_price: number + currency: string + } + + subscription_transferred: { + subscription_id: string + from_entity: 'user' | 'organization' + to_entity: 'user' | 'organization' + } } export type PostHogEventName = keyof PostHogEventMap diff --git a/apps/sim/lib/table/import-data.ts b/apps/sim/lib/table/import-data.ts index 52343113892..01740624e86 100644 --- a/apps/sim/lib/table/import-data.ts +++ b/apps/sim/lib/table/import-data.ts @@ -14,7 +14,7 @@ import { CSV_MAX_BATCH_SIZE } from '@/lib/table/import' import { nKeysBetween } from '@/lib/table/order-key' import { acquireRowOrderLock } from '@/lib/table/rows/ordering' import { batchInsertRowsWithTx, replaceTableRowsWithTx } from '@/lib/table/rows/service' -import { addTableColumnsWithTx } from '@/lib/table/service' +import { addTableColumnsWithTx, auditTableColumnsAdded } from '@/lib/table/service' import type { ReplaceRowsResult, RowData, @@ -124,9 +124,18 @@ export async function deleteAllTableRows(tableId: string): Promise { export async function addImportColumns( table: TableDefinition, additions: { name: string; type: string }[], - requestId: string + requestId: string, + actingUserId?: string ): Promise { - return db.transaction((trx) => addTableColumnsWithTx(trx, table, additions, requestId)) + const updated = await db.transaction((trx) => + addTableColumnsWithTx(trx, table, additions, requestId) + ) + auditTableColumnsAdded( + table, + additions.map((c) => c.name), + actingUserId + ) + return updated } /** Overwrites a table's schema during an import (used when inferring columns from the file). */ @@ -179,6 +188,14 @@ export async function importAppendRows( } return { inserted, table: working } }) + // Audit post-commit — a mid-import rollback means the columns weren't added. + if (additions.length > 0) { + auditTableColumnsAdded( + table, + additions.map((c) => c.name), + ctx.userId + ) + } notifyTableRowUsage({ workspaceId: ctx.workspaceId, currentRowCount: table.rowCount, @@ -218,6 +235,14 @@ export async function importReplaceRows( requestId ) }) + // Audit post-commit (see importAppendRows). + if (additions.length > 0) { + auditTableColumnsAdded( + table, + additions.map((c) => c.name), + data.userId + ) + } notifyTableRowUsage({ workspaceId: data.workspaceId, currentRowCount: 0, diff --git a/apps/sim/lib/table/import-runner.ts b/apps/sim/lib/table/import-runner.ts index 13887dd1c10..91eb44df318 100644 --- a/apps/sim/lib/table/import-runner.ts +++ b/apps/sim/lib/table/import-runner.ts @@ -173,7 +173,7 @@ export async function runTableImport(payload: TableImportPayload): Promise additions.push({ name: columnName, type: inferColumnType(sample.map((r) => r[header])) }) updatedMapping[header] = columnName } - const updated = await addImportColumns(table, additions, requestId) + const updated = await addImportColumns(table, additions, requestId, userId) targetSchema = updated.schema effectiveMapping = updatedMapping } diff --git a/apps/sim/lib/table/service.ts b/apps/sim/lib/table/service.ts index 0f70cbab1aa..0c13607f8b2 100644 --- a/apps/sim/lib/table/service.ts +++ b/apps/sim/lib/table/service.ts @@ -7,6 +7,7 @@ * Note: API routes have their own implementations for HTTP-specific concerns. */ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db } from '@sim/db' import { tableJobs, userTableDefinitions, userTableRows } from '@sim/db/schema' import { createLogger } from '@sim/logger' @@ -486,6 +487,31 @@ export async function addTableColumnsWithTx( } } +/** + * Records the "columns added" audit for a table. The column add shares a + * transaction with the import's row inserts, so the caller MUST emit this only + * AFTER that transaction commits — auditing inside the tx would log a false + * success if a later row batch rolls it back. Skipped when no actor resolves. + */ +export function auditTableColumnsAdded( + table: TableDefinition, + columnNames: string[], + actingUserId?: string +): void { + const actorId = actingUserId ?? table.createdBy + if (!actorId || columnNames.length === 0) return + recordAudit({ + workspaceId: table.workspaceId ?? null, + actorId, + action: AuditAction.TABLE_UPDATED, + resourceType: AuditResourceType.TABLE, + resourceId: table.id, + resourceName: table.name, + description: `Added ${columnNames.length} column(s) to table "${table.name}"`, + metadata: { op: 'add_columns', columns: columnNames }, + }) +} + /** * Renames a table. * @@ -498,7 +524,8 @@ export async function addTableColumnsWithTx( export async function renameTable( tableId: string, newName: string, - requestId: string + requestId: string, + actingUserId?: string ): Promise<{ id: string; name: string }> { const nameValidation = validateTableName(newName) if (!nameValidation.valid) { @@ -511,12 +538,31 @@ export async function renameTable( .update(userTableDefinitions) .set({ name: newName, updatedAt: now }) .where(eq(userTableDefinitions.id, tableId)) - .returning({ id: userTableDefinitions.id }) + .returning({ + id: userTableDefinitions.id, + createdBy: userTableDefinitions.createdBy, + workspaceId: userTableDefinitions.workspaceId, + }) if (result.length === 0) { throw new Error(`Table ${tableId} not found`) } + const { createdBy, workspaceId } = result[0] + const renameActorId = actingUserId ?? createdBy + if (renameActorId) { + recordAudit({ + workspaceId: workspaceId ?? null, + actorId: renameActorId, + action: AuditAction.TABLE_UPDATED, + resourceType: AuditResourceType.TABLE, + resourceId: tableId, + resourceName: newName, + description: `Renamed table to "${newName}"`, + metadata: { op: 'rename' }, + }) + } + logger.info(`[${requestId}] Renamed table ${tableId} to "${newName}"`) return { id: tableId, name: newName } } catch (error: unknown) { @@ -598,11 +644,36 @@ export async function updateTableMetadata( * @param tableId - Table ID to delete * @param requestId - Request ID for logging */ -export async function deleteTable(tableId: string, requestId: string): Promise { - await db +export async function deleteTable( + tableId: string, + requestId: string, + actingUserId?: string +): Promise { + const now = new Date() + const result = await db .update(userTableDefinitions) - .set({ archivedAt: new Date(), updatedAt: new Date() }) + .set({ archivedAt: now, updatedAt: now }) .where(eq(userTableDefinitions.id, tableId)) + .returning({ + createdBy: userTableDefinitions.createdBy, + workspaceId: userTableDefinitions.workspaceId, + name: userTableDefinitions.name, + }) + + const deleted = result[0] + // Audit only genuine user deletes — rollback callers omit `actingUserId`. The + // caller emits the `table_deleted` PostHog event, so it is not duplicated here. + if (deleted && actingUserId) { + recordAudit({ + workspaceId: deleted.workspaceId ?? null, + actorId: actingUserId, + action: AuditAction.TABLE_DELETED, + resourceType: AuditResourceType.TABLE, + resourceId: tableId, + resourceName: deleted.name, + description: `Archived table "${deleted.name}"`, + }) + } logger.info(`[${requestId}] Archived table ${tableId}`) } diff --git a/apps/sim/lib/workflows/orchestration/workflow-lifecycle.ts b/apps/sim/lib/workflows/orchestration/workflow-lifecycle.ts index d3c38cdbbb0..15917517fd4 100644 --- a/apps/sim/lib/workflows/orchestration/workflow-lifecycle.ts +++ b/apps/sim/lib/workflows/orchestration/workflow-lifecycle.ts @@ -7,6 +7,7 @@ import { toError } from '@sim/utils/errors' import { generateId } from '@sim/utils/id' import { and, eq, isNull, min, ne } from 'drizzle-orm' import { generateRequestId } from '@/lib/core/utils/request' +import { captureServerEvent } from '@/lib/posthog/server' import { buildDefaultWorkflowArtifacts } from '@/lib/workflows/defaults' import { archiveWorkflow, restoreWorkflow } from '@/lib/workflows/lifecycle' import type { OrchestrationErrorCode } from '@/lib/workflows/orchestration/types' @@ -51,6 +52,8 @@ export interface PerformUpdateWorkflowParams { workspaceId: string currentName: string currentFolderId?: string | null + /** Prior `locked` value, used to detect lock-state transitions for instrumentation. */ + currentLocked?: boolean | null name?: string description?: string | null folderId?: string | null @@ -338,6 +341,31 @@ export async function performUpdateWorkflow( updates: updateData, }) + if (params.locked !== undefined && params.locked !== (params.currentLocked ?? false)) { + const workspaceId = updatedWorkflow.workspaceId + recordAudit({ + workspaceId: workspaceId ?? null, + actorId: params.userId, + action: params.locked ? AuditAction.WORKFLOW_LOCKED : AuditAction.WORKFLOW_UNLOCKED, + resourceType: AuditResourceType.WORKFLOW, + resourceId: params.workflowId, + resourceName: updatedWorkflow.name, + description: `${params.locked ? 'Locked' : 'Unlocked'} workflow "${updatedWorkflow.name}"`, + metadata: { locked: params.locked }, + }) + + captureServerEvent( + params.userId, + 'workflow_lock_toggled', + { + workflow_id: params.workflowId, + ...(workspaceId ? { workspace_id: workspaceId } : {}), + locked: params.locked, + }, + workspaceId ? { groups: { workspace: workspaceId } } : undefined + ) + } + return { success: true, workflow: updatedWorkflow } } catch (error) { logger.error(`[${requestId}] Failed to update workflow ${params.workflowId}`, { error }) diff --git a/apps/sim/lib/workflows/schedules/orchestration.ts b/apps/sim/lib/workflows/schedules/orchestration.ts index 36a9662e1ee..47da9c5e71d 100644 --- a/apps/sim/lib/workflows/schedules/orchestration.ts +++ b/apps/sim/lib/workflows/schedules/orchestration.ts @@ -255,6 +255,15 @@ export async function performCreateJob( { groups: { workspace: params.workspaceId } } ) + if (schedule?.workflowId) { + captureServerEvent( + params.userId, + 'workflow_schedule_created', + { workflow_id: schedule.workflowId, workspace_id: params.workspaceId }, + { groups: { workspace: params.workspaceId } } + ) + } + return { success: true, schedule, humanReadable } } catch (error) { logger.error('Failed to create job', { error: toError(error).message }) @@ -503,6 +512,15 @@ export async function performDeleteJob( { groups: { workspace: params.workspaceId } } ) + if (job.workflowId) { + captureServerEvent( + params.userId, + 'workflow_schedule_deleted', + { workflow_id: job.workflowId, workspace_id: params.workspaceId }, + { groups: { workspace: params.workspaceId } } + ) + } + return { success: true, schedule: job } } diff --git a/packages/audit/src/log.test.ts b/packages/audit/src/log.test.ts index 7f294499fb6..e47485ee085 100644 --- a/packages/audit/src/log.test.ts +++ b/packages/audit/src/log.test.ts @@ -318,12 +318,12 @@ describe('recordAudit', () => { ) }) - it('inserts without actor info when lookup fails', async () => { + it('nulls the actor FK when the lookup throws so the insert cannot FK-violate', async () => { dbChainMockFns.limit.mockRejectedValue(new Error('DB down')) recordAudit({ workspaceId: 'ws-1', - actorId: 'user-1', + actorId: 'admin-api', action: AuditAction.KNOWLEDGE_BASE_CREATED, resourceType: AuditResourceType.KNOWLEDGE_BASE, }) @@ -333,14 +333,14 @@ describe('recordAudit', () => { expect(dbChainMockFns.select).toHaveBeenCalledTimes(1) expect(dbChainMockFns.values).toHaveBeenCalledWith( expect.objectContaining({ - actorId: 'user-1', - actorName: undefined, + actorId: null, + actorName: 'Admin API', actorEmail: undefined, }) ) }) - it('sets actor info to null when user is not found', async () => { + it('nulls the actor FK and labels it System when the user is not found', async () => { dbChainMockFns.limit.mockResolvedValue([]) recordAudit({ @@ -355,8 +355,29 @@ describe('recordAudit', () => { expect(dbChainMockFns.select).toHaveBeenCalledTimes(1) expect(dbChainMockFns.values).toHaveBeenCalledWith( expect.objectContaining({ - actorId: 'deleted-user', - actorName: undefined, + actorId: null, + actorName: 'System', + actorEmail: undefined, + }) + ) + }) + + it('labels the admin-api system actor while nulling its FK', async () => { + dbChainMockFns.limit.mockResolvedValue([]) + + recordAudit({ + workspaceId: 'ws-1', + actorId: 'admin-api', + action: AuditAction.WORKFLOW_DELETED, + resourceType: AuditResourceType.WORKFLOW, + }) + + await flush() + + expect(dbChainMockFns.values).toHaveBeenCalledWith( + expect.objectContaining({ + actorId: null, + actorName: 'Admin API', actorEmail: undefined, }) ) diff --git a/packages/audit/src/log.ts b/packages/audit/src/log.ts index 4ee040ba43a..3a9a6ef66d4 100644 --- a/packages/audit/src/log.ts +++ b/packages/audit/src/log.ts @@ -8,7 +8,13 @@ const logger = createLogger('AuditLog') interface AuditLogParams { workspaceId?: string | null - actorId: string + /** + * The acting user's id (FK to `user.id`). Pass `null` for genuinely + * actor-less events such as anonymous public-share access — the row is then + * persisted with a null actor and the forensic context (ip/user-agent, + * metadata) carries the trail instead. + */ + actorId: string | null action: AuditActionType resourceType: AuditResourceTypeValue resourceId?: string @@ -44,24 +50,41 @@ async function insertAuditLog(params: AuditLogParams): Promise { let { actorName, actorEmail } = params - if (actorName === undefined && actorEmail === undefined && params.actorId) { + /** + * `actorId` is a FK to `user.id`. System actors (e.g. the shared `'admin-api'` + * key) have no user row, so we persist a null FK with a readable label instead + * of letting the insert fail. When the caller already supplies actorName/Email + * we trust the id is a real user and skip the lookup. + */ + let actorId: string | null = params.actorId + + if (actorName === undefined && actorEmail === undefined && actorId) { try { const [row] = await db .select({ name: user.name, email: user.email }) .from(user) - .where(eq(user.id, params.actorId)) + .where(eq(user.id, actorId)) .limit(1) - actorName = row?.name ?? undefined - actorEmail = row?.email ?? undefined + if (row) { + actorName = row.name ?? undefined + actorEmail = row.email ?? undefined + } else { + actorName = actorId === 'admin-api' ? 'Admin API' : 'System' + actorId = null + } } catch (error) { - logger.warn('Failed to resolve actor info', { error, actorId: params.actorId }) + // Couldn't confirm the user exists — null the FK so the insert can't violate + // it (system actor like 'admin-api', or a deleted user); the label remains. + logger.warn('Failed to resolve actor info', { error, actorId }) + actorName = actorId === 'admin-api' ? 'Admin API' : 'System' + actorId = null } } await db.insert(auditLog).values({ id: generateShortId(), workspaceId: params.workspaceId || null, - actorId: params.actorId, + actorId, action: params.action, resourceType: params.resourceType, resourceId: params.resourceId, diff --git a/packages/audit/src/types.ts b/packages/audit/src/types.ts index 0d5bbe02388..cbb93dfef53 100644 --- a/packages/audit/src/types.ts +++ b/packages/audit/src/types.ts @@ -33,6 +33,18 @@ export const AuditAction = { // Billing CREDIT_PURCHASED: 'credit.purchased', + CREDIT_ISSUED: 'credit.issued', + INVOICE_PAYMENT_SUCCEEDED: 'invoice.payment_succeeded', + INVOICE_PAYMENT_FAILED: 'invoice.payment_failed', + OVERAGE_BILLED: 'billing.overage_billed', + CHARGE_DISPUTE_OPENED: 'charge.dispute_opened', + CHARGE_DISPUTE_CLOSED: 'charge.dispute_closed', + + // Subscriptions + SUBSCRIPTION_CREATED: 'subscription.created', + SUBSCRIPTION_CANCELLED: 'subscription.cancelled', + SUBSCRIPTION_TRANSFERRED: 'subscription.transferred', + ENTERPRISE_SUBSCRIPTION_PROVISIONED: 'subscription.enterprise_provisioned', // Credential Sets CREDENTIAL_SET_CREATED: 'credential_set.created', @@ -66,6 +78,7 @@ export const AuditAction = { FILE_MOVED: 'file.moved', FILE_SHARED: 'file.shared', FILE_SHARE_DISABLED: 'file.share_disabled', + FILE_DOWNLOADED: 'file.downloaded', // Folders FOLDER_CREATED: 'folder.created', @@ -112,6 +125,7 @@ export const AuditAction = { CREDENTIAL_RENAMED: 'credential.renamed', CREDENTIAL_RECONNECTED: 'credential.reconnected', CREDENTIAL_DELETED: 'credential.deleted', + CREDENTIAL_ACCESSED: 'credential.accessed', CREDENTIAL_MEMBER_ADDED: 'credential_member.added', CREDENTIAL_MEMBER_REMOVED: 'credential_member.removed', CREDENTIAL_MEMBER_ROLE_CHANGED: 'credential_member.role_changed', @@ -135,6 +149,7 @@ export const AuditAction = { ORG_INVITATION_REVOKED: 'org_invitation.revoked', ORG_INVITATION_RESENT: 'org_invitation.resent', ORG_SEAT_PROVISIONED: 'org_seat.provisioned', + ORG_SEAT_DEPROVISIONED: 'org_seat.deprovisioned', ORG_PLAN_CONVERTED: 'org_plan.converted', // Permission Groups @@ -159,6 +174,7 @@ export const AuditAction = { TABLE_UPDATED: 'table.updated', TABLE_DELETED: 'table.deleted', TABLE_RESTORED: 'table.restored', + TABLE_EXPORTED: 'table.exported', // Webhooks WEBHOOK_CREATED: 'webhook.created', @@ -176,6 +192,8 @@ export const AuditAction = { WORKFLOW_LOCKED: 'workflow.locked', WORKFLOW_UNLOCKED: 'workflow.unlocked', WORKFLOW_VARIABLES_UPDATED: 'workflow.variables_updated', + WORKFLOW_PUBLIC_API_TOGGLED: 'workflow.public_api_toggled', + WORKFLOW_EXPORTED: 'workflow.exported', // Workspaces WORKSPACE_CREATED: 'workspace.created', @@ -185,6 +203,7 @@ export const AuditAction = { WORKSPACE_FORKED: 'workspace.forked', WORKSPACE_FORK_PROMOTED: 'workspace.fork_promoted', WORKSPACE_FORK_ROLLED_BACK: 'workspace.fork_rolled_back', + WORKSPACE_EXPORTED: 'workspace.exported', } as const export type AuditActionType = (typeof AuditAction)[keyof typeof AuditAction] @@ -214,6 +233,7 @@ export const AuditResourceType = { PERMISSION_GROUP: 'permission_group', SCHEDULE: 'schedule', SKILL: 'skill', + SUBSCRIPTION: 'subscription', TABLE: 'table', WEBHOOK: 'webhook', WORKFLOW: 'workflow', diff --git a/packages/testing/src/mocks/audit.mock.ts b/packages/testing/src/mocks/audit.mock.ts index d7a16b82364..f517ea5b8da 100644 --- a/packages/testing/src/mocks/audit.mock.ts +++ b/packages/testing/src/mocks/audit.mock.ts @@ -157,6 +157,23 @@ export const auditMock = { WORKSPACE_FORKED: 'workspace.forked', WORKSPACE_FORK_PROMOTED: 'workspace.fork_promoted', WORKSPACE_FORK_ROLLED_BACK: 'workspace.fork_rolled_back', + WORKSPACE_EXPORTED: 'workspace.exported', + CREDIT_ISSUED: 'credit.issued', + INVOICE_PAYMENT_SUCCEEDED: 'invoice.payment_succeeded', + INVOICE_PAYMENT_FAILED: 'invoice.payment_failed', + OVERAGE_BILLED: 'billing.overage_billed', + CHARGE_DISPUTE_OPENED: 'charge.dispute_opened', + CHARGE_DISPUTE_CLOSED: 'charge.dispute_closed', + SUBSCRIPTION_CREATED: 'subscription.created', + SUBSCRIPTION_CANCELLED: 'subscription.cancelled', + SUBSCRIPTION_TRANSFERRED: 'subscription.transferred', + ENTERPRISE_SUBSCRIPTION_PROVISIONED: 'subscription.enterprise_provisioned', + CREDENTIAL_ACCESSED: 'credential.accessed', + FILE_DOWNLOADED: 'file.downloaded', + ORG_SEAT_DEPROVISIONED: 'org_seat.deprovisioned', + TABLE_EXPORTED: 'table.exported', + WORKFLOW_PUBLIC_API_TOGGLED: 'workflow.public_api_toggled', + WORKFLOW_EXPORTED: 'workflow.exported', }, AuditResourceType: { API_KEY: 'api_key', @@ -180,6 +197,7 @@ export const auditMock = { PERMISSION_GROUP: 'permission_group', SCHEDULE: 'schedule', SKILL: 'skill', + SUBSCRIPTION: 'subscription', TABLE: 'table', WEBHOOK: 'webhook', WORKFLOW: 'workflow',