Conversation
…tend-refactor/migrate-apps-to-workflows
…fact queries
Previously, querying folders with parent_id=None or artifacts with folder_id=None would not filter for root-level items. Now uses model_fields_set to detect when the field was explicitly set to None vs not provided, allowing proper filtering for items without a parent folder.
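The explicit-None detection described above can be sketched without Pydantic by using a sentinel default (Pydantic's model_fields_set provides the same "was this field passed?" information on real models; the function and field names below are illustrative only):

```python
# A minimal sketch, assuming a query-filter builder: a sentinel distinguishes
# "parent_id not provided" from "parent_id explicitly set to None".
_UNSET = object()


def build_folder_filter(parent_id=_UNSET):
    """Build a query filter, treating parent_id=None as 'root-level only'."""
    filters = {}
    if parent_id is _UNSET:
        return filters  # field not provided: do not filter on parent at all
    # Explicitly provided (possibly None): filter on it, where None means
    # "items without a parent folder".
    filters["parent_id"] = parent_id
    return filters
```

With this shape, `build_folder_filter()` applies no parent filter, while `build_folder_filter(parent_id=None)` filters for root-level items.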
…variants
When a variant is archived, all its child revisions are now also soft-deleted so they no longer appear in revision queries that filter on deleted_at. Similarly, unarchiving a variant restores all its child revisions.
Allows filtering environment revisions by application references. The filter returns only revisions where the specified application's deployment changed compared to the previous revision, enabling efficient querying of deployment history for a specific application.
The flags field was missing from RunUpdate calls in evaluate_batch_testset, evaluate_batch_invocation, and _evaluate_batch_items, causing flags to be lost when updating run status.
Updated references from legacyAppRevision to workflow entities across multiple design documents. The legacy entity system has been fully removed and replaced by workflow entities.
…d workflow variant molecule
- ivt-built-in-search: Plan for built-in search functionality in IVT
- migrate-state-entities: Gap analysis and migration plan for testset/testcase entities
- workflow-variant-molecule: Plan for workflow variant molecule implementation
Pull request overview
Copilot reviewed 75 out of 1219 changed files in this pull request and generated 5 comments.
Comments suppressed due to low confidence (1)
api/oss/src/core/annotations/service.py:1
_edit_annotation is called with annotation_edit.references / annotation_edit.links (which can be None). If the intent is to preserve existing references/links unless explicitly updated, this can accidentally clear them depending on downstream handling. Consider merging: use the existing references/links as the base and overlay only explicitly-set fields from the edit model.
```python
from typing import Optional, List, Dict, Any
```
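The merge suggested in the comment above can be sketched with plain dicts. Here `explicit_edit_fields` stands in for the output of a Pydantic edit model's `model_dump(exclude_unset=True)`; all names are illustrative, not the real service API:

```python
def merge_edit(existing: dict, explicit_edit_fields: dict) -> dict:
    """Overlay only explicitly-set edit fields onto the existing state.

    Fields the caller never passed are absent from explicit_edit_fields,
    so they cannot accidentally clear existing references/links. A field
    explicitly set to None still clears, which is the intended semantics.
    """
    merged = dict(existing)
    merged.update(explicit_edit_fields)
    return merged
```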
```python
catalog = [
    _enrich_entry(
        entry, evaluator_metadata=_evaluator_metadata_by_key().get(entry["key"])
```
_evaluator_metadata_by_key() is recomputed for every entry in the list comprehension, turning catalog building into O(n²) work. Compute the metadata map once (e.g., metadata = _evaluator_metadata_by_key()) and then index into it inside the loop.
```diff
-catalog = [
-    _enrich_entry(
-        entry, evaluator_metadata=_evaluator_metadata_by_key().get(entry["key"])
+evaluator_metadata_by_key = _evaluator_metadata_by_key()
+catalog = [
+    _enrich_entry(
+        entry, evaluator_metadata=evaluator_metadata_by_key.get(entry["key"])
```
```python
async def edit(
    self,
    *,
    organization_id: UUID,
    project_id: UUID,
    user_id: UUID,
    #
    trace_id: str,
    #
    trace_edit: SimpleTraceEdit,
) -> Optional[SimpleTrace]:
    existing = await self.fetch(project_id=project_id, trace_id=trace_id)
    if existing is None:
        return None

    _flags = self._flags(existing.origin, existing.kind, existing.channel)
    _references = existing.references.model_dump(
        mode="json", exclude_none=True, exclude_unset=True
    )
    _links = build_otel_links(existing.links)
    _attributes = build_simple_trace_attributes(
        flags=_flags,
        tags=trace_edit.tags,
        meta=trace_edit.meta,
        data=trace_edit.data,
        references=_references,
    )
```
SimpleTraceEdit now includes optional references and links, but SimpleTracesService.edit() ignores them and always reuses existing.references / existing.links. Either (a) implement updates for references/links (merge semantics are usually safest), or (b) remove these fields from SimpleTraceEdit to avoid implying mutability that isn’t supported.
```python
# APPLICATION CATALOG ------------------------------------------------------


@intercept_exceptions()
@intercept_exceptions()
```
Duplicate @intercept_exceptions() decorator. Remove one to avoid double-wrapping and to keep behavior consistent with other endpoints.
```suggestion
@intercept_exceptions()
```
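A toy illustration of why the duplicate decorator matters: applying the same wrapping decorator twice produces two independent wrapper layers, each of which runs on every call. `intercept_demo` is a hypothetical stand-in, not the real @intercept_exceptions():

```python
import functools


def intercept_demo(func):
    """Toy stand-in for @intercept_exceptions(): each application adds a layer."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        wrapper.calls += 1  # count how often this particular layer runs
        return func(*args, **kwargs)
    wrapper.calls = 0
    return wrapper


@intercept_demo
@intercept_demo
def handler():
    return "ok"
```

After one call, both the outer layer (`handler.calls`) and the inner layer (`handler.__wrapped__.calls`) have run once: the interception logic executes twice per request.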
```python
"ground_truth_key": True,  # Tells the frontend that is the name of the column in the testset that should be shown as a ground truth to the user
"x-ag-ui-advanced": True,
"description": "The name of the column in the test data that contains the correct answer. This will be shown in the results page.",
},
```
This change removes the previously-present ground_truth_key field (and similar occurrences elsewhere). If any frontend/consumer logic relies on ground_truth_key to identify the ground-truth column, the behavior will regress (no ground-truth highlighting/selection). If the field is still required, keep emitting it (even if deprecated), or ensure consumers have been updated to use the new representation.
```diff
 },
+"ground_truth_key": {
+    "label": "Expected Answer Column (deprecated)",
+    "default": "correct_answer",
+    "type": "string",
+    "required": False,
+    "x-ag-ui-advanced": True,
+    "description": "Deprecated: use 'correct_answer_key'. Retained for backwards compatibility to indicate the column containing the correct answer.",
+},
```
```python
# Filter by application_refs if provided
# This filters revisions where the specified app's deployment CHANGED
# compared to the previous revision (or is the first deployment)
if application_refs:
    app_ids = {str(ref.id) for ref in application_refs if ref.id}
    if app_ids:
        filtered_dbes = []
        prev_app_revision_ids: dict[str, str | None] = {
            app_id: None for app_id in app_ids
        }

        # Revisions are ordered descending (newest first)
        # We need to process in ascending order to detect changes
        for dbe in reversed(revision_dbes):
            if not dbe.data or not isinstance(dbe.data, dict):
                continue

            refs = dbe.data.get("references")
            if not refs or not isinstance(refs, dict):
                continue

            # Check each app we're filtering for
            for app_id in app_ids:
                current_revision_id: str | None = None

                # Find this app's revision in the current environment revision
                for ref_data in refs.values():
                    if not isinstance(ref_data, dict):
                        continue
                    app_ref = ref_data.get("application")
                    if app_ref and str(app_ref.get("id")) == app_id:
                        app_revision = ref_data.get("application_revision")
                        if app_revision:
                            current_revision_id = str(
                                app_revision.get("id")
                            )
                        break

                # If this app's deployment changed, include this revision
                if current_revision_id != prev_app_revision_ids[app_id]:
                    if dbe not in filtered_dbes:
                        filtered_dbes.append(dbe)
                    prev_app_revision_ids[app_id] = current_revision_id

        # Reverse back to descending order (newest first)
        filtered_dbes.reverse()
        revision_dbes = filtered_dbes
```
This application_refs filter is implemented in Python after fetching revisions, which can be expensive for large datasets and defeats DB-side filtering/pagination. If this is expected to be used on large projects, prefer pushing the filter down to SQL/JSONB (or at least apply windowing/limits as early as possible before materializing all revisions and scanning dbe.data).
Pull request overview
Copilot reviewed 74 out of 1222 changed files in this pull request and generated 6 comments.
```diff
 response = await client.post(
     url,
-    json=payload,
+    json=request_body,
     headers=headers,
     timeout=900,
 )
 app_response = await response.json()
 response.raise_for_status()

 (
     value,
     kind,
     cost,
     tokens,
     latency,
```
This logs a full curl command including request headers. Since headers includes Authorization: Secret <token>, this will leak credentials into logs. Redact/remove sensitive headers (at least Authorization, cookies, API keys) before logging, or log only non-sensitive metadata (URL, scenario_id, testcase_id, request size).
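The redaction suggested above can be sketched as a small helper applied before any headers reach a log line. The set of sensitive header names is an assumption; extend it to match whatever the real client actually sends:

```python
# Minimal sketch: mask sensitive header values before logging.
SENSITIVE_HEADERS = {"authorization", "cookie", "set-cookie", "x-api-key"}


def redact_headers(headers: dict) -> dict:
    """Return a copy of headers that is safe to log."""
    return {
        name: "<redacted>" if name.lower() in SENSITIVE_HEADERS else value
        for name, value in headers.items()
    }
```

Any curl-command logging would then interpolate `redact_headers(headers)` instead of the raw mapping, or skip headers entirely and log only URL, scenario_id, testcase_id, and request size.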
```python
return url

parsed = urlparse(url)
hostname = parsed.hostname
```
Using urlparse() can fail to detect hosts for scheme-less inputs (e.g. localhost:8000 parses scheme='localhost' and hostname=None). That changes behavior vs the prior substring check and will skip localhost rewriting in those cases. Consider normalizing by prepending a default scheme when missing (e.g. http://) or adding a fallback hostname extraction when parsed.hostname is None.
```diff
 hostname = parsed.hostname
+# Fallback for scheme-less URLs like "localhost:8000", where urlparse
+# sets scheme="localhost" and hostname=None. In that case, re-parse
+# with a default scheme to recover the hostname.
+if hostname is None and "://" not in url:
+    fallback_parsed = urlparse(f"http://{url}")
+    if fallback_parsed.hostname:
+        hostname = fallback_parsed.hostname
```
```python
catalog = [
    _enrich_entry(
        entry, evaluator_metadata=_evaluator_metadata_by_key().get(entry["key"])
    )
    for entry in get_all_catalog_templates()
]
```
Building the full catalog at import time performs SDK registry calls and deep-copies upfront during app startup. This can slow cold starts and makes the catalog effectively static for the process lifetime. Consider lazy initialization (build on first access) and/or caching with an explicit refresh mechanism, especially if templates can change at runtime.
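The lazy-initialization idea can be sketched as below. `_build_catalog` is a placeholder for the real template enrichment; the point is that nothing is built at import time, and `refresh=True` rebuilds on demand:

```python
# Module-level cache, populated on first access rather than at import.
_catalog_cache = None


def _build_catalog():
    return [{"key": "demo-evaluator"}]  # placeholder for the real enrichment


def get_catalog(refresh: bool = False):
    """Build the catalog on first access and cache it for the process."""
    global _catalog_cache
    if _catalog_cache is None or refresh:
        _catalog_cache = _build_catalog()
    return _catalog_cache
```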
```python
# Filter by application_refs if provided
# This filters revisions where the specified app's deployment CHANGED
# compared to the previous revision (or is the first deployment)
if application_refs:
    app_ids = {str(ref.id) for ref in application_refs if ref.id}
    if app_ids:
        filtered_dbes = []
        prev_app_revision_ids: dict[str, str | None] = {
            app_id: None for app_id in app_ids
        }

        # Revisions are ordered descending (newest first)
        # We need to process in ascending order to detect changes
        for dbe in reversed(revision_dbes):
            if not dbe.data or not isinstance(dbe.data, dict):
                continue
```
This application_refs filter happens in Python after fetching revision_dbes from the database, which can be costly for large revision histories (memory + CPU) and undermines windowing/pagination semantics. Prefer pushing this filtering into SQL (e.g., JSONB path queries and/or a separate indexable column for referenced application revisions), or structure the query so you fetch only the needed window before doing in-memory change-detection.
```python
"default": "correct_answer",
"type": "string",
"required": False,
"advanced": True,  # Tells the frontend that this setting is advanced and should be hidden by default
"ground_truth_key": True,  # Tells the frontend that is the name of the column in the testset that should be shown as a ground truth to the user
"x-ag-ui-advanced": True,
"description": "The name of the column in the test data that contains the correct answer",
```
The change from advanced → x-ag-ui-advanced and removal of ground_truth_key is potentially breaking for any frontend/client code still reading the old keys (the removed comments indicate this was actively used by the frontend). If backwards compatibility is required, consider emitting both fields during a transition period or updating clients in the same PR so the API contract remains consistent.
```python
) -> List[Dict[str, Any]]:
    """
    Recursively finds all occurrences of a specific key in a nested dictionary.

    :param data: The dictionary to search.
    :param target_key: The key to find.
    :param path: The current path in the dictionary (for tracking locations).
    :return: A list of dictionaries containing 'path' and 'value' for each occurrence.
    """
    results = []

    if isinstance(data, dict):  # If it's a dictionary, traverse it
        for key, value in data.items():
            new_path = f"{path}.{key}" if path else key  # Update path
            if key == target_key:
                results.extend(value)  # Store match
```
The function’s docstring says it returns {path, value} entries, but it actually returns results.extend(value) and never records paths. Also, extend(value) will behave incorrectly if value is a string/dict (it will extend by characters/keys) and will raise if value is non-iterable. If the intended return type is list[str] (e.g., input_keys), update the signature/docstring accordingly and only extend when value is a list; otherwise append/coerce safely.
```diff
-) -> List[Dict[str, Any]]:
-    """
-    Recursively finds all occurrences of a specific key in a nested dictionary.
-    :param data: The dictionary to search.
-    :param target_key: The key to find.
-    :param path: The current path in the dictionary (for tracking locations).
-    :return: A list of dictionaries containing 'path' and 'value' for each occurrence.
-    """
-    results = []
-    if isinstance(data, dict):  # If it's a dictionary, traverse it
-        for key, value in data.items():
-            new_path = f"{path}.{key}" if path else key  # Update path
-            if key == target_key:
-                results.extend(value)  # Store match
+) -> List[Any]:
+    """
+    Recursively finds all occurrences of a specific key in a nested dictionary.
+    :param data: The dictionary to search.
+    :param target_key: The key to find.
+    :param path: The current path in the dictionary (for tracking locations).
+    :return: A list of values associated with each occurrence of the target key.
+    """
+    results: List[Any] = []
+    if isinstance(data, dict):  # If it's a dictionary, traverse it
+        for key, value in data.items():
+            new_path = f"{path}.{key}" if path else key  # Update path
+            if key == target_key:
+                # When the target key is found, collect its value(s) safely.
+                if isinstance(value, list):
+                    results.extend(value)
+                else:
+                    results.append(value)
```
[feat] Extend `runs` and `queues` with multi-inputs, caching, splitting
[feat] Extend `runs` and `queues`
Pull request overview
Copilot reviewed 75 out of 1132 changed files in this pull request and generated 5 comments.
```python
catalog = [
    _enrich_entry(
        entry, evaluator_metadata=_evaluator_metadata_by_key().get(entry["key"])
```
_evaluator_metadata_by_key() is recomputed for every catalog entry, which is unnecessarily expensive (deepcopy + dict build per iteration) and scales poorly with catalog size. Compute the metadata map once (e.g., assign to a local variable before the list comprehension) and reuse it.
```diff
-catalog = [
-    _enrich_entry(
-        entry, evaluator_metadata=_evaluator_metadata_by_key().get(entry["key"])
+_evaluator_metadata_map = _evaluator_metadata_by_key()
+catalog = [
+    _enrich_entry(
+        entry, evaluator_metadata=_evaluator_metadata_map.get(entry["key"])
```
```python
# APPLICATION CATALOG ------------------------------------------------------


@intercept_exceptions()
@intercept_exceptions()
```
The list_application_catalog_types endpoint is decorated with @intercept_exceptions() twice, which can lead to duplicated interception/handling (and duplicated logging). Remove one of the decorators.
```suggestion
@intercept_exceptions()
```
```python
if app_ref and str(app_ref.get("id")) == app_id:
    app_revision = ref_data.get("application_revision")
    if app_revision:
```
app_ref is used as a dict (app_ref.get(...)) without validating its type. If application is ever stored as a non-dict (e.g., a Pydantic model or string), this will raise at runtime. Add an isinstance(app_ref, dict) guard (and similarly for application_revision) before using .get().
```diff
-if app_ref and str(app_ref.get("id")) == app_id:
-    app_revision = ref_data.get("application_revision")
-    if app_revision:
+if isinstance(app_ref, dict) and str(app_ref.get("id")) == app_id:
+    app_revision = ref_data.get("application_revision")
+    if isinstance(app_revision, dict):
```
```python
def _actual_lock_name(lock_key: str) -> str:
    return caching._pack(namespace="lock", key=lock_key)
```
This relies on caching._pack, which is a private helper (leading underscore) and increases coupling to internal implementation details. Prefer a public helper for key generation (or move the required packing logic into this module) so changes in oss.src.utils.caching internals don’t break runtime locking.
```diff
-    return caching._pack(namespace="lock", key=lock_key)
+    return caching.pack(namespace="lock", key=lock_key)
```
```python
csvdata = None
with open(str(json_path)) as f:
    try:
        csvdata = json.loads(f.read())
    except json.JSONDecodeError as e:
        raise ValueError(f"Could not parse JSON file: {json_path}") from e
    except Exception as e:
        raise ValueError(f"Could not read JSON file: {json_path}") from e
```
Use json.load(f) rather than json.loads(f.read()), and specify an explicit encoding (commonly UTF-8) when opening JSON files. This avoids potential encoding-dependent failures and is the standard pattern for JSON file reads.
Pull request overview
Copilot reviewed 74 out of 1131 changed files in this pull request and generated 6 comments.
```diff
 class ApplicationFlags(WorkflowFlags):
-    """Application flags - is_evaluator=False means it's an application."""
+    """Application flags - is_application=True, is_evaluator=False."""

     def __init__(self, **data):
-        # Applications have is_evaluator=False (forced)
-        data["is_evaluator"] = False
+        data["is_application"] = True

         super().__init__(**data)


 class ApplicationQueryFlags(WorkflowQueryFlags):
-    """Application query flags - filter for is_evaluator=False."""
+    """Application query flags - filter for is_application=True, is_evaluator=False."""

     def __init__(self, **data):
-        # Query for non-evaluators (applications) (forced)
-        data["is_evaluator"] = False
+        data["is_application"] = True

         super().__init__(**data)
```
Both ApplicationFlags and ApplicationQueryFlags claim is_evaluator=False but no longer enforce it. Callers could accidentally set is_evaluator=True, creating ambiguous/invalid state (or incorrect filtering). Enforce data["is_evaluator"] = False (or explicitly drop/override any incoming is_evaluator) in both initializers.
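The enforcement fix can be sketched as follows; `WorkflowFlagsDemo` is a hypothetical stand-in for the real WorkflowFlags base class, but the pattern of overriding caller-supplied values before delegating is the one the comment asks for:

```python
class WorkflowFlagsDemo:
    def __init__(self, **data):
        self.is_evaluator = data.get("is_evaluator", False)
        self.is_application = data.get("is_application", False)


class ApplicationFlagsDemo(WorkflowFlagsDemo):
    """Application flags - is_application=True, is_evaluator=False (both forced)."""

    def __init__(self, **data):
        data["is_evaluator"] = False  # override: never trust the caller here
        data["is_application"] = True
        super().__init__(**data)
```

Even a caller passing `is_evaluator=True` ends up with a consistent application flag set.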
```python
if has_error:
    log.warn(
        "There is an error in evaluator %s for query %s.",
        annotation_step_key,
        query_trace_id,
    )
    step_status = EvaluationStatus.FAILURE
    scenario_has_errors[idx] += 1
    scenario_status[idx] = EvaluationStatus.ERRORS
```
log.warn is deprecated (and may not exist depending on the logger implementation). Also, passing printf-style positional arguments is inconsistent with surrounding structured logging usage (log.info(..., key=value)), and can raise at runtime with some loggers. Use log.warning(...) and either an f-string message or structured fields (e.g., evaluator_key=..., query_trace_id=...).
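With a plain stdlib logger, the fix is simply `log.warning(...)`; a structured logger would take keyword fields instead. A minimal sketch of the stdlib variant (the helper name and argument values are illustrative):

```python
import logging


def warn_evaluator_error(log, annotation_step_key, query_trace_id):
    # log.warn is deprecated; log.warning is the supported name.
    # A structured (structlog-style) logger would instead use:
    #   log.warning("evaluator error", evaluator_key=..., query_trace_id=...)
    log.warning(
        "There is an error in evaluator %s for query %s.",
        annotation_step_key,
        query_trace_id,
    )
```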
```python
log.warning(
    "No results found for step_key: %s",
    step_key,
    run_id=run_id,
    scenario_id=scenario_id,
    timestamp=timestamp,
    interval=interval,
)
```
This mixes printf-style formatting args with structured keyword fields. Depending on the logger implementation, this can throw (positional args not supported) or drop/mangle fields. Use a single style consistently: either format the message yourself (no positional args) or log a plain message and put step_key in structured fields.
```python
log.info(
    "retrieve_environment_revision: environment_ref=%r environment_variant_ref=%r environment_revision_ref=%r resolve=%r",
    environment_ref,
    environment_variant_ref,
    environment_revision_ref,
    resolve,
)
```
This uses positional printf-style logging arguments, while the surrounding codebase uses structured logging (e.g., log.info(msg, key=value)). If get_module_logger returns a structured logger, this can raise at runtime or produce poorly formatted logs. Prefer log.info(\"retrieve_environment_revision\", environment_ref=..., environment_variant_ref=..., environment_revision_ref=..., resolve=...) (or equivalent structured fields).
```python
    return caching._pack(namespace="lock", key=lock_key)


def _actual_meta_name(lock_key: str) -> str:
    return f"{_actual_lock_name(lock_key)}:meta"
```
This relies on a private caching API (caching._pack). Private helpers are not stable and can change without notice, which makes the lock implementation fragile. Expose a supported public helper in oss.src.utils.caching for packing lock keys (or reuse the same public packing code path used by acquire_lock), and use that here.
```diff
-    return caching._pack(namespace="lock", key=lock_key)
+    # Use the public caching helper for packing lock keys instead of the private _pack API.
+    return caching.pack_lock_key(lock_key)


 def _actual_meta_name(lock_key: str) -> str:
     return f"{_actual_lock_name(lock_key)}:meta"
```
```python
# If this app's deployment changed, include this revision
if current_revision_id != prev_app_revision_ids[app_id]:
    if dbe not in filtered_dbes:
        filtered_dbes.append(dbe)
    prev_app_revision_ids[app_id] = current_revision_id
```
if dbe not in filtered_dbes is an O(n) membership check inside nested loops, making this section O(n^2) for many revisions/apps. Track included revisions by id (or by id(dbe)), using a set alongside the list, to keep membership checks O(1).
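The O(1) membership fix can be sketched with a set of revision ids shadowing the ordered list; `revision_id` stands in for whatever uniquely identifies a dbe:

```python
def append_once(filtered, seen_ids, dbe, revision_id):
    """Append dbe to filtered only if its id has not been included yet (O(1) check)."""
    if revision_id not in seen_ids:
        seen_ids.add(revision_id)
        filtered.append(dbe)
```

The list preserves ordering for the later `filtered_dbes.reverse()`, while the set makes each duplicate check constant-time instead of a linear scan.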
No description provided.