diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 0000000..9a920fb --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,2 @@ +services/worker/executors/ @sergio +.github/ @sergio diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..4537b87 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,23 @@ +version: 2 +updates: + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "weekly" + day: "monday" + commit-message: + prefix: "chore(deps):" + + - package-ecosystem: "pip" + directory: "/services/api" + schedule: + interval: "weekly" + commit-message: + prefix: "chore(deps):" + + - package-ecosystem: "pip" + directory: "/services/worker" + schedule: + interval: "weekly" + commit-message: + prefix: "chore(deps):" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..baca9c7 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,203 @@ +name: CI + +on: + push: + branches: + - main + - develop + pull_request: + branches: + - main + - develop + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + + lint: + runs-on: ubuntu-latest + timeout-minutes: 15 + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Install Ruff + run: pip install ruff + + - name: Run Lint + run: ruff check services/api/ services/worker/ --exclude __pycache__,*.apagar,*.pyc || true + + + typecheck: + runs-on: ubuntu-latest + timeout-minutes: 15 + steps: + - uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + cache: pip + - name: Install dependencies + run: | + pip install mypy + pip install -r services/api/requirements.txt + pip install -r services/worker/requirements.txt + - name: Run Mypy + run: | + # Focamos apenas na lógica da API e Worker da plataforma + 
mypy --ignore-missing-imports services/api/main.py services/worker/worker.py || true + + + security: + runs-on: ubuntu-latest + timeout-minutes: 15 + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Install Bandit + run: pip install bandit + + - name: Run Bandit + run: bandit -r services/api/ services/worker/ -ll -ii -f txt -o bandit-report.txt || true + + - name: Upload Bandit Report + uses: actions/upload-artifact@v4 + with: + name: bandit-report + path: bandit-report.txt + + + test-api: + runs-on: ubuntu-latest + timeout-minutes: 15 + steps: + - uses: actions/checkout@v4 + + - name: Create empty .env file + run: touch .env + + - name: Start Redis and MinIO + run: docker compose up -d redis minio + + - name: Wait for Redis + run: | + for i in $(seq 1 10); do + docker compose exec redis redis-cli ping && break + sleep 3 + done + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + cache: pip + + - name: Install dependencies + run: | + pip install -r services/api/requirements.txt + pip install pytest + + - name: Run API Tests + run: | + if [ -d "services/api/tests/" ]; then + pytest services/api/tests/ + else + echo "Aviso: diretório de testes da API não encontrado, continuando..." 
+ fi + + - name: Stop services + if: always() + run: docker compose --env-file /dev/null down + + + test-worker: + runs-on: ubuntu-latest + timeout-minutes: 15 + steps: + - uses: actions/checkout@v4 + + - name: Create empty .env file + run: touch .env + + - name: Start Redis + run: docker compose up -d redis + + - name: Wait for Redis + run: | + for i in $(seq 1 10); do + docker compose exec redis redis-cli ping && break + sleep 3 + done + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + cache: pip + + - name: Install dependencies + run: | + pip install -r services/worker/requirements.txt + pip install pytest + + - name: Run Worker Tests + run: | + if [ -d "services/worker/tests/" ]; then + pytest services/worker/tests/ + else + echo "Sem diretório de testes — validando contratos dos executors..." + PYTHONPATH=$PWD/services python3 scripts/validate_executors.py + fi + + - name: Stop services + if: always() + run: docker compose --env-file /dev/null down + + + build-docker: + runs-on: ubuntu-latest + timeout-minutes: 30 + steps: + - uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Build API + uses: docker/build-push-action@v5 + with: + context: ./services/api + file: ./services/api/Dockerfile + push: false + cache-from: type=gha + cache-to: type=gha,mode=max + + - name: Build Worker + uses: docker/build-push-action@v5 + with: + context: ./services/worker + file: ./services/worker/Dockerfile + push: false + cache-from: type=gha + cache-to: type=gha,mode=max + + - name: Build Frontend + uses: docker/build-push-action@v5 + with: + context: ./services/frontend + file: ./services/frontend/Dockerfile + push: false + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/.github/workflows/deploy.yml.disabled b/.github/workflows/deploy.yml.disabled new file mode 100644 index 0000000..376d1a6 --- /dev/null +++ b/.github/workflows/deploy.yml.disabled @@ -0,0 +1,151 
@@ +name: Deploy + +# Secrets necessários: +# DEPLOY_HOST → IP ou hostname do servidor +# DEPLOY_USER → usuário SSH +# DEPLOY_SSH_KEY → chave privada SSH (RSA ou Ed25519) +# DEPLOY_PATH → path no servidor (ex: /opt/dissmodel-platform) +# MINIO_ROOT_USER → usado no .env do servidor +# MINIO_ROOT_PASSWORD → usado no .env do servidor +# API_KEYS → chaves de API para o FastAPI + +on: + push: + branches: + - main + workflow_dispatch: + inputs: + environment: + description: 'Ambiente para deploy' + required: true + type: choice + options: + - staging + - production + +jobs: + build-and-push: + runs-on: ubuntu-latest + outputs: + image_tag: ${{ github.sha }} + steps: + - uses: actions/checkout@v4 + + - name: Log in to the Container registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Build and push API + uses: docker/build-push-action@v5 + with: + context: ./services/api + file: ./services/api/Dockerfile + push: true + tags: | + ghcr.io/${{ github.repository }}/api:${{ github.sha }} + ghcr.io/${{ github.repository }}/api:latest + cache-from: type=gha + cache-to: type=gha,mode=max + + - name: Build and push Worker + uses: docker/build-push-action@v5 + with: + context: ./services/worker + file: ./services/worker/Dockerfile + push: true + tags: | + ghcr.io/${{ github.repository }}/worker:${{ github.sha }} + ghcr.io/${{ github.repository }}/worker:latest + cache-from: type=gha + cache-to: type=gha,mode=max + + - name: Build and push Frontend + uses: docker/build-push-action@v5 + with: + context: ./services/frontend + file: ./services/frontend/Dockerfile + push: true + tags: | + ghcr.io/${{ github.repository }}/frontend:${{ github.sha }} + ghcr.io/${{ github.repository }}/frontend:latest + cache-from: type=gha + cache-to: type=gha,mode=max + + deploy-ssh: + needs: build-and-push + runs-on: ubuntu-latest + 
steps: + - name: Deploy via SSH + uses: appleboy/ssh-action@v1.0.3 + with: + host: ${{ secrets.DEPLOY_HOST }} + username: ${{ secrets.DEPLOY_USER }} + key: ${{ secrets.DEPLOY_SSH_KEY }} + script: | + cd ${{ secrets.DEPLOY_PATH }} || exit 1 + git pull origin main + + sed -i '/^IMAGE_TAG=/d' .env + echo "IMAGE_TAG=${{ github.sha }}" >> .env + + sed -i '/^MINIO_ROOT_USER=/d' .env + echo "MINIO_ROOT_USER=${{ secrets.MINIO_ROOT_USER }}" >> .env + + sed -i '/^MINIO_ROOT_PASSWORD=/d' .env + echo "MINIO_ROOT_PASSWORD=${{ secrets.MINIO_ROOT_PASSWORD }}" >> .env + + sed -i '/^API_KEYS=/d' .env + echo "API_KEYS=${{ secrets.API_KEYS }}" >> .env + + docker compose -f docker-compose.prod.yml pull + docker compose -f docker-compose.prod.yml up -d --remove-orphans + + sleep 10 + + if ! docker compose -f docker-compose.prod.yml exec api curl -f http://localhost:8000/health; then + echo "Healthcheck API falhou, iniciando rollback..." + git checkout HEAD^ + PREV_SHA=$(git rev-parse HEAD) + sed -i '/^IMAGE_TAG=/d' .env + echo "IMAGE_TAG=$PREV_SHA" >> .env + docker compose -f docker-compose.prod.yml pull + docker compose -f docker-compose.prod.yml up -d --remove-orphans + exit 1 + fi + + sleep 30 + if [ -f "scripts/health-check.sh" ]; then + bash scripts/health-check.sh || { + echo "Healthcheck completo falhou, iniciando rollback..." 
+ git checkout HEAD^ + PREV_SHA=$(git rev-parse HEAD) + sed -i '/^IMAGE_TAG=/d' .env + echo "IMAGE_TAG=$PREV_SHA" >> .env + docker compose -f docker-compose.prod.yml pull + docker compose -f docker-compose.prod.yml up -d --remove-orphans + exit 1 + } + fi + + notify: + needs: [deploy-ssh] + if: always() + runs-on: ubuntu-latest + steps: + - name: Generate Summary + run: | + echo "## Deploy Summary" >> $GITHUB_STEP_SUMMARY + echo "- **Tag Deployada**: \`${{ github.sha }}\`" >> $GITHUB_STEP_SUMMARY + echo "- **Ambiente**: \`${{ github.event.inputs.environment || 'production' }}\`" >> $GITHUB_STEP_SUMMARY + if [ "${{ needs.deploy-ssh.result }}" = "success" ]; then + echo "- **Status**: ✅ Sucesso" >> $GITHUB_STEP_SUMMARY + else + echo "- **Status**: ❌ Falha" >> $GITHUB_STEP_SUMMARY + fi + echo "- **Link para o Run**: [Run #${{ github.run_id }}](${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }})" >> $GITHUB_STEP_SUMMARY diff --git a/.github/workflows/pr-executor.yml b/.github/workflows/pr-executor.yml new file mode 100644 index 0000000..3986887 --- /dev/null +++ b/.github/workflows/pr-executor.yml @@ -0,0 +1,152 @@ +name: PR Executor Validation + +# Este é o mecanismo de segurança do MVP para novos executors. +# Um executor que passa aqui pode ser mergeado com confiança. +# O workflow bloqueia falhas na validação do contrato e tipagem e foca apenas em PRs para novos executors. + +on: + pull_request: + paths: + - 'services/worker/executors/**' + +jobs: + security: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + - name: Install Bandit + run: pip install bandit + - name: Run Security Check on modified files + run: | + FILES=$(git diff --name-only origin/${{ github.base_ref }}...HEAD -- '*.py' || true) + if [ -n "$FILES" ]; then + bandit -r $FILES -ll -ii + else + echo "No python files changed." 
+ fi + + typecheck: + needs: security + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + cache: 'pip' + - name: Install dependencies + run: | + pip install mypy + pip install -r services/worker/requirements.txt + - name: Run Mypy on modified executor files + run: | + FILES=$(git diff --name-only origin/${{ github.base_ref }}...HEAD -- 'services/worker/executors/*.py' || true) + if [ -n "$FILES" ]; then + mypy --strict $FILES + else + echo "No executor files changed." + fi + + contract: + needs: typecheck + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + cache: 'pip' + - name: Install dependencies + run: | + pip install -r services/worker/requirements.txt + - name: Run Contract Tests + run: | + export PYTHONPATH=$PWD/services + python -c " + from dissmodel.executor.testing import ExecutorTestHarness + from dissmodel.executor.registry import ExecutorRegistry + import worker.executors + import sys + + failed = False + for name, cls in ExecutorRegistry._executors.items(): + print(f'Testing {name}...') + harness = ExecutorTestHarness(cls) + if not harness.run_contract_tests(): + failed = True + + if failed: + sys.exit(1) + " + + unittest: + needs: contract + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Check if tests exist + id: check_tests + run: | + if [ -d "services/worker/tests/" ] && [ "$(ls -A services/worker/tests/)" ]; then + echo "has_tests=true" >> $GITHUB_OUTPUT + else + echo "has_tests=false" >> $GITHUB_OUTPUT + fi + - name: Set up Python + if: steps.check_tests.outputs.has_tests == 'true' + uses: actions/setup-python@v5 + with: + python-version: '3.11' + cache: 'pip' + - name: Install dependencies + if: steps.check_tests.outputs.has_tests == 'true' + run: | + pip install pytest + pip install -r 
services/worker/requirements.txt + - name: Run Unit Tests + if: steps.check_tests.outputs.has_tests == 'true' + run: pytest services/worker/tests/ + + pr-comment: + needs: [security, typecheck, contract, unittest] + if: always() + runs-on: ubuntu-latest + steps: + - name: Comment PR + uses: actions/github-script@v7 + with: + script: | + const statusMap = { + 'success': '✅', + 'failure': '❌', + 'skipped': '⏭️', + 'cancelled': '🚫' + }; + + const secStatus = statusMap['${{ needs.security.result }}'] || '❓'; + const typStatus = statusMap['${{ needs.typecheck.result }}'] || '❓'; + const conStatus = statusMap['${{ needs.contract.result }}'] || '❓'; + + let uniStatus = '⏭️'; + if ('${{ needs.unittest.result }}' && '${{ needs.unittest.result }}' !== 'skipped') { + uniStatus = statusMap['${{ needs.unittest.result }}'] || '❓'; + } + + const body = `### PR Validation Results\n\n| Job | Status |\n|-----------|--------|\n| Security | ${secStatus} |\n| Typecheck | ${typStatus} |\n| Contract | ${conStatus} |\n| Tests | ${uniStatus} |`; + + github.rest.issues.createComment({ + issue_number: context.issue.number, + owner: context.repo.owner, + repo: context.repo.repo, + body: body + }); diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml index 93cd966..024de04 100644 --- a/docker-compose.prod.yml +++ b/docker-compose.prod.yml @@ -1,5 +1,3 @@ -version: '3.8' - services: nginx: image: nginx:alpine diff --git a/docker-compose.yml b/docker-compose.yml index ec928fa..be6c280 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,3 @@ -version: '3.8' - services: configs-sync: image: alpine/git diff --git a/scripts/validate_executors.py b/scripts/validate_executors.py new file mode 100644 index 0000000..2c6f4dc --- /dev/null +++ b/scripts/validate_executors.py @@ -0,0 +1,66 @@ +# scripts/validate_executors.py +# +# Valida automaticamente todos os executors na pasta services/worker/executors/ +# Não requer imports manuais em __init__.py + +import sys +import os 
+import importlib.util +from pathlib import Path + +# Configura o path para encontrar o pacote 'worker' +BASE_DIR = Path(__file__).parent.parent +SERVICES_DIR = BASE_DIR / "services" +sys.path.insert(0, str(SERVICES_DIR)) + +from dissmodel.executor.testing import ExecutorTestHarness +from dissmodel.executor.registry import ExecutorRegistry + +def discover_and_import_executors(): + """Varre a pasta de executors e importa todos os arquivos .py""" + executors_path = SERVICES_DIR / "worker" / "executors" + if not executors_path.exists(): + return + + for path in executors_path.glob("*.py"): + if path.name in ("__init__.py", "schemas.py", "testing.py"): + continue + + # Importa o módulo dinamicamente para disparar o __init_subclass__ + module_name = f"worker.executors.{path.stem}" + spec = importlib.util.spec_from_file_location(module_name, path) + if spec and spec.loader: + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + print(f"📦 Carregado: {module_name}") + +if __name__ == "__main__": + print("🔍 Iniciando descoberta de executors...") + discover_and_import_executors() + + executors = ExecutorRegistry._executors + + if not executors: + print("⚠️ Nenhum executor encontrado em services/worker/executors/") + sys.exit(0) + + failed = [] + print(f"🧪 Validando {len(executors)} executor(es)...\n") + + for name, cls in executors.items(): + try: + harness = ExecutorTestHarness(cls) + if harness.run_contract_tests(): + print(f"✅ {name} passou nos testes de contrato") + else: + print(f"❌ {name} falhou nos testes de contrato") + failed.append(name) + except Exception as e: + print(f"💥 Erro ao testar {name}: {e}") + failed.append(name) + + if failed: + print(f"\n❌ Falha na validação de {len(failed)} executor(es).") + sys.exit(1) + + print("\n✨ Todos os executors validados com sucesso!") diff --git a/services/worker/__init__.py b/services/worker/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/worker/base.py 
b/services/worker/base.py deleted file mode 100644 index 8a3aae1..0000000 --- a/services/worker/base.py +++ /dev/null @@ -1,137 +0,0 @@ -# services/worker/base.py -from __future__ import annotations - -from abc import ABC, abstractmethod -from typing import ClassVar, TYPE_CHECKING - -if TYPE_CHECKING: - from worker.schemas import ExperimentRecord - - -class ModelExecutor(ABC): - """ - Interface base para executores de modelos DisSModel. - - Subclasses são registradas automaticamente no ExecutorRegistry - via __init_subclass__ apenas por existirem — sem boilerplate. - - Exemplo mínimo - -------------- - class MyExecutor(ModelExecutor): - name = "my_model" - - def load(self, record: ExperimentRecord): - return gpd.read_file(record.source.uri) - - def run(self, record: ExperimentRecord): - data = self.load(record) - # ... executa simulação ... - return data - - def save(self, result, record: ExperimentRecord) -> ExperimentRecord: - # ... persiste resultado ... - record.status = "completed" - return record - """ - - # Atributo de classe — define a chave no registry. - # Deve ser uma string estática, nunca uma property. - name: ClassVar[str] - - def __init_subclass__(cls, **kwargs: object) -> None: - super().__init_subclass__(**kwargs) - # Importa aqui para evitar circular import entre base e registry - from worker.registry import ExecutorRegistry - if hasattr(cls, "name"): - ExecutorRegistry.register(cls) - - # ── Ciclo de vida obrigatório ───────────────────────────────────────── - - @abstractmethod - def load(self, record: ExperimentRecord): - """ - Carrega e resolve o input (GDF, RasterBackend, etc.). - - Responsabilidades: - - Resolver a URI (s3://, http://, path local) - - Aplicar column_map (vector) ou band_map (raster) - - Preencher record.source.checksum com sha256 do dado baixado - - Retornar o dado carregado no formato esperado por run() - """ - - @abstractmethod - def run(self, record: ExperimentRecord): - """ - Executa a simulação. 
- - Recebe record com resolved_spec e parameters já mesclados. - Retorna o resultado bruto — formato definido pela subclasse - e consumido por save(). - """ - - @abstractmethod - def save(self, result, record: ExperimentRecord) -> ExperimentRecord: - """ - Persiste o resultado e retorna o record atualizado. - - Responsabilidades: - - Salvar output no MinIO - - Preencher record.output_path e record.output_sha256 - - Definir record.status = "completed" - - Retornar record completo - """ - - # ── Hook opcional ───────────────────────────────────────────────────── - - def validate(self, record: ExperimentRecord) -> None: - """ - Valida spec e dado antes de rodar. - - Chamado pelo runner antes de run(). Subclasses sobrescrevem - para verificar colunas/bandas canônicas, ranges de valores, etc. - Lança ValueError com mensagem acionável se inválido. - - Implementação padrão: no-op. - """ - - # ── Utilitários disponíveis para subclasses ─────────────────────────── - - def _resolve_uri(self, uri: str) -> str: - """ - Resolve uma URI para path local acessível pelo worker. - - s3://bucket/key → baixa para /tmp/, retorna path - http(s)://... 
→ baixa para /tmp/, retorna path - /path/local → retorna como está - """ - import os, hashlib, urllib.request - from worker.storage import minio_client - - if uri.startswith("s3://"): - # s3://bucket/path/to/file.tif - parts = uri[5:].split("/", 1) - bucket = parts[0] - object_key = parts[1] - local_path = f"/tmp/{os.path.basename(object_key)}" - minio_client.fget_object(bucket, object_key, local_path) - return local_path - - if uri.startswith("http://") or uri.startswith("https://"): - filename = uri.split("/")[-1] - local_path = f"/tmp/{filename}" - urllib.request.urlretrieve(uri, local_path) - return local_path - - return uri # path local — funciona igual ao script original - - @staticmethod - def _sha256(path_or_bytes) -> str: - """Calcula sha256 de um arquivo ou bytes.""" - import hashlib - if isinstance(path_or_bytes, (str, bytes.__class__)) and \ - not isinstance(path_or_bytes, bytes): - with open(path_or_bytes, "rb") as f: - data = f.read() - else: - data = path_or_bytes - return hashlib.sha256(data).hexdigest() \ No newline at end of file diff --git a/services/worker/executors/__init__.py b/services/worker/executors/__init__.py index 0e02947..e69de29 100644 --- a/services/worker/executors/__init__.py +++ b/services/worker/executors/__init__.py @@ -1,3 +0,0 @@ -# services/worker/executors/__init__.py - - diff --git a/services/worker/executors/schemas.py b/services/worker/executors/schemas.py deleted file mode 100644 index aeabf5b..0000000 --- a/services/worker/executors/schemas.py +++ /dev/null @@ -1,111 +0,0 @@ -# services/worker/schemas.py -from __future__ import annotations - -from datetime import datetime -from typing import Literal -from uuid import uuid4 - -from pydantic import BaseModel, Field - - -# ── Data provenance ─────────────────────────────────────────────────────────── - -class DataSource(BaseModel): - """Tracks the origin and integrity of an input dataset.""" - - type: Literal["local", "s3", "http", "bdc_stac", "wcpms"] = "local" - uri: str
= "" - collection: str = "" # reserved for BDC/STAC integration (post-MVP) - version: str = "" # reserved for BDC collection version - checksum: str = "" # sha256 — filled by executor.load() - - -# ── Core experiment record ──────────────────────────────────────────────────── - -class ExperimentRecord(BaseModel): - """ - Immutable provenance object for a single simulation run. - - Captures everything needed to reproduce the result exactly: - model spec snapshot, input provenance, variable mapping, and - output checksums. Filled progressively by the runner and executor. - """ - - # Identity - experiment_id: str = Field(default_factory=lambda: str(uuid4())) - created_at: datetime = Field(default_factory=datetime.utcnow) - - # Model provenance - model_name: str = "" - model_commit: str = "" # git hash of dissmodel-configs at execution time - code_version: str = "" # dissmodel PyPI tag (e.g. "0.1.5") - resolved_spec: dict = {} # full TOML snapshot — immutable after job starts - - # Input provenance - source: DataSource = Field(default_factory=DataSource) - input_format: Literal["tiff", "vector", "auto"] = "auto" - - # Variable mapping — travels with the request, stored for reproducibility - column_map: dict = {} # canonical → real column name (vector input) - band_map: dict = {} # canonical → real band name (raster input) - - # Execution parameters — override TOML defaults per run - parameters: dict = {} # resolution, n_steps, start_year, etc. 
- - # Results - output_path: str | None = None - output_sha256: str | None = None - metrics: dict = {} - - # Lifecycle - status: Literal["pending", "running", "completed", "failed"] = "pending" - logs: list[str] = [] - - def add_log(self, message: str) -> None: - """Append a timestamped log entry.""" - ts = datetime.utcnow().strftime("%H:%M:%S") - self.logs.append(f"[{ts}] {message}") - - -# ── API request / response ──────────────────────────────────────────────────── - -class JobRequest(BaseModel): - """Payload for POST /submit_job.""" - - model_name: str - input_dataset: str - input_format: Literal["tiff", "vector", "auto"] = "auto" - parameters: dict = {} - column_map: dict = {} - band_map: dict = {} - priority: Literal["low", "normal", "high"] = "normal" - - -class JobResponse(BaseModel): - """Response for POST /submit_job and GET /job/{id}.""" - - job_id: str - experiment_id: str - status: str - model_name: str - created_at: datetime - output_path: str | None = None - output_sha256: str | None = None - logs: list[str] = [] - - -class InlineJobRequest(BaseModel): - """ - Payload for POST /submit_job_inline. - - Accepts a raw TOML string instead of a registered model name. - Intended for calibration and exploration in Jupyter — results - are not reproducible via the registry and are marked as such. 
- """ - - input_dataset: str - model_spec_toml: str - input_format: Literal["tiff", "vector", "auto"] = "auto" - parameters: dict = {} - column_map: dict = {} - band_map: dict = {} \ No newline at end of file diff --git a/services/worker/executors/testing.py b/services/worker/executors/testing.py deleted file mode 100644 index bf7dc6a..0000000 --- a/services/worker/executors/testing.py +++ /dev/null @@ -1,186 +0,0 @@ -# services/worker/testing.py -from __future__ import annotations - -import inspect -import traceback -from typing import TYPE_CHECKING - -from worker.schemas import DataSource, ExperimentRecord - -if TYPE_CHECKING: - from worker.base import ModelExecutor - - -class ExecutorTestHarness: - """ - Validates that an executor fulfills the ModelExecutor contract. - - Designed to run in Jupyter before opening a PR — the same checks - are reused in CI via pytest parametrize, so a passing notebook - guarantees a passing pipeline. - - Usage - ----- - harness = ExecutorTestHarness(MyExecutor) - harness.run_contract_tests() # structural — no data needed - harness.run_with_sample_data(record) # full cycle with real data - """ - - def __init__(self, executor_cls: type[ModelExecutor]) -> None: - self.executor_cls = executor_cls - self.executor = executor_cls() - self._passed: list[str] = [] - self._failed: list[str] = [] - - # ── Public interface ────────────────────────────────────────────────────── - - def run_contract_tests(self) -> bool: - """ - Run structural checks — no data required. - Returns True if all checks pass, False otherwise. - Prints a summary report. 
- """ - self._passed.clear() - self._failed.clear() - - self._check("name attribute exists", self._check_name_exists) - self._check("name is a non-empty string", self._check_name_type) - self._check("name has no whitespace", self._check_name_format) - self._check("load() is implemented", self._check_load) - self._check("run() is implemented", self._check_run) - self._check("save() is implemented", self._check_save) - self._check("run() signature is correct", self._check_run_signature) - self._check("save() signature is correct", self._check_save_signature) - self._check("executor is registered", self._check_registered) - - self._print_report() - return len(self._failed) == 0 - - def run_with_sample_data(self, record: ExperimentRecord | None = None) -> bool: - """ - Run the full executor lifecycle with real or synthetic data. - Returns True if the cycle completes without error. - """ - if record is None: - record = self._minimal_record() - print(f" No record provided — using minimal synthetic record") - - print(f"\n▶ Running {self.executor_cls.name}...") - - try: - print(" validate()...") - self.executor.validate(record) - - print(" run()...") - result = self.executor.run(record) - - print(" save()...") - completed = self.executor.save(result, record) - - if completed.status != "completed": - print(f" ⚠ save() returned status='{completed.status}' — expected 'completed'") - return False - - if not completed.output_sha256: - print(" ⚠ save() did not set output_sha256") - return False - - print(f" ✅ Cycle OK — status={completed.status} sha256={completed.output_sha256[:12]}...") - return True - - except NotImplementedError: - print(" ⚠ Some methods are not yet implemented") - return False - - except Exception as exc: - print(f" ❌ Error during execution:\n{traceback.format_exc()}") - return False - - # ── Individual checks ───────────────────────────────────────────────────── - - def _check_name_exists(self) -> None: - assert hasattr(self.executor_cls, "name"), \ - 
"Class must define a 'name' class attribute" - - def _check_name_type(self) -> None: - assert isinstance(self.executor_cls.name, str) and self.executor_cls.name, \ - f"'name' must be a non-empty string, got {self.executor_cls.name!r}" - - def _check_name_format(self) -> None: - assert " " not in self.executor_cls.name, \ - f"'name' must not contain whitespace: {self.executor_cls.name!r}" - - def _check_load(self) -> None: - assert not _is_abstract(self.executor, "load"), \ - "load() must be implemented" - - def _check_run(self) -> None: - assert not _is_abstract(self.executor, "run"), \ - "run() must be implemented" - - def _check_save(self) -> None: - assert not _is_abstract(self.executor, "save"), \ - "save() must be implemented" - - def _check_run_signature(self) -> None: - sig = inspect.signature(self.executor.run) - params = [p for p in sig.parameters.values() - if p.name != "self"] - assert len(params) == 1, \ - f"run() must accept exactly one parameter (record), got {[p.name for p in params]}" - - def _check_save_signature(self) -> None: - sig = inspect.signature(self.executor.save) - params = [p for p in sig.parameters.values() - if p.name != "self"] - assert len(params) == 2, \ - f"save() must accept exactly two parameters (result, record), got {[p.name for p in params]}" - - def _check_registered(self) -> None: - from worker.registry import ExecutorRegistry - import worker.executors # noqa: F401 — trigger __init_subclass__ - assert self.executor_cls.name in ExecutorRegistry._executors, \ - f"Executor '{self.executor_cls.name}' is not registered. " \ - f"Is it imported in worker/executors/__init__.py?" 
- - # ── Helpers ─────────────────────────────────────────────────────────────── - - def _check(self, label: str, fn) -> None: - try: - fn() - self._passed.append(label) - except AssertionError as exc: - self._failed.append(f"{label}: {exc}") - except Exception as exc: - self._failed.append(f"{label}: unexpected error — {exc}") - - def _print_report(self) -> None: - print(f"\nExecutorTestHarness — {self.executor_cls.__name__}") - print("─" * 52) - for label in self._passed: - print(f" ✅ {label}") - for label in self._failed: - print(f" ❌ {label}") - print("─" * 52) - if self._failed: - print(f" {len(self._passed)} passed, {len(self._failed)} failed\n") - else: - print(f" All {len(self._passed)} checks passed ✅\n") - - def _minimal_record(self) -> ExperimentRecord: - """Synthetic record for contract testing without real data.""" - return ExperimentRecord( - model_name = self.executor_cls.name, - model_commit = "local-test", - code_version = "dev", - resolved_spec = {"model": {"class": self.executor_cls.name, "parameters": {}}}, - source = DataSource(type="local", uri=""), - ) - - -# ── Standalone helper ───────────────────────────────────────────────────────── - -def _is_abstract(obj: object, method_name: str) -> bool: - """Return True if a method is still abstract on the given instance.""" - method = getattr(type(obj), method_name, None) - return getattr(method, "__isabstractmethod__", False) \ No newline at end of file diff --git a/services/worker/registry.py b/services/worker/registry.py deleted file mode 100644 index 1255141..0000000 --- a/services/worker/registry.py +++ /dev/null @@ -1,34 +0,0 @@ -# services/worker/registry.py -from __future__ import annotations - -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from worker.base import ModelExecutor - - -class ExecutorRegistry: - """Central registry mapping model names to executor classes.""" - - _executors: dict[str, type[ModelExecutor]] = {} - - @classmethod - def register(cls, executor_cls: 
type[ModelExecutor]) -> None: - """Called automatically by ModelExecutor.__init_subclass__.""" - cls._executors[executor_cls.name] = executor_cls - - @classmethod - def get(cls, name: str) -> type[ModelExecutor]: - """Resolve executor class by name. Raises KeyError if not registered.""" - if name not in cls._executors: - available = ", ".join(cls._executors) or "none" - raise KeyError( - f"Executor '{name}' not registered. " - f"Available: {available}" - ) - return cls._executors[name] - - @classmethod - def list(cls) -> list[str]: - """Return all registered executor names.""" - return list(cls._executors.keys()) \ No newline at end of file diff --git a/services/worker/runner.apagar b/services/worker/runner.apagar deleted file mode 100644 index 760be36..0000000 --- a/services/worker/runner.apagar +++ /dev/null @@ -1,260 +0,0 @@ -# services/worker/runner.py -from __future__ import annotations - -import importlib -import importlib.metadata -import subprocess -import sys -import tomllib -from pathlib import Path - -from dissmodel.executor.registry import ExecutorRegistry -from dissmodel.executor.schemas import ExperimentRecord, InlineJobRequest, JobRequest - -CONFIGS_PATH = Path("/configs") - - -# ── Registry helpers ────────────────────────────────────────────────────────── - -def _git_head() -> str: - """Return current HEAD hash of the configs repo, or 'local-dev' if git is missing.""" - try: - if not (CONFIGS_PATH / ".git").exists(): - return "local-dev" - return subprocess.check_output( - ["git", "-C", str(CONFIGS_PATH), "rev-parse", "--short", "HEAD"], - stderr=subprocess.DEVNULL, - ).decode().strip() - except (subprocess.CalledProcessError, FileNotFoundError): - return "unknown" - - -def _code_version() -> str: - """Return installed dissmodel version, or 'dev' if not installed.""" - try: - return importlib.metadata.version("dissmodel") - except importlib.metadata.PackageNotFoundError: - return "dev" - - -def _load_spec(model_name: str) -> dict: - """ - Load and 
return the TOML spec for a registered model. - Raises FileNotFoundError if the model is not in the registry. - """ - from worker.api_registry import load_model_spec - return load_model_spec(model_name) - - -def _merge_parameters(resolved_spec: dict, overrides: dict) -> dict: - """ - Merge TOML defaults with per-request overrides. - Request parameters always win. - """ - defaults = resolved_spec.get("model", {}).get("parameters", {}) - return {**defaults, **overrides} - - -def _infer_source_type(uri: str) -> str: - """Infer DataSource.type from URI scheme.""" - if uri.startswith("s3://"): - return "s3" - if uri.startswith("http://") or uri.startswith("https://"): - return "http" - return "local" - - -# ── Package installation ────────────────────────────────────────────────────── - -def _ensure_package(spec: dict) -> None: - """ - Install the model package declared in the spec before resolving executor. - - Supports three URI schemes: - PyPI: "coastal-dynamics==0.1.0" - GitHub: "git+https://github.com/org/repo@branch" - Local: "/opt/coastal-dynamics" (editable install — development only) - - No-op if 'package' field is absent from the spec. - """ - package = spec.get("model", {}).get("package") - if not package: - return - - if package.startswith("/"): - # Local volume — editable install for development - cmd = [sys.executable, "-m", "pip", "install", - "-e", package, "--quiet"] - else: - # PyPI or GitHub - cmd = [sys.executable, "-m", "pip", "install", - package, "--quiet", "--no-cache-dir"] - - try: - subprocess.check_call(cmd) - except subprocess.CalledProcessError as exc: - raise RuntimeError( - f"Failed to install package '{package}': {exc}\n" - f"Check the 'package' field in the model TOML." 
- ) from exc - - -def _import_package(spec: dict) -> None: - package = spec.get("model", {}).get("package", "") - if not package: - return - - pkg_name = ( - package - .split("==")[0] - .split("@")[0] - .rstrip("/") - .split("/")[-1] - .replace("-", "_") - ) - - import logging - logger = logging.getLogger(__name__) - - for module in [pkg_name, f"{pkg_name}.executor"]: - try: - importlib.import_module(module) - logger.info(f"Imported: {module}") - except Exception as exc: # ← era ImportError, agora captura tudo - logger.error(f"Failed to import {module}: {exc}") # ← loga o erro - - -# ── Record factory ──────────────────────────────────────────────────────────── - -def build_record(req: JobRequest) -> ExperimentRecord: - """ - Build an ExperimentRecord from a JobRequest. - Snapshots the model spec at submission time — immutable from here on. - """ - spec = _load_spec(req.model_name) - - record = ExperimentRecord( - model_name = req.model_name, - model_commit = _git_head(), - code_version = _code_version(), - resolved_spec = spec, - input_format = req.input_format, - column_map = req.column_map, - band_map = req.band_map, - parameters = _merge_parameters(spec, req.parameters), - ) - - record.source.uri = req.input_dataset - record.source.type = _infer_source_type(req.input_dataset) - - record.add_log(f"Record created — model={req.model_name} commit={record.model_commit}") - return record - - -def build_record_inline(req: InlineJobRequest) -> ExperimentRecord: - """ - Build an ExperimentRecord from an inline TOML spec. - Marks the record as non-reproducible via the registry. 
- """ - spec = tomllib.loads(req.model_spec_toml) - - record = ExperimentRecord( - model_name = spec.get("model", {}).get("name", "inline"), - model_commit = "local-inline", - code_version = _code_version(), - resolved_spec = spec, - input_format = req.input_format, - column_map = req.column_map, - band_map = req.band_map, - parameters = _merge_parameters(spec, req.parameters), - ) - - record.source.uri = req.input_dataset - record.source.type = _infer_source_type(req.input_dataset) - - record.add_log("Record created from inline spec — not reproducible via registry") - return record - - -# ── Main runner ─────────────────────────────────────────────────────────────── - -def run_experiment(record: ExperimentRecord) -> ExperimentRecord: - """ - Execute a simulation end-to-end. - - Orchestrates the executor lifecycle: - install package → validate → run → save - - Updates record.status throughout. On failure, sets status="failed" - and appends the error to record.logs before re-raising. - """ - try: - record.status = "running" - - # Install and import model package before resolving executor - _ensure_package(record.resolved_spec) - _import_package(record.resolved_spec) - - executor_cls = _resolve_executor(record) - executor = executor_cls() - - record.add_log("Validating spec and input...") - executor.validate(record) - - record.add_log(f"Running executor={executor_cls.name}...") - result = executor.run(record) - - record.add_log("Saving output...") - record = executor.save(result, record) - - record.add_log(f"Completed — output={record.output_path}") - return record - - except Exception as exc: - record.status = "failed" - record.add_log(f"Failed: {exc}") - raise - - -def reproduce_experiment(original: ExperimentRecord) -> ExperimentRecord: - """ - Re-run an experiment from its stored snapshot. - - Uses resolved_spec directly — bypasses the current registry state. 
- Guarantees the reproduction uses the exact same spec as the original, - even if the TOML has changed since. - """ - record = ExperimentRecord( - model_name = original.model_name, - model_commit = original.model_commit, - code_version = _code_version(), - resolved_spec = original.resolved_spec, - input_format = original.input_format, - column_map = original.column_map, - band_map = original.band_map, - parameters = original.parameters, - source = original.source.model_copy(), - ) - - record.add_log(f"Reproducing experiment={original.experiment_id}") - return run_experiment(record) - - -# ── Internal helpers ────────────────────────────────────────────────────────── - -def _resolve_executor(record: ExperimentRecord): - """ - Look up the executor class from the model spec. - Raises KeyError with a clear message if not registered. - """ - model_class = record.resolved_spec.get("model", {}).get("class") - if not model_class: - raise ValueError( - f"Model spec for '{record.model_name}' is missing 'model.class'. " - f"Check the TOML in dissmodel-configs." - ) - - # Ensure built-in executors are registered - import worker.executors # noqa: F401 - - return ExecutorRegistry.get(model_class) \ No newline at end of file