Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,42 @@ def _extract_agent_references(self, yaml_content: str) -> List[str]:
logger.error(f'Error extracting agent references from YAML: {str(e)}')
return []

def _extract_subworkflow_references(self, yaml_content: str) -> List[str]:
"""
Extract subworkflow references (namespace/workflow_name) from workflow YAML

Args:
yaml_content: YAML configuration content

Returns:
List of subworkflow references in format 'namespace/workflow_name'
"""
try:
yaml_data = yaml.safe_load(yaml_content)
arium_config = yaml_data.get('arium', {})
ariums_config = arium_config.get('ariums', [])

subworkflow_references = []
for arium_def in ariums_config:
arium_name = arium_def.get('name', '')
# If arium name contains '/', it's a reference to cloud storage
if '/' in arium_name:
# Check if this is a reference (not an inline definition)
# If it has agents, workflow, etc., it's inline, not a reference
if (
arium_def.get('agents') is None
and arium_def.get('workflow') is None
and arium_def.get('function_nodes') is None
and arium_def.get('yaml_file') is None
):
subworkflow_references.append(arium_name)
logger.info(f'Found subworkflow reference: {arium_name}')

return subworkflow_references
except Exception as e:
logger.error(f'Error extracting subworkflow references from YAML: {str(e)}')
return []

async def _build_referenced_agents(
self,
agent_references: List[str],
Expand Down Expand Up @@ -158,6 +194,132 @@ async def _build_referenced_agents(

return agents_dict

async def get_workflow_yaml_from_bucket(
    self, workflow_name: str, namespace: str
) -> str:
    """
    Get workflow YAML content by name and namespace (for workflow references).

    Used to fetch subworkflow YAML when workflows are referenced by
    'namespace/workflow_name'. Checks the YAML cache first and falls back
    to cloud storage, caching the result on a miss.

    Args:
        workflow_name: The workflow name
        namespace: The namespace name

    Returns:
        str: The YAML content as string

    Raises:
        ValueError: If workflow not found
    """
    cache_key = get_workflow_yaml_cache_key(namespace, workflow_name)
    cached = self.cache_manager.get_str(cache_key)
    if cached:
        logger.info(
            f'Cache hit for workflow YAML - namespace: {namespace}, name: {workflow_name}'
        )
        return cached

    # Cache miss: read the YAML object from cloud storage.
    storage_key = get_workflow_yaml_key(namespace, workflow_name)
    logger.info(f'Fetching workflow YAML from storage - key: {storage_key}')

    try:
        raw = self.cloud_storage_manager.read_file(self.bucket_name, storage_key)
        content = raw.decode('utf-8')
        # Populate the cache so subsequent lookups skip storage.
        self.cache_manager.add(cache_key, content, expiry=self.cache_ttl)
    except CloudStorageFileNotFoundError:
        logger.error(
            f'YAML not found in cloud storage for workflow: {namespace}/{workflow_name}'
        )
        raise ValueError(
            f'Workflow YAML not found for workflow: {namespace}/{workflow_name}'
        )

    logger.info(
        f'Successfully retrieved workflow YAML - namespace: {namespace}, name: {workflow_name}'
    )
    return content

async def _inline_subworkflow_references(
self,
yaml_content: str,
subworkflow_references: List[str],
) -> str:
"""
Fetch subworkflow YAML and inline them into the parent workflow YAML

Args:
yaml_content: Original YAML configuration content
subworkflow_references: List of subworkflow references to inline

Returns:
Modified YAML content with inlined subworkflow definitions
"""
if not subworkflow_references:
return yaml_content

yaml_data = yaml.safe_load(yaml_content)
arium_config = yaml_data.get('arium', {})
ariums_config = arium_config.get('ariums', [])

# Build a dict to quickly look up subworkflow configs
subworkflow_configs = {}
for ref in subworkflow_references:
parts = ref.split('/', 1)
namespace = parts[0]
workflow_name = parts[1]

logger.info(f'Fetching subworkflow YAML for inlining: {ref}')

# Fetch the subworkflow YAML
subworkflow_yaml_content = await self.get_workflow_yaml_from_bucket(
workflow_name, namespace
)

# Parse and extract the arium config
subworkflow_data = yaml.safe_load(subworkflow_yaml_content)
subworkflow_arium = subworkflow_data.get('arium', {})

subworkflow_configs[ref] = subworkflow_arium

# Replace references with inline definitions
updated_ariums = []
for arium_def in ariums_config:
arium_name = arium_def.get('name', '')

if arium_name in subworkflow_configs:
# This is a reference - inline it
logger.info(f'Inlining subworkflow: {arium_name}')

# Get the fetched subworkflow config
inline_config = subworkflow_configs[arium_name].copy()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Same shallow copy concern as in workflow_inference_service.py.

Use copy.deepcopy() to avoid potential mutation of cached subworkflow configurations.

Proposed fix
-                inline_config = subworkflow_configs[arium_name].copy()
+                import copy
+                inline_config = copy.deepcopy(subworkflow_configs[arium_name])

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In
@wavefront/server/modules/agents_module/agents_module/services/workflow_crud_service.py
around line 303, The shallow copy of subworkflow configs using
subworkflow_configs[arium_name].copy() can lead to mutations of cached configs;
replace the shallow copy with a deep copy by importing and using copy.deepcopy
when creating inline_config in workflow_crud_service (the assignment to
inline_config referencing subworkflow_configs and arium_name) so nested
structures are fully cloned and cached entries aren't mutated.


# Preserve inherit_variables from the reference
if 'inherit_variables' in arium_def:
inline_config['inherit_variables'] = arium_def['inherit_variables']

# Update the name to be local (remove namespace prefix)
inline_config['name'] = arium_name.split('/')[-1]

updated_ariums.append(inline_config)
else:
# Not a reference - keep as is
updated_ariums.append(arium_def)

# Update the YAML data
yaml_data['arium']['ariums'] = updated_ariums

# Convert back to YAML string
return yaml.dump(yaml_data, default_flow_style=False, sort_keys=False)

async def _validate_yaml_content(
self,
yaml_content: str,
Expand All @@ -178,6 +340,16 @@ async def _validate_yaml_content(
ValueError: If YAML is invalid or workflow cannot be built
"""
try:
# Extract and inline subworkflow references
subworkflow_references = self._extract_subworkflow_references(yaml_content)
if subworkflow_references:
logger.info(
f'Inlining {len(subworkflow_references)} referenced subworkflows for validation'
)
yaml_content = await self._inline_subworkflow_references(
yaml_content, subworkflow_references
)

# Extract and build referenced agents
agent_references = self._extract_agent_references(yaml_content)
agents_dict = {}
Expand All @@ -190,7 +362,7 @@ async def _validate_yaml_content(
agent_references, access_token, app_key
)

# Validate workflow with pre-built agents
# Validate workflow with pre-built agents and inlined subworkflows
arium_instance = AriumBuilder.from_yaml(
yaml_str=yaml_content,
agents=agents_dict,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,42 @@ def _extract_agent_references(self, yaml_content: str) -> List[str]:
logger.error(f'Error extracting agent references from YAML: {str(e)}')
return []

def _extract_subworkflow_references(self, yaml_content: str) -> List[str]:
"""
Extract subworkflow references (namespace/workflow_name) from workflow YAML

Args:
yaml_content: YAML configuration content

Returns:
List of subworkflow references in format 'namespace/workflow_name'
"""
try:
yaml_data = yaml.safe_load(yaml_content)
arium_config = yaml_data.get('arium', {})
ariums_config = arium_config.get('ariums', [])

subworkflow_references = []
for arium_def in ariums_config:
arium_name = arium_def.get('name', '')
# If arium name contains '/', it's a reference to cloud storage
if '/' in arium_name:
# Check if this is a reference (not an inline definition)
# If it has agents, workflow, etc., it's inline, not a reference
if (
arium_def.get('agents') is None
and arium_def.get('workflow') is None
and arium_def.get('function_nodes') is None
and arium_def.get('yaml_file') is None
):
subworkflow_references.append(arium_name)
logger.info(f'Found subworkflow reference: {arium_name}')

return subworkflow_references
except Exception as e:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we catching this here ? Shouldnt we just throw exception, and let the caller know ?

logger.error(f'Error extracting subworkflow references from YAML: {str(e)}')
return []

async def _build_referenced_agents(
self,
agent_references: List[str],
Expand Down Expand Up @@ -181,6 +217,78 @@ async def _build_referenced_agents(

return agents_dict

async def _inline_subworkflow_references(
self,
yaml_content: str,
subworkflow_references: List[str],
) -> str:
"""
Fetch subworkflow YAML and inline them into the parent workflow YAML

Args:
yaml_content: Original YAML configuration content
subworkflow_references: List of subworkflow references to inline

Returns:
Modified YAML content with inlined subworkflow definitions
"""
if not subworkflow_references:
return yaml_content

yaml_data = yaml.safe_load(yaml_content)
arium_config = yaml_data.get('arium', {})
ariums_config = arium_config.get('ariums', [])

# Build a dict to quickly look up subworkflow configs
subworkflow_configs = {}
for ref in subworkflow_references:
parts = ref.split('/', 1)
namespace = parts[0]
workflow_name = parts[1]

logger.info(f'Fetching subworkflow YAML for inlining: {ref}')

# Fetch the subworkflow YAML using existing fetch method
subworkflow_yaml_content = await self.fetch_workflow_yaml(
workflow_name, namespace
)

# Parse and extract the arium config
subworkflow_data = yaml.safe_load(subworkflow_yaml_content)
subworkflow_arium = subworkflow_data.get('arium', {})

subworkflow_configs[ref] = subworkflow_arium

# Replace references with inline definitions
updated_ariums = []
for arium_def in ariums_config:
arium_name = arium_def.get('name', '')

if arium_name in subworkflow_configs:
# This is a reference - inline it
logger.info(f'Inlining subworkflow: {arium_name}')

# Get the fetched subworkflow config
inline_config = subworkflow_configs[arium_name].copy()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Shallow copy may cause unintended side effects.

Using .copy() creates a shallow copy. If subworkflow_arium contains nested dictionaries (like agents, workflow, etc.), modifications to inline_config could mutate the cached original, causing subtle bugs in subsequent workflow builds.

Proposed fix
-                inline_config = subworkflow_configs[arium_name].copy()
+                import copy
+                inline_config = copy.deepcopy(subworkflow_configs[arium_name])

Or import copy at the top of the file.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
inline_config = subworkflow_configs[arium_name].copy()
import copy
inline_config = copy.deepcopy(subworkflow_configs[arium_name])
🤖 Prompt for AI Agents
In
@wavefront/server/modules/agents_module/agents_module/services/workflow_inference_service.py
around line 272, The current use of subworkflow_configs[arium_name].copy()
creates a shallow copy and can allow nested structures to be mutated; replace it
with a deep copy by importing the copy module and using
copy.deepcopy(subworkflow_configs[arium_name]) when assigning inline_config so
changes to inline_config (e.g., nested keys like 'agents' or 'workflow') won’t
mutate the cached subworkflow_configs entry.


# Preserve inherit_variables from the reference
if 'inherit_variables' in arium_def:
inline_config['inherit_variables'] = arium_def['inherit_variables']

# Update the name to be local (remove namespace prefix)
inline_config['name'] = arium_name.split('/')[-1]

updated_ariums.append(inline_config)
else:
# Not a reference - keep as is
updated_ariums.append(arium_def)

# Update the YAML data
yaml_data['arium']['ariums'] = updated_ariums

# Convert back to YAML string
return yaml.dump(yaml_data, default_flow_style=False, sort_keys=False)

async def create_workflow_from_yaml(
self,
yaml_content: str,
Expand All @@ -200,6 +308,16 @@ async def create_workflow_from_yaml(
"""
logger.info(f'Creating workflow from YAML for workflow: {workflow_name}')

# Extract and inline subworkflow references
subworkflow_references = self._extract_subworkflow_references(yaml_content)
if subworkflow_references:
logger.info(
f'Inlining {len(subworkflow_references)} referenced subworkflows for workflow {workflow_name}'
)
yaml_content = await self._inline_subworkflow_references(
yaml_content, subworkflow_references
)

# Extract and build referenced agents
agent_references = self._extract_agent_references(yaml_content)
agents_dict = {}
Expand All @@ -212,7 +330,7 @@ async def create_workflow_from_yaml(
agent_references, access_token, app_key
)

# Build workflow with pre-built agents
# Build workflow with pre-built agents and inlined subworkflows
workflow_builder = AriumBuilder.from_yaml(
agents=agents_dict,
yaml_str=yaml_content,
Expand Down