Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ node_modules/
data/
temp/
WareHouse/

3 changes: 2 additions & 1 deletion frontend/src/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@
"alert_download_failed": "Failed to download file, please try again.",
"alert_download_logs_failed": "Download failed, please try again later",
"no_initial_instructions": "No initial instructions provided",
"workflow_cancelled": "Workflow cancelled"
"workflow_cancelled": "Workflow cancelled",
"reconnected": "Reconnected to existing session"
},
"form_generator": {
"advanced_settings": "Advanced Settings",
Expand Down
3 changes: 2 additions & 1 deletion frontend/src/locales/zh.json
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@
"alert_download_failed": "下载文件失败,请重试。",
"alert_download_logs_failed": "下载失败,请稍后重试",
"no_initial_instructions": "未提供初始说明",
"workflow_cancelled": "工作流已取消"
"workflow_cancelled": "工作流已取消",
"reconnected": "已重新连接到现有会话"
},
"components": {
"workflow_edge": {
Expand Down
174 changes: 149 additions & 25 deletions frontend/src/pages/LaunchView.vue
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ const clearUploadedAttachments = () => {
}

// Reset the WebSocket connection and related state
const resetConnectionState = ({ closeSocket = true } = {}) => {
const resetConnectionState = ({ closeSocket = true, keepSession = false } = {}) => {
if (closeSocket && ws) {
try {
ws.close()
Expand All @@ -769,20 +769,29 @@ const resetConnectionState = ({ closeSocket = true } = {}) => {
}

ws = null
sessionId = null
isConnectionReady.value = false
shouldGlow.value = false
isWorkflowRunning.value = false
activeNodes.value = []

if (!keepSession) {
sessionId = null
isWorkflowRunning.value = false
activeNodes.value = []
shouldGlow.value = false
clearUploadedAttachments()
chatMessages.value = []
nodesLoadingMessagesMap.clear()
nameToSpriteMap.value.clear()
nodeSpriteMap.value.clear()
}

if (attachmentHoverTimeout) {
clearTimeout(attachmentHoverTimeout)
attachmentHoverTimeout = null
}
clearUploadedAttachments()
}

// Button state management
const isWorkflowRunning = ref(false)
const isReconnecting = ref(false)

// Active node list
const activeNodes = ref([])
Expand Down Expand Up @@ -1432,12 +1441,28 @@ const sendHumanInput = () => {
}

// Establish a WebSocket connection
const establishWebSocketConnection = () => {
// Reset any previous state before creating a new socket
resetConnectionState()
const establishWebSocketConnection = (options = {}) => {
let { sessionId: reconnectSid } = options

// If no explicit sessionId, check URL for an existing session
if (!reconnectSid) {
const urlSession = route.query?.session
if (urlSession && typeof urlSession === 'string' && urlSession.trim()) {
reconnectSid = urlSession.trim()
}
}

if (!selectedFile.value) {
return
const reconnecting = !!reconnectSid

if (reconnecting) {
isReconnecting.value = true
resetConnectionState({ closeSocket: true, keepSession: true })
status.value = 'Connecting...'
} else {
resetConnectionState()
if (!selectedFile.value) {
return
}
}

const apiBase = import.meta.env.VITE_API_BASE_URL || ''
Expand All @@ -1457,7 +1482,9 @@ const establishWebSocketConnection = () => {
}
}

const wsUrl = `${scheme}//${host}/ws`
const wsUrl = reconnecting
? `${scheme}//${host}/ws?session_id=${encodeURIComponent(reconnectSid)}`
: `${scheme}//${host}/ws`
const socket = new WebSocket(wsUrl)
ws = socket

Expand Down Expand Up @@ -1485,12 +1512,15 @@ const establishWebSocketConnection = () => {
}

isConnectionReady.value = true
shouldGlow.value = true
status.value = 'Waiting for launch...'

nextTick(() => {
taskInputRef.value?.focus()
})
// For new connections, set initial state; reconnections are handled by session_resumed
if (!isReconnecting.value) {
shouldGlow.value = true
status.value = 'Waiting for launch...'
nextTick(() => {
taskInputRef.value?.focus()
})
}
} else {
processMessage(msg)
}
Expand Down Expand Up @@ -1522,6 +1552,11 @@ const establishWebSocketConnection = () => {

// Watch for file selection changes
watch(selectedFile, (newFile) => {
// When reconnecting, selectedFile is set by session_resumed; skip the normal flow
if (isReconnecting.value) {
return
}

taskPrompt.value = ''
fileSearchQuery.value = newFile || ''
isFileSearchDirty.value = false
Expand Down Expand Up @@ -1555,10 +1590,18 @@ watch(
}
)

onMounted(() => {
onMounted(async () => {
document.addEventListener('click', handleClickOutside)
document.addEventListener('keydown', handleKeydown)
loadWorkflows()
await loadWorkflows()
// If URL contains a session id, the watch on selectedFile (triggered by
// applyWorkflowFromRoute inside loadWorkflows) will call establishWebSocketConnection,
// which auto-detects the session param and reconnects.
// Fallback: if session is present but no workflow was in URL, connect directly.
const sessionParam = route.query?.session
if (sessionParam && typeof sessionParam === 'string' && sessionParam.trim() && !selectedFile.value) {
establishWebSocketConnection({ sessionId: sessionParam.trim() })
}
})

onUnmounted(() => {
Expand Down Expand Up @@ -1836,6 +1879,15 @@ const launchWorkflow = async () => {

status.value = 'Running...'
isWorkflowRunning.value = true

// Persist session id in URL for reconnection after refresh
router.push({
query: {
...route.query,
workflow: selectedFile.value,
session: sessionId
}
})
} else {
const error = await response.json().catch(() => ({}))
console.error('Failed to launch workflow:', error)
Expand Down Expand Up @@ -2025,6 +2077,68 @@ const animateSpriteAlongEdge = (edge) => {
const processMessage = async (msg) => {
console.log('Message: ', msg)

// Session resumed after reconnection — sync final UI state
if (msg.type === 'session_resumed') {
const data = msg.data
sessionId = data.session_id

// Restore workflow selection without clearing chat (messages were already replayed)
// Set selectedFile BEFORE clearing isReconnecting so the watch skips
if (data.yaml_file) {
selectedFile.value = data.yaml_file
fileSearchQuery.value = data.yaml_file
// Load YAML data and sprites (but don't clear chat)
try {
const yamlContentString = await fetchWorkflowYAML(data.yaml_file)
const parsedYaml = yaml.load(yamlContentString)
workflowYaml.value = parsedYaml || {}

const yamlNodes = Array.isArray(parsedYaml?.graph?.nodes) ? parsedYaml.graph.nodes : []
for (const node of yamlNodes) {
if (node.id && !nodeSpriteMap.value.has(node.id)) {
const spritePath = spriteFetcher.fetchSprite(node.id, 'D', 1)
nodeSpriteMap.value.set(node.id, spritePath)
}
}
} catch (e) {
console.error('Failed to load YAML on reconnect:', e)
}
}

isReconnecting.value = false

// Restore workflow status
const statusMap = {
'idle': 'Connected',
'running': 'Running...',
'waiting_for_input': 'Waiting for input...',
'completed': 'Completed',
'error': 'Error',
'cancelled': 'Cancelled',
}
status.value = statusMap[data.status] || 'Connected'

if (data.status === 'running' || data.status === 'waiting_for_input') {
isWorkflowRunning.value = true
}

if (data.status === 'waiting_for_input') {
shouldGlow.value = true
}

if (data.status === 'completed' || data.status === 'error' || data.status === 'cancelled') {
sessionIdToDownload = sessionId
}

if (data.current_node_id && !activeNodes.value.includes(data.current_node_id)) {
activeNodes.value.push(data.current_node_id)
}

isConnectionReady.value = true
addChatNotification(t('launch.reconnected'))
return
}

// Prompt for human input
if (msg.type === 'human_input_required') {
const fullMessage = msg.data.task_description + '\n\n' + msg.data.input
Expand Down Expand Up @@ -2184,6 +2298,14 @@ const processMessage = async (msg) => {
sessionIdToDownload = sessionId
}

// Workflow cancelled (e.g., from server-side cancellation)
if (msg.type === 'workflow_cancelled') {
addChatNotification(msg.data?.message || t('launch.workflow_cancelled'))
status.value = 'Cancelled'
isWorkflowRunning.value = false
sessionIdToDownload = sessionId
}

// Handle direct error messages (e.g., workflow execution errors)
if (msg.type === 'error') {
const errorMessage = msg.data?.message || 'Unknown error occurred'
Expand All @@ -2199,6 +2321,14 @@ const cancelWorkflow = () => {
if (!isWorkflowRunning.value || !ws) {
return
}

// Send cancel request through WebSocket so the server stops the workflow
try {
ws.send(JSON.stringify({ type: 'cancel' }))
} catch (sendError) {
console.warn('Failed to send cancel message:', sendError)
}

addChatNotification(t('launch.workflow_cancelled'))
status.value = 'Cancelled'
isWorkflowRunning.value = false
Expand All @@ -2214,12 +2344,6 @@ const cancelWorkflow = () => {
nodesLoadingMessagesMap.delete(nodeId)
}
}

try {
ws.close()
} catch (closeError) {
console.warn('Failed to close WebSocket:', closeError)
}
}

// Download logs
Expand Down
8 changes: 4 additions & 4 deletions server/routes/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@


@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
async def websocket_endpoint(websocket: WebSocket, session_id: str = ""):
manager = get_websocket_manager()
session_id = await manager.connect(websocket)
sid = await manager.connect(websocket, session_id=session_id or None)
try:
while True:
message = await websocket.receive_text()
await manager.handle_message(session_id, message)
await manager.handle_message(sid, message)
except WebSocketDisconnect:
manager.disconnect(session_id)
manager.disconnect(sid)
10 changes: 10 additions & 0 deletions server/services/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,22 @@ async def handle_message(self, session_id: str, data: Dict[str, Any], websocket_
await self._handle_ping(session_id, websocket_manager)
elif message_type == "get_status":
await self._handle_get_status(session_id, websocket_manager)
elif message_type == "cancel":
await self._handle_cancel(session_id, websocket_manager)
else:
await websocket_manager.send_message(
session_id,
{"type": "error", "data": {"message": f"Unknown message type: {message_type}"}},
)

async def _handle_cancel(self, session_id: str, websocket_manager):
if self.workflow_run_service:
self.workflow_run_service.request_cancel(session_id, reason="User requested cancellation")
await websocket_manager.send_message(
session_id,
{"type": "input_received", "data": {"message": "Cancellation requested"}},
)

async def _handle_human_input(self, session_id: str, data: Dict[str, Any], websocket_manager):
try:
payload = data.get("data", {}) or {}
Expand Down
27 changes: 27 additions & 0 deletions server/services/session_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ class WorkflowSession:
cancel_event: Event = field(default_factory=Event)
cancel_reason: Optional[str] = None

# Message buffer for reconnection replay
message_buffer: list = field(default_factory=list)

MAX_BUFFER_SIZE: int = 1000

def append_message(self, message: Dict[str, Any]) -> None:
if len(self.message_buffer) >= self.MAX_BUFFER_SIZE:
self.message_buffer.pop(0)
self.message_buffer.append(message)


class WorkflowSessionStore:
"""In-memory registry that tracks workflow session metadata."""
Expand Down Expand Up @@ -129,3 +139,20 @@ def list_sessions(self) -> Dict[str, Dict[str, Any]]:
def get_artifact_queue(self, session_id: str) -> Optional[ArtifactEventQueue]:
session = self._sessions.get(session_id)
return session.artifact_queue if session else None

def get_session_snapshot(self, session_id: str) -> Optional[Dict[str, Any]]:
session = self._sessions.get(session_id)
if not session:
return None
return {
"session_id": session.session_id,
"yaml_file": session.yaml_file,
"task_prompt": session.task_prompt,
"status": session.status.value,
"current_node_id": session.current_node_id,
"created_at": session.created_at,
"updated_at": session.updated_at,
"waiting_for_input": session.waiting_for_input,
"error_message": session.error_message,
"message_count": len(session.message_buffer),
}
Loading