diff --git a/BENCHMARKS.md b/BENCHMARKS.md index b3483467..4d7763b9 100644 --- a/BENCHMARKS.md +++ b/BENCHMARKS.md @@ -66,6 +66,20 @@ CONVERSE evaluates contextual safety in agent-to-agent conversations. It focuses --- +## 6. ColBench + +ColBench (Collaborative Agent Bench) evaluates LLM agents on multi-turn collaborative artifact creation. An agent interacts with a simulated human collaborator over multiple dialogue turns to gather requirements, ask clarifying questions, and produce a final artifact (Python code) that satisfies hidden constraints. The benchmark thus tests an agent's ability to gather information effectively and reason across turns. + +### Source and License + +- **Original Repository:** [https://github.com/facebookresearch/sweet_rl](https://github.com/facebookresearch/sweet_rl) +- **Paper:** [SWEET-RL: Training Multi-Turn LLM Agents on Collaborative Reasoning Tasks](https://arxiv.org/abs/2503.15478) +- **Dataset:** [facebook/collaborative_agent_bench](https://huggingface.co/datasets/facebook/collaborative_agent_bench) +- **Code License:** MIT (as provided by the upstream repository) +- **Data License:** CC-BY-NC; data is an output of Llama 3.1 and subject to the [Llama 3.1 license](https://huggingface.co/meta-llama/Llama-3.1-8B/blob/main/LICENSE) + +--- + -## 6. [Name of Next Benchmark] +## 7. [Name of Next Benchmark] (Description for the next benchmark...) diff --git a/examples/colbench_benchmark/README.md b/examples/colbench_benchmark/README.md new file mode 100644 index 00000000..05d9c3d6 --- /dev/null +++ b/examples/colbench_benchmark/README.md @@ -0,0 +1,232 @@ +# ColBench: Collaborative Agent Benchmark + +ColBench evaluates LLM agents on **collaborative backend-programming tasks** where the agent must interact with a simulated human user to gather requirements before producing a solution. It tests an agent's ability to ask clarifying questions, extract hidden constraints from user responses, and generate correct Python code, all within a limited number of dialogue turns.
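The turn-limited exchange described above can be sketched in a few lines of Python. This is an illustration only — `run_dialogue`, `agent`, and `simulator` are hypothetical stand-ins rather than MASEval APIs; the `"I WANT TO ANSWER:"` signal is the one the benchmark actually uses:

```python
ANSWER_SIGNAL = "I WANT TO ANSWER:"

def run_dialogue(agent, simulator, problem, max_steps=10):
    """Alternate agent/simulator turns until the agent submits code."""
    message = problem
    for _ in range(max_steps):
        reply = agent(message)  # either a clarifying question or the answer
        if ANSWER_SIGNAL in reply:
            # Everything after the signal is treated as the final Python code.
            return reply.split(ANSWER_SIGNAL, 1)[1]
        message = simulator(reply)  # simulated human reveals hidden constraints
    return None  # turn budget exhausted without an answer
```

In the real benchmark the simulator is an LLM with access to the hidden ground truth, and the returned code is scored against hidden unit tests.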
+ +Originally introduced in [Collaborative Agent Bench (sweet_rl)](https://github.com/facebookresearch/sweet_rl), this integration adapts ColBench to the MASEval framework. + +## How It Works + +Each task follows a multi-turn dialogue loop between the **agent** (LLM under test) and a **human simulator** (another LLM acting as the user): + +``` +┌─────────────────────────────────────────────────────┐ +│ ColBench Task Loop │ +├─────────────────────────────────────────────────────┤ +│ │ +│ 1. User presents a problem description │ +│ "Write a function that calculates tip totals" │ +│ │ +│ 2. Agent asks clarifying questions │ +│ "What format is the input? Are tips per-person?"│ +│ │ +│ 3. Simulator responds with hidden information │ +│ "Tips are a dict mapping provider to amount..." │ +│ │ +│ 4. Repeat for up to max_steps turns │ +│ │ +│ 5. Agent signals: "I WANT TO ANSWER:" + code │ +│ │ +│ 6. Code is evaluated against hidden unit tests │ +│ │ +└─────────────────────────────────────────────────────┘ +``` + +The human simulator has access to **hidden information** (ground-truth code and constraints) that the agent must extract through dialogue. The agent signals completion by prefixing its response with `"I WANT TO ANSWER:"` followed by the Python code. + +## Architecture + +ColBench maps to MASEval's component model: + +| MASEval Component | ColBench Implementation | Role | +|---|---|---| +| `Benchmark` | `ColBenchBenchmark` | Orchestrates the task loop | +| `User` | `ColBenchUser` | Human simulator (LLM-backed) | +| `AgentAdapter` | `ColBenchAgentAdapter` | Agent under test | +| `Environment` | `ColBenchEnvironment` | Holds task artifacts | +| `Evaluator` | `ColBenchCodeEvaluator` | Unit-test scoring | +| `ModelAdapter` | `OpenAIModelAdapter` | OpenAI-compatible API client | + +## Quick Start + +### 1. Start a vLLM Server + +ColBench uses OpenAI-compatible API servers for both the agent and the human simulator.
Start a vLLM server: + +```bash +vllm serve meta-llama/Llama-3.1-8B-Instruct \ + --port 8001 \ + --tensor-parallel-size 1 \ + --gpu-memory-utilization 0.9 \ + --dtype bfloat16 \ + --max-model-len 23000 +``` + +For production runs, serve a larger model (e.g., `Llama-3.1-70B-Instruct`) for the human simulator on a separate server, as shown in the Multi-Server Setup section. + +### 2. Run the Benchmark + +```bash +python examples/colbench_benchmark/colbench.py \ + --agent_model meta-llama/Llama-3.1-8B-Instruct \ + --hostname localhost \ + --port 8001 \ + --env_model meta-llama/Llama-3.1-8B-Instruct \ + --input_path examples/colbench_benchmark/results/test.jsonl \ + --output_path examples/colbench_benchmark/results/temp_test.jsonl \ + --num_tasks 1000 +``` + +### 3. Evaluate Existing Trajectories + +To re-evaluate previously saved trajectories without re-running interactions: + +```bash +python examples/colbench_benchmark/colbench.py \ + --evaluate_only \ + --output_path examples/colbench_benchmark/results/temp_test.jsonl +``` + +## CLI Reference + +| Argument | Default | Description | +|---|---|---| +| `--input_path` | *(required)* | Path to JSONL task file | +| `--output_path` | `colbench_results.jsonl` | Path to save trajectory results | +| `--num_tasks` | `1000` | Number of tasks to run | +| `--agent_model` | `Llama-3.1-8B-Instruct` | Model ID for the agent under test | +| `--env_model` | `auto` | Model ID for the human simulator (`auto` → `Llama-3.1-70B-Instruct`) | +| `--hostname` | `localhost` | Hostname of the vLLM server (human simulator) | +| `--port` | `8000` | Port of the vLLM server (human simulator) | +| `--agent_hostname` | same as `--hostname` | Hostname for the agent vLLM server | +| `--agent_port` | same as `--port` | Port for the agent vLLM server | +| `--max_steps` | `10` | Maximum dialogue turns per task | +| `--best_of_n` | `1` | Independent runs per task | +| `--temperature` | `1.0` | Agent sampling temperature | +| `--num_workers` | `1` | Parallel task workers | |
`--user_prompt_path` | *(built-in)* | Custom human simulator prompt file | +| `--agent_prompt_path` | *(built-in)* | Custom agent system prompt file | +| `--evaluate_only` | `False` | Skip interaction; evaluate existing output file | +| `--debug` | `False` | Fail fast on any error | + +## Multi-Server Setup + +For best results, use separate models for the agent and human simulator: + +```bash +# Terminal 1: Human simulator (larger model) +vllm serve meta-llama/Llama-3.1-70B-Instruct \ + --port 8000 \ + --tensor-parallel-size 8 + +# Terminal 2: Agent under test +vllm serve meta-llama/Llama-3.1-8B-Instruct \ + --port 8001 \ + --tensor-parallel-size 1 + +# Terminal 3: Run benchmark +python examples/colbench_benchmark/colbench.py \ + --agent_model meta-llama/Llama-3.1-8B-Instruct \ + --env_model meta-llama/Llama-3.1-70B-Instruct \ + --hostname localhost --port 8000 \ + --agent_hostname localhost --agent_port 8001 \ + --input_path examples/colbench_benchmark/results/test.jsonl \ + --output_path examples/colbench_benchmark/results/results.jsonl +``` + +## Task Format + +Input tasks are stored in JSONL format.
Each line contains: + +```json +{ + "problem_description": "Write a Python function that ...", + "ground_truth": "def my_function(x):\n return x * 2", + "test_cases": { + "test1": "my_function(1)", + "test2": "my_function(5)" + } +} +``` + +| Field | Description | +|---|---| +| `problem_description` | The task shown to the agent as the opening message | +| `ground_truth` | Reference Python code (hidden from agent, visible to simulator) | +| `test_cases` | Dict of Python expressions used to evaluate correctness | + +## Output Format + +Trajectories are saved in JSONL format, backward-compatible with the original sweet_rl evaluation scripts: + +```json +{ + "task": { + "problem_description": "Write a Python function ...", + "ground_truth": "def my_function(x): ...", + "test_cases": {"test1": "my_function(1)", ...} + }, + "dialogue_history": [ + {"role": "user", "content": "Write a Python function ..."}, + {"role": "assistant", "content": "Can you clarify ..."}, + {"role": "user", "content": "The input is a list ..."}, + {"role": "assistant", "content": "I WANT TO ANSWER:\ndef my_function(...): ..."} + ], + "answer": "\ndef my_function(...): ...", + "reward": 0.8 +} +``` + +## Evaluation + +The evaluator runs each hidden test case against both the ground-truth and agent-generated code, comparing outputs for equality: + +- **`correctness`**: Fraction of test cases passed (0.0–1.0) +- **`success`**: Boolean — `True` only if all tests pass +- **`num_tests`**: Total number of test cases +- **`num_passed`**: Number of tests matching ground truth + +Safety measures include blocked patterns (`import os`, `open(`, `exit(`, etc.) and 1-second execution timeouts. 
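The scoring described above can be approximated with the following sketch. It is illustrative only — it omits the timeout and blocked-pattern sandboxing, and this `check_correctness` is a simplified stand-in for the evaluator's function of the same name:

```python
def check_correctness(ground_truth: str, answer: str, test_cases: dict) -> float:
    """Fraction of test expressions whose result matches the reference code."""
    def run(code: str, expr: str):
        namespace: dict = {}
        exec(code, namespace)         # define the function(s) under test
        return eval(expr, namespace)  # evaluate one hidden test expression

    passed = 0
    for expr in test_cases.values():
        try:
            if run(answer, expr) == run(ground_truth, expr):
                passed += 1
        except Exception:
            pass  # any error in the candidate counts as a failed test
    return passed / len(test_cases) if test_cases else 0.0
```

Because test expressions are compared against the ground truth's outputs rather than hard-coded expected values, the same task file works for any functionally equivalent implementation.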
+ +## Programmatic Usage + +```python +from openai import OpenAI +from maseval.benchmark.colbench import ColBenchBenchmark, OpenAIModelAdapter + +client = OpenAI(base_url="http://localhost:8001/v1", api_key="EMPTY") + +def model_factory(model_id, **kwargs): + return OpenAIModelAdapter(client, model_id=model_id) + +tasks = ColBenchBenchmark.load_tasks("examples/colbench_benchmark/results/test.jsonl", num_tasks=100) + +benchmark = ColBenchBenchmark( + model_factory=model_factory, + human_simulator_model_id="meta-llama/Llama-3.1-8B-Instruct", + agent_model_id="meta-llama/Llama-3.1-8B-Instruct", + max_steps=10, +) + +reports = benchmark.run( + tasks=tasks, + agent_data={"model": "meta-llama/Llama-3.1-8B-Instruct"}, +) + +# Access results +for report in reports: + eval_result = report["eval"][0] + print(f"Task {report['task_id']}: correctness={eval_result['correctness']:.1%}") +``` + +## Citation + +```bibtex +@article{zhou2025colbench, + title={SWEET-RL: Training Multi-Turn LLM Agents on Collaborative Reasoning Tasks}, + author={Zhou, Yifei and Yan, An and Jansen, Peter and Peng, Hao and Choi, Yejin}, + journal={arXiv preprint arXiv:2503.15478}, + year={2025} +} +``` \ No newline at end of file diff --git a/examples/colbench_benchmark/__init__.py b/examples/colbench_benchmark/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/colbench_benchmark/colbench.py b/examples/colbench_benchmark/colbench.py new file mode 100644 index 00000000..a4b72e42 --- /dev/null +++ b/examples/colbench_benchmark/colbench.py @@ -0,0 +1,306 @@ +#!/usr/bin/env python3 +""" +ColBench runner — drop-in replacement for the original two-step workflow.
+ + python examples/colbench_benchmark/colbench.py \\ + --agent_model meta-llama/Llama-3.1-8B-Instruct \\ + --hostname localhost --port 8001 \\ + --env_model meta-llama/Llama-3.1-8B-Instruct \\ + --input_path examples/colbench_benchmark/results/test.jsonl \\ + --output_path examples/colbench_benchmark/results/temp_test.jsonl + + # Evaluate-only: + python examples/colbench_benchmark/colbench.py \\ + --evaluate_only \\ + --output_path examples/colbench_benchmark/results/temp_test.jsonl +""" + +from __future__ import annotations + +import argparse +import json +import logging +from pathlib import Path +from typing import Any, Dict, List, Optional + +import numpy as np + +logger = logging.getLogger(__name__) + + +def build_parser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser( + description="ColBench benchmark runner (MASEval integration)", + ) + + # Task input + p.add_argument("--input_path", type=str, required=False) + p.add_argument("--output_path", type=str, default="colbench_results.jsonl") + p.add_argument("--num_tasks", type=int, default=1000) + + # Models + p.add_argument("--agent_model", type=str, default="meta-llama/Llama-3.1-8B-Instruct") + p.add_argument("--env_model", type=str, default="auto") + + # Server + p.add_argument("--hostname", type=str, default="localhost") + p.add_argument("--port", type=int, default=8000) + p.add_argument("--agent_hostname", type=str, default=None) + p.add_argument("--agent_port", type=int, default=None) + + # Benchmark params + p.add_argument("--task_type", type=str, default="code", choices=["code"]) + p.add_argument("--max_steps", type=int, default=10) + p.add_argument("--best_of_n", type=int, default=1) + p.add_argument("--temperature", type=float, default=1.0) + p.add_argument("--num_workers", type=int, default=1) + + # Prompts + p.add_argument("--user_prompt_path", type=str, default=None) + p.add_argument("--agent_prompt_path", type=str, default=None) + + # Modes + p.add_argument("--evaluate_only", action="store_true") +
p.add_argument( + "--debug", action="store_true", + help="Fail fast on any error (sets fail_on_*_error=True)", + ) + + return p + + +def resolve_env_model(env_model: str, task_type: str) -> str: + if env_model != "auto": + return env_model + if task_type == "html": + return "Qwen/Qwen2-VL-72B-Instruct" + return "meta-llama/Llama-3.1-70B-Instruct" + + +def load_prompt(path: Optional[str]) -> Optional[str]: + if path is None: + return None + with open(path) as f: + return f.read() + + +def evaluate_only(output_path: str, k: int = 1) -> None: + """Evaluate existing trajectories (replaces evaluate_code.py).""" + from maseval.benchmark.colbench.evaluator import check_correctness + + with open(output_path) as f: + trajectories = [json.loads(line) for line in f] + + print(f"Number of trajectories: {len(trajectories)}") + + all_correctness = [] + for traj in trajectories: + gt = traj["task"]["ground_truth"] + answer = traj.get("answer", "No answer") + test_cases = traj["task"]["test_cases"] + correctness = check_correctness(gt, answer, test_cases) + traj["reward"] = correctness + all_correctness.append(correctness) + + raw = np.array(all_correctness).reshape(k, -1) + best = np.max(raw, axis=0) + + print(f"Average correctness: {np.mean(all_correctness):.4f}") + print(f"Success rate: {np.mean(np.array(all_correctness) == 1.0):.4f}") + print(f"Best-of-{k} average correctness: {np.mean(best):.4f}") + print(f"Best-of-{k} success rate: {np.mean(best == 1.0):.4f}") + + with open(output_path, "w") as f: + for traj in trajectories: + f.write(json.dumps(traj) + "\n") + print(f"Saved to {output_path}") + + +def run_benchmark(args: argparse.Namespace) -> None: + from openai import OpenAI + + from maseval.benchmark.colbench.colbench import ColBenchBenchmark + from maseval.benchmark.colbench.openai_model_adapter import OpenAIModelAdapter + + env_model_id = resolve_env_model(args.env_model, args.task_type) + agent_hostname = args.agent_hostname or args.hostname + agent_port = 
args.agent_port or args.port + + # Create OpenAI clients + env_client = OpenAI( + base_url=f"http://{args.hostname}:{args.port}/v1", + api_key="EMPTY", + ) + if agent_hostname == args.hostname and agent_port == args.port: + agent_client = env_client + else: + agent_client = OpenAI( + base_url=f"http://{agent_hostname}:{agent_port}/v1", + api_key="EMPTY", + ) + + # Model factory — routes model_id to the correct client + def model_factory(model_id: str, **kwargs) -> OpenAIModelAdapter: + if model_id == env_model_id: + return OpenAIModelAdapter( + env_client, model_id=model_id, + default_temperature=0.0, + ) + else: + return OpenAIModelAdapter( + agent_client, model_id=model_id, + default_temperature=args.temperature, + default_max_tokens=1024, + ) + + human_prompt = load_prompt(args.user_prompt_path) + agent_prompt = load_prompt(args.agent_prompt_path) + + # Fail fast by default so errors surface immediately + benchmark = ColBenchBenchmark( + model_factory=model_factory, + human_simulator_model_id=env_model_id, + agent_model_id=args.agent_model, + human_prompt=human_prompt, + agent_prompt=agent_prompt, + max_steps=args.max_steps, + agent_temperature=args.temperature, + n_task_repeats=args.best_of_n, + num_workers=args.num_workers, + fail_on_setup_error=True, + fail_on_task_error=True, + fail_on_evaluation_error=True, + ) + + # ── Load tasks ─────────────────────────────────────────────────────── + tasks = ColBenchBenchmark.load_tasks(args.input_path, num_tasks=args.num_tasks) + + # Also load raw task data for saving trajectories later + # (SequentialTaskQueue is consumed by benchmark.run()) + with open(args.input_path) as f: + raw_tasks = [json.loads(line) for i, line in enumerate(f) + if args.num_tasks is None or i < args.num_tasks] + + print(f"Loaded {len(raw_tasks)} tasks from {args.input_path}") + + agent_data = { + "model": args.agent_model, + "env_model": env_model_id, + } + + print(f"Running ColBench benchmark:") + print(f" Agent model: {args.agent_model}") + 
print(f" Simulator model: {env_model_id}") + print(f" Agent server: http://{agent_hostname}:{agent_port}/v1") + print(f" Env server: http://{args.hostname}:{args.port}/v1") + print(f" Max steps: {args.max_steps}") + print(f" Best-of-n: {args.best_of_n}") + print(f" Num workers: {args.num_workers}") + print() + + reports = benchmark.run(tasks=tasks, agent_data=agent_data) + + # ── Save results in original trajectory format ─────────────────────── + trajectories = _reports_to_trajectories(reports, raw_tasks) + output_path = Path(args.output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "w") as f: + for traj in trajectories: + f.write(json.dumps(traj) + "\n") + + _print_summary(reports, args.best_of_n) + print(f"\nTrajectories saved to {output_path}") + + +def _reports_to_trajectories( + reports: List[Dict[str, Any]], + raw_tasks: List[Dict[str, Any]], +) -> List[Dict[str, Any]]: + """Convert MASEval reports to original sweet_rl trajectory format. + + Reports come in the same order as tasks (sequential execution), + so we zip them directly with the raw task dicts from the JSONL. 
+ """ + trajectories = [] + for i, report in enumerate(reports): + # Get original task data by index + raw_task = raw_tasks[i % len(raw_tasks)] if raw_tasks else {} + + # Extract answer from user traces + user_traces = report.get("traces", {}).get("user", {}) + answer = "No answer" + if isinstance(user_traces, dict): + answer = user_traces.get("answer", "No answer") or "No answer" + + # Extract dialogue from user traces + dialogue_history = [] + if isinstance(user_traces, dict): + dialogue_history = user_traces.get("messages", []) + + # Build trajectory in original format + traj: Dict[str, Any] = { + "task": { + "problem_description": raw_task.get("problem_description", ""), + "ground_truth": raw_task.get("ground_truth", ""), + "test_cases": raw_task.get("test_cases", {}), + }, + "dialogue_history": dialogue_history, + "answer": answer, + } + + # Add evaluation results if available + eval_results = report.get("eval") + if eval_results and len(eval_results) > 0: + traj["reward"] = eval_results[0].get("correctness", 0.0) + + trajectories.append(traj) + + return trajectories + + +def _print_summary(reports: List[Dict[str, Any]], k: int = 1) -> None: + eval_results = [] + for report in reports: + evals = report.get("eval") or [{}] + correctness = evals[0].get("correctness", 0.0) if evals else 0.0 + eval_results.append(correctness) + + if not eval_results: + print("No results to report.") + return + + arr = np.array(eval_results) + print(f"\n{'='*60}") + print(f"ColBench Evaluation Results") + print(f"{'='*60}") + print(f"Number of trajectories: {len(arr)}") + print(f"Average correctness: {np.mean(arr):.4f}") + print(f"Success rate: {np.mean(arr == 1.0):.4f}") + + if k > 1: + num_tasks = len(arr) // k + raw = arr.reshape(k, num_tasks) + best = np.max(raw, axis=0) + print(f"Best-of-{k} average correctness: {np.mean(best):.4f}") + print(f"Best-of-{k} success rate: {np.mean(best == 1.0):.4f}") + print(f"{'='*60}") + + +def main(): + logging.basicConfig( + 
level=logging.INFO, + format="%(asctime)s %(name)s %(levelname)s %(message)s", + ) + parser = build_parser() + args = parser.parse_args() + + if args.evaluate_only: + evaluate_only(args.output_path, k=args.best_of_n) + else: + if args.input_path is None: + parser.error("--input_path is required for interaction mode") + run_benchmark(args) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/colbench_benchmark/results/.gitignore b/examples/colbench_benchmark/results/.gitignore new file mode 100644 index 00000000..88985be8 --- /dev/null +++ b/examples/colbench_benchmark/results/.gitignore @@ -0,0 +1,6 @@ +# Ignore all result files +*.jsonl +*.json + +# But keep this .gitignore +!.gitignore diff --git a/maseval/benchmark/colbench/__init__.py b/maseval/benchmark/colbench/__init__.py new file mode 100644 index 00000000..2191fa0d --- /dev/null +++ b/maseval/benchmark/colbench/__init__.py @@ -0,0 +1,25 @@ +""" +ColBench benchmark integration for MASEval. + +Evaluates LLM agents on collaborative backend-programming tasks from +Facebook's Collaborative Agent Bench (ColBench / sweet_rl). 
+""" + +from .colbench import ColBenchBenchmark +from .user import ColBenchUser, DEFAULT_HUMAN_SIMULATOR_CODE_PROMPT +from .environment import ColBenchEnvironment +from .evaluator import ColBenchCodeEvaluator, check_correctness +from .agent import ColBenchAgentInner, DEFAULT_AGENT_CODE_PROMPT +from .openai_model_adapter import OpenAIModelAdapter + +__all__ = [ + "ColBenchBenchmark", + "ColBenchUser", + "ColBenchEnvironment", + "ColBenchCodeEvaluator", + "ColBenchAgentInner", + "OpenAIModelAdapter", + "DEFAULT_HUMAN_SIMULATOR_CODE_PROMPT", + "DEFAULT_AGENT_CODE_PROMPT", + "check_correctness", +] \ No newline at end of file diff --git a/maseval/benchmark/colbench/agent.py b/maseval/benchmark/colbench/agent.py new file mode 100644 index 00000000..6dc5198a --- /dev/null +++ b/maseval/benchmark/colbench/agent.py @@ -0,0 +1,119 @@ +""" +ColBench Agent — the agent-under-test for ColBench tasks. + +Replaces ``VLLMAgent`` from sweet_rl for per-task execution. The original +``VLLMAgent`` did batched local inference via vLLM's ``LLM`` class. In +MASEval, each task runs independently, so we call the agent model via the +OpenAI-compatible API served by vLLM. + +This class is wrapped by ``AgentAdapter`` in ``setup_agents()`` — it does +NOT subclass ``AgentAdapter`` directly. + +The agent receives: + - A system prompt (ColBench agent instructions) + - The growing dialogue history + +And returns raw text, which may contain ``"I WANT TO ANSWER:"`` to signal +the final code submission. +""" + +from __future__ import annotations + +import logging +from typing import Any, Dict, List, Optional + +from maseval.core.model import ModelAdapter +from maseval.core.agent import AgentAdapter + +logger = logging.getLogger(__name__) + +DEFAULT_AGENT_CODE_PROMPT = ( + "You are a helpful LLM agent. 
\n" + "Your task is to help a human user to resolve their problem, in particular " + "python programming.\n" + "1) Note that the problem is highly personalized so you need to explicitly " + "gather information \nby asking questions to the human user about some " + "hidden information and implicit constraints.\n" + "YOU SHOULD TRY TO ASK CLARIFICATION QUESTIONS.\n" + "2) Note that you should not ask human users complicated questions as they " + "will only answer questions briefly in two sentences.\n" + '3) When you have gathered enough information to answer, say "I WANT TO ' + 'ANSWER:" in the beginning of your response and provide your final answer.\n' + "4) Note that you can only interact with the human users WITHIN 10 " + "back-and-forth rounds and you have to provide your final answer before " + "the conversation ends.\n" + "5) You should be as concise as possible in your response to human.\n\n\n" + '"I WANT TO ANSWER:" should be included in your response to human if you ' + "think that you have gathered enough information for addressing this problem.\n" + 'Directly output the raw python code after "I WANT TO ANSWER:".\n\n' +) + + +class ColBenchAgentInner: + """Inner agent logic for ColBench. + + This is a simple stateful agent that accumulates dialogue history and + calls the model via ``ModelAdapter.chat()``. It is wrapped by MASEval's + ``AgentAdapter`` for tracing. + + Attributes: + model: The ``ModelAdapter`` for the agent LLM. + system_prompt: Agent instructions. + temperature: Sampling temperature. + max_tokens: Max tokens per response. + dialogue_history: Accumulated messages. 
+ """ + + def __init__( + self, + model: ModelAdapter, + system_prompt: Optional[str] = None, + temperature: float = 1.0, + max_tokens: int = 1024, + ): + self.model = model + self.system_prompt = system_prompt or DEFAULT_AGENT_CODE_PROMPT + self.temperature = temperature + self.max_tokens = max_tokens + self.dialogue_history: List[Dict[str, str]] = [] + + def run(self, query: str) -> str: + """Execute one agent turn. + + Called by ``ColBenchBenchmark.run_agents()`` via ``execution_loop()``. + + Args: + query: The current user message (problem description or + human simulator reply). + + Returns: + The agent's raw response text. + """ + self.dialogue_history.append({"role": "user", "content": query}) + + api_messages = [ + {"role": "system", "content": self.system_prompt}, + ] + self.dialogue_history + + response = self.model.chat( + messages=api_messages, + generation_params={ + "max_tokens": self.max_tokens, + "temperature": self.temperature, + }, + ) + text = response.content or "" + + self.dialogue_history.append({"role": "assistant", "content": text}) + return text + + +class ColBenchAgentAdapter(AgentAdapter): + """Concrete AgentAdapter that delegates _run_agent to ColBenchAgentInner.run().""" + + def __init__(self, inner_agent: ColBenchAgentInner, name: str = "colbench_agent"): + super().__init__(agent_instance=inner_agent, name=name) + self.inner_agent = inner_agent + + def _run_agent(self, query: str) -> str: + return self.inner_agent.run(query) \ No newline at end of file diff --git a/maseval/benchmark/colbench/colbench.py b/maseval/benchmark/colbench/colbench.py new file mode 100644 index 00000000..35a6e55e --- /dev/null +++ b/maseval/benchmark/colbench/colbench.py @@ -0,0 +1,269 @@ +""" +ColBench Benchmark — MASEval Benchmark orchestrator. 
+ +Subclasses ``maseval.core.Benchmark`` to wire together: + - **Environment** → ``ColBenchEnvironment`` (task state) + - **User** → ``ColBenchUser`` (human simulator) + - **Agent** → ``ColBenchAgentInner`` (agent-under-test) + - **Evaluator** → ``ColBenchCodeEvaluator`` (unit-test scoring) + +The ``execution_loop`` flow (inherited from ``Benchmark``) maps exactly to +the original sweet_rl interaction loop:: + + # sweet_rl # MASEval execution_loop + obs = env.reset(desc, gt) query = user.get_initial_query() + for step in range(max_steps): for _ in range(max_invocations): + response = agent(obs) answer = run_agents(query) + obs, _, done = env.step(resp) reply = user.respond(answer) + if done: break if user.is_done(): break + query = reply +""" + +from __future__ import annotations + +import json +import logging +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple + +from maseval.core.benchmark import Benchmark +from maseval.core.task import Task, SequentialTaskQueue +from maseval.core.environment import Environment +from maseval.core.agent import AgentAdapter +from maseval.core.model import ModelAdapter +from maseval.core.evaluator import Evaluator +from maseval.core.user import User +from maseval.core.callback import BenchmarkCallback +from maseval.core.seeding import SeedGenerator + +from .environment import ColBenchEnvironment +from .user import ColBenchUser, DEFAULT_HUMAN_SIMULATOR_CODE_PROMPT +from .agent import ColBenchAgentInner, ColBenchAgentAdapter, DEFAULT_AGENT_CODE_PROMPT +from .evaluator import ColBenchCodeEvaluator + +logger = logging.getLogger(__name__) + +ModelFactory = Callable[..., ModelAdapter] + + +class ColBenchBenchmark(Benchmark): + """MASEval benchmark for ColBench collaborative agent evaluation. + + Args: + model_factory: Callable ``(model_id, **kwargs) → ModelAdapter``. + human_simulator_model_id: Model ID for the human-simulator LLM. + agent_model_id: Default agent model ID. 
+ human_prompt: Prompt template for the simulator. + agent_prompt: System prompt for the agent. + max_steps: Maximum interaction rounds (default 10). + agent_temperature: Agent sampling temperature (default 1.0). + agent_max_tokens: Max tokens per agent response (default 1024). + **kwargs: Forwarded to ``Benchmark.__init__()``. + """ + + def __init__( + self, + model_factory: Optional[ModelFactory] = None, + human_simulator_model_id: str = "meta-llama/Llama-3.1-70B-Instruct", + agent_model_id: str = "meta-llama/Llama-3.1-8B-Instruct", + human_prompt: Optional[str] = None, + agent_prompt: Optional[str] = None, + max_steps: int = 10, + agent_temperature: float = 1.0, + agent_max_tokens: int = 1024, + callbacks: Optional[List[BenchmarkCallback]] = None, + n_task_repeats: int = 1, + num_workers: int = 1, + seed: Optional[int] = None, + **kwargs, + ): + super().__init__( + callbacks=callbacks, + n_task_repeats=n_task_repeats, + max_invocations=max_steps + 1, + num_workers=num_workers, + seed=seed, + **kwargs, + ) + self._model_factory = model_factory + self.human_simulator_model_id = human_simulator_model_id + self.agent_model_id = agent_model_id + self.human_prompt = human_prompt or DEFAULT_HUMAN_SIMULATOR_CODE_PROMPT + self.agent_prompt = agent_prompt or DEFAULT_AGENT_CODE_PROMPT + self.max_steps = max_steps + self.agent_temperature = agent_temperature + self.agent_max_tokens = agent_max_tokens + + # Per-task inner agent reference (set in setup_agents, used in run_agents) + self._current_inner_agent: Optional[ColBenchAgentInner] = None + + # ── Abstract method implementations ────────────────────────────────── + + def setup_environment( + self, + agent_data: Dict[str, Any], + task: Task, + seed_generator: SeedGenerator, + ) -> ColBenchEnvironment: + env_data = task.environment_data or {} + return ColBenchEnvironment(task_data={ + "problem_description": env_data.get("problem_description", task.query), + "ground_truth": env_data.get("ground_truth", ""), + "test_cases": 
env_data.get("test_cases", {}), + "task_type": env_data.get("task_type", "code"), + }) + + def setup_user( + self, + agent_data: Dict[str, Any], + environment: Environment, + task: Task, + seed_generator: SeedGenerator, + ) -> ColBenchUser: + env: ColBenchEnvironment = environment # type: ignore[assignment] + + sim_model_id = agent_data.get("env_model", self.human_simulator_model_id) + simulator_model = self.get_model_adapter( + sim_model_id, + register_category="models", + register_name="human_simulator", + ) + + return ColBenchUser( + problem_description=env.problem_description, + hidden_information=env.ground_truth, + model=simulator_model, + human_prompt=agent_data.get("human_prompt", self.human_prompt), + max_steps=agent_data.get("max_steps", self.max_steps), + ) + + def setup_agents( + self, + agent_data: Dict[str, Any], + environment: Environment, + task: Task, + user: Optional[User], + seed_generator: SeedGenerator, + ) -> Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]]: + model_id = agent_data.get("model", self.agent_model_id) + model = self.get_model_adapter( + model_id, + register_category="models", + register_name="agent_model", + ) + + # Create inner agent with dialogue state + inner_agent = ColBenchAgentInner( + model=model, + system_prompt=agent_data.get("agent_prompt", self.agent_prompt), + temperature=agent_data.get("temperature", self.agent_temperature), + max_tokens=agent_data.get("max_tokens", self.agent_max_tokens), + ) + + # Store reference for run_agents() to call directly + self._current_inner_agent = inner_agent + + # Wrap in concrete AgentAdapter for MASEval tracing + agent_adapter = ColBenchAgentAdapter(inner_agent) + + agents_dict = {"colbench_agent": agent_adapter} + return [agent_adapter], agents_dict + + def setup_evaluators( + self, + environment: Environment, + task: Task, + agents: Sequence[AgentAdapter], + user: Optional[User], + seed_generator: SeedGenerator, + ) -> Sequence[Evaluator]: + return [ + ColBenchCodeEvaluator( 
+ task=task, + environment=environment, + user=user, + ) + ] + + def get_model_adapter(self, model_id: str, **kwargs) -> ModelAdapter: + category = kwargs.pop("register_category", "models") + name = kwargs.pop("register_name", model_id) + + if self._model_factory is None: + raise RuntimeError( + "No model_factory provided. Either pass model_factory= to " + "ColBenchBenchmark() or subclass and override get_model_adapter()." + ) + + adapter = self._model_factory(model_id, **kwargs) + self.register(category, name, adapter) + return adapter + + def run_agents( + self, + agents: Sequence[AgentAdapter], + task: Task, + environment: Environment, + query: str, + ) -> Any: + """Execute one agent turn via the inner agent. + + Calls ``ColBenchAgentInner.run(query)`` directly (bypassing + ``AgentAdapter`` delegation) to ensure the correct ``chat()`` + call is made. + """ + assert self._current_inner_agent is not None, ( + "setup_agents() must be called before run_agents()" + ) + return self._current_inner_agent.run(query) + + def evaluate( + self, + evaluators: Sequence[Evaluator], + agents: Dict[str, AgentAdapter], + final_answer: Any, + traces: Dict[str, Any], + ) -> List[Dict[str, Any]]: + results = [] + for evaluator in evaluators: + filtered = evaluator.filter_traces(traces) + result = evaluator(filtered, final_answer=final_answer) + results.append(result) + return results + + # ── Task loading ───────────────────────────────────────────────────── + + @staticmethod + def load_tasks( + jsonl_path: str, + num_tasks: Optional[int] = None, + ) -> SequentialTaskQueue: + """Load ColBench tasks from a JSONL file. 
+ + Each line:: + + {"problem_description": "...", "ground_truth": "...", + "test_cases": {"t1": "foo(1)", ...}} + """ + tasks: List[Task] = [] + with open(jsonl_path) as f: + for i, line in enumerate(f): + if num_tasks is not None and i >= num_tasks: + break + raw = json.loads(line) + task = Task( + query=raw["problem_description"], + environment_data={ + "problem_description": raw["problem_description"], + "ground_truth": raw["ground_truth"], + "test_cases": raw.get("test_cases", {}), + "task_type": "code", + }, + evaluation_data={ + "ground_truth": raw["ground_truth"], + "test_cases": raw.get("test_cases", {}), + }, + ) + tasks.append(task) + + logger.info("Loaded %d ColBench tasks from %s", len(tasks), jsonl_path) + return SequentialTaskQueue(tasks) \ No newline at end of file diff --git a/maseval/benchmark/colbench/environment.py b/maseval/benchmark/colbench/environment.py new file mode 100644 index 00000000..a51ad241 --- /dev/null +++ b/maseval/benchmark/colbench/environment.py @@ -0,0 +1,64 @@ +""" +ColBench Environment — MASEval Environment adapter. + +In MASEval, Environment.__init__(task_data) automatically calls: + 1. setup_state(task_data) → stored as self.state + 2. create_tools() → stored as self.tools + +So all initialization happens via task_data, not constructor args. +""" + +from __future__ import annotations + +from typing import Any, Dict, Optional + +from maseval.core.environment import Environment + + +class ColBenchEnvironment(Environment): + """MASEval Environment for a single ColBench task instance. + + Holds the ground-truth artifacts needed for post-hoc evaluation. + No tools — the agent interacts only via dialogue with the User. + + Attributes (set in setup_state): + problem_description: Task description shown to the agent. + ground_truth: Reference code — hidden from agent, used by evaluator. + test_cases: Unit-test expressions for backend tasks. + task_type: "code" or "html". 
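
+    Example (illustrative; hypothetical ``task_data``, assuming the base
+    ``Environment.__init__`` invokes ``setup_state()`` and
+    ``create_tools()`` as described in the module docstring)::
+
+        env = ColBenchEnvironment(task_data={
+            "problem_description": "Write a tip-splitting function",
+            "ground_truth": "def split(total, n): return total / n",
+            "test_cases": {"t1": "split(100, 4)"},
+        })
+        env.task_type       # "code" (the default)
+        env.create_tools()  # {} (ColBench is dialogue-only)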
+ """ + + def __init__(self, task_data: Dict[str, Any], **kwargs): + # Don't set attributes here — super().__init__() calls setup_state() + # before we'd get a chance, and create_tools() runs too. + super().__init__(task_data=task_data, **kwargs) + + def setup_state(self, task_data: dict) -> Any: + """Extract task artifacts from task_data.""" + self.problem_description = str(task_data.get("problem_description", "")) + self.ground_truth = str(task_data.get("ground_truth", "")) + self.test_cases = task_data.get("test_cases", {}) or {} + self.task_type = task_data.get("task_type", "code") + return task_data # stored as self.state + + def create_tools(self) -> Dict[str, Any]: + """ColBench has no tools — the agent interacts only via dialogue.""" + return {} + + # ── Tracing / Config ───────────────────────────────────────────────── + + def gather_traces(self) -> Dict[str, Any]: + return { + **super().gather_traces(), + "task_type": self.task_type, + "problem_description_length": len(self.problem_description), + "ground_truth_length": len(self.ground_truth), + "num_test_cases": len(self.test_cases), + } + + def gather_config(self) -> Dict[str, Any]: + return { + **super().gather_config(), + "task_type": self.task_type, + "num_test_cases": len(self.test_cases), + } \ No newline at end of file diff --git a/maseval/benchmark/colbench/evaluator.py b/maseval/benchmark/colbench/evaluator.py new file mode 100644 index 00000000..7f9ff764 --- /dev/null +++ b/maseval/benchmark/colbench/evaluator.py @@ -0,0 +1,190 @@ +""" +ColBench Code Evaluator — MASEval Evaluator adapter. + +Matches the ``Evaluator`` ABC interface: + - ``__init__(task, environment, user)`` — receives task/env for ground truth + - ``filter_traces(traces)`` — extracts user traces (where the answer lives) + - ``__call__(traces, final_answer)`` — runs unit tests, returns metrics + +Safety: + Generated code is executed with 1-second timeouts and blocked-pattern guards. 
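+
+Example (illustrative; a submission containing a blocked pattern is
+rejected before any code is executed)::
+
+    check_correctness(
+        "def f():\n    return 1",   # ground truth
+        "import os",                # hits a blocked pattern
+        {"t1": "f()"},
+    )  # returns 0.0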
+""" + +from __future__ import annotations + +import logging +import signal +from typing import Any, Dict, Optional + +from maseval.core.evaluator import Evaluator +from maseval.core.environment import Environment +from maseval.core.task import Task +from maseval.core.user import User + +logger = logging.getLogger(__name__) + +# ── Blocked patterns (from sweet_rl/utils/code_utils.py) ───────────────── +_BLOCKED_PATTERNS = [ + "import os", "from os", "import sys", "from sys", + "open(", "print(", "write", "sudo", "transformers", + "exit(", "quit(", "argparse", +] + + +def _timeout_handler(signum, frame): + raise TimeoutError("Code execution timed out") + + +def _get_function_output(function_definition: str, test_case: str) -> Any: + try: + exec(function_definition) + return eval(test_case) + except Exception: + return None + + +def _queue_get_function_output(function_definition, test_case, queue): + queue.put(_get_function_output(function_definition, test_case)) + + +def _strip_markdown_fences(code: str) -> str: + if "```python" in code: + code = code.split("```python")[1].split("```")[0] + elif "```" in code: + code = code.split("```")[1].split("```")[0] + return code.strip() + + +def check_correctness( + ground_truth_function: str, + test_function: str, + test_cases: Dict[str, str], +) -> float: + """Score a generated function against ground-truth using unit tests. + + Faithfully replicates ``code_utils.check_correctness()``. 
+    """
+    if not test_cases:
+        return 0.0
+
+    for pat in _BLOCKED_PATTERNS:
+        if pat in test_function:
+            return 0.0
+
+    test_function = _strip_markdown_fences(test_function)
+
+    num_correct = 0
+    for _name, test_expr in test_cases.items():
+        gt_output = _get_function_output(ground_truth_function, test_expr)
+
+        signal.signal(signal.SIGALRM, _timeout_handler)
+        signal.alarm(1)
+        try:
+            test_output = _get_function_output(test_function, test_expr)
+        except TimeoutError:
+            test_output = None
+        finally:
+            signal.alarm(0)
+
+        try:
+            if gt_output == test_output and gt_output is not None:
+                num_correct += 1
+        except ValueError:
+            pass
+
+    return num_correct / len(test_cases)
+
+
+def _extract_agent_code(
+    final_answer: Any,
+    traces: Optional[Dict[str, Any]],
+) -> Optional[str]:
+    """Extract the agent's code from available sources.
+
+    Priority:
+        1. traces (user traces with extracted answer)
+        2. final_answer with "I WANT TO ANSWER:" stripped
+        3. final_answer as-is
+    """
+    traces = traces or {}
+
+    # Source 1: already-parsed answer from traces
+    answer = traces.get("answer")
+    if answer and isinstance(answer, str) and answer.strip():
+        return answer.strip()
+
+    # Source 2-3: final_answer
+    if final_answer and isinstance(final_answer, str) and final_answer.strip():
+        text = final_answer
+        if "I WANT TO ANSWER:" in text:
+            text = text.split("I WANT TO ANSWER:", 1)[1]
+        return text.strip()
+
+    return None
+
+
+# ── MASEval Evaluator ──────────────────────────────────────────────────
+
+class ColBenchCodeEvaluator(Evaluator):
+    """Evaluates ColBench backend-programming trajectories.
+
+    Reads ground truth and test cases from the ``ColBenchEnvironment``
+    state, falling back to ``task.evaluation_data`` when the environment
+    does not provide them (see ``__init__``).
+    Extracts the agent's code from user traces (where ``ColBenchUser``
+    stores the parsed answer) via ``filter_traces()``.
+ """ + + def __init__( + self, + task: Task, + environment: Environment, + user: Optional[User] = None, + ): + super().__init__(task, environment, user) + # Read ground truth from ColBenchEnvironment (set in setup_state) + # This is more reliable than task.evaluation_data which may not be stored + self.ground_truth = getattr(environment, 'ground_truth', '') + self.test_cases = getattr(environment, 'test_cases', {}) + + # Fallback to task.evaluation_data if environment doesn't have it + if not self.ground_truth: + eval_data = getattr(task, 'evaluation_data', None) or {} + self.ground_truth = eval_data.get("ground_truth", "") + self.test_cases = eval_data.get("test_cases", {}) + + def filter_traces(self, traces: Dict[str, Any]) -> Dict[str, Any]: + """Extract user traces where the answer is stored.""" + return traces.get("user", {}) or {} + + def __call__( + self, + traces: Dict[str, Any], + final_answer: Optional[str] = None, + ) -> Dict[str, Any]: + """Run unit tests and return correctness metrics. + + Args: + traces: Filtered traces (user traces from filter_traces). + final_answer: The last return value from run_agents(). + """ + agent_code = _extract_agent_code(final_answer, traces) + + if not agent_code: + return { + "correctness": 0.0, + "success": False, + "num_tests": len(self.test_cases), + "num_passed": 0, + "error": "no_answer_extracted", + } + + correctness = check_correctness( + self.ground_truth, agent_code, self.test_cases + ) + num_passed = int(round(correctness * len(self.test_cases))) + + return { + "correctness": correctness, + "success": correctness == 1.0, + "num_tests": len(self.test_cases), + "num_passed": num_passed, + } \ No newline at end of file diff --git a/maseval/benchmark/colbench/human_interaction_env.py b/maseval/benchmark/colbench/human_interaction_env.py new file mode 100644 index 00000000..9bb5166e --- /dev/null +++ b/maseval/benchmark/colbench/human_interaction_env.py @@ -0,0 +1,126 @@ +""" +Copyright (c) Meta Platforms, Inc. 
and affiliates. + +This source code is licensed under the CC-By-NC license found in the +LICENSE file in the root directory of this source tree. +""" + +import openai + +HUMAN_RESPONSE_CHARACTER_LIMIT = 400 + + +class HumanInteractionEnv: + def __init__(self, client, human_prompt, model_id, env_id=0, max_steps: int = 10): + super().__init__() + self.client = client + self.human_prompt = human_prompt + self.env_id = env_id + + self.model_id = model_id + self.max_steps = max_steps + self.problem_description = "" + self.hidden_information = "" + self.steps = 0 + self.dialogue_history = [] + self.done = False + + def get_dialogue_history(self): + messages = [ + { + "role": d["role"], + "content": d["content"], + } + for d in self.dialogue_history + ] + return messages + + def str_dialogue_history(self): + result = "" + for d in self.dialogue_history: + result += str(d["role"]) + ":" + result += str(d["content"]) + "\n\n\n\n" + return result + "agent:" + + + def reset(self, problem_description, hidden_information): + self.problem_description = str(problem_description) + self.hidden_information = str(hidden_information) + self.answer = "No answer" + self.steps = 0 + self.done = False + self.dialogue_history = [] + self.dialogue_history.append( + { + "role": "user", + "content": problem_description, + } + ) + return self.get_dialogue_history() + + def invoke_model(self): + for _ in range(3): + try: + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + { + "role": "user", + "content": self.human_prompt.format( + problem_description=self.problem_description, + hidden_information=self.hidden_information, + dialogue_history=self.str_dialogue_history(), + ), + }, + ] + completion = self.client.chat.completions.create( + model=self.model_id, + messages=messages, + max_tokens=4096, + temperature=0, + ) + return completion.choices[0].message.content + except openai.BadRequestError as e: + return "No response." 
+ # messages=[ + # {"role": "system", "content": "You are a helpful assistant."}, + # {"role": "user", "content": self.human_prompt.format(problem_description=self.problem_description, + # hidden_information=self.hidden_information, + # dialogue_history=self.str_dialogue_history())}, + # ] + # completion = self.client.chat.completions.create(model=self.model_id, messages=messages, max_tokens=2048) + # return completion.choices[0].message.content + + def step(self, response, formatted_prompt=None): + self.steps += 1 + if self.done: + return None, 0, True + + raw_response = response + + if "OUTPUT:" in response: + response = response.split("OUTPUT:")[1] + # remove additional OUTPUT: if exists + raw_response = "OUTPUT:".join(raw_response.split("OUTPUT:")[:2]) + + if "I WANT TO ANSWER:" in response or self.steps >= self.max_steps: + self.done = True + if "I WANT TO ANSWER:" in response: + self.answer = response.split("I WANT TO ANSWER:")[1] + else: + self.answer = response + + self.dialogue_history.append( + { + "role": "assistant", + "content": response, + "input": formatted_prompt, + "output": raw_response, + } + ) + + if not self.done: + answer = self.invoke_model() + self.dialogue_history.append( + {"role": "user", "content": answer[:HUMAN_RESPONSE_CHARACTER_LIMIT]} + ) + return self.get_dialogue_history() if not self.done else None, 0, self.done diff --git a/maseval/benchmark/colbench/openai_model_adapter.py b/maseval/benchmark/colbench/openai_model_adapter.py new file mode 100644 index 00000000..1edd0894 --- /dev/null +++ b/maseval/benchmark/colbench/openai_model_adapter.py @@ -0,0 +1,125 @@ +""" +OpenAI-compatible ModelAdapter for ColBench. + +Implements ``ModelAdapter._chat_impl()`` using an ``openai.OpenAI`` client. +Works with any OpenAI-compatible API server (vLLM, TGI, etc.). 
+""" + +from __future__ import annotations + +import logging +from typing import Any, Dict, List, Optional, Union + +from maseval.core.model import ModelAdapter, ChatResponse + +logger = logging.getLogger(__name__) + + +class OpenAIModelAdapter(ModelAdapter): + """ModelAdapter backed by an OpenAI-compatible API client. + + Usage:: + + from openai import OpenAI + + client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY") + adapter = OpenAIModelAdapter(client, model_id="meta-llama/Llama-3.1-70B-Instruct") + + # Use via ModelAdapter interface + response = adapter.chat([{"role": "user", "content": "Hello"}]) + print(response.content) + + # Or simple generation + text = adapter.generate("What is 2+2?") + """ + + def __init__( + self, + client: Any, # openai.OpenAI + model_id: str, + default_max_tokens: int = 4096, + default_temperature: float = 0.0, + seed: Optional[int] = None, + ): + super().__init__(seed=seed) + self._client = client + self._model_id = model_id + self.default_max_tokens = default_max_tokens + self.default_temperature = default_temperature + + @property + def model_id(self) -> str: + return self._model_id + + def _chat_impl( + self, + messages: List[Dict[str, Any]], + generation_params: Optional[Dict[str, Any]] = None, + tools: Optional[List[Dict[str, Any]]] = None, + tool_choice: Optional[Union[str, Dict[str, Any]]] = None, + **kwargs: Any, + ) -> ChatResponse: + """Call the OpenAI-compatible chat completions API.""" + params = generation_params or {} + max_tokens = params.pop("max_tokens", self.default_max_tokens) + temperature = params.pop("temperature", self.default_temperature) + + api_kwargs: Dict[str, Any] = { + "model": self._model_id, + "messages": messages, + "max_tokens": max_tokens, + "temperature": temperature, + **params, + **kwargs, + } + + if tools: + api_kwargs["tools"] = tools + if tool_choice is not None: + api_kwargs["tool_choice"] = tool_choice + if self._seed is not None and "seed" not in api_kwargs: + 
api_kwargs["seed"] = self._seed + + completion = self._client.chat.completions.create(**api_kwargs) + choice = completion.choices[0] + message = choice.message + + # Parse tool calls if present + parsed_tool_calls = None + if message.tool_calls: + parsed_tool_calls = [ + { + "id": tc.id, + "type": tc.type, + "function": { + "name": tc.function.name, + "arguments": tc.function.arguments, + }, + } + for tc in message.tool_calls + ] + + # Parse usage + usage = None + if completion.usage: + usage = { + "input_tokens": completion.usage.prompt_tokens, + "output_tokens": completion.usage.completion_tokens, + "total_tokens": completion.usage.total_tokens, + } + + return ChatResponse( + content=message.content, + tool_calls=parsed_tool_calls, + role=message.role or "assistant", + usage=usage, + model=completion.model, + stop_reason=choice.finish_reason, + ) + + def gather_config(self) -> Dict[str, Any]: + return { + **super().gather_config(), + "default_max_tokens": self.default_max_tokens, + "default_temperature": self.default_temperature, + } \ No newline at end of file diff --git a/maseval/benchmark/colbench/prompts/human_simulator_code_prompt.txt b/maseval/benchmark/colbench/prompts/human_simulator_code_prompt.txt new file mode 100644 index 00000000..8ba1fec0 --- /dev/null +++ b/maseval/benchmark/colbench/prompts/human_simulator_code_prompt.txt @@ -0,0 +1,16 @@ +Your task is to simulate a human user that interacts with an LLM agent in a dialogue. +You would like the LLM agent to help you with the following problem: +{problem_description} + +Your goal is to engage in the conversation with the LLM agent so that it can get to a personalized answer. +You should make use of the following hidden information to answer the LLM agent. +YOU SHOULD BEHAVE LIKE A HUMAN THAT NEEDS THE HELP FROM AN AGENT. +You SHOULD ONLY ANSWER QUESTIONS WITH INFORMATION PROVIDED IN THE HIDDEN INFORMATION, AND SAY YOU DON"T KNOW IF THE ANSWER CAN NOT BE FOUND IN THE HIDDEN INFORMATION. 
+ +{hidden_information} + +Here is the dialogue so far: +{dialogue_history} + + +Now directly output your answer to the LLM agent IN TWO SENTENCES. DO NOT SAY ANYTHING ELSE. \ No newline at end of file diff --git a/maseval/benchmark/colbench/prompts/llm_agent_code_prompt.txt b/maseval/benchmark/colbench/prompts/llm_agent_code_prompt.txt new file mode 100644 index 00000000..fc26a4a3 --- /dev/null +++ b/maseval/benchmark/colbench/prompts/llm_agent_code_prompt.txt @@ -0,0 +1,16 @@ +You are a helpful LLM agent. +Your task is to help a human user to resolve their problem, in particular python programming. +1) Note that the problem is highly personalized so you need to explicitly gather information +by asking questions to the human user about some hidden information and implicit constraints. +YOU SHOULD TRY TO ASK CLARIFICATION QUESTIONS. +2) Note that you should not ask human users complicated questions as they will only answer questions briefly in two sentences. +3) When you have gathered enough information to answer, say "I WANT TO ANSWER:" in the beginning of your response and provide your final answer. +4) Note that you can only interact with the human users WITHIN 10 back-and-forth rounds and you have to provide your final answer before the conversation ends. +5) You should be as concise as possible in your response to human. + + +"I WANT TO ANSWER:" should be included in your response to human if you think that you have gathered enough information for addressing this problem. +Directly output the raw python code after "I WANT TO ANSWER:". + +Complete only the immediate agent response in this dialogue: +{dialogue_history} \ No newline at end of file diff --git a/maseval/benchmark/colbench/user.py b/maseval/benchmark/colbench/user.py new file mode 100644 index 00000000..1e69f3db --- /dev/null +++ b/maseval/benchmark/colbench/user.py @@ -0,0 +1,224 @@ +""" +ColBench Human Simulator — MASEval User adapter. 
+ +Converts the sweet_rl ``HumanInteractionEnv`` into MASEval's ``User`` interface. + +Execution flow (driven by Benchmark.execution_loop): + 1. execution_loop calls ``get_initial_query()`` → problem_description + 2. Loop: ``run_agents(query)`` → agent_text + ``respond(agent_text)`` → checks termination, invokes simulator + ``is_done()`` → True if agent answered or max_steps exhausted + 3. When the agent emits ``"I WANT TO ANSWER:"``, ``respond()`` extracts + the answer, marks done, and returns ``""``. +""" + +from __future__ import annotations + +import logging +import time +from datetime import datetime +from typing import Any, Dict, List, Optional + +from maseval.core.user import User +from maseval.core.model import ModelAdapter +from maseval.core.history import MessageHistory + +logger = logging.getLogger(__name__) + +HUMAN_RESPONSE_CHARACTER_LIMIT = 400 + +DEFAULT_HUMAN_SIMULATOR_CODE_PROMPT = ( + "Your task is to simulate a human user that interacts with an LLM agent " + "in a dialogue.\n" + "You would like the LLM agent to help you with the following problem:\n" + "{problem_description}\n\n" + "Your goal is to engage in the conversation with the LLM agent so that " + "it can get to a personalized answer.\n" + "You should make use of the following hidden information to answer the " + "LLM agent.\n" + 'YOU SHOULD BEHAVE LIKE A HUMAN THAT NEEDS THE HELP FROM AN AGENT.\n' + "You SHOULD ONLY ANSWER QUESTIONS WITH INFORMATION PROVIDED IN THE " + 'HIDDEN INFORMATION, AND SAY YOU DON"T KNOW IF THE ANSWER CAN NOT BE ' + "FOUND IN THE HIDDEN INFORMATION.\n\n" + "{hidden_information}\n\n" + "Here is the dialogue so far:\n" + "{dialogue_history}\n\n\n" + "Now directly output your answer to the LLM agent IN TWO SENTENCES. " + "DO NOT SAY ANYTHING ELSE." +) + + +class ColBenchUser(User): + """MASEval User that replicates the ColBench human-simulator behaviour. 
+ + Faithfully converts ``HumanInteractionEnv`` from sweet_rl: + + * The initial query is the task's ``problem_description``. + * On each ``respond()`` call the agent's message is inspected for + ``"I WANT TO ANSWER:"``; if found the user marks done and does NOT + call the simulator LLM. + * Otherwise, the human-simulator LLM is invoked with the full dialogue + history formatted identically to the original prompt template. + * Responses are truncated to ``HUMAN_RESPONSE_CHARACTER_LIMIT`` (400). + * Interaction terminates when the agent answers or ``max_steps`` exhausted. + """ + + def __init__( + self, + problem_description: str, + hidden_information: str, + model: ModelAdapter, + human_prompt: Optional[str] = None, + max_steps: int = 10, + response_char_limit: int = HUMAN_RESPONSE_CHARACTER_LIMIT, + ): + super().__init__() + self.problem_description = str(problem_description) + self.hidden_information = str(hidden_information) + self.model = model + self.human_prompt = human_prompt or DEFAULT_HUMAN_SIMULATOR_CODE_PROMPT + self.max_steps = max_steps + self.response_char_limit = response_char_limit + + self.messages = MessageHistory() + self.logs: List[Dict[str, Any]] = [] + self._turn_count: int = 0 + self._done: bool = False + self.answer: Optional[str] = None + + # ── User protocol ──────────────────────────────────────────────────── + + def get_initial_query(self) -> str: + self.messages.add_message("user", self.problem_description) + return self.problem_description + + def respond(self, agent_message: str) -> str: + if self._done: + return "" + + self._turn_count += 1 + raw_response = agent_message + + # Strip OUTPUT: prefix (matches original step() logic) + if "OUTPUT:" in agent_message: + agent_message = agent_message.split("OUTPUT:")[1] + raw_response = "OUTPUT:".join(raw_response.split("OUTPUT:")[:2]) + + # Termination check + if "I WANT TO ANSWER:" in agent_message: + self._done = True + self.answer = agent_message.split("I WANT TO ANSWER:")[1] + elif 
self._turn_count >= self.max_steps: + self._done = True + self.answer = agent_message + + # Record agent turn + self.messages.add_message("assistant", agent_message) + + # Invoke human simulator if interaction continues + if self._done: + return "" + + start_time = time.time() + log_entry: Dict[str, Any] = { + "timestamp": datetime.now().isoformat(), + "turn": self._turn_count, + "agent_message_preview": agent_message[:200], + "status": "success", + } + + try: + simulator_reply = self._invoke_simulator() + except Exception as exc: + log_entry["duration_seconds"] = time.time() - start_time + log_entry["status"] = "error" + log_entry["error"] = str(exc) + log_entry["error_type"] = type(exc).__name__ + self.logs.append(log_entry) + raise + + simulator_reply = simulator_reply[: self.response_char_limit] + + log_entry["duration_seconds"] = time.time() - start_time + log_entry["response_preview"] = simulator_reply[:200] + self.logs.append(log_entry) + + self.messages.add_message("user", simulator_reply) + return simulator_reply + + def is_done(self) -> bool: + return self._done + + # ── Simulator invocation ───────────────────────────────────────────── + + def _format_dialogue_history(self) -> str: + """Format dialogue as original ``str_dialogue_history()``.""" + result = "" + for msg in self.messages.to_list(): + role = msg["role"] if msg["role"] != "assistant" else "agent" + result += f"{role}:{msg['content']}\n\n\n\n" + return result + "agent:" + + def _invoke_simulator(self) -> str: + """Call the human-simulator LLM via ModelAdapter.chat(). + + Replicates ``HumanInteractionEnv.invoke_model()`` with 3-retry logic. + Uses ``model.chat()`` → ``ChatResponse.content``. 
+ """ + prompt_text = self.human_prompt.format( + problem_description=self.problem_description, + hidden_information=self.hidden_information, + dialogue_history=self._format_dialogue_history(), + ) + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": prompt_text}, + ] + + for attempt in range(3): + try: + response = self.model.chat( + messages=messages, + generation_params={ + "max_tokens": 4096, + "temperature": 0, + }, + ) + return response.content or "No response." + except Exception as e: + logger.warning( + "Human simulator call failed (attempt %d/3): %s", + attempt + 1, + e, + ) + if attempt == 2: + return "No response." + return "No response." + + # ── Tracing / Config ───────────────────────────────────────────────── + + def gather_traces(self) -> Dict[str, Any]: + return { + **super().gather_traces(), + "name": "colbench_human_simulator", + "message_count": len(self.messages), + "messages": self.messages.to_list(), + "logs": self.logs, + "max_steps": self.max_steps, + "turns_used": self._turn_count, + "answer": self.answer, + "termination_reason": ( + "agent_answered" + if self.answer is not None and self._turn_count < self.max_steps + else "max_steps" if self._done else "not_terminated" + ), + } + + def gather_config(self) -> Dict[str, Any]: + return { + **super().gather_config(), + "name": "colbench_human_simulator", + "max_steps": self.max_steps, + "response_char_limit": self.response_char_limit, + "prompt_template_length": len(self.human_prompt), + } \ No newline at end of file diff --git a/maseval/benchmark/colbench/utils.py b/maseval/benchmark/colbench/utils.py new file mode 100644 index 00000000..6c15fbf4 --- /dev/null +++ b/maseval/benchmark/colbench/utils.py @@ -0,0 +1,153 @@ +""" +Copyright (c) Meta Platforms, Inc. and affiliates. + +This source code is licensed under the CC-By-NC license found in the +LICENSE file in the root directory of this source tree. 
+""" + +import multiprocessing +import signal +from queue import Empty + +disable_printing = """ +import sys +class DisablePrint: + def write(self, x): + pass + def flush(self): + pass +# Save the current state of stdout +original_stdout = sys.stdout +# Disable all printing to the console +sys.stdout = DisablePrint() +""" + + +# Handler function to be called when the alarm signal is received +def timeout_handler(signum, frame): + raise TimeoutError("Operation timed out") + + +def get_function_output(function_definition, test_case): + # exec(disable_printing) + try: + exec(function_definition) + return eval(test_case) + except Exception as e: + return None + + +def queue_get_function_output(function_definition, test_case, queue): + queue.put(get_function_output(function_definition, test_case)) + + +def subprocess_get_function_output(function_definition, test_case): + # do not want any os functions + if ( + "import os" in function_definition + or "from os" in function_definition + or "import sys" in function_definition + or "from sys" in function_definition + ): + return None + if ( + "open(" in function_definition + or "print(" in function_definition + or "write" in function_definition + ): + return None + if "sudo" in function_definition or "transformers" in function_definition: + return None + if "exit(" in function_definition or "quit(" in function_definition: + return None + + # Set the signal handler for SIGALRM + # signal.signal(signal.SIGALRM, timeout_handler) + # signal.alarm(1) # Set an alarm for 10 seconds + # try: + queue = multiprocessing.Queue() + process = multiprocessing.Process( + target=queue_get_function_output, args=(function_definition, test_case, queue) + ) + process.start() + process.join(timeout=1) + process.kill() + # except TimeoutError: + # return None + # process.terminate() + try: + result = queue.get(timeout=0.1) + except Empty: + result = None + return result + + +def check_correctness(ground_truth_function, test_function, test_cases): 
+ # Although unlikely, there is a chance that this function may run malicious code outputted by the LLMs + num_correct = 0 + + if ( + "import os" in test_function + or "from os" in test_function + or "import sys" in test_function + or "from sys" in test_function + ): + return 0 + if "sudo" in test_function or "transformers" in test_function: + return 0 + if "exit(" in test_function or "quit(" in test_function: + return 0 + if "argparse" in test_function: + return 0 + + if "```python" in test_function: + test_function = test_function.split("```python")[1].split("```")[0] + if "```" in test_function: + test_function = test_function.split("```")[1].split("```")[0] + + for test_case in test_cases.values(): + ground_truth_output = get_function_output(ground_truth_function, test_case) + + # timeout precautions + signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(1) # Set an alarm for 10 seconds + try: + # print(test_function) + # if "match_player" in test_function: + # return 0 + test_output = get_function_output(test_function, test_case) + except TimeoutError: + test_output = None + signal.alarm(0) # Reset the alarm + try: + if ground_truth_output == test_output and ground_truth_output is not None: + num_correct += 1 + except ValueError: + pass + return num_correct / len(test_cases) + + +def code_evaluate(trajectories): + all_correctness = [] + for i, trajectory in enumerate(trajectories): + # if i == 15051 or i == 15276: + # all_correctness.append(0) + # continue + # print(i) + ground_truth_function = trajectory["task"]["ground_truth"] + test_function = trajectory["answer"] + # print(test_function) + test_cases = trajectory["task"]["test_cases"] + correctness = check_correctness( + ground_truth_function, test_function, test_cases + ) + # if correctness < 1: + # print(i) + # print(test_function) + all_correctness.append(correctness) + print(f"Average correctness: {sum(all_correctness)/len(all_correctness)}") + print(f"Number of trajectories: 
{len(all_correctness)}") + print( + f"Percentage of correct trajectories: {sum([1 for correctness in all_correctness if correctness == 1])/len(all_correctness)}" + ) + return all_correctness diff --git a/maseval/benchmark/colbench/vllm_agent.py b/maseval/benchmark/colbench/vllm_agent.py new file mode 100644 index 00000000..cce64045 --- /dev/null +++ b/maseval/benchmark/colbench/vllm_agent.py @@ -0,0 +1,93 @@ +""" +Copyright (c) Meta Platforms, Inc. and affiliates. + +This source code is licensed under the CC-By-NC license found in the +LICENSE file in the root directory of this source tree. +""" + +import concurrent +import json +import random +import time + +import openai +import torch +from datasets import load_dataset +from fire import Fire +from huggingface_hub import login +from openai import OpenAI +from tqdm import tqdm +from transformers import AutoTokenizer +from vllm import LLM, SamplingParams + + +class VLLMAgent: + def __init__( + self, model_id, agent_prompt, tokenizer, tensor_parallel_size, temperature + ): + super().__init__() + llm_args = { + "model": model_id, + "distributed_executor_backend": "ray", + "tensor_parallel_size": tensor_parallel_size, + "enforce_eager": True, + "seed": int(time.time()), + } + + self.agent_prompt = agent_prompt + self.llm = LLM(**llm_args) + self.sampling_params = SamplingParams( + # n=1, + temperature=temperature, + # top_p=1, + # top_k=-1, + max_tokens=1024, + # use_beam_search=False, + ) + self.tokenizer = tokenizer + + def get_action(self, str_dialogue_histories, use_tqdm=True): + print(f"Length of str_dialogue_histories: {len(str_dialogue_histories)}") + # currently this does not work for duplicate dialogue histories + new_index2original_index = {} + undone_str_dialogue_histories = [] + for i, str_dialogue_history in enumerate(str_dialogue_histories): + if str_dialogue_history is not None: + new_index2original_index[len(undone_str_dialogue_histories)] = i + undone_str_dialogue_histories.append(str_dialogue_history) 
+ + formatted_prompts = [] + for str_dialogue_history in undone_str_dialogue_histories: + messages = [ + {"role": "system", "content": self.agent_prompt} + ] + str_dialogue_history + prompt = self.tokenizer.apply_chat_template( + messages, tokenize=False, add_generation_prompt=True + ) + + # prompt = self.agent_prompt.format(dialogue_history=str_dialogue_history) + formatted_prompts.append(prompt) + # formatted_prompts_without_template.append(self.agent_prompt.replace("{dialogue_history}", str_dialogue_history)) + if len(formatted_prompts) == 0: + return [None for _ in range(len(str_dialogue_histories))], [ + None for _ in range(len(str_dialogue_histories)) + ] + outputs = self.llm.generate( + formatted_prompts, self.sampling_params, use_tqdm=use_tqdm + ) + outputs = [output.outputs[0].text for output in outputs] + + real_formatted_prompts = [None for _ in range(len(str_dialogue_histories))] + real_outputs = [None for _ in range(len(str_dialogue_histories))] + for j, output in enumerate(outputs): + real_outputs[new_index2original_index[j]] = output + real_formatted_prompts[new_index2original_index[j]] = formatted_prompts[j] + + for i, output in enumerate(real_outputs): + if output is None: + assert ( + str_dialogue_histories[i] is None + ), f"{i} {str_dialogue_histories[i]} is not None" + + # import IPython; IPython.embed(); exit(1) + return real_formatted_prompts, real_outputs