Skip to content

Commit 7153141

Browse files
committed
Fix run workflow
1 parent 48c9a3a commit 7153141

File tree

9 files changed

+670
-16
lines changed

9 files changed

+670
-16
lines changed

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -62,20 +62,23 @@ const ExecuteWorkflowSchema = z.object({
6262
runFromBlock: z
6363
.object({
6464
startBlockId: z.string().min(1, 'Start block ID is required'),
65-
sourceSnapshot: z.object({
66-
blockStates: z.record(z.any()),
67-
executedBlocks: z.array(z.string()),
68-
blockLogs: z.array(z.any()),
69-
decisions: z.object({
70-
router: z.record(z.string()),
71-
condition: z.record(z.string()),
72-
}),
73-
completedLoops: z.array(z.string()),
74-
loopExecutions: z.record(z.any()).optional(),
75-
parallelExecutions: z.record(z.any()).optional(),
76-
parallelBlockMapping: z.record(z.any()).optional(),
77-
activeExecutionPath: z.array(z.string()),
78-
}),
65+
sourceSnapshot: z
66+
.object({
67+
blockStates: z.record(z.any()),
68+
executedBlocks: z.array(z.string()),
69+
blockLogs: z.array(z.any()),
70+
decisions: z.object({
71+
router: z.record(z.string()),
72+
condition: z.record(z.string()),
73+
}),
74+
completedLoops: z.array(z.string()),
75+
loopExecutions: z.record(z.any()).optional(),
76+
parallelExecutions: z.record(z.any()).optional(),
77+
parallelBlockMapping: z.record(z.any()).optional(),
78+
activeExecutionPath: z.array(z.string()),
79+
})
80+
.optional(),
81+
executionId: z.string().optional(),
7982
})
8083
.optional(),
8184
})
@@ -269,9 +272,30 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
269272
base64MaxBytes,
270273
workflowStateOverride,
271274
stopAfterBlockId,
272-
runFromBlock,
275+
runFromBlock: rawRunFromBlock,
273276
} = validation.data
274277

278+
// Resolve runFromBlock snapshot from executionId if needed
279+
let runFromBlock = rawRunFromBlock
280+
if (runFromBlock && !runFromBlock.sourceSnapshot && runFromBlock.executionId) {
281+
const {
282+
getExecutionState,
283+
getLatestExecutionState,
284+
} = await import('@/lib/workflows/executor/execution-state')
285+
const snapshot = runFromBlock.executionId === 'latest'
286+
? await getLatestExecutionState(id)
287+
: await getExecutionState(runFromBlock.executionId)
288+
if (!snapshot) {
289+
return NextResponse.json(
290+
{
291+
error: `No execution state found for ${runFromBlock.executionId === 'latest' ? 'workflow' : `execution ${runFromBlock.executionId}`}. Run the full workflow first.`,
292+
},
293+
{ status: 400 }
294+
)
295+
}
296+
runFromBlock = { startBlockId: runFromBlock.startBlockId, sourceSnapshot: snapshot }
297+
}
298+
275299
// For API key and internal JWT auth, the entire body is the input (except for our control fields)
276300
// For session auth, the input is explicitly provided in the input field
277301
const input =

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/components/tool-call/tool-call.tsx

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ import { LoopTool } from '@/app/workspace/[workspaceId]/w/[workflowId]/component
1818
import { ParallelTool } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/subflows/parallel/parallel-config'
1919
import { getDisplayValue } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/workflow-block'
2020
import { getBlock } from '@/blocks/registry'
21+
import {
22+
CLIENT_EXECUTABLE_RUN_TOOLS,
23+
executeRunToolOnClient,
24+
} from '@/lib/copilot/client-sse/run-tool-execution'
2125
import type { CopilotToolCall } from '@/stores/panel'
2226
import { useCopilotStore } from '@/stores/panel'
2327
import type { SubAgentContentBlock } from '@/stores/panel/copilot/types'
@@ -1277,6 +1281,14 @@ async function handleRun(
12771281
setToolCallState(toolCall, 'executing', editedParams ? { params: editedParams } : undefined)
12781282
onStateChange?.('executing')
12791283
await sendToolDecision(toolCall.id, 'accepted')
1284+
1285+
// Client-executable run tools: execute on the client for real-time feedback
1286+
// (block pulsing, console logs, stop button). The server defers execution
1287+
// for these tools; the client reports back via mark-complete.
1288+
if (CLIENT_EXECUTABLE_RUN_TOOLS.has(toolCall.name)) {
1289+
const params = editedParams || toolCall.params || {}
1290+
executeRunToolOnClient(toolCall.id, toolCall.name, params)
1291+
}
12801292
}
12811293

12821294
async function handleSkip(toolCall: CopilotToolCall, setToolCallState: any, onStateChange?: any) {

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ export interface WorkflowExecutionOptions {
1111
executionId?: string
1212
onBlockComplete?: (blockId: string, output: any) => Promise<void>
1313
overrideTriggerType?: 'chat' | 'manual' | 'api'
14+
stopAfterBlockId?: string
15+
/** For run_from_block / run_block: start from a specific block using cached state */
16+
runFromBlock?: {
17+
startBlockId: string
18+
executionId?: string
19+
}
1420
}
1521

1622
/**
@@ -39,6 +45,15 @@ export async function executeWorkflowWithFullLogging(
3945
triggerType: options.overrideTriggerType || 'manual',
4046
useDraftState: true,
4147
isClientSession: true,
48+
...(options.stopAfterBlockId ? { stopAfterBlockId: options.stopAfterBlockId } : {}),
49+
...(options.runFromBlock
50+
? {
51+
runFromBlock: {
52+
startBlockId: options.runFromBlock.startBlockId,
53+
executionId: options.runFromBlock.executionId || 'latest',
54+
},
55+
}
56+
: {}),
4257
}
4358

4459
const response = await fetch(`/api/workflows/${activeWorkflowId}/execute`, {

apps/sim/lib/copilot/client-sse/handlers.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,15 @@ import { useWorkflowDiffStore } from '@/stores/workflow-diff/store'
1616
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
1717
import type { WorkflowState } from '@/stores/workflows/workflow/types'
1818
import { appendTextBlock, beginThinkingBlock, finalizeThinkingBlock } from './content-blocks'
19+
import {
20+
CLIENT_EXECUTABLE_RUN_TOOLS,
21+
executeRunToolOnClient,
22+
} from './run-tool-execution'
1923
import type { ClientContentBlock, ClientStreamingContext } from './types'
2024

2125
const logger = createLogger('CopilotClientSseHandlers')
2226
const TEXT_BLOCK_TYPE = 'text'
27+
2328
const MAX_BATCH_INTERVAL = 50
2429
const MIN_BATCH_INTERVAL = 16
2530
const MAX_QUEUE_SIZE = 5
@@ -408,6 +413,39 @@ export const sseHandlers: Record<string, SSEHandler> = {
408413
})
409414
}
410415
}
416+
417+
// Generate API key: update deployment status with the new key
418+
if (targetState === ClientToolCallState.success && current.name === 'generate_api_key') {
419+
try {
420+
const resultPayload = asRecord(
421+
data?.result || eventData.result || eventData.data || data?.data
422+
)
423+
const input = asRecord(current.params)
424+
const workflowId =
425+
(input?.workflowId as string) || useWorkflowRegistry.getState().activeWorkflowId
426+
const apiKey = (resultPayload?.apiKey || resultPayload?.key) as string | undefined
427+
if (workflowId) {
428+
const existingStatus =
429+
useWorkflowRegistry.getState().getWorkflowDeploymentStatus(workflowId)
430+
useWorkflowRegistry
431+
.getState()
432+
.setDeploymentStatus(
433+
workflowId,
434+
existingStatus?.isDeployed ?? false,
435+
existingStatus?.deployedAt,
436+
apiKey
437+
)
438+
logger.info('[SSE] Updated deployment status with API key', {
439+
workflowId,
440+
hasKey: !!apiKey,
441+
})
442+
}
443+
} catch (err) {
444+
logger.warn('[SSE] Failed to hydrate API key status', {
445+
error: err instanceof Error ? err.message : String(err),
446+
})
447+
}
448+
}
411449
}
412450

413451
for (let i = 0; i < context.contentBlocks.length; i++) {
@@ -588,6 +626,16 @@ export const sseHandlers: Record<string, SSEHandler> = {
588626
sendAutoAcceptConfirmation(id)
589627
}
590628

629+
// Client-executable run tools: execute on the client for real-time feedback
630+
// (block pulsing, console logs, stop button). The server defers execution
631+
// for these tools in interactive mode; the client reports back via mark-complete.
632+
if (
633+
CLIENT_EXECUTABLE_RUN_TOOLS.has(toolName) &&
634+
initialState === ClientToolCallState.executing
635+
) {
636+
executeRunToolOnClient(id, toolName, args || existing?.params || {})
637+
}
638+
591639
// OAuth: dispatch event to open the OAuth connect modal
592640
if (toolName === 'oauth_request_access' && args && typeof window !== 'undefined') {
593641
try {

0 commit comments

Comments
 (0)