Skip to content
Open
1 change: 1 addition & 0 deletions pyoaev/apis/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .ai_target import * # noqa: F401,F403
from .attack_pattern import * # noqa: F401,F403
from .collector import * # noqa: F401,F403
from .cve import * # noqa: F401,F403
Expand Down
30 changes: 30 additions & 0 deletions pyoaev/apis/ai_target.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from pyoaev.base import RESTManager, RESTObject
from pyoaev.mixins import CreateMixin, DeleteMixin, GetMixin, ListMixin, UpdateMixin
from pyoaev.utils import RequiredOptional


class AiTarget(RESTObject):
_id_attr = "asset_id"


class AiTargetManager(
GetMixin, ListMixin, CreateMixin, UpdateMixin, DeleteMixin, RESTManager
):
Comment thread
SamuelHassine marked this conversation as resolved.
"""Manage AI Target assets (LLM endpoints / AI agents under adversarial test)."""

_path = "/ai_targets"
_obj_cls = AiTarget
Comment thread
SamuelHassine marked this conversation as resolved.
_create_attrs = RequiredOptional(
required=("asset_name", "ai_target_provider"),
optional=(
"asset_description",
"asset_tags",
"asset_external_reference",
"ai_target_endpoint",
"ai_target_model",
"ai_target_modality",
"ai_target_system_prompt",
"ai_target_api_key_variable",
"ai_target_configuration",
),
)
37 changes: 36 additions & 1 deletion pyoaev/apis/inject_expectation/inject_expectation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict
from typing import Any, Dict, List

from pyoaev import exceptions as exc
from pyoaev.apis.inject_expectation.model import (
Expand Down Expand Up @@ -40,6 +40,41 @@ def expectations_assets_for_source(
)
return result

@exc.on_http_error(exc.OpenAEVListError)
def ai_expectations_for_source(
self, source_id: str, **kwargs: Any
) -> List[Dict[str, Any]]:
"""Returns agentless DETECTION/PREVENTION expectations (AI adversarial injects) not yet
filled for the given source. Used by AI defense collectors (LLM firewall / guardrail).

:param source_id: the identifier of the collector requesting expectations
:type source_id: str
:raises OpenAEVParsingError: if the server does not return a JSON list of dicts
:return: a list of agentless detection/prevention expectation dicts
:rtype: list[dict]
"""
path = f"{self.path}/ai/{source_id}"
# http_get is annotated Union[Dict, requests.Response]; widen to Any so the list
# validation below stays meaningful to type checkers, then build the result list
# explicitly to satisfy the declared return type.
raw: Any = self.openaev.http_get(path, **kwargs)
if not isinstance(raw, list):
raise exc.OpenAEVParsingError(
error_message=(
f"Expected a list of AI expectations from {path}, "
f"got {type(raw).__name__}"
)
)
for item in raw:
if not isinstance(item, dict):
raise exc.OpenAEVParsingError(
error_message=(
f"Expected AI expectation objects (dicts) from {path}, "
f"got a list element of type {type(item).__name__}"
)
)
return [item for item in raw]

def expectations_models_for_source(self, source_id: str, **kwargs: Any):
"""Returns all expectations from OpenAEV that have had no result yet
from the source_id (e.g. collector).
Expand Down
1 change: 1 addition & 0 deletions pyoaev/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def __init__(
self.inject_expectation = apis.InjectExpectationManager(self)
self.payload = apis.PayloadManager(self)
self.security_platform = apis.SecurityPlatformManager(self)
self.ai_target = apis.AiTargetManager(self)
self.inject_expectation_trace = apis.InjectExpectationTraceManager(self)
self.signature = apis.SignatureApiManager(self)
self.tag = apis.TagManager(self)
Expand Down
6 changes: 6 additions & 0 deletions pyoaev/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ class OpenAEVCreateError(OpenAEVError):
pass


class OpenAEVDeleteError(OpenAEVError):
pass


class SignatureTransmissionError(OpenAEVError):
"""Signatures didn't make it. Validation rejected them, 4xx slammed the door, or retries ran dry."""

Expand Down Expand Up @@ -222,5 +226,7 @@ def wrapped_f(*args: Any, **kwargs: Any) -> Any:
"OpenAEVListError",
"OpenAEVGetError",
"OpenAEVUpdateError",
"OpenAEVCreateError",
"OpenAEVDeleteError",
"SignatureTransmissionError",
]
2 changes: 1 addition & 1 deletion pyoaev/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ class DeleteMixin(_RestManagerBase):
_path: Optional[str]
openaev: pyoaev.OpenAEV

@exc.on_http_error(exc.OpenAEVCreateError)
@exc.on_http_error(exc.OpenAEVDeleteError)
def delete(
self, id: Optional[Union[str, int]] = None, **kwargs: Any
) -> requests.Response:
Expand Down
4 changes: 4 additions & 0 deletions pyoaev/security_domain/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,8 @@ class SecurityDomains(Enum):
URL_FILTERING = {"domain_name": "URL Filtering", "domain_color": "#66CCFF"}
CLOUD = {"domain_name": "Cloud", "domain_color": "#9999CC"}
TABLE_TOP = {"domain_name": "Tabletop", "domain_color": "#FFCC33"}
ARTIFICIAL_INTELLIGENCE = {
"domain_name": "Artificial Intelligence",
"domain_color": "#7C4DFF",
}
TOCLASSIFY = {"domain_name": "To classify", "domain_color": "#FFFFFF"}
15 changes: 15 additions & 0 deletions pyoaev/signatures/ai_marker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Deterministic per-inject canary marker shared by the AI red-team injector and the AI defense
collectors.

The marker is derived purely from the inject (and optional agent) id, so the injector that sends the
attack and the collector that validates an AI defense response compute the same value independently,
without the platform having to store it. It is emitted by the injector (request header + in-prompt
token) and matched by collectors against guardrail / firewall logs.
"""

import hashlib


def build_marker(inject_id: str, agent_id: str = "") -> str:
seed = f"{inject_id}:{agent_id}".encode("utf-8")
return "oaev" + hashlib.sha256(seed).hexdigest()[:16]
Comment thread
SamuelHassine marked this conversation as resolved.
4 changes: 4 additions & 0 deletions pyoaev/signatures/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,7 @@ class SignatureTypes(str, Enum):
SIG_TYPE_CLOUD_REGION = "cloud_region"
SIG_TYPE_TARGET_SERVICE = "target_service"
SIG_TYPE_QUERY = "query"
# AI adversarial validation: correlate AI defense (LLM firewall / guardrail) events back to a
# specific AI inject execution.
SIG_TYPE_AI_REQUEST_MARKER = "ai_request_marker"
SIG_TYPE_AI_TARGET_ENDPOINT = "ai_target_endpoint"
Comment thread
SamuelHassine marked this conversation as resolved.
89 changes: 89 additions & 0 deletions test/apis/ai_target/test_ai_target.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from unittest import TestCase, main, mock


def mock_response(*args, **kwargs):
class MockResponse:
def __init__(self):
self.status_code = 200
self.history = None
self.content = None
self.headers = {"Content-Type": "application/json"}

def json(self):
return {}

return MockResponse()


class TestAiTargetManager(TestCase):
@mock.patch("requests.Session.request", side_effect=mock_response)
def test_create_posts_to_ai_targets(self, mock_request):
from pyoaev import OpenAEV

api_client = OpenAEV("url", "token")
data = {
"asset_name": "OpenAI guardrail",
"ai_target_provider": "openai",
"ai_target_endpoint": "https://api.openai.com/v1",
}

api_client.ai_target.create(data=data)

_, kwargs = mock_request.call_args
self.assertEqual(kwargs["method"], "post")
self.assertEqual(kwargs["url"], "url/api/ai_targets")
self.assertEqual(kwargs["json"], data)

@mock.patch("requests.Session.request", side_effect=mock_response)
def test_get_requests_single_ai_target(self, mock_request):
from pyoaev import OpenAEV

api_client = OpenAEV("url", "token")

api_client.ai_target.get("asset-123")

_, kwargs = mock_request.call_args
self.assertEqual(kwargs["method"], "get")
self.assertEqual(kwargs["url"], "url/api/ai_targets/asset-123")

@mock.patch("requests.Session.request", side_effect=mock_response)
def test_update_puts_to_ai_target(self, mock_request):
from pyoaev import OpenAEV

api_client = OpenAEV("url", "token")
new_data = {"asset_description": "updated"}

api_client.ai_target.update("asset-123", new_data=new_data)

_, kwargs = mock_request.call_args
self.assertEqual(kwargs["method"], "put")
self.assertEqual(kwargs["url"], "url/api/ai_targets/asset-123")
self.assertEqual(kwargs["json"], new_data)

@mock.patch("requests.Session.request", side_effect=mock_response)
def test_delete_calls_delete_on_ai_target(self, mock_request):
from pyoaev import OpenAEV

api_client = OpenAEV("url", "token")

api_client.ai_target.delete("asset-123")

_, kwargs = mock_request.call_args
self.assertEqual(kwargs["method"], "delete")
self.assertEqual(kwargs["url"], "url/api/ai_targets/asset-123")

def test_delete_http_error_is_mapped_to_delete_error(self):
from pyoaev import OpenAEV
from pyoaev.exceptions import OpenAEVDeleteError, OpenAEVHttpError

api_client = OpenAEV("url", "token")

with mock.patch.object(
OpenAEV, "http_delete", side_effect=OpenAEVHttpError("boom")
):
with self.assertRaises(OpenAEVDeleteError):
api_client.ai_target.delete("asset-123")


if __name__ == "__main__":
main()
69 changes: 69 additions & 0 deletions test/apis/inject_expectation/test_inject_expectation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from unittest import TestCase, main, mock

from pyoaev import OpenAEV
from pyoaev.exceptions import OpenAEVHttpError, OpenAEVListError, OpenAEVParsingError


def make_json_response(payload):
class MockResponse:
def __init__(self):
self.status_code = 200
self.history = None
self.content = None
self.headers = {"Content-Type": "application/json"}

def json(self):
return payload

return MockResponse()


class TestAiExpectationsForSource(TestCase):
@mock.patch("requests.Session.request")
def test_returns_list_of_expectations(self, mock_request):
expectations = [
{"inject_expectation_id": "exp-1"},
{"inject_expectation_id": "exp-2"},
]
mock_request.return_value = make_json_response(expectations)
api_client = OpenAEV("url", "token")

result = api_client.inject_expectation.ai_expectations_for_source("collector-1")

mock_request.assert_called_once()
_, kwargs = mock_request.call_args
self.assertEqual(kwargs["method"], "get")
self.assertEqual(kwargs["url"], "url/api/injects/expectations/ai/collector-1")
self.assertEqual(result, expectations)

@mock.patch("requests.Session.request")
def test_raises_parsing_error_when_not_a_list(self, mock_request):
mock_request.return_value = make_json_response({"unexpected": "shape"})
api_client = OpenAEV("url", "token")

with self.assertRaises(OpenAEVParsingError):
api_client.inject_expectation.ai_expectations_for_source("collector-1")

@mock.patch("requests.Session.request")
def test_raises_parsing_error_when_elements_not_dicts(self, mock_request):
mock_request.return_value = make_json_response(["not", "a", "dict"])
api_client = OpenAEV("url", "token")

with self.assertRaises(OpenAEVParsingError) as ctx:
api_client.inject_expectation.ai_expectations_for_source("collector-1")

# The message should call out the offending element type, not just "list".
self.assertIn("str", str(ctx.exception))

def test_http_error_is_mapped_to_list_error(self):
api_client = OpenAEV("url", "token")

with mock.patch.object(
OpenAEV, "http_get", side_effect=OpenAEVHttpError("boom")
):
with self.assertRaises(OpenAEVListError):
api_client.inject_expectation.ai_expectations_for_source("collector-1")


if __name__ == "__main__":
main()
41 changes: 41 additions & 0 deletions test/signatures/test_ai_marker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import unittest

from pyoaev.signatures.ai_marker import build_marker


class TestBuildMarker(unittest.TestCase):
def test_marker_has_expected_prefix_and_length(self):
marker = build_marker("inject-1", "agent-1")

self.assertTrue(marker.startswith("oaev"))
# "oaev" prefix (4 chars) + 16 hex chars from the sha256 digest.
self.assertEqual(len(marker), 20)
self.assertTrue(all(c in "0123456789abcdef" for c in marker[4:]))

def test_marker_is_deterministic_for_same_inputs(self):
self.assertEqual(
build_marker("inject-1", "agent-1"),
build_marker("inject-1", "agent-1"),
)

def test_marker_differs_for_different_inputs(self):
self.assertNotEqual(
build_marker("inject-1", "agent-1"),
build_marker("inject-2", "agent-1"),
)
self.assertNotEqual(
build_marker("inject-1", "agent-1"),
build_marker("inject-1", "agent-2"),
)

def test_agent_id_defaults_to_empty(self):
self.assertEqual(build_marker("inject-1"), build_marker("inject-1", ""))

def test_marker_value_is_stable_across_runs(self):
# Lock the exact value so the injector and collectors (potentially in
# other languages) stay byte-for-byte compatible.
self.assertEqual(build_marker("inject-1", "agent-1"), "oaev6457d87cba0698ab")


if __name__ == "__main__":
unittest.main()
29 changes: 29 additions & 0 deletions test/signatures/test_ai_signature_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import unittest

from pyoaev.signatures.signature_type import SignatureType
from pyoaev.signatures.types import MatchTypes, SignatureTypes


class TestAiSignatureTypes(unittest.TestCase):
def test_ai_signature_type_values(self):
self.assertEqual(
SignatureTypes.SIG_TYPE_AI_REQUEST_MARKER.value, "ai_request_marker"
)
self.assertEqual(
SignatureTypes.SIG_TYPE_AI_TARGET_ENDPOINT.value, "ai_target_endpoint"
)

def test_ai_request_marker_usable_in_signature_type(self):
signature_type = SignatureType(
label=SignatureTypes.SIG_TYPE_AI_REQUEST_MARKER,
match_type=MatchTypes.MATCH_TYPE_SIMPLE,
)

struct = signature_type.make_struct_for_matching(data="oaevdeadbeef")

self.assertEqual(struct.get("type"), MatchTypes.MATCH_TYPE_SIMPLE.value)
self.assertEqual(struct.get("data"), "oaevdeadbeef")


if __name__ == "__main__":
unittest.main()
Loading