From e7e0ab9c90cde992e48074eacb800b8e8cc1a6e1 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Thu, 27 Nov 2025 10:09:06 +0000 Subject: [PATCH 1/2] feat: Add ICE metrics and coordinate blending utilities This commit introduces new ICE metrics for execution intensity and alignment pressure. It also adds coordinate blending and weighted average utilities to `CoordinateUtils`. Co-authored-by: taurekaw --- harmonizer/coordinate_utils.py | 155 +++++++++++++++++- harmonizer/divine_invitation_engine_V2.py | 58 +++++++ harmonizer/main.py | 189 ++++++++++++++++++---- 3 files changed, 368 insertions(+), 34 deletions(-) diff --git a/harmonizer/coordinate_utils.py b/harmonizer/coordinate_utils.py index a104987..61d4303 100644 --- a/harmonizer/coordinate_utils.py +++ b/harmonizer/coordinate_utils.py @@ -8,9 +8,9 @@ """ import math -from typing import Tuple +from typing import Iterable, Optional, Tuple -from harmonizer.divine_invitation_engine_V2 import Coordinates +from harmonizer.divine_invitation_engine_V2 import Coordinates, Dimension class CoordinateUtils: @@ -194,6 +194,157 @@ def from_tuple(coord_tuple: Tuple[float, float, float, float]) -> Coordinates: wisdom=coord_tuple[3], ) + @staticmethod + def blend( + base: Coordinates, + overlay: Coordinates, + ratio: float = 0.5, + ) -> Coordinates: + """ + Blend two coordinates together with the supplied ratio. + + Args: + base: Primary coordinate that receives the overlay. + overlay: Secondary coordinate to blend into the base. + ratio: Value between 0.0 and 1.0 indicating overlay strength. + + Returns: + Blended Coordinates object. + """ + ratio = max(0.0, min(1.0, ratio)) + inverse = 1.0 - ratio + return Coordinates( + love=(base.love * inverse) + (overlay.love * ratio), + justice=(base.justice * inverse) + (overlay.justice * ratio), + power=(base.power * inverse) + (overlay.power * ratio), + wisdom=(base.wisdom * inverse) + (overlay.wisdom * ratio), + ) + + @staticmethod + def weighted_average( + coords: Iterable[Coordinates], + weights: Optional[Iterable[float]] = None, + ) -> Coordinates: + """ + Compute a weighted average of coordinates. + + Args: + coords: Iterable of Coordinates objects. + weights: Optional iterable of weights (defaults to equal weighting). + + Returns: + Coordinates representing the weighted average. + """ + coords = list(coords) + if not coords: + return Coordinates(0.0, 0.0, 0.0, 0.0) + + if weights is None: + weights = [1.0] * len(coords) + else: + weights = list(weights) + if len(weights) != len(coords): + raise ValueError("weights length must match coords length") + + total_weight = sum(weights) + if total_weight == 0.0: + return Coordinates(0.0, 0.0, 0.0, 0.0) + + love_sum = justice_sum = power_sum = wisdom_sum = 0.0 + for coord, weight in zip(coords, weights): + love_sum += coord.love * weight + justice_sum += coord.justice * weight + power_sum += coord.power * weight + wisdom_sum += coord.wisdom * weight + + return Coordinates( + love=love_sum / total_weight, + justice=justice_sum / total_weight, + power=power_sum / total_weight, + wisdom=wisdom_sum / total_weight, + ) + + @staticmethod + def ensure_power_floor( + coord: Tuple[float, float, float, float], + minimum_power: float = 0.2, + ) -> Tuple[float, float, float, float]: + """ + Ensure that a coordinate tuple has at least ``minimum_power`` in the power dimension. + + If power is already at or above the floor, the coordinate is returned unchanged. + Otherwise the deficit is proportionally reallocated from the remaining dimensions. 
+ """ + love, justice, power, wisdom = coord + if power >= minimum_power: + return coord + + deficit = minimum_power - power + remaining = love + justice + wisdom + if remaining == 0.0: + # Nothing to redistribute, so just set the floor directly. + return (0.0, 0.0, minimum_power, 0.0) + + love_ratio = love / remaining + justice_ratio = justice / remaining + wisdom_ratio = wisdom / remaining + + love -= deficit * love_ratio + justice -= deficit * justice_ratio + wisdom -= deficit * wisdom_ratio + power = minimum_power + + # Normalize to keep the tuple on the unit simplex. + total = love + justice + power + wisdom + if total == 0.0: + return (0.0, 0.0, power, 0.0) + + return (love / total, justice / total, power / total, wisdom / total) + + @staticmethod + def prioritize_dimension( + coord: Tuple[float, float, float, float], + dimension: Dimension, + boost: float = 0.05, + ) -> Tuple[float, float, float, float]: + """ + Aggressively boosts a selected dimension by reallocating weight from others. + + Args: + coord: Original coordinate tuple. + dimension: Dimension enum value to prioritize (e.g., Dimension.POWER). + boost: Amount to move into the prioritized dimension. + + Returns: + Tuple with the prioritized dimension amplified. + """ + index_map = { + Dimension.LOVE: 0, + Dimension.JUSTICE: 1, + Dimension.POWER: 2, + Dimension.WISDOM: 3, + } + if dimension not in index_map: + raise ValueError("dimension must be a primary LJPW dimension") + + values = list(coord) + idx = index_map[dimension] + boost = max(0.0, boost) + available_indices = [i for i in range(4) if i != idx] + available_total = sum(values[i] for i in available_indices) + if available_total == 0.0: + values[idx] = min(1.0, values[idx] + boost) + else: + for i in available_indices: + share = boost * (values[i] / available_total) + values[i] = max(0.0, values[i] - share) + values[idx] = min(1.0, values[idx] + boost) + + total = sum(values) + if total == 0.0: + return (0.0, 0.0, 0.0, 0.0) + return tuple(v / total for v in values) + @staticmethod def get_dominant_dimension(coord: Tuple[float, float, float, float]) -> str: """ diff --git a/harmonizer/divine_invitation_engine_V2.py b/harmonizer/divine_invitation_engine_V2.py index fd6e800..17c776e 100644 --- a/harmonizer/divine_invitation_engine_V2.py +++ b/harmonizer/divine_invitation_engine_V2.py @@ -704,6 +704,9 @@ def analyze_ice( context_result.coordinates, execution_result.coordinates ) + execution_intensity = self._calculate_execution_intensity(execution_result) + alignment_pressure = self._calculate_alignment_pressure(intent_result, execution_result) + # Calculate ICE metrics avg_disharmony = (intent_context_dist + intent_exec_dist + context_exec_dist) / 3.0 ice_coherence = max(0.0, 1.0 - (avg_disharmony / 2.0)) @@ -767,6 +770,8 @@ def analyze_ice( "ice_coherence": ice_coherence, "ice_balance": ice_balance, "benevolence_score": benevolence_score, + "execution_intensity": execution_intensity, + "alignment_pressure": alignment_pressure, "intent_execution_disharmony": intent_exec_dist, # Baseline-enhanced metrics "baseline_disharmony": baseline_disharmony, @@ -841,6 +846,36 @@ def _determine_ice_harmony_level(self, coherence: float, balance: float) -> str: else: return "POOR_ICE_BALANCE" + def _calculate_execution_intensity(self, execution_result: SemanticResult) -> float: + """ + Measure how forceful the execution semantics are. + + High concept counts with strong power coordinates produce higher intensity. 
+ """ + coords = execution_result.coordinates + base_intensity = coords.power + if execution_result.concept_count == 0: + return 0.0 + # Scale up with concept density but clamp to 1.0 + density_bonus = min(1.0, execution_result.concept_count / 10.0) + return min(1.0, base_intensity * 0.6 + density_bonus * 0.4) + + def _calculate_alignment_pressure( + self, intent_result: SemanticResult, execution_result: SemanticResult + ) -> float: + """ + Determine how much force is required to align execution with intent. + + Larger distances and weak execution intensity increase the pressure score. + """ + distance = self.vocab.get_distance(intent_result.coordinates, execution_result.coordinates) + intensity = self._calculate_execution_intensity(execution_result) + if intensity == 0.0: + return min(1.0, distance) + # When execution is already forceful, pressure drops significantly + pressure = distance / (1.0 + (intensity * 2.0)) + return min(1.0, pressure) + class PhiOptimizer: """Optimized phi-enhanced mathematical optimization""" @@ -987,6 +1022,29 @@ def perform_phi_optimization(self, concepts: List[str]) -> Dict: """Perform phi-enhanced optimization""" return self.phi_optimizer.calculate_phi_optimization(concepts) + def evaluate_action_gap( + self, + intent_words: List[str], + execution_words: List[str], + context_words: Optional[List[str]] = None, + ) -> Dict: + """ + Rapid assessment focused on action: quantifies how forcefully code executes intent. + + Returns execution intensity, alignment pressure, and baseline disharmony metrics so that + callers can decide whether to refactor the implementation or rename the API surface. + """ + context_words = context_words or [] + ice_result = self.perform_ice_analysis(intent_words, context_words, execution_words) + metrics = ice_result["ice_metrics"] + return { + "execution_intensity": metrics.get("execution_intensity", 0.0), + "alignment_pressure": metrics.get("alignment_pressure", 0.0), + "baseline_disharmony": metrics.get("baseline_disharmony", 0.0), + "intent_execution_disharmony": metrics.get("intent_execution_disharmony", 0.0), + "ice_harmony_level": ice_result.get("ice_harmony_level"), + } + def run_comprehensive_demo(): """Optimized demonstration of DIVE-V2 capabilities""" diff --git a/harmonizer/main.py b/harmonizer/main.py index c4ec488..cfc547e 100644 --- a/harmonizer/main.py +++ b/harmonizer/main.py @@ -32,7 +32,7 @@ import ast # noqa: E402 import fnmatch # noqa: E402 import json # noqa: E402 -from typing import Dict, List, Tuple # noqa: E402 +from typing import Dict, List, Optional, Tuple # noqa: E402 from harmonizer import divine_invitation_engine_V2 as dive # noqa: E402 from harmonizer.ast_semantic_parser import AST_Semantic_Parser # noqa: E402 @@ -67,6 +67,88 @@ def load_configuration() -> Dict: return config_dict +def _should_exclude(path: str, rel_path: str, basename: str, patterns: List[str]) -> bool: + return any( + fnmatch.fnmatch(path, pattern) + or fnmatch.fnmatch(rel_path, pattern) + or fnmatch.fnmatch(basename, pattern) + for pattern in patterns + ) + + +def discover_python_files( + targets: List[str], + config: Dict, + recursive: bool = False, + max_files: Optional[int] = None, +) -> Tuple[List[str], List[Tuple[str, str]], List[str]]: + """ + Expand CLI targets into actual Python files while respecting exclusion rules. 
+ + Returns: + (valid_files, invalid_entries, excluded_files) + """ + exclude_patterns = config.get("exclude", []) + config_root = config.get("config_root") or os.getcwd() + + valid_files: List[str] = [] + invalid_files: List[Tuple[str, str]] = [] + excluded_files: List[str] = [] + + for target in targets: + normalized = os.path.normpath(target) + rel_path = os.path.normpath(os.path.relpath(normalized, config_root)) + basename = os.path.basename(normalized) + + if _should_exclude(normalized, rel_path, basename, exclude_patterns): + excluded_files.append(target) + continue + + if os.path.isdir(normalized): + walker = os.walk(normalized) + for root, _, files in walker: + for file_name in files: + if not file_name.endswith(".py"): + continue + candidate = os.path.join(root, file_name) + rel_candidate = os.path.normpath(os.path.relpath(candidate, config_root)) + if _should_exclude(candidate, rel_candidate, file_name, exclude_patterns): + excluded_files.append(candidate) + continue + valid_files.append(candidate) + if max_files and len(valid_files) >= max_files: + return valid_files, invalid_files, excluded_files + if not recursive: + break + elif os.path.isfile(normalized) and normalized.endswith(".py"): + valid_files.append(target) + else: + error = "File not found" + if os.path.exists(normalized) and not normalized.endswith(".py"): + error = "Not a Python file" + invalid_files.append((target, error)) + + if max_files and len(valid_files) >= max_files: + break + + return valid_files, invalid_files, excluded_files + + +def save_text_report(blocks: List[str], destination: str) -> None: + """Persist textual report blocks to disk.""" + if not blocks: + blocks = ["No analysis output generated."] + payload = "\n\n".join(blocks) + with open(destination, "w", encoding="utf-8") as handle: + handle.write(payload) + + +def save_json_report(payload: Dict, destination: str) -> None: + """Persist JSON payload to disk.""" + with open(destination, "w", encoding="utf-8") as handle: + json.dump(payload, handle, indent=2) + + # --- THE HARMONIZER APPLICATION --- @@ -318,7 +400,7 @@ def _generate_naming_suggestions(self, func_name: str, data: Dict) -> str: def output_report(self, formatted_report: str): print(formatted_report) - def print_json_report(self, all_reports: Dict[str, Dict[str, Dict]]): + def build_json_payload(self, all_reports: Dict[str, Dict[str, Dict]]) -> Dict: output = { "version": "1.5", "threshold": self.disharmony_threshold, @@ -374,7 +456,11 @@ def print_json_report(self, all_reports: Dict[str, Dict[str, Dict]]): "severity_counts": severity_counts, "highest_severity": self._get_highest_severity_name(severity_counts), } - print(json.dumps(output, indent=2)) + return output + + def print_json_report(self, all_reports: Dict[str, Dict[str, Dict]]): + payload = self.build_json_payload(all_reports) + print(json.dumps(payload, indent=2)) def _get_highest_severity_name(self, severity_counts: Dict[str, int]) -> str: for severity in ["critical", "high", "medium", "low", "excellent"]: @@ -407,35 +493,42 @@ def parse_cli_arguments() -> argparse.Namespace: default=3, help="Number of naming suggestions to show (default: 3).", ) + parser.add_argument( + "--recursive", + action="store_true", + help="When targets include directories, walk them recursively to gather .py files.", + ) + parser.add_argument( + "--max-files", + type=int, + help="Limit the number of files analyzed (useful for CI smoke checks).", + ) + parser.add_argument( + "--save-text", + type=str, + help="Write the text report to the specified 
path in addition to stdout.", + ) + parser.add_argument( + "--save-json", + type=str, + help="Write the JSON payload to the specified path (format must be json).", + ) + parser.add_argument( + "--fail-on-attention", + action="store_true", + help="Exit with code 3 when any function needs immediate attention.", + ) parser.add_argument("--version", action="version", version="Python Code Harmonizer v1.5") return parser.parse_args() def validate_cli_arguments(args: argparse.Namespace, config: Dict) -> List[str]: - valid_files = [] - invalid_files = [] - excluded_files = [] - exclude_patterns = config.get("exclude", []) - config_root = config.get("config_root") or os.getcwd() - for file_path in args.files: - normalized_path = os.path.normpath(file_path) - rel_path = os.path.normpath(os.path.relpath(normalized_path, config_root)) - basename = os.path.basename(normalized_path) - if any( - fnmatch.fnmatch(normalized_path, pattern) - or fnmatch.fnmatch(rel_path, pattern) - or fnmatch.fnmatch(basename, pattern) - for pattern in exclude_patterns - ): - excluded_files.append(file_path) - continue - if os.path.exists(file_path): - if file_path.endswith(".py"): - valid_files.append(file_path) - else: - invalid_files.append((file_path, "Not a Python file")) - else: - invalid_files.append((file_path, "File not found")) + valid_files, invalid_files, excluded_files = discover_python_files( + args.files, + config, + recursive=getattr(args, "recursive", False), + max_files=getattr(args, "max_files", None), + ) if (invalid_files or excluded_files) and args.format == "text": for file_path, error in invalid_files: print(f"\n⚠️ Skipping '{file_path}' - {error}", file=sys.stderr) @@ -453,9 +546,11 @@ def execute_analysis( file_paths: List[str], output_format: str, suggest_refactor: bool, -) -> Tuple[Dict, int]: + capture_text: bool = False, +) -> Tuple[Dict, int, List[str]]: all_reports = {} highest_exit_code = 0 + text_blocks: List[str] = [] for file_path in file_paths: report = harmonizer.analyze_file(file_path) all_reports[file_path] = report @@ -464,11 +559,19 @@ def execute_analysis( if output_format == "text": formatted = harmonizer.format_report(report, suggest_refactor=suggest_refactor) harmonizer.output_report(formatted) - return all_reports, highest_exit_code + if capture_text: + text_blocks.append(f"# {file_path}\n{formatted}") + return all_reports, highest_exit_code, text_blocks def run_cli(): args = parse_cli_arguments() + if args.save_json and args.format != "json": + print( + "⚠️ --save-json requires --format json so the payload matches the CLI output.", + file=sys.stderr, + ) + sys.exit(2) config = load_configuration() valid_files = validate_cli_arguments(args, config) if not valid_files: @@ -485,12 +588,34 @@ def run_cli(): top_suggestions=args.top_suggestions, ) - all_reports, highest_exit_code = execute_analysis( - harmonizer, valid_files, args.format, args.suggest_refactor + capture_text = bool(args.save_text) + all_reports, highest_exit_code, text_blocks = execute_analysis( + harmonizer, + valid_files, + args.format, + args.suggest_refactor, + capture_text=capture_text, ) + need_json_payload = args.format == "json" or args.save_json or args.fail_on_attention + json_payload = None + if need_json_payload: + json_payload = harmonizer.build_json_payload(all_reports) + if args.format == "json": - harmonizer.print_json_report(all_reports) + print(json.dumps(json_payload, indent=2)) + + if args.save_text: + save_text_report(text_blocks, args.save_text) + + if args.save_json and json_payload: + 
save_json_report(json_payload, args.save_json) + + if args.fail_on_attention and json_payload: + severity_counts = json_payload.get("summary", {}).get("severity_counts", {}) + attention_count = severity_counts.get("high", 0) + severity_counts.get("critical", 0) + if attention_count > 0: + highest_exit_code = max(highest_exit_code, 3) sys.exit(highest_exit_code) From 965448ebcbfc3654096dbd49021d28a73e187a55 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Thu, 27 Nov 2025 10:14:58 +0000 Subject: [PATCH 2/2] Refactor LJPW baseline metrics extraction for clarity Co-authored-by: taurekaw --- harmonizer/main.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/harmonizer/main.py b/harmonizer/main.py index cfc547e..e874994 100644 --- a/harmonizer/main.py +++ b/harmonizer/main.py @@ -435,16 +435,21 @@ def build_json_payload(self, all_reports: Dict[str, Dict[str, Dict]]) -> Dict: } # Add LJPW baseline metrics if available ice_metrics = data.get("ice_result", {}).get("ice_metrics", {}) - if "baseline_disharmony" in ice_metrics: - function_data["ljpw_baselines"] = { - "baseline_disharmony": round(ice_metrics["baseline_disharmony"], 4), - "intent_composite_score": round( - ice_metrics.get("intent_composite_score", 0), 4 - ), - "execution_composite_score": round( - ice_metrics.get("execution_composite_score", 0), 4 - ), - } + baseline_metrics = {} + baseline_disharmony = ice_metrics.get("baseline_disharmony") + if baseline_disharmony is not None: + baseline_metrics["baseline_disharmony"] = round(baseline_disharmony, 4) + + intent_comp = ice_metrics.get("intent_composite_score") + if intent_comp is not None: + baseline_metrics["intent_composite_score"] = round(intent_comp, 4) + + execution_comp = ice_metrics.get("execution_composite_score") + if execution_comp is not None: + baseline_metrics["execution_composite_score"] = round(execution_comp, 4) + + if baseline_metrics: + function_data["ljpw_baselines"] = baseline_metrics if self.show_semantic_maps: function_data["semantic_map"] = data["semantic_map"] file_data["functions"].append(function_data)
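Reviewer sketches follow, appended after both patches so the hunks above stay intact; none of this is part of either commit. First, a minimal exercise of the new CoordinateUtils helpers from PATCH 1/2, assuming the package layout shown in the diff; all coordinate values here are made up:

    from harmonizer.coordinate_utils import CoordinateUtils
    from harmonizer.divine_invitation_engine_V2 import Coordinates, Dimension

    # Two illustrative LJPW points on the unit simplex.
    base = Coordinates(love=0.6, justice=0.2, power=0.1, wisdom=0.1)
    overlay = Coordinates(love=0.1, justice=0.1, power=0.7, wisdom=0.1)

    # 25% overlay strength: each dimension becomes base*0.75 + overlay*0.25.
    blended = CoordinateUtils.blend(base, overlay, ratio=0.25)

    # Weights are optional (equal by default) and must match the coords length.
    average = CoordinateUtils.weighted_average([base, overlay], weights=[2.0, 1.0])

    # Tuple-based helpers: lift power to the 0.2 floor, then nudge wisdom upward.
    floored = CoordinateUtils.ensure_power_floor((0.5, 0.3, 0.05, 0.15), minimum_power=0.2)
    boosted = CoordinateUtils.prioritize_dimension(floored, Dimension.WISDOM, boost=0.05)

    print(blended, average, floored, boosted, sep="\n")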
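The two ICE metric formulas in PATCH 1/2 are simple enough to sanity-check by hand. Below is a standalone restatement of _calculate_execution_intensity and _calculate_alignment_pressure, using plain floats in place of SemanticResult but keeping the same constants as the patch:

    def execution_intensity(power: float, concept_count: int) -> float:
        # Zero concepts means nothing executed, so intensity is zero.
        if concept_count == 0:
            return 0.0
        density_bonus = min(1.0, concept_count / 10.0)  # saturates at 10 concepts
        return min(1.0, power * 0.6 + density_bonus * 0.4)

    def alignment_pressure(distance: float, intensity: float) -> float:
        # With no intensity to divide by, pressure is just the clamped distance.
        if intensity == 0.0:
            return min(1.0, distance)
        # Forceful execution divides the distance down: pressure falls as intensity rises.
        return min(1.0, distance / (1.0 + intensity * 2.0))

    # power 0.5 with 5 concepts: 0.5*0.6 + 0.5*0.4 = 0.5 intensity;
    # a distance of 0.9 then gives 0.9 / (1 + 1.0) = 0.45 pressure.
    assert abs(execution_intensity(0.5, 5) - 0.5) < 1e-9
    assert abs(alignment_pressure(0.9, 0.5) - 0.45) < 1e-9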
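Last, a sketch of driving the new discover_python_files entry point directly. The config dict shape matches the exclude/config_root keys the function reads; the target path and patterns are hypothetical:

    from harmonizer.main import discover_python_files

    config = {"exclude": ["*_test.py", "build/*"], "config_root": "."}

    # Walk ./src recursively, honoring exclusions, capped at 50 files,
    # roughly what the new --recursive and --max-files CLI flags do.
    valid, invalid, excluded = discover_python_files(
        ["src"], config, recursive=True, max_files=50
    )
    print(f"{len(valid)} to analyze, {len(invalid)} invalid, {len(excluded)} excluded")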