From a207cbf4fddadcf9909396b8eb05eebd2d6a1a53 Mon Sep 17 00:00:00 2001 From: Renan Fernandes Date: Sat, 27 Jun 2026 16:31:42 -0300 Subject: [PATCH 1/4] test: add comprehensive tests for git_ops module (coverage 90%) --- tests/test_git_ops.py | 92 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 90 insertions(+), 2 deletions(-) diff --git a/tests/test_git_ops.py b/tests/test_git_ops.py index 28a1510..be8787e 100644 --- a/tests/test_git_ops.py +++ b/tests/test_git_ops.py @@ -1,7 +1,9 @@ import os +import pytest +from unittest.mock import patch, MagicMock +from gitauditor.core.exceptions import ScanError from gitauditor.core.git_ops import GitService - def test_get_commit_diff_empty_or_clean(tmp_git_repo): # Test diff on empty repo or HEAD where HEAD has no recent uncommitted changes # By default tmp_git_repo is initialized and has a README but maybe not committed @@ -12,7 +14,6 @@ def test_get_commit_diff_empty_or_clean(tmp_git_repo): assert diff is not None assert "Initial commit" in diff or "diff --git" in diff - def test_get_repo_details(tmp_git_repo): os.system(f"git -C {tmp_git_repo} add README.md") os.system(f"git -C {tmp_git_repo} commit -m 'Test commit log'") @@ -25,3 +26,90 @@ def test_get_repo_details(tmp_git_repo): # Test remote url mapping assert "remote" in details + +def test_sanitize_hash(): + assert GitService._sanitize_hash("abc123def456") == "abc123def456" + assert GitService._sanitize_hash("invalid#hash") == "invalidhash" + with pytest.raises(ScanError, match="Invalid commit hash format"): + GitService._sanitize_hash("git--show") + +def test_amend_commit_message(tmp_git_repo): + os.system(f"git -C {tmp_git_repo} add README.md") + os.system(f"git -C {tmp_git_repo} commit -m 'Old Message'") + + GitService.amend_commit_message(tmp_git_repo, "New Message") + details = GitService.get_repo_details(tmp_git_repo) + assert details["commits"][0]["message"] == "New Message" + +def test_rebase_flags(tmp_git_repo): + # Just basic coverage of the shell calls, assuming no active rebase + assert GitService.is_rebasing(tmp_git_repo) is False + # Calling abort or continue when not rebasing shouldn't crash + GitService.abort_rebase(tmp_git_repo) + GitService.continue_rebase(tmp_git_repo) + +@patch("subprocess.run") +def test_start_interactive_rebase(mock_run, tmp_git_repo): + GitService.start_interactive_rebase(tmp_git_repo, 5) + mock_run.assert_called() + +def test_extract_diff_for_commit(tmp_git_repo): + os.system(f"git -C {tmp_git_repo} add README.md") + os.system(f"git -C {tmp_git_repo} commit -m 'Commit 1'") + with open(f"{tmp_git_repo}/README.md", "a") as f: + f.write("Line 2\n") + os.system(f"git -C {tmp_git_repo} commit -am 'Commit 2'") + + diff = GitService.extract_diff_for_commit(tmp_git_repo, "HEAD") + assert "Line 2" in diff + + # Invalid commit + assert "Não foi possível obter o diff" in GitService.extract_diff_for_commit(tmp_git_repo, "invalidhash") + +def test_find_open_branches(tmp_git_repo): + os.system(f"git -C {tmp_git_repo} add README.md") + os.system(f"git -C {tmp_git_repo} commit -m 'C1'") + os.system(f"git -C {tmp_git_repo} branch test-branch") + + branches = GitService.find_open_branches(tmp_git_repo) + assert "test-branch" in branches + assert "main" in branches or "master" in branches + + # Invalid path + assert GitService.find_open_branches("/non/existent/path") == [] + +def test_get_latest_commit_info(tmp_git_repo): + os.system(f"git -C {tmp_git_repo} add README.md") + os.system(f"git -C {tmp_git_repo} commit -m 'Commit Msg 123'") + + info = GitService.get_latest_commit_info(tmp_git_repo) + assert info["message"] == "Commit Msg 123" + assert info["date"] != "1970-01-01 00:00" + + # Invalid path + bad_info = GitService.get_latest_commit_info("/non/existent/path") + assert bad_info["message"] == "N/A" + +@patch("subprocess.run") +def test_reword_commit_success(mock_run, tmp_git_repo): + mock_run.return_value = MagicMock(returncode=0) + # Using patch for 'git.Repo' and other internals to avoid actually rewriting real commits in testing if it's too complex + with patch("git.Repo") as mock_repo: + mock_repo_instance = MagicMock() + mock_repo.return_value = mock_repo_instance + # Fake commit parents + mock_repo_instance.commit.return_value = MagicMock(parents=[MagicMock()]) + + backup_branch = GitService.reword_commit(tmp_git_repo, "abc1234", "New Message") + assert backup_branch.startswith("gitauditor-backup-") + mock_run.assert_called() + +def test_rollback_amend_success(tmp_git_repo): + os.system(f"git -C {tmp_git_repo} add README.md") + os.system(f"git -C {tmp_git_repo} commit -m 'Initial'") + os.system(f"git -C {tmp_git_repo} branch backup-branch") + + assert GitService.rollback_amend(tmp_git_repo, "backup-branch") is True + + with pytest.raises(ScanError): + GitService.rollback_amend(tmp_git_repo, "non-existent-branch") From 33b540939ffcdd74edb87c5813e71ad438d6106f Mon Sep 17 00:00:00 2001 From: Renan Fernandes Date: Sat, 27 Jun 2026 17:14:55 -0300 Subject: [PATCH 2/4] style: fix ruff linting errors to unblock CI --- src/gitauditor/cli.py | 55 +++++--- src/gitauditor/commands/amend_cmd.py | 64 +++++----- src/gitauditor/commands/audit_cmd.py | 16 +-- src/gitauditor/commands/catalog_cmd.py | 67 +++++----- src/gitauditor/commands/config_cmd.py | 20 ++- src/gitauditor/commands/policy_cmd.py | 52 +++++--- src/gitauditor/commands/repo_app.py | 20 ++- src/gitauditor/commands/repo_cmd.py | 20 +-- src/gitauditor/commands/review_cmd.py | 16 +-- src/gitauditor/commands/ssh_cmd.py | 12 +- src/gitauditor/commands/worktree_cmd.py | 85 ++++++++++--- src/gitauditor/core/ai_api.py | 161 ++++++++++++------------ src/gitauditor/core/audit_log.py | 11 +- src/gitauditor/core/exceptions.py | 10 ++ src/gitauditor/core/git_ops.py | 20 ++- src/gitauditor/core/models.py | 8 +- src/gitauditor/core/policy_engine.py | 71 +++++++---- src/gitauditor/core/scanner.py | 11 +- src/gitauditor/core/semantic.py | 17 +-- src/gitauditor/core/ssh_audit.py | 5 +- tests/conftest.py | 7 +- tests/test_ai_api.py | 35 +++--- tests/test_catalog_cmd.py | 65 ++++++---- tests/test_cli_commands.py | 3 +- tests/test_config.py | 28 +++-- tests/test_git_ops.py | 45 ++++--- tests/test_point1_rebase_merges.py | 4 +- tests/test_point3_windows_rebase.py | 4 +- tests/test_policy_engine.py | 16 ++- tests/test_scanner.py | 10 +- tests/test_ssh_audit.py | 51 ++++---- 31 files changed, 563 insertions(+), 446 deletions(-) diff --git a/src/gitauditor/cli.py b/src/gitauditor/cli.py index e13f2b3..db70a42 100644 --- a/src/gitauditor/cli.py +++ b/src/gitauditor/cli.py @@ -3,6 +3,7 @@ # --- Inicialização da Internacionalização (i18n) --- import gettext import json +import locale import os import typer @@ -11,8 +12,6 @@ from rich.prompt import Prompt from rich.table import Table -import locale - try: system_lang = locale.getdefaultlocale()[0] or "en_US" except Exception: @@ -29,18 +28,21 @@ lang_to_use = cfg.get("lang", default_lang) except Exception as e: import sys + print(f"Aviso: Erro ao carregar config i18n: {e}", file=sys.stderr) -localedir = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'locales') +localedir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "locales") try: - translate = gettext.translation('gitauditor', localedir, languages=[lang_to_use], fallback=True) + translate = gettext.translation("gitauditor", localedir, languages=[lang_to_use], fallback=True) translate.install() _ = translate.gettext except Exception as e: import sys + print(f"Aviso: Não foi possível carregar a tradução: {e}", file=sys.stderr) import builtins - builtins.__dict__['_'] = lambda x: x + + builtins.__dict__["_"] = lambda x: x # ---------------------------------------------------- from gitauditor.commands.catalog_cmd import catalog_app @@ -55,7 +57,7 @@ app = typer.Typer( help=_("GitAuditor - O seu assistente IA e motor de políticas para repositórios Git."), invoke_without_command=True, - epilog="Dica: Use [bold]gitauditor ui[/bold] para o menu interativo clássico." + epilog="Dica: Use [bold]gitauditor ui[/bold] para o menu interativo clássico.", ) console = Console() @@ -101,14 +103,16 @@ def run(self): ) console.print("[0] 🚪 Sair") - total_pages = (total_filtered + self.page_size - 1) // self.page_size if total_filtered > 0 else 1 + total_pages = ( + (total_filtered + self.page_size - 1) // self.page_size if total_filtered > 0 else 1 + ) if total_pages > 1: - console.print(f"[dim]Página {self.current_page + 1}/{total_pages} - Digite 'n' para próxima, 'p' para anterior[/dim]") + console.print( + f"[dim]Página {self.current_page + 1}/{total_pages} - Digite 'n' para próxima, 'p' para anterior[/dim]" + ) choices = [str(i) for i in range(10)] + ["n", "N", "p", "P"] - choice = Prompt.ask( - "Escolha uma opção", choices=choices - ).lower() + choice = Prompt.ask("Escolha uma opção", choices=choices).lower() if choice == "0": console.print("[bold green]Até logo![/bold green] 👋") @@ -125,6 +129,7 @@ def run(self): self.current_page = total_pages - 1 elif choice == "1": from gitauditor.commands.repo_cmd import handle_repo_details + handle_repo_details(self) elif choice == "2": from gitauditor.commands.catalog_cmd import open_repo @@ -221,9 +226,7 @@ def _show_ai_menu(self): repo_idx = Prompt.ask("Digite o ID do repositório") if repo_idx.isdigit() and 0 <= int(repo_idx) < len(self.repos): repo_path = self.repos[int(repo_idx)] - limit = Prompt.ask( - "Quantos commits analisar? (0 para todos)", default="0" - ) + limit = Prompt.ask("Quantos commits analisar? (0 para todos)", default="0") changelog_command(path=repo_path, limit=int(limit)) else: console.print("[red]ID inválido![/red]") @@ -296,9 +299,7 @@ def _load_catalog(self, silent=False): console.print( "[yellow]Por favor, apague o banco antigo e ressincronize:[/yellow]" ) - console.print( - "rm ~/.gitauditor/catalog.db && gitauditor catalog sync\n" - ) + console.print("rm ~/.gitauditor/catalog.db && gitauditor catalog sync\n") raise typer.Exit(1) else: raise @@ -445,9 +446,7 @@ def _action_filter_table(self): console.print("[3] Apenas Negados 🔴") console.print("[4] Apenas Locais 📁") - choice = Prompt.ask( - "Escolha o filtro", choices=["1", "2", "3", "4"], default="1" - ) + choice = Prompt.ask("Escolha o filtro", choices=["1", "2", "3", "4"], default="1") if choice == "1": self.current_filter = "Todos" elif choice == "2": @@ -470,6 +469,7 @@ def _action_filter_table(self): app.command(name="config", help=_("Configurações do GitAuditor"))(config_command) + class AppState: def __init__(self): self._cli: GitAuditorCLI | None = None @@ -480,6 +480,7 @@ def cli(self) -> GitAuditorCLI: self._cli = GitAuditorCLI() return self._cli + @app.callback(invoke_without_command=True) def main_callback(ctx: typer.Context): if ctx.obj is None: @@ -487,6 +488,7 @@ def main_callback(ctx: typer.Context): if ctx.invoked_subcommand is None: ctx.obj.cli.run() + @app.command() def ui(ctx: typer.Context): """Modo Interativo (UI/Launcher Clássico).""" @@ -494,46 +496,59 @@ def ui(ctx: typer.Context): ctx.obj = AppState() ctx.obj.cli.run() + @app.command(name="sync", hidden=True) def sync_shortcut(): """Alias para catalog sync""" from gitauditor.commands.catalog_cmd import sync_catalog + sync_catalog() + @app.command(name="health", hidden=True) def health_shortcut(): """Alias para catalog health""" from gitauditor.commands.catalog_cmd import health_dashboard + health_dashboard() + @app.command(name="history", hidden=True) def history_shortcut(limit: int = 20): """Alias para policy log""" from gitauditor.commands.policy_cmd import policy_log + policy_log(limit=limit) + @app.command(name="amend", hidden=True) def amend_shortcut(ctx: typer.Context): """Alias para repo amend""" from gitauditor.commands.repo_app import repo_amend + if ctx.obj is None: ctx.obj = AppState() repo_amend(ctx) + @app.command(name="details", hidden=True) def details_shortcut(ctx: typer.Context): """Alias para repo details""" from gitauditor.commands.repo_app import repo_details + if ctx.obj is None: ctx.obj = AppState() repo_details(ctx) + @app.command(name="review", hidden=True) def review_shortcut(path: str = ".", staged: bool = False): """Alias para repo review""" from gitauditor.commands.review_cmd import review_command + review_command(path=path, staged=staged) + @app.command(name="ssh", help=_("Gerenciar Chaves e Identidades SSH.")) def ssh_cmd(ctx: typer.Context): if ctx.obj is None: diff --git a/src/gitauditor/commands/amend_cmd.py b/src/gitauditor/commands/amend_cmd.py index cc68c49..d10f9cf 100644 --- a/src/gitauditor/commands/amend_cmd.py +++ b/src/gitauditor/commands/amend_cmd.py @@ -50,9 +50,7 @@ def handle_ai_amend(cli): console.print("\n[bold yellow]Ações de Revisão e Paginação:[/bold yellow]") console.print("[1] 🎯 Revisar um único commit específico") - console.print( - "[2] 🔄 Revisão Sequencial em Lote (O Mais Recente -> O Mais Antigo)" - ) + console.print("[2] 🔄 Revisão Sequencial em Lote (O Mais Recente -> O Mais Antigo)") valid_choices = ["0", "1", "2"] if end < len(commits): @@ -96,9 +94,7 @@ def handle_ai_amend(cli): f"[dim]Hash original: {target_commit['hash']}\nMensagem atual: {target_commit['message']}[/dim]\n" ) - success = _process_single_amend( - cli, repo_path, target_commit, is_batch=True - ) + success = _process_single_amend(cli, repo_path, target_commit, is_batch=True) if not success: break console.print( @@ -119,9 +115,7 @@ def handle_ai_amend(cli): try: repo = git.Repo(repo_path) repo.git.push("--force-with-lease") - console.print( - "[bold green]✅ Push executado com sucesso![/bold green]" - ) + console.print("[bold green]✅ Push executado com sucesso![/bold green]") except Exception as e: console.print(f"[bold red]Erro ao fazer push:[/] {e}") @@ -130,22 +124,16 @@ def handle_ai_amend(cli): console.print("[red]ID inválido![/red]") -def _process_single_amend( - cli, repo_path: str, target_commit: dict, is_batch: bool = False -) -> bool: +def _process_single_amend(cli, repo_path: str, target_commit: dict, is_batch: bool = False) -> bool: """Processa um commit individualmente. Retorna False se o usuário cancelar o lote inteiro.""" commit_hash = target_commit["hash"] - with console.status( - f"[bold green]Isolando diff do commit {commit_hash}..." - ) as status: + with console.status(f"[bold green]Isolando diff do commit {commit_hash}...") as status: diff = GitService.extract_diff_for_commit(repo_path, commit_hash) if not diff or diff.startswith("Não foi possível"): from rich.markup import escape - console.print( - f"[red]Falha ao isolar diff para o commit {commit_hash}.[/red]" - ) + console.print(f"[red]Falha ao isolar diff para o commit {commit_hash}.[/red]") console.print(f"[yellow]{escape(diff[:1000]) if diff else ''}[/yellow]") return True # Retorna True para não cancelar o lote inteiro @@ -165,9 +153,7 @@ def _process_single_amend( return True if is_batch: - prompt_text = ( - "Deseja aplicar? [S]im / [N]Pular / [E]ditar manual / [C]ancelar Lote" - ) + prompt_text = "Deseja aplicar? [S]im / [N]Pular / [E]ditar manual / [C]ancelar Lote" choices = ["S", "N", "E", "C", "s", "n", "e", "c"] else: prompt_text = "Deseja aplicar esta mensagem e reescrever o histórico? (S/N)" @@ -177,21 +163,27 @@ def _process_single_amend( if confirm == "S": try: - with console.status( - "[bold yellow]Iniciando Rebase Interativo Automático..." - ): + with console.status("[bold yellow]Iniciando Rebase Interativo Automático..."): backup_branch = GitService.reword_commit(repo_path, commit_hash, suggestion) - console.print( - "[bold green]✅ Commit atualizado com sucesso via rebase![/bold green]" - ) + console.print("[bold green]✅ Commit atualizado com sucesso via rebase![/bold green]") console.print(f"[dim]Backup guardado na branch: {backup_branch}[/dim]") - if Prompt.ask("Deseja DESFAZER (Rollback) essa reescrita?", choices=["S", "N", "s", "n"], default="N").upper() == "S": + if ( + Prompt.ask( + "Deseja DESFAZER (Rollback) essa reescrita?", + choices=["S", "N", "s", "n"], + default="N", + ).upper() + == "S" + ): GitService.rollback_amend(repo_path, backup_branch) - console.print("[yellow]Rollback executado com sucesso! Histórico restaurado.[/yellow]") + console.print( + "[yellow]Rollback executado com sucesso! Histórico restaurado.[/yellow]" + ) else: # Se aceitou, podemos limpar a branch de backup para não sujar o repo import git + try: repo = git.Repo(repo_path) repo.delete_head(backup_branch, force=True) @@ -211,11 +203,21 @@ def _process_single_amend( ) console.print(f"[dim]Backup guardado na branch: {backup_branch}[/dim]") - if Prompt.ask("Deseja DESFAZER (Rollback) essa reescrita?", choices=["S", "N", "s", "n"], default="N").upper() == "S": + if ( + Prompt.ask( + "Deseja DESFAZER (Rollback) essa reescrita?", + choices=["S", "N", "s", "n"], + default="N", + ).upper() + == "S" + ): GitService.rollback_amend(repo_path, backup_branch) - console.print("[yellow]Rollback executado com sucesso! Histórico restaurado.[/yellow]") + console.print( + "[yellow]Rollback executado com sucesso! Histórico restaurado.[/yellow]" + ) else: import git + try: repo = git.Repo(repo_path) repo.delete_head(backup_branch, force=True) diff --git a/src/gitauditor/commands/audit_cmd.py b/src/gitauditor/commands/audit_cmd.py index 880615d..e4d33ea 100644 --- a/src/gitauditor/commands/audit_cmd.py +++ b/src/gitauditor/commands/audit_cmd.py @@ -38,9 +38,7 @@ def handle_audit_duplicates(cli): console.print("[yellow]Nenhum repositório para auditar.[/yellow]") return - console.print( - Panel.fit("[bold magenta]🧹 Auditoria de Duplicados e Branches[/bold magenta]") - ) + console.print(Panel.fit("[bold magenta]🧹 Auditoria de Duplicados e Branches[/bold magenta]")) url_map = defaultdict(list) for path in cli.repos: @@ -63,9 +61,7 @@ def handle_audit_duplicates(cli): return for set_idx, (url, paths) in enumerate(duplicate_sets): - console.print( - f"\n[bold red]⚠️ Set [{set_idx}] - Duplicados para o remote:[/] {url}" - ) + console.print(f"\n[bold red]⚠️ Set [{set_idx}] - Duplicados para o remote:[/] {url}") dup_table = Table(show_header=True, header_style="bold yellow") dup_table.add_column("Path ID", style="dim", width=7) @@ -130,16 +126,12 @@ def handle_audit_duplicates(cli): shutil.move(target_path, final_dest) cli.repos.remove(target_path) cli.repos.append(final_dest) - cli.repo_status[final_dest] = cli.repo_status.pop( - target_path, "⚪" - ) + cli.repo_status[final_dest] = cli.repo_status.pop(target_path, "⚪") console.print( "[bold green]✅ Repositório unificado e movido com sucesso![/bold green]" ) except Exception as e: - console.print( - f"[bold red]Erro ao mover repositório:[/] {e}" - ) + console.print(f"[bold red]Erro ao mover repositório:[/] {e}") else: console.print("[red]Diretório destino inválido![/red]") else: diff --git a/src/gitauditor/commands/catalog_cmd.py b/src/gitauditor/commands/catalog_cmd.py index ff3abf8..e3d154e 100644 --- a/src/gitauditor/commands/catalog_cmd.py +++ b/src/gitauditor/commands/catalog_cmd.py @@ -35,9 +35,7 @@ def sync_catalog(): ) repos_paths = asyncio.run(scanner.scan(roots)) - console.print( - "[cyan]Enriquecendo metadados (identificando remotes e owners)...[/cyan]" - ) + console.print("[cyan]Enriquecendo metadados (identificando remotes e owners)...[/cyan]") enriched_data = asyncio.run(enrich_all(repos_paths)) with Session(engine) as session: @@ -67,9 +65,7 @@ def list_catalog(): init_db() with Session(engine) as session: repos = session.exec(select(Repo)).all() - console.print( - f"Total: [bold green]{len(repos)}[/bold green] repositórios no catálogo." - ) + console.print(f"Total: [bold green]{len(repos)}[/bold green] repositórios no catálogo.") for r in repos: console.print(f"- [cyan]{r.name}[/cyan] ({r.path})") @@ -81,9 +77,7 @@ def health_dashboard(): with Session(engine) as session: repos = session.exec(select(Repo)).all() if not repos: - console.print( - "[yellow]O catálogo está vazio. Rode `gitauditor catalog sync`.[/yellow]" - ) + console.print("[yellow]O catálogo está vazio. Rode `gitauditor catalog sync`.[/yellow]") return orphans = [r for r in repos if r.status == "Orphan"] @@ -109,9 +103,7 @@ def health_dashboard(): @catalog_app.command("dedupe") def dedupe_repos( - plan: bool = typer.Option( - False, "--plan", help="Mostra apenas o plano de deduplicação" - ), + plan: bool = typer.Option(False, "--plan", help="Mostra apenas o plano de deduplicação"), ): """Identifica e propõe a normalização de repositórios duplicados lógicos.""" init_db() @@ -125,9 +117,7 @@ def dedupe_repos( duplicados = {c: reps for c, reps in canonical_map.items() if len(reps) > 1} if not duplicados: - console.print( - "[green]✅ Nenhum repositório duplicado logicamente encontrado![/green]" - ) + console.print("[green]✅ Nenhum repositório duplicado logicamente encontrado![/green]") return console.print( @@ -139,9 +129,7 @@ def dedupe_repos( console.print(f" - {r.path} [dim]({r.remote_url})[/dim]") if plan: - console.print( - "\n[dim]Modo --plan ativado. Nenhuma deleção será feita.[/dim]" - ) + console.print("\n[dim]Modo --plan ativado. Nenhuma deleção será feita.[/dim]") return import shutil @@ -153,7 +141,11 @@ def dedupe_repos( for i, r in enumerate(reps): console.print(f"[{i}] Manter: {r.path}") - choice = typer.prompt("Digite o número do repositório a MANTER (os outros serão excluídos). Digite -1 para pular", type=int, default=-1) + choice = typer.prompt( + "Digite o número do repositório a MANTER (os outros serão excluídos). Digite -1 para pular", + type=int, + default=-1, + ) if 0 <= choice < len(reps): to_keep = reps[choice] to_delete = [r for idx, r in enumerate(reps) if idx != choice] @@ -162,16 +154,27 @@ def dedupe_repos( safe_to_delete = [] for d in to_delete: import subprocess - res = subprocess.run(["git", "status", "--porcelain"], cwd=d.path, capture_output=True, text=True, timeout=15) + + res = subprocess.run( + ["git", "status", "--porcelain"], + cwd=d.path, + capture_output=True, + text=True, + timeout=15, + ) if res.stdout.strip() != "": - console.print(f"[red]⚠️ Bloqueio de Segurança:[/red] {d.path} tem mudanças não commitadas! Abortando exclusão deste diretório.") + console.print( + f"[red]⚠️ Bloqueio de Segurança:[/red] {d.path} tem mudanças não commitadas! Abortando exclusão deste diretório." + ) else: safe_to_delete.append(d) if not safe_to_delete: continue - console.print("[yellow]Atenção: Os seguintes diretórios serão APAGADOS do disco:[/yellow]") + console.print( + "[yellow]Atenção: Os seguintes diretórios serão APAGADOS do disco:[/yellow]" + ) for d in safe_to_delete: console.print(f"- {d.path}") @@ -201,17 +204,13 @@ def open_repo(query: str): ] if not matches: - console.print( - f"[red]❌ Nenhum repositório encontrado para '{query}'.[/red]" - ) + console.print(f"[red]❌ Nenhum repositório encontrado para '{query}'.[/red]") return if len(matches) == 1: target = matches[0].path else: - console.print( - f"[yellow]Foram encontrados {len(matches)} resultados:[/yellow]" - ) + console.print(f"[yellow]Foram encontrados {len(matches)} resultados:[/yellow]") for i, r in enumerate(matches): console.print(f"[{i}] {r.canonical_name or r.name} ({r.path})") @@ -292,9 +291,7 @@ async def analyze_all(): repo.ai_summary = result.get("summary") repo.ai_stack = result.get("stack") repo.ai_tags = ( - result.get("tags", []) - if isinstance(result.get("tags"), list) - else [] + result.get("tags", []) if isinstance(result.get("tags"), list) else [] ) repo.ai_risk = result.get("risk") @@ -312,9 +309,7 @@ async def analyze_all(): console.print(f" [dim]Tags: {repo.ai_tags}[/dim]") console.print(f" [dim]Resumo: {repo.ai_summary}[/dim]") else: - console.print( - "[red]✗ Falha ao processar a resposta estruturada do LLM.[/red]" - ) + console.print("[red]✗ Falha ao processar a resposta estruturada do LLM.[/red]") asyncio.run(analyze_all()) @@ -322,9 +317,7 @@ async def analyze_all(): @catalog_app.command("tag-auto") def tag_auto_catalog( path: str = typer.Option(None, help="Filtrar por nome do repositório"), - no_ai: bool = typer.Option( - False, "--no-ai", help="Usar apenas heurística bruta (sem IA)" - ), + no_ai: bool = typer.Option(False, "--no-ai", help="Usar apenas heurística bruta (sem IA)"), ): """ [P3.2] Híbrido: Gera e aplica tags automaticamente (Heurística determinística + LLM). diff --git a/src/gitauditor/commands/config_cmd.py b/src/gitauditor/commands/config_cmd.py index 9f9f5db..bb9b952 100644 --- a/src/gitauditor/commands/config_cmd.py +++ b/src/gitauditor/commands/config_cmd.py @@ -13,12 +13,8 @@ def config_command(): config = ConfigManager.load_config() ai_config = config.get("ai", {}) - console.print( - "\n[bold magenta]=== Configuração de IA do GitAuditor ===[/bold magenta]" - ) - console.print( - "[dim]Escolha qual provedor de Inteligência Artificial você quer usar.[/dim]\n" - ) + console.print("\n[bold magenta]=== Configuração de IA do GitAuditor ===[/bold magenta]") + console.print("[dim]Escolha qual provedor de Inteligência Artificial você quer usar.[/dim]\n") provider_choices = ["ollama", "openai", "openrouter", "azure"] current_provider = ai_config.get("provider", "ollama") @@ -72,9 +68,7 @@ def config_command(): current_key = ai_config.get("api_key", "") mask = "*" * 10 + current_key[-4:] if len(current_key) > 4 else "" - new_key = Prompt.ask( - f"Sua OpenRouter API Key [dim](Atual: {mask})[/dim]", password=True - ) + new_key = Prompt.ask(f"Sua OpenRouter API Key [dim](Atual: {mask})[/dim]", password=True) if new_key: ai_config["api_key"] = new_key @@ -84,11 +78,13 @@ def config_command(): current_model = ai_config.get("model", "gpt-4o") ai_config["model"] = Prompt.ask("Qual deployment name (modelo)?", default=current_model) - current_url = ai_config.get("base_url", "https://.services.ai.azure.com/openai/v1") + current_url = ai_config.get( + "base_url", "https://.services.ai.azure.com/openai/v1" + ) ai_config["base_url"] = Prompt.ask("URL base do Azure AI", default=current_url) use_default_cred = Prompt.ask("Usar Entra ID (DefaultAzureCredential)? [S/n]", default="s") - if use_default_cred.lower() == 's': + if use_default_cred.lower() == "s": ai_config["api_key"] = "azure_default_credential" else: current_key = ai_config.get("api_key", "") @@ -103,7 +99,7 @@ def config_command(): lang_choice = Prompt.ask( "Selecione o Idioma / Select Language: [1] Português (pt_BR) [2] English (en_US)", choices=["1", "2"], - default="1" if current_lang == "pt_BR" else "2" + default="1" if current_lang == "pt_BR" else "2", ) config["lang"] = "pt_BR" if lang_choice == "1" else "en_US" diff --git a/src/gitauditor/commands/policy_cmd.py b/src/gitauditor/commands/policy_cmd.py index a167775..2934741 100644 --- a/src/gitauditor/commands/policy_cmd.py +++ b/src/gitauditor/commands/policy_cmd.py @@ -26,16 +26,12 @@ def find_repo_or_exit(query: str): ] if not matches: - console.print( - f"[red]❌ Nenhum repositório encontrado para '{query}'.[/red]" - ) + console.print(f"[red]❌ Nenhum repositório encontrado para '{query}'.[/red]") raise typer.Exit(1) if len(matches) == 1: return matches[0].path - console.print( - "[yellow]Múltiplos encontrados, selecione um para continuar:[/yellow]" - ) + console.print("[yellow]Múltiplos encontrados, selecione um para continuar:[/yellow]") for i, r in enumerate(matches): console.print(f"[{i}] {r.canonical_name or r.name} ({r.path})") @@ -50,7 +46,9 @@ def find_repo_or_exit(query: str): @policy_app.command("check") def check_policy( query: str | None = typer.Argument(None, help="Nome do repositório a ser verificado"), - output_json: bool = typer.Option(False, "--json", help="Retorna o output como JSON estruturado") + output_json: bool = typer.Option( + False, "--json", help="Retorna o output como JSON estruturado" + ), ): """Verifica a saúde de governança de um repositório (README, CI, Secrets, etc).""" @@ -79,11 +77,12 @@ def check_policy( # Audit logging for the command from gitauditor.core.audit_log import AuditLogger + AuditLogger.log( "policy_check", "SUCCESS", f"Checou políticas para {len(paths_to_check)} repo(s).", - details=json.dumps({"checked_count": len(paths_to_check)}) + details=json.dumps({"checked_count": len(paths_to_check)}), ) if output_json: @@ -102,7 +101,10 @@ def check_policy( color = "green" if score >= 80 else "yellow" if score >= 50 else "red" - table = Table(title=f"Governance Report: {repo_name} (Score: [{color}]{score}/100[/{color}])", show_header=True) + table = Table( + title=f"Governance Report: {repo_name} (Score: [{color}]{score}/100[/{color}])", + show_header=True, + ) table.add_column("Critério", style="cyan") table.add_column("Status", justify="center") @@ -113,9 +115,20 @@ def _format_status(passed: bool) -> str: table.add_row("LICENSE", _format_status(report["checks"]["license"])) table.add_row("Gitignore", _format_status(report["checks"]["gitignore"])) table.add_row("CI/CD Pipeline", _format_status(report["checks"]["ci_cd"])) - table.add_row("Community (CODEOWNERS/etc)", _format_status(report["checks"]["codeowners"] and report["checks"]["contributing"] and report["checks"]["security"])) + table.add_row( + "Community (CODEOWNERS/etc)", + _format_status( + report["checks"]["codeowners"] + and report["checks"]["contributing"] + and report["checks"]["security"] + ), + ) - env_status = "[red]❌ VAZADO[/red]" if report["checks"]["env_exposed"] else "[green]✅ Seguro[/green]" + env_status = ( + "[red]❌ VAZADO[/red]" + if report["checks"]["env_exposed"] + else "[green]✅ Seguro[/green]" + ) table.add_row("Segurança (.env commitado)", env_status) console.print(table) @@ -129,7 +142,8 @@ def _format_status(passed: bool) -> str: for w in report["warnings"]: console.print(f" - {w}") - console.print("") # spacing + console.print("") # spacing + @policy_app.command("log") def policy_log(limit: int = 20): @@ -141,13 +155,19 @@ def policy_log(limit: int = 20): init_audit_db() with Session(audit_engine) as session: - records = session.exec(select(AuditRecord).order_by(AuditRecord.id.desc()).limit(limit)).all() + records = session.exec( + select(AuditRecord).order_by(AuditRecord.id.desc()).limit(limit) + ).all() if not records: console.print("[yellow]Nenhum registro de auditoria encontrado.[/yellow]") return - table = Table(title=f"Histórico de Auditoria (Últimos {limit})", show_header=True, header_style="bold magenta") + table = Table( + title=f"Histórico de Auditoria (Últimos {limit})", + show_header=True, + header_style="bold magenta", + ) table.add_column("Data", style="dim", width=16) table.add_column("Comando", style="cyan") table.add_column("Status", justify="center") @@ -156,7 +176,9 @@ def policy_log(limit: int = 20): for r in records: dt = r.timestamp.strftime("%Y-%m-%d %H:%M") - status_color = "green" if r.status == "SUCCESS" else "red" if r.status == "ERROR" else "yellow" + status_color = ( + "green" if r.status == "SUCCESS" else "red" if r.status == "ERROR" else "yellow" + ) st = f"[{status_color}]{r.status}[/{status_color}]" ai_info = f"{r.ai_provider}/{r.ai_model}" if r.ai_provider else "-" table.add_row(dt, r.command, st, r.summary, ai_info) diff --git a/src/gitauditor/commands/repo_app.py b/src/gitauditor/commands/repo_app.py index 35cccb0..1d0772b 100644 --- a/src/gitauditor/commands/repo_app.py +++ b/src/gitauditor/commands/repo_app.py @@ -11,34 +11,48 @@ $ gitauditor repo analyze . # Avalia tech-stack, dependências e padrões $ gitauditor repo reword . a1b2c3d # Reescreve a mensagem de um commit antigo interativamente $ gitauditor repo history . # Exibe histórico visual de commits recentes -""" +""", ) + @repo_app.command("review") -def repo_review(path: str = typer.Option(".", help="Caminho do repositório"), staged: bool = typer.Option(False, "--staged")): +def repo_review( + path: str = typer.Option(".", help="Caminho do repositório"), + staged: bool = typer.Option(False, "--staged"), +): """Realiza Code Review de diffs via IA.""" from gitauditor.commands.review_cmd import review_command + review_command(path=path, staged=staged) + @repo_app.command("changelog") -def repo_changelog(path: str = typer.Option(".", help="Caminho do repositório"), limit: int = typer.Option(0, "--limit")): +def repo_changelog( + path: str = typer.Option(".", help="Caminho do repositório"), + limit: int = typer.Option(0, "--limit"), +): """Gera changelog baseado no histórico de commits via IA.""" from gitauditor.commands.changelog_cmd import changelog_command + changelog_command(path=path, limit=limit) + @repo_app.command("amend") def repo_amend(ctx: typer.Context): """Abre fluxo interativo para reescrever commits com IA.""" from gitauditor.commands.amend_cmd import handle_ai_amend + cli_state = ctx.obj.cli cli_state._load_catalog() cli_state._show_repo_table() handle_ai_amend(cli_state) + @repo_app.command("details") def repo_details(ctx: typer.Context): """Visualiza detalhes e gerencia um repositório interativamente.""" from gitauditor.commands.repo_cmd import handle_repo_details + cli_state = ctx.obj.cli cli_state._load_catalog() cli_state._show_repo_table() diff --git a/src/gitauditor/commands/repo_cmd.py b/src/gitauditor/commands/repo_cmd.py index be88e34..329952c 100644 --- a/src/gitauditor/commands/repo_cmd.py +++ b/src/gitauditor/commands/repo_cmd.py @@ -24,9 +24,7 @@ def handle_repo_details(cli): console.print(f"[bold red]Erro:[/] {details['error']}") return - status_obj = cli.repo_status.get( - repo_path, {"icon": "⚪", "reason": "Desconhecido"} - ) + status_obj = cli.repo_status.get(repo_path, {"icon": "⚪", "reason": "Desconhecido"}) if isinstance(status_obj, str): push_status = status_obj push_reason = "" @@ -68,9 +66,7 @@ def handle_repo_details(cli): commits_table.add_row( c["hash"], c["date"], - c["message"][:50] + "..." - if len(c["message"]) > 50 - else c["message"], + c["message"][:50] + "..." if len(c["message"]) > 50 else c["message"], ) console.print(commits_table) @@ -113,20 +109,14 @@ def handle_repo_details(cli): try: shutil.move(repo_path, final_dest) cli.repos[repo_idx] = final_dest - cli.repo_status[final_dest] = cli.repo_status.pop( - repo_path, "⚪" - ) + cli.repo_status[final_dest] = cli.repo_status.pop(repo_path, "⚪") console.print( f"[bold green]✅ Repositório movido com sucesso para {final_dest}![/bold green]" ) except Exception as e: - console.print( - f"[bold red]Erro ao mover repositório:[/] {e}" - ) + console.print(f"[bold red]Erro ao mover repositório:[/] {e}") else: - console.print( - "[red]Diretório de destino inválido ou inexistente![/red]" - ) + console.print("[red]Diretório de destino inválido ou inexistente![/red]") elif sub_choice == "2": confirm = Prompt.ask( diff --git a/src/gitauditor/commands/review_cmd.py b/src/gitauditor/commands/review_cmd.py index 01646df..016b36c 100644 --- a/src/gitauditor/commands/review_cmd.py +++ b/src/gitauditor/commands/review_cmd.py @@ -28,9 +28,7 @@ def get_git_diff(path: str, staged: bool) -> str: def review_command( path: str = typer.Option(".", help="Caminho do repositório"), - staged: bool = typer.Option( - False, "--staged", help="Analisar apenas mudanças em stage" - ), + staged: bool = typer.Option(False, "--staged", help="Analisar apenas mudanças em stage"), ): """ [P3.3] Code Review Local: Analisa o diff atual buscando code smells e riscos. @@ -50,14 +48,10 @@ def review_command( # Truncate if too large to save LLM context max_diff_length = 5000 if len(diff_text) > max_diff_length: - console.print( - "[yellow]Diff muito grande. Truncando para os primeiros 5KB...[/yellow]" - ) + console.print("[yellow]Diff muito grande. Truncando para os primeiros 5KB...[/yellow]") diff_text = diff_text[:max_diff_length] + "\n...[TRUNCATED]" - console.print( - "[cyan]Chamando IA para review (isso pode levar alguns segundos)...[/cyan]" - ) + console.print("[cyan]Chamando IA para review (isso pode levar alguns segundos)...[/cyan]") from gitauditor.core.ai_api import AIClient @@ -80,9 +74,7 @@ async def run_review(): for s in smells: console.print(f" - {s}") else: - console.print( - "\n[bold green]✅ Nenhum code smell evidente detectado![/bold green]" - ) + console.print("\n[bold green]✅ Nenhum code smell evidente detectado![/bold green]") if risks: console.print("\n[bold red]🚨 Riscos Arquiteturais/Lógicos:[/bold red]") diff --git a/src/gitauditor/commands/ssh_cmd.py b/src/gitauditor/commands/ssh_cmd.py index a0a60c5..f9870a2 100644 --- a/src/gitauditor/commands/ssh_cmd.py +++ b/src/gitauditor/commands/ssh_cmd.py @@ -12,16 +12,12 @@ def handle_manage_ssh(cli): console.print( - Panel.fit( - "[bold magenta]🔑 Gerenciador de Chaves e Identidades SSH[/bold magenta]" - ) + Panel.fit("[bold magenta]🔑 Gerenciador de Chaves e Identidades SSH[/bold magenta]") ) # Globals globals_cfg = IdentityManager.get_global_git_config() - console.print( - f"[b]Identidade Global Git:[/] {globals_cfg['name']} <{globals_cfg['email']}>\n" - ) + console.print(f"[b]Identidade Global Git:[/] {globals_cfg['name']} <{globals_cfg['email']}>\n") keys = IdentityManager.list_ssh_keys() if not keys: @@ -47,9 +43,7 @@ def handle_manage_ssh(cli): default="github.com", ) with console.status(f"[bold blue]Testando conexão com {provider}..."): - success = asyncio.run( - IdentityManager.test_provider_connection(provider) - ) + success = asyncio.run(IdentityManager.test_provider_connection(provider)) if success: console.print( f"[bold green]✅ Autenticação bem-sucedida no {provider}![/bold green]" diff --git a/src/gitauditor/commands/worktree_cmd.py b/src/gitauditor/commands/worktree_cmd.py index 36c5c58..dd4c288 100644 --- a/src/gitauditor/commands/worktree_cmd.py +++ b/src/gitauditor/commands/worktree_cmd.py @@ -24,16 +24,12 @@ def find_repo_or_exit(query: str): ] if not matches: - console.print( - f"[red]❌ Nenhum repositório encontrado para '{query}'.[/red]" - ) + console.print(f"[red]❌ Nenhum repositório encontrado para '{query}'.[/red]") raise typer.Exit(1) if len(matches) == 1: return matches[0].path - console.print( - "[yellow]Múltiplos encontrados, selecione um para continuar:[/yellow]" - ) + console.print("[yellow]Múltiplos encontrados, selecione um para continuar:[/yellow]") for i, r in enumerate(matches): console.print(f"[{i}] {r.canonical_name or r.name} ({r.path})") @@ -67,9 +63,7 @@ def create_worktree(query: str, branch: str): console.print(f"[yellow]⚠️ O diretório {dest_path} já existe![/yellow]") raise typer.Exit(1) - console.print( - f"[cyan]Criando worktree para branch '{branch}' em '{dest_path}'...[/cyan]" - ) + console.print(f"[cyan]Criando worktree para branch '{branch}' em '{dest_path}'...[/cyan]") res = subprocess.run( ["git", "worktree", "add", dest_path, branch], @@ -82,12 +76,21 @@ def create_worktree(query: str, branch: str): console.print("[bold green]✅ Worktree criada com sucesso![/bold green]") console.print(f"Você já pode abrir o código em: {dest_path}") from gitauditor.core.audit_log import AuditLogger - AuditLogger.log("worktree_create", "SUCCESS", f"Criada branch {branch}", repo_path=dest_path) + + AuditLogger.log( + "worktree_create", "SUCCESS", f"Criada branch {branch}", repo_path=dest_path + ) else: console.print(f"[red]❌ Falha ao criar worktree:[/red] {res.stderr}") + @worktree_app.command("clean") -def clean_worktrees(query: str, force: bool = typer.Option(False, "--force", "-f", help="Apaga sem perguntar (requer repositórios limpos)")): +def clean_worktrees( + query: str, + force: bool = typer.Option( + False, "--force", "-f", help="Apaga sem perguntar (requer repositórios limpos)" + ), +): """Detecta worktrees órfãs, limpas ou sujas, e permite limpá-las com segurança.""" from rich.prompt import Confirm from rich.table import Table @@ -99,7 +102,13 @@ def clean_worktrees(query: str, force: bool = typer.Option(False, "--force", "-f # Prune para remover refs mortas subprocess.run(["git", "worktree", "prune"], cwd=path, capture_output=True, timeout=15) - res = subprocess.run(["git", "worktree", "list", "--porcelain"], cwd=path, capture_output=True, text=True, timeout=15) + res = subprocess.run( + ["git", "worktree", "list", "--porcelain"], + cwd=path, + capture_output=True, + text=True, + timeout=15, + ) if res.returncode != 0: console.print("[red]Erro ao listar worktrees.[/red]") return @@ -119,7 +128,13 @@ def clean_worktrees(query: str, force: bool = typer.Option(False, "--force", "-f worktrees.append(current_wt) # Filtra a worktree principal (que costuma não ter a palavra worktree no seu .git, ou é a mesma raiz) - main_res = subprocess.run(["git", "rev-parse", "--show-toplevel"], cwd=path, capture_output=True, text=True, timeout=15) + main_res = subprocess.run( + ["git", "rev-parse", "--show-toplevel"], + cwd=path, + capture_output=True, + text=True, + timeout=15, + ) main_root = main_res.stdout.strip() to_clean = [] @@ -142,7 +157,11 @@ def clean_worktrees(query: str, force: bool = typer.Option(False, "--force", "-f else: # Pega o tamanho da pasta try: - wt_size = sum(os.path.getsize(os.path.join(dirpath, filename)) for dirpath, _, filenames in os.walk(wt_path) for filename in filenames) + wt_size = sum( + os.path.getsize(os.path.join(dirpath, filename)) + for dirpath, _, filenames in os.walk(wt_path) + for filename in filenames + ) except Exception: wt_size = 0 @@ -150,14 +169,25 @@ def clean_worktrees(query: str, force: bool = typer.Option(False, "--force", "-f total_bytes += wt_size # Verifica sujeira - status_res = subprocess.run(["git", "status", "--porcelain"], cwd=wt_path, capture_output=True, text=True, timeout=15) + status_res = subprocess.run( + ["git", "status", "--porcelain"], + cwd=wt_path, + capture_output=True, + text=True, + timeout=15, + ) if status_res.stdout.strip() != "": status = "[yellow]Suja (Uncommitted)[/yellow]" else: status = "[green]Limpa[/green]" to_clean.append((wt_path, wt.get("branch", "detached"))) - table.add_row(os.path.basename(wt_path), wt.get("branch", "detached").split("/")[-1], status, f"{size_mb:.1f} MB") + table.add_row( + os.path.basename(wt_path), + wt.get("branch", "detached").split("/")[-1], + status, + f"{size_mb:.1f} MB", + ) console.print(table) @@ -165,10 +195,14 @@ def clean_worktrees(query: str, force: bool = typer.Option(False, "--force", "-f console.print("[dim]Nenhuma worktree secundária limpa para remover.[/dim]") return - console.print(f"\n[bold green]Espaço recuperável estimado: {total_bytes / (1024*1024):.1f} MB[/bold green]") + console.print( + f"\n[bold green]Espaço recuperável estimado: {total_bytes / (1024 * 1024):.1f} MB[/bold green]" + ) if not to_clean: - console.print("[yellow]As worktrees encontradas estão SUJAS ou ausentes. Nenhuma ação automática será tomada.[/yellow]") + console.print( + "[yellow]As worktrees encontradas estão SUJAS ou ausentes. Nenhuma ação automática será tomada.[/yellow]" + ) return console.print("\nAs seguintes worktrees estão LIMPAS e prontas para remoção segura:") @@ -179,10 +213,21 @@ def clean_worktrees(query: str, force: bool = typer.Option(False, "--force", "-f removed = 0 for c_path, c_branch in to_clean: try: - subprocess.run(["git", "worktree", "remove", "-f", c_path], cwd=path, check=True, capture_output=True, timeout=15) + subprocess.run( + ["git", "worktree", "remove", "-f", c_path], + cwd=path, + check=True, + capture_output=True, + timeout=15, + ) removed += 1 except subprocess.CalledProcessError as e: console.print(f"[red]Erro ao remover {c_path}:[/red] {e.stderr.decode()}") console.print(f"[bold green]Limpeza concluída! {removed} worktrees removidas.[/bold green]") - AuditLogger.log("worktree_clean", "SUCCESS", f"Limpou {removed} worktrees em {os.path.basename(path)}", repo_path=path) + AuditLogger.log( + "worktree_clean", + "SUCCESS", + f"Limpou {removed} worktrees em {os.path.basename(path)}", + repo_path=path, + ) diff --git a/src/gitauditor/core/ai_api.py b/src/gitauditor/core/ai_api.py index e2e135d..ea83dba 100644 --- a/src/gitauditor/core/ai_api.py +++ b/src/gitauditor/core/ai_api.py @@ -5,6 +5,7 @@ providers (OpenAI, OpenRouter, Azure, Ollama) and enforces structured JSON outputs for semantic analysis tasks. """ + import json import httpx @@ -19,10 +20,11 @@ class AIClient: """ Unified client for interacting with AI models across different providers. - + Handles configuration, retries, timeouts, and structured JSON parsing. Supports local (Ollama) and cloud (OpenAI, OpenRouter, Azure) endpoints. """ + def __init__(self): config = ConfigManager.load_config() self.ai_config = config.get("ai", {}) @@ -36,18 +38,20 @@ def __init__(self): elif self.provider == "openai": self.base_url = self.ai_config.get("base_url", "https://api.openai.com/v1") elif self.provider == "openrouter": + self.base_url = self.ai_config.get("base_url", "https://openrouter.ai/api/v1") + elif self.provider == "azure": self.base_url = self.ai_config.get( - "base_url", "https://openrouter.ai/api/v1" + "base_url", "https://.services.ai.azure.com/openai/v1" ) - elif self.provider == "azure": - self.base_url = self.ai_config.get("base_url", "https://.services.ai.azure.com/openai/v1") else: self.base_url = self.ai_config.get("base_url", "") @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10), - retry_error_callback=lambda s: print(f"\n[bold red]Erro do LLM:[/bold red] {s.outcome.exception()}") or None + retry_error_callback=lambda s: ( + print(f"\n[bold red]Erro do LLM:[/bold red] {s.outcome.exception()}") or None + ), ) async def _generate_structured( self, prompt: str, schema_dict: dict, timeout: float = 120.0 @@ -57,81 +61,82 @@ async def _generate_structured( Retries up to 3 times on failure. """ async with httpx.AsyncClient(timeout=timeout) as client: - if self.provider == "ollama": - # Ollama direct generate endpoint - response = await client.post( - f"{self.base_url}/api/generate", - json={ - "model": self.model, - "prompt": prompt, - "stream": False, - "format": schema_dict, - }, + if self.provider == "ollama": + # Ollama direct generate endpoint + response = await client.post( + f"{self.base_url}/api/generate", + json={ + "model": self.model, + "prompt": prompt, + "stream": False, + "format": schema_dict, + }, + ) + if response.status_code == 200: + raw = response.json().get("response", "{}").strip() + raw = raw.removeprefix("```json").removesuffix("```").strip() + raw = raw.removeprefix("```").strip() + return json.loads(raw) + if response.status_code != 200: + raise AIProviderError( + f"Ollama API Error: {response.status_code} - {response.text}" ) - if response.status_code == 200: - raw = response.json().get("response", "{}").strip() - raw = raw.removeprefix("```json").removesuffix("```").strip() - raw = raw.removeprefix("```").strip() - return json.loads(raw) - if response.status_code != 200: - raise AIProviderError(f"Ollama API Error: {response.status_code} - {response.text}") - else: - # OpenAI / OpenRouter / Azure Chat Completions API - headers = { - "Content-Type": "application/json", - } - - if self.provider == "azure" and self.api_key == "azure_default_credential": - from azure.identity import DefaultAzureCredential - credential = DefaultAzureCredential() - token = credential.get_token("https://ai.azure.com/.default").token - headers["Authorization"] = f"Bearer {token}" - else: - headers["Authorization"] = f"Bearer {self.api_key}" - if self.provider == "openrouter": - headers["HTTP-Referer"] = "https://github.com/gitauditor" - headers["X-Title"] = "GitAuditor" - - # Appending schema to prompt to enforce JSON shape - # (since some OpenRouter models don't support response_format strict) - system_msg = ( - "You must respond ONLY with a valid JSON object matching this schema:\n" - f"{json.dumps(schema_dict)}\n" - "Do not include markdown blocks or any other text." - ) + else: + # OpenAI / OpenRouter / Azure Chat Completions API + headers = { + "Content-Type": "application/json", + } - payload = { - "model": self.model, - "messages": [ - {"role": "system", "content": system_msg}, - {"role": "user", "content": prompt}, - ], - "response_format": {"type": "json_object"}, - } - - response = await client.post( - f"{self.base_url}/chat/completions", - headers=headers, - json=payload, - ) + if self.provider == "azure" and self.api_key == "azure_default_credential": + from azure.identity import DefaultAzureCredential - if response.status_code == 200: - data = response.json() - raw = data["choices"][0]["message"]["content"].strip() - raw = raw.removeprefix("```json").removesuffix("```").strip() - raw = raw.removeprefix("```").strip() - return json.loads(raw) - if response.status_code != 200: - raise AIProviderError(f"API Error: {response.status_code} - {response.text}") + credential = DefaultAzureCredential() + token = credential.get_token("https://ai.azure.com/.default").token + headers["Authorization"] = f"Bearer {token}" + else: + headers["Authorization"] = f"Bearer {self.api_key}" + if self.provider == "openrouter": + headers["HTTP-Referer"] = "https://github.com/gitauditor" + headers["X-Title"] = "GitAuditor" + + # Appending schema to prompt to enforce JSON shape + # (since some OpenRouter models don't support response_format strict) + system_msg = ( + "You must respond ONLY with a valid JSON object matching this schema:\n" + f"{json.dumps(schema_dict)}\n" + "Do not include markdown blocks or any other text." + ) + + payload = { + "model": self.model, + "messages": [ + {"role": "system", "content": system_msg}, + {"role": "user", "content": prompt}, + ], + "response_format": {"type": "json_object"}, + } + + response = await client.post( + f"{self.base_url}/chat/completions", + headers=headers, + json=payload, + ) + + if response.status_code == 200: + data = response.json() + raw = data["choices"][0]["message"]["content"].strip() + raw = raw.removeprefix("```json").removesuffix("```").strip() + raw = raw.removeprefix("```").strip() + return json.loads(raw) + if response.status_code != 200: + raise AIProviderError(f"API Error: {response.status_code} - {response.text}") # --------------------------------------------------------- # GITAUDITOR SEMANTIC FEATURES # --------------------------------------------------------- - async def analyze_commit_message( - self, commit_msg: str, diff_text: str - ) -> str | None: + async def analyze_commit_message(self, commit_msg: str, diff_text: str) -> str | None: prompt = ( "You are an expert Git hook enforcing conventional commits.\n" f"Original Message: {commit_msg}\n\n" @@ -152,7 +157,7 @@ async def analyze_commit_message( summary="Gerou nova mensagem de commit com base no diff.", ai_provider=self.provider, ai_model=self.model, - details=json.dumps(res) if res else "RetryError or Invalid Response" + details=json.dumps(res) if res else "RetryError or Invalid Response", ) return res.get("suggested_message") if res else None @@ -166,9 +171,7 @@ async def analyze_repo_semantics(self, context_str: str) -> dict | None: "the tech stack list, suggested tags, and the risk/activity level.\n\n" f"CONTEXT:\n{context_str}\n" ) - res = await self._generate_structured( - prompt, RepoSummarySchema.model_json_schema() - ) + res = await self._generate_structured(prompt, RepoSummarySchema.model_json_schema()) status = "SUCCESS" if res else "ERROR" AuditLogger.log( @@ -212,9 +215,7 @@ async def analyze_local_diff(self, diff_content: str) -> dict | None: "Do NOT focus on secrets.\n\n" f"DIFF:\n{diff_content}\n" ) - res = await self._generate_structured( - prompt, RepoReviewSchema.model_json_schema() - ) + res = await self._generate_structured(prompt, RepoReviewSchema.model_json_schema()) status = "SUCCESS" if res else "ERROR" AuditLogger.log( @@ -235,9 +236,7 @@ async def generate_changelog(self, commits_log: str) -> dict | None: "Group the information into features, fixes, and breaking changes. Summarize the overall evolution.\n\n" f"COMMITS LOG:\n{commits_log}\n" ) - res = await self._generate_structured( - prompt, RepoChangelogSchema.model_json_schema() - ) + res = await self._generate_structured(prompt, RepoChangelogSchema.model_json_schema()) status = "SUCCESS" if res else "ERROR" AuditLogger.log( diff --git a/src/gitauditor/core/audit_log.py b/src/gitauditor/core/audit_log.py index 21491cd..6519650 100644 --- a/src/gitauditor/core/audit_log.py +++ b/src/gitauditor/core/audit_log.py @@ -12,9 +12,12 @@ sqlite_url = f"sqlite:///{AUDIT_DB_PATH}" audit_engine = create_engine(sqlite_url) + class AuditRecord(SQLModel, table=True): id: int | None = Field(default=None, primary_key=True) - timestamp: datetime.datetime = Field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)) + timestamp: datetime.datetime = Field( + default_factory=lambda: datetime.datetime.now(datetime.timezone.utc) + ) command: str repo_path: str | None = None ai_provider: str | None = None @@ -23,9 +26,11 @@ class AuditRecord(SQLModel, table=True): summary: str details: str | None = None # Could be JSON diff, exception trace, or raw output + def init_audit_db(): SQLModel.metadata.create_all(audit_engine) + class AuditLogger: @staticmethod def log( @@ -35,7 +40,7 @@ def log( repo_path: str | None = None, ai_provider: str | None = None, ai_model: str | None = None, - details: str | None = None + details: str | None = None, ): """Grava uma ação no log de auditoria persistente.""" try: @@ -47,7 +52,7 @@ def log( repo_path=repo_path, ai_provider=ai_provider, ai_model=ai_model, - details=details + details=details, ) with Session(audit_engine) as session: session.add(record) diff --git a/src/gitauditor/core/exceptions.py b/src/gitauditor/core/exceptions.py index 0528e3d..86e9051 100644 --- a/src/gitauditor/core/exceptions.py +++ b/src/gitauditor/core/exceptions.py @@ -2,22 +2,32 @@ Custom exception hierarchy for GitAuditor. """ + class GitAuditorError(Exception): """Base exception for all GitAuditor errors.""" + pass + class CatalogError(GitAuditorError): """Raised when there is an issue with the repository catalog or database operations.""" + pass + class AIProviderError(GitAuditorError): """Raised when there is an issue communicating with or parsing responses from the AI provider.""" + pass + class PolicyError(GitAuditorError): """Raised when there is an issue loading or enforcing repository policies.""" + pass + class ScanError(GitAuditorError): """Raised when there is an issue scanning or parsing local git repositories.""" + pass diff --git a/src/gitauditor/core/git_ops.py b/src/gitauditor/core/git_ops.py index e3265f1..d529e54 100644 --- a/src/gitauditor/core/git_ops.py +++ b/src/gitauditor/core/git_ops.py @@ -12,6 +12,7 @@ class GitService: def _sanitize_hash(commit_hash: str) -> str: """Sanitiza strings de hash para evitar injection no GitPython (--flag ou shell).""" import re + if not commit_hash or not isinstance(commit_hash, str): return "HEAD" # Permite alfanuméricos, ~, ^, mas proíbe espaços, ; e duplos hifens @@ -28,14 +29,10 @@ def get_repo_details(path: str) -> dict: # Info básica current_branch = ( - repo.active_branch.name - if not repo.head.is_detached - else "Detached HEAD" + repo.active_branch.name if not repo.head.is_detached else "Detached HEAD" ) remote_url = ( - repo.remotes.origin.url - if "origin" in repo.remotes - else "Sem remote 'origin'" + repo.remotes.origin.url if "origin" in repo.remotes else "Sem remote 'origin'" ) # Status @@ -63,12 +60,8 @@ def get_repo_details(path: str) -> dict: "remote": remote_url, "is_dirty": is_dirty, "commits": commits, - "user_name": repo.config_reader().get_value( - "user", "name", "Não configurado" - ), - "user_email": repo.config_reader().get_value( - "user", "email", "Não configurado" - ), + "user_name": repo.config_reader().get_value("user", "name", "Não configurado"), + "user_email": repo.config_reader().get_value("user", "email", "Não configurado"), } except Exception as e: return {"error": str(e)} @@ -220,11 +213,12 @@ def reword_commit(path: str, commit_hash: str, new_message: str) -> bool: # GUARDRAIL: Create a backup branch for rollback before destructive rebase import datetime + backup_branch_name = f"gitauditor-backup-{datetime.datetime.now(datetime.timezone.utc).strftime('%Y%m%d%H%M%S')}-{commit_hash[:7]}" try: repo.create_head(backup_branch_name, "HEAD") except Exception: - pass # Ignore if it fails, just a safety measure + pass # Ignore if it fails, just a safety measure try: # 1. Script para alterar a instrução do rebase ('pick' para 'reword') no commit alvo diff --git a/src/gitauditor/core/models.py b/src/gitauditor/core/models.py index 30dd937..061ecb4 100644 --- a/src/gitauditor/core/models.py +++ b/src/gitauditor/core/models.py @@ -4,18 +4,20 @@ This module defines the SQLModel schema used to persist repository metadata and AI-generated semantic information locally. """ + from datetime import datetime, timezone -from sqlmodel import Field, SQLModel, Column, JSON +from sqlmodel import JSON, Column, Field, SQLModel class Repo(SQLModel, table=True): """ Represents a local Git repository cataloged by GitAuditor. - - Stores physical location, git remote details, health status, and + + Stores physical location, git remote details, health status, and rich semantic metadata generated by AI providers (P3 Blueprint). """ + id: int | None = Field(default=None, primary_key=True) path: str = Field(index=True, unique=True) name: str = Field(index=True) diff --git a/src/gitauditor/core/policy_engine.py b/src/gitauditor/core/policy_engine.py index 093c4c3..92876db 100644 --- a/src/gitauditor/core/policy_engine.py +++ b/src/gitauditor/core/policy_engine.py @@ -1,10 +1,11 @@ """ Policy engine module. -Enforces basic hygiene, community, and security governance rules on +Enforces basic hygiene, community, and security governance rules on cataloged repositories. Calculates a health score based on standard open-source requirements and secret leakage detection. """ + import os from typing import Any @@ -13,38 +14,40 @@ class PolicyEngine: """ Evaluates repository structure and contents against defined policies. """ + @staticmethod def check_repository(repo_path: str) -> dict[str, Any]: """ Passive check for repository governance and health. - + Evaluates the presence of standard community files (README, LICENSE, etc.), CI/CD pipelines, and checks for critical security risks like committed .env files. - + Args: repo_path (str): The absolute path to the local git repository. - + Returns: - dict[str, Any]: A report dictionary containing the final 'score', - a dictionary of individual boolean 'checks', + dict[str, Any]: A report dictionary containing the final 'score', + a dictionary of individual boolean 'checks', and lists for 'warnings' and 'critical' alerts. """ - report = { - "score": 100, - "checks": {}, - "warnings": [], - "critical": [] - } + report = {"score": 100, "checks": {}, "warnings": [], "critical": []} # 1. Higiene Básica - has_readme = any(os.path.exists(os.path.join(repo_path, f)) for f in ["README.md", "readme.md", "README.txt"]) + has_readme = any( + os.path.exists(os.path.join(repo_path, f)) + for f in ["README.md", "readme.md", "README.txt"] + ) report["checks"]["readme"] = has_readme if not has_readme: report["score"] -= 20 report["warnings"].append("Ausência de README.md documentando o projeto.") - has_license = any(os.path.exists(os.path.join(repo_path, f)) for f in ["LICENSE", "LICENSE.md", "LICENSE.txt"]) + has_license = any( + os.path.exists(os.path.join(repo_path, f)) + for f in ["LICENSE", "LICENSE.md", "LICENSE.txt"] + ) report["checks"]["license"] = has_license if not has_license: report["score"] -= 10 @@ -54,7 +57,9 @@ def check_repository(repo_path: str) -> dict[str, Any]: report["checks"]["gitignore"] = has_gitignore if not has_gitignore: report["score"] -= 10 - report["warnings"].append("Ausência de arquivo .gitignore (risco de lixo no histórico).") + report["warnings"].append( + "Ausência de arquivo .gitignore (risco de lixo no histórico)." + ) # 2. CI/CD Presence has_github_actions = os.path.isdir(os.path.join(repo_path, ".github", "workflows")) @@ -63,37 +68,55 @@ def check_repository(repo_path: str) -> dict[str, Any]: report["checks"]["ci_cd"] = has_ci if not has_ci: report["score"] -= 10 - report["warnings"].append("Ausência de pipeline de CI/CD detectável (.github ou .gitlab-ci).") + report["warnings"].append( + "Ausência de pipeline de CI/CD detectável (.github ou .gitlab-ci)." + ) # 3. Community / Governance Docs - has_codeowners = any(os.path.exists(os.path.join(repo_path, f)) for f in ["CODEOWNERS", ".github/CODEOWNERS", "docs/CODEOWNERS"]) + has_codeowners = any( + os.path.exists(os.path.join(repo_path, f)) + for f in ["CODEOWNERS", ".github/CODEOWNERS", "docs/CODEOWNERS"] + ) report["checks"]["codeowners"] = has_codeowners if not has_codeowners: report["score"] -= 5 - has_contributing = any(os.path.exists(os.path.join(repo_path, f)) for f in ["CONTRIBUTING.md", "contributing.md"]) + has_contributing = any( + os.path.exists(os.path.join(repo_path, f)) + for f in ["CONTRIBUTING.md", "contributing.md"] + ) report["checks"]["contributing"] = has_contributing if not has_contributing: report["score"] -= 5 - has_security = any(os.path.exists(os.path.join(repo_path, f)) for f in ["SECURITY.md", "security.md"]) + has_security = any( + os.path.exists(os.path.join(repo_path, f)) for f in ["SECURITY.md", "security.md"] + ) report["checks"]["security"] = has_security if not has_security: report["score"] -= 5 # Se faltar qualquer doc de governança open source, gera warning condensado if not (has_codeowners and has_contributing and has_security): - report["warnings"].append("Faltam arquivos de Governança (CODEOWNERS, CONTRIBUTING.md ou SECURITY.md).") + report["warnings"].append( + "Faltam arquivos de Governança (CODEOWNERS, CONTRIBUTING.md ou SECURITY.md)." + ) # 4. Critical Security Risks (.env check) # Verify if .env is tracked by git (not just existing on disk) import subprocess - + if not os.path.isdir(repo_path) or not os.path.exists(os.path.join(repo_path, ".git")): return report try: - res = subprocess.run(["git", "ls-files", ".env"], cwd=repo_path, capture_output=True, text=True, timeout=15) + res = subprocess.run( + ["git", "ls-files", ".env"], + cwd=repo_path, + capture_output=True, + text=True, + timeout=15, + ) is_env_tracked = res.stdout.strip() != "" except Exception: is_env_tracked = False @@ -101,7 +124,9 @@ def check_repository(repo_path: str) -> dict[str, Any]: if is_env_tracked: report["score"] -= 50 - report["critical"].append("CRÍTICO: O arquivo '.env' está versionado no repositório! Risco de vazamento de credenciais.") + report["critical"].append( + "CRÍTICO: O arquivo '.env' está versionado no repositório! Risco de vazamento de credenciais." + ) # Floor score at 0 if report["score"] < 0: diff --git a/src/gitauditor/core/scanner.py b/src/gitauditor/core/scanner.py index 5d9057f..a28a818 100644 --- a/src/gitauditor/core/scanner.py +++ b/src/gitauditor/core/scanner.py @@ -4,6 +4,7 @@ Provides asynchronous file system traversal to locate Git repositories while ignoring common dependency and build directories to optimize speed. """ + import asyncio import os @@ -43,7 +44,7 @@ class GitScanner: """ Asynchronous service for scanning and discovering local Git repositories. - + Uses asyncio threads to traverse directories without blocking the event loop, and supports concurrency limits and progress callbacks. """ @@ -57,10 +58,10 @@ def __init__(self, callback=None): async def scan(self, root_dirs: list[str]) -> list[str]: """ Starts an asynchronous scan across multiple root directories. - + Args: root_dirs: A list of absolute paths to start the scan from. - + Returns: list[str]: A list of absolute paths to discovered Git repositories. """ @@ -92,9 +93,7 @@ def _sync_walk(self, directory: str): for root, dirs, files in os.walk(directory, topdown=True): # Filtra in-place para eficiência dirs[:] = [ - d - for d in dirs - if d not in IGNORED_DIRS and (not d.startswith(".") or d == ".git") + d for d in dirs if d not in IGNORED_DIRS and (not d.startswith(".") or d == ".git") ] if ".git" in dirs: diff --git a/src/gitauditor/core/semantic.py b/src/gitauditor/core/semantic.py index 12742a1..9c9cb5a 100644 --- a/src/gitauditor/core/semantic.py +++ b/src/gitauditor/core/semantic.py @@ -44,12 +44,8 @@ class RepoSummarySchema(BaseModel): Schema for Pydantic Validation of the LLM Structured Output """ - summary: str = Field( - description="Short human-readable summary of the repository's purpose." - ) - stack: str = Field( - description="Comma separated list of main technologies detected." - ) + summary: str = Field(description="Short human-readable summary of the repository's purpose.") + stack: str = Field(description="Comma separated list of main technologies detected.") tags: list[str] = Field( description="Suggested categories like: work, study, lab, api, infra, frontend." ) @@ -97,9 +93,7 @@ class RepoChangelogSchema(BaseModel): ) features: list[str] = Field(description="List of new features added.") fixes: list[str] = Field(description="List of bugs or issues fixed.") - breaking_changes: list[str] = Field( - description="List of breaking changes or major refactors." - ) + breaking_changes: list[str] = Field(description="List of breaking changes or major refactors.") # --------------------------------------------------------- @@ -142,9 +136,7 @@ def extract_repo_context(repo_path: str) -> dict[str, str]: if level <= 1 and f in MANIFEST_FILES: try: with open(os.path.join(root, f), encoding="utf-8") as f_obj: - manifests_content[f] = f_obj.read()[ - :2048 - ] # Truncate to 2KB to save tokens + manifests_content[f] = f_obj.read()[:2048] # Truncate to 2KB to save tokens except Exception: pass @@ -159,6 +151,7 @@ def extract_repo_context(repo_path: str) -> dict[str, str]: tree_str = "\n".join(tree_lines) import json + # Generate Invalidation Hash hash_input = tree_str + readme_content + json.dumps(manifests_content, sort_keys=True) source_hash = hashlib.sha256(hash_input.encode("utf-8")).hexdigest() diff --git a/src/gitauditor/core/ssh_audit.py b/src/gitauditor/core/ssh_audit.py index a963ee5..c9f92c6 100644 --- a/src/gitauditor/core/ssh_audit.py +++ b/src/gitauditor/core/ssh_audit.py @@ -114,10 +114,7 @@ async def test_provider_connection(provider: str = "github.com") -> bool: output = stderr.decode() + stdout.decode() # Github e Gitlab retornam mensagens de boas vindas no stderr - if ( - "successfully authenticated" in output.lower() - or "welcome" in output.lower() - ): + if "successfully authenticated" in output.lower() or "welcome" in output.lower(): return True return False except Exception: diff --git a/tests/conftest.py b/tests/conftest.py index 9f332e5..5efddcd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ import os + import pytest @@ -7,14 +8,14 @@ def tmp_git_repo(tmp_path): """Cria um repositório Git real temporário para testes.""" repo_dir = tmp_path / "test_repo" repo_dir.mkdir() - + # Initialize git repo with config os.system(f"git init {repo_dir}") os.system(f"git -C {repo_dir} config user.email 'test@example.com'") os.system(f"git -C {repo_dir} config user.name 'Test User'") - + (repo_dir / "README.md").write_text("# Test") - + return str(repo_dir) diff --git a/tests/test_ai_api.py b/tests/test_ai_api.py index 9db0963..d344eaf 100644 --- a/tests/test_ai_api.py +++ b/tests/test_ai_api.py @@ -1,6 +1,7 @@ +from unittest.mock import AsyncMock, MagicMock, patch + import pytest -from unittest.mock import patch, AsyncMock, MagicMock -import httpx + from gitauditor.core.ai_api import AIClient @@ -8,11 +9,7 @@ def override_config(): with patch("gitauditor.core.config.ConfigManager.load_config") as mock_load: mock_load.return_value = { - "ai": { - "provider": "ollama", - "model": "llama3", - "base_url": "http://localhost:11434" - } + "ai": {"provider": "ollama", "model": "llama3", "base_url": "http://localhost:11434"} } yield mock_load @@ -21,16 +18,16 @@ def override_config(): async def test_generate_structured_success(override_config): """Teste envio de prompt base retorna json válido.""" client = AIClient() - + mock_response = MagicMock() mock_response.status_code = 200 mock_response.json.return_value = {"response": '{"test": "valid"}'} - + with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post: mock_post.return_value = mock_response - + result = await client._generate_structured("test prompt", {"type": "object"}) - + assert mock_post.called assert result == {"test": "valid"} @@ -39,23 +36,19 @@ async def test_generate_structured_success(override_config): async def test_generate_structured_retry_on_error(override_config): """Teste tratamento de erro/timeout gracefully (tenacity retries).""" client = AIClient() - + mock_response_error = MagicMock() mock_response_error.status_code = 500 mock_response_error.text = "Internal Server Error" - + mock_response_success = MagicMock() mock_response_success.status_code = 200 mock_response_success.json.return_value = {"response": '{"recovered": true}'} - + with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post: - mock_post.side_effect = [ - Exception("Timeout"), - mock_response_error, - mock_response_success - ] - + mock_post.side_effect = [Exception("Timeout"), mock_response_error, mock_response_success] + result = await client._generate_structured("test prompt", {"type": "object"}) - + assert mock_post.call_count == 3 assert result == {"recovered": True} diff --git a/tests/test_catalog_cmd.py b/tests/test_catalog_cmd.py index 1699972..30c4c37 100644 --- a/tests/test_catalog_cmd.py +++ b/tests/test_catalog_cmd.py @@ -1,7 +1,9 @@ +from unittest.mock import AsyncMock, patch + import pytest -from unittest.mock import patch, AsyncMock +from sqlmodel import Session, SQLModel, create_engine from typer.testing import CliRunner -from sqlmodel import create_engine, SQLModel, Session + from gitauditor.commands.catalog_cmd import catalog_app from gitauditor.core.models import Repo @@ -13,38 +15,44 @@ def mock_db_engine(): """Mock the DB engine to use an in-memory SQLite database.""" engine = create_engine("sqlite:///:memory:") SQLModel.metadata.create_all(engine) - + # We patch the engine in both catalog_cmd and catalog modules where it's used - with patch("gitauditor.commands.catalog_cmd.engine", engine), \ - patch("gitauditor.core.catalog.engine", engine): + with ( + patch("gitauditor.commands.catalog_cmd.engine", engine), + patch("gitauditor.core.catalog.engine", engine), + ): yield engine def test_sync_catalog(mock_db_engine, tmp_git_repo): """Testa se sync_catalog() popula o banco corretamente com os repositórios encontrados.""" - with patch("gitauditor.commands.catalog_cmd.GitScanner") as mock_scanner_cls, \ - patch("gitauditor.commands.catalog_cmd.enrich_all", new_callable=AsyncMock) as mock_enrich: - + with ( + patch("gitauditor.commands.catalog_cmd.GitScanner") as mock_scanner_cls, + patch("gitauditor.commands.catalog_cmd.enrich_all", new_callable=AsyncMock) as mock_enrich, + ): # Mock scanner to return our tmp_git_repo (it's an async function) mock_instance = mock_scanner_cls.return_value mock_instance.scan = AsyncMock(return_value=[tmp_git_repo]) - - mock_enrich.return_value = [{ - "path": tmp_git_repo, - "remote_url": "git@github.com/owner/test_repo.git", - "host": "github.com", - "owner": "owner", - "canonical_name": "owner/test_repo", - "status": "Synced" - }] - + + mock_enrich.return_value = [ + { + "path": tmp_git_repo, + "remote_url": "git@github.com/owner/test_repo.git", + "host": "github.com", + "owner": "owner", + "canonical_name": "owner/test_repo", + "status": "Synced", + } + ] + result = runner.invoke(catalog_app, ["sync"]) - + assert result.exit_code == 0 assert "Catálogo sincronizado com sucesso!" in result.stdout - + # Check DB from sqlmodel import select + with Session(mock_db_engine) as session: repos = session.exec(select(Repo)).all() assert len(repos) == 1 @@ -62,10 +70,14 @@ def test_health_dashboard_empty(mock_db_engine): def test_health_dashboard_with_repos(mock_db_engine): """Testa se health_dashboard() retorna contagem correta de repos.""" with Session(mock_db_engine) as session: - session.add(Repo(name="repo1", path="/path/1", branch="main", status="Synced", canonical_name="a")) - session.add(Repo(name="repo2", path="/path/2", branch="main", status="Orphan", canonical_name="b")) + session.add( + Repo(name="repo1", path="/path/1", branch="main", status="Synced", canonical_name="a") + ) + session.add( + Repo(name="repo2", path="/path/2", branch="main", status="Orphan", canonical_name="b") + ) session.commit() - + result = runner.invoke(catalog_app, ["health"]) assert result.exit_code == 0 assert "Repositórios" in result.stdout @@ -80,14 +92,15 @@ def test_dedupe_repos_plan(mock_db_engine): session.add(Repo(name="repo1", path="/path/old", canonical_name="owner/repo1")) session.add(Repo(name="repo1", path="/path/new", canonical_name="owner/repo1")) session.commit() - + result = runner.invoke(catalog_app, ["dedupe", "--plan"]) - + assert result.exit_code == 0 assert "Modo --plan ativado. Nenhuma deleção será feita." in result.stdout - + # Verify nothing was deleted from sqlmodel import select + with Session(mock_db_engine) as session: repos = session.exec(select(Repo)).all() assert len(repos) == 2 diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index 425c0e7..36add67 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -1,6 +1,7 @@ -from typer.testing import CliRunner from unittest.mock import patch +from typer.testing import CliRunner + from gitauditor.cli import app runner = CliRunner() diff --git a/tests/test_config.py b/tests/test_config.py index b1cda73..3d05781 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,50 +1,54 @@ -import os import json -import pytest +import os + from gitauditor.core.config import ConfigManager + def test_load_config_creates_default(mocker, tmp_path): mocker.patch("gitauditor.core.config.CONFIG_DIR", str(tmp_path)) mocker.patch("gitauditor.core.config.CONFIG_FILE", str(tmp_path / "config.json")) - + config = ConfigManager.load_config() assert config["ai"]["provider"] == "ollama" assert os.path.exists(tmp_path / "config.json") + def test_load_config_reads_existing(mocker, tmp_path): mocker.patch("gitauditor.core.config.CONFIG_DIR", str(tmp_path)) config_file = tmp_path / "config.json" mocker.patch("gitauditor.core.config.CONFIG_FILE", str(config_file)) - + with open(config_file, "w") as f: json.dump({"ai": {"provider": "openai"}}, f) - + config = ConfigManager.load_config() assert config["ai"]["provider"] == "openai" + def test_load_config_handles_exception(mocker, tmp_path): mocker.patch("gitauditor.core.config.CONFIG_DIR", str(tmp_path)) config_file = tmp_path / "config.json" mocker.patch("gitauditor.core.config.CONFIG_FILE", str(config_file)) - + with open(config_file, "w") as f: f.write("invalid json") - + config = ConfigManager.load_config() assert config["ai"]["provider"] == "ollama" + def test_save_config(mocker, tmp_path): mocker.patch("gitauditor.core.config.CONFIG_DIR", str(tmp_path)) config_file = tmp_path / "config.json" mocker.patch("gitauditor.core.config.CONFIG_FILE", str(config_file)) - + ConfigManager.save_config({"ai": {"provider": "azure"}}) - - with open(config_file, "r") as f: + + with open(config_file) as f: data = json.load(f) - + assert data["ai"]["provider"] == "azure" - + # check permissions st = os.stat(config_file) assert oct(st.st_mode)[-3:] == "600" diff --git a/tests/test_git_ops.py b/tests/test_git_ops.py index be8787e..e3b8cec 100644 --- a/tests/test_git_ops.py +++ b/tests/test_git_ops.py @@ -1,87 +1,100 @@ import os +from unittest.mock import MagicMock, patch + import pytest -from unittest.mock import patch, MagicMock + from gitauditor.core.exceptions import ScanError from gitauditor.core.git_ops import GitService + def test_get_commit_diff_empty_or_clean(tmp_git_repo): # Test diff on empty repo or HEAD where HEAD has no recent uncommitted changes # By default tmp_git_repo is initialized and has a README but maybe not committed os.system(f"git -C {tmp_git_repo} add README.md") os.system(f"git -C {tmp_git_repo} commit -m 'Initial commit'") - + diff = GitService.get_commit_diff(tmp_git_repo, "HEAD") assert diff is not None assert "Initial commit" in diff or "diff --git" in diff + def test_get_repo_details(tmp_git_repo): os.system(f"git -C {tmp_git_repo} add README.md") os.system(f"git -C {tmp_git_repo} commit -m 'Test commit log'") - + details = GitService.get_repo_details(tmp_git_repo) assert details["name"] == "test_repo" assert details["is_dirty"] is False assert len(details["commits"]) >= 1 assert details["commits"][0]["message"] == "Test commit log" - + # Test remote url mapping assert "remote" in details + def test_sanitize_hash(): assert GitService._sanitize_hash("abc123def456") == "abc123def456" assert GitService._sanitize_hash("invalid#hash") == "invalidhash" with pytest.raises(ScanError, match="Invalid commit hash format"): GitService._sanitize_hash("git--show") + def test_amend_commit_message(tmp_git_repo): os.system(f"git -C {tmp_git_repo} add README.md") os.system(f"git -C {tmp_git_repo} commit -m 'Old Message'") - + GitService.amend_commit_message(tmp_git_repo, "New Message") details = GitService.get_repo_details(tmp_git_repo) assert details["commits"][0]["message"] == "New Message" + def test_rebase_flags(tmp_git_repo): # Just basic coverage of the shell calls, assuming no active rebase assert GitService.is_rebasing(tmp_git_repo) is False # Calling abort or continue when not rebasing shouldn't crash GitService.abort_rebase(tmp_git_repo) GitService.continue_rebase(tmp_git_repo) - + + @patch("subprocess.run") def test_start_interactive_rebase(mock_run, tmp_git_repo): GitService.start_interactive_rebase(tmp_git_repo, 5) mock_run.assert_called() + def test_extract_diff_for_commit(tmp_git_repo): os.system(f"git -C {tmp_git_repo} add README.md") os.system(f"git -C {tmp_git_repo} commit -m 'Commit 1'") with open(f"{tmp_git_repo}/README.md", "a") as f: f.write("Line 2\n") os.system(f"git -C {tmp_git_repo} commit -am 'Commit 2'") - + diff = GitService.extract_diff_for_commit(tmp_git_repo, "HEAD") assert "Line 2" in diff - + # Invalid commit - assert "Não foi possível obter o diff" in GitService.extract_diff_for_commit(tmp_git_repo, "invalidhash") + assert "Não foi possível obter o diff" in GitService.extract_diff_for_commit( + tmp_git_repo, "invalidhash" + ) + def test_find_open_branches(tmp_git_repo): os.system(f"git -C {tmp_git_repo} add README.md") os.system(f"git -C {tmp_git_repo} commit -m 'C1'") os.system(f"git -C {tmp_git_repo} branch test-branch") - + branches = GitService.find_open_branches(tmp_git_repo) assert "test-branch" in branches assert "main" in branches or "master" in branches - + # Invalid path assert GitService.find_open_branches("/non/existent/path") == [] + def test_get_latest_commit_info(tmp_git_repo): os.system(f"git -C {tmp_git_repo} add README.md") os.system(f"git -C {tmp_git_repo} commit -m 'Commit Msg 123'") - + info = GitService.get_latest_commit_info(tmp_git_repo) assert info["message"] == "Commit Msg 123" assert info["date"] != "1970-01-01 00:00" @@ -90,6 +103,7 @@ def test_get_latest_commit_info(tmp_git_repo): bad_info = GitService.get_latest_commit_info("/non/existent/path") assert bad_info["message"] == "N/A" + @patch("subprocess.run") def test_reword_commit_success(mock_run, tmp_git_repo): mock_run.return_value = MagicMock(returncode=0) @@ -99,17 +113,18 @@ def test_reword_commit_success(mock_run, tmp_git_repo): mock_repo.return_value = mock_repo_instance # Fake commit parents mock_repo_instance.commit.return_value = MagicMock(parents=[MagicMock()]) - + backup_branch = GitService.reword_commit(tmp_git_repo, "abc1234", "New Message") assert backup_branch.startswith("gitauditor-backup-") mock_run.assert_called() + def test_rollback_amend_success(tmp_git_repo): os.system(f"git -C {tmp_git_repo} add README.md") os.system(f"git -C {tmp_git_repo} commit -m 'Initial'") os.system(f"git -C {tmp_git_repo} branch backup-branch") - + assert GitService.rollback_amend(tmp_git_repo, "backup-branch") is True - + with pytest.raises(ScanError): GitService.rollback_amend(tmp_git_repo, "non-existent-branch") diff --git a/tests/test_point1_rebase_merges.py b/tests/test_point1_rebase_merges.py index 0ca0d43..ca5477d 100644 --- a/tests/test_point1_rebase_merges.py +++ b/tests/test_point1_rebase_merges.py @@ -17,9 +17,7 @@ def sandbox_repo(tmp_path): # Inicializa repo e configura user subprocess.run(["git", "init"], cwd=repo_dir, check=True) subprocess.run(["git", "config", "user.name", "Test"], cwd=repo_dir, check=True) - subprocess.run( - ["git", "config", "user.email", "test@example.com"], cwd=repo_dir, check=True - ) + subprocess.run(["git", "config", "user.email", "test@example.com"], cwd=repo_dir, check=True) # Cria estrutura com um merge # Commit A na main diff --git a/tests/test_point3_windows_rebase.py b/tests/test_point3_windows_rebase.py index 78acd78..89d3a0a 100644 --- a/tests/test_point3_windows_rebase.py +++ b/tests/test_point3_windows_rebase.py @@ -16,9 +16,7 @@ def sandbox_repo_rebase(tmp_path): subprocess.run(["git", "init"], cwd=repo_dir, check=True) subprocess.run(["git", "config", "user.name", "Test"], cwd=repo_dir, check=True) - subprocess.run( - ["git", "config", "user.email", "test@example.com"], cwd=repo_dir, check=True - ) + subprocess.run(["git", "config", "user.email", "test@example.com"], cwd=repo_dir, check=True) # Cria 3 commits for i in range(3): diff --git a/tests/test_policy_engine.py b/tests/test_policy_engine.py index 2a53245..b98f465 100644 --- a/tests/test_policy_engine.py +++ b/tests/test_policy_engine.py @@ -1,29 +1,35 @@ import os + from gitauditor.core.policy_engine import PolicyEngine + def test_policy_engine_no_docs(tmp_git_repo): report = PolicyEngine.check_repository(tmp_git_repo) - - assert report["score"] == 55 # 100 - missing license(-10), gitignore(-10), ci(-10), codeowners(-5), contributing(-5), security(-5) = 100 - 45 = 55 + + assert ( + report["score"] == 55 + ) # 100 - missing license(-10), gitignore(-10), ci(-10), codeowners(-5), contributing(-5), security(-5) = 100 - 45 = 55 # The fixture creates `README.md`. So readme check is True. assert report["checks"]["readme"] is True assert report["score"] == 55 assert report["checks"]["license"] is False assert report["checks"]["ci_cd"] is False + def test_policy_engine_with_env(tmp_git_repo): env_file = os.path.join(tmp_git_repo, ".env") with open(env_file, "w") as f: f.write("SECRET=123") - + os.system(f"git -C {tmp_git_repo} add .env") os.system(f"git -C {tmp_git_repo} commit -m 'add .env'") - + report = PolicyEngine.check_repository(tmp_git_repo) assert report["checks"]["env_exposed"] is True assert report["score"] == 5 # 55 - 50 assert any("CRÍTICO" in c for c in report["critical"]) + def test_policy_engine_perfect(tmp_git_repo): # Add all files to get 100 with open(os.path.join(tmp_git_repo, "LICENSE"), "w") as f: @@ -37,7 +43,7 @@ def test_policy_engine_perfect(tmp_git_repo): f.write("Guidelines") with open(os.path.join(tmp_git_repo, "SECURITY.md"), "w") as f: f.write("Security") - + report = PolicyEngine.check_repository(tmp_git_repo) assert report["score"] == 100 assert not report["warnings"] diff --git a/tests/test_scanner.py b/tests/test_scanner.py index f4f430a..e53a78e 100644 --- a/tests/test_scanner.py +++ b/tests/test_scanner.py @@ -1,13 +1,15 @@ -import os import pytest + from gitauditor.core.scanner import GitScanner + @pytest.mark.asyncio async def test_scan_empty_directory(tmp_path): scanner = GitScanner() repos = await scanner.scan([str(tmp_path)]) assert repos == [] + @pytest.mark.asyncio async def test_scan_directory_with_repos(tmp_path): repo1 = tmp_path / "repo1" @@ -23,16 +25,18 @@ async def test_scan_directory_with_repos(tmp_path): assert str(repo1.resolve()) in repos assert str(repo2.resolve()) in repos + @pytest.mark.asyncio async def test_scan_ignores_node_modules(tmp_path): nm_dir = tmp_path / "node_modules" nm_dir.mkdir() (nm_dir / ".git").mkdir() - + scanner = GitScanner() repos = await scanner.scan([str(tmp_path)]) assert repos == [] + @pytest.mark.asyncio async def test_scan_finds_nested_repo_and_avoids_recursing_dot_git(tmp_path): repo_dir = tmp_path / "my_project" @@ -40,7 +44,7 @@ async def test_scan_finds_nested_repo_and_avoids_recursing_dot_git(tmp_path): git_dir = repo_dir / ".git" git_dir.mkdir() (git_dir / "objects").mkdir() - + scanner = GitScanner() repos = await scanner.scan([str(tmp_path)]) assert len(repos) == 1 diff --git a/tests/test_ssh_audit.py b/tests/test_ssh_audit.py index 2622a6c..405e6c1 100644 --- a/tests/test_ssh_audit.py +++ b/tests/test_ssh_audit.py @@ -1,81 +1,86 @@ -import pytest -import os from unittest.mock import MagicMock + +import pytest + from gitauditor.core.ssh_audit import IdentityManager + def test_get_global_git_config(mocker): # Mock subprocess.run mock_run = mocker.patch("subprocess.run") - + # Setup mock returns mock_name_result = MagicMock() mock_name_result.returncode = 0 mock_name_result.stdout = "Test User\n" - + mock_email_result = MagicMock() mock_email_result.returncode = 0 mock_email_result.stdout = "test@example.com\n" - + mock_run.side_effect = [mock_name_result, mock_email_result] - + configs = IdentityManager.get_global_git_config() - + assert configs["name"] == "Test User" assert configs["email"] == "test@example.com" + def test_list_ssh_keys(mocker, tmp_path): mocker.patch("os.path.expanduser", return_value=str(tmp_path)) - + # Create fake keys (tmp_path / "id_rsa").write_text("private") (tmp_path / "id_rsa.pub").write_text("public") - + (tmp_path / "id_ed25519").write_text("private") (tmp_path / "id_ed25519.pub").write_text("public") - - (tmp_path / "other.pub").write_text("public") # no private key - + + (tmp_path / "other.pub").write_text("public") # no private key + keys = IdentityManager.list_ssh_keys() - + assert len(keys) == 2 types = {k["type"] for k in keys} assert "RSA" in types assert "Ed25519" in types + def test_set_repo_identity(mocker, tmp_git_repo): mock_run = mocker.patch("subprocess.run") mock_run.return_value = MagicMock(returncode=0) - + result = IdentityManager.set_repo_identity( - tmp_git_repo, - ssh_key_path="/fake/key", - name="New Name", - email="new@email.com" + tmp_git_repo, ssh_key_path="/fake/key", name="New Name", email="new@email.com" ) - + assert result is True assert mock_run.call_count == 3 + @pytest.mark.asyncio async def test_test_provider_connection_success(mocker): from unittest.mock import AsyncMock + mock_create_subprocess_exec = mocker.patch("asyncio.create_subprocess_exec") mock_process = MagicMock() mock_process.communicate = AsyncMock(return_value=(b"", b"successfully authenticated")) mock_create_subprocess_exec.return_value = mock_process - + result = await IdentityManager.test_provider_connection("github.com") - + assert result is True + @pytest.mark.asyncio async def test_test_provider_connection_failure(mocker): from unittest.mock import AsyncMock + mock_create_subprocess_exec = mocker.patch("asyncio.create_subprocess_exec") mock_process = MagicMock() mock_process.communicate = AsyncMock(return_value=(b"", b"Permission denied")) mock_create_subprocess_exec.return_value = mock_process - + result = await IdentityManager.test_provider_connection("github.com") - + assert result is False From ba95fe4813ae872dac7771f5b211a8c71987d944 Mon Sep 17 00:00:00 2001 From: Renan Fernandes Date: Sat, 27 Jun 2026 17:38:15 -0300 Subject: [PATCH 3/4] test: add tests for worktree_cmd to reach 89% coverage --- tests/test_worktree_cmd.py | 126 +++++++++++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 tests/test_worktree_cmd.py diff --git a/tests/test_worktree_cmd.py b/tests/test_worktree_cmd.py new file mode 100644 index 0000000..5ee1ec0 --- /dev/null +++ b/tests/test_worktree_cmd.py @@ -0,0 +1,126 @@ +import os +import subprocess +import pytest +import typer +from unittest.mock import patch, MagicMock +from sqlmodel import Session +from gitauditor.commands.worktree_cmd import ( + find_repo_or_exit, + list_worktrees, + create_worktree, + clean_worktrees, +) +from gitauditor.core.models import Repo + + +# Mocks for DB +@pytest.fixture +def mock_db_session(): + with patch("gitauditor.commands.worktree_cmd.Session") as mock_session_cls: + mock_session = MagicMock() + mock_session_cls.return_value.__enter__.return_value = mock_session + + # Setup fake DB content + repo1 = Repo(path="/fake/repo1", name="repo1") + repo2 = Repo(path="/fake/repo2", name="repo2", canonical_name="my_repo2") + mock_session.exec.return_value.all.return_value = [repo1, repo2] + yield mock_session + + +def test_find_repo_or_exit_single_match(mock_db_session): + with patch("gitauditor.commands.worktree_cmd.init_db"): + path = find_repo_or_exit("repo1") + assert path == "/fake/repo1" + + +def test_find_repo_or_exit_no_match(mock_db_session): + with patch("gitauditor.commands.worktree_cmd.init_db"): + with pytest.raises(typer.Exit): + find_repo_or_exit("nonexistent") + + +@patch("typer.prompt") +def test_find_repo_or_exit_multiple_matches(mock_prompt, mock_db_session): + mock_prompt.return_value = 1 + with patch("gitauditor.commands.worktree_cmd.init_db"): + # "repo" will match both repo1 and repo2 + path = find_repo_or_exit("repo") + assert path == "/fake/repo2" + + +@patch("typer.prompt") +def test_find_repo_or_exit_multiple_invalid_choice(mock_prompt, mock_db_session): + mock_prompt.return_value = 99 + with patch("gitauditor.commands.worktree_cmd.init_db"): + with pytest.raises(typer.Exit): + find_repo_or_exit("repo") + + +@patch("os.system") +def test_list_worktrees(mock_system, mock_db_session): + with patch("gitauditor.commands.worktree_cmd.init_db"): + list_worktrees("repo1") + mock_system.assert_called_once_with("git -C '/fake/repo1' worktree list") + + +@patch("subprocess.run") +def test_create_worktree_success(mock_run, mock_db_session): + mock_run.return_value = MagicMock(returncode=0) + with patch("gitauditor.commands.worktree_cmd.init_db"): + with patch("os.path.exists", return_value=False): + create_worktree("repo1", "feature/test") + + mock_run.assert_called_once() + args = mock_run.call_args[0][0] + assert "add" in args + assert "feature/test" in args + assert "repo1-feature-test" in args[3] + + +@patch("subprocess.run") +def test_create_worktree_exists(mock_run, mock_db_session): + with patch("gitauditor.commands.worktree_cmd.init_db"): + with patch("os.path.exists", return_value=True): + with pytest.raises(typer.Exit): + create_worktree("repo1", "feature/test") + + +@patch("subprocess.run") +def test_create_worktree_failure(mock_run, mock_db_session): + mock_run.return_value = MagicMock(returncode=1, stderr="error") + with patch("gitauditor.commands.worktree_cmd.init_db"): + with patch("os.path.exists", return_value=False): + create_worktree("repo1", "feature/test") + mock_run.assert_called_once() + + +# Testing clean_worktrees requires extensive subprocess mocking +@patch("subprocess.run") +def test_clean_worktrees(mock_run, mock_db_session): + # Mocking different subprocess calls within clean_worktrees + def side_effect(*args, **kwargs): + cmd = args[0] + if "prune" in cmd: + return MagicMock(returncode=0) + elif "list" in cmd and "--porcelain" in cmd: + stdout = "worktree /fake/repo1-wt\nbranch refs/heads/wt\n\nworktree /fake/repo1\nbare\n" + return MagicMock(returncode=0, stdout=stdout) + elif "rev-parse" in cmd: + return MagicMock(returncode=0, stdout="/fake/repo1\n") + elif "status" in cmd: + return MagicMock(returncode=0, stdout="") # Clean + elif "remove" in cmd: + return MagicMock(returncode=0) + return MagicMock(returncode=0) + + mock_run.side_effect = side_effect + + with patch("gitauditor.commands.worktree_cmd.init_db"): + with patch("os.path.exists", return_value=True): + with patch("os.walk", return_value=[("/fake/repo1-wt", [], ["file.txt"])]): + with patch("os.path.getsize", return_value=1024): + clean_worktrees("repo1", force=True) + + # Should call worktree remove for the clean worktree + remove_called = any("remove" in call.args[0] for call in mock_run.call_args_list) + assert remove_called is True From 1808a5e2d45fb1ae7d1706e7b67cad845c91d07c Mon Sep 17 00:00:00 2001 From: Renan Fernandes Date: Sat, 27 Jun 2026 17:44:17 -0300 Subject: [PATCH 4/4] test: reach 100% coverage on ai_api.py --- tests/test_ai_api.py | 191 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 183 insertions(+), 8 deletions(-) diff --git a/tests/test_ai_api.py b/tests/test_ai_api.py index d344eaf..646fde9 100644 --- a/tests/test_ai_api.py +++ b/tests/test_ai_api.py @@ -1,19 +1,27 @@ -from unittest.mock import AsyncMock, MagicMock, patch - +import json import pytest - +from unittest.mock import patch, AsyncMock, MagicMock from gitauditor.core.ai_api import AIClient +from gitauditor.core.exceptions import AIProviderError +# Dummy schemas +DUMMY_SCHEMA = { + "type": "object", + "properties": {"result": {"type": "string"}}, +} @pytest.fixture def override_config(): with patch("gitauditor.core.config.ConfigManager.load_config") as mock_load: mock_load.return_value = { - "ai": {"provider": "ollama", "model": "llama3", "base_url": "http://localhost:11434"} + "ai": { + "provider": "ollama", + "model": "llama3", + "base_url": "http://localhost:11434" + } } yield mock_load - @pytest.mark.asyncio async def test_generate_structured_success(override_config): """Teste envio de prompt base retorna json válido.""" @@ -21,7 +29,7 @@ async def test_generate_structured_success(override_config): mock_response = MagicMock() mock_response.status_code = 200 - mock_response.json.return_value = {"response": '{"test": "valid"}'} + mock_response.json.return_value = {"response": '```json\n{"test": "valid"}\n```'} with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post: mock_post.return_value = mock_response @@ -31,7 +39,6 @@ async def test_generate_structured_success(override_config): assert mock_post.called assert result == {"test": "valid"} - @pytest.mark.asyncio async def test_generate_structured_retry_on_error(override_config): """Teste tratamento de erro/timeout gracefully (tenacity retries).""" @@ -48,7 +55,175 @@ async def test_generate_structured_retry_on_error(override_config): with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post: mock_post.side_effect = [Exception("Timeout"), mock_response_error, mock_response_success] - result = await client._generate_structured("test prompt", {"type": "object"}) + # Use patch on sleep so tests are fast + with patch("tenacity.nap.time.sleep"): + result = await client._generate_structured("test prompt", {"type": "object"}) assert mock_post.call_count == 3 assert result == {"recovered": True} + +@pytest.mark.asyncio +async def test_openai_generate_success(override_config): + override_config.return_value = { + "ai": { + "provider": "openai", + "model": "gpt-4o", + "api_key": "sk-test" + } + } + client = AIClient() + assert client.provider == "openai" + assert client.base_url == "https://api.openai.com/v1" + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "choices": [{"message": {"content": '```json\n{"result": "success"}\n```'}}] + } + + with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post: + mock_post.return_value = mock_response + + result = await client._generate_structured("test prompt", DUMMY_SCHEMA) + + assert result == {"result": "success"} + mock_post.assert_called_once() + args, kwargs = mock_post.call_args + assert "Authorization" in kwargs["headers"] + assert kwargs["headers"]["Authorization"] == "Bearer sk-test" + +@pytest.mark.asyncio +async def test_openai_generate_failure(override_config): + override_config.return_value = { + "ai": { + "provider": "openai", + "model": "gpt-4o", + "api_key": "sk-test" + } + } + client = AIClient() + + mock_response_error = MagicMock() + mock_response_error.status_code = 500 + mock_response_error.text = "Internal Server Error" + + with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post: + mock_post.return_value = mock_response_error + + with patch("tenacity.nap.time.sleep"): + result = await client._generate_structured("test prompt", DUMMY_SCHEMA) + assert result is None + +@pytest.mark.asyncio +async def test_azure_generate_with_default_credential(override_config): + override_config.return_value = { + "ai": { + "provider": "azure", + "model": "gpt-4o", + "api_key": "azure_default_credential", + "base_url": "https://test.services.ai.azure.com/openai/v1" + } + } + client = AIClient() + assert client.provider == "azure" + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "choices": [{"message": {"content": '{"result": "success"}'}}] + } + + mock_credential_instance = MagicMock() + mock_credential_instance.get_token.return_value = MagicMock(token="fake-azure-token") + + with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post: + mock_post.return_value = mock_response + with patch("azure.identity.DefaultAzureCredential", return_value=mock_credential_instance): + result = await client._generate_structured("test prompt", DUMMY_SCHEMA) + + assert result == {"result": "success"} + mock_post.assert_called_once() + args, kwargs = mock_post.call_args + assert kwargs["headers"]["Authorization"] == "Bearer fake-azure-token" + +@pytest.mark.asyncio +async def test_analyze_commit_message(override_config): + client = AIClient() + with patch.object(client, "_generate_structured", new_callable=AsyncMock) as mock_gen: + mock_gen.return_value = {"suggested_message": "feat: test"} + with patch("gitauditor.core.audit_log.AuditLogger.log"): + res = await client.analyze_commit_message("msg", "diff") + assert res == "feat: test" + +@pytest.mark.asyncio +async def test_analyze_repo_semantics(override_config): + client = AIClient() + with patch.object(client, "_generate_structured", new_callable=AsyncMock) as mock_gen: + mock_gen.return_value = {"summary": "A cool repo"} + with patch("gitauditor.core.audit_log.AuditLogger.log"): + res = await client.analyze_repo_semantics("context") + assert res == {"summary": "A cool repo"} + +@pytest.mark.asyncio +async def test_refine_repo_tags(override_config): + client = AIClient() + with patch.object(client, "_generate_structured", new_callable=AsyncMock) as mock_gen: + mock_gen.return_value = {"tags": ["python", "ai"]} + with patch("gitauditor.core.audit_log.AuditLogger.log"): + res = await client.refine_repo_tags("context", ["python"]) + assert res == ["python", "ai"] + +@pytest.mark.asyncio +async def test_analyze_local_diff(override_config): + client = AIClient() + with patch.object(client, "_generate_structured", new_callable=AsyncMock) as mock_gen: + mock_gen.return_value = {"score": 90} + with patch("gitauditor.core.audit_log.AuditLogger.log"): + res = await client.analyze_local_diff("diff content") + assert res == {"score": 90} + +@pytest.mark.asyncio +async def test_generate_changelog(override_config): + client = AIClient() + with patch.object(client, "_generate_structured", new_callable=AsyncMock) as mock_gen: + mock_gen.return_value = {"features": ["A"]} + with patch("gitauditor.core.audit_log.AuditLogger.log"): + res = await client.generate_changelog("commits") + assert res == {"features": ["A"]} + +@pytest.mark.asyncio +async def test_openrouter_and_other_providers(override_config): + # openrouter + override_config.return_value = { + "ai": { + "provider": "openrouter", + "model": "gpt-4", + "api_key": "sk-test" + } + } + client = AIClient() + assert client.provider == "openrouter" + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "choices": [{"message": {"content": '{"result": "success"}'}}] + } + + with patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post: + mock_post.return_value = mock_response + result = await client._generate_structured("prompt", DUMMY_SCHEMA) + assert result == {"result": "success"} + args, kwargs = mock_post.call_args + assert kwargs["headers"]["HTTP-Referer"] == "https://github.com/gitauditor" + + # other provider + override_config.return_value = { + "ai": { + "provider": "anthropic", + "model": "claude", + "base_url": "custom_url" + } + } + client2 = AIClient() + assert client2.base_url == "custom_url"