diff --git a/pyoaev/__init__.py b/pyoaev/__init__.py index 050d448..1b0239f 100644 --- a/pyoaev/__init__.py +++ b/pyoaev/__init__.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -__version__ = "2.260622.0" +__version__ = "2.260627.0" from pyoaev._version import ( # noqa: F401 __author__, @@ -8,6 +8,12 @@ __license__, __title__, ) +from pyoaev.asset_types import ( # noqa: F401 + AssetCategory, + AssetCriticality, + AssetSubCategory, + CloudProvider, +) from pyoaev.client import OpenAEV # noqa: F401 from pyoaev.configuration import * # noqa: F401,F403,F405 from pyoaev.contracts import * # noqa: F401,F403,F405 @@ -22,5 +28,9 @@ "__title__", "__version__", "OpenAEV", + "AssetCategory", + "AssetSubCategory", + "CloudProvider", + "AssetCriticality", ] __all__.extend(exceptions.__all__) # noqa: F405 diff --git a/pyoaev/apis/endpoint.py b/pyoaev/apis/endpoint.py index f9683cc..6793c3d 100644 --- a/pyoaev/apis/endpoint.py +++ b/pyoaev/apis/endpoint.py @@ -13,16 +13,30 @@ class Endpoint(RESTObject): class EndpointManager(RESTManager): _path = "/endpoints" _obj_cls = Endpoint + # asset_name is the only required attribute. Everything else - including endpoint_hostname, + # endpoint_platform and endpoint_arch - is optional: agents and collectors typically provide + # the endpoint fields, while category-driven assets (web app, cloud, network, ...) may omit + # them and the backend defaults endpoint_platform / endpoint_arch to "Unknown". _create_attrs = RequiredOptional( - required=( + required=("asset_name",), + optional=( "endpoint_hostname", "endpoint_platform", "endpoint_arch", - ), - optional=( - "endpoint_mac_addresses", "endpoint_ips", + "endpoint_mac_addresses", + "endpoint_url", + "asset_description", "asset_external_reference", + "asset_tags", + "asset_category", + "asset_subcategory", + "asset_criticality", + "asset_internet_facing", + "asset_cloud_provider", + "asset_cloud_native_type", + "asset_cloud_region", + "asset_metadata", ), ) diff --git a/pyoaev/asset_types.py b/pyoaev/asset_types.py new file mode 100644 index 0000000..b8e0b41 --- /dev/null +++ b/pyoaev/asset_types.py @@ -0,0 +1,135 @@ +# -*- coding: utf-8 -*- +"""Canonical asset taxonomy for OpenAEV (mirrors the backend enums). + +These give producers (collectors, custom integrations) a single, typed vocabulary for the +two-level asset categorization (category + subcategory), criticality and cloud provider, instead +of free-form strings. They are plain ``str`` enums so they can be dropped straight into the dict +payloads sent to the API. +""" + +from enum import Enum + + +class AssetCategory(str, Enum): + HOST = "HOST" + CONTAINER_WORKLOAD = "CONTAINER_WORKLOAD" + CLOUD_RESOURCE = "CLOUD_RESOURCE" + WEB_APPLICATION = "WEB_APPLICATION" + NETWORK_DEVICE = "NETWORK_DEVICE" + MOBILE_DEVICE = "MOBILE_DEVICE" + IOT_OT_DEVICE = "IOT_OT_DEVICE" + IDENTITY = "IDENTITY" + SAAS_APPLICATION = "SAAS_APPLICATION" + AI_TARGET = "AI_TARGET" + SECURITY_PLATFORM = "SECURITY_PLATFORM" + GENERIC_ASSET = "GENERIC_ASSET" + + +class AssetSubCategory(str, Enum): + # HOST + SERVER = "SERVER" + WORKSTATION = "WORKSTATION" + LAPTOP = "LAPTOP" + VIRTUAL_MACHINE = "VIRTUAL_MACHINE" + HYPERVISOR = "HYPERVISOR" + MAINFRAME = "MAINFRAME" + THIN_CLIENT = "THIN_CLIENT" + # CONTAINER_WORKLOAD + CONTAINER = "CONTAINER" + CONTAINER_IMAGE = "CONTAINER_IMAGE" + KUBERNETES_POD = "KUBERNETES_POD" + KUBERNETES_CLUSTER = "KUBERNETES_CLUSTER" + KUBERNETES_NODE = "KUBERNETES_NODE" + SERVERLESS_FUNCTION = "SERVERLESS_FUNCTION" + # CLOUD_RESOURCE + COMPUTE = "COMPUTE" + STORAGE = "STORAGE" + DATABASE = "DATABASE" + NETWORKING = "NETWORKING" + SERVERLESS = "SERVERLESS" + CONTAINER_REGISTRY = "CONTAINER_REGISTRY" + KUBERNETES = "KUBERNETES" + IAM_PRINCIPAL = "IAM_PRINCIPAL" + SECRETS_KEY_MGMT = "SECRETS_KEY_MGMT" + MESSAGING_QUEUE = "MESSAGING_QUEUE" + ANALYTICS_DATA = "ANALYTICS_DATA" + AI_ML_SERVICE = "AI_ML_SERVICE" + IAC_TEMPLATE = "IAC_TEMPLATE" + CLOUD_OTHER = "CLOUD_OTHER" + # WEB_APPLICATION + WEBSITE = "WEBSITE" + WEB_API = "WEB_API" + SINGLE_PAGE_APP = "SINGLE_PAGE_APP" + GRAPHQL_API = "GRAPHQL_API" + WEB_SERVICE = "WEB_SERVICE" + MICROSERVICE = "MICROSERVICE" + # NETWORK_DEVICE + ROUTER = "ROUTER" + SWITCH = "SWITCH" + FIREWALL = "FIREWALL" + LOAD_BALANCER = "LOAD_BALANCER" + VPN_GATEWAY = "VPN_GATEWAY" + WIRELESS_AP = "WIRELESS_AP" + PROXY = "PROXY" + DNS_SERVER = "DNS_SERVER" + DHCP_SERVER = "DHCP_SERVER" + SAN_NAS = "SAN_NAS" + NETWORK_OTHER = "NETWORK_OTHER" + # MOBILE_DEVICE + SMARTPHONE = "SMARTPHONE" + TABLET = "TABLET" + # IOT_OT_DEVICE + IOT_SENSOR = "IOT_SENSOR" + IP_CAMERA = "IP_CAMERA" + GATEWAY = "GATEWAY" + POINT_OF_SALE = "POINT_OF_SALE" + MEDIA_DEVICE = "MEDIA_DEVICE" + PLC = "PLC" + RTU = "RTU" + HMI = "HMI" + SCADA_HISTORIAN = "SCADA_HISTORIAN" + MEDICAL_DEVICE = "MEDICAL_DEVICE" + PRINTER_PERIPHERAL = "PRINTER_PERIPHERAL" + BUILDING_MGMT = "BUILDING_MGMT" + # IDENTITY + USER_ACCOUNT = "USER_ACCOUNT" + SERVICE_ACCOUNT = "SERVICE_ACCOUNT" + GROUP = "GROUP" + ROLE = "ROLE" + SHARED_MAILBOX = "SHARED_MAILBOX" + NON_HUMAN_IDENTITY = "NON_HUMAN_IDENTITY" + # SAAS_APPLICATION + SAAS_APP = "SAAS_APP" + SAAS_TENANT = "SAAS_TENANT" + # AI_TARGET + LLM_MODEL = "LLM_MODEL" + AI_AGENT = "AI_AGENT" + MCP_SERVER = "MCP_SERVER" + RAG_PIPELINE = "RAG_PIPELINE" + # SECURITY_PLATFORM + EDR = "EDR" + XDR = "XDR" + SIEM = "SIEM" + SOAR = "SOAR" + NDR = "NDR" + ISPM = "ISPM" + LLM_FIREWALL = "LLM_FIREWALL" + AI_GATEWAY = "AI_GATEWAY" + + +class CloudProvider(str, Enum): + AWS = "AWS" + AZURE = "AZURE" + GCP = "GCP" + OCI = "OCI" + ALIBABA = "ALIBABA" + KUBERNETES = "KUBERNETES" + OTHER = "OTHER" + + +class AssetCriticality(str, Enum): + VERY_HIGH = "VERY_HIGH" + HIGH = "HIGH" + MEDIUM = "MEDIUM" + LOW = "LOW" + UNKNOWN = "UNKNOWN" diff --git a/test/apis/endpoint/test_endpoint.py b/test/apis/endpoint/test_endpoint.py index dcf081e..8755359 100644 --- a/test/apis/endpoint/test_endpoint.py +++ b/test/apis/endpoint/test_endpoint.py @@ -1,7 +1,7 @@ from unittest import TestCase, main, mock from unittest.mock import ANY -from pyoaev import OpenAEV +from pyoaev import AssetCategory, AssetSubCategory, CloudProvider, OpenAEV from pyoaev.apis.inputs.search import Filter, FilterGroup, SearchPaginationInput @@ -53,5 +53,49 @@ def test_search_input_correctly_serialised(self, mock_request): ) +class TestEndpointCategorization(TestCase): + @mock.patch("requests.Session.request", side_effect=mock_response) + def test_upsert_web_application_without_platform(self, mock_request): + api_client = OpenAEV("url", "token") + data = { + "asset_name": "Filigran website", + "asset_category": AssetCategory.WEB_APPLICATION, + "asset_subcategory": AssetSubCategory.WEBSITE, + "endpoint_url": "https://filigran.io", + "asset_internet_facing": True, + } + + api_client.endpoint.upsert(data) + + mock_request.assert_called_once() + _, kwargs = mock_request.call_args + self.assertEqual(kwargs["method"], "post") + self.assertEqual(kwargs["url"], "url/api/endpoints/agentless/upsert") + self.assertEqual(kwargs["json"]["asset_category"], "WEB_APPLICATION") + # A category-driven asset can be upserted without platform / arch. + self.assertNotIn("endpoint_platform", kwargs["json"]) + self.assertNotIn("endpoint_arch", kwargs["json"]) + + @mock.patch("requests.Session.request", side_effect=mock_response) + def test_upsert_cloud_resource(self, mock_request): + api_client = OpenAEV("url", "token") + data = { + "asset_name": "prod-bucket", + "asset_category": AssetCategory.CLOUD_RESOURCE, + "asset_subcategory": AssetSubCategory.STORAGE, + "asset_cloud_provider": CloudProvider.AWS, + "asset_cloud_native_type": "s3_bucket", + "asset_cloud_region": "eu-west-1", + } + + api_client.endpoint.upsert(data) + + mock_request.assert_called_once() + _, kwargs = mock_request.call_args + self.assertEqual(kwargs["url"], "url/api/endpoints/agentless/upsert") + self.assertEqual(kwargs["json"]["asset_cloud_provider"], "AWS") + self.assertEqual(kwargs["json"]["asset_cloud_native_type"], "s3_bucket") + + if __name__ == "__main__": main()