Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 95 additions & 33 deletions harmonizer/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import os
from dataclasses import dataclass, field
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional, Tuple

# Try to import tomli for TOML parsing
try:
Expand Down Expand Up @@ -48,48 +48,55 @@ class HarmonizerConfig:

# Analysis
complexity_weight: float = 0.2 # For dynamic simulation
custom_vocabulary: Dict[str, str] = field(default_factory=dict)
source_path: Optional[str] = None
root_dir: Optional[str] = None


class ConfigLoader:
_YAML_FILENAMES = (
".harmonizer.yml",
".harmonizer.yaml",
"harmonizer.yml",
"harmonizer.yaml",
)

@staticmethod
def load(target_dir: str = ".") -> HarmonizerConfig:
def load(target_dir: str = ".", search_parents: bool = False) -> HarmonizerConfig:
"""
Load configuration from target directory.
Priority:
1. harmonizer.yaml
1. harmonizer.yaml / .harmonizer.yaml / .harmonizer.yml
2. pyproject.toml
3. Defaults
"""
config = HarmonizerConfig()
config.root_dir = os.path.abspath(target_dir)

# 1. Try harmonizer.yaml
yaml_path = os.path.join(target_dir, "harmonizer.yaml")
if os.path.exists(yaml_path) and yaml:
try:
with open(yaml_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
if data:
ConfigLoader._update_config(config, data)
print(f"Loaded config from {yaml_path}")
return config
except Exception as e:
print(f"Warning: Failed to load {yaml_path}: {e}")

# 2. Try pyproject.toml
toml_path = os.path.join(target_dir, "pyproject.toml")
if os.path.exists(toml_path) and tomli:
try:
with open(toml_path, "rb") as f:
data = tomli.load(f)
tool_config = data.get("tool", {}).get("harmonizer", {})
if tool_config:
ConfigLoader._update_config(config, tool_config)
print(f"Loaded config from {toml_path}")
except Exception as e:
print(f"Warning: Failed to load {toml_path}: {e}")
config_path, config_type = ConfigLoader._locate_config_path(target_dir, search_parents)
if not config_path:
return config

try:
if config_type == "toml":
ConfigLoader._load_from_pyproject(config, config_path)
else:
ConfigLoader._load_from_yaml(config, config_path)
except Exception as exc: # pragma: no cover - defensive logging
print(f"Warning: Failed to load {config_path}: {exc}")
return config

config.source_path = config_path
config.root_dir = os.path.dirname(config_path)
return config

@staticmethod
def load_nearest(start_dir: str = ".") -> HarmonizerConfig:
    """
    Load configuration searching parent directories for the first config file.

    Thin wrapper around ``load`` with ``search_parents=True``: starting at
    *start_dir*, each directory (and then each parent, up to the filesystem
    root) is probed for a config file, and the first one found wins.  The
    per-directory priority (YAML names before ``pyproject.toml``) is the
    same as for ``load``.
    """
    return ConfigLoader.load(start_dir, search_parents=True)

@staticmethod
def _update_config(config: HarmonizerConfig, data: Dict[str, Any]):
"""Update config object with dictionary data"""
Expand All @@ -102,14 +109,69 @@ def _update_config(config: HarmonizerConfig, data: Dict[str, Any]):
if "min_density" in t:
config.min_density = float(t["min_density"])

if "analysis" in data:
a = data["analysis"]
if "complexity_weight" in a:
config.complexity_weight = float(a["complexity_weight"])

if "paths" in data:
p = data["paths"]
if "exclude" in p:
config.exclude_patterns = p["exclude"]
config.exclude_patterns = list(p["exclude"])
if "report" in p:
config.report_output = p["report"]

if "analysis" in data:
a = data["analysis"]
if "complexity_weight" in a:
config.complexity_weight = float(a["complexity_weight"])
if "exclude" in data:
config.exclude_patterns = list(data["exclude"])

if "custom_vocabulary" in data:
custom_vocab = data.get("custom_vocabulary") or {}
# Merge so later sources can override defaults
config.custom_vocabulary.update(custom_vocab)

@staticmethod
def _locate_config_path(
    start_dir: str, search_parents: bool
) -> Tuple[Optional[str], Optional[str]]:
    """Find the first usable config file at or above *start_dir*.

    Each directory is probed for the dotted/plain YAML file names first
    (only considered when PyYAML is importable), then for
    ``pyproject.toml`` (only considered when a TOML parser is importable).
    With ``search_parents`` the walk continues into parent directories
    until the filesystem root.

    Returns ``(path, "yaml" | "toml")`` on success, ``(None, None)`` when
    no usable config file was found.
    """
    directory = os.path.abspath(start_dir)
    while True:
        # Availability of the parsers is constant, so gate the whole
        # group of candidates instead of re-testing per file.
        if yaml:
            for name in ConfigLoader._YAML_FILENAMES:
                yaml_candidate = os.path.join(directory, name)
                if os.path.exists(yaml_candidate):
                    return yaml_candidate, "yaml"

        if tomli:
            toml_candidate = os.path.join(directory, "pyproject.toml")
            if os.path.exists(toml_candidate):
                return toml_candidate, "toml"

        parent = os.path.dirname(directory)
        # Stop when parent search is disabled or we hit the root
        # (dirname of the root is the root itself).
        if not search_parents or parent == directory:
            return None, None
        directory = parent

@staticmethod
def _load_from_yaml(config: HarmonizerConfig, path: str) -> None:
    """Parse the YAML file at *path* and merge its settings into *config*.

    Raises ``RuntimeError`` when PyYAML could not be imported; an empty
    or all-comment file leaves *config* untouched.
    """
    if not yaml:
        raise RuntimeError("PyYAML is not installed")

    with open(path, "r", encoding="utf-8") as fp:
        parsed = yaml.safe_load(fp)

    # safe_load yields None for an empty document; skip the merge then.
    if parsed:
        ConfigLoader._update_config(config, parsed)

@staticmethod
def _load_from_pyproject(config: HarmonizerConfig, path: str) -> None:
    """Merge the ``[tool.harmonizer]`` table of *path* into *config*.

    Raises ``RuntimeError`` when no TOML parser (tomli/tomllib) is
    available; a missing or empty table leaves *config* untouched.
    """
    if not tomli:
        raise RuntimeError("tomli/tomllib is not available for TOML parsing")

    with open(path, "rb") as fp:
        document = tomli.load(fp)

    section = document.get("tool", {}).get("harmonizer", {})
    if section:
        ConfigLoader._update_config(config, section)
37 changes: 27 additions & 10 deletions harmonizer/divine_invitation_engine_V2.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,24 @@ class SemanticResult:
class VocabularyManager:
"""Optimized vocabulary management with caching"""

def __init__(self, custom_vocabulary: Optional[Dict[str, str]] = None):
self._keyword_map: Dict[str, Dimension] = {}
_BASE_KEYWORD_MAP: Dict[str, Dimension] = {}
_BASE_ICE_DIMENSION_MAP: Dict[Dimension, Dimension] = {}

def __init__(self, custom_vocabulary: Optional[Dict[str, str]] = None, quiet: bool = True):
self._quiet = quiet
self._word_cache: Dict[str, Tuple[Coordinates, int]] = {}
self._ice_dimension_map: Dict[Dimension, Dimension] = {}
self._build_complete_vocabulary()

if VocabularyManager._BASE_KEYWORD_MAP:
# Reuse previously built vocabulary to avoid repeated startup cost
self._keyword_map = dict(VocabularyManager._BASE_KEYWORD_MAP)
self._ice_dimension_map = dict(VocabularyManager._BASE_ICE_DIMENSION_MAP)
else:
self._keyword_map = {}
self._ice_dimension_map = {}
self._build_complete_vocabulary()
VocabularyManager._BASE_KEYWORD_MAP = dict(self._keyword_map)
VocabularyManager._BASE_ICE_DIMENSION_MAP = dict(self._ice_dimension_map)

if custom_vocabulary:
self._apply_custom_vocabulary(custom_vocabulary)

Expand Down Expand Up @@ -388,12 +401,13 @@ def _build_complete_vocabulary(self) -> None:
self._keyword_map[word] = dimension

# Print to stderr to avoid breaking JSON output on stdout
import sys
if not self._quiet:
import sys

print(
f"VocabularyManager: Initialized with {len(self._keyword_map)} unique keywords.",
file=sys.stderr,
)
print(
f"VocabularyManager: Initialized with {len(self._keyword_map)} unique keywords.",
file=sys.stderr,
)

def analyze_text(self, text: str) -> Tuple[Coordinates, int]:
"""Optimized text analysis with caching"""
Expand Down Expand Up @@ -899,7 +913,10 @@ def __init__(self, config: Optional[Dict] = None):

# Build core components
custom_vocabulary = self.config.get("custom_vocabulary", {})
self.vocabulary = VocabularyManager(custom_vocabulary=custom_vocabulary)
verbose_vocab = bool(self.config.get("verbose_vocab"))
self.vocabulary = VocabularyManager(
custom_vocabulary=custom_vocabulary, quiet=not verbose_vocab
)
self.semantic_analyzer = SemanticAnalyzer(self.vocabulary, self.ANCHOR_POINT)

# Build specialized sub-engines
Expand Down
61 changes: 31 additions & 30 deletions harmonizer/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,46 +34,37 @@
import json # noqa: E402
from typing import Dict, List, Tuple # noqa: E402

import yaml # noqa: E402

from harmonizer import divine_invitation_engine_V2 as dive # noqa: E402
from harmonizer.ast_semantic_parser import AST_Semantic_Parser # noqa: E402
from harmonizer.refactorer import Refactorer # noqa: E402
from harmonizer.semantic_map import SemanticMapGenerator # noqa: E402
from harmonizer.semantic_naming import SemanticNamingEngine # noqa: E402
from harmonizer.config import ConfigLoader # noqa: E402

# --- CONFIGURATION LOADING ---


def load_configuration() -> Dict:
"""
Searches for and loads .harmonizer.yml from the current directory
up to the root.
Load Harmonizer configuration by searching for the nearest YAML/TOML config.
"""
current_dir = os.getcwd()
while True:
config_path = os.path.join(current_dir, ".harmonizer.yml")
if os.path.exists(config_path):
try:
with open(config_path, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
if config:
# Use stderr to avoid polluting JSON output
print(
f"INFO: Loaded configuration from {config_path}",
file=sys.stderr,
)
return config
return {}
except (yaml.YAMLError, IOError) as e:
print(f"WARNING: Could not load or parse config: {e}", file=sys.stderr)
return {}

parent_dir = os.path.dirname(current_dir)
if parent_dir == current_dir: # Reached file system root
break
current_dir = parent_dir
return {}
config = ConfigLoader.load_nearest(os.getcwd())
config_dict = {
"exclude": list(config.exclude_patterns),
"custom_vocabulary": dict(config.custom_vocabulary),
"thresholds": {
"max_disharmony": config.max_disharmony,
"max_imbalance": config.max_imbalance,
"min_density": config.min_density,
},
"config_root": config.root_dir or os.getcwd(),
}
if config.source_path:
print(
f"INFO: Loaded configuration from {config.source_path}",
file=sys.stderr,
)
return config_dict


# --- THE HARMONIZER APPLICATION ---
Expand Down Expand Up @@ -169,8 +160,9 @@ def _parse_code_to_ast(self, content: str, file_path: str) -> ast.AST:

def _analyze_all_functions(self, tree: ast.AST) -> Dict[str, Dict]:
harmony_report = {}
function_nodes = (ast.FunctionDef, ast.AsyncFunctionDef)
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef):
if isinstance(node, function_nodes):
function_name = node.name
docstring = ast.get_docstring(node)
intent_concepts = self.parser.get_intent_concepts(function_name, docstring)
Expand Down Expand Up @@ -424,8 +416,17 @@ def validate_cli_arguments(args: argparse.Namespace, config: Dict) -> List[str]:
invalid_files = []
excluded_files = []
exclude_patterns = config.get("exclude", [])
config_root = config.get("config_root") or os.getcwd()
for file_path in args.files:
if any(fnmatch.fnmatch(file_path, pattern) for pattern in exclude_patterns):
normalized_path = os.path.normpath(file_path)
rel_path = os.path.normpath(os.path.relpath(normalized_path, config_root))
basename = os.path.basename(normalized_path)
if any(
fnmatch.fnmatch(normalized_path, pattern)
or fnmatch.fnmatch(rel_path, pattern)
or fnmatch.fnmatch(basename, pattern)
for pattern in exclude_patterns
):
excluded_files.append(file_path)
continue
if os.path.exists(file_path):
Expand Down