diff --git a/tests/benchmark/mlebench/Dockerfile b/tests/benchmark/mlebench/Dockerfile new file mode 100644 index 00000000..e6131ae4 --- /dev/null +++ b/tests/benchmark/mlebench/Dockerfile @@ -0,0 +1,23 @@ +FROM mlebench-env + +ARG SUBMISSION_DIR +ENV SUBMISSION_DIR=${SUBMISSION_DIR} + +ARG LOGS_DIR +ENV LOGS_DIR=${LOGS_DIR} + +ARG CODE_DIR +ENV CODE_DIR=${CODE_DIR} + +ARG AGENT_DIR +ENV AGENT_DIR=${AGENT_DIR} + +ARG CONDA_ENV_NAME=agent +ARG PLEXE_PACKAGE="plexe[pyspark,tabular]" + +RUN mkdir -p ${SUBMISSION_DIR} ${LOGS_DIR} ${CODE_DIR} ${AGENT_DIR} + +COPY . ${AGENT_DIR} + +RUN conda run -n ${CONDA_ENV_NAME} pip install --no-cache-dir "${PLEXE_PACKAGE}" && \ + conda clean -afy diff --git a/tests/benchmark/mlebench/README.md b/tests/benchmark/mlebench/README.md new file mode 100644 index 00000000..49cc4281 --- /dev/null +++ b/tests/benchmark/mlebench/README.md @@ -0,0 +1,78 @@ +# Plexe MLE-bench adapter + +This directory contains a minimal MLE-bench agent adapter for running Plexe +against the OpenAI MLE-bench harness. + +It follows the agent contract used by `openai/mle-bench`: + +- `config.yaml` registers the agent id as `plexe`. +- `Dockerfile` builds an agent image from `mlebench-env`. +- `start.sh` is the container entrypoint called by MLE-bench. +- `plexe/run_mlebench.py` reads `/home/data`, runs Plexe, writes + `/home/submission/submission.csv`, and records diagnostics in `/home/logs`. 
+ +## Build inside an MLE-bench checkout + +Copy or symlink this directory into `openai/mle-bench/agents/plexe`, then build: + +```bash +export SUBMISSION_DIR=/home/submission +export LOGS_DIR=/home/logs +export CODE_DIR=/home/code +export AGENT_DIR=/home/agent + +docker build --platform=linux/amd64 -t plexe \ + agents/plexe/ \ + --build-arg SUBMISSION_DIR=$SUBMISSION_DIR \ + --build-arg LOGS_DIR=$LOGS_DIR \ + --build-arg CODE_DIR=$CODE_DIR \ + --build-arg AGENT_DIR=$AGENT_DIR +``` + +## Run a smoke competition + +```bash +python run_agent.py \ + --agent-id plexe \ + --competition-set experiments/splits/spaceship-titanic.txt \ + --n-seeds 1 \ + --n-workers 1 +``` + +Then compile and grade the run group: + +```bash +python experiments/make_submission.py \ + --metadata runs/<run-group>/metadata.json \ + --output runs/<run-group>/submission.jsonl + +mlebench grade \ + --submission runs/<run-group>/submission.jsonl \ + --output-dir runs/<run-group> +``` + +## Environment + +The adapter reads these environment variables, including the benchmark-provided paths: + +- `DATA_DIR`, default `/home/data` +- `SUBMISSION_DIR`, default `/home/submission` +- `LOGS_DIR`, default `/home/logs` +- `CODE_DIR`, default `/home/code` +- `PLEXE_WORK_DIR`, default `/home/code/plexe-work` +- `PLEXE_MAX_ITERATIONS`, default `10` +- `PLEXE_PROVIDER`, optional provider string for documentation/logging + +The Dockerfile has a `PLEXE_PACKAGE` build arg, defaulting to +`plexe[pyspark,tabular]`. Override it when you need to benchmark a specific +branch or wheel. + +The actual LLM credentials are read by Plexe/LiteLLM from the normal provider +environment variables. + +## Scope + +This adapter does not include benchmark results. It is intentionally structured +so results can be produced separately with the official MLE-bench runner and +submitted without mixing code changes with private API-key or Kaggle execution +state. 
diff --git a/tests/benchmark/mlebench/config.yaml b/tests/benchmark/mlebench/config.yaml
new file mode 100644
index 00000000..f8029894
--- /dev/null
+++ b/tests/benchmark/mlebench/config.yaml
@@ -0,0 +1,6 @@
+plexe:
+  start: plexe/start.sh
+  dockerfile: plexe/Dockerfile
+  env_vars:
+    PLEXE_MAX_ITERATIONS: ${PLEXE_MAX_ITERATIONS}
+    PLEXE_PROVIDER: ${PLEXE_PROVIDER}
diff --git a/tests/benchmark/mlebench/plexe/__init__.py b/tests/benchmark/mlebench/plexe/__init__.py
new file mode 100644
index 00000000..3c5b33b4
--- /dev/null
+++ b/tests/benchmark/mlebench/plexe/__init__.py
@@ -0,0 +1 @@
+"""MLE-bench adapter helpers for Plexe."""
diff --git a/tests/benchmark/mlebench/plexe/run_mlebench.py b/tests/benchmark/mlebench/plexe/run_mlebench.py
new file mode 100644
index 00000000..151fa633
--- /dev/null
+++ b/tests/benchmark/mlebench/plexe/run_mlebench.py
@@ -0,0 +1,381 @@
+"""Run Plexe inside the OpenAI MLE-bench agent container."""
+
+from __future__ import annotations
+
+import csv
+import importlib.util
+import json
+import logging
+import os
+import shutil
+import sys
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any
+
+import pandas as pd
+import yaml
+
+
+LOG_FORMAT = "%(asctime)s %(levelname)s %(name)s: %(message)s"
+SUPPORTED_DATASET_SUFFIXES = (".csv", ".parquet", ".json", ".jsonl", ".tsv")
+DEFAULT_MAX_ITERATIONS = 10
+
+
+@dataclass(frozen=True)
+class MLEBenchPaths:
+    """Filesystem paths provided by the MLE-bench container."""
+
+    data_dir: Path
+    submission_dir: Path
+    logs_dir: Path
+    code_dir: Path
+    work_dir: Path
+
+
+def configure_logging(logs_dir: Path) -> None:
+    """Configure console and file logging for the adapter."""
+
+    logs_dir.mkdir(parents=True, exist_ok=True)
+    handlers: list[logging.Handler] = [
+        logging.StreamHandler(sys.stdout),
+        logging.FileHandler(logs_dir / "plexe_mlebench.log"),
+    ]
+    logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, handlers=handlers, force=True)
+
+
+def resolve_paths() -> MLEBenchPaths:
+    """Resolve MLE-bench container paths from environment variables."""
+
+    code_dir = Path(os.environ.get("CODE_DIR", "/home/code"))
+    return MLEBenchPaths(
+        data_dir=Path(os.environ.get("DATA_DIR", "/home/data")),
+        submission_dir=Path(os.environ.get("SUBMISSION_DIR", "/home/submission")),
+        logs_dir=Path(os.environ.get("LOGS_DIR", "/home/logs")),
+        code_dir=code_dir,
+        work_dir=Path(os.environ.get("PLEXE_WORK_DIR", str(code_dir / "plexe-work"))),
+    )
+
+
+def read_competition_description(data_dir: Path) -> str:
+    """Read the MLE-bench competition description, or return a generic intent if absent."""
+
+    description_path = data_dir / "description.md"
+    if not description_path.exists():
+        return "Build the best possible machine learning model for this competition."
+    return description_path.read_text(encoding="utf-8")
+
+
+def _find_dataset(data_dir: Path, preferred_names: tuple[str, ...], *, is_test: bool) -> Path:
+    """Return the first preferred top-level file, else the first recursive name match.
+
+    Files with "submission" in their name are never candidates; the remaining files
+    are split on whether "test" appears in the lowercased file name.
+    """
+
+    for name in preferred_names:
+        candidate = data_dir / name
+        if candidate.is_file():
+            return candidate
+
+    candidates = sorted(
+        path
+        for path in data_dir.rglob("*")
+        if path.is_file()
+        and path.suffix.lower() in SUPPORTED_DATASET_SUFFIXES
+        and "submission" not in path.name.lower()
+        and ("test" in path.name.lower()) == is_test
+    )
+    if not candidates:
+        kind = "test" if is_test else "training"
+        raise FileNotFoundError(f"No supported {kind} dataset found under {data_dir}")
+    return candidates[0]
+
+
+def find_training_dataset(data_dir: Path) -> Path:
+    """Find the most likely public training dataset in an MLE-bench data directory."""
+
+    preferred_names = (
+        "train.csv",
+        "training.csv",
+        "train.parquet",
+        "training.parquet",
+        "train.jsonl",
+        "train.json",
+        "train.tsv",
+    )
+    return _find_dataset(data_dir, preferred_names, is_test=False)
+
+
+def find_test_dataset(data_dir: Path) -> Path:
+    """Find the most likely public test dataset in an MLE-bench data directory."""
+
+    preferred_names = ("test.csv", "test.parquet", "test.jsonl", "test.json", "test.tsv")
+    return _find_dataset(data_dir, preferred_names, is_test=True)
+
+
+def find_sample_submission(data_dir: Path) -> Path | None:
+    """Return a sample submission file when the competition provides one."""
+
+    candidates = sorted(
+        path
+        for path in data_dir.rglob("*")
+        if path.is_file()
+        and path.suffix.lower() == ".csv"
+        and ("sample_submission" in path.name.lower() or path.name.lower() == "submission.csv")
+    )
+    return candidates[0] if candidates else None
+
+
+def infer_id_column(sample_submission: Path | None, test_dataset: Path) -> str | None:
+    """Infer the submission id column from sample submission or test data."""
+
+    if sample_submission is not None:
+        columns = list(pd.read_csv(sample_submission, nrows=0).columns)
+        if columns:
+            return columns[0]
+
+    test_columns = list(read_tabular_sample(test_dataset, n_rows=1).columns)
+    for candidate in ("id", "Id", "ID"):
+        if candidate in test_columns:
+            return candidate
+    return test_columns[0] if test_columns else None
+
+
+def read_tabular_sample(path: Path, n_rows: int | None = None) -> pd.DataFrame:
+    """Read a tabular file, keeping only the first `n_rows` rows when given."""
+
+    suffix = path.suffix.lower()
+    if suffix == ".csv":
+        return pd.read_csv(path, nrows=n_rows)
+    if suffix == ".tsv":
+        return pd.read_csv(path, sep="\t", nrows=n_rows)
+    if suffix == ".parquet":
+        df = pd.read_parquet(path)
+    elif suffix in {".json", ".jsonl"}:
+        df = pd.read_json(path, lines=suffix == ".jsonl")
+    else:
+        raise ValueError(f"Unsupported dataset suffix: {path.suffix}")
+    # These readers load the whole file, so the row limit is applied afterwards.
+    return df.head(n_rows) if n_rows is not None else df
+
+
+def load_predictor(package_dir: Path) -> Any:
+    """Load Plexe's packaged predictor object from a completed model package.
+
+    Raises `FileNotFoundError` when package files are missing, `ValueError` for an
+    unusable `model.yaml`, and `AttributeError` when `predictor.py` lacks the class.
+    """
+
+    model_yaml = package_dir / "model.yaml"
+    predictor_file = package_dir / "predictor.py"
+    if not model_yaml.exists() or not predictor_file.exists():
+        raise FileNotFoundError(f"Missing packaged predictor files in {package_dir}")
+
+    model_metadata = yaml.safe_load(model_yaml.read_text(encoding="utf-8")) or {}
+    if not isinstance(model_metadata, dict):
+        raise ValueError(f"Expected a mapping in {model_yaml}, got {type(model_metadata).__name__}")
+    model_type = model_metadata.get("model_type")
+    class_map = {
+        "xgboost": "XGBoostPredictor",
+        "catboost": "CatBoostPredictor",
+        "lightgbm": "LightGBMPredictor",
+        "keras": "KerasPredictor",
+        "pytorch": "PyTorchPredictor",
+    }
+    if not isinstance(model_type, str) or model_type not in class_map:
+        expected = ", ".join(sorted(class_map))
+        raise ValueError(f"Unknown or missing model_type {model_type!r} in {model_yaml}; expected one of: {expected}")
+    class_name = class_map[model_type]
+
+    module_name = "plexe_mlebench_predictor"
+    spec = importlib.util.spec_from_file_location(module_name, predictor_file)
+    if spec is None or spec.loader is None:
+        raise ImportError(f"Could not import predictor from {predictor_file}")
+    module = importlib.util.module_from_spec(spec)
+    sys.modules[module_name] = module
+    try:
+        spec.loader.exec_module(module)
+    except BaseException:
+        # Do not leave a half-initialised module importable after a failed load.
+        sys.modules.pop(module_name, None)
+        raise
+    predictor_cls = getattr(module, class_name, None)
+    if predictor_cls is None:
+        raise AttributeError(f"{predictor_file} does not define {class_name} (model_type {model_type!r})")
+    return predictor_cls(model_dir=str(package_dir))
+
+
+def coerce_predictions_to_submission(
+    predictions: pd.DataFrame | pd.Series | list[Any],
+    test_df: pd.DataFrame,
+    sample_submission: Path | None,
+    id_column: str | None,
+) -> pd.DataFrame:
+    """Shape Plexe predictions into the CSV columns expected by MLE-bench.
+
+    With a sample submission, its columns are reused: the first is filled with ids and
+    every other column with the same-named prediction column (or the first prediction
+    column). Without one, the output is `[id_column,] prediction`.
+
+    Raises `ValueError` when there are fewer predictions than test rows.
+    """
+
+    if isinstance(predictions, pd.Series):
+        prediction_df = predictions.to_frame(name="prediction")
+    elif isinstance(predictions, pd.DataFrame):
+        prediction_df = predictions.copy()
+    else:
+        prediction_df = pd.DataFrame({"prediction": predictions})
+
+    n_rows = len(test_df)
+    if prediction_df.shape[1] == 0:
+        raise ValueError("Predictor returned no prediction columns")
+    if len(prediction_df) < n_rows:
+        raise ValueError(f"Predictor returned {len(prediction_df)} predictions for {n_rows} test rows")
+    # Surplus rows are dropped so every output shape stays aligned with the test rows.
+    prediction_df = prediction_df.iloc[:n_rows]
+    has_id = bool(id_column) and id_column in test_df.columns
+
+    if sample_submission is not None:
+        sample_df = pd.read_csv(sample_submission)
+        output_columns = list(sample_df.columns)
+        submission = pd.DataFrame(index=range(n_rows), columns=output_columns)
+        if output_columns and has_id:
+            submission[output_columns[0]] = test_df[id_column].values
+        elif output_columns:
+            if len(sample_df) < n_rows:
+                raise ValueError(f"Cannot take {n_rows} ids from {sample_submission}: it has {len(sample_df)} rows")
+            submission[output_columns[0]] = sample_df.iloc[:n_rows, 0].values
+
+        prediction_columns = output_columns[1:] or ["prediction"]
+        for column in prediction_columns:
+            source_column = column if column in prediction_df.columns else prediction_df.columns[0]
+            submission[column] = prediction_df[source_column].values
+        return submission
+
+    first_prediction = prediction_df.iloc[:, 0].values
+    if has_id:
+        return pd.DataFrame({id_column: test_df[id_column].values, "prediction": first_prediction})
+    return pd.DataFrame({"prediction": first_prediction})
+
+
+def copy_existing_submission(work_dir: Path, submission_path: Path) -> bool:
+    """Copy an existing submission.csv from Plexe artifacts if one exists."""
+
+    destination = submission_path.resolve()
+    for candidate in sorted(work_dir.rglob("submission.csv")):
+        if candidate.is_file() and candidate.resolve() != destination:
+            shutil.copy2(candidate, submission_path)
+            logging.info("Copied existing submission from %s", candidate)
+            return True
+    return False
+
+
+def write_submission_from_package(
+    work_dir: Path,
+    test_dataset: Path,
+    sample_submission: Path | None,
+    submission_path: Path,
+) -> None:
+    """Create `submission.csv` by running the packaged Plexe predictor on test rows."""
+
+    package_dir = work_dir / "model"
+    predictor = load_predictor(package_dir)
+    test_df = read_tabular_sample(test_dataset)
+    id_column = infer_id_column(sample_submission, test_dataset)
+    feature_df = test_df.drop(columns=[id_column], errors="ignore") if id_column else test_df
+    predictions = predictor.predict(feature_df)
+    submission = coerce_predictions_to_submission(predictions, test_df, sample_submission, id_column)
+    submission.to_csv(submission_path, index=False, quoting=csv.QUOTE_MINIMAL)
+
+
+def write_metadata(paths: MLEBenchPaths, metadata: dict[str, Any]) -> None:
+    """Persist adapter metadata for debugging and benchmark result review."""
+
+    metadata_path = paths.logs_dir / "plexe_mlebench_metadata.json"
+    metadata_path.write_text(json.dumps(metadata, indent=2, sort_keys=True), encoding="utf-8")
+
+
+def _resolve_max_iterations() -> int:
+    """Return `PLEXE_MAX_ITERATIONS` as an int, using the default when unset or blank."""
+
+    raw_value = os.environ.get("PLEXE_MAX_ITERATIONS", "").strip()
+    if not raw_value:
+        # A forwarded-but-empty variable would otherwise crash `int("")`.
+        return DEFAULT_MAX_ITERATIONS
+    try:
+        return int(raw_value)
+    except ValueError as exc:
+        raise ValueError(f"PLEXE_MAX_ITERATIONS must be an integer, got {raw_value!r}") from exc
+
+
+def run_plexe(paths: MLEBenchPaths, train_dataset: Path, description: str) -> None:
+    """Run Plexe on the selected training dataset."""
+
+    from plexe.main import main as plexe_main
+
+    max_iterations = _resolve_max_iterations()
+    provider = os.environ.get("PLEXE_PROVIDER")
+
+    logging.info(
+        "Running Plexe: train_dataset=%s max_iterations=%s provider=%s", train_dataset, max_iterations, provider
+    )
+    plexe_main(
+        intent=description,
+        train_dataset_uri=str(train_dataset),
+        work_dir=paths.work_dir,
+        user_id="mlebench",
+        experiment_id=os.environ.get("COMPETITION_ID", "mlebench"),
+        max_iterations=max_iterations,
+        enable_final_evaluation=False,
+    )
+
+
+def main() -> None:
+    """Execute the Plexe MLE-bench adapter."""
+
+    paths = resolve_paths()
+    configure_logging(paths.logs_dir)
+    paths.submission_dir.mkdir(parents=True, exist_ok=True)
+    paths.code_dir.mkdir(parents=True, exist_ok=True)
+    paths.work_dir.mkdir(parents=True, exist_ok=True)
+
+    submission_path = paths.submission_dir / "submission.csv"
+    metadata: dict[str, Any] = {
+        "competition_id": os.environ.get("COMPETITION_ID"),
+        "work_dir": str(paths.work_dir),
+        "submission_created": False,
+    }
+
+    try:
+        # Discovery runs inside the guarded block so a missing dataset is also
+        # logged and recorded in the metadata file.
+        train_dataset = find_training_dataset(paths.data_dir)
+        test_dataset = find_test_dataset(paths.data_dir)
+        sample_submission = find_sample_submission(paths.data_dir)
+        description = read_competition_description(paths.data_dir)
+        metadata.update(
+            train_dataset=str(train_dataset),
+            test_dataset=str(test_dataset),
+            sample_submission=str(sample_submission) if sample_submission else None,
+        )
+
+        run_plexe(paths, train_dataset, description)
+        if not copy_existing_submission(paths.work_dir, submission_path):
+            write_submission_from_package(paths.work_dir, test_dataset, sample_submission, submission_path)
+        metadata["submission_created"] = submission_path.exists()
+        metadata["status"] = "ok"
+    except Exception as exc:
+        metadata["status"] = "failed"
+        metadata["error"] = repr(exc)
+        logging.exception("Plexe MLE-bench run failed")
+        raise
+    finally:
+        write_metadata(paths, metadata)
+
+    logging.info("Wrote MLE-bench submission to %s", submission_path)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/benchmark/mlebench/start.sh b/tests/benchmark/mlebench/start.sh
new file mode 100755
index 00000000..6a9757d0
--- /dev/null
+++ b/tests/benchmark/mlebench/start.sh
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+eval "$(conda shell.bash hook)"
+conda activate agent
+
+python "${AGENT_DIR}/plexe/run_mlebench.py"
+
+bash /home/validate_submission.sh "${SUBMISSION_DIR}/submission.csv"
diff --git a/tests/unit/benchmark/test_mlebench_adapter.py b/tests/unit/benchmark/test_mlebench_adapter.py
new file mode 100644
index 00000000..94ede8bc
--- /dev/null
+++ b/tests/unit/benchmark/test_mlebench_adapter.py
@@ -0,0 +1,117 @@
+"""Tests for the Plexe MLE-bench adapter."""
+
+from __future__ import annotations
+
+import importlib.util
+import sys
+from pathlib import Path
+
+import pandas as pd
+import pytest
+
+
+ADAPTER_PATH = Path(__file__).parents[2] / "benchmark" / "mlebench" / "plexe" / "run_mlebench.py"
+ENTRYPOINT_PATH = Path(__file__).parents[2] / "benchmark" / "mlebench" / "start.sh"
+
+
+def _load_adapter():
+    spec = importlib.util.spec_from_file_location("plexe_mlebench_adapter", ADAPTER_PATH)
+    assert spec is not None and spec.loader is not None
+    module = importlib.util.module_from_spec(spec)
+    sys.modules[spec.name] = module
+    spec.loader.exec_module(module)
+    return module
+
+
+def test_find_training_dataset_prefers_train_csv(tmp_path):
+    adapter = _load_adapter()
+    (tmp_path / "test.csv").write_text("id,x\n1,a\n", encoding="utf-8")
+    (tmp_path / "train.csv").write_text("id,y\n1,0\n", encoding="utf-8")
+    (tmp_path / "sample_submission.csv").write_text("id,target\n1,0\n", encoding="utf-8")
+
+    assert adapter.find_training_dataset(tmp_path) == tmp_path / "train.csv"
+
+
+def test_find_training_dataset_ignores_submission_and_test_files(tmp_path):
+    adapter = _load_adapter()
+    nested = tmp_path / "nested"
+    nested.mkdir()
+    (tmp_path / "sample_submission.csv").write_text("id,target\n1,0\n", encoding="utf-8")
+    (tmp_path / "test.csv").write_text("id,x\n1,a\n", encoding="utf-8")
+    (nested / "fold_train.parquet").write_text("not real parquet", encoding="utf-8")
+
+    assert adapter.find_training_dataset(tmp_path) == nested / "fold_train.parquet"
+
+
+def test_coerce_predictions_matches_sample_submission_columns(tmp_path):
+    adapter = _load_adapter()
+    sample_submission = tmp_path / "sample_submission.csv"
+    sample_submission.write_text("PassengerId,Transported\n0001_01,False\n0002_01,False\n", encoding="utf-8")
+    test_df = pd.DataFrame({"PassengerId": ["0001_01", "0002_01"], "Cabin": ["A/1/S", "B/2/P"]})
+    predictions = pd.DataFrame({"prediction": [True, False]})
+
+    submission = adapter.coerce_predictions_to_submission(
+        predictions=predictions,
+        test_df=test_df,
+        sample_submission=sample_submission,
+        id_column="PassengerId",
+    )
+
+    assert list(submission.columns) == ["PassengerId", "Transported"]
+    assert submission["PassengerId"].tolist() == ["0001_01", "0002_01"]
+    assert submission["Transported"].tolist() == [True, False]
+
+
+def test_copy_existing_submission_ignores_destination(tmp_path):
+    adapter = _load_adapter()
+    work_dir = tmp_path / "work"
+    submission_dir = tmp_path / "submission"
+    work_dir.mkdir()
+    submission_dir.mkdir()
+    existing = work_dir / "nested" / "submission.csv"
+    existing.parent.mkdir()
+    existing.write_text("id,prediction\n1,0\n", encoding="utf-8")
+    destination = submission_dir / "submission.csv"
+
+    assert adapter.copy_existing_submission(work_dir, destination) is True
+    assert destination.read_text(encoding="utf-8") == "id,prediction\n1,0\n"
+
+
+def test_entrypoint_targets_packaged_runner_location():
+    entrypoint = ENTRYPOINT_PATH.read_text(encoding="utf-8")
+
+    assert 'python "${AGENT_DIR}/plexe/run_mlebench.py"' in entrypoint
+    assert "${AGENT_DIR}/run_mlebench.py" not in entrypoint
+
+
+def test_load_predictor_rejects_missing_model_type(tmp_path):
+    adapter = _load_adapter()
+    package_dir = tmp_path / "packaged_model"
+    package_dir.mkdir()
+    (package_dir / "model.yaml").write_text("target: label\n", encoding="utf-8")
+    predictor_source = "\n".join(
+        [
+            "class XGBoostPredictor:",
+            "    def __init__(self, model_dir):",
+            "        self.model_dir = model_dir",
+        ]
+    )
+    (package_dir / "predictor.py").write_text(predictor_source, encoding="utf-8")
+
+    with pytest.raises(ValueError, match="Unknown or missing model_type"):
+        adapter.load_predictor(package_dir)
+
+
+def test_resolve_max_iterations_defaults_when_blank(monkeypatch):
+    adapter = _load_adapter()
+    monkeypatch.setenv("PLEXE_MAX_ITERATIONS", "")
+
+    assert adapter._resolve_max_iterations() == adapter.DEFAULT_MAX_ITERATIONS
+
+
+def test_coerce_predictions_rejects_too_few_predictions():
+    adapter = _load_adapter()
+    test_df = pd.DataFrame({"id": [1, 2, 3]})
+
+    with pytest.raises(ValueError, match="2 predictions for 3 test rows"):
+        adapter.coerce_predictions_to_submission([0, 1], test_df, None, "id")