diff --git a/wavefront/server/modules/agents_module/agents_module/services/workflow_crud_service.py b/wavefront/server/modules/agents_module/agents_module/services/workflow_crud_service.py
index e09087c..b84330c 100644
--- a/wavefront/server/modules/agents_module/agents_module/services/workflow_crud_service.py
+++ b/wavefront/server/modules/agents_module/agents_module/services/workflow_crud_service.py
@@ -85,6 +85,42 @@ def _extract_agent_references(self, yaml_content: str) -> List[str]:
             logger.error(f'Error extracting agent references from YAML: {str(e)}')
             return []
 
+    def _extract_subworkflow_references(self, yaml_content: str) -> List[str]:
+        """
+        Extract subworkflow references (namespace/workflow_name) from workflow YAML
+
+        Args:
+            yaml_content: YAML configuration content
+
+        Returns:
+            List of subworkflow references in format 'namespace/workflow_name'
+        """
+        try:
+            yaml_data = yaml.safe_load(yaml_content)
+            arium_config = yaml_data.get('arium', {})
+            ariums_config = arium_config.get('ariums', [])
+
+            subworkflow_references = []
+            for arium_def in ariums_config:
+                arium_name = arium_def.get('name', '')
+                # If arium name contains '/', it's a reference to cloud storage
+                if '/' in arium_name:
+                    # Check if this is a reference (not an inline definition)
+                    # If it has agents, workflow, etc., it's inline, not a reference
+                    if (
+                        arium_def.get('agents') is None
+                        and arium_def.get('workflow') is None
+                        and arium_def.get('function_nodes') is None
+                        and arium_def.get('yaml_file') is None
+                    ):
+                        subworkflow_references.append(arium_name)
+                        logger.info(f'Found subworkflow reference: {arium_name}')
+
+            return subworkflow_references
+        except Exception as e:
+            logger.error(f'Error extracting subworkflow references from YAML: {str(e)}')
+            return []
+
     async def _build_referenced_agents(
         self,
         agent_references: List[str],
@@ -158,6 +194,132 @@ async def _build_referenced_agents(
 
         return agents_dict
 
+    async def get_workflow_yaml_from_bucket(
+        self, workflow_name: str, namespace: str
+    ) -> str:
+        """
+        Get workflow YAML content by name and namespace (for workflow references)
+
+        This method is used to fetch subworkflow YAML when subworkflows
+        are referenced as namespace/workflow_name
+
+        Args:
+            workflow_name: The workflow name
+            namespace: The namespace name
+
+        Returns:
+            str: The YAML content as string
+
+        Raises:
+            ValueError: If workflow not found
+        """
+        # Try YAML cache first
+        yaml_cache_key = get_workflow_yaml_cache_key(namespace, workflow_name)
+        cached_yaml = self.cache_manager.get_str(yaml_cache_key)
+
+        if cached_yaml:
+            logger.info(
+                f'Cache hit for workflow YAML - namespace: {namespace}, name: {workflow_name}'
+            )
+            return cached_yaml
+
+        # Fetch YAML from cloud storage
+        yaml_key = get_workflow_yaml_key(namespace, workflow_name)
+        logger.info(f'Fetching workflow YAML from storage - key: {yaml_key}')
+
+        try:
+            yaml_bytes = self.cloud_storage_manager.read_file(
+                self.bucket_name, yaml_key
+            )
+            yaml_content = yaml_bytes.decode('utf-8')
+
+            # Cache YAML
+            self.cache_manager.add(yaml_cache_key, yaml_content, expiry=self.cache_ttl)
+        except CloudStorageFileNotFoundError:
+            logger.error(
+                f'YAML not found in cloud storage for workflow: {namespace}/{workflow_name}'
+            )
+            raise ValueError(
+                f'Workflow YAML not found for workflow: {namespace}/{workflow_name}'
+            )
+
+        logger.info(
+            f'Successfully retrieved workflow YAML - namespace: {namespace}, name: {workflow_name}'
+        )
+        return yaml_content
+
+    async def _inline_subworkflow_references(
+        self,
+        yaml_content: str,
+        subworkflow_references: List[str],
+    ) -> str:
+        """
+        Fetch subworkflow YAMLs and inline them into the parent workflow YAML
+
+        Args:
+            yaml_content: Original YAML configuration content
+            subworkflow_references: List of subworkflow references to inline
+
+        Returns:
+            Modified YAML content with inlined subworkflow definitions
+        """
+        if not subworkflow_references:
+            return yaml_content
+
+        yaml_data = yaml.safe_load(yaml_content)
+        arium_config = yaml_data.get('arium', {})
+        ariums_config = arium_config.get('ariums', [])
+
+        # Build a dict to quickly look up subworkflow configs
+        subworkflow_configs = {}
+        for ref in subworkflow_references:
+            parts = ref.split('/', 1)
+            namespace = parts[0]
+            workflow_name = parts[1]
+
+            logger.info(f'Fetching subworkflow YAML for inlining: {ref}')
+
+            # Fetch the subworkflow YAML
+            subworkflow_yaml_content = await self.get_workflow_yaml_from_bucket(
+                workflow_name, namespace
+            )
+
+            # Parse and extract the arium config
+            subworkflow_data = yaml.safe_load(subworkflow_yaml_content)
+            subworkflow_arium = subworkflow_data.get('arium', {})
+
+            subworkflow_configs[ref] = subworkflow_arium
+
+        # Replace references with inline definitions
+        updated_ariums = []
+        for arium_def in ariums_config:
+            arium_name = arium_def.get('name', '')
+
+            if arium_name in subworkflow_configs:
+                # This is a reference - inline it
+                logger.info(f'Inlining subworkflow: {arium_name}')
+
+                # Get the fetched subworkflow config
+                inline_config = subworkflow_configs[arium_name].copy()
+
+                # Preserve inherit_variables from the reference
+                if 'inherit_variables' in arium_def:
+                    inline_config['inherit_variables'] = arium_def['inherit_variables']
+
+                # Update the name to be local (remove namespace prefix)
+                inline_config['name'] = arium_name.split('/')[-1]
+
+                updated_ariums.append(inline_config)
+            else:
+                # Not a reference - keep as is
+                updated_ariums.append(arium_def)
+
+        # Update the YAML data
+        yaml_data['arium']['ariums'] = updated_ariums
+
+        # Convert back to YAML string
+        return yaml.dump(yaml_data, default_flow_style=False, sort_keys=False)
+
     async def _validate_yaml_content(
         self,
         yaml_content: str,
@@ -178,6 +340,16 @@ async def _validate_yaml_content(
             ValueError: If YAML is invalid or workflow cannot be built
         """
         try:
+            # Extract and inline subworkflow references
+            subworkflow_references = self._extract_subworkflow_references(yaml_content)
+            if subworkflow_references:
+                logger.info(
+                    f'Inlining {len(subworkflow_references)} referenced subworkflows for validation'
+                )
+                yaml_content = await self._inline_subworkflow_references(
+                    yaml_content, subworkflow_references
+                )
+
             # Extract and build referenced agents
             agent_references = self._extract_agent_references(yaml_content)
             agents_dict = {}
@@ -190,7 +362,7 @@ async def _validate_yaml_content(
                 agent_references, access_token, app_key
             )
 
-            # Validate workflow with pre-built agents
+            # Validate workflow with pre-built agents and inlined subworkflows
             arium_instance = AriumBuilder.from_yaml(
                 yaml_str=yaml_content,
                 agents=agents_dict,
diff --git a/wavefront/server/modules/agents_module/agents_module/services/workflow_inference_service.py b/wavefront/server/modules/agents_module/agents_module/services/workflow_inference_service.py
index 8762e41..3a69466 100644
--- a/wavefront/server/modules/agents_module/agents_module/services/workflow_inference_service.py
+++ b/wavefront/server/modules/agents_module/agents_module/services/workflow_inference_service.py
@@ -106,6 +106,42 @@ def _extract_agent_references(self, yaml_content: str) -> List[str]:
             logger.error(f'Error extracting agent references from YAML: {str(e)}')
             return []
 
+    def _extract_subworkflow_references(self, yaml_content: str) -> List[str]:
+        """
+        Extract subworkflow references (namespace/workflow_name) from workflow YAML
+
+        Args:
+            yaml_content: YAML configuration content
+
+        Returns:
+            List of subworkflow references in format 'namespace/workflow_name'
+        """
+        try:
+            yaml_data = yaml.safe_load(yaml_content)
+            arium_config = yaml_data.get('arium', {})
+            ariums_config = arium_config.get('ariums', [])
+
+            subworkflow_references = []
+            for arium_def in ariums_config:
+                arium_name = arium_def.get('name', '')
+                # If arium name contains '/', it's a reference to cloud storage
+                if '/' in arium_name:
+                    # Check if this is a reference (not an inline definition)
+                    # If it has agents, workflow, etc., it's inline, not a reference
+                    if (
+                        arium_def.get('agents') is None
+                        and arium_def.get('workflow') is None
+                        and arium_def.get('function_nodes') is None
+                        and arium_def.get('yaml_file') is None
+                    ):
+                        subworkflow_references.append(arium_name)
+                        logger.info(f'Found subworkflow reference: {arium_name}')
+
+            return subworkflow_references
+        except Exception as e:
+            logger.error(f'Error extracting subworkflow references from YAML: {str(e)}')
+            return []
+
     async def _build_referenced_agents(
         self,
         agent_references: List[str],
@@ -181,6 +217,78 @@ async def _build_referenced_agents(
 
         return agents_dict
 
+    async def _inline_subworkflow_references(
+        self,
+        yaml_content: str,
+        subworkflow_references: List[str],
+    ) -> str:
+        """
+        Fetch subworkflow YAMLs and inline them into the parent workflow YAML
+
+        Args:
+            yaml_content: Original YAML configuration content
+            subworkflow_references: List of subworkflow references to inline
+
+        Returns:
+            Modified YAML content with inlined subworkflow definitions
+        """
+        if not subworkflow_references:
+            return yaml_content
+
+        yaml_data = yaml.safe_load(yaml_content)
+        arium_config = yaml_data.get('arium', {})
+        ariums_config = arium_config.get('ariums', [])
+
+        # Build a dict to quickly look up subworkflow configs
+        subworkflow_configs = {}
+        for ref in subworkflow_references:
+            parts = ref.split('/', 1)
+            namespace = parts[0]
+            workflow_name = parts[1]
+
+            logger.info(f'Fetching subworkflow YAML for inlining: {ref}')
+
+            # Fetch the subworkflow YAML using existing fetch method
+            subworkflow_yaml_content = await self.fetch_workflow_yaml(
+                workflow_name, namespace
+            )
+
+            # Parse and extract the arium config
+            subworkflow_data = yaml.safe_load(subworkflow_yaml_content)
+            subworkflow_arium = subworkflow_data.get('arium', {})
+
+            subworkflow_configs[ref] = subworkflow_arium
+
+        # Replace references with inline definitions
+        updated_ariums = []
+        for arium_def in ariums_config:
+            arium_name = arium_def.get('name', '')
+
+            if arium_name in subworkflow_configs:
+                # This is a reference - inline it
+                logger.info(f'Inlining subworkflow: {arium_name}')
+
+                # Get the fetched subworkflow config
+                inline_config = subworkflow_configs[arium_name].copy()
+
+                # Preserve inherit_variables from the reference
+                if 'inherit_variables' in arium_def:
+                    inline_config['inherit_variables'] = arium_def['inherit_variables']
+
+                # Update the name to be local (remove namespace prefix)
+                inline_config['name'] = arium_name.split('/')[-1]
+
+                updated_ariums.append(inline_config)
+            else:
+                # Not a reference - keep as is
+                updated_ariums.append(arium_def)
+
+        # Update the YAML data
+        yaml_data['arium']['ariums'] = updated_ariums
+
+        # Convert back to YAML string
+        return yaml.dump(yaml_data, default_flow_style=False, sort_keys=False)
+
     async def create_workflow_from_yaml(
         self,
         yaml_content: str,
@@ -200,6 +308,16 @@ async def create_workflow_from_yaml(
         """
         logger.info(f'Creating workflow from YAML for workflow: {workflow_name}')
 
+        # Extract and inline subworkflow references
+        subworkflow_references = self._extract_subworkflow_references(yaml_content)
+        if subworkflow_references:
+            logger.info(
+                f'Inlining {len(subworkflow_references)} referenced subworkflows for workflow {workflow_name}'
+            )
+            yaml_content = await self._inline_subworkflow_references(
+                yaml_content, subworkflow_references
+            )
+
         # Extract and build referenced agents
         agent_references = self._extract_agent_references(yaml_content)
         agents_dict = {}
@@ -212,7 +330,7 @@ async def create_workflow_from_yaml(
                 agent_references, access_token, app_key
             )
 
-        # Build workflow with pre-built agents
+        # Build workflow with pre-built agents and inlined subworkflows
         workflow_builder = AriumBuilder.from_yaml(
             agents=agents_dict,
             yaml_str=yaml_content,
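
Note: the following is a minimal, standalone sketch of the inlining transformation that the two _inline_subworkflow_references methods above perform, included only to illustrate the behavior. It assumes PyYAML; the storage lookup (get_workflow_yaml_from_bucket in the CRUD service, fetch_workflow_yaml in the inference service) is stubbed with an in-memory dict, and the sample namespace, workflow names, and YAML shapes are hypothetical rather than the authoritative arium schema.

# Standalone sketch of the sub-workflow inlining above (illustration only).
# Hypothetical names and YAML shapes; storage/cache lookups are stubbed.
import yaml

# Stand-in for cloud storage: one hypothetical sub-workflow stored under
# the reference 'team_ns/summarizer'.
SUBWORKFLOWS = {
    'team_ns/summarizer': """
arium:
  agents:
    - name: summarizer_agent
  workflow:
    start: summarizer_agent
""",
}

PARENT_YAML = """
arium:
  ariums:
    - name: team_ns/summarizer   # reference: 'namespace/workflow_name', no inline keys
      inherit_variables: true
    - name: local_review         # inline definition: has its own workflow
      workflow:
        start: reviewer_agent
"""


def inline_subworkflow_references(parent_yaml: str) -> str:
    """Replace 'namespace/workflow_name' references with the sub-workflow's arium block."""
    data = yaml.safe_load(parent_yaml)
    ariums = data.get('arium', {}).get('ariums', [])

    updated = []
    for arium_def in ariums:
        name = arium_def.get('name', '')
        # Same reference test as the diff: a '/' in the name and no inline keys.
        is_reference = '/' in name and all(
            arium_def.get(key) is None
            for key in ('agents', 'workflow', 'function_nodes', 'yaml_file')
        )
        if not is_reference:
            updated.append(arium_def)
            continue

        # Fetch and parse the referenced sub-workflow (stubbed dict lookup here).
        sub_data = yaml.safe_load(SUBWORKFLOWS[name])
        inline_config = dict(sub_data.get('arium', {}))

        # Preserve inherit_variables from the reference and localize the name.
        if 'inherit_variables' in arium_def:
            inline_config['inherit_variables'] = arium_def['inherit_variables']
        inline_config['name'] = name.split('/')[-1]
        updated.append(inline_config)

    data['arium']['ariums'] = updated
    return yaml.dump(data, default_flow_style=False, sort_keys=False)


if __name__ == '__main__':
    # Prints the parent YAML with the reference replaced by an inline arium
    # block named 'summarizer', with inherit_variables preserved.
    print(inline_subworkflow_references(PARENT_YAML))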