From eba4567df641ed17e5157909177d34f5f0048024 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 4 Feb 2026 01:44:23 +0000 Subject: [PATCH 1/5] Initial plan From 93b841016cef76b0c587ba08f78e04f16849dc6e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 4 Feb 2026 01:49:13 +0000 Subject: [PATCH 2/5] Add ASR reporting by converter type with tests Co-authored-by: romanlutz <10245648+romanlutz@users.noreply.github.com> --- pyrit/analytics/result_analysis.py | 32 ++- tests/unit/analytics/test_result_analysis.py | 221 +++++++++++++++++++ 2 files changed, 251 insertions(+), 2 deletions(-) diff --git a/pyrit/analytics/result_analysis.py b/pyrit/analytics/result_analysis.py index 7db3c02e87..7b672309c4 100644 --- a/pyrit/analytics/result_analysis.py +++ b/pyrit/analytics/result_analysis.py @@ -38,7 +38,8 @@ def analyze_results(attack_results: list[AttackResult]) -> dict[str, AttackStats Returns: A dictionary of AttackStats objects. The overall stats are accessible with the key "Overall", and the stats of any attack can be retrieved using "By_attack_identifier" - followed by the identifier of the attack. + followed by the identifier of the attack. Stats grouped by converter type can be + retrieved using "By_converter_type". Raises: ValueError: if attack_results is empty. @@ -48,7 +49,8 @@ def analyze_results(attack_results: list[AttackResult]) -> dict[str, AttackStats >>> analyze_results(attack_results) { "Overall": AttackStats, - "By_attack_identifier": dict[str, AttackStats] + "By_attack_identifier": dict[str, AttackStats], + "By_converter_type": dict[str, AttackStats] } """ if not attack_results: @@ -56,6 +58,7 @@ def analyze_results(attack_results: list[AttackResult]) -> dict[str, AttackStats overall_counts: DefaultDict[str, int] = defaultdict(int) by_type_counts: DefaultDict[str, DefaultDict[str, int]] = defaultdict(lambda: defaultdict(int)) + by_converter_counts: DefaultDict[str, DefaultDict[str, int]] = defaultdict(lambda: defaultdict(int)) for attack in attack_results: if not isinstance(attack, AttackResult): @@ -64,15 +67,30 @@ def analyze_results(attack_results: list[AttackResult]) -> dict[str, AttackStats outcome = attack.outcome attack_type = attack.attack_identifier.get("type", "unknown") + # Extract converter types from last_response + converter_types = [] + if attack.last_response and attack.last_response.converter_identifiers: + converter_types = [conv.class_name for conv in attack.last_response.converter_identifiers] + + # If no converters, track as "no_converter" + if not converter_types: + converter_types = ["no_converter"] + if outcome == AttackOutcome.SUCCESS: overall_counts["successes"] += 1 by_type_counts[attack_type]["successes"] += 1 + for converter_type in converter_types: + by_converter_counts[converter_type]["successes"] += 1 elif outcome == AttackOutcome.FAILURE: overall_counts["failures"] += 1 by_type_counts[attack_type]["failures"] += 1 + for converter_type in converter_types: + by_converter_counts[converter_type]["failures"] += 1 else: overall_counts["undetermined"] += 1 by_type_counts[attack_type]["undetermined"] += 1 + for converter_type in converter_types: + by_converter_counts[converter_type]["undetermined"] += 1 overall_stats = _compute_stats( successes=overall_counts["successes"], @@ -89,7 +107,17 @@ def analyze_results(attack_results: list[AttackResult]) -> dict[str, AttackStats for attack_type, counts in by_type_counts.items() } + 
by_converter_stats = { + converter_type: _compute_stats( + successes=counts["successes"], + failures=counts["failures"], + undetermined=counts["undetermined"], + ) + for converter_type, counts in by_converter_counts.items() + } + return { "Overall": overall_stats, "By_attack_identifier": by_type_stats, + "By_converter_type": by_converter_stats, } diff --git a/tests/unit/analytics/test_result_analysis.py b/tests/unit/analytics/test_result_analysis.py index 44b2a56e8a..377bc7a00f 100644 --- a/tests/unit/analytics/test_result_analysis.py +++ b/tests/unit/analytics/test_result_analysis.py @@ -133,3 +133,224 @@ def test_group_by_attack_type_parametrized(items, type_key, exp_succ, exp_fail, assert stats.undetermined == exp_und assert stats.total_decided == exp_succ + exp_fail assert stats.success_rate == exp_rate + + +def test_analyze_results_returns_by_converter_type(): + """Test that analyze_results returns By_converter_type key.""" + attacks = [make_attack(AttackOutcome.SUCCESS)] + result = analyze_results(attacks) + + assert "By_converter_type" in result + assert isinstance(result["By_converter_type"], dict) + + +def test_analyze_results_no_converter_tracking(): + """Test that attacks without converters are tracked as 'no_converter'.""" + from pyrit.models import AttackOutcome, AttackResult + + attacks = [ + AttackResult( + conversation_id="conv-1", + objective="test", + attack_identifier={"type": "test"}, + outcome=AttackOutcome.SUCCESS, + last_response=None, # No response, so no converters + ), + AttackResult( + conversation_id="conv-2", + objective="test", + attack_identifier={"type": "test"}, + outcome=AttackOutcome.FAILURE, + last_response=None, + ), + ] + result = analyze_results(attacks) + + assert "no_converter" in result["By_converter_type"] + stats = result["By_converter_type"]["no_converter"] + assert stats.successes == 1 + assert stats.failures == 1 + assert stats.total_decided == 2 + assert stats.success_rate == 0.5 + + +def test_analyze_results_with_converter_identifiers(): + """Test that attacks with converters are properly grouped by converter type.""" + from pyrit.identifiers import ConverterIdentifier + from pyrit.models import AttackOutcome, AttackResult, MessagePiece + + # Create attacks with different converters + converter1 = ConverterIdentifier( + class_name="Base64Converter", + class_module="pyrit.prompt_converter.base64_converter", + class_description="Test converter", + identifier_type="instance", + supported_input_types=("text",), + supported_output_types=("text",), + ) + + converter2 = ConverterIdentifier( + class_name="ROT13Converter", + class_module="pyrit.prompt_converter.rot13_converter", + class_description="Test converter", + identifier_type="instance", + supported_input_types=("text",), + supported_output_types=("text",), + ) + + message1 = MessagePiece( + role="user", + original_value="test", + converter_identifiers=[converter1], + ) + + message2 = MessagePiece( + role="user", + original_value="test", + converter_identifiers=[converter2], + ) + + message3 = MessagePiece( + role="user", + original_value="test", + converter_identifiers=[converter1], + ) + + attacks = [ + AttackResult( + conversation_id="conv-1", + objective="test", + attack_identifier={"type": "test"}, + outcome=AttackOutcome.SUCCESS, + last_response=message1, + ), + AttackResult( + conversation_id="conv-2", + objective="test", + attack_identifier={"type": "test"}, + outcome=AttackOutcome.FAILURE, + last_response=message2, + ), + AttackResult( + conversation_id="conv-3", + objective="test", 
+ attack_identifier={"type": "test"}, + outcome=AttackOutcome.SUCCESS, + last_response=message3, + ), + ] + + result = analyze_results(attacks) + + # Check Base64Converter stats + assert "Base64Converter" in result["By_converter_type"] + base64_stats = result["By_converter_type"]["Base64Converter"] + assert base64_stats.successes == 2 + assert base64_stats.failures == 0 + assert base64_stats.total_decided == 2 + assert base64_stats.success_rate == 1.0 + + # Check ROT13Converter stats + assert "ROT13Converter" in result["By_converter_type"] + rot13_stats = result["By_converter_type"]["ROT13Converter"] + assert rot13_stats.successes == 0 + assert rot13_stats.failures == 1 + assert rot13_stats.total_decided == 1 + assert rot13_stats.success_rate == 0.0 + + +def test_analyze_results_multiple_converters_per_attack(): + """Test that attacks with multiple converters count towards each converter's stats.""" + from pyrit.identifiers import ConverterIdentifier + from pyrit.models import AttackOutcome, AttackResult, MessagePiece + + converter1 = ConverterIdentifier( + class_name="Base64Converter", + class_module="pyrit.prompt_converter.base64_converter", + class_description="Test converter", + identifier_type="instance", + supported_input_types=("text",), + supported_output_types=("text",), + ) + + converter2 = ConverterIdentifier( + class_name="ROT13Converter", + class_module="pyrit.prompt_converter.rot13_converter", + class_description="Test converter", + identifier_type="instance", + supported_input_types=("text",), + supported_output_types=("text",), + ) + + # Attack with multiple converters (pipeline) + message = MessagePiece( + role="user", + original_value="test", + converter_identifiers=[converter1, converter2], + ) + + attacks = [ + AttackResult( + conversation_id="conv-1", + objective="test", + attack_identifier={"type": "test"}, + outcome=AttackOutcome.SUCCESS, + last_response=message, + ), + ] + + result = analyze_results(attacks) + + # Both converters should have the success counted + assert "Base64Converter" in result["By_converter_type"] + assert result["By_converter_type"]["Base64Converter"].successes == 1 + assert "ROT13Converter" in result["By_converter_type"] + assert result["By_converter_type"]["ROT13Converter"].successes == 1 + + +def test_analyze_results_converter_with_undetermined(): + """Test that undetermined outcomes are tracked correctly for converters.""" + from pyrit.identifiers import ConverterIdentifier + from pyrit.models import AttackOutcome, AttackResult, MessagePiece + + converter = ConverterIdentifier( + class_name="Base64Converter", + class_module="pyrit.prompt_converter.base64_converter", + class_description="Test converter", + identifier_type="instance", + supported_input_types=("text",), + supported_output_types=("text",), + ) + + message = MessagePiece( + role="user", + original_value="test", + converter_identifiers=[converter], + ) + + attacks = [ + AttackResult( + conversation_id="conv-1", + objective="test", + attack_identifier={"type": "test"}, + outcome=AttackOutcome.SUCCESS, + last_response=message, + ), + AttackResult( + conversation_id="conv-2", + objective="test", + attack_identifier={"type": "test"}, + outcome=AttackOutcome.UNDETERMINED, + last_response=message, + ), + ] + + result = analyze_results(attacks) + + assert "Base64Converter" in result["By_converter_type"] + stats = result["By_converter_type"]["Base64Converter"] + assert stats.successes == 1 + assert stats.failures == 0 + assert stats.undetermined == 1 + assert stats.total_decided == 1 + 
assert stats.success_rate == 1.0 From b94525af4fadb24f38666510da447ef435c1b76e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 4 Feb 2026 01:52:02 +0000 Subject: [PATCH 3/5] Fix redundant imports based on code review feedback Co-authored-by: romanlutz <10245648+romanlutz@users.noreply.github.com> --- tests/unit/analytics/test_result_analysis.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/tests/unit/analytics/test_result_analysis.py b/tests/unit/analytics/test_result_analysis.py index 377bc7a00f..823d683295 100644 --- a/tests/unit/analytics/test_result_analysis.py +++ b/tests/unit/analytics/test_result_analysis.py @@ -4,7 +4,8 @@ import pytest from pyrit.analytics.result_analysis import AttackStats, analyze_results -from pyrit.models import AttackOutcome, AttackResult +from pyrit.identifiers import ConverterIdentifier +from pyrit.models import AttackOutcome, AttackResult, MessagePiece # helpers @@ -146,8 +147,6 @@ def test_analyze_results_returns_by_converter_type(): def test_analyze_results_no_converter_tracking(): """Test that attacks without converters are tracked as 'no_converter'.""" - from pyrit.models import AttackOutcome, AttackResult - attacks = [ AttackResult( conversation_id="conv-1", @@ -176,9 +175,6 @@ def test_analyze_results_no_converter_tracking(): def test_analyze_results_with_converter_identifiers(): """Test that attacks with converters are properly grouped by converter type.""" - from pyrit.identifiers import ConverterIdentifier - from pyrit.models import AttackOutcome, AttackResult, MessagePiece - # Create attacks with different converters converter1 = ConverterIdentifier( class_name="Base64Converter", @@ -261,9 +257,6 @@ def test_analyze_results_with_converter_identifiers(): def test_analyze_results_multiple_converters_per_attack(): """Test that attacks with multiple converters count towards each converter's stats.""" - from pyrit.identifiers import ConverterIdentifier - from pyrit.models import AttackOutcome, AttackResult, MessagePiece - converter1 = ConverterIdentifier( class_name="Base64Converter", class_module="pyrit.prompt_converter.base64_converter", @@ -310,9 +303,6 @@ def test_analyze_results_multiple_converters_per_attack(): def test_analyze_results_converter_with_undetermined(): """Test that undetermined outcomes are tracked correctly for converters.""" - from pyrit.identifiers import ConverterIdentifier - from pyrit.models import AttackOutcome, AttackResult, MessagePiece - converter = ConverterIdentifier( class_name="Base64Converter", class_module="pyrit.prompt_converter.base64_converter", From c24c5d277c38e984f6711bbfce55c18fc98452cc Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 4 Feb 2026 01:54:53 +0000 Subject: [PATCH 4/5] Refactor tests and improve code clarity per review feedback Co-authored-by: romanlutz <10245648+romanlutz@users.noreply.github.com> --- pyrit/analytics/result_analysis.py | 2 +- tests/unit/analytics/test_result_analysis.py | 65 +++++++------------- 2 files changed, 24 insertions(+), 43 deletions(-) diff --git a/pyrit/analytics/result_analysis.py b/pyrit/analytics/result_analysis.py index 7b672309c4..63a6dadcff 100644 --- a/pyrit/analytics/result_analysis.py +++ b/pyrit/analytics/result_analysis.py @@ -69,7 +69,7 @@ def analyze_results(attack_results: list[AttackResult]) -> dict[str, AttackStats # Extract converter types from last_response converter_types = [] - if 
attack.last_response and attack.last_response.converter_identifiers: + if attack.last_response is not None and attack.last_response.converter_identifiers: converter_types = [conv.class_name for conv in attack.last_response.converter_identifiers] # If no converters, track as "no_converter" diff --git a/tests/unit/analytics/test_result_analysis.py b/tests/unit/analytics/test_result_analysis.py index 823d683295..4bd9e7ca4b 100644 --- a/tests/unit/analytics/test_result_analysis.py +++ b/tests/unit/analytics/test_result_analysis.py @@ -29,6 +29,24 @@ def make_attack( ) +def make_converter( + class_name: str, + class_module: str = "pyrit.prompt_converter.test_converter", +) -> ConverterIdentifier: + """ + Create a test ConverterIdentifier with minimal required fields. + """ + return ConverterIdentifier( + class_name=class_name, + class_module=class_module, + class_description="Test converter", + identifier_type="instance", + supported_input_types=("text",), + supported_output_types=("text",), + ) + + + def test_analyze_results_empty_raises(): with pytest.raises(ValueError): analyze_results([]) @@ -176,23 +194,8 @@ def test_analyze_results_no_converter_tracking(): def test_analyze_results_with_converter_identifiers(): """Test that attacks with converters are properly grouped by converter type.""" # Create attacks with different converters - converter1 = ConverterIdentifier( - class_name="Base64Converter", - class_module="pyrit.prompt_converter.base64_converter", - class_description="Test converter", - identifier_type="instance", - supported_input_types=("text",), - supported_output_types=("text",), - ) - - converter2 = ConverterIdentifier( - class_name="ROT13Converter", - class_module="pyrit.prompt_converter.rot13_converter", - class_description="Test converter", - identifier_type="instance", - supported_input_types=("text",), - supported_output_types=("text",), - ) + converter1 = make_converter("Base64Converter", "pyrit.prompt_converter.base64_converter") + converter2 = make_converter("ROT13Converter", "pyrit.prompt_converter.rot13_converter") message1 = MessagePiece( role="user", @@ -257,23 +260,8 @@ def test_analyze_results_with_converter_identifiers(): def test_analyze_results_multiple_converters_per_attack(): """Test that attacks with multiple converters count towards each converter's stats.""" - converter1 = ConverterIdentifier( - class_name="Base64Converter", - class_module="pyrit.prompt_converter.base64_converter", - class_description="Test converter", - identifier_type="instance", - supported_input_types=("text",), - supported_output_types=("text",), - ) - - converter2 = ConverterIdentifier( - class_name="ROT13Converter", - class_module="pyrit.prompt_converter.rot13_converter", - class_description="Test converter", - identifier_type="instance", - supported_input_types=("text",), - supported_output_types=("text",), - ) + converter1 = make_converter("Base64Converter", "pyrit.prompt_converter.base64_converter") + converter2 = make_converter("ROT13Converter", "pyrit.prompt_converter.rot13_converter") # Attack with multiple converters (pipeline) message = MessagePiece( @@ -303,14 +291,7 @@ def test_analyze_results_multiple_converters_per_attack(): def test_analyze_results_converter_with_undetermined(): """Test that undetermined outcomes are tracked correctly for converters.""" - converter = ConverterIdentifier( - class_name="Base64Converter", - class_module="pyrit.prompt_converter.base64_converter", - class_description="Test converter", - identifier_type="instance", - 
supported_input_types=("text",), - supported_output_types=("text",), - ) + converter = make_converter("Base64Converter", "pyrit.prompt_converter.base64_converter") message = MessagePiece( role="user", From a60ee0f7432da4a4269e716a17693e42e568d4f2 Mon Sep 17 00:00:00 2001 From: Roman Lutz Date: Fri, 6 Feb 2026 13:07:39 -0800 Subject: [PATCH 5/5] more generic approach to filtering --- doc/_toc.yml | 1 + doc/code/analytics/1_result_analysis.ipynb | 449 +++++++++++ doc/code/analytics/1_result_analysis.py | 234 ++++++ pyrit/analytics/__init__.py | 9 +- pyrit/analytics/result_analysis.py | 331 ++++++-- tests/unit/analytics/test_result_analysis.py | 774 ++++++++++++------- 6 files changed, 1440 insertions(+), 358 deletions(-) create mode 100644 doc/code/analytics/1_result_analysis.ipynb create mode 100644 doc/code/analytics/1_result_analysis.py diff --git a/doc/_toc.yml b/doc/_toc.yml index dc57042079..8c9052306d 100644 --- a/doc/_toc.yml +++ b/doc/_toc.yml @@ -109,6 +109,7 @@ chapters: - file: code/scoring/prompt_shield_scorer - file: code/scoring/generic_scorers - file: code/scoring/8_scorer_metrics + - file: code/analytics/1_result_analysis - file: code/memory/0_memory sections: - file: code/memory/1_sqlite_memory diff --git a/doc/code/analytics/1_result_analysis.ipynb b/doc/code/analytics/1_result_analysis.ipynb new file mode 100644 index 0000000000..fd11e500ed --- /dev/null +++ b/doc/code/analytics/1_result_analysis.ipynb @@ -0,0 +1,449 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0", + "metadata": {}, + "source": [ + "# Result Analysis\n", + "\n", + "The `analyze_results` function computes attack success rates from a list of `AttackResult` objects.\n", + "It supports flexible grouping across built-in dimensions (`attack_type`, `converter_type`, `label`)\n", + "as well as composite and custom dimensions." + ] + }, + { + "cell_type": "markdown", + "id": "1", + "metadata": {}, + "source": [ + "## Setup\n", + "\n", + "First, let's create some sample `AttackResult` objects to work with." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Created 5 sample AttackResult objects\n" + ] + } + ], + "source": [ + "from pyrit.analytics import analyze_results\n", + "from pyrit.identifiers import ConverterIdentifier\n", + "from pyrit.models import AttackOutcome, AttackResult, MessagePiece\n", + "\n", + "\n", + "def make_converter(name: str) -> ConverterIdentifier:\n", + " return ConverterIdentifier(\n", + " class_name=name,\n", + " class_module=\"pyrit.prompt_converter\",\n", + " class_description=f\"{name} converter\",\n", + " identifier_type=\"instance\",\n", + " supported_input_types=(\"text\",),\n", + " supported_output_types=(\"text\",),\n", + " )\n", + "\n", + "\n", + "# Realistic attack_identifier dicts mirror Strategy.get_identifier() output\n", + "crescendo_id = {\n", + " \"__type__\": \"CrescendoAttack\",\n", + " \"__module__\": \"pyrit.executor.attack.multi_turn.crescendo\",\n", + " \"id\": \"a1b2c3d4-0001-4000-8000-000000000001\",\n", + "}\n", + "red_team_id = {\n", + " \"__type__\": \"RedTeamingAttack\",\n", + " \"__module__\": \"pyrit.executor.attack.multi_turn.red_teaming\",\n", + " \"id\": \"a1b2c3d4-0002-4000-8000-000000000002\",\n", + "}\n", + "\n", + "# Build a small set of representative attack results\n", + "results = [\n", + " # Crescendo attacks with Base64Converter\n", + " AttackResult(\n", + " conversation_id=\"c1\",\n", + " objective=\"bypass safety filter\",\n", + " attack_identifier=crescendo_id,\n", + " outcome=AttackOutcome.SUCCESS,\n", + " last_response=MessagePiece(\n", + " role=\"user\",\n", + " original_value=\"response 1\",\n", + " converter_identifiers=[make_converter(\"Base64Converter\")],\n", + " labels={\"operation_name\": \"op_safety_bypass\", \"operator\": \"alice\"},\n", + " ),\n", + " ),\n", + " AttackResult(\n", + " conversation_id=\"c2\",\n", + " objective=\"bypass safety filter\",\n", + " attack_identifier=crescendo_id,\n", + " outcome=AttackOutcome.FAILURE,\n", + " last_response=MessagePiece(\n", + " role=\"user\",\n", + " original_value=\"response 2\",\n", + " converter_identifiers=[make_converter(\"Base64Converter\")],\n", + " labels={\"operation_name\": \"op_safety_bypass\", \"operator\": \"alice\"},\n", + " ),\n", + " ),\n", + " # Red teaming attacks with ROT13Converter\n", + " AttackResult(\n", + " conversation_id=\"c3\",\n", + " objective=\"extract secrets\",\n", + " attack_identifier=red_team_id,\n", + " outcome=AttackOutcome.SUCCESS,\n", + " last_response=MessagePiece(\n", + " role=\"user\",\n", + " original_value=\"response 3\",\n", + " converter_identifiers=[make_converter(\"ROT13Converter\")],\n", + " labels={\"operation_name\": \"op_secret_extract\", \"operator\": \"bob\"},\n", + " ),\n", + " ),\n", + " AttackResult(\n", + " conversation_id=\"c4\",\n", + " objective=\"extract secrets\",\n", + " attack_identifier=red_team_id,\n", + " outcome=AttackOutcome.SUCCESS,\n", + " last_response=MessagePiece(\n", + " role=\"user\",\n", + " original_value=\"response 4\",\n", + " converter_identifiers=[make_converter(\"ROT13Converter\")],\n", + " labels={\"operation_name\": \"op_secret_extract\", \"operator\": \"bob\"},\n", + " ),\n", + " ),\n", + " # An undetermined result (no converter, no labels)\n", + " AttackResult(\n", + " conversation_id=\"c5\",\n", + " objective=\"test prompt\",\n", + " attack_identifier=crescendo_id,\n", + " outcome=AttackOutcome.UNDETERMINED,\n", + " ),\n", + "]\n", + "\n", + 
"print(f\"Created {len(results)} sample AttackResult objects\")" + ] + }, + { + "cell_type": "markdown", + "id": "3", + "metadata": {}, + "source": [ + "## Overall Stats (No Grouping)\n", + "\n", + "Pass `group_by=[]` to compute only the overall attack success rate, with no\n", + "dimensional breakdown." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Overall success rate: 0.75\n", + " Successes: 3\n", + " Failures: 1\n", + " Undetermined: 1\n", + " Total decided (excl. undetermined): 4\n" + ] + } + ], + "source": [ + "result = analyze_results(results, group_by=[])\n", + "\n", + "print(f\"Overall success rate: {result.overall.success_rate}\")\n", + "print(f\" Successes: {result.overall.successes}\")\n", + "print(f\" Failures: {result.overall.failures}\")\n", + "print(f\" Undetermined: {result.overall.undetermined}\")\n", + "print(f\" Total decided (excl. undetermined): {result.overall.total_decided}\")" + ] + }, + { + "cell_type": "markdown", + "id": "5", + "metadata": {}, + "source": [ + "## Group by Attack Type\n", + "\n", + "See how success rates differ across attack strategies (e.g. `crescendo` vs `red_teaming`)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " CrescendoAttack: success_rate=0.5, successes=1, failures=1, undetermined=1\n", + " RedTeamingAttack: success_rate=1.0, successes=2, failures=0, undetermined=0\n" + ] + } + ], + "source": [ + "result = analyze_results(results, group_by=[\"attack_type\"])\n", + "\n", + "for attack_type, stats in result.dimensions[\"attack_type\"].items():\n", + " print(\n", + " f\" {attack_type}: success_rate={stats.success_rate}, \"\n", + " f\"successes={stats.successes}, failures={stats.failures}, \"\n", + " f\"undetermined={stats.undetermined}\"\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "7", + "metadata": {}, + "source": [ + "## Group by Converter Type\n", + "\n", + "Break down success rates by which prompt converter was applied." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " Base64Converter: success_rate=0.5, successes=1, failures=1\n", + " ROT13Converter: success_rate=1.0, successes=2, failures=0\n", + " no_converter: success_rate=None, successes=0, failures=0\n" + ] + } + ], + "source": [ + "result = analyze_results(results, group_by=[\"converter_type\"])\n", + "\n", + "for converter, stats in result.dimensions[\"converter_type\"].items():\n", + " print(f\" {converter}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}\")" + ] + }, + { + "cell_type": "markdown", + "id": "9", + "metadata": {}, + "source": [ + "## Group by Label\n", + "\n", + "Labels are key=value metadata attached to messages. Each label pair becomes its own\n", + "grouping key." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "10", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " operation_name=op_safety_bypass: success_rate=0.5, successes=1, failures=1\n", + " operator=alice: success_rate=0.5, successes=1, failures=1\n", + " operation_name=op_secret_extract: success_rate=1.0, successes=2, failures=0\n", + " operator=bob: success_rate=1.0, successes=2, failures=0\n", + " no_labels: success_rate=None, successes=0, failures=0\n" + ] + } + ], + "source": [ + "result = analyze_results(results, group_by=[\"label\"])\n", + "\n", + "for label_key, stats in result.dimensions[\"label\"].items():\n", + " print(f\" {label_key}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}\")" + ] + }, + { + "cell_type": "markdown", + "id": "11", + "metadata": {}, + "source": [ + "## Multiple Dimensions at Once\n", + "\n", + "Pass several dimension names to `group_by` for independent breakdowns in a single call." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "12", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "--- By attack_type ---\n", + " CrescendoAttack: success_rate=0.5\n", + " RedTeamingAttack: success_rate=1.0\n", + "\n", + "--- By converter_type ---\n", + " Base64Converter: success_rate=0.5\n", + " ROT13Converter: success_rate=1.0\n", + " no_converter: success_rate=None\n" + ] + } + ], + "source": [ + "result = analyze_results(results, group_by=[\"attack_type\", \"converter_type\"])\n", + "\n", + "print(\"--- By attack_type ---\")\n", + "for key, stats in result.dimensions[\"attack_type\"].items():\n", + " print(f\" {key}: success_rate={stats.success_rate}\")\n", + "\n", + "print(\"\\n--- By converter_type ---\")\n", + "for key, stats in result.dimensions[\"converter_type\"].items():\n", + " print(f\" {key}: success_rate={stats.success_rate}\")" + ] + }, + { + "cell_type": "markdown", + "id": "13", + "metadata": {}, + "source": [ + "## Composite Dimensions\n", + "\n", + "Use a tuple of dimension names to create a cross-product grouping. For example,\n", + "`(\"converter_type\", \"attack_type\")` produces keys like `(\"Base64Converter\", \"crescendo\")`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "14", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " ('Base64Converter', 'CrescendoAttack'): success_rate=0.5, successes=1, failures=1\n", + " ('ROT13Converter', 'RedTeamingAttack'): success_rate=1.0, successes=2, failures=0\n", + " ('no_converter', 'CrescendoAttack'): success_rate=None, successes=0, failures=0\n" + ] + } + ], + "source": [ + "result = analyze_results(results, group_by=[(\"converter_type\", \"attack_type\")])\n", + "\n", + "for combo_key, stats in result.dimensions[(\"converter_type\", \"attack_type\")].items():\n", + " print(f\" {combo_key}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}\")" + ] + }, + { + "cell_type": "markdown", + "id": "15", + "metadata": {}, + "source": [ + "## Custom Dimensions\n", + "\n", + "Supply your own extractor function via `custom_dimensions`. An extractor takes an\n", + "`AttackResult` and returns a `list[str]` of dimension values. Here we group by the\n", + "attack objective." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "16", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " bypass safety filter: success_rate=0.5, successes=1, failures=1\n", + " extract secrets: success_rate=1.0, successes=2, failures=0\n", + " test prompt: success_rate=None, successes=0, failures=0\n" + ] + } + ], + "source": [ + "def extract_objective(attack: AttackResult) -> list[str]:\n", + " return [attack.objective]\n", + "\n", + "\n", + "result = analyze_results(\n", + " results,\n", + " group_by=[\"objective\"],\n", + " custom_dimensions={\"objective\": extract_objective},\n", + ")\n", + "\n", + "for objective, stats in result.dimensions[\"objective\"].items():\n", + " print(f\" {objective}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}\")" + ] + }, + { + "cell_type": "markdown", + "id": "17", + "metadata": {}, + "source": [ + "## Default Behavior\n", + "\n", + "When `group_by` is omitted, `analyze_results` groups by **all** registered\n", + "dimensions: `attack_type`, `converter_type`, and `label`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "18", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Dimensions returned: ['attack_type', 'converter_type', 'label']\n", + "Overall success rate: 0.75\n" + ] + } + ], + "source": [ + "result = analyze_results(results)\n", + "\n", + "print(f\"Dimensions returned: {list(result.dimensions.keys())}\")\n", + "print(f\"Overall success rate: {result.overall.success_rate}\")" + ] + } + ], + "metadata": { + "jupytext": { + "cell_metadata_filter": "-all" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.14" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/doc/code/analytics/1_result_analysis.py b/doc/code/analytics/1_result_analysis.py new file mode 100644 index 0000000000..d6939dcc37 --- /dev/null +++ b/doc/code/analytics/1_result_analysis.py @@ -0,0 +1,234 @@ +# --- +# jupyter: +# jupytext: +# cell_metadata_filter: -all +# text_representation: +# extension: .py +# format_name: percent +# format_version: '1.3' +# jupytext_version: 1.19.0 +# kernelspec: +# display_name: pyrit-dev +# language: python +# name: python3 +# --- + +# %% [markdown] +# # Result Analysis +# +# The `analyze_results` function computes attack success rates from a list of `AttackResult` objects. +# It supports flexible grouping across built-in dimensions (`attack_type`, `converter_type`, `label`) +# as well as composite and custom dimensions. + +# %% [markdown] +# ## Setup +# +# First, let's create some sample `AttackResult` objects to work with. 
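+ #
+ # The `make_converter` helper below builds `ConverterIdentifier` objects with only the
+ # minimal required fields, and the `attack_identifier` dicts mimic what
+ # `Strategy.get_identifier()` produces (`__type__`, `__module__`, `id`). In a real
+ # operation the `AttackResult` objects would come from executing attacks; they are
+ # constructed by hand here so the example stays self-contained.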
+ +# %% +from pyrit.analytics import analyze_results +from pyrit.identifiers import ConverterIdentifier +from pyrit.models import AttackOutcome, AttackResult, MessagePiece + + +def make_converter(name: str) -> ConverterIdentifier: + return ConverterIdentifier( + class_name=name, + class_module="pyrit.prompt_converter", + class_description=f"{name} converter", + identifier_type="instance", + supported_input_types=("text",), + supported_output_types=("text",), + ) + + +# Realistic attack_identifier dicts mirror Strategy.get_identifier() output +crescendo_id = { + "__type__": "CrescendoAttack", + "__module__": "pyrit.executor.attack.multi_turn.crescendo", + "id": "a1b2c3d4-0001-4000-8000-000000000001", +} +red_team_id = { + "__type__": "RedTeamingAttack", + "__module__": "pyrit.executor.attack.multi_turn.red_teaming", + "id": "a1b2c3d4-0002-4000-8000-000000000002", +} + +# Build a small set of representative attack results +results = [ + # Crescendo attacks with Base64Converter + AttackResult( + conversation_id="c1", + objective="bypass safety filter", + attack_identifier=crescendo_id, + outcome=AttackOutcome.SUCCESS, + last_response=MessagePiece( + role="user", + original_value="response 1", + converter_identifiers=[make_converter("Base64Converter")], + labels={"operation_name": "op_safety_bypass", "operator": "alice"}, + ), + ), + AttackResult( + conversation_id="c2", + objective="bypass safety filter", + attack_identifier=crescendo_id, + outcome=AttackOutcome.FAILURE, + last_response=MessagePiece( + role="user", + original_value="response 2", + converter_identifiers=[make_converter("Base64Converter")], + labels={"operation_name": "op_safety_bypass", "operator": "alice"}, + ), + ), + # Red teaming attacks with ROT13Converter + AttackResult( + conversation_id="c3", + objective="extract secrets", + attack_identifier=red_team_id, + outcome=AttackOutcome.SUCCESS, + last_response=MessagePiece( + role="user", + original_value="response 3", + converter_identifiers=[make_converter("ROT13Converter")], + labels={"operation_name": "op_secret_extract", "operator": "bob"}, + ), + ), + AttackResult( + conversation_id="c4", + objective="extract secrets", + attack_identifier=red_team_id, + outcome=AttackOutcome.SUCCESS, + last_response=MessagePiece( + role="user", + original_value="response 4", + converter_identifiers=[make_converter("ROT13Converter")], + labels={"operation_name": "op_secret_extract", "operator": "bob"}, + ), + ), + # An undetermined result (no converter, no labels) + AttackResult( + conversation_id="c5", + objective="test prompt", + attack_identifier=crescendo_id, + outcome=AttackOutcome.UNDETERMINED, + ), +] + +print(f"Created {len(results)} sample AttackResult objects") + +# %% [markdown] +# ## Overall Stats (No Grouping) +# +# Pass `group_by=[]` to compute only the overall attack success rate, with no +# dimensional breakdown. + +# %% +result = analyze_results(results, group_by=[]) + +print(f"Overall success rate: {result.overall.success_rate}") +print(f" Successes: {result.overall.successes}") +print(f" Failures: {result.overall.failures}") +print(f" Undetermined: {result.overall.undetermined}") +print(f" Total decided (excl. undetermined): {result.overall.total_decided}") + +# %% [markdown] +# ## Group by Attack Type +# +# See how success rates differ across attack strategies (e.g. `crescendo` vs `red_teaming`). 
+ +# %% +result = analyze_results(results, group_by=["attack_type"]) + +for attack_type, stats in result.dimensions["attack_type"].items(): + print( + f" {attack_type}: success_rate={stats.success_rate}, " + f"successes={stats.successes}, failures={stats.failures}, " + f"undetermined={stats.undetermined}" + ) + +# %% [markdown] +# ## Group by Converter Type +# +# Break down success rates by which prompt converter was applied. + +# %% +result = analyze_results(results, group_by=["converter_type"]) + +for converter, stats in result.dimensions["converter_type"].items(): + print(f" {converter}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}") + +# %% [markdown] +# ## Group by Label +# +# Labels are key=value metadata attached to messages. Each label pair becomes its own +# grouping key. + +# %% +result = analyze_results(results, group_by=["label"]) + +for label_key, stats in result.dimensions["label"].items(): + print(f" {label_key}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}") + +# %% [markdown] +# ## Multiple Dimensions at Once +# +# Pass several dimension names to `group_by` for independent breakdowns in a single call. + +# %% +result = analyze_results(results, group_by=["attack_type", "converter_type"]) + +print("--- By attack_type ---") +for key, stats in result.dimensions["attack_type"].items(): + print(f" {key}: success_rate={stats.success_rate}") + +print("\n--- By converter_type ---") +for key, stats in result.dimensions["converter_type"].items(): + print(f" {key}: success_rate={stats.success_rate}") + +# %% [markdown] +# ## Composite Dimensions +# +# Use a tuple of dimension names to create a cross-product grouping. For example, +# `("converter_type", "attack_type")` produces keys like `("Base64Converter", "crescendo")`. + +# %% +result = analyze_results(results, group_by=[("converter_type", "attack_type")]) + +for combo_key, stats in result.dimensions[("converter_type", "attack_type")].items(): + print(f" {combo_key}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}") + +# %% [markdown] +# ## Custom Dimensions +# +# Supply your own extractor function via `custom_dimensions`. An extractor takes an +# `AttackResult` and returns a `list[str]` of dimension values. Here we group by the +# attack objective. + +# %% + + +def extract_objective(attack: AttackResult) -> list[str]: + return [attack.objective] + + +result = analyze_results( + results, + group_by=["objective"], + custom_dimensions={"objective": extract_objective}, +) + +for objective, stats in result.dimensions["objective"].items(): + print(f" {objective}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}") + +# %% [markdown] +# ## Default Behavior +# +# When `group_by` is omitted, `analyze_results` groups by **all** registered +# dimensions: `attack_type`, `converter_type`, and `label`. 
+ +# %% +result = analyze_results(results) + +print(f"Dimensions returned: {list(result.dimensions.keys())}") +print(f"Overall success rate: {result.overall.success_rate}") diff --git a/pyrit/analytics/__init__.py b/pyrit/analytics/__init__.py index f75d401dd7..3e5d80ff3d 100644 --- a/pyrit/analytics/__init__.py +++ b/pyrit/analytics/__init__.py @@ -4,7 +4,12 @@ """Analytics module for PyRIT conversation and result analysis.""" from pyrit.analytics.conversation_analytics import ConversationAnalytics -from pyrit.analytics.result_analysis import AttackStats, analyze_results +from pyrit.analytics.result_analysis import ( + AnalysisResult, + AttackStats, + DimensionExtractor, + analyze_results, +) from pyrit.analytics.text_matching import ( ApproximateTextMatching, ExactTextMatching, @@ -13,9 +18,11 @@ __all__ = [ "analyze_results", + "AnalysisResult", "ApproximateTextMatching", "AttackStats", "ConversationAnalytics", + "DimensionExtractor", "ExactTextMatching", "TextMatching", ] diff --git a/pyrit/analytics/result_analysis.py b/pyrit/analytics/result_analysis.py index 63a6dadcff..add1c038a5 100644 --- a/pyrit/analytics/result_analysis.py +++ b/pyrit/analytics/result_analysis.py @@ -1,13 +1,25 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. +import warnings from collections import defaultdict -from dataclasses import dataclass -from typing import DefaultDict, Optional +from dataclasses import dataclass, field +from itertools import product +from typing import Callable, DefaultDict, Optional, Union from pyrit.models import AttackOutcome, AttackResult +# --------------------------------------------------------------------------- +# Type alias for dimension extractors. +# An extractor receives an AttackResult and returns a list of string keys +# (list to support one-to-many mappings, e.g. multiple converters per attack). +# --------------------------------------------------------------------------- +DimensionExtractor = Callable[[AttackResult], list[str]] + +# --------------------------------------------------------------------------- +# Data classes +# --------------------------------------------------------------------------- @dataclass class AttackStats: """Statistics for attack analysis results.""" @@ -19,7 +31,102 @@ class AttackStats: undetermined: int -def _compute_stats(successes: int, failures: int, undetermined: int) -> AttackStats: +@dataclass +class AnalysisResult: + """ + Structured result from attack analysis. + + Attributes: + overall (AttackStats): Aggregate stats across all attack results. + dimensions (dict): Per-dimension breakdown. Keys are dimension names + (str) for single dimensions, or tuples of dimension names for + composite groupings. Values map dimension keys to AttackStats. + """ + + overall: AttackStats + dimensions: dict[Union[str, tuple[str, ...]], dict[Union[str, tuple[str, ...]], AttackStats]] = field( + default_factory=dict + ) + + +# --------------------------------------------------------------------------- +# Built-in dimension extractors +# --------------------------------------------------------------------------- +def _extract_attack_type(result: AttackResult) -> list[str]: + """ + Extract the attack type from the attack identifier dict. + + Reads the ``__type__`` key populated by :meth:`Strategy.get_identifier`. + + Returns: + list[str]: A single-element list containing the attack type. 
+ """ + return [result.attack_identifier.get("__type__", "unknown")] + + +def _extract_converter_types(result: AttackResult) -> list[str]: + """ + Extract converter class names from the last response. + + Returns: + list[str]: Converter class names, or ``["no_converter"]`` if none. + """ + if result.last_response is not None and result.last_response.converter_identifiers: + return [conv.class_name for conv in result.last_response.converter_identifiers] + return ["no_converter"] + + +def _extract_labels(result: AttackResult) -> list[str]: + """ + Extract label key=value pairs from the last response. + + Returns: + list[str]: Label strings as ``"key=value"``, or ``["no_labels"]`` if none. + """ + if result.last_response is not None and result.last_response.labels: + return [f"{k}={v}" for k, v in result.last_response.labels.items()] + return ["no_labels"] + + +DEFAULT_DIMENSIONS: dict[str, DimensionExtractor] = { + "attack_type": _extract_attack_type, + "converter_type": _extract_converter_types, + "label": _extract_labels, +} + +# Deprecated aliases — maps old name to canonical name. +# Using the old name emits a DeprecationWarning. +_DEPRECATED_DIMENSION_ALIASES: dict[str, str] = { + "attack_identifier": "attack_type", +} + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- +_OUTCOME_KEYS: dict[AttackOutcome, str] = { + AttackOutcome.SUCCESS: "successes", + AttackOutcome.FAILURE: "failures", +} + + +def _outcome_key(outcome: AttackOutcome) -> str: + """ + Map an AttackOutcome to its counter key. + + Returns: + str: The counter key (``"successes"``, ``"failures"``, or ``"undetermined"``). + """ + return _OUTCOME_KEYS.get(outcome, "undetermined") + + +def _compute_stats(*, successes: int, failures: int, undetermined: int) -> AttackStats: + """ + Compute AttackStats from raw counts. + + Returns: + AttackStats: The computed statistics. + """ total_decided = successes + failures success_rate = successes / total_decided if total_decided > 0 else None return AttackStats( @@ -31,93 +138,165 @@ def _compute_stats(successes: int, failures: int, undetermined: int) -> AttackSt ) -def analyze_results(attack_results: list[AttackResult]) -> dict[str, AttackStats | dict[str, AttackStats]]: +def _build_stats(counts: DefaultDict[str, int]) -> AttackStats: """ - Analyze a list of AttackResult objects and return overall and grouped statistics. + Build AttackStats from a counter dict. Returns: - A dictionary of AttackStats objects. The overall stats are accessible with the key - "Overall", and the stats of any attack can be retrieved using "By_attack_identifier" - followed by the identifier of the attack. Stats grouped by converter type can be - retrieved using "By_converter_type". + AttackStats: The computed statistics. + """ + return _compute_stats( + successes=counts["successes"], + failures=counts["failures"], + undetermined=counts["undetermined"], + ) + + +def _resolve_dimension_name(*, name: str, extractors: dict[str, DimensionExtractor]) -> str: + """ + Resolve a single dimension name, handling deprecated aliases. + + Returns: + str: The canonical dimension name. Raises: - ValueError: if attack_results is empty. - TypeError: if any element is not an AttackResult. + ValueError: If the dimension name is unknown. 
+ """ + if name in extractors: + return name + canonical = _DEPRECATED_DIMENSION_ALIASES.get(name) + if canonical and canonical in extractors: + warnings.warn( + f"Dimension '{name}' is deprecated and will be removed in v0.13.0. Use '{canonical}' instead.", + DeprecationWarning, + stacklevel=4, + ) + return canonical + raise ValueError(f"Unknown dimension '{name}'. Available: {sorted(extractors.keys())}") - Example: - >>> analyze_results(attack_results) - { - "Overall": AttackStats, - "By_attack_identifier": dict[str, AttackStats], - "By_converter_type": dict[str, AttackStats] - } + +def _resolve_dimension_spec( + *, spec: Union[str, tuple[str, ...]], extractors: dict[str, DimensionExtractor] +) -> Union[str, tuple[str, ...]]: + """ + Resolve a group_by spec (single or composite), handling deprecated aliases. + + Returns: + Union[str, tuple[str, ...]]: The resolved spec with canonical dimension names. + """ + if isinstance(spec, str): + return _resolve_dimension_name(name=spec, extractors=extractors) + return tuple(_resolve_dimension_name(name=n, extractors=extractors) for n in spec) + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- +def analyze_results( + attack_results: list[AttackResult], + *, + group_by: list[Union[str, tuple[str, ...]]] | None = None, + custom_dimensions: dict[str, DimensionExtractor] | None = None, +) -> AnalysisResult: + """ + Analyze attack results with flexible, dimension-based grouping. + + Computes overall stats and breaks down results by one or more dimensions. + Dimensions can be single (e.g. ``"converter_type"``) or composite tuples + (e.g. ``("converter_type", "attack_type")``) for cross-dimensional + grouping. + + Args: + attack_results (list[AttackResult]): The attack results to analyze. + group_by (list[str | tuple[str, ...]] | None): Dimensions to group by. + Each element is either a dimension name (str) for independent + grouping, or a tuple of dimension names for composite grouping. + Defaults to all registered single dimensions. + custom_dimensions (dict[str, DimensionExtractor] | None): Additional + or overriding dimension extractors keyed by name. Merged with + built-in defaults. + + Returns: + AnalysisResult: Overall stats and per-dimension breakdowns. + + Raises: + ValueError: If attack_results is empty or a dimension name is unknown. + TypeError: If any element is not an AttackResult. 
+ + Examples: + Group by a single built-in dimension:: + + result = analyze_results(attacks, group_by=["attack_type"]) + for name, stats in result.dimensions["attack_type"].items(): + print(f"{name}: {stats.success_rate}") + + Group by a composite (cross-product) of two dimensions:: + + result = analyze_results( + attacks, + group_by=[("converter_type", "attack_type")], + ) + + Supply a custom dimension extractor:: + + def by_objective(r: AttackResult) -> list[str]: + return [r.objective] + + result = analyze_results( + attacks, + group_by=["objective"], + custom_dimensions={"objective": by_objective}, + ) """ if not attack_results: raise ValueError("attack_results cannot be empty") + # Merge extractors + extractors = dict(DEFAULT_DIMENSIONS) + if custom_dimensions: + extractors.update(custom_dimensions) + + # Resolve group_by — default to every registered dimension independently + if group_by is None: + group_by = list(extractors.keys()) + + # Resolve deprecated aliases and validate dimension names + resolved_group_by: list[Union[str, tuple[str, ...]]] = [] + for spec in group_by: + resolved_group_by.append(_resolve_dimension_spec(spec=spec, extractors=extractors)) + group_by = resolved_group_by + + # Accumulators overall_counts: DefaultDict[str, int] = defaultdict(int) - by_type_counts: DefaultDict[str, DefaultDict[str, int]] = defaultdict(lambda: defaultdict(int)) - by_converter_counts: DefaultDict[str, DefaultDict[str, int]] = defaultdict(lambda: defaultdict(int)) + dim_counts: dict[ + Union[str, tuple[str, ...]], + DefaultDict[Union[str, tuple[str, ...]], DefaultDict[str, int]], + ] = {spec: defaultdict(lambda: defaultdict(int)) for spec in group_by} + # Single pass over results for attack in attack_results: if not isinstance(attack, AttackResult): raise TypeError(f"Expected AttackResult, got {type(attack).__name__}: {attack!r}") - outcome = attack.outcome - attack_type = attack.attack_identifier.get("type", "unknown") - - # Extract converter types from last_response - converter_types = [] - if attack.last_response is not None and attack.last_response.converter_identifiers: - converter_types = [conv.class_name for conv in attack.last_response.converter_identifiers] - - # If no converters, track as "no_converter" - if not converter_types: - converter_types = ["no_converter"] - - if outcome == AttackOutcome.SUCCESS: - overall_counts["successes"] += 1 - by_type_counts[attack_type]["successes"] += 1 - for converter_type in converter_types: - by_converter_counts[converter_type]["successes"] += 1 - elif outcome == AttackOutcome.FAILURE: - overall_counts["failures"] += 1 - by_type_counts[attack_type]["failures"] += 1 - for converter_type in converter_types: - by_converter_counts[converter_type]["failures"] += 1 - else: - overall_counts["undetermined"] += 1 - by_type_counts[attack_type]["undetermined"] += 1 - for converter_type in converter_types: - by_converter_counts[converter_type]["undetermined"] += 1 - - overall_stats = _compute_stats( - successes=overall_counts["successes"], - failures=overall_counts["failures"], - undetermined=overall_counts["undetermined"], - ) + key = _outcome_key(attack.outcome) + overall_counts[key] += 1 - by_type_stats = { - attack_type: _compute_stats( - successes=counts["successes"], - failures=counts["failures"], - undetermined=counts["undetermined"], - ) - for attack_type, counts in by_type_counts.items() - } - - by_converter_stats = { - converter_type: _compute_stats( - successes=counts["successes"], - failures=counts["failures"], - 
undetermined=counts["undetermined"], - ) - for converter_type, counts in by_converter_counts.items() - } - - return { - "Overall": overall_stats, - "By_attack_identifier": by_type_stats, - "By_converter_type": by_converter_stats, - } + for spec in group_by: + if isinstance(spec, str): + for dim_value in extractors[spec](attack): + dim_counts[spec][dim_value][key] += 1 + else: + # Composite: cross-product of all sub-dimension values + sub_values = [extractors[name](attack) for name in spec] + for combo in product(*sub_values): + dim_counts[spec][combo][key] += 1 + + # Build result + dimension_stats: dict[Union[str, tuple[str, ...]], dict[Union[str, tuple[str, ...]], AttackStats]] = {} + for spec, counts_by_key in dim_counts.items(): + dimension_stats[spec] = {dim_key: _build_stats(counts) for dim_key, counts in counts_by_key.items()} + + return AnalysisResult( + overall=_build_stats(overall_counts), + dimensions=dimension_stats, + ) diff --git a/tests/unit/analytics/test_result_analysis.py b/tests/unit/analytics/test_result_analysis.py index 4bd9e7ca4b..b9ab0def34 100644 --- a/tests/unit/analytics/test_result_analysis.py +++ b/tests/unit/analytics/test_result_analysis.py @@ -1,25 +1,33 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. +import warnings + import pytest -from pyrit.analytics.result_analysis import AttackStats, analyze_results +from pyrit.analytics.result_analysis import ( + AnalysisResult, + AttackStats, + analyze_results, +) from pyrit.identifiers import ConverterIdentifier from pyrit.models import AttackOutcome, AttackResult, MessagePiece -# helpers +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- def make_attack( outcome: AttackOutcome, - attack_type: str | None = "default", + attack_type: str | None = "PromptSendingAttack", conversation_id: str = "conv-1", ) -> AttackResult: - """ - Minimal valid AttackResult for analytics tests. - """ + """Minimal valid AttackResult for analytics tests.""" attack_identifier: dict[str, str] = {} if attack_type is not None: - attack_identifier["type"] = attack_type + attack_identifier["__type__"] = attack_type + attack_identifier["__module__"] = "pyrit.executor.attack" + attack_identifier["id"] = "00000000-0000-0000-0000-000000000001" return AttackResult( conversation_id=conversation_id, @@ -33,9 +41,7 @@ def make_converter( class_name: str, class_module: str = "pyrit.prompt_converter.test_converter", ) -> ConverterIdentifier: - """ - Create a test ConverterIdentifier with minimal required fields. 
- """ + """Create a test ConverterIdentifier with minimal required fields.""" return ConverterIdentifier( class_name=class_name, class_module=class_module, @@ -46,282 +52,488 @@ def make_converter( ) - -def test_analyze_results_empty_raises(): - with pytest.raises(ValueError): - analyze_results([]) - - -def test_analyze_results_raises_on_invalid_object(): - with pytest.raises(TypeError): - analyze_results(["not-an-AttackResult"]) - - -@pytest.mark.parametrize( - "outcomes, expected_successes, expected_failures, expected_undetermined, expected_rate", - [ - # all successes - ([AttackOutcome.SUCCESS, AttackOutcome.SUCCESS], 2, 0, 0, 1.0), - # all failures - ([AttackOutcome.FAILURE, AttackOutcome.FAILURE], 0, 2, 0, 0.0), - # mixed decided - ([AttackOutcome.SUCCESS, AttackOutcome.FAILURE], 1, 1, 0, 0.5), - # include undetermined (excluded from denominator) - ([AttackOutcome.SUCCESS, AttackOutcome.UNDETERMINED], 1, 0, 1, 1.0), - ([AttackOutcome.FAILURE, AttackOutcome.UNDETERMINED], 0, 1, 1, 0.0), - # multiple with undetermined - ( - [AttackOutcome.SUCCESS, AttackOutcome.FAILURE, AttackOutcome.UNDETERMINED], - 1, - 1, - 1, - 0.5, - ), - ], -) -def test_overall_success_rate_parametrized( - outcomes, expected_successes, expected_failures, expected_undetermined, expected_rate -): - attacks = [make_attack(o) for o in outcomes] - result = analyze_results(attacks) - - assert isinstance(result["Overall"], AttackStats) - overall = result["Overall"] - assert overall.successes == expected_successes - assert overall.failures == expected_failures - assert overall.undetermined == expected_undetermined - assert overall.total_decided == expected_successes + expected_failures - assert overall.success_rate == expected_rate - - -@pytest.mark.parametrize( - "items, type_key, exp_succ, exp_fail, exp_und, exp_rate", - [ - # single type, mixed decided + undetermined - ( - [ - (AttackOutcome.SUCCESS, "crescendo"), - (AttackOutcome.FAILURE, "crescendo"), - (AttackOutcome.UNDETERMINED, "crescendo"), - ], - "crescendo", - 1, - 1, - 1, - 0.5, - ), - # two types with different balances - ( - [ - (AttackOutcome.SUCCESS, "crescendo"), - (AttackOutcome.FAILURE, "crescendo"), - (AttackOutcome.SUCCESS, "red_teaming"), - (AttackOutcome.FAILURE, "red_teaming"), - (AttackOutcome.SUCCESS, "red_teaming"), - ], - "red_teaming", - 2, - 1, - 0, - 2 / 3, - ), - # unknown type fallback (missing "type" key) - ( - [ - (AttackOutcome.FAILURE, None), - (AttackOutcome.UNDETERMINED, None), - (AttackOutcome.SUCCESS, None), - ], - "unknown", - 1, - 1, - 1, - 0.5, - ), - ], -) -def test_group_by_attack_type_parametrized(items, type_key, exp_succ, exp_fail, exp_und, exp_rate): - attacks = [make_attack(outcome=o, attack_type=t) for (o, t) in items] - result = analyze_results(attacks) - - assert type_key in result["By_attack_identifier"] - stats = result["By_attack_identifier"][type_key] - assert isinstance(stats, AttackStats) - assert stats.successes == exp_succ - assert stats.failures == exp_fail - assert stats.undetermined == exp_und - assert stats.total_decided == exp_succ + exp_fail - assert stats.success_rate == exp_rate - - -def test_analyze_results_returns_by_converter_type(): - """Test that analyze_results returns By_converter_type key.""" - attacks = [make_attack(AttackOutcome.SUCCESS)] - result = analyze_results(attacks) - - assert "By_converter_type" in result - assert isinstance(result["By_converter_type"], dict) - - -def test_analyze_results_no_converter_tracking(): - """Test that attacks without converters are tracked as 
'no_converter'.""" - attacks = [ - AttackResult( - conversation_id="conv-1", - objective="test", - attack_identifier={"type": "test"}, - outcome=AttackOutcome.SUCCESS, - last_response=None, # No response, so no converters - ), - AttackResult( - conversation_id="conv-2", - objective="test", - attack_identifier={"type": "test"}, - outcome=AttackOutcome.FAILURE, - last_response=None, - ), - ] - result = analyze_results(attacks) - - assert "no_converter" in result["By_converter_type"] - stats = result["By_converter_type"]["no_converter"] - assert stats.successes == 1 - assert stats.failures == 1 - assert stats.total_decided == 2 - assert stats.success_rate == 0.5 - - -def test_analyze_results_with_converter_identifiers(): - """Test that attacks with converters are properly grouped by converter type.""" - # Create attacks with different converters - converter1 = make_converter("Base64Converter", "pyrit.prompt_converter.base64_converter") - converter2 = make_converter("ROT13Converter", "pyrit.prompt_converter.rot13_converter") - - message1 = MessagePiece( +def make_attack_with_converters( + outcome: AttackOutcome, + converter_names: list[str], + attack_type: str = "test", + conversation_id: str = "conv-1", +) -> AttackResult: + """Create an AttackResult with converter identifiers on last_response.""" + converters = [make_converter(name) for name in converter_names] + message = MessagePiece( role="user", original_value="test", - converter_identifiers=[converter1], + converter_identifiers=converters, ) - - message2 = MessagePiece( - role="user", - original_value="test", - converter_identifiers=[converter2], + attack_identifier: dict[str, str] = { + "__type__": attack_type, + "__module__": "pyrit.executor.attack", + "id": "00000000-0000-0000-0000-000000000001", + } + return AttackResult( + conversation_id=conversation_id, + objective="test", + attack_identifier=attack_identifier, + outcome=outcome, + last_response=message, ) - message3 = MessagePiece( - role="user", - original_value="test", - converter_identifiers=[converter1], - ) - attacks = [ - AttackResult( - conversation_id="conv-1", - objective="test", - attack_identifier={"type": "test"}, - outcome=AttackOutcome.SUCCESS, - last_response=message1, - ), - AttackResult( - conversation_id="conv-2", - objective="test", - attack_identifier={"type": "test"}, - outcome=AttackOutcome.FAILURE, - last_response=message2, - ), - AttackResult( - conversation_id="conv-3", - objective="test", - attack_identifier={"type": "test"}, - outcome=AttackOutcome.SUCCESS, - last_response=message3, - ), - ] - - result = analyze_results(attacks) - - # Check Base64Converter stats - assert "Base64Converter" in result["By_converter_type"] - base64_stats = result["By_converter_type"]["Base64Converter"] - assert base64_stats.successes == 2 - assert base64_stats.failures == 0 - assert base64_stats.total_decided == 2 - assert base64_stats.success_rate == 1.0 - - # Check ROT13Converter stats - assert "ROT13Converter" in result["By_converter_type"] - rot13_stats = result["By_converter_type"]["ROT13Converter"] - assert rot13_stats.successes == 0 - assert rot13_stats.failures == 1 - assert rot13_stats.total_decided == 1 - assert rot13_stats.success_rate == 0.0 - - -def test_analyze_results_multiple_converters_per_attack(): - """Test that attacks with multiple converters count towards each converter's stats.""" - converter1 = make_converter("Base64Converter", "pyrit.prompt_converter.base64_converter") - converter2 = make_converter("ROT13Converter", 
"pyrit.prompt_converter.rot13_converter") - - # Attack with multiple converters (pipeline) - message = MessagePiece( - role="user", - original_value="test", - converter_identifiers=[converter1, converter2], +# --------------------------------------------------------------------------- +# Validation +# --------------------------------------------------------------------------- +class TestAnalyzeResultsValidation: + """Input validation for analyze_results.""" + + def test_empty_raises(self): + with pytest.raises(ValueError, match="cannot be empty"): + analyze_results([]) + + def test_invalid_object_raises(self): + with pytest.raises(TypeError, match="Expected AttackResult"): + analyze_results(["not-an-AttackResult"]) + + def test_unknown_dimension_raises(self): + attacks = [make_attack(AttackOutcome.SUCCESS)] + with pytest.raises(ValueError, match="Unknown dimension 'nonexistent'"): + analyze_results(attacks, group_by=["nonexistent"]) + + def test_unknown_dimension_in_composite_raises(self): + attacks = [make_attack(AttackOutcome.SUCCESS)] + with pytest.raises(ValueError, match="Unknown dimension 'bad_dim'"): + analyze_results(attacks, group_by=[("attack_type", "bad_dim")]) + + +# --------------------------------------------------------------------------- +# Overall stats +# --------------------------------------------------------------------------- +class TestOverallStats: + """Overall stats computation (no dimension breakdown).""" + + @pytest.mark.parametrize( + "outcomes, expected_successes, expected_failures, expected_undetermined, expected_rate", + [ + ([AttackOutcome.SUCCESS, AttackOutcome.SUCCESS], 2, 0, 0, 1.0), + ([AttackOutcome.FAILURE, AttackOutcome.FAILURE], 0, 2, 0, 0.0), + ([AttackOutcome.SUCCESS, AttackOutcome.FAILURE], 1, 1, 0, 0.5), + ([AttackOutcome.SUCCESS, AttackOutcome.UNDETERMINED], 1, 0, 1, 1.0), + ([AttackOutcome.FAILURE, AttackOutcome.UNDETERMINED], 0, 1, 1, 0.0), + ( + [AttackOutcome.SUCCESS, AttackOutcome.FAILURE, AttackOutcome.UNDETERMINED], + 1, + 1, + 1, + 0.5, + ), + ], ) - - attacks = [ - AttackResult( - conversation_id="conv-1", - objective="test", - attack_identifier={"type": "test"}, - outcome=AttackOutcome.SUCCESS, - last_response=message, - ), - ] - - result = analyze_results(attacks) - - # Both converters should have the success counted - assert "Base64Converter" in result["By_converter_type"] - assert result["By_converter_type"]["Base64Converter"].successes == 1 - assert "ROT13Converter" in result["By_converter_type"] - assert result["By_converter_type"]["ROT13Converter"].successes == 1 - - -def test_analyze_results_converter_with_undetermined(): - """Test that undetermined outcomes are tracked correctly for converters.""" - converter = make_converter("Base64Converter", "pyrit.prompt_converter.base64_converter") - - message = MessagePiece( - role="user", - original_value="test", - converter_identifiers=[converter], + def test_overall_stats(self, outcomes, expected_successes, expected_failures, expected_undetermined, expected_rate): + attacks = [make_attack(o) for o in outcomes] + result = analyze_results(attacks, group_by=[]) + + assert isinstance(result, AnalysisResult) + overall = result.overall + assert overall.successes == expected_successes + assert overall.failures == expected_failures + assert overall.undetermined == expected_undetermined + assert overall.total_decided == expected_successes + expected_failures + assert overall.success_rate == expected_rate + + def test_all_undetermined_gives_none_rate(self): + attacks = 
[make_attack(AttackOutcome.UNDETERMINED)] + result = analyze_results(attacks, group_by=[]) + assert result.overall.success_rate is None + assert result.overall.total_decided == 0 + + +# --------------------------------------------------------------------------- +# Single dimension: attack_identifier +# --------------------------------------------------------------------------- +class TestGroupByAttackType: + """Group-by a single dimension: attack_type.""" + + @pytest.mark.parametrize( + "items, type_key, exp_succ, exp_fail, exp_und, exp_rate", + [ + ( + [ + (AttackOutcome.SUCCESS, "CrescendoAttack"), + (AttackOutcome.FAILURE, "CrescendoAttack"), + (AttackOutcome.UNDETERMINED, "CrescendoAttack"), + ], + "CrescendoAttack", + 1, + 1, + 1, + 0.5, + ), + ( + [ + (AttackOutcome.SUCCESS, "CrescendoAttack"), + (AttackOutcome.FAILURE, "CrescendoAttack"), + (AttackOutcome.SUCCESS, "RedTeamingAttack"), + (AttackOutcome.FAILURE, "RedTeamingAttack"), + (AttackOutcome.SUCCESS, "RedTeamingAttack"), + ], + "RedTeamingAttack", + 2, + 1, + 0, + 2 / 3, + ), + ( + [ + (AttackOutcome.FAILURE, None), + (AttackOutcome.UNDETERMINED, None), + (AttackOutcome.SUCCESS, None), + ], + "unknown", + 1, + 1, + 1, + 0.5, + ), + ], ) - - attacks = [ - AttackResult( - conversation_id="conv-1", - objective="test", - attack_identifier={"type": "test"}, - outcome=AttackOutcome.SUCCESS, - last_response=message, - ), - AttackResult( - conversation_id="conv-2", - objective="test", - attack_identifier={"type": "test"}, - outcome=AttackOutcome.UNDETERMINED, - last_response=message, - ), - ] - - result = analyze_results(attacks) - - assert "Base64Converter" in result["By_converter_type"] - stats = result["By_converter_type"]["Base64Converter"] - assert stats.successes == 1 - assert stats.failures == 0 - assert stats.undetermined == 1 - assert stats.total_decided == 1 - assert stats.success_rate == 1.0 + def test_single_dimension(self, items, type_key, exp_succ, exp_fail, exp_und, exp_rate): + attacks = [make_attack(outcome=o, attack_type=t) for (o, t) in items] + result = analyze_results(attacks, group_by=["attack_type"]) + + assert "attack_type" in result.dimensions + stats = result.dimensions["attack_type"][type_key] + assert isinstance(stats, AttackStats) + assert stats.successes == exp_succ + assert stats.failures == exp_fail + assert stats.undetermined == exp_und + assert stats.total_decided == exp_succ + exp_fail + assert stats.success_rate == exp_rate + + +# --------------------------------------------------------------------------- +# Single dimension: converter_type +# --------------------------------------------------------------------------- +class TestGroupByConverterType: + """Group-by a single dimension: converter_type.""" + + def test_no_converter_tracked(self): + attacks = [ + AttackResult( + conversation_id="conv-1", + objective="test", + attack_identifier={"__type__": "PromptSendingAttack"}, + outcome=AttackOutcome.SUCCESS, + last_response=None, + ), + AttackResult( + conversation_id="conv-2", + objective="test", + attack_identifier={"__type__": "PromptSendingAttack"}, + outcome=AttackOutcome.FAILURE, + last_response=None, + ), + ] + result = analyze_results(attacks, group_by=["converter_type"]) + + stats = result.dimensions["converter_type"]["no_converter"] + assert stats.successes == 1 + assert stats.failures == 1 + assert stats.success_rate == 0.5 + + def test_multiple_converter_types(self): + attacks = [ + make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"]), + 
make_attack_with_converters(AttackOutcome.FAILURE, ["ROT13Converter"]), + make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"]), + ] + result = analyze_results(attacks, group_by=["converter_type"]) + + base64 = result.dimensions["converter_type"]["Base64Converter"] + assert base64.successes == 2 + assert base64.failures == 0 + assert base64.success_rate == 1.0 + + rot13 = result.dimensions["converter_type"]["ROT13Converter"] + assert rot13.successes == 0 + assert rot13.failures == 1 + assert rot13.success_rate == 0.0 + + def test_multiple_converters_per_attack(self): + attacks = [ + make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter", "ROT13Converter"]), + ] + result = analyze_results(attacks, group_by=["converter_type"]) + + assert result.dimensions["converter_type"]["Base64Converter"].successes == 1 + assert result.dimensions["converter_type"]["ROT13Converter"].successes == 1 + + def test_undetermined_tracked(self): + attacks = [ + make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"]), + make_attack_with_converters(AttackOutcome.UNDETERMINED, ["Base64Converter"]), + ] + result = analyze_results(attacks, group_by=["converter_type"]) + + stats = result.dimensions["converter_type"]["Base64Converter"] + assert stats.successes == 1 + assert stats.undetermined == 1 + assert stats.total_decided == 1 + assert stats.success_rate == 1.0 + + +# --------------------------------------------------------------------------- +# Composite dimensions +# --------------------------------------------------------------------------- +class TestCompositeDimensions: + """Group-by composite (cross-product) dimensions.""" + + def test_composite_two_dimensions(self): + attacks = [ + make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"], attack_type="CrescendoAttack"), + make_attack_with_converters(AttackOutcome.FAILURE, ["ROT13Converter"], attack_type="CrescendoAttack"), + make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"], attack_type="RedTeamingAttack"), + ] + result = analyze_results(attacks, group_by=[("converter_type", "attack_type")]) + + dim = result.dimensions[("converter_type", "attack_type")] + assert dim[("Base64Converter", "CrescendoAttack")].successes == 1 + assert dim[("Base64Converter", "CrescendoAttack")].failures == 0 + assert dim[("ROT13Converter", "CrescendoAttack")].failures == 1 + assert dim[("Base64Converter", "RedTeamingAttack")].successes == 1 + + def test_composite_with_multi_converter_creates_cross_product(self): + attacks = [ + make_attack_with_converters( + AttackOutcome.SUCCESS, + ["Base64Converter", "ROT13Converter"], + attack_type="CrescendoAttack", + ), + ] + result = analyze_results(attacks, group_by=[("converter_type", "attack_type")]) + + dim = result.dimensions[("converter_type", "attack_type")] + assert ("Base64Converter", "CrescendoAttack") in dim + assert ("ROT13Converter", "CrescendoAttack") in dim + assert dim[("Base64Converter", "CrescendoAttack")].successes == 1 + assert dim[("ROT13Converter", "CrescendoAttack")].successes == 1 + + def test_mixed_single_and_composite(self): + attacks = [ + make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"], attack_type="CrescendoAttack"), + make_attack_with_converters(AttackOutcome.FAILURE, ["ROT13Converter"], attack_type="RedTeamingAttack"), + ] + result = analyze_results( + attacks, + group_by=[ + "attack_type", + ("converter_type", "attack_type"), + ], + ) + + # Single dimension present + assert "attack_type" in 
result.dimensions + assert result.dimensions["attack_type"]["CrescendoAttack"].successes == 1 + assert result.dimensions["attack_type"]["RedTeamingAttack"].failures == 1 + + # Composite dimension present + composite = result.dimensions[("converter_type", "attack_type")] + assert composite[("Base64Converter", "CrescendoAttack")].successes == 1 + assert composite[("ROT13Converter", "RedTeamingAttack")].failures == 1 + + +# --------------------------------------------------------------------------- +# Custom dimensions +# --------------------------------------------------------------------------- +class TestCustomDimensions: + """User-supplied custom dimension extractors.""" + + def test_custom_extractor(self): + def _extract_objective(result: AttackResult) -> list[str]: + return [result.objective] + + attacks = [ + AttackResult( + conversation_id="c1", + objective="steal secrets", + attack_identifier={"__type__": "PromptSendingAttack"}, + outcome=AttackOutcome.SUCCESS, + ), + AttackResult( + conversation_id="c2", + objective="bypass filter", + attack_identifier={"__type__": "PromptSendingAttack"}, + outcome=AttackOutcome.FAILURE, + ), + ] + result = analyze_results( + attacks, + group_by=["objective"], + custom_dimensions={"objective": _extract_objective}, + ) + + assert result.dimensions["objective"]["steal secrets"].successes == 1 + assert result.dimensions["objective"]["bypass filter"].failures == 1 + + def test_custom_dimension_in_composite(self): + def _extract_objective(result: AttackResult) -> list[str]: + return [result.objective] + + attacks = [ + make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"]), + ] + # Override objective on the attack for testing + attacks[0].objective = "test_obj" + + result = analyze_results( + attacks, + group_by=[("converter_type", "objective")], + custom_dimensions={"objective": _extract_objective}, + ) + + composite = result.dimensions[("converter_type", "objective")] + assert ("Base64Converter", "test_obj") in composite + + +# --------------------------------------------------------------------------- +# Single dimension: label +# --------------------------------------------------------------------------- +class TestGroupByLabel: + """Group-by a single dimension: label.""" + + def test_no_labels_tracked(self): + attacks = [make_attack(AttackOutcome.SUCCESS)] + result = analyze_results(attacks, group_by=["label"]) + + stats = result.dimensions["label"]["no_labels"] + assert stats.successes == 1 + assert stats.total_decided == 1 + + def test_single_label(self): + message = MessagePiece( + role="user", + original_value="test", + labels={"operation_name": "op_trash_panda"}, + ) + attacks = [ + AttackResult( + conversation_id="c1", + objective="test", + attack_identifier={"__type__": "PromptSendingAttack"}, + outcome=AttackOutcome.SUCCESS, + last_response=message, + ), + ] + result = analyze_results(attacks, group_by=["label"]) + + assert "operation_name=op_trash_panda" in result.dimensions["label"] + assert result.dimensions["label"]["operation_name=op_trash_panda"].successes == 1 + + def test_multiple_labels_per_attack(self): + """Each label key=value pair creates its own stats entry.""" + message = MessagePiece( + role="user", + original_value="test", + labels={"operation_name": "op_trash_panda", "operator": "roakey"}, + ) + attacks = [ + AttackResult( + conversation_id="c1", + objective="test", + attack_identifier={"__type__": "PromptSendingAttack"}, + outcome=AttackOutcome.SUCCESS, + last_response=message, + ), + ] + result = 
analyze_results(attacks, group_by=["label"]) + + assert result.dimensions["label"]["operation_name=op_trash_panda"].successes == 1 + assert result.dimensions["label"]["operator=roakey"].successes == 1 + + def test_label_composite_with_attack_type(self): + message = MessagePiece( + role="user", + original_value="test", + labels={"operator": "roakey"}, + ) + attacks = [ + AttackResult( + conversation_id="c1", + objective="test", + attack_identifier={"__type__": "CrescendoAttack"}, + outcome=AttackOutcome.SUCCESS, + last_response=message, + ), + AttackResult( + conversation_id="c2", + objective="test", + attack_identifier={"__type__": "CrescendoAttack"}, + outcome=AttackOutcome.FAILURE, + last_response=message, + ), + ] + result = analyze_results(attacks, group_by=[("label", "attack_type")]) + + dim = result.dimensions[("label", "attack_type")] + assert ("operator=roakey", "CrescendoAttack") in dim + assert dim[("operator=roakey", "CrescendoAttack")].successes == 1 + assert dim[("operator=roakey", "CrescendoAttack")].failures == 1 + + +# --------------------------------------------------------------------------- +# Default group_by behavior +# --------------------------------------------------------------------------- +class TestDefaultGroupBy: + """When group_by=None, all built-in dimensions are used.""" + + def test_defaults_include_all_builtin_dimensions(self): + attacks = [make_attack(AttackOutcome.SUCCESS)] + result = analyze_results(attacks) + + assert "attack_type" in result.dimensions + assert "converter_type" in result.dimensions + assert "label" in result.dimensions + + def test_empty_group_by_returns_only_overall(self): + attacks = [make_attack(AttackOutcome.SUCCESS)] + result = analyze_results(attacks, group_by=[]) + + assert result.dimensions == {} + assert result.overall.successes == 1 + + +# --------------------------------------------------------------------------- +# Deprecated dimension alias: attack_identifier -> attack_type +# --------------------------------------------------------------------------- +class TestDeprecatedAttackIdentifierAlias: + """Using 'attack_identifier' in group_by should work but warn.""" + + def test_alias_emits_deprecation_warning(self): + attacks = [make_attack(AttackOutcome.SUCCESS, attack_type="CrescendoAttack")] + with pytest.warns(DeprecationWarning, match="'attack_identifier' is deprecated"): + analyze_results(attacks, group_by=["attack_identifier"]) + + def test_alias_resolves_to_canonical_key(self): + attacks = [ + make_attack(AttackOutcome.SUCCESS, attack_type="CrescendoAttack"), + make_attack(AttackOutcome.FAILURE, attack_type="CrescendoAttack"), + ] + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + result = analyze_results(attacks, group_by=["attack_identifier"]) + + # The dimension key in the result should be the canonical "attack_type" + assert "attack_type" in result.dimensions + assert "attack_identifier" not in result.dimensions + assert result.dimensions["attack_type"]["CrescendoAttack"].successes == 1 + + def test_alias_in_composite(self): + attacks = [ + make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"], attack_type="CrescendoAttack"), + ] + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + result = analyze_results(attacks, group_by=[("converter_type", "attack_identifier")]) + + # Composite key uses canonical names + assert ("converter_type", "attack_type") in result.dimensions + dim = result.dimensions[("converter_type", 
"attack_type")] + assert ("Base64Converter", "CrescendoAttack") in dim