diff --git a/doc/myst.yml b/doc/myst.yml index d2decaceca..1c0ed16b02 100644 --- a/doc/myst.yml +++ b/doc/myst.yml @@ -60,6 +60,7 @@ project: - file: scanner/1_pyrit_scan.ipynb - file: scanner/2_pyrit_shell.md - file: scanner/airt.ipynb + - file: scanner/benchmark.ipynb - file: scanner/foundry.ipynb - file: scanner/garak.ipynb - file: code/framework.md diff --git a/doc/scanner/0_scanner.md b/doc/scanner/0_scanner.md index 61b63d3de0..6efd2e77d0 100644 --- a/doc/scanner/0_scanner.md +++ b/doc/scanner/0_scanner.md @@ -28,11 +28,12 @@ pyrit_scan foundry.red_team_agent --target openai_chat --initializers target loa ## Built-in Scenarios -PyRIT ships with scenarios organized into three families: +PyRIT ships with scenarios organized into the following families: | Family | Scenarios | Documentation | |--------|-----------|---------------| | **AIRT** | ContentHarms, Psychosocial, Cyber, Jailbreak, Leakage, Scam | [AIRT Scenarios](airt.ipynb) | +| **Benchmark** | AdversarialBenchmark | [Benchmark Scenarios](benchmark.ipynb) | | **Foundry** | RedTeamAgent | [Foundry Scenarios](foundry.ipynb) | | **Garak** | Encoding | [Garak Scenarios](garak.ipynb) | diff --git a/doc/scanner/benchmark.ipynb b/doc/scanner/benchmark.ipynb new file mode 100644 index 0000000000..88c92c1b7e --- /dev/null +++ b/doc/scanner/benchmark.ipynb @@ -0,0 +1,160 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0", + "metadata": {}, + "source": [ + "# Benchmark Scenarios\n", + "\n", + "Benchmark scenarios are a subset of scenarios that compare the effectiveness of attacks across an axis that varies within the scenario itself. The axis can be many things; currently, the only benchmark variant is the adversarial benchmark, whose axis of change is the adversarial model used in attacks." + ] + }, + { + "cell_type": "markdown", + "id": "1", + "metadata": {}, + "source": [ + "## Adversarial Benchmark\n", + "The adversarial benchmarking scenario (`AdversarialBenchmark`) compares the effectiveness of different adversarial models in successfully executing attacks against a target model." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Found default environment files: ['./.pyrit/.env', './.pyrit/.env.local']\n", + "Loaded environment file: ./.pyrit/.env\n", + "Loaded environment file: ./.pyrit/.env.local\n" + ] + }, + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "8316db039ba1408499df0a2de6c8d6f6", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "Executing AdversarialBenchmark: 0%| | 0/3 [00:00)` to pick\n", + "# up where this run left off (constructor args must match the original run).\n", + "print(f\"Scenario result id: {baseline_result.id}\")\n", + "\n", + "printer = ConsoleScenarioResultPrinter()\n", + "\n", + "await printer.print_summary_async(baseline_result) # type: ignore" + ] + } + ], + "metadata": { + "jupytext": { + "main_language": "python" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/doc/scanner/benchmark.py b/doc/scanner/benchmark.py new file mode 100644 index 0000000000..90dcba2a6d --- /dev/null +++ b/doc/scanner/benchmark.py @@ -0,0 +1,48 @@ +# --- +# jupyter: +# jupytext: +# text_representation: +# extension: .py +# format_name: percent +# format_version: '1.3' +# jupytext_version: 1.18.1 +# --- + +# %% [markdown] +# # Benchmark Scenarios +# +# Benchmark scenarios are a subset of scenarios that compare the effectiveness of attacks across an axis that varies within the scenario itself. The axis can be many things; currently, the only benchmark variant is the adversarial benchmark, whose axis of change is the adversarial model used in attacks. + +# %% [markdown] +# ## Adversarial Benchmark +# The adversarial benchmarking scenario (`AdversarialBenchmark`) compares the effectiveness of different adversarial models in successfully executing attacks against a target model. + +# %% +from pyrit.prompt_target import OpenAIChatTarget +from pyrit.scenario.printer.console_printer import ConsoleScenarioResultPrinter +from pyrit.scenario.scenarios.benchmark import AdversarialBenchmark +from pyrit.setup import IN_MEMORY, initialize_pyrit_async +from pyrit.setup.initializers import LoadDefaultDatasets + +await initialize_pyrit_async(memory_db_type=IN_MEMORY, initializers=[LoadDefaultDatasets()]) # type: ignore + +# Pass any number of adversarial PromptChatTargets as a list; AdversarialBenchmark +# infers a label for each from its identifier and runs every benchmark-friendly +# attack technique against the objective target with each adversarial model. +adversarial_model = OpenAIChatTarget() + +benchmark_scenario = AdversarialBenchmark(adversarial_models=[adversarial_model]) + +await benchmark_scenario.initialize_async( # type: ignore + objective_target=OpenAIChatTarget(), max_concurrency=2 +) + +baseline_result = await benchmark_scenario.run_async() # type: ignore + +# Resume handle: re-run with `AdversarialBenchmark(..., scenario_result_id=)` to pick +# up where this run left off (constructor args must match the original run). 
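+#
+# Illustrative resume sketch (the names `resumed_scenario` / `resumed_result` are
+# hypothetical; constructor and initialize_async arguments must mirror the run above):
+#
+#     resumed_scenario = AdversarialBenchmark(
+#         adversarial_models=[adversarial_model],
+#         scenario_result_id=baseline_result.id,
+#     )
+#     await resumed_scenario.initialize_async(objective_target=OpenAIChatTarget(), max_concurrency=2)
+#     resumed_result = await resumed_scenario.run_async()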
+print(f"Scenario result id: {baseline_result.id}") + +printer = ConsoleScenarioResultPrinter() + +await printer.print_summary_async(baseline_result) # type: ignore diff --git a/pyrit/scenario/__init__.py b/pyrit/scenario/__init__.py index a9924c28ca..c5a3130069 100644 --- a/pyrit/scenario/__init__.py +++ b/pyrit/scenario/__init__.py @@ -31,15 +31,18 @@ # This allows: from pyrit.scenario.airt import ContentHarms # without needing separate pyrit/scenario/airt/ directories from pyrit.scenario.scenarios import airt as _airt_module +from pyrit.scenario.scenarios import benchmark as _benchmark_module from pyrit.scenario.scenarios import foundry as _foundry_module from pyrit.scenario.scenarios import garak as _garak_module sys.modules["pyrit.scenario.airt"] = _airt_module +sys.modules["pyrit.scenario.benchmark"] = _benchmark_module sys.modules["pyrit.scenario.garak"] = _garak_module sys.modules["pyrit.scenario.foundry"] = _foundry_module # Also expose as attributes for IDE support airt = _airt_module +benchmark = _benchmark_module garak = _garak_module foundry = _foundry_module @@ -55,6 +58,7 @@ "ScenarioIdentifier", "ScenarioResult", "airt", + "benchmark", "garak", "foundry", ] diff --git a/pyrit/scenario/core/scenario_techniques.py b/pyrit/scenario/core/scenario_techniques.py index 2e405ce4c8..0150d9775e 100644 --- a/pyrit/scenario/core/scenario_techniques.py +++ b/pyrit/scenario/core/scenario_techniques.py @@ -25,6 +25,7 @@ from pyrit.common.path import EXECUTOR_SEED_PROMPT_PATH from pyrit.executor.attack import ( + ContextComplianceAttack, ManyShotJailbreakAttack, PromptSendingAttack, RedTeamingAttack, @@ -56,18 +57,18 @@ AttackTechniqueSpec( name="prompt_sending", attack_class=PromptSendingAttack, - strategy_tags=["core", "single_turn", "default"], + strategy_tags=["core", "single_turn", "default", "light"], ), AttackTechniqueSpec( name="role_play", attack_class=RolePlayAttack, - strategy_tags=["core", "single_turn"], + strategy_tags=["core", "single_turn", "light"], extra_kwargs={"role_play_definition_path": RolePlayPaths.MOVIE_SCRIPT.value}, ), AttackTechniqueSpec( name="many_shot", attack_class=ManyShotJailbreakAttack, - strategy_tags=["core", "multi_turn", "default"], + strategy_tags=["core", "multi_turn", "default", "light"], ), AttackTechniqueSpec( name="tap", @@ -93,7 +94,12 @@ AttackTechniqueSpec( name="red_teaming", attack_class=RedTeamingAttack, - strategy_tags=["core", "multi_turn"], + strategy_tags=["core", "multi_turn", "light"], + ), + AttackTechniqueSpec( + name="context_compliance", + attack_class=ContextComplianceAttack, + strategy_tags=["core", "single_turn", "light"], ), ] diff --git a/pyrit/scenario/scenarios/benchmark/__init__.py b/pyrit/scenario/scenarios/benchmark/__init__.py new file mode 100644 index 0000000000..0b554670d2 --- /dev/null +++ b/pyrit/scenario/scenarios/benchmark/__init__.py @@ -0,0 +1,26 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +"""Benchmark scenario classes.""" + +from typing import Any + +from pyrit.scenario.scenarios.benchmark.adversarial import AdversarialBenchmark + + +def __getattr__(name: str) -> Any: + """ + Lazily resolve the dynamic BenchmarkStrategy class. + + Returns: + Any: The resolved strategy class. + + Raises: + AttributeError: If the attribute name is not recognized. 
+ """ + if name == "AdversarialBenchmarkStrategy": + return AdversarialBenchmark.get_strategy_class() + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + + +__all__ = ["AdversarialBenchmark", "AdversarialBenchmarkStrategy"] diff --git a/pyrit/scenario/scenarios/benchmark/adversarial.py b/pyrit/scenario/scenarios/benchmark/adversarial.py new file mode 100644 index 0000000000..bdcbd7e0d5 --- /dev/null +++ b/pyrit/scenario/scenarios/benchmark/adversarial.py @@ -0,0 +1,277 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +"""AdversarialBenchmark scenario — compare attack success rate across adversarial models.""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, ClassVar + +from pyrit.common import apply_defaults +from pyrit.executor.attack import AttackAdversarialConfig, AttackScoringConfig +from pyrit.registry import AttackTechniqueRegistry, AttackTechniqueSpec +from pyrit.registry.tag_query import TagQuery +from pyrit.scenario.core.atomic_attack import AtomicAttack +from pyrit.scenario.core.dataset_configuration import DatasetConfiguration +from pyrit.scenario.core.scenario import Scenario +from pyrit.scenario.core.scenario_techniques import SCENARIO_TECHNIQUES + +if TYPE_CHECKING: + from pyrit.prompt_target import PromptChatTarget + from pyrit.scenario.core.scenario_strategy import ScenarioStrategy + from pyrit.score import TrueFalseScorer + +logger = logging.getLogger(__name__) + + +class AdversarialBenchmark(Scenario): + """ + Benchmarking scenario that compares the attack success rate (ASR) + of several different adversarial models. + """ + + VERSION: int = 1 + _cached_strategy_class: ClassVar[type[ScenarioStrategy] | None] = None + + @classmethod + def get_strategy_class(cls) -> type[ScenarioStrategy]: + """ + Return the AdversarialBenchmarkStrategy enum, building on first access. + + Returns: + type[ScenarioStrategy]: The BenchmarkStrategy enum class. + """ + if cls._cached_strategy_class is None: + cls._cached_strategy_class = AdversarialBenchmark._build_benchmark_strategy() + + return cls._cached_strategy_class + + @classmethod + def get_default_strategy(cls) -> ScenarioStrategy: + """ + Return the default strategy (``light`` — run benchmark-friendly techniques + that can wrap up quickly and without too many system resources). + + Returns: + ScenarioStrategy: The ``light`` aggregate member. + """ + return cls.get_strategy_class()("light") + + @classmethod + def default_dataset_config(cls) -> DatasetConfiguration: + """ + Return the default dataset configuration for benchmarking. + + Returns: + DatasetConfiguration: Configuration with standard harm-category datasets. + """ + return DatasetConfiguration( + dataset_names=["harmbench"], + max_dataset_size=8, + ) + + @apply_defaults + def __init__( + self, + *, + adversarial_models: list[PromptChatTarget], + objective_scorer: TrueFalseScorer | None = None, + scenario_result_id: str | None = None, + ) -> None: + """ + Initialize the AdversarialBenchmark scenario. + + Args: + adversarial_models: A non-empty list of ``PromptChatTarget`` instances. + Labels are inferred from each target's identifier (preferring + ``underlying_model_name`` over ``model_name`` over the class + name). Identical targets are silently deduped and distinct + targets whose inferred names collide are suffixed (``_2``, + ``_3``, …) with a warning. + objective_scorer: Scorer for evaluating attack success. + Defaults to the registered default objective scorer. 
+ scenario_result_id: Optional ID of an existing scenario + result to resume. + + Raises: + ValueError: If ``adversarial_models`` is empty or not a list. + """ + if not adversarial_models: + raise ValueError("adversarial_models must be a non-empty list of PromptChatTarget instances.") + + if not isinstance(adversarial_models, list): + raise ValueError("adversarial_models must be a list of PromptChatTarget instances.") + + # Infer labels, then wrap each bare target in a default AttackAdversarialConfig + # so it can be passed to factory.create() as an override. + labeled_targets = self._infer_labels(items=adversarial_models) + self._adversarial_configs: dict[str, AttackAdversarialConfig] = { + label: AttackAdversarialConfig(target=target) for label, target in labeled_targets.items() + } + + self._objective_scorer: TrueFalseScorer = ( + objective_scorer if objective_scorer else self._get_default_objective_scorer() + ) + + super().__init__( + version=self.VERSION, + objective_scorer=self._objective_scorer, + strategy_class=self.get_strategy_class(), + include_default_baseline=False, + scenario_result_id=scenario_result_id, + ) + + async def _get_atomic_attacks_async(self) -> list[AtomicAttack]: + """ + Build atomic attacks from the cross-product of techniques × models × datasets. + + Factories are built locally from adversarial-capable ``SCENARIO_TECHNIQUES`` + (not the registry singleton). Each model is injected at create-time via + ``attack_adversarial_config_override``. + + Returns: + list[AtomicAttack]: One atomic attack per technique/model/dataset combination. + + Raises: + ValueError: If the scenario has not been initialized. + """ + if self._objective_target is None: + raise ValueError( + "Scenario not properly initialized. Call await scenario.initialize_async() before running." + ) + + benchmarkable_specs = AdversarialBenchmark._get_benchmarkable_specs() + local_factories = { + spec.name: AttackTechniqueRegistry.build_factory_from_spec(spec) for spec in benchmarkable_specs + } + + selected_techniques = {s.value for s in self._scenario_strategies} + seed_groups_by_dataset = self._dataset_config.get_seed_attack_groups() + scoring_config = AttackScoringConfig(objective_scorer=self._objective_scorer) + + atomic_attacks: list[AtomicAttack] = [] + for technique_name in selected_techniques: + factory = local_factories.get(technique_name) + if factory is None: + logger.warning("No factory for technique '%s', skipping.", technique_name) + continue + + for model_label, adv_config in self._adversarial_configs.items(): + for dataset_name, seed_groups in seed_groups_by_dataset.items(): + attack_technique = factory.create( + objective_target=self._objective_target, + attack_scoring_config=scoring_config, + attack_adversarial_config_override=adv_config, + ) + atomic_attacks.append( + AtomicAttack( + atomic_attack_name=f"{technique_name}__{model_label}__{dataset_name}", + attack_technique=attack_technique, + seed_groups=list(seed_groups), + adversarial_chat=adv_config.target, + objective_scorer=self._objective_scorer, + memory_labels=self._memory_labels, + display_group=model_label, + ) + ) + + return atomic_attacks + + @staticmethod + def _infer_labels( + *, + items: list[PromptChatTarget], + ) -> dict[str, PromptChatTarget]: + """ + Infer user-facing labels for a list of adversarial targets. 
+ + The dedupe key is ``target.get_identifier().hash`` so identical + targets collapse to a single entry silently, while two distinct + targets whose inferred names happen to match get a numeric suffix + and a ``logger.warning`` so the situation isn't silent. + + Args: + items: List of ``PromptChatTarget`` instances. + + Returns: + dict[str, PromptChatTarget]: Mapping from inferred label to the + original target. Targets are wrapped in an + ``AttackAdversarialConfig`` by ``__init__`` after this call. + """ + result: dict[str, PromptChatTarget] = {} + seen_keys: dict[str, str | None] = {} + + for target in items: + identifier = target.get_identifier() + params = identifier.params or {} + base_name = params.get("underlying_model_name") or params.get("model_name") or type(target).__name__ + + dedupe_key = identifier.hash + + # Identical target already stored under some label — silently drop. + if dedupe_key in seen_keys.values(): + continue + + if base_name not in seen_keys: + result[base_name] = target + seen_keys[base_name] = dedupe_key + continue + + # Distinct target colliding on inferred name — find next free suffix and warn. + counter = 2 + while f"{base_name}_{counter}" in seen_keys: + counter += 1 + suffixed = f"{base_name}_{counter}" + logger.warning( + "Inferred label '%s' collided with a different model setup; using '%s' instead.", + base_name, + suffixed, + ) + result[suffixed] = target + seen_keys[suffixed] = dedupe_key + + return result + + @staticmethod + def _build_benchmark_strategy() -> type[ScenarioStrategy]: + """ + Build the BenchmarkStrategy enum from adversarial-capable ``SCENARIO_TECHNIQUES``. + + Returns a strategy class whose concrete members are adversarial-capable + techniques (no baked-in adversarial chat) and whose aggregates allow + selecting by turn style. + + Returns: + type[ScenarioStrategy]: The dynamically generated strategy enum class. + """ + specs = AdversarialBenchmark._get_benchmarkable_specs() + return AttackTechniqueRegistry.build_strategy_class_from_specs( # type: ignore[ty:invalid-return-type] + class_name="BenchmarkStrategy", + specs=TagQuery.all("core").filter(specs), + aggregate_tags={ + "default": TagQuery.any_of("default"), + "single_turn": TagQuery.any_of("single_turn"), + "multi_turn": TagQuery.any_of("multi_turn"), + "light": TagQuery.any_of("light"), + }, + ) + + @staticmethod + def _get_benchmarkable_specs() -> list[AttackTechniqueSpec]: + """ + Return techniques from ``SCENARIO_TECHNIQUES`` that accept an adversarial + model but don't have one already baked in. + + This is the dual guard: ``_accepts_adversarial`` ensures the technique + CAN use an adversarial model, and ``adversarial_chat is None`` ensures + it doesn't already have one set — we inject our own at create-time. + + Returns: + list[AttackTechniqueSpec]: Filtered, adversarial-ready specs. + """ + return [ + spec + for spec in SCENARIO_TECHNIQUES + if AttackTechniqueRegistry._accepts_adversarial(spec.attack_class) and spec.adversarial_chat is None + ] diff --git a/tests/unit/scenario/test_adversarial.py b/tests/unit/scenario/test_adversarial.py new file mode 100644 index 0000000000..e6b082cb0d --- /dev/null +++ b/tests/unit/scenario/test_adversarial.py @@ -0,0 +1,568 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. 
+ +"""Tests for the AdversarialBenchmark scenario.""" + +import copy +from dataclasses import FrozenInstanceError +from unittest.mock import MagicMock, patch + +import pytest + +from pyrit.executor.attack import AttackAdversarialConfig +from pyrit.identifiers import ComponentIdentifier +from pyrit.models import ( + AttackOutcome, + AttackResult, + ScenarioIdentifier, + ScenarioResult, + SeedAttackGroup, + SeedObjective, + SeedPrompt, +) +from pyrit.prompt_target import PromptTarget +from pyrit.prompt_target.common.prompt_chat_target import PromptChatTarget +from pyrit.registry.object_registries.attack_technique_registry import AttackTechniqueRegistry +from pyrit.scenario.core import AtomicAttack +from pyrit.scenario.core.dataset_configuration import DatasetConfiguration +from pyrit.scenario.core.scenario_techniques import SCENARIO_TECHNIQUES +from pyrit.scenario.scenarios.benchmark.adversarial import AdversarialBenchmark +from pyrit.score import TrueFalseScorer + +# Self-pinned: any change to ``_get_benchmarkable_specs`` (or to the ``light`` tag +# membership in SCENARIO_TECHNIQUES) is reflected automatically — no magic numbers. +# +# ``_BENCHMARKABLE_*`` covers every adversarial-capable spec (used to verify the +# strategy enum's full concrete-member roster). ``_LIGHT_BENCHMARKABLE_*`` covers +# only the subset tagged ``"light"`` (used for runtime expectations under the +# default ``"light"`` strategy). +_BENCHMARKABLE_SPECS = AdversarialBenchmark._get_benchmarkable_specs() +_NUM_ADVERSARIAL_TECHNIQUES = len(_BENCHMARKABLE_SPECS) +_BENCHMARKABLE_TECHNIQUE_NAMES = {spec.name for spec in _BENCHMARKABLE_SPECS} +_BENCHMARKABLE_ATTACK_CLASSES = {spec.attack_class for spec in _BENCHMARKABLE_SPECS} + +_LIGHT_BENCHMARKABLE_SPECS = [spec for spec in _BENCHMARKABLE_SPECS if "light" in spec.strategy_tags] +_NUM_LIGHT_BENCHMARKABLE = len(_LIGHT_BENCHMARKABLE_SPECS) + +# --------------------------------------------------------------------------- +# Synthetic many-shot examples — prevents reading the real JSON during tests +# --------------------------------------------------------------------------- +_MOCK_MANY_SHOT_EXAMPLES = [{"question": f"test question {i}", "answer": f"test answer {i}"} for i in range(100)] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _mock_id(name: str, *, params: dict | None = None) -> ComponentIdentifier: + return ComponentIdentifier(class_name=name, class_module="test", params=params or {}) + + +def _make_adversarial_target(name: str, *, params: dict | None = None) -> MagicMock: + """Create a mock PromptChatTarget with a given model name and optional identifier params. + + By default, ``model_name`` is stamped into the identifier params so the + inferred label produced by ``_infer_labels`` matches ``name``. Pass an + explicit ``params`` dict to override (e.g. to omit the key for collision + testing or to add ``underlying_model_name`` / ``endpoint``). 
+ """ + mock = MagicMock(spec=PromptChatTarget) + mock._model_name = name + mock.get_identifier.return_value = _mock_id(name, params=params if params is not None else {"model_name": name}) + return mock + + +def _make_seed_groups(name: str) -> list[SeedAttackGroup]: + """Create two seed attack groups for a given category.""" + return [ + SeedAttackGroup(seeds=[SeedObjective(value=f"{name} objective 1"), SeedPrompt(value=f"{name} prompt 1")]), + SeedAttackGroup(seeds=[SeedObjective(value=f"{name} objective 2"), SeedPrompt(value=f"{name} prompt 2")]), + ] + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def all_supported_attacks(): + """All attacks that currently support adversarial models (computed from production).""" + return _BENCHMARKABLE_TECHNIQUE_NAMES + + +@pytest.fixture +def mock_objective_target(): + mock = MagicMock(spec=PromptTarget) + mock.get_identifier.return_value = _mock_id("MockObjectiveTarget") + return mock + + +@pytest.fixture +def two_adversarial_models(): + """Two mock adversarial models for benchmark permutation.""" + return [_make_adversarial_target("model_a"), _make_adversarial_target("model_b")] + + +@pytest.fixture +def single_adversarial_model(): + """Single mock adversarial model.""" + return [_make_adversarial_target("model_a")] + + +@pytest.fixture(autouse=True) +def reset_technique_registry(): + """Reset the AttackTechniqueRegistry and cached strategy class between tests.""" + from pyrit.registry import TargetRegistry + + AttackTechniqueRegistry.reset_instance() + TargetRegistry.reset_instance() + AdversarialBenchmark._cached_strategy_class = None + yield + AttackTechniqueRegistry.reset_instance() + TargetRegistry.reset_instance() + AdversarialBenchmark._cached_strategy_class = None + + +@pytest.fixture(autouse=True) +def patch_many_shot_load(): + """Prevent ManyShotJailbreakAttack from loading the full bundled dataset.""" + with patch( + "pyrit.executor.attack.single_turn.many_shot_jailbreak.load_many_shot_jailbreaking_dataset", + return_value=_MOCK_MANY_SHOT_EXAMPLES, + ): + yield + + +@pytest.fixture +def mock_runtime_env(): + """Set minimal env vars needed for OpenAIChatTarget fallback via @apply_defaults.""" + with patch.dict( + "os.environ", + { + "OPENAI_CHAT_ENDPOINT": "https://test.openai.azure.com/", + "OPENAI_CHAT_KEY": "test-key", + "OPENAI_CHAT_MODEL": "gpt-4", + }, + ): + yield + + +FIXTURES = ["patch_central_database", "mock_runtime_env"] + + +# =========================================================================== +# Type and syntax tests +# =========================================================================== + + +@pytest.mark.usefixtures(*FIXTURES) +class TestBenchmarkTypes: + """Unit tests for types, validation, and basic construction.""" + + def test_empty_list_adversarial_models_raises(self): + """Passing an empty list must raise ValueError.""" + with pytest.raises(ValueError, match="non-empty"): + AdversarialBenchmark(adversarial_models=[]) + + def test_unsupported_type_adversarial_models_raises(self): + """Passing a non-list type must raise ValueError.""" + with pytest.raises(ValueError, match="non-empty list|list of PromptChatTarget"): + AdversarialBenchmark(adversarial_models="not-a-list") # type: ignore[arg-type] + + def test_version_is_1(self): + assert AdversarialBenchmark.VERSION == 1 + + def test_default_dataset_config_uses_harmbench(self): + config = 
AdversarialBenchmark.default_dataset_config()
+        assert isinstance(config, DatasetConfiguration)
+        names = config.get_default_dataset_names()
+        assert "harmbench" in names
+
+    def test_default_dataset_config_max_size_is_8(self):
+        config = AdversarialBenchmark.default_dataset_config()
+        assert config.max_dataset_size == 8
+
+    def test_frozen_spec_cannot_be_mutated(self):
+        """AttackTechniqueSpec is frozen — direct mutation must raise."""
+        spec = SCENARIO_TECHNIQUES[0]
+        with pytest.raises(FrozenInstanceError):
+            spec.name = "mutated"  # type: ignore[misc]
+
+
+# ===========================================================================
+# Strategy construction tests
+# ===========================================================================
+
+
+def _make_benchmark(adversarial_models):
+    """Helper to create an AdversarialBenchmark with mocked default scorer."""
+    with patch("pyrit.scenario.core.scenario.Scenario._get_default_objective_scorer") as mock_scorer:
+        mock_scorer.return_value = MagicMock(spec=TrueFalseScorer, get_identifier=lambda: _mock_id("scorer"))
+        return AdversarialBenchmark(adversarial_models=adversarial_models)
+
+
+@pytest.mark.usefixtures(*FIXTURES)
+class TestBenchmarkStrategy:
+    """Tests for the (static) BenchmarkStrategy enum and instance-level wiring."""
+
+    def test_strategy_includes_all_adversarial_techniques(self, all_supported_attacks):
+        """get_strategy_class() concrete members match the adversarial-capable spec set."""
+        strat = AdversarialBenchmark.get_strategy_class()
+        values = {s.value for s in strat.get_all_strategies()}
+        assert values == all_supported_attacks
+
+    def test_strategy_has_no_permuted_members(self):
+        """No ``__model`` suffixes — models are a runtime parameter, not a strategy axis."""
+        strat = AdversarialBenchmark.get_strategy_class()
+        values = {s.value for s in strat.get_all_strategies()}
+        assert not any("__" in v for v in values)
+
+    def test_strategy_excludes_non_adversarial_techniques(self):
+        """prompt_sending and many_shot don't accept an adversarial chat and must be excluded."""
+        strat = AdversarialBenchmark.get_strategy_class()
+        values = {s.value for s in strat.get_all_strategies()}
+        assert "prompt_sending" not in values
+        assert "many_shot" not in values
+
+    def test_strategy_class_is_static(self, single_adversarial_model, two_adversarial_models):
+        """All instances share the same strategy class — no per-instance permutation."""
+        s1 = _make_benchmark(single_adversarial_model)
+        s2 = _make_benchmark(two_adversarial_models)
+        assert s1._strategy_class is s2._strategy_class
+        assert s1._strategy_class is AdversarialBenchmark.get_strategy_class()
+
+    def test_default_strategy_is_light(self):
+        """Default strategy is the ``light`` aggregate of quick, benchmark-friendly techniques."""
+        default = AdversarialBenchmark.get_default_strategy()
+        assert default.value == "light"
+
+    def test_benchmarkable_specs_have_no_adversarial_chat(self):
+        """Filtered specs must leave adversarial_chat unset — the scenario injects its own."""
+        for spec in AdversarialBenchmark._get_benchmarkable_specs():
+            assert spec.adversarial_chat is None
+
+    def test_benchmarkable_specs_accept_adversarial(self):
+        """All filtered specs must accept attack_adversarial_config."""
+        for spec in AdversarialBenchmark._get_benchmarkable_specs():
+            assert AttackTechniqueRegistry._accepts_adversarial(spec.attack_class)
+
+    def test_original_scenario_techniques_unmodified(self, two_adversarial_models):
+        """SCENARIO_TECHNIQUES global must not be mutated by spec filtering."""
+
original = copy.deepcopy([(s.name, s.attack_class) for s in SCENARIO_TECHNIQUES]) + _make_benchmark(two_adversarial_models) + current = [(s.name, s.attack_class) for s in SCENARIO_TECHNIQUES] + assert current == original + + def test_singleton_registry_not_polluted(self, two_adversarial_models): + """Building atomic attacks must not register anything in the global singleton.""" + _make_benchmark(two_adversarial_models) + registry = AttackTechniqueRegistry.get_registry_singleton() + factories = registry.get_factories() + assert not any("__" in name for name in factories) + + def test_scenario_name(self, single_adversarial_model): + """Scenario name should be 'AdversarialBenchmark'.""" + scenario = _make_benchmark(single_adversarial_model) + assert scenario.name == "AdversarialBenchmark" + + +# =========================================================================== +# Runtime / attack generation tests +# =========================================================================== + + +@pytest.mark.usefixtures(*FIXTURES) +class TestBenchmarkRuntime: + """Tests for _get_atomic_attacks_async and display grouping.""" + + async def _init_and_get_attacks( + self, + *, + mock_objective_target, + adversarial_models, + seed_groups: dict[str, list[SeedAttackGroup]] | None = None, + strategies=None, + ) -> tuple[AdversarialBenchmark, list[AtomicAttack]]: + """Helper: create AdversarialBenchmark, initialize, return (scenario, attacks).""" + groups = seed_groups or {"harmbench": _make_seed_groups("harmbench")} + with ( + patch.object(DatasetConfiguration, "get_seed_attack_groups", return_value=groups), + patch("pyrit.scenario.core.scenario.Scenario._get_default_objective_scorer") as mock_scorer, + ): + mock_scorer.return_value = MagicMock(spec=TrueFalseScorer, get_identifier=lambda: _mock_id("scorer")) + scenario = AdversarialBenchmark(adversarial_models=adversarial_models) + init_kwargs: dict = {"objective_target": mock_objective_target} + if strategies: + init_kwargs["scenario_strategies"] = strategies + await scenario.initialize_async(**init_kwargs) + attacks = await scenario._get_atomic_attacks_async() + return scenario, attacks + + @pytest.mark.asyncio + async def test_default_strategy_runs_light_techniques(self, mock_objective_target, two_adversarial_models): + """With no strategies passed, default ``light`` produces N_light x N_models attacks.""" + _, attacks = await self._init_and_get_attacks( + mock_objective_target=mock_objective_target, + adversarial_models=two_adversarial_models, + ) + assert len(attacks) == _NUM_LIGHT_BENCHMARKABLE * 2 + + @pytest.mark.asyncio + async def test_all_strategy_produces_full_cross_product(self, mock_objective_target, two_adversarial_models): + """ALL strategy: N_techniques x 2 models x 1 dataset attacks.""" + with ( + patch.object( + DatasetConfiguration, + "get_seed_attack_groups", + return_value={"harmbench": _make_seed_groups("harmbench")}, + ), + patch("pyrit.scenario.core.scenario.Scenario._get_default_objective_scorer") as mock_scorer, + ): + mock_scorer.return_value = MagicMock(spec=TrueFalseScorer, get_identifier=lambda: _mock_id("scorer")) + scenario = AdversarialBenchmark(adversarial_models=two_adversarial_models) + all_strat = scenario._strategy_class("all") + await scenario.initialize_async(objective_target=mock_objective_target, scenario_strategies=[all_strat]) + attacks = await scenario._get_atomic_attacks_async() + assert len(attacks) == _NUM_ADVERSARIAL_TECHNIQUES * 2 + + @pytest.mark.asyncio + async def 
test_atomic_attack_names_are_unique(self, mock_objective_target, two_adversarial_models): + """All atomic_attack_name values must be unique for resume correctness.""" + with ( + patch.object( + DatasetConfiguration, + "get_seed_attack_groups", + return_value={"harmbench": _make_seed_groups("harmbench")}, + ), + patch("pyrit.scenario.core.scenario.Scenario._get_default_objective_scorer") as mock_scorer, + ): + mock_scorer.return_value = MagicMock(spec=TrueFalseScorer, get_identifier=lambda: _mock_id("scorer")) + scenario = AdversarialBenchmark(adversarial_models=two_adversarial_models) + all_strat = scenario._strategy_class("all") + await scenario.initialize_async(objective_target=mock_objective_target, scenario_strategies=[all_strat]) + attacks = await scenario._get_atomic_attacks_async() + names = [a.atomic_attack_name for a in attacks] + assert len(names) == len(set(names)) + + @pytest.mark.asyncio + async def test_atomic_attack_names_follow_pattern(self, mock_objective_target, single_adversarial_model): + """Each atomic_attack_name should contain the technique__model and dataset.""" + with ( + patch.object( + DatasetConfiguration, + "get_seed_attack_groups", + return_value={"harmbench": _make_seed_groups("harmbench")}, + ), + patch("pyrit.scenario.core.scenario.Scenario._get_default_objective_scorer") as mock_scorer, + ): + mock_scorer.return_value = MagicMock(spec=TrueFalseScorer, get_identifier=lambda: _mock_id("scorer")) + scenario = AdversarialBenchmark(adversarial_models=single_adversarial_model) + all_strat = scenario._strategy_class("all") + await scenario.initialize_async(objective_target=mock_objective_target, scenario_strategies=[all_strat]) + attacks = await scenario._get_atomic_attacks_async() + for a in attacks: + assert "_harmbench" in a.atomic_attack_name + assert "__model_a" in a.atomic_attack_name + + @pytest.mark.asyncio + async def test_display_groups_by_adversarial_model(self, mock_objective_target, two_adversarial_models): + """display_group should group by model label, not by technique or dataset.""" + with ( + patch.object( + DatasetConfiguration, + "get_seed_attack_groups", + return_value={"harmbench": _make_seed_groups("harmbench")}, + ), + patch("pyrit.scenario.core.scenario.Scenario._get_default_objective_scorer") as mock_scorer, + ): + mock_scorer.return_value = MagicMock(spec=TrueFalseScorer, get_identifier=lambda: _mock_id("scorer")) + scenario = AdversarialBenchmark(adversarial_models=two_adversarial_models) + all_strat = scenario._strategy_class("all") + await scenario.initialize_async(objective_target=mock_objective_target, scenario_strategies=[all_strat]) + attacks = await scenario._get_atomic_attacks_async() + display_groups = {a.display_group for a in attacks} + assert display_groups == {"model_a", "model_b"} + + @pytest.mark.asyncio + async def test_raises_when_not_initialized(self, single_adversarial_model): + """_get_atomic_attacks_async must raise if initialize_async was not called.""" + scenario = _make_benchmark(single_adversarial_model) + with pytest.raises(ValueError, match="Scenario not properly initialized"): + await scenario._get_atomic_attacks_async() + + @pytest.mark.asyncio + async def test_multiple_datasets_multiplies_attacks(self, mock_objective_target, single_adversarial_model): + """1 model x N_light_techniques x 2 datasets = 2 * N_light atomic attacks (default ``light``).""" + two_datasets = { + "harmbench": _make_seed_groups("harmbench"), + "extra": _make_seed_groups("extra"), + } + _, attacks = await self._init_and_get_attacks( + 
mock_objective_target=mock_objective_target, + adversarial_models=single_adversarial_model, + seed_groups=two_datasets, + ) + assert len(attacks) == _NUM_LIGHT_BENCHMARKABLE * 2 + + @pytest.mark.asyncio + async def test_attacks_use_all_benchmarkable_attack_classes(self, mock_objective_target, single_adversarial_model): + """Under the ``all`` strategy, atomic attacks must cover every adversarial-capable attack class.""" + scenario_class_strategies = AdversarialBenchmark.get_strategy_class() + _, attacks = await self._init_and_get_attacks( + mock_objective_target=mock_objective_target, + adversarial_models=single_adversarial_model, + strategies=[scenario_class_strategies("all")], + ) + technique_classes = {type(a.attack_technique.attack) for a in attacks} + assert technique_classes == _BENCHMARKABLE_ATTACK_CLASSES + + @pytest.mark.asyncio + async def test_attacks_carry_seed_groups(self, mock_objective_target, single_adversarial_model): + """Each atomic attack should have non-empty objectives from the seed groups.""" + _, attacks = await self._init_and_get_attacks( + mock_objective_target=mock_objective_target, + adversarial_models=single_adversarial_model, + ) + for a in attacks: + assert len(a.objectives) > 0 + + @pytest.mark.asyncio + async def test_baseline_excluded(self, mock_objective_target, single_adversarial_model): + """AdversarialBenchmark must opt out of the parent's default baseline. + + Verifies both the configuration toggle (``_include_baseline is False``) and + the observable property (no atomic attack is named ``"baseline"``). + """ + scenario, attacks = await self._init_and_get_attacks( + mock_objective_target=mock_objective_target, + adversarial_models=single_adversarial_model, + ) + assert scenario._include_baseline is False + assert not any(a.atomic_attack_name == "baseline" for a in attacks) + + +# =========================================================================== +# adversarial_models normalization tests (label inference / dedupe / collision) +# =========================================================================== + + +@pytest.mark.usefixtures(*FIXTURES) +class TestBenchmarkAdversarialModelsNormalization: + """Tests for the list → ``dict[str, AttackAdversarialConfig]`` normalization in __init__. + + Labels are inferred from each target's identifier; identical targets dedupe + silently, distinct targets whose inferred names collide get suffixed with + a warning. 
+ """ + + def test_list_of_targets_infers_labels_from_model_name(self): + """A list of bare targets is normalized to {model_name: AttackAdversarialConfig}.""" + t1 = _make_adversarial_target("t1", params={"model_name": "alpha"}) + t2 = _make_adversarial_target("t2", params={"model_name": "beta"}) + scenario = _make_benchmark([t1, t2]) + assert set(scenario._adversarial_configs.keys()) == {"alpha", "beta"} + assert all(isinstance(v, AttackAdversarialConfig) for v in scenario._adversarial_configs.values()) + assert scenario._adversarial_configs["alpha"].target is t1 + assert scenario._adversarial_configs["beta"].target is t2 + + def test_list_falls_back_to_underlying_model_name(self): + """``underlying_model_name`` is preferred over ``model_name`` when present.""" + t = _make_adversarial_target("t", params={"underlying_model_name": "gpt-4o", "model_name": "wrapper"}) + scenario = _make_benchmark([t]) + assert "gpt-4o" in scenario._adversarial_configs + + def test_list_dedupe_silent_for_identical_target(self, caplog): + """The same target instance passed twice in a list collapses to one entry, silently.""" + t = _make_adversarial_target("t", params={"model_name": "alpha"}) + with caplog.at_level("WARNING"): + scenario = _make_benchmark([t, t]) + assert list(scenario._adversarial_configs.keys()) == ["alpha"] + assert "collided" not in caplog.text + + def test_list_collision_suffixes_distinct_targets_and_warns(self, caplog): + """Two distinct targets that infer the same name get suffixed and a warning is logged.""" + t1 = _make_adversarial_target("t1", params={"model_name": "alpha", "endpoint": "ep1"}) + t2 = _make_adversarial_target("t2", params={"model_name": "alpha", "endpoint": "ep2"}) + with caplog.at_level("WARNING"): + scenario = _make_benchmark([t1, t2]) + assert set(scenario._adversarial_configs.keys()) == {"alpha", "alpha_2"} + assert "collided" in caplog.text + + +# =========================================================================== +# ASR-sensibility tests (per-model breakdown math) +# =========================================================================== + + +@pytest.mark.usefixtures("patch_central_database") +class TestBenchmarkASRBreakdown: + """Verify the per-display-group ASR math the notebook sanity check relies on. + + A higher per-group success rate must correspond to more ``AttackOutcome.SUCCESS`` + results in that group. This test pins the invariant that lets reviewers trust + the printed breakdown when comparing adversarial models or system prompts. + """ + + @staticmethod + def _result(*, conv_id: str, outcome: AttackOutcome) -> AttackResult: + return AttackResult( + conversation_id=conv_id, + objective="objective", + outcome=outcome, + executed_turns=1, + ) + + def test_per_model_breakdown_reflects_outcome_counts(self): + """High-success model > low-success model in per-group ASR; math invariants hold.""" + # Two techniques × two models, mirroring how AdversarialBenchmark keys atomic_attack_name + # ("{technique}__{model_label}__{dataset}") and folds them into model_label. 
+ attack_results: dict[str, list[AttackResult]] = { + "role_play__model_high__hb": [ + self._result(conv_id=f"high-rp-{i}", outcome=AttackOutcome.SUCCESS) for i in range(3) + ], + "context_compliance__model_high__hb": [ + self._result(conv_id=f"high-cc-{i}", outcome=AttackOutcome.SUCCESS) for i in range(3) + ], + "role_play__model_low__hb": [ + self._result(conv_id=f"low-rp-{i}", outcome=AttackOutcome.FAILURE) for i in range(3) + ], + "context_compliance__model_low__hb": [ + self._result(conv_id=f"low-cc-{i}", outcome=AttackOutcome.FAILURE) for i in range(3) + ], + } + display_group_map = { + "role_play__model_high__hb": "model_high", + "context_compliance__model_high__hb": "model_high", + "role_play__model_low__hb": "model_low", + "context_compliance__model_low__hb": "model_low", + } + result = ScenarioResult( + scenario_identifier=ScenarioIdentifier(name="AdversarialBenchmark", scenario_version=1), + objective_target_identifier=ComponentIdentifier(class_name="MockTarget", class_module="test"), + attack_results=attack_results, + objective_scorer_identifier=ComponentIdentifier(class_name="MockScorer", class_module="test"), + display_group_map=display_group_map, + ) + + groups = result.get_display_groups() + assert set(groups.keys()) == {"model_high", "model_low"} + + per_group = { + label: int(sum(1 for r in rs if r.outcome == AttackOutcome.SUCCESS) / max(len(rs), 1) * 100) + for label, rs in groups.items() + } + + # The whole point of the sanity check: more SUCCESSes ⇒ higher rate. + assert per_group["model_high"] == 100 + assert per_group["model_low"] == 0 + assert per_group["model_high"] > per_group["model_low"] + # Bounds invariant the notebook asserts. + assert all(0 <= rate <= 100 for rate in per_group.values()) + + # Overall rate matches the weighted average (6 SUCCESS / 12 total = 50%). + assert result.objective_achieved_rate() == 50 + + # Display grouping must not lose results. + assert sum(len(rs) for rs in groups.values()) == sum(len(rs) for rs in attack_results.values()) diff --git a/tests/unit/scenario/test_rapid_response.py b/tests/unit/scenario/test_rapid_response.py index ddf95df2e6..0c53e4ac9f 100644 --- a/tests/unit/scenario/test_rapid_response.py +++ b/tests/unit/scenario/test_rapid_response.py @@ -10,6 +10,7 @@ from pyrit.common.path import DATASETS_PATH from pyrit.executor.attack import ( + ContextComplianceAttack, ManyShotJailbreakAttack, PromptSendingAttack, RolePlayAttack, @@ -261,7 +262,7 @@ async def test_default_strategy_produces_prompt_sending_and_many_shot( technique_classes = {type(a.attack_technique.attack) for a in attacks} assert technique_classes == {PromptSendingAttack, ManyShotJailbreakAttack} - async def test_single_turn_strategy_produces_prompt_sending_and_role_play( + async def test_single_turn_strategy_produces_single_turn_attacks( self, mock_objective_target, mock_objective_scorer ): attacks = await self._init_and_get_attacks( @@ -270,7 +271,11 @@ async def test_single_turn_strategy_produces_prompt_sending_and_role_play( strategies=[_strategy_class().SINGLE_TURN], ) technique_classes = {type(a.attack_technique.attack) for a in attacks} - assert technique_classes == {PromptSendingAttack, RolePlayAttack} + # Every core technique tagged ``single_turn`` in SCENARIO_TECHNIQUES must appear. + assert {PromptSendingAttack, RolePlayAttack, ContextComplianceAttack} <= technique_classes + # And no multi-turn-only attack should leak in. 
+ assert ManyShotJailbreakAttack not in technique_classes + assert TreeOfAttacksWithPruningAttack not in technique_classes async def test_multi_turn_strategy_produces_multi_turn_attacks(self, mock_objective_target, mock_objective_scorer): attacks = await self._init_and_get_attacks(