Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions adaptive_runtime/core/confidence_engine.py.bak
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""
Confidence Engine - adaptive probabilistic confidence scoring.
"""

import math
from collections import deque
from dataclasses import dataclass, field
from typing import Deque

from pydantic import BaseModel
from ..observability.logger import get_logger
from ..observability.metrics import metrics

logger = get_logger("confidence_engine")


@dataclass
class OutcomeRecord:
success: bool
confidence_at_time: float
context_risk: str


class ConfidenceResult(BaseModel):
confidence: float
decay_factor: float
history_weight: float
context_adjustment: float
final: float


class ConfidenceEngine:
"""
Calculates adaptive confidence using:
- base score from event severity
- contextual adjustment from risk level
- historical outcome weighting (exponential decay)
- configurable decay over time
"""

RISK_WEIGHTS = {
"low": 1.00,
"medium": 0.90,
"high": 0.75,
"critical": 0.55,
}

def __init__(
self,
base_confidence: float = 0.75,
decay_rate: float = 0.05,
history_window: int = 50,
):
self._base = base_confidence
self._decay_rate = decay_rate
self._history: Deque[OutcomeRecord] = deque(maxlen=history_window)
self._call_count = 0


def calculate(self, event: dict, context_risk: str) -> ConfidenceResult:
self._call_count += 1

severity = float(event.get("severity", 0.5))
base = max(0.1, self._base - severity * 0.2)

decay = self._decay_factor()
hist_weight = self._history_weight(context_risk)
ctx_adj = self.RISK_WEIGHTS.get(context_risk, 0.8)

raw = base * decay * hist_weight * ctx_adj
final = round(min(max(raw, 0.05), 0.99), 4)

result = ConfidenceResult(
confidence=round(base, 4),
decay_factor=round(decay, 4),
history_weight=round(hist_weight, 4),
context_adjustment=round(ctx_adj, 4),
final=final,
)
metrics.record("confidence.final", final)
logger.info(
"Confidence → base=%.2f decay=%.2f hist=%.2f ctx=%.2f final=%.4f",
base, decay, hist_weight, ctx_adj, final,
)
return result

def record_outcome(self, success: bool, confidence: float, context_risk: str) -> None:
"""Feed outcome back for adaptive weighting."""
self._history.append(OutcomeRecord(
success=success,
confidence_at_time=confidence,
context_risk=context_risk,
))
logger.debug("Outcome recorded: success=%s conf=%.3f", success, confidence)


def _decay_factor(self) -> float:
"""Confidence decays slightly as call volume grows (simulate drift)."""
return math.exp(-self._decay_rate * (self._call_count // 20))

def _history_weight(self, context_risk: str) -> float:
"""Adjust weight based on past success rate for the same risk tier."""
relevant = [r for r in self._history if r.context_risk == context_risk]
if len(relevant) < 3:
return 1.0
success_rate = sum(1 for r in relevant if r.success) / len(relevant)
# Map [0, 1] success rate → [0.6, 1.1] weight
return 0.6 + success_rate * 0.5


132 changes: 132 additions & 0 deletions adaptive_runtime/core/context_engine.py.bak
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""
Context Engine - transforms raw events into contextual understanding.
"""

from pydantic import BaseModel, Field
from ..observability.logger import get_logger
from ..observability.metrics import metrics

logger = get_logger("context_engine")



class RawEvent(BaseModel):
type: str
severity: float = 0.0
cpu: float = 0.0
memory: float = 0.0
error_rate: float = 0.0
latency_ms: float = 0.0
extra: dict = Field(default_factory=dict)


class ContextResult(BaseModel):
risk: str # low | medium | high | critical
stability: str # stable | degraded | low | critical
context: str # symbolic label
pressure_score: float
tags: list[str]



_RISK_RULES: list[tuple[float, str]] = [
(0.85, "critical"),
(0.60, "high"),
(0.35, "medium"),
(0.00, "low"),
]

_STABILITY_RULES: list[tuple[float, str]] = [
(0.85, "critical"),
(0.65, "low"),
(0.40, "degraded"),
(0.00, "stable"),
]

_CONTEXT_MAP: dict[str, str] = {
"service_overload": "resource_pressure",
"anomaly_detected": "anomaly_signal",
"memory_leak": "resource_pressure",
"timeout": "latency_issue",
"auth_failure": "security_signal",
"recovery_needed": "self_healing",
"degraded_service": "service_degradation",
}



class ContextEngine:
"""
Classifies events and scores runtime context.

Uses lightweight rule-based scoring - no ML dependencies.
"""

def __init__(self, custom_context_map: dict[str, str] | None = None):
self._ctx_map = {**_CONTEXT_MAP, **(custom_context_map or {})}

def analyze(self, event: dict) -> ContextResult:
raw = RawEvent(**{k: v for k, v in event.items() if k in RawEvent.model_fields or k == "extra"},
extra={k: v for k, v in event.items() if k not in RawEvent.model_fields})

pressure = self._pressure_score(raw)
risk = self._classify(pressure, _RISK_RULES)
stability = self._classify(pressure, _STABILITY_RULES)
context_label = self._ctx_map.get(raw.type, f"unknown_{raw.type}")
tags = self._build_tags(raw, risk)

result = ContextResult(
risk=risk,
stability=stability,
context=context_label,
pressure_score=round(pressure, 4),
tags=tags,
)

metrics.record("context.pressure", pressure)
logger.info(
"Context → risk=%s stability=%s ctx=%s pressure=%.2f",
risk, stability, context_label, pressure,
)
return result


def _pressure_score(self, raw: RawEvent) -> float:
"""Weighted composite score in [0, 1]."""
weights = {
"severity": 0.35,
"cpu": 0.20,
"memory": 0.20,
"error_rate": 0.15,
"latency_ms": 0.10,
}
score = (
raw.severity * weights["severity"]
+ (raw.cpu / 100) * weights["cpu"]
+ (raw.memory / 100) * weights["memory"]
+ raw.error_rate * weights["error_rate"]
+ min(raw.latency_ms / 5000, 1.0) * weights["latency_ms"]
)
return min(score, 1.0)

@staticmethod
def _classify(score: float, rules: list[tuple[float, str]]) -> str:
for threshold, label in rules:
if score >= threshold:
return label
return rules[-1][1]

@staticmethod
def _build_tags(raw: RawEvent, risk: str) -> list[str]:
tags = [f"risk:{risk}", f"event:{raw.type}"]
if raw.cpu > 80:
tags.append("cpu:high")
if raw.memory > 80:
tags.append("memory:high")
if raw.error_rate > 0.5:
tags.append("errors:elevated")
if raw.latency_ms > 2000:
tags.append("latency:high")
return tags


96 changes: 96 additions & 0 deletions adaptive_runtime/core/decision_engine.py.bak
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""
Decision Engine - generates adaptive runtime decisions.
"""

from pydantic import BaseModel
from ..observability.logger import get_logger
from ..observability.metrics import metrics

logger = get_logger("decision_engine")


class DecisionResult(BaseModel):
action: str
confidence: float
reason: str
priority: str # low | normal | high | critical
metadata: dict = {}


# Each rule: (context_label, risk_level, min_confidence) → action
# Evaluated top-to-bottom; first match wins.

_RULES: list[tuple[str | None, str | None, float, str, str]] = [
# (context, risk, min_conf, action, reason)
("resource_pressure", "critical", 0.0, "scale_up_immediate", "critical_resource_pressure"),
("resource_pressure", "high", 0.70, "restart_service", "high_resource_pressure"),
("resource_pressure", "high", 0.0, "throttle_requests", "resource_pressure_low_confidence"),
("resource_pressure", "medium", 0.0, "optimize_resources", "medium_resource_pressure"),
("anomaly_signal", "critical", 0.0, "isolate_component", "critical_anomaly"),
("anomaly_signal", "high", 0.65, "rollback_deployment", "anomaly_with_confidence"),
("anomaly_signal", "high", 0.0, "increase_monitoring", "anomaly_low_confidence"),
("anomaly_signal", None, 0.0, "flag_for_review", "anomaly_detected"),
("latency_issue", "high", 0.0, "optimize_query", "latency_degradation"),
("latency_issue", None, 0.0, "cache_warmup", "latency_signal"),
("security_signal", None, 0.0, "trigger_security_audit", "security_event"),
("self_healing", None, 0.0, "run_recovery", "recovery_requested"),
("service_degradation","high", 0.0, "circuit_breaker_open", "service_degraded_high"),
("service_degradation", None, 0.0, "health_check", "service_degraded"),
]

_PRIORITY_MAP = {
"critical": "critical",
"high": "high",
"medium": "normal",
"low": "low",
}


class DecisionEngine:
"""
Matches context + risk + confidence against a rule table to produce
a ranked, explainable action. No ML required.
"""

def __init__(self, custom_rules: list | None = None):
self._rules = (custom_rules or []) + _RULES

def decide(
self,
event: dict,
context_label: str,
risk: str,
confidence: float,
) -> DecisionResult:
action, reason = self._match(context_label, risk, confidence)
priority = _PRIORITY_MAP.get(risk, "normal")

result = DecisionResult(
action=action,
confidence=round(confidence, 4),
reason=reason,
priority=priority,
metadata={
"event_type": event.get("type"),
"context": context_label,
"risk": risk,
},
)
metrics.record("decision.confidence", confidence)
logger.info(
"Decision → action=%s confidence=%.3f reason=%s priority=%s",
action, confidence, reason, priority,
)
return result


def _match(self, context: str, risk: str, confidence: float) -> tuple[str, str]:
for ctx_rule, risk_rule, min_conf, action, reason in self._rules:
ctx_ok = ctx_rule is None or ctx_rule == context
risk_ok = risk_rule is None or risk_rule == risk
conf_ok = confidence >= min_conf
if ctx_ok and risk_ok and conf_ok:
return action, reason
return "monitor_and_wait", "no_matching_rule"


52 changes: 52 additions & 0 deletions adaptive_runtime/runtime/cache.py.bak
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""
Lightweight TTL-based in-memory cache for the runtime.
"""

import time
from threading import Lock


class TTLCache:
"""
Simple thread-safe in-memory cache with per-entry TTL.
Suitable for deduplicating events, caching context results, etc.
"""

def __init__(self, default_ttl: float = 60.0):
self._store: dict[str, tuple[any, float]] = {} # key → (value, expires_at)
self._default_ttl = default_ttl
self._lock = Lock()

def set(self, key: str, value, ttl: float | None = None) -> None:
expires = time.monotonic() + (ttl if ttl is not None else self._default_ttl)
with self._lock:
self._store[key] = (value, expires)

def get(self, key: str):
with self._lock:
entry = self._store.get(key)
if entry is None:
return None
value, expires = entry
if time.monotonic() > expires:
del self._store[key]
return None
return value

def delete(self, key: str) -> None:
with self._lock:
self._store.pop(key, None)

def purge_expired(self) -> int:
now = time.monotonic()
with self._lock:
expired = [k for k, (_, exp) in self._store.items() if now > exp]
for k in expired:
del self._store[k]
return len(expired)

def __len__(self) -> int:
self.purge_expired()
return len(self._store)


Loading
Loading