Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion pyoaev/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
__version__ = "2.260622.0"
__version__ = "2.260627.0"

from pyoaev._version import ( # noqa: F401
__author__,
Expand All @@ -8,6 +8,12 @@
__license__,
__title__,
)
from pyoaev.asset_types import ( # noqa: F401
AssetCategory,
AssetCriticality,
AssetSubCategory,
CloudProvider,
)
from pyoaev.client import OpenAEV # noqa: F401
from pyoaev.configuration import * # noqa: F401,F403,F405
from pyoaev.contracts import * # noqa: F401,F403,F405
Expand All @@ -22,5 +28,9 @@
"__title__",
"__version__",
"OpenAEV",
"AssetCategory",
"AssetSubCategory",
"CloudProvider",
"AssetCriticality",
]
__all__.extend(exceptions.__all__) # noqa: F405
22 changes: 18 additions & 4 deletions pyoaev/apis/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,30 @@ class Endpoint(RESTObject):
class EndpointManager(RESTManager):
_path = "/endpoints"
_obj_cls = Endpoint
# asset_name is the only required attribute. Everything else - including endpoint_hostname,
# endpoint_platform and endpoint_arch - is optional: agents and collectors typically provide
# the endpoint fields, while category-driven assets (web app, cloud, network, ...) may omit
# them and the backend defaults endpoint_platform / endpoint_arch to "Unknown".
_create_attrs = RequiredOptional(
required=(
required=("asset_name",),
optional=(
"endpoint_hostname",
"endpoint_platform",
"endpoint_arch",
),
optional=(
"endpoint_mac_addresses",
"endpoint_ips",
"endpoint_mac_addresses",
"endpoint_url",
"asset_description",
"asset_external_reference",
"asset_tags",
"asset_category",
"asset_subcategory",
"asset_criticality",
"asset_internet_facing",
"asset_cloud_provider",
"asset_cloud_native_type",
"asset_cloud_region",
"asset_metadata",
),
)

Expand Down
135 changes: 135 additions & 0 deletions pyoaev/asset_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# -*- coding: utf-8 -*-
"""Canonical asset taxonomy for OpenAEV (mirrors the backend enums).

These give producers (collectors, custom integrations) a single, typed vocabulary for the
two-level asset categorization (category + subcategory), criticality and cloud provider, instead
of free-form strings. They are plain ``str`` enums so they can be dropped straight into the dict
payloads sent to the API.
"""

from enum import Enum


class AssetCategory(str, Enum):
HOST = "HOST"
CONTAINER_WORKLOAD = "CONTAINER_WORKLOAD"
CLOUD_RESOURCE = "CLOUD_RESOURCE"
WEB_APPLICATION = "WEB_APPLICATION"
NETWORK_DEVICE = "NETWORK_DEVICE"
MOBILE_DEVICE = "MOBILE_DEVICE"
IOT_OT_DEVICE = "IOT_OT_DEVICE"
IDENTITY = "IDENTITY"
SAAS_APPLICATION = "SAAS_APPLICATION"
AI_TARGET = "AI_TARGET"
SECURITY_PLATFORM = "SECURITY_PLATFORM"
GENERIC_ASSET = "GENERIC_ASSET"


class AssetSubCategory(str, Enum):
# HOST
SERVER = "SERVER"
WORKSTATION = "WORKSTATION"
LAPTOP = "LAPTOP"
VIRTUAL_MACHINE = "VIRTUAL_MACHINE"
HYPERVISOR = "HYPERVISOR"
MAINFRAME = "MAINFRAME"
THIN_CLIENT = "THIN_CLIENT"
# CONTAINER_WORKLOAD
CONTAINER = "CONTAINER"
CONTAINER_IMAGE = "CONTAINER_IMAGE"
KUBERNETES_POD = "KUBERNETES_POD"
KUBERNETES_CLUSTER = "KUBERNETES_CLUSTER"
KUBERNETES_NODE = "KUBERNETES_NODE"
SERVERLESS_FUNCTION = "SERVERLESS_FUNCTION"
# CLOUD_RESOURCE
COMPUTE = "COMPUTE"
STORAGE = "STORAGE"
DATABASE = "DATABASE"
NETWORKING = "NETWORKING"
SERVERLESS = "SERVERLESS"
CONTAINER_REGISTRY = "CONTAINER_REGISTRY"
KUBERNETES = "KUBERNETES"
IAM_PRINCIPAL = "IAM_PRINCIPAL"
SECRETS_KEY_MGMT = "SECRETS_KEY_MGMT"
MESSAGING_QUEUE = "MESSAGING_QUEUE"
ANALYTICS_DATA = "ANALYTICS_DATA"
AI_ML_SERVICE = "AI_ML_SERVICE"
IAC_TEMPLATE = "IAC_TEMPLATE"
CLOUD_OTHER = "CLOUD_OTHER"
# WEB_APPLICATION
WEBSITE = "WEBSITE"
WEB_API = "WEB_API"
SINGLE_PAGE_APP = "SINGLE_PAGE_APP"
GRAPHQL_API = "GRAPHQL_API"
WEB_SERVICE = "WEB_SERVICE"
MICROSERVICE = "MICROSERVICE"
# NETWORK_DEVICE
ROUTER = "ROUTER"
SWITCH = "SWITCH"
FIREWALL = "FIREWALL"
LOAD_BALANCER = "LOAD_BALANCER"
VPN_GATEWAY = "VPN_GATEWAY"
WIRELESS_AP = "WIRELESS_AP"
PROXY = "PROXY"
DNS_SERVER = "DNS_SERVER"
DHCP_SERVER = "DHCP_SERVER"
SAN_NAS = "SAN_NAS"
NETWORK_OTHER = "NETWORK_OTHER"
# MOBILE_DEVICE
SMARTPHONE = "SMARTPHONE"
TABLET = "TABLET"
# IOT_OT_DEVICE
IOT_SENSOR = "IOT_SENSOR"
IP_CAMERA = "IP_CAMERA"
GATEWAY = "GATEWAY"
POINT_OF_SALE = "POINT_OF_SALE"
MEDIA_DEVICE = "MEDIA_DEVICE"
PLC = "PLC"
RTU = "RTU"
HMI = "HMI"
SCADA_HISTORIAN = "SCADA_HISTORIAN"
MEDICAL_DEVICE = "MEDICAL_DEVICE"
PRINTER_PERIPHERAL = "PRINTER_PERIPHERAL"
BUILDING_MGMT = "BUILDING_MGMT"
# IDENTITY
USER_ACCOUNT = "USER_ACCOUNT"
SERVICE_ACCOUNT = "SERVICE_ACCOUNT"
GROUP = "GROUP"
ROLE = "ROLE"
SHARED_MAILBOX = "SHARED_MAILBOX"
NON_HUMAN_IDENTITY = "NON_HUMAN_IDENTITY"
# SAAS_APPLICATION
SAAS_APP = "SAAS_APP"
SAAS_TENANT = "SAAS_TENANT"
# AI_TARGET
LLM_MODEL = "LLM_MODEL"
AI_AGENT = "AI_AGENT"
MCP_SERVER = "MCP_SERVER"
RAG_PIPELINE = "RAG_PIPELINE"
# SECURITY_PLATFORM
EDR = "EDR"
XDR = "XDR"
SIEM = "SIEM"
SOAR = "SOAR"
NDR = "NDR"
ISPM = "ISPM"
LLM_FIREWALL = "LLM_FIREWALL"
AI_GATEWAY = "AI_GATEWAY"


class CloudProvider(str, Enum):
AWS = "AWS"
AZURE = "AZURE"
GCP = "GCP"
OCI = "OCI"
ALIBABA = "ALIBABA"
KUBERNETES = "KUBERNETES"
OTHER = "OTHER"


class AssetCriticality(str, Enum):
VERY_HIGH = "VERY_HIGH"
HIGH = "HIGH"
MEDIUM = "MEDIUM"
LOW = "LOW"
UNKNOWN = "UNKNOWN"
46 changes: 45 additions & 1 deletion test/apis/endpoint/test_endpoint.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from unittest import TestCase, main, mock
from unittest.mock import ANY

from pyoaev import OpenAEV
from pyoaev import AssetCategory, AssetSubCategory, CloudProvider, OpenAEV
from pyoaev.apis.inputs.search import Filter, FilterGroup, SearchPaginationInput


Expand Down Expand Up @@ -53,5 +53,49 @@ def test_search_input_correctly_serialised(self, mock_request):
)


class TestEndpointCategorization(TestCase):
@mock.patch("requests.Session.request", side_effect=mock_response)
def test_upsert_web_application_without_platform(self, mock_request):
api_client = OpenAEV("url", "token")
data = {
"asset_name": "Filigran website",
"asset_category": AssetCategory.WEB_APPLICATION,
"asset_subcategory": AssetSubCategory.WEBSITE,
"endpoint_url": "https://filigran.io",
"asset_internet_facing": True,
}

api_client.endpoint.upsert(data)

mock_request.assert_called_once()
_, kwargs = mock_request.call_args
self.assertEqual(kwargs["method"], "post")
self.assertEqual(kwargs["url"], "url/api/endpoints/agentless/upsert")
self.assertEqual(kwargs["json"]["asset_category"], "WEB_APPLICATION")
Comment thread
SamuelHassine marked this conversation as resolved.
# A category-driven asset can be upserted without platform / arch.
self.assertNotIn("endpoint_platform", kwargs["json"])
self.assertNotIn("endpoint_arch", kwargs["json"])

@mock.patch("requests.Session.request", side_effect=mock_response)
def test_upsert_cloud_resource(self, mock_request):
api_client = OpenAEV("url", "token")
data = {
"asset_name": "prod-bucket",
"asset_category": AssetCategory.CLOUD_RESOURCE,
"asset_subcategory": AssetSubCategory.STORAGE,
"asset_cloud_provider": CloudProvider.AWS,
"asset_cloud_native_type": "s3_bucket",
"asset_cloud_region": "eu-west-1",
}

api_client.endpoint.upsert(data)

mock_request.assert_called_once()
_, kwargs = mock_request.call_args
self.assertEqual(kwargs["url"], "url/api/endpoints/agentless/upsert")
self.assertEqual(kwargs["json"]["asset_cloud_provider"], "AWS")
self.assertEqual(kwargs["json"]["asset_cloud_native_type"], "s3_bucket")
Comment thread
SamuelHassine marked this conversation as resolved.


if __name__ == "__main__":
main()
Loading