17 changes: 16 additions & 1 deletion api/db.py
@@ -11,7 +11,9 @@
from fastapi_pagination.ext.motor import paginate
from motor import motor_asyncio
from redis import asyncio as aioredis
from kernelci.api.models import EventHistory, Hierarchy, Node, parse_node_obj
[CI check failure on line 14 in api/db.py (GitHub Actions / Lint): Unable to import 'kernelci.api.models']

from kernelci.api.models import (
    EventHistory, Hierarchy, Node, TelemetryEvent, parse_node_obj
)
from .models import User, UserGroup


@@ -28,6 +30,7 @@
    Node: 'node',
    UserGroup: 'usergroup',
    EventHistory: 'eventhistory',
    TelemetryEvent: 'telemetry',
}

OPERATOR_MAP = {
@@ -242,6 +245,12 @@
        obj.id = res.inserted_id
        return obj

    async def insert_many(self, model, documents):
        """Create multiple documents in a collection."""
        col = self._get_collection(model)
        result = await col.insert_many(documents)
        return result.inserted_ids

    async def _create_recursively(self, hierarchy: Hierarchy, parent: Node,
                                  cls, col):
        obj = parse_node_obj(hierarchy.node)
@@ -294,6 +303,12 @@
            raise ValueError(f"No object found with id: {obj.id}")
        return obj.__class__(**await col.find_one(ObjectId(obj.id)))

    async def aggregate(self, model, pipeline):
        """Run an aggregation pipeline on a model's collection"""
        col = self._get_collection(model)
        cursor = col.aggregate(pipeline)
        return await cursor.to_list(length=None)
Comment on lines +306 to +310
Copilot AI Feb 16, 2026
The aggregate method returns all results with length=None, which could lead to memory issues if the aggregation returns a large dataset. Consider adding a reasonable limit or documenting that callers should use appropriate $limit stages in their pipelines to avoid exhausting memory.

Suggested change:

    async def aggregate(self, model, pipeline, *, max_results: int | None = 1000):
        """Run an aggregation pipeline on a model's collection

        :param model: Document model whose collection will be queried.
        :param pipeline: MongoDB aggregation pipeline.
        :param max_results: Maximum number of documents to materialize from
            the aggregation cursor. Defaults to 1000. Set to ``None`` only if
            the aggregation pipeline already includes an appropriate
            ``$limit`` stage or if the caller can otherwise guarantee that
            the result set is reasonably bounded, as unbounded aggregations
            can exhaust memory.
        """
        col = self._get_collection(model)
        cursor = col.aggregate(pipeline)
        # Use a reasonable default limit to avoid loading unbounded result
        # sets into memory. Callers that need a different bound can override
        # ``max_results``, or explicitly set it to ``None`` if they are sure
        # the pipeline is already suitably constrained.
        return await cursor.to_list(length=max_results)


    async def delete_by_id(self, model, obj_id):
        """Delete one object matching a given id"""
        col = self._get_collection(model)
319 changes: 319 additions & 0 deletions api/main.py
@@ -46,13 +46,14 @@
from pydantic import BaseModel
from jose import jwt
from jose.exceptions import JWTError
[CI check failure on line 49 in api/main.py (GitHub Actions / Lint): Unable to import 'kernelci.api.models']

from kernelci.api.models import (
    Node,
    Hierarchy,
    PublishEvent,
    parse_node_obj,
    KernelVersion,
    EventHistory,
    TelemetryEvent,
)
from .auth import Authentication
from .db import Database
@@ -953,6 +954,324 @@
    return JSONResponse(content=json_comp)


# -----------------------------------------------------------------------------
# Telemetry of pipeline execution and other non-node events.
# This is a separate collection from EventHistory since telemetry may
# have a much higher volume and different query patterns, which allows
# us to optimize its indexes and storage separately.

@app.post('/telemetry', response_model=dict, tags=["telemetry"])
async def post_telemetry(
        events: List[dict],
        current_user: User = Depends(get_current_user),
):
    """Bulk insert telemetry events.

    Accepts a list of telemetry event dicts. Each event must have at
    least 'kind' and 'runtime' fields. Events are validated against
    the TelemetryEvent model before insertion.
    """
    metrics.add('http_requests_total', 1)
    if not events:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Events list cannot be empty",
        )
    docs = []
    for event in events:
        try:
            obj = TelemetryEvent(**event)
        except Exception as exc:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid telemetry event: {exc}",
            ) from exc
Comment on lines +983 to +989
Copilot AI Feb 16, 2026
Broad exception handling could expose internal implementation details. Catching all Exceptions and including the exception message in the HTTP response might leak sensitive information about the system's internal structure or validation logic. Consider catching specific Pydantic ValidationError instead and providing a sanitized error message.

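A minimal sketch of the narrower handler suggested here, assuming TelemetryEvent is a Pydantic model; the validate_event helper is illustrative, not part of the PR, and ValidationError.errors() returns a structured per-field summary:

from fastapi import HTTPException, status
from pydantic import BaseModel, ValidationError


def validate_event(model: type[BaseModel], event: dict) -> BaseModel:
    """Validate one telemetry event dict, raising a sanitized 400."""
    try:
        return model(**event)
    except ValidationError as exc:
        # Echo back only the structured per-field summary rather than
        # the raw exception text, to avoid leaking internals.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={"error": "Invalid telemetry event",
                    "fields": exc.errors()},
        ) from exc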
        doc = obj.model_dump(by_alias=True)
        doc.pop('_id', None)
        docs.append(doc)
    inserted_ids = await db.insert_many(TelemetryEvent, docs)
    return {"inserted": len(inserted_ids)}
Comment on lines +964 to +994
Copilot AI Feb 16, 2026
The new POST /telemetry endpoint lacks test coverage. Given that the codebase has comprehensive test coverage for other API endpoints (e.g., test_node_handler.py, test_events_handler.py, test_user_handler.py), tests should be added to verify authentication requirements, validation logic, and error handling for invalid events.

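For instance, a hypothetical pair of handler tests in the style of the existing test files; the test_client and mock_db_insert_many fixture names are assumptions, not the repository's actual fixtures:

def test_post_telemetry_empty_list(test_client):
    """An empty list should be rejected with 400, not a server error."""
    response = test_client.post(
        '/telemetry',
        headers={'Authorization': 'Bearer fake-token'},
        json=[],
    )
    assert response.status_code == 400


def test_post_telemetry_valid_events(test_client, mock_db_insert_many):
    """Valid events should be validated and bulk-inserted."""
    events = [{'kind': 'job_result', 'runtime': 'lava-collabora'}]
    response = test_client.post(
        '/telemetry',
        headers={'Authorization': 'Bearer fake-token'},
        json=events,
    )
    assert response.status_code == 200
    assert response.json() == {'inserted': 1}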


@app.get('/telemetry', response_model=PageModel, tags=["telemetry"])
async def get_telemetry(request: Request):
    """Query telemetry events with optional filters.

    Supports filtering by any TelemetryEvent field, plus time range
    via 'since' and 'until' parameters (ISO 8601 format).
    Results are paginated (default limit=50).
    """
    metrics.add('http_requests_total', 1)
    query_params = dict(request.query_params)

    for pg_key in ['limit', 'offset']:
        query_params.pop(pg_key, None)

    since = query_params.pop('since', None)
    until = query_params.pop('until', None)
    if since or until:
        ts_filter = {}
        if since:
            ts_filter['$gte'] = datetime.fromisoformat(since)
        if until:
            ts_filter['$lte'] = datetime.fromisoformat(until)
Comment on lines +1015 to +1018
Copilot AI Feb 16, 2026
Missing error handling for datetime.fromisoformat(). If 'since' or 'until' parameters contain invalid ISO 8601 format strings, this will raise a ValueError that is not caught, resulting in an unhandled 500 error instead of a user-friendly 400 Bad Request. Wrap these calls in a try-except block and raise HTTPException with status 400.

Suggested change:

        try:
            if since:
                ts_filter['$gte'] = datetime.fromisoformat(since)
            if until:
                ts_filter['$lte'] = datetime.fromisoformat(until)
        except ValueError as exc:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid 'since' or 'until' timestamp; "
                       "expected ISO 8601 format.",
            ) from exc

Member Author
done

        query_params['ts'] = ts_filter

    # Convert string 'true'/'false' for boolean fields
    if 'is_infra_error' in query_params:
        val = query_params['is_infra_error'].lower()
        if val not in ['true', 'false']:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Bad is_infra_error value, use 'true' or 'false'",
            )
        if val == 'true':
            query_params['is_infra_error'] = True
        else:
            query_params['is_infra_error'] = False

    paginated_resp = await db.find_by_attributes(
        TelemetryEvent, query_params
    )
    paginated_resp.items = serialize_paginated_data(
        TelemetryEvent, paginated_resp.items
    )
    return paginated_resp
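As an illustration of the filter interface, a hypothetical client query; the base URL and field values are assumptions:

import requests

# Hypothetical query: failed job results over one day, first page.
resp = requests.get(
    'http://localhost:8000/telemetry',
    params={
        'kind': 'job_result',
        'result': 'fail',
        'is_infra_error': 'false',
        'since': '2026-02-15T00:00:00',
        'until': '2026-02-16T00:00:00',
        'limit': 50,
        'offset': 0,
    },
    timeout=30,
)
resp.raise_for_status()
for item in resp.json()['items']:
    print(item['runtime'], item.get('device_type'), item['result'])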


TELEMETRY_STATS_GROUP_FIELDS = {
    'runtime', 'device_type', 'job_name', 'tree', 'branch',
    'arch', 'kind', 'error_type',
}


@app.get('/telemetry/stats', tags=["telemetry"])
async def get_telemetry_stats(request: Request):
    """Get aggregated telemetry statistics.

    This is rule-based anomaly detection using thresholded empirical
    rates computed over a sliding (rolling) time window. This is not
    a full anomaly detection system with baselines or machine
    learning, but at least something to start with.

    Query parameters:
    - group_by: Comma-separated fields to group by
      (runtime, device_type, job_name, tree, branch, arch,
      kind, error_type)
    - kind: Filter by event kind before aggregating
    - runtime: Filter by runtime name
    - since/until: Time range (ISO 8601)

    Returns grouped counts with pass/fail/incomplete/infra_error
    breakdowns for result-bearing events.
    """
Comment on lines +1051 to +1070
Copilot AI Feb 16, 2026
Misleading or incorrect documentation. The docstring states "This is rule-based anomaly detection using thresholded empirical rates..." but this endpoint (get_telemetry_stats) only provides aggregated statistics without any anomaly detection logic. Anomaly detection is actually performed by the get_telemetry_anomalies endpoint. This docstring should describe the stats aggregation functionality, not anomaly detection.

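A possible corrected docstring along those lines, keeping the parameter list but describing the aggregation rather than anomaly detection; the wording is a suggestion only:

    """Get aggregated telemetry statistics.

    Groups telemetry events by the requested fields and returns
    per-group counts with pass/fail/incomplete/skip/infra_error
    breakdowns for result-bearing events.

    Query parameters:
    - group_by: Comma-separated fields to group by
      (runtime, device_type, job_name, tree, branch, arch,
      kind, error_type)
    - kind: Filter by event kind before aggregating
    - runtime: Filter by runtime name
    - since/until: Time range (ISO 8601)
    """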
    metrics.add('http_requests_total', 1)
    query_params = dict(request.query_params)

    group_by_str = query_params.pop('group_by', None)
    if not group_by_str:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="'group_by' parameter is required",
        )
    group_by = [f.strip() for f in group_by_str.split(',')]
    invalid = set(group_by) - TELEMETRY_STATS_GROUP_FIELDS
    if invalid:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid group_by fields: {invalid}",
        )

    match_stage = {
        key: query_params.pop(key)
        for key in ('kind', 'runtime', 'device_type', 'job_name',
                    'tree', 'branch', 'arch')
        if query_params.get(key)
    }

    since = query_params.pop('since', None)
    until = query_params.pop('until', None)
    if since or until:
        match_stage['ts'] = {
            **({'$gte': datetime.fromisoformat(since)} if since else {}),
            **({'$lte': datetime.fromisoformat(until)} if until else {}),
Comment on lines +1098 to +1100
Copilot AI Feb 16, 2026
Missing error handling for datetime.fromisoformat(). If 'since' or 'until' parameters contain invalid ISO 8601 format strings, this will raise a ValueError that is not caught, resulting in an unhandled 500 error instead of a user-friendly 400 Bad Request. Wrap these calls in a try-except block and raise HTTPException with status 400.

Suggested change:

        since_dt = None
        until_dt = None
        try:
            if since:
                since_dt = datetime.fromisoformat(since)
            if until:
                until_dt = datetime.fromisoformat(until)
        except ValueError as exc:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid 'since' or 'until' datetime format; "
                       "expected ISO 8601"
            ) from exc
        match_stage['ts'] = {
            **({'$gte': since_dt} if since_dt is not None else {}),
            **({'$lte': until_dt} if until_dt is not None else {}),
        }

    pipeline = [{'$match': match_stage}] if match_stage else []
    pipeline.append({
        '$group': {
            '_id': {f: f'${f}' for f in group_by},
            'total': {'$sum': 1},
            'pass': {'$sum': {
                '$cond': [{'$eq': ['$result', 'pass']}, 1, 0]
            }},
            'fail': {'$sum': {
                '$cond': [{'$eq': ['$result', 'fail']}, 1, 0]
            }},
            'incomplete': {'$sum': {
                '$cond': [{'$eq': ['$result', 'incomplete']}, 1, 0]
            }},
            'skip': {'$sum': {
                '$cond': [{'$eq': ['$result', 'skip']}, 1, 0]
            }},
            'infra_error': {'$sum': {
                '$cond': ['$is_infra_error', 1, 0]
            }},
        }
    })
    pipeline.append({'$sort': {'total': -1}})

    results = await db.aggregate(TelemetryEvent, pipeline)

    results = await db.aggregate(TelemetryEvent, pipeline)
Comment on lines +1128 to +1129
Copilot AI Feb 16, 2026
Duplicate database aggregation call. The same aggregation pipeline is executed twice on consecutive lines. Remove the first call on line 1127.

Suggested change:

    results = await db.aggregate(TelemetryEvent, pipeline)

    return JSONResponse(content=jsonable_encoder([
        {
            **doc['_id'].copy(),
            'total': doc['total'],
            'pass': doc['pass'],
            'fail': doc['fail'],
            'incomplete': doc['incomplete'],
            'skip': doc['skip'],
            'infra_error': doc['infra_error'],
        }
        for doc in results
    ]))
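For reference, a response to GET /telemetry/stats?group_by=runtime,device_type would have roughly this shape; the values here are made up:

# Illustrative response shape for the stats endpoint.
[
    {
        'runtime': 'lava-collabora',
        'device_type': 'qemu-x86',
        'total': 120,
        'pass': 100,
        'fail': 12,
        'incomplete': 5,
        'skip': 3,
        'infra_error': 4,
    },
]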

# These are test values; adjust based on expected query patterns and volumes.
ANOMALY_WINDOW_MAP = {
    '1h': 1, '3h': 3, '6h': 6, '12h': 12, '24h': 24, '48h': 48,
}


@app.get('/telemetry/anomalies', tags=["telemetry"])
async def get_telemetry_anomalies(
        window: str = Query(
            '6h', description='Time window: 1h, 3h, 6h, 12h, 24h, 48h'
        ),
        threshold: float = Query(
            0.5, ge=0.0, le=1.0,
            description='Min failure/infra error rate to flag (0.0-1.0)'
        ),
        min_total: int = Query(
            3, ge=1,
            description='Min events in window to consider (avoids noise)'
        ),
):
    """Detect anomalies in telemetry data.

    Finds runtime+device_type combinations where the infra error
    rate or failure rate exceeds the threshold within the given
    time window. Also detects runtimes with submission errors.

    Returns a list sorted by severity (highest error rate first).
    """
    metrics.add('http_requests_total', 1)

    hours = ANOMALY_WINDOW_MAP.get(window)
    if not hours:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid window '{window}'. "
                   f"Use: {', '.join(ANOMALY_WINDOW_MAP.keys())}",
        )
    since = datetime.utcnow() - timedelta(hours=hours)
Copilot AI Feb 16, 2026
datetime.utcnow() is deprecated as of Python 3.12 in favor of datetime.now(timezone.utc). This method is scheduled for removal in a future Python version. Consider using datetime.now(timezone.utc) instead for timezone-aware datetime objects.

Suggested change:

    since = datetime.now(timezone.utc) - timedelta(hours=hours)

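If that suggestion is applied, timezone must also be imported; assuming the module currently does `from datetime import datetime, timedelta`, the import would become:

from datetime import datetime, timedelta, timezone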

    # Anomaly 1: High infra error / failure rate per runtime+device_type
    result_pipeline = [
        {'$match': {
            'kind': {'$in': ['job_result', 'test_result']},
            'ts': {'$gte': since},
        }},
        {'$group': {
            '_id': {
                'runtime': '$runtime',
                'device_type': '$device_type',
            },
            'total': {'$sum': 1},
            'fail': {'$sum': {
                '$cond': [{'$eq': ['$result', 'fail']}, 1, 0]
            }},
            'incomplete': {'$sum': {
                '$cond': [{'$eq': ['$result', 'incomplete']}, 1, 0]
            }},
            'infra_error': {'$sum': {
                '$cond': ['$is_infra_error', 1, 0]
            }},
        }},
        {'$match': {'total': {'$gte': min_total}}},
        {'$addFields': {
            'infra_rate': {
                '$divide': ['$infra_error', '$total']
            },
            'fail_rate': {
                '$divide': [
                    {'$add': ['$fail', '$incomplete']}, '$total'
                ]
            },
        }},
        {'$match': {
            '$or': [
                {'infra_rate': {'$gte': threshold}},
                {'fail_rate': {'$gte': threshold}},
            ]
        }},
        {'$sort': {'infra_rate': -1, 'fail_rate': -1}},
    ]

    # Anomaly 2: Submission/connectivity errors per runtime
    error_pipeline = [
        {'$match': {
            'kind': {'$in': ['runtime_error', 'job_skip']},
            'ts': {'$gte': since},
        }},
        {'$group': {
            '_id': {
                'runtime': '$runtime',
                'error_type': '$error_type',
            },
            'count': {'$sum': 1},
        }},
        {'$match': {'count': {'$gte': min_total}}},
        {'$sort': {'count': -1}},
    ]

    result_anomalies = await db.aggregate(
        TelemetryEvent, result_pipeline
    )
    error_anomalies = await db.aggregate(
        TelemetryEvent, error_pipeline
    )

    output = {
        'window': window,
        'threshold': threshold,
        'min_total': min_total,
        'since': since.isoformat(),
        'result_anomalies': [],
        'error_anomalies': [],
    }

    for doc in result_anomalies:
        row = doc['_id'].copy()
        row['total'] = doc['total']
        row['fail'] = doc['fail']
        row['incomplete'] = doc['incomplete']
        row['infra_error'] = doc['infra_error']
        row['infra_rate'] = round(doc['infra_rate'], 3)
        row['fail_rate'] = round(doc['fail_rate'], 3)
        output['result_anomalies'].append(row)

    for doc in error_anomalies:
        row = doc['_id'].copy()
        row['count'] = doc['count']
        output['error_anomalies'].append(row)

    return JSONResponse(content=jsonable_encoder(output))
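To make the thresholding concrete: with the defaults (threshold=0.5, min_total=3), a runtime/device pair with 4 result events of which 3 had infra errors has infra_rate = 3/4 = 0.75 and is flagged, while a pair with only 2 events is skipped regardless of its rate. A hypothetical call, with the base URL assumed:

import requests

# Flag pairs whose infra or failure rate is >= 0.5 over the last
# 6 hours, ignoring pairs with fewer than 3 events in the window.
resp = requests.get(
    'http://localhost:8000/telemetry/anomalies',
    params={'window': '6h', 'threshold': 0.5, 'min_total': 3},
    timeout=30,
)
resp.raise_for_status()
for row in resp.json()['result_anomalies']:
    print(row['runtime'], row['device_type'],
          row['infra_rate'], row['fail_rate'])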


# -----------------------------------------------------------------------------
# Nodes
def _get_node_event_data(operation, node, is_hierarchy=False):