diff --git a/bindings/sysman/python/.gitignore b/bindings/sysman/python/.gitignore new file mode 100644 index 00000000..30ff04e6 --- /dev/null +++ b/bindings/sysman/python/.gitignore @@ -0,0 +1,55 @@ +# Python cache files +__pycache__/ +*.py[cod] +*$py.class + +# Coverage reports +.coverage +.coverage.* +htmlcov/ +.tox/ +.pytest_cache/ + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Virtual environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db \ No newline at end of file diff --git a/bindings/sysman/python/LICENSE b/bindings/sysman/python/LICENSE new file mode 100644 index 00000000..20c06793 --- /dev/null +++ b/bindings/sysman/python/LICENSE @@ -0,0 +1,9 @@ +MIT License + +Copyright (C) 2025 Intel Corporation + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/bindings/sysman/python/README.md b/bindings/sysman/python/README.md new file mode 100644 index 00000000..eab97937 --- /dev/null +++ b/bindings/sysman/python/README.md @@ -0,0 +1,219 @@ +# drivers.gpu.compute.pyzes +pyzes +====== + +Python bindings to the Intel Level-Zero-Driver Library +------------------------------------------------ + +Provides a Python interface to GPU management and monitoring functions. + +This is a wrapper around the Level-Zero-Driver library. +For information about the Level-Zero-Driver library, see the spec document +https://oneapi-src.github.io/level-zero-spec/level-zero/latest/index.html + +Download the latest package from: https://github.com/oneapi-src/level-zero + +The level-zero header file contains function documentation that is relevant +to this wrapper. The header file is distributed with driver (ze_api.h and zes_api.h) + +This module does not handles allocating structs before returning the desired value. +Non-success codes are returned and respective error is printed. + +REQUIREMENTS +------------ +- **Python 3.10** (required) +- ctypes module (included in standard Python library) +- Level Zero driver installed on the system + +INSTALLATION +------------ +```bash +# Ensure you have Python 3.10 installed +python3.10 --version + +# Install the package (when available) +pip install pyzes +``` + +USAGE +----- +``` +>>> from pyzes import * +>>> rc = zesInit(0) +>>> driver_count = c_uint32(0) +>>> rc = pyzes.zesDriverGet(byref(driver_count), None) +>>> print(f"Driver Count: {driver_count.value}") + +``` + +## C Structure and its python module class ## +struct zes_process_state_t { + +zes_structure_type_t stype +[in] type of this structure + +const void *pNext +[in][optional] must be null or a pointer to an extension-specific structure (i.e. contains stype and pNext). + +uint32_t processId +[out] Host OS process ID. + +uint64_t memSize +[out] Device memory size in bytes allocated by this process (may not necessarily be resident on the device at the time of reading). + +uint64_t sharedSize +[out] The size of shared device memory mapped into this process (may not necessarily be resident on the device at the time of reading). + +zes_engine_type_flags_t engines +[out] Bitfield of accelerator engine types being used by this process. +} + +Python Class +class zes_process_state_t(Structure): + _fields_ = [ + ("pid", c_uint32), + ("command", c_char * ZES_STRING_PROPERTY_SIZE), + ("memSize", c_uint64), # in bytes + ("sharedMemSize", c_uint64),# in bytes + ("engineType", zes_engine_type_flags_t), + ("subdeviceId", c_uint32), + ] + + +FUNCTIONS +--------- +Python methods wrap Level-Zero-Driver functions, implemented in a C shared library. +Each function's use is the same: +- C function output parameters are filled in with values, and return codes are returned. + +``` +ze_result_t zesDeviceGetProperties( + zes_device_handle_t hDevice, + zes_device_properties_t* pProperties); + +>>> props = zes_device_properties_t() +>>> props.stype = ZES_STRUCTURE_TYPE_DEVICE_PROPERTIES +>>> props.pNext = None +>>> pyzes.zesDeviceGetProperties(devices[i], byref(props)) + +``` + +- C structs are converted into Python classes. +``` +// C Function and typedef struct + +ze_result_t zesDeviceGetProperties( + zes_device_handle_t hDevice, + zes_device_properties_t* pProperties); + +typedef struct _zes_device_properties_t +{ + zes_structure_type_t stype; + void* pNext; + ze_device_properties_t core; + uint32_t numSubdevices; + char serialNumber[ZES_STRING_PROPERTY_SIZE]; + char boardNumber[ZES_STRING_PROPERTY_SIZE]; + char brandName[ZES_STRING_PROPERTY_SIZE]; + char modelName[ZES_STRING_PROPERTY_SIZE]; + char vendorName[ZES_STRING_PROPERTY_SIZE]; + char driverVersion[ZES_STRING_PROPERTY_SIZE] +} zes_device_properties_t; + +>>>print(f"numSubdevices: {props.numSubdevices}") +>>>print(f"serialNumber: {props.serialNumber}") +>>>print(f"boardNumber: {props.boardNumber}") +>>>print(f"brandName: {props.brandName}") +>>>print(f"modelName: {props.modelName}") +>>>print(f"driverVersion: {props.driverVersion}") +>>>print(f"coreClockMHz: {props.core.coreClockRate}") +``` + +HOW TO USE STRUCTURE CHAINING +``` +>>> props = zes_device_properties_t() +>>> props.stype = ZES_STRUCTURE_TYPE_DEVICE_PROPERTIES +>>> ext = zes_device_ext_properties_t() +>>> ext.stype = ZES_STRUCTURE_TYPE_DEVICE_EXT_PROPERTIES +>>> ext.pNext = None +>>> base.pNext = cast(pointer(ext), c_void_p) +>>> pyzes.zesDeviceGetProperties(devices[i], byref(props)) +>>> print(f"Extension properties flags: {ext.flags}") +``` + +For more information see the Level-Zero-Driver documentation. + +VARIABLES +--------- +All meaningful constants and enums are exposed in Python module. + +SUPPORTED APIs +-------------- + +| API Function | Module | Since Version | Limitations | +|--------------|--------|---------------|-------------| +| `zesInit` | Device | 0.0.0 | None | +| `zesDriverGet` | Device | 0.0.0 | None | +| `zesDeviceGet` | Device | 0.0.0 | None | +| `zesDeviceGetProperties` | Device | 0.0.0 | None | +| `zesDriverGetDeviceByUuidExp` | Device | 0.0.0 | Experimental API | +| `zesDeviceProcessesGetState` | Device | 0.0.0 | None | +| **Memory Management** |-|-|-| +| `zesDeviceEnumMemoryModules` | Memory | 0.0.0 | None | +| `zesMemoryGetProperties` | Memory | 0.0.0 | None | +| `zesMemoryGetState` | Memory | 0.0.0 | None | +| `zesMemoryGetBandwidth` | Memory | 0.0.0 | Linux: Requires superuser or read permissions for telem nodes | +| **Power Management** |-|-|-| +| `zesDeviceEnumPowerDomains` | Power | 0.0.0 | None | +| `zesPowerGetEnergyCounter` | Power | 0.0.0 | Linux: Requires superuser or read permissions for telem nodes | +| **Frequency Management** |-|-|-| +| `zesDeviceEnumFrequencyDomains` | Frequency | 0.0.0 | None | +| `zesFrequencyGetState` | Frequency | 0.0.0 | None | +| **Temperature Monitoring** |-|-|-| +| `zesDeviceEnumTemperatureSensors` | Temperature | 0.0.0 | None | +| `zesTemperatureGetProperties` | Temperature | 0.0.0 | None | +| `zesTemperatureGetConfig` | Temperature | 0.0.0 | None | +| `zesTemperatureGetState` | Temperature | 0.0.0 | Linux: Requires superuser or read permissions for telem nodes | +| **Engine Management** |-|-|-| +| `zesDeviceEnumEngineGroups` | Engine | 0.1.0 | Linux: Shows "no handles found" error when not in superuser mode | +| `zesEngineGetProperties` | Engine | 0.1.0 | None | +| `zesEngineGetActivity` | Engine | 0.1.0 | None | + +RELEASE NOTES +------------- +Version 0.0.0 +- Added pyzes.py module that has python binding wrapper functions. +- Added pyzes_example.py tool as a sample app. + +Version 0.1.0 +- Added support for engine module APIs. +- Added APIs: + zesDeviceEnumEngineGroups + zesEngineGetProperties + zesEngineGetActivity + +Notes: + Linux: + zesPowerGetEnergyCounter + zesTemperatureGetState + zesMemoryGetBandwidth + + The above APIs needs user to be in superuser/root mode or have read permissions for telem nodes + Telem Node Directory: /sys/class/intel_pmt/telem(1/2/3/4)/telem + + zesDeviceEnumEngineGroups shows no handles found error when not in super user mode. + +# Contributing + +See [CONTRIBUTING](CONTRIBUTING.md) for more information. + +# License + +Distributed under the MIT license. See [LICENSE](LICENSE) for more information. + +# Security + +See Intel's [Security Center](https://www.intel.com/content/www/us/en/security-center/default.html) for information on how to report a potential security issue or vulnerability. + +See also [SECURITY](SECURITY.md). + diff --git a/bindings/sysman/python/source/examples/README.md b/bindings/sysman/python/source/examples/README.md new file mode 100644 index 00000000..9f900ec7 --- /dev/null +++ b/bindings/sysman/python/source/examples/README.md @@ -0,0 +1,42 @@ +# pyzes Examples + +This directory contains example scripts demonstrating how to use the pyzes module. + +**IMPORTANT:** The usage instructions below apply to pyzes installed via pip. If you've cloned this repository locally, you should run the examples directly from the `test/` directory in the repository root instead. + +## Using Examples After Installation + +**These instructions are for users who installed pyzes via pip:** + +After installing pyzes via pip: +```bash +pip install pyzes +``` + +You can run examples directly: +```bash +python3 -m examples.pyzes_example +python3 -m examples.pyzes_black_box_test -h +``` + +--- + +## If You Cloned the Repository + +If you cloned the level-zero repository locally, run the examples directly from the repository: + +```bash +cd bindings/sysman/python/source/examples +python3 pyzes_example.py +python3 pyzes_black_box_test.py -h +``` + +The examples in this `source/examples/` directory are packaged for pip distribution only. + +--- + +## Requirements + +- Python 3.10+ +- Intel GPU with Level-Zero drivers installed +- pyzes module diff --git a/bindings/sysman/python/source/examples/__init__.py b/bindings/sysman/python/source/examples/__init__.py new file mode 100644 index 00000000..97f11039 --- /dev/null +++ b/bindings/sysman/python/source/examples/__init__.py @@ -0,0 +1,7 @@ +""" +Example scripts demonstrating pyzes usage. + +These examples show how to use the pyzes module for Intel Level-Zero System Management. +""" + +__version__ = "0.1.0" diff --git a/bindings/sysman/python/source/examples/pyzes_black_box_test.py b/bindings/sysman/python/source/examples/pyzes_black_box_test.py new file mode 100644 index 00000000..fb4f1147 --- /dev/null +++ b/bindings/sysman/python/source/examples/pyzes_black_box_test.py @@ -0,0 +1,1030 @@ +#!/usr/bin/env python3 +## +# Copyright (C) 2025 Intel Corporation +# +# SPDX-License-Identifier: MIT +# +## + +## +# Python Black Box Test for pyzes.py - Level Zero Sysman Python Bindings +## + +import argparse +import os +import sys +from ctypes import * + +# Add the source directory to Python path so we can import pyzes +script_dir = os.path.dirname(os.path.abspath(__file__)) +source_dir = os.path.join(script_dir, "..", "source") +source_dir = os.path.abspath(source_dir) +if source_dir not in sys.path: + sys.path.insert(0, source_dir) + +# Import the pyzes module +try: + import pyzes as pz +except ImportError as e: + print("Error: Could not import pyzes module") + print(f"Import error: {e}") + print(f"Make sure pyzes.py is in the source directory: {source_dir}") + print(f"Current working directory: {os.getcwd()}") + print(f"Script directory: {script_dir}") + print(f"Source directory: {source_dir}") + sys.exit(1) + + +# Setup environment and path for pyzes module +def setup_environment(): + """Setup Python path and environment for pyzes module""" + # Get the directory where this script is located + script_dir = os.path.dirname(os.path.abspath(__file__)) + + # Add script directory to Python path if not already there + if script_dir not in sys.path: + sys.path.insert(0, script_dir) + print(f"Added {script_dir} to Python path") + + # Set PYTHONPATH environment variable for completeness + current_pythonpath = os.environ.get("PYTHONPATH", "") + if script_dir not in current_pythonpath: + if current_pythonpath: + os.environ["PYTHONPATH"] = f"{script_dir}{os.pathsep}{current_pythonpath}" + else: + os.environ["PYTHONPATH"] = script_dir + + +setup_environment() + +verbose = True + + +def check_rc(label, rc): + """Check return code and exit on error""" + if rc != pz.ZE_RESULT_SUCCESS: + print(f"ERROR: {label} failed with ze_result_t={rc}") + return False + return True + + +def print_error(func_name, error_msg): + """Print error message in a consistent format""" + print(f"ERROR: {func_name}: {error_msg}") + + +def print_verbose(message): + """Print message only if verbose mode is enabled""" + if verbose: + print(message) + + +def get_memory_type_string(mem_type): + """Convert memory type enum to string""" + type_map = { + pz.ZES_MEM_TYPE_HBM: "ZES_MEM_TYPE_HBM", + pz.ZES_MEM_TYPE_DDR: "ZES_MEM_TYPE_DDR", + pz.ZES_MEM_TYPE_DDR3: "ZES_MEM_TYPE_DDR3", + pz.ZES_MEM_TYPE_DDR4: "ZES_MEM_TYPE_DDR4", + pz.ZES_MEM_TYPE_DDR5: "ZES_MEM_TYPE_DDR5", + pz.ZES_MEM_TYPE_LPDDR: "ZES_MEM_TYPE_LPDDR", + pz.ZES_MEM_TYPE_LPDDR3: "ZES_MEM_TYPE_LPDDR3", + pz.ZES_MEM_TYPE_LPDDR4: "ZES_MEM_TYPE_LPDDR4", + pz.ZES_MEM_TYPE_LPDDR5: "ZES_MEM_TYPE_LPDDR5", + pz.ZES_MEM_TYPE_SRAM: "ZES_MEM_TYPE_SRAM", + pz.ZES_MEM_TYPE_L1: "ZES_MEM_TYPE_L1", + pz.ZES_MEM_TYPE_L3: "ZES_MEM_TYPE_L3", + pz.ZES_MEM_TYPE_GRF: "ZES_MEM_TYPE_GRF", + pz.ZES_MEM_TYPE_SLM: "ZES_MEM_TYPE_SLM", + pz.ZES_MEM_TYPE_GDDR4: "ZES_MEM_TYPE_GDDR4", + pz.ZES_MEM_TYPE_GDDR5: "ZES_MEM_TYPE_GDDR5", + pz.ZES_MEM_TYPE_GDDR5X: "ZES_MEM_TYPE_GDDR5X", + pz.ZES_MEM_TYPE_GDDR6: "ZES_MEM_TYPE_GDDR6", + pz.ZES_MEM_TYPE_GDDR6X: "ZES_MEM_TYPE_GDDR6X", + pz.ZES_MEM_TYPE_GDDR7: "ZES_MEM_TYPE_GDDR7", + } + return type_map.get(mem_type, f"UNKNOWN_TYPE_{mem_type}") + + +def get_memory_location_string(mem_loc): + """Convert memory location enum to string""" + loc_map = { + pz.ZES_MEM_LOC_SYSTEM: "ZES_MEM_LOC_SYSTEM", + pz.ZES_MEM_LOC_DEVICE: "ZES_MEM_LOC_DEVICE", + } + return loc_map.get(mem_loc, f"UNKNOWN_LOCATION_{mem_loc}") + + +def get_memory_health_string(mem_health): + """Convert memory health enum to string""" + health_map = { + pz.ZES_MEM_HEALTH_UNKNOWN: "ZES_MEM_HEALTH_UNKNOWN", + pz.ZES_MEM_HEALTH_OK: "ZES_MEM_HEALTH_OK", + pz.ZES_MEM_HEALTH_DEGRADED: "ZES_MEM_HEALTH_DEGRADED", + pz.ZES_MEM_HEALTH_CRITICAL: "ZES_MEM_HEALTH_CRITICAL", + pz.ZES_MEM_HEALTH_REPLACE: "ZES_MEM_HEALTH_REPLACE", + } + return health_map.get(mem_health, f"UNKNOWN_HEALTH_{mem_health}") + + +def get_device_type_string(device_type): + """Convert device type enum to string""" + type_map = { + pz.ZES_DEVICE_TYPE_GPU: "ZES_DEVICE_TYPE_GPU", + pz.ZES_DEVICE_TYPE_CPU: "ZES_DEVICE_TYPE_CPU", + pz.ZES_DEVICE_TYPE_FPGA: "ZES_DEVICE_TYPE_FPGA", + pz.ZES_DEVICE_TYPE_MCA: "ZES_DEVICE_TYPE_MCA", + pz.ZES_DEVICE_TYPE_VPU: "ZES_DEVICE_TYPE_VPU", + } + return type_map.get(device_type, f"UNKNOWN_DEVICE_TYPE_{device_type}") + + +def get_frequency_domain_string(freq_domain): + """Convert frequency domain enum to string""" + domain_map = { + pz.ZES_FREQ_DOMAIN_GPU: "ZES_FREQ_DOMAIN_GPU", + pz.ZES_FREQ_DOMAIN_MEMORY: "ZES_FREQ_DOMAIN_MEMORY", + pz.ZES_FREQ_DOMAIN_MEDIA: "ZES_FREQ_DOMAIN_MEDIA", + } + return domain_map.get(freq_domain, f"UNKNOWN_FREQ_DOMAIN_{freq_domain}") + + +def get_throttle_reasons_string(throttle_reasons): + """Convert throttle reason flags to human-readable string""" + if throttle_reasons == 0: + return "None (not throttled)" + + reasons = [] + if throttle_reasons & pz.ZES_FREQ_THROTTLE_REASON_FLAG_AVE_PWR_CAP: + reasons.append("AVE_PWR_CAP") + if throttle_reasons & pz.ZES_FREQ_THROTTLE_REASON_FLAG_BURST_PWR_CAP: + reasons.append("BURST_PWR_CAP") + if throttle_reasons & pz.ZES_FREQ_THROTTLE_REASON_FLAG_CURRENT_LIMIT: + reasons.append("CURRENT_LIMIT") + if throttle_reasons & pz.ZES_FREQ_THROTTLE_REASON_FLAG_THERMAL_LIMIT: + reasons.append("THERMAL_LIMIT") + if throttle_reasons & pz.ZES_FREQ_THROTTLE_REASON_FLAG_PSU_ALERT: + reasons.append("PSU_ALERT") + if throttle_reasons & pz.ZES_FREQ_THROTTLE_REASON_FLAG_SW_RANGE: + reasons.append("SW_RANGE") + if throttle_reasons & pz.ZES_FREQ_THROTTLE_REASON_FLAG_HW_RANGE: + reasons.append("HW_RANGE") + + return " | ".join(reasons) + + +def get_temperature_sensor_string(temp_sensor): + """Convert temperature sensor enum to string""" + sensor_map = { + pz.ZES_TEMP_SENSORS_GLOBAL: "ZES_TEMP_SENSORS_GLOBAL", + pz.ZES_TEMP_SENSORS_GPU: "ZES_TEMP_SENSORS_GPU", + pz.ZES_TEMP_SENSORS_MEMORY: "ZES_TEMP_SENSORS_MEMORY", + pz.ZES_TEMP_SENSORS_GLOBAL_MIN: "ZES_TEMP_SENSORS_GLOBAL_MIN", + pz.ZES_TEMP_SENSORS_GPU_MIN: "ZES_TEMP_SENSORS_GPU_MIN", + pz.ZES_TEMP_SENSORS_MEMORY_MIN: "ZES_TEMP_SENSORS_MEMORY_MIN", + } + return sensor_map.get(temp_sensor, f"UNKNOWN_TEMP_SENSOR_{temp_sensor}") + + +def get_engine_type_string(engine_type): + """Convert engine type enum to string""" + type_map = { + pz.ZES_ENGINE_GROUP_ALL: "ZES_ENGINE_GROUP_ALL", + pz.ZES_ENGINE_GROUP_COMPUTE_ALL: "ZES_ENGINE_GROUP_COMPUTE_ALL", + pz.ZES_ENGINE_GROUP_MEDIA_ALL: "ZES_ENGINE_GROUP_MEDIA_ALL", + pz.ZES_ENGINE_GROUP_COPY_ALL: "ZES_ENGINE_GROUP_COPY_ALL", + pz.ZES_ENGINE_GROUP_COMPUTE_SINGLE: "ZES_ENGINE_GROUP_COMPUTE_SINGLE", + pz.ZES_ENGINE_GROUP_RENDER_SINGLE: "ZES_ENGINE_GROUP_RENDER_SINGLE", + pz.ZES_ENGINE_GROUP_MEDIA_DECODE_SINGLE: "ZES_ENGINE_GROUP_MEDIA_DECODE_SINGLE", + pz.ZES_ENGINE_GROUP_MEDIA_ENCODE_SINGLE: "ZES_ENGINE_GROUP_MEDIA_ENCODE_SINGLE", + pz.ZES_ENGINE_GROUP_COPY_SINGLE: "ZES_ENGINE_GROUP_COPY_SINGLE", + pz.ZES_ENGINE_GROUP_MEDIA_ENHANCEMENT_SINGLE: "ZES_ENGINE_GROUP_MEDIA_ENHANCEMENT_SINGLE", + pz.ZES_ENGINE_GROUP_3D_SINGLE: "ZES_ENGINE_GROUP_3D_SINGLE", + pz.ZES_ENGINE_GROUP_3D_RENDER_COMPUTE_ALL: "ZES_ENGINE_GROUP_3D_RENDER_COMPUTE_ALL", + pz.ZES_ENGINE_GROUP_RENDER_ALL: "ZES_ENGINE_GROUP_RENDER_ALL", + pz.ZES_ENGINE_GROUP_3D_ALL: "ZES_ENGINE_GROUP_3D_ALL", + pz.ZES_ENGINE_GROUP_MEDIA_CODEC_SINGLE: "ZES_ENGINE_GROUP_MEDIA_CODEC_SINGLE", + } + return type_map.get(engine_type, f"UNKNOWN_ENGINE_TYPE_{engine_type}") + + +def initialize_sysman_and_get_devices(): + """Initialize Sysman and enumerate drivers/devices. Returns (drivers, driver_count, devices, device_count).""" + if not initialize_sysman(): + return None, 0, None, 0 + + # Get drivers + driver_info = get_drivers() + if driver_info is None: + return None, 0, None, 0 + + drivers, driver_count = driver_info + + if driver_count == 0: + return drivers, driver_count, None, 0 + + # Get devices for the first driver + device_info = get_devices(drivers[0]) + if device_info is None: + return drivers, driver_count, None, 0 + + devices, device_count = device_info + return drivers, driver_count, devices, device_count + + +def initialize_sysman(): + """Initialize the Sysman interface""" + print_verbose("Initializing Sysman...") + rc = pz.zesInit(0) + if not check_rc("zesInit", rc): + return False + print_verbose("Sysman initialization successful") + return True + + +def get_drivers(): + """Get all available drivers""" + print_verbose("Getting driver count...") + driver_count = c_uint32(0) + rc = pz.zesDriverGet(byref(driver_count), None) + if not check_rc("zesDriverGet(count)", rc): + return None + + if driver_count.value == 0: + print("No drivers found") + return None + + print_verbose(f"Found {driver_count.value} driver(s)") + + # Allocate array for driver handles + DriverArray = pz.zes_driver_handle_t * driver_count.value + drivers = DriverArray() + + rc = pz.zesDriverGet(byref(driver_count), drivers) + if not check_rc("zesDriverGet(handles)", rc): + return None + + return drivers, driver_count.value + + +def get_devices(driver_handle): + """Get all devices for a given driver""" + print_verbose("Getting device count...") + device_count = c_uint32(0) + rc = pz.zesDeviceGet(driver_handle, byref(device_count), None) + if not check_rc("zesDeviceGet(count)", rc): + return None + + if device_count.value == 0: + print("No devices found") + return None + + print_verbose(f"Found {device_count.value} device(s)") + + # Allocate array for device handles + DeviceArray = pz.zes_device_handle_t * device_count.value + devices = DeviceArray() + + rc = pz.zesDeviceGet(driver_handle, byref(device_count), devices) + if not check_rc("zesDeviceGet(handles)", rc): + return None + + return devices, device_count.value + + +def test_global_operation(driver_handle, device_handle, device_index): + """Test global device operations: properties retrieval, UUID mapping, and device processes state""" + print(f"\n---- Device {device_index} Global Operations Test ----") + + # Test Device Properties + print_verbose("=== Device Properties Test ===") + + # Initialize properties structure + properties = pz.zes_device_properties_t() + properties.stype = pz.ZES_STRUCTURE_TYPE_DEVICE_PROPERTIES + properties.pNext = None + + rc = pz.zesDeviceGetProperties(device_handle, byref(properties)) + if not check_rc(f"zesDeviceGetProperties(device {device_index})", rc): + return False + + print_verbose("Device Properties:") + print_verbose( + f" Serial Number: {properties.serialNumber.decode('utf-8', errors='ignore')}" + ) + print_verbose( + f" Board Number: {properties.boardNumber.decode('utf-8', errors='ignore')}" + ) + print_verbose( + f" Brand Name: {properties.brandName.decode('utf-8', errors='ignore')}" + ) + print_verbose( + f" Model Name: {properties.modelName.decode('utf-8', errors='ignore')}" + ) + print_verbose( + f" Vendor Name: {properties.vendorName.decode('utf-8', errors='ignore')}" + ) + print_verbose( + f" Driver Version: {properties.driverVersion.decode('utf-8', errors='ignore')}" + ) + print_verbose(f" Number of Subdevices: {properties.numSubdevices}") + print_verbose( + f" Core Device Name: {properties.core.name.decode('utf-8', errors='ignore')}" + ) + print_verbose(f" Core Device Type: {get_device_type_string(properties.core.type)}") + print_verbose( + f" Core UUID: {'-'.join(f'{properties.core.uuid.id[i]:02x}' for i in range(16))}" + ) + print_verbose(f" Vendor ID: 0x{properties.core.vendorId:04X}") + print_verbose(f" Device ID: 0x{properties.core.deviceId:04X}") + + # Test Subdevice Properties (Experimental API) + print_verbose("\n=== Subdevice Properties Test (Experimental) ===") + if properties.numSubdevices > 0: + # First call to get count + subdevice_count = c_uint32(0) + rc = pz.zesDeviceGetSubDevicePropertiesExp( + device_handle, byref(subdevice_count), None + ) + if check_rc( + f"zesDeviceGetSubDevicePropertiesExp(device {device_index}, count)", rc + ): + print_verbose(f"Number of subdevices: {subdevice_count.value}") + + if subdevice_count.value > 0: + # Allocate array for subdevice properties + SubdevicePropsArray = ( + pz.zes_subdevice_exp_properties_t * subdevice_count.value + ) + subdevice_props = SubdevicePropsArray() + + # Initialize each structure + for i in range(subdevice_count.value): + subdevice_props[i].stype = ( + pz.ZES_STRUCTURE_TYPE_SUBDEVICE_EXP_PROPERTIES + ) + subdevice_props[i].pNext = None + + # Second call to get properties + rc = pz.zesDeviceGetSubDevicePropertiesExp( + device_handle, byref(subdevice_count), subdevice_props + ) + if check_rc( + f"zesDeviceGetSubDevicePropertiesExp(device {device_index}, properties)", + rc, + ): + for i in range(subdevice_count.value): + prop = subdevice_props[i] + print_verbose(f" Subdevice {i}:") + print_verbose(f" Subdevice ID: {prop.subdeviceId}") + print_verbose( + f" UUID: {'-'.join(f'{prop.uuid.id[j]:02x}' for j in range(16))}" + ) + else: + print_verbose("Subdevice properties API not supported or failed") + else: + print_verbose("No subdevices present on this device") + + # Test UUID mapping using the properties we just retrieved + print_verbose("\nUUID Mapping Test:") + try: + # Convert core UUID to zes_uuid_t + core_uuid = pz.zes_uuid_t() + for i in range(min(len(properties.core.uuid.id), len(core_uuid.id))): + core_uuid.id[i] = properties.core.uuid.id[i] + + # Test UUID mapping + mapped_device = pz.zes_device_handle_t() + on_subdevice = pz.ze_bool_t() + subdevice_id = c_uint32() + + ret = pz.zesDriverGetDeviceByUuidExp( + driver_handle, + core_uuid, + byref(mapped_device), + byref(on_subdevice), + byref(subdevice_id), + ) + + if ret == pz.ZE_RESULT_SUCCESS: + print_verbose(" UUID mapping successful") + print_verbose( + f" Same handle: {mapped_device.value == device_handle.value}" + ) + print_verbose(f" On subdevice: {bool(on_subdevice.value)}") + print_verbose(f" Subdevice ID: {subdevice_id.value}") + else: + print_verbose(f" UUID mapping failed with return code: {ret}") + + except Exception: + print_verbose(" UUID mapping test failed with exception") + + # Test Device Processes + print_verbose("\n=== Device Processes Test ===") + + process_count = c_uint32(0) + rc = pz.zesDeviceProcessesGetState(device_handle, byref(process_count), None) + if not check_rc(f"zesDeviceProcessesGetState(device {device_index}, count)", rc): + return False + + if process_count.value == 0: + print_verbose("No active processes found on this device") + else: + print_verbose(f"Found {process_count.value} active process(es)") + + # Allocate array for process states + ProcessArray = pz.zes_process_state_t * process_count.value + processes = ProcessArray() + + rc = pz.zesDeviceProcessesGetState( + device_handle, byref(process_count), processes + ) + if not check_rc( + f"zesDeviceProcessesGetState(device {device_index}, handles)", rc + ): + return False + + for i in range(process_count.value): + process = processes[i] + print_verbose(f" Process {i}:") + print_verbose(f" PID: {process.pid}") + print_verbose(f" Memory Size: {process.memSize} bytes") + print_verbose(f" Shared Memory Size: {process.sharedMemSize} bytes") + print_verbose(f" Engine Type Flags: 0x{process.engineType:08X}") + print_verbose(f" Subdevice ID: {process.subdeviceId}") + + return True + + +def test_device_processes(device_handle, device_index): + """Test device processes state""" + print(f"\n---- Device {device_index} Processes Test ----") + + process_count = c_uint32(0) + rc = pz.zesDeviceProcessesGetState(device_handle, byref(process_count), None) + if not check_rc(f"zesDeviceProcessesGetState(device {device_index}, count)", rc): + return False + + if process_count.value == 0: + print_verbose("No active processes found on this device") + return True + + print_verbose(f"Found {process_count.value} active process(es)") + + # Allocate array for process states + ProcessArray = pz.zes_process_state_t * process_count.value + processes = ProcessArray() + + rc = pz.zesDeviceProcessesGetState(device_handle, byref(process_count), processes) + if not check_rc(f"zesDeviceProcessesGetState(device {device_index}, handles)", rc): + return False + + for i in range(process_count.value): + process = processes[i] + print_verbose(f" Process {i}:") + print_verbose(f" PID: {process.pid}") + print_verbose(f" Memory Size: {process.memSize} bytes") + print_verbose(f" Shared Memory Size: {process.sharedMemSize} bytes") + print_verbose(f" Engine Type Flags: 0x{process.engineType:08X}") + print_verbose(f" Subdevice ID: {process.subdeviceId}") + + return True + + +def test_engine_modules(device_handle, device_index): + """Test engine module enumeration and operations""" + print(f"\n---- Device {device_index} Engine Modules Test ----") + + # Get engine module count + engine_count = c_uint32(0) + rc = pz.zesDeviceEnumEngineGroups(device_handle, byref(engine_count), None) + if not check_rc(f"zesDeviceEnumEngineGroups(device {device_index}, count)", rc): + return False + + if engine_count.value == 0: + print_verbose("No engine modules found on this device") + return True + + print_verbose(f"Found {engine_count.value} engine module(s)") + + # Allocate array for engine handles + EngineArray = pz.zes_engine_handle_t * engine_count.value + engine_handles = EngineArray() + + rc = pz.zesDeviceEnumEngineGroups( + device_handle, byref(engine_count), engine_handles + ) + if not check_rc(f"zesDeviceEnumEngineGroups(device {device_index}, handles)", rc): + return False + + for iteration in range(10): + print(f" Iteration {iteration}") + # Test each engine module + for i in range(engine_count.value): + print_verbose(f"\n Engine Module {i}:") + + # Test engine properties + props = pz.zes_engine_properties_t() + props.stype = pz.ZES_STRUCTURE_TYPE_ENGINE_PROPERTIES + props.pNext = None + + rc = pz.zesEngineGetProperties(engine_handles[i], byref(props)) + if not check_rc(f"zesEngineGetProperties(engine {i})", rc): + continue + + print_verbose(f" Type: {get_engine_type_string(props.type)}") + if props.onSubdevice: + print_verbose(f" Subdevice ID: {props.subdeviceId}") + + engineStats = pz.zes_engine_stats_t() + rc = pz.zesEngineGetActivity(engine_handles[i], byref(engineStats)) + if not check_rc(f"zesEngineGetActivity(engine {i})", rc): + continue + + print_verbose(" Activity:") + print_verbose(f" Active Time: {engineStats.activeTime}") + print_verbose(f" Timestamp: {engineStats.timestamp}") + + return True + + +def test_memory_modules(device_handle, device_index): + """Test memory module enumeration and operations""" + print(f"\n---- Device {device_index} Memory Modules Test ----") + + # Get memory module count + mem_count = c_uint32(0) + rc = pz.zesDeviceEnumMemoryModules(device_handle, byref(mem_count), None) + if not check_rc(f"zesDeviceEnumMemoryModules(device {device_index}, count)", rc): + return False + + if mem_count.value == 0: + print_verbose("No memory modules found on this device") + return True + + print_verbose(f"Found {mem_count.value} memory module(s)") + + # Allocate array for memory handles + MemoryArray = pz.zes_mem_handle_t * mem_count.value + mem_handles = MemoryArray() + + rc = pz.zesDeviceEnumMemoryModules(device_handle, byref(mem_count), mem_handles) + if not check_rc(f"zesDeviceEnumMemoryModules(device {device_index}, handles)", rc): + return False + + # Test each memory module + for i in range(mem_count.value): + print_verbose(f"\n Memory Module {i}:") + + # Test memory properties + props = pz.zes_mem_properties_t() + props.stype = pz.ZES_STRUCTURE_TYPE_MEM_PROPERTIES + props.pNext = None + + rc = pz.zesMemoryGetProperties(mem_handles[i], byref(props)) + if not check_rc(f"zesMemoryGetProperties(memory {i})", rc): + continue + + print_verbose(" Properties:") + print_verbose(f" Type: {get_memory_type_string(props.type)}") + print_verbose(f" Location: {get_memory_location_string(props.location)}") + print_verbose(f" Physical Size: {props.physicalSize} bytes") + print_verbose( + f" Bus Width: {props.busWidth} bits" + if props.busWidth != -1 + else " Bus Width: Unknown" + ) + print_verbose( + f" Num Channels: {props.numChannels}" + if props.numChannels != -1 + else " Num Channels: Unknown" + ) + print_verbose(f" On Subdevice: {bool(props.onSubdevice)}") + if props.onSubdevice: + print_verbose(f" Subdevice ID: {props.subdeviceId}") + + # Test memory state + state = pz.zes_mem_state_t() + state.stype = pz.ZES_STRUCTURE_TYPE_MEM_STATE + state.pNext = None + + rc = pz.zesMemoryGetState(mem_handles[i], byref(state)) + if not check_rc(f"zesMemoryGetState(memory {i})", rc): + continue + + print_verbose(" State:") + print_verbose(f" Health: {get_memory_health_string(state.health)}") + print_verbose(f" Free: {state.free} bytes") + print_verbose(f" Size: {state.size} bytes") + print_verbose(f" Used: {state.size - state.free} bytes") + if state.size > 0: + usage_percent = ((state.size - state.free) / state.size) * 100 + print_verbose(f" Usage: {usage_percent:.1f}%") + + # Test memory bandwidth + bandwidth = pz.zes_mem_bandwidth_t() + + rc = pz.zesMemoryGetBandwidth(mem_handles[i], byref(bandwidth)) + if not check_rc(f"zesMemoryGetBandwidth(memory {i})", rc): + continue + + print_verbose(" Bandwidth:") + print_verbose(f" Read Counter: {bandwidth.readCounter} bytes") + print_verbose(f" Write Counter: {bandwidth.writeCounter} bytes") + print_verbose(f" Max Bandwidth: {bandwidth.maxBandwidth} bytes/sec") + print_verbose(f" Timestamp: {bandwidth.timestamp} microseconds") + + return True + + +def test_power_module(device_handle, device_index): + """Test power domain enumeration and energy counter operations""" + print(f"\n---- Device {device_index} Power Domains Test ----") + + # Get power domain count + power_count = c_uint32(0) + rc = pz.zesDeviceEnumPowerDomains(device_handle, byref(power_count), None) + if not check_rc(f"zesDeviceEnumPowerDomains(device {device_index}, count)", rc): + return False + + if power_count.value == 0: + print_verbose("No power domains found on this device") + return True + + print_verbose(f"Found {power_count.value} power domain(s)") + + # Allocate array for power handles + PowerArray = pz.zes_pwr_handle_t * power_count.value + power_handles = PowerArray() + + rc = pz.zesDeviceEnumPowerDomains(device_handle, byref(power_count), power_handles) + if not check_rc(f"zesDeviceEnumPowerDomains(device {device_index}, handles)", rc): + return False + + # Test each power domain + for i in range(power_count.value): + print_verbose(f"\n Power Domain {i}:") + + # Test power energy counter + energy_counter = pz.zes_power_energy_counter_t() + + rc = pz.zesPowerGetEnergyCounter(power_handles[i], byref(energy_counter)) + if not check_rc(f"zesPowerGetEnergyCounter(power {i})", rc): + continue + + print_verbose(" Energy Counter:") + print_verbose(f" Energy: {energy_counter.energy}") + print_verbose(f" Timestamp: {energy_counter.timestamp} microseconds") + + # Take a second reading after a small delay + import time + + time.sleep(0.01) # 10ms delay + + energy_counter2 = pz.zes_power_energy_counter_t() + ret2 = pz.zesPowerGetEnergyCounter(power_handles[i], byref(energy_counter2)) + if ret2 == pz.ZE_RESULT_SUCCESS: + energy_delta = energy_counter2.energy - energy_counter.energy + time_delta = energy_counter2.timestamp - energy_counter.timestamp + if time_delta > 0: + print_verbose( + f" Energy Delta: {energy_delta} over {time_delta} microseconds" + ) + + return True + + +def test_frequency_domains(device_handle, device_index): + """Test frequency domain enumeration and state operations""" + print(f"\n---- Device {device_index} Frequency Domains Test ----") + + # Get frequency domain count + freq_count = c_uint32(0) + rc = pz.zesDeviceEnumFrequencyDomains(device_handle, byref(freq_count), None) + if not check_rc(f"zesDeviceEnumFrequencyDomains(device {device_index}, count)", rc): + return False + + if freq_count.value == 0: + print_verbose("No frequency domains found on this device") + return True + + print_verbose(f"Found {freq_count.value} frequency domain(s)") + + # Allocate array for frequency handles + FrequencyArray = pz.zes_freq_handle_t * freq_count.value + freq_handles = FrequencyArray() + + rc = pz.zesDeviceEnumFrequencyDomains( + device_handle, byref(freq_count), freq_handles + ) + if not check_rc( + f"zesDeviceEnumFrequencyDomains(device {device_index}, handles)", rc + ): + return False + + # Test each frequency domain + for i in range(freq_count.value): + print_verbose(f"\n Frequency Domain {i}:") + + # Test frequency state + freq_state = pz.zes_freq_state_t() + freq_state.stype = pz.ZES_STRUCTURE_TYPE_FREQ_STATE + freq_state.pNext = None + + rc = pz.zesFrequencyGetState(freq_handles[i], byref(freq_state)) + if not check_rc(f"zesFrequencyGetState(frequency {i})", rc): + continue + + print_verbose(" Frequency State:") + print_verbose( + f" Current Voltage: {freq_state.currentVoltage:.3f} V" + if freq_state.currentVoltage >= 0 + else " Current Voltage: Unknown" + ) + print_verbose( + f" Requested Frequency: {freq_state.request:.1f} MHz" + if freq_state.request >= 0 + else " Requested Frequency: Unknown" + ) + print_verbose( + f" TDP Frequency: {freq_state.tdp:.1f} MHz" + if freq_state.tdp >= 0 + else " TDP Frequency: Unknown" + ) + print_verbose( + f" Efficient Frequency: {freq_state.efficient:.1f} MHz" + if freq_state.efficient >= 0 + else " Efficient Frequency: Unknown" + ) + print_verbose( + f" Actual Frequency: {freq_state.actual:.1f} MHz" + if freq_state.actual >= 0 + else " Actual Frequency: Unknown" + ) + print_verbose( + f" Throttle Reasons: {get_throttle_reasons_string(freq_state.throttleReasons)}" + ) + + return True + + +def test_temperature_sensors(device_handle, device_index): + """Test temperature sensor enumeration and state operations""" + print(f"\n---- Device {device_index} Temperature Sensors Test ----") + + # Get temperature sensor count + temp_count = c_uint32(0) + rc = pz.zesDeviceEnumTemperatureSensors(device_handle, byref(temp_count), None) + if not check_rc( + f"zesDeviceEnumTemperatureSensors(device {device_index}, count)", rc + ): + return False + + if temp_count.value == 0: + print_verbose("No temperature sensors found on this device") + return True + + print_verbose(f"Found {temp_count.value} temperature sensor(s)") + + # Allocate array for temperature handles + TemperatureArray = pz.zes_temp_handle_t * temp_count.value + temp_handles = TemperatureArray() + + rc = pz.zesDeviceEnumTemperatureSensors( + device_handle, byref(temp_count), temp_handles + ) + if not check_rc( + f"zesDeviceEnumTemperatureSensors(device {device_index}, handles)", rc + ): + return False + + # Test each temperature sensor + for i in range(temp_count.value): + print_verbose(f"\n Temperature Sensor {i}:") + + # Test temperature properties + temp_props = pz.zes_temp_properties_t() + temp_props.stype = pz.ZES_STRUCTURE_TYPE_TEMP_PROPERTIES + temp_props.pNext = None + + rc = pz.zesTemperatureGetProperties(temp_handles[i], byref(temp_props)) + if not check_rc(f"zesTemperatureGetProperties(temperature {i})", rc): + continue + + print_verbose(" Temperature Properties:") + print_verbose(f" Type: {get_temperature_sensor_string(temp_props.type)}") + print_verbose(f" On Subdevice: {bool(temp_props.onSubdevice)}") + if temp_props.onSubdevice: + print_verbose(f" Subdevice ID: {temp_props.subdeviceId}") + print_verbose( + f" Max Temperature: {temp_props.maxTemperature:.1f} °C" + if temp_props.maxTemperature >= 0 + else " Max Temperature: Unknown" + ) + print_verbose( + f" Critical Temp Supported: {bool(temp_props.isCriticalTempSupported)}" + ) + print_verbose( + f" Threshold 1 Supported: {bool(temp_props.isThreshold1Supported)}" + ) + print_verbose( + f" Threshold 2 Supported: {bool(temp_props.isThreshold2Supported)}" + ) + + # Test temperature state - this is the main function we're demonstrating + temperature = c_double(0.0) + rc = pz.zesTemperatureGetState(temp_handles[i], byref(temperature)) + if not check_rc(f"zesTemperatureGetState(temperature {i})", rc): + continue + + print_verbose(f" Current Temperature: {temperature.value:.1f} °C") + + # Test temperature configuration if supported + temp_config = pz.zes_temp_config_t() + temp_config.stype = pz.ZES_STRUCTURE_TYPE_TEMP_CONFIG + temp_config.pNext = None + + rc = pz.zesTemperatureGetConfig(temp_handles[i], byref(temp_config)) + if rc == pz.ZE_RESULT_SUCCESS: + print_verbose(" Temperature Config:") + print_verbose(f" Critical Enabled: {bool(temp_config.enableCritical)}") + print_verbose( + f" Threshold 1: {temp_config.threshold1:.1f} °C" + if temp_config.threshold1 >= 0 + else " Threshold 1: Not set" + ) + print_verbose( + f" Threshold 2: {temp_config.threshold2:.1f} °C" + if temp_config.threshold2 >= 0 + else " Threshold 2: Not set" + ) + else: + print_verbose(f" Temperature Config: Not available (rc={rc})") + + return True + + +def run_all_tests(): + """Run all black box tests""" + print("=== Python Level Zero Sysman Black Box Test ===") + + # Get drivers + driver_info = get_drivers() + if driver_info is None: + return False + + drivers, driver_count = driver_info + + # Test each driver + for driver_idx in range(driver_count): + print(f"\n=== Driver {driver_idx} Tests ===") + + # Get devices for this driver + device_info = get_devices(drivers[driver_idx]) + if device_info is None: + print(f"No devices found for driver {driver_idx}") + continue + + devices, device_count = device_info + + # Test each device + for device_idx in range(device_count): + print(f"\n--- Device {device_idx} ---") + + # Test global device operations (properties and processes) + test_global_operation(drivers[driver_idx], devices[device_idx], device_idx) + + # Test memory modules + test_memory_modules(devices[device_idx], device_idx) + + # Test power domains + test_power_module(devices[device_idx], device_idx) + + # Test frequency domains + test_frequency_domains(devices[device_idx], device_idx) + + # Test temperature sensors + test_temperature_sensors(devices[device_idx], device_idx) + + # Test engine modules + test_engine_modules(devices[device_idx], device_idx) + + print("\n=== Test Completed ===") + return True + + +def main(): + """Main function""" + parser = argparse.ArgumentParser( + description="Python Level Zero Sysman Black Box Test", + epilog="""Examples: + %(prog)s -a # Run all tests + %(prog)s -m # Memory tests only + %(prog)s -g # Global operations (device properties and processes) only + %(prog)s -p # Power tests only + %(prog)s -f # Frequency tests only + %(prog)s -t # Temperature tests only + %(prog)s -e # Engine tests only + %(prog)s -h # Show help message""", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + + parser.add_argument("-a", "--all", action="store_true", help="Run all tests") + parser.add_argument( + "-m", "--memory", action="store_true", help="Run only memory-related tests" + ) + parser.add_argument( + "-g", + "--global", + action="store_true", + help="Run only global operations (device properties and processes)", + ) + parser.add_argument( + "-p", "--power", action="store_true", help="Run only power-related tests" + ) + parser.add_argument( + "-f", + "--frequency", + action="store_true", + help="Run only frequency-related tests", + ) + parser.add_argument( + "-t", + "--temperature", + action="store_true", + help="Run only temperature sensor tests", + ) + parser.add_argument( + "--version", + action="version", + version="Python Level Zero Sysman Black Box Test v1.0", + ) + parser.add_argument("-e", "--engine", action="store_true", help="Run engine tests ") + + args = parser.parse_args() + + # Check if any specific test is requested + specific_test = ( + args.memory + or getattr(args, "global", False) + or args.power + or args.frequency + or args.temperature + or args.engine + or args.all + ) + + # If no arguments provided, show help and exit + if not specific_test: + parser.print_help() + return 0 + + # Initialize Sysman and get devices + drivers, driver_count, devices, device_count = initialize_sysman_and_get_devices() + try: + if args.all: + # Run all tests + success = run_all_tests() + else: + # Run specific tests + print("=== Python Level Zero Sysman Selective Black Box Test ===") + + if not drivers or driver_count == 0: + print("No drivers available for testing") + return 1 + + if not devices or device_count == 0: + print("No devices available for testing") + return 1 + + # Run selected tests on all devices + for device_idx in range(device_count): + if getattr(args, "global", False): + test_global_operation(drivers[0], devices[device_idx], device_idx) + + if args.memory: + test_memory_modules(devices[device_idx], device_idx) + + if args.engine: + test_engine_modules(devices[device_idx], device_idx) + + if args.power: + test_power_module(devices[device_idx], device_idx) + + if args.frequency: + test_frequency_domains(devices[device_idx], device_idx) + + if args.temperature: + test_temperature_sensors(devices[device_idx], device_idx) + + success = True + + return 0 if success else 1 + + except Exception as e: + print(f"Unhandled exception: {e}") + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/bindings/sysman/python/source/examples/pyzes_example.py b/bindings/sysman/python/source/examples/pyzes_example.py new file mode 100644 index 00000000..9fc53253 --- /dev/null +++ b/bindings/sysman/python/source/examples/pyzes_example.py @@ -0,0 +1,744 @@ +#!/usr/bin/env python3 +## +# Copyright (C) 2025 Intel Corporation +# +# SPDX-License-Identifier: MIT +# +## + +""" +Run: + python pyzes_example.py + +If the Sysman APIs are not enabled you may need to export environment variables, e.g.: + export ZES_ENABLE_SYSMAN=1 +or just rely on calling zesInit(0) which initializes Sysman explicitly. + +This script enumerates drivers, devices, prints basic properties, and tries the +experimental zesDriverGetDeviceByUuidExp mapping if exported by the runtime. +""" + +import os +import sys +from ctypes import * + +# Add the source directory to Python path so we can import pyzes +script_dir = os.path.dirname(os.path.abspath(__file__)) +source_dir = os.path.join(script_dir, "..", "source") +source_dir = os.path.abspath(source_dir) +if source_dir not in sys.path: + sys.path.insert(0, source_dir) + +# Import the binding module (ensure its directory is on PYTHONPATH) +import pyzes +from pyzes import * + +# Set environment variables to enable Sysman +os.environ["ZES_ENABLE_SYSMAN"] = "1" +os.environ["ZELLO_SYSMAN_USE_ZESINIT"] = "1" + + +##################################################################################################################################### +# Helper utilities ## +def zes_driver_get_device_by_uuid(hDriver, uuid_bytes: bytes): + """Helper: given a 16-byte UUID (from core device properties), retrieve the corresponding Sysman device handle. + Returns (result_code, device_handle, on_subdevice(bool), subdevice_id(int)). + """ + if len(uuid_bytes) != ZES_MAX_UUID_SIZE: + raise ValueError( + f"uuid_bytes must be {ZES_MAX_UUID_SIZE} bytes, got {len(uuid_bytes)}" + ) + uuid_struct = zes_uuid_t() + for i, b in enumerate(uuid_bytes): + uuid_struct.id[i] = b + device_handle = zes_device_handle_t() + on_sub = ze_bool_t(0) + sub_id = c_uint32(0) + rc = zesDriverGetDeviceByUuidExp( + hDriver, uuid_struct, byref(device_handle), byref(on_sub), byref(sub_id) + ) + return rc, device_handle, (on_sub.value != 0), sub_id.value + + +def allocate_device_properties(): + props = zes_device_properties_t() + props.stype = ZES_STRUCTURE_TYPE_DEVICE_PROPERTIES + props.pNext = None + return props + + +def utf8(s): + return ( + s.split(b"\0", 1)[0].decode("utf-8", "replace") + if isinstance(s, (bytes, bytearray)) + else str(s) + ) + + +def format_device_properties(props: zes_device_properties_t) -> dict: + return { + "numSubdevices": props.numSubdevices, + "serialNumber": utf8(props.serialNumber), + "boardNumber": utf8(props.boardNumber), + "brandName": utf8(props.brandName), + "modelName": utf8(props.modelName), + "vendorName": utf8(props.vendorName), + "driverVersion": utf8(props.driverVersion), + "coreName": utf8(props.core.name), + "coreClockMHz": props.core.coreClockRate, + } + + +def chain_device_properties_with_ext() -> ( + tuple[zes_device_properties_t, zes_device_ext_properties_t] +): + """Allocate base Sysman device properties struct and chain an extension properties struct. + Returns (base, ext). Caller must keep both alive until after the API call. + Usage: + base, ext = chain_device_properties_with_ext() + rc = zesDeviceGetProperties(dev_handle, byref(base)) + if rc == ZE_RESULT_SUCCESS: + print('Ext UUID:', bytes(ext.uuid.id)) + """ + base = allocate_device_properties() + ext = zes_device_ext_properties_t() + ext.stype = ZES_STRUCTURE_TYPE_DEVICE_EXT_PROPERTIES + ext.pNext = None + base.pNext = cast(pointer(ext), c_void_p) + return base, ext + + +def handle_val(h): + """Return integer value of a handle that may be ctypes or plain int.""" + return h.value if hasattr(h, "value") else h + + +def check_rc(label, rc): + if rc != ZE_RESULT_SUCCESS: + print(f"ERROR: {label} failed with ze_result_t={rc}") + sys.exit(1) + + +# Shared initialization function - NO CODE DUPLICATION! +def initialize_sysman_and_get_devices(): + """Initialize Sysman and enumerate drivers/devices. Returns (hDriver, devices, dev_count).""" + rc = pyzes.zesInit(0) + check_rc("zesInit", rc) + + # -- Driver enumeration + driver_count = c_uint32(0) + rc = pyzes.zesDriverGet(byref(driver_count), None) + check_rc("zesDriverGet(count)", rc) + if driver_count.value == 0: + print("No drivers found") + return None, None, 0 + + DriverArray = pyzes.zes_driver_handle_t * driver_count.value + drivers = DriverArray() + rc = pyzes.zesDriverGet(byref(driver_count), drivers) + check_rc("zesDriverGet(handles)", rc) + + hDriver = drivers[0] + # -- Device enumeration + dev_count = c_uint32(0) + rc = pyzes.zesDeviceGet(hDriver, byref(dev_count), None) + check_rc("zesDeviceGet(count)", rc) + if not hasattr(dev_count, "value"): + print(f"WARNING: dev_count is plain int type={type(dev_count)}") + if dev_count.value == 0: + print("No devices found") + return hDriver, None, 0 + + DeviceArray = pyzes.zes_device_handle_t * dev_count.value + devices = DeviceArray() + rc = pyzes.zesDeviceGet(hDriver, byref(dev_count), devices) + check_rc("zesDeviceGet(handles)", rc) + if dev_count.value and not hasattr(devices[0], "value"): + print(f"WARNING: Device handles are plain ints (type={type(devices[0])})") + + return hDriver, devices, dev_count.value + + +def enumerate_devices(hDriver, devices, dev_count): + """Enumerate devices and show their properties.""" + if not devices or dev_count == 0: + return + + print(f"Found {dev_count} device(s)\n") + + # -- Query properties for each device -- + for i in range(dev_count): + # test for extension chaining + props, ext = chain_device_properties_with_ext() + rc = pyzes.zesDeviceGetProperties(devices[i], byref(props)) + check_rc(f"zesDeviceGetProperties(device {i})", rc) + print(f"Extension properties flags: {ext.flags}") + formatted = format_device_properties(props) + print(f"Device {i}:") + for k, v in formatted.items(): + print(f" {k}: {v}") + + # Try experimental mapping API + try: + core_uuid_bytes = bytes(props.core.uuid.id) + rc2, mapped_handle, on_sub, sub_id = zes_driver_get_device_by_uuid( + hDriver, core_uuid_bytes + ) + if rc2 == 0: + same = handle_val(mapped_handle) == handle_val(devices[i]) + print( + f" zesDriverGetDeviceByUuidExp: SUCCESS same_handle={same} on_subdevice={on_sub} subdevice_id={sub_id}" + ) + else: + print(f" zesDriverGetDeviceByUuidExp: rc={rc2}") + except NotImplementedError: + print(" zesDriverGetDeviceByUuidExp not supported") + print() + + +def demonstrate_memory_state(hDriver, devices, dev_count): + print("=== Memory State ===") + if not devices or dev_count == 0: + return + + # For each device, enumerate memory modules and query their state + for dev_idx in range(dev_count): + print(f"\nDevice {dev_idx} Memory Modules:") + hDevice = devices[dev_idx] + + # Step 1: Enumerate memory modules + mem_count = c_uint32(0) + rc = pyzes.zesDeviceEnumMemoryModules(hDevice, byref(mem_count), None) + if rc != ZE_RESULT_SUCCESS: + print(f" Error enumerating memory modules: Return code {rc}") + continue + + if mem_count.value == 0: + print(" No memory modules found") + continue + + # Allocate array for memory handles + MemoryArray = pyzes.zes_mem_handle_t * mem_count.value + memories = MemoryArray() + rc = pyzes.zesDeviceEnumMemoryModules(hDevice, byref(mem_count), memories) + check_rc(f"zesDeviceEnumMemoryModules(device {dev_idx})", rc) + + print(f" Found {mem_count.value} memory module(s)") + + # Step 2: For each memory module, get properties and state + for mem_idx in range(mem_count.value): + hMemory = memories[mem_idx] + print(f"\\n Memory Module {mem_idx}:") + + try: + # Get memory properties + mem_props = zes_mem_properties_t() + mem_props.stype = ZES_STRUCTURE_TYPE_MEM_PROPERTIES + mem_props.pNext = None + retVal = pyzes.zesMemoryGetProperties(hMemory, byref(mem_props)) + if retVal != ZE_RESULT_SUCCESS: + print(f" Error getting memory properties: Return code {retVal}") + continue + print(f" Properties: {mem_props}") + + # Decode memory type + mem_type_names = { + ZES_MEM_TYPE_HBM: "HBM", + ZES_MEM_TYPE_DDR: "DDR", + ZES_MEM_TYPE_DDR4: "DDR4", + ZES_MEM_TYPE_DDR5: "DDR5", + ZES_MEM_TYPE_GDDR6: "GDDR6", + ZES_MEM_TYPE_GDDR6X: "GDDR6X", + ZES_MEM_TYPE_GDDR7: "GDDR7", + } + type_name = mem_type_names.get( + mem_props.type, f"Unknown({mem_props.type})" + ) + + location_names = { + ZES_MEM_LOC_SYSTEM: "System", + ZES_MEM_LOC_DEVICE: "Device", + } + location_name = location_names.get( + mem_props.location, f"Unknown({mem_props.location})" + ) + + print(f" Type: {type_name}") + print(f" Location: {location_name}") + print( + f" Physical Size: {mem_props.physicalSize:,} bytes ({mem_props.physicalSize / (1024**3):.2f} GB)" + ) + if mem_props.busWidth != -1: + print(f" Bus Width: {mem_props.busWidth} bits") + if mem_props.numChannels != -1: + print(f" Channels: {mem_props.numChannels}") + + except Exception as e: + print(f" Error getting memory properties: {e}") + continue + + try: + # Get memory state - this is the main function we're demonstrating + mem_state = zes_mem_state_t() + mem_state.stype = ZES_STRUCTURE_TYPE_MEM_STATE + mem_state.pNext = None + retVal = pyzes.zesMemoryGetState(hMemory, byref(mem_state)) + if retVal != ZE_RESULT_SUCCESS: + print(f" Error getting memory state: Return code {retVal}") + continue + print(f" State: {mem_state}") + + # Decode health status + health_names = { + ZES_MEM_HEALTH_UNKNOWN: "Unknown", + ZES_MEM_HEALTH_OK: "OK", + ZES_MEM_HEALTH_DEGRADED: "Degraded", + ZES_MEM_HEALTH_CRITICAL: "Critical", + ZES_MEM_HEALTH_REPLACE: "Replace", + } + health_name = health_names.get( + mem_state.health, f"Unknown({mem_state.health})" + ) + + print(f" Health: {health_name}") + print( + f" Total Size: {mem_state.size:,} bytes ({mem_state.size / (1024**3):.2f} GB)" + ) + print( + f" Free: {mem_state.free:,} bytes ({mem_state.free / (1024**3):.2f} GB)" + ) + + if mem_state.size > 0: + used = mem_state.size - mem_state.free + used_percent = (used / mem_state.size) * 100 + free_percent = (mem_state.free / mem_state.size) * 100 + print( + f" Used: {used:,} bytes ({used / (1024**3):.2f} GB, {used_percent:.1f}%)" + ) + print(f" Free: {free_percent:.1f}%") + + except Exception as e: + print(f" Error getting memory state: {e}") + continue + + try: + # Optionally get memory bandwidth if supported + mem_bandwidth = zes_mem_bandwidth_t() + retVal = pyzes.zesMemoryGetBandwidth(hMemory, byref(mem_bandwidth)) + if retVal != ZE_RESULT_SUCCESS: + print(f" Error getting memory bandwidth: Return code {retVal}") + continue + print(f" Bandwidth: {mem_bandwidth}") + print( + f" Max Bandwidth: {mem_bandwidth.maxBandwidth:,} bytes/sec ({mem_bandwidth.maxBandwidth / (1024**3):.2f} GB/s)" + ) + print(f" Read Counter: {mem_bandwidth.readCounter:,} bytes") + print(f" Write Counter: {mem_bandwidth.writeCounter:,} bytes") + + except Exception as e: + # Bandwidth might not be supported on all systems + print(f" Bandwidth info not available: {e}") + + +def demonstrate_memory_state_advanced(hDriver, devices, dev_count): + """Advanced example showing manual struct initialization and error handling.""" + print("\n=== Advanced Memory State Example ===") + + if not devices or dev_count == 0: + return + + mem_count = c_uint32(0) + pyzes.zesDeviceEnumMemoryModules(devices[0], byref(mem_count), None) + if mem_count.value == 0: + print("No memory modules found") + return + + MemoryArray = pyzes.zes_mem_handle_t * mem_count.value + memories = MemoryArray() + pyzes.zesDeviceEnumMemoryModules(devices[0], byref(mem_count), memories) + + print("\nManual struct initialization example:") + mem_state = zes_mem_state_t() + mem_state.stype = ZES_STRUCTURE_TYPE_MEM_STATE + mem_state.pNext = None + + try: + retVal = pyzes.zesMemoryGetState(memories[0], byref(mem_state)) + check_rc("zesMemoryGetState", retVal) + print(f"Pre-initialized struct result: {mem_state}") + print(f"Memory health: {mem_state.health}") + print(f"Free memory: {mem_state.free} bytes") + print(f"Total memory: {mem_state.size} bytes") + except Exception as e: + print(f"Error: {e}") + + +def demonstrate_power_energy(hDriver, devices, dev_count): + """Demonstrate power energy counter functionality.""" + print("\n=== Power Energy Counter ===") + if not devices or dev_count == 0: + return + + # For each device, enumerate power domains and query their energy counters + for dev_idx in range(dev_count): + print(f"\nDevice {dev_idx} Power Domains:") + hDevice = devices[dev_idx] + + # Step 1: Enumerate power domains + power_count = c_uint32(0) + rc = pyzes.zesDeviceEnumPowerDomains(hDevice, byref(power_count), None) + if rc != ZE_RESULT_SUCCESS: + print(f" Error enumerating power domains: {rc}") + continue + + if power_count.value == 0: + print(" No power domains found") + continue + + # Allocate array for power handles + PowerArray = pyzes.zes_pwr_handle_t * power_count.value + power_handles = PowerArray() + rc = pyzes.zesDeviceEnumPowerDomains(hDevice, byref(power_count), power_handles) + check_rc(f"zesDeviceEnumPowerDomains(device {dev_idx})", rc) + + print(f" Found {power_count.value} power domain(s)") + + # Step 2: For each power domain, get energy counter + for pwr_idx in range(power_count.value): + hPower = power_handles[pwr_idx] + print(f"\n Power Domain {pwr_idx}:") + + try: + # Get power energy counter + energy_counter = zes_power_energy_counter_t() + retVal = pyzes.zesPowerGetEnergyCounter(hPower, byref(energy_counter)) + if retVal != ZE_RESULT_SUCCESS: + print(f" Error getting energy counter: {retVal}") + continue + + print(f" Energy Counter: {energy_counter}") + print( + f" Energy: {energy_counter.energy} microjoules ({energy_counter.energy / 1000000.0:.6f} joules)" + ) + print(f" Timestamp: {energy_counter.timestamp} microseconds") + + # Convert timestamp to seconds for readability + timestamp_seconds = energy_counter.timestamp / 1000000.0 + print(f" Timestamp: {timestamp_seconds:.6f} seconds") + + # Try to get a second reading to show energy consumption + import time + + print(" Waiting 1 second for energy measurement...") + time.sleep(1) + + energy_counter2 = zes_power_energy_counter_t() + retVal = pyzes.zesPowerGetEnergyCounter(hPower, byref(energy_counter2)) + if retVal == ZE_RESULT_SUCCESS: + energy_diff = energy_counter2.energy - energy_counter.energy + time_diff = ( + energy_counter2.timestamp - energy_counter.timestamp + ) / 1000000.0 + + print(f" Second reading:") + print( + f" Energy: {energy_counter2.energy} microjoules ({energy_counter2.energy / 1000000.0:.6f} joules)" + ) + print( + f" Energy consumed: {energy_diff} microjoules ({energy_diff / 1000000.0:.6f} joules)" + ) + if time_diff > 0: + avg_power = (energy_diff / 1000000.0) / time_diff # Watts + print( + f" Average power: {avg_power:.6f} watts over {time_diff:.6f} seconds" + ) + + except Exception as e: + print(f" Error getting power energy counter: {e}") + continue + + +def demonstrate_frequency_state(hDriver, devices, dev_count): + """Demonstrate frequency domain enumeration and state functionality.""" + print("\n=== Frequency State ===") + if not devices or dev_count == 0: + return + + # For each device, enumerate frequency domains and query their state + for dev_idx in range(dev_count): + print(f"\nDevice {dev_idx} Frequency Domains:") + hDevice = devices[dev_idx] + + # Step 1: Enumerate frequency domains + freq_count = c_uint32(0) + rc = pyzes.zesDeviceEnumFrequencyDomains(hDevice, byref(freq_count), None) + if rc != ZE_RESULT_SUCCESS: + print(f" Error enumerating frequency domains: {rc}") + continue + + if freq_count.value == 0: + print(" No frequency domains found") + continue + + # Allocate array for frequency handles + FrequencyArray = pyzes.zes_freq_handle_t * freq_count.value + freq_handles = FrequencyArray() + rc = pyzes.zesDeviceEnumFrequencyDomains( + hDevice, byref(freq_count), freq_handles + ) + check_rc(f"zesDeviceEnumFrequencyDomains(device {dev_idx})", rc) + + print(f" Found {freq_count.value} frequency domain(s)") + + # Step 2: For each frequency domain, get state + for freq_idx in range(freq_count.value): + hFrequency = freq_handles[freq_idx] + print(f"\n Frequency Domain {freq_idx}:") + + try: + # Get frequency state - this is the main function we're demonstrating + freq_state = zes_freq_state_t() + freq_state.stype = ZES_STRUCTURE_TYPE_FREQ_STATE + freq_state.pNext = None + retVal = pyzes.zesFrequencyGetState(hFrequency, byref(freq_state)) + if retVal != ZE_RESULT_SUCCESS: + print(f" Error getting frequency state: {retVal}") + continue + print(f" State: {freq_state}") + + # Display frequency state details + print( + f" Current Voltage: {freq_state.currentVoltage:.3f} V" + if freq_state.currentVoltage >= 0 + else " Current Voltage: Unknown" + ) + print( + f" Requested Frequency: {freq_state.request:.1f} MHz" + if freq_state.request >= 0 + else " Requested Frequency: Unknown" + ) + print( + f" TDP Frequency: {freq_state.tdp:.1f} MHz" + if freq_state.tdp >= 0 + else " TDP Frequency: Unknown" + ) + print( + f" Efficient Frequency: {freq_state.efficient:.1f} MHz" + if freq_state.efficient >= 0 + else " Efficient Frequency: Unknown" + ) + print( + f" Actual Frequency: {freq_state.actual:.1f} MHz" + if freq_state.actual >= 0 + else " Actual Frequency: Unknown" + ) + + # Decode throttle reasons + if freq_state.throttleReasons != 0: + throttle_reasons = [] + if ( + freq_state.throttleReasons + & ZES_FREQ_THROTTLE_REASON_FLAG_AVE_PWR_CAP + ): + throttle_reasons.append("Average Power Cap (PL1)") + if ( + freq_state.throttleReasons + & ZES_FREQ_THROTTLE_REASON_FLAG_BURST_PWR_CAP + ): + throttle_reasons.append("Burst Power Cap (PL2)") + if ( + freq_state.throttleReasons + & ZES_FREQ_THROTTLE_REASON_FLAG_CURRENT_LIMIT + ): + throttle_reasons.append("Current Limit (PL4)") + if ( + freq_state.throttleReasons + & ZES_FREQ_THROTTLE_REASON_FLAG_THERMAL_LIMIT + ): + throttle_reasons.append("Thermal Limit") + if ( + freq_state.throttleReasons + & ZES_FREQ_THROTTLE_REASON_FLAG_PSU_ALERT + ): + throttle_reasons.append("PSU Alert") + if ( + freq_state.throttleReasons + & ZES_FREQ_THROTTLE_REASON_FLAG_SW_RANGE + ): + throttle_reasons.append("Software Range") + if ( + freq_state.throttleReasons + & ZES_FREQ_THROTTLE_REASON_FLAG_HW_RANGE + ): + throttle_reasons.append("Hardware Range") + + print(f" Throttle Reasons: {', '.join(throttle_reasons)}") + else: + print(" Throttle Reasons: None (not throttled)") + + except Exception as e: + print(f" Error getting frequency state: {e}") + continue + + +def demonstrate_temperature_state(hDriver, devices, dev_count): + """Demonstrate temperature sensor enumeration and state functionality.""" + print("\n=== Temperature State ===") + if not devices or dev_count == 0: + return + + # For each device, enumerate temperature sensors and query their state + for dev_idx in range(dev_count): + print(f"\nDevice {dev_idx} Temperature Sensors:") + hDevice = devices[dev_idx] + + # Step 1: Enumerate temperature sensors + temp_count = c_uint32(0) + rc = pyzes.zesDeviceEnumTemperatureSensors(hDevice, byref(temp_count), None) + if rc != ZE_RESULT_SUCCESS: + print(f" Error enumerating temperature sensors: {rc}") + continue + + if temp_count.value == 0: + print(" No temperature sensors found") + continue + + # Allocate array for temperature handles + TemperatureArray = pyzes.zes_temp_handle_t * temp_count.value + temp_handles = TemperatureArray() + rc = pyzes.zesDeviceEnumTemperatureSensors( + hDevice, byref(temp_count), temp_handles + ) + check_rc(f"zesDeviceEnumTemperatureSensors(device {dev_idx})", rc) + + print(f" Found {temp_count.value} temperature sensor(s)") + + # Step 2: For each temperature sensor, get properties and state + for temp_idx in range(temp_count.value): + hTemperature = temp_handles[temp_idx] + print(f"\n Temperature Sensor {temp_idx}:") + + try: + # Get temperature properties first + temp_props = zes_temp_properties_t() + temp_props.stype = ZES_STRUCTURE_TYPE_TEMP_PROPERTIES + temp_props.pNext = None + retVal = pyzes.zesTemperatureGetProperties( + hTemperature, byref(temp_props) + ) + if retVal != ZE_RESULT_SUCCESS: + print(f" Error getting temperature properties: {retVal}") + continue + print(f" Properties: {temp_props}") + + # Decode sensor type + sensor_type_names = { + ZES_TEMP_SENSORS_GLOBAL: "Global", + ZES_TEMP_SENSORS_GPU: "GPU Core", + ZES_TEMP_SENSORS_MEMORY: "Memory", + ZES_TEMP_SENSORS_GLOBAL_MIN: "Global Minimum", + ZES_TEMP_SENSORS_GPU_MIN: "GPU Minimum", + ZES_TEMP_SENSORS_MEMORY_MIN: "Memory Minimum", + } + type_name = sensor_type_names.get( + temp_props.type, f"Unknown({temp_props.type})" + ) + + print(f" Type: {type_name}") + print(f" On Subdevice: {bool(temp_props.onSubdevice)}") + if temp_props.onSubdevice: + print(f" Subdevice ID: {temp_props.subdeviceId}") + print( + f" Max Temperature: {temp_props.maxTemperature:.1f} °C" + if temp_props.maxTemperature >= 0 + else " Max Temperature: Unknown" + ) + print( + f" Critical Temp Supported: {bool(temp_props.isCriticalTempSupported)}" + ) + print( + f" Threshold 1 Supported: {bool(temp_props.isThreshold1Supported)}" + ) + print( + f" Threshold 2 Supported: {bool(temp_props.isThreshold2Supported)}" + ) + + except Exception as e: + print(f" Error getting temperature properties: {e}") + continue + + try: + # Get temperature state - this is the main function we're demonstrating + temperature = c_double(0.0) + retVal = pyzes.zesTemperatureGetState(hTemperature, byref(temperature)) + if retVal != ZE_RESULT_SUCCESS: + print(f" Error getting temperature state: {retVal}") + continue + + print(f" Current Temperature: {temperature.value:.1f} °C") + + # Provide some context for the temperature reading + if temperature.value > 0: + if temperature.value > 85: + temp_status = "Very Hot (consider cooling)" + elif temperature.value > 70: + temp_status = "Hot (normal under load)" + elif temperature.value > 50: + temp_status = "Warm (normal operation)" + elif temperature.value > 30: + temp_status = "Cool (idle/light load)" + else: + temp_status = "Cold (possibly just started)" + print(f" Temperature Status: {temp_status}") + + except Exception as e: + print(f" Error getting temperature state: {e}") + continue + + try: + # Optionally get temperature configuration if supported + temp_config = zes_temp_config_t() + temp_config.stype = ZES_STRUCTURE_TYPE_TEMP_CONFIG + temp_config.pNext = None + retVal = pyzes.zesTemperatureGetConfig(hTemperature, byref(temp_config)) + if retVal == ZE_RESULT_SUCCESS: + print(f" Configuration: {temp_config}") + print( + f" Critical Events Enabled: {bool(temp_config.enableCritical)}" + ) + print( + f" Threshold 1: {temp_config.threshold1:.1f} °C" + if temp_config.threshold1 >= 0 + else " Threshold 1: Not set" + ) + print( + f" Threshold 2: {temp_config.threshold2:.1f} °C" + if temp_config.threshold2 >= 0 + else " Threshold 2: Not set" + ) + else: + print(f" Configuration: Not available (rc={retVal})") + + except Exception as e: + # Config might not be supported on all systems + print(f" Configuration info not available: {e}") + + +if __name__ == "__main__": + try: + # Initialize Sysman and get devices only once + hDriver, devices, dev_count = initialize_sysman_and_get_devices() + if not devices or dev_count == 0: + print("No devices available for testing") + sys.exit(0) + + # Pass the initialized data to all functions + enumerate_devices(hDriver, devices, dev_count) + demonstrate_memory_state(hDriver, devices, dev_count) + demonstrate_memory_state_advanced(hDriver, devices, dev_count) + demonstrate_power_energy(hDriver, devices, dev_count) + demonstrate_frequency_state(hDriver, devices, dev_count) + demonstrate_temperature_state(hDriver, devices, dev_count) + except Exception as e: + print(f"Unhandled exception: {e}") + sys.exit(1) diff --git a/bindings/sysman/python/source/pyzes.py b/bindings/sysman/python/source/pyzes.py new file mode 100644 index 00000000..b87050c2 --- /dev/null +++ b/bindings/sysman/python/source/pyzes.py @@ -0,0 +1,1050 @@ +## +# Copyright (C) 2025 Intel Corporation +# +# SPDX-License-Identifier: MIT +# +## + +## +# Python bindings for the libze_intel_gpu.so / ze_intel_gpu.dll library +## + +import sys +import threading +from ctypes import * +from typing import Any, Dict + +libLoadLock = threading.Lock() +gpuLib = None + + +class _PrintableStructure(Structure): + """ + Abstract class that produces nicer __str__ output than ctypes.Structure. + e.g. instead of: + >>> print str(obj) + + this class will print + class_name(field_name: formatted_value, field_name: formatted_value) + + _fmt_ dictionary of -> + e.g. class that has _field_ 'hex_value', c_uint could be formatted with + _fmt_ = {"hex_value" : "%08X"} + to produce nicer output. + Default fomratting string for all fields can be set with key "" like: + _fmt_ = {"" : "%d MHz"} # e.g all values are numbers in MHz. + If not set it's assumed to be just "%s" + + Exact format of returned str from this class is subject to change in the future. + """ + + _fmt_: Dict[str, str] = {} + + def __str__(self): + result = [] + for x in self._fields_: + key = x[0] + value = getattr(self, key) + fmt = "%s" + if key in self._fmt_: + fmt = self._fmt_[key] + elif "" in self._fmt_: + fmt = self._fmt_[""] + result.append(("%s: " + fmt) % (key, value)) + return self.__class__.__name__ + "(" + ", ".join(result) + ")" + + +def _LoadZeLibrary(): + global gpuLib + if gpuLib is not None: + return + + libLoadLock.acquire() + try: + # ensure library was loaded + if gpuLib is not None: + return + # load the library + libName = "ze_loader" + if sys.platform.startswith("linux"): + print("Loading Linux library") + libName = "/usr/lib/x86_64-linux-gnu/lib" + libName + ".so.1" + else: + print("Loading Windows library") + # Try multiple common locations for Windows Intel GPU drivers + import os + + possible_paths = [ + # Try system PATH first + libName + "64.dll", + # Common Intel GPU driver locations + r"C:\Windows\System32\DriverStore\FileRepository\iigd_dch.inf_amd64_*\ze_intel_gpu64.dll", + r"C:\Windows\System32\DriverStore\FileRepository\igdlh64.inf_amd64_*\ze_intel_gpu64.dll", + # Try current directory + "ze_intel_gpu64.dll", + ] + + library_loaded = False + for path in possible_paths: + try: + if "*" in path: + # Handle wildcard paths for driver store + import glob + + matching_paths = glob.glob(path) + for match_path in matching_paths: + try: + print(f"Trying: {match_path}") + gpuLib = CDLL(match_path) + library_loaded = True + print(f"Successfully loaded: {match_path}") + break + except Exception as e: + print(f"Failed to load {match_path}: {e}") + continue + else: + if os.path.exists(path): + print(f"Trying: {path}") + gpuLib = CDLL(path) + library_loaded = True + print(f"Successfully loaded: {path}") + break + else: + print(f"Trying: {path}") + gpuLib = CDLL(path) + library_loaded = True + print(f"Successfully loaded: {path}") + break + except Exception as e: + print(f"Failed to load {path}: {e}") + continue + + if library_loaded: + break + + if not library_loaded: + raise Exception( + f"Failed to load Intel GPU library. Tried paths: {possible_paths}" + ) + + if sys.platform.startswith("linux"): + gpuLib = CDLL(libName) + + if gpuLib is None: + raise Exception("Failed to load library: " + str(libName)) + finally: + # lock is always freed + libLoadLock.release() + + +_LoadZeLibrary() + + +class zes_driver_handle_t(c_void_p): + pass + + +class zes_device_handle_t(c_void_p): + pass + + +class zes_mem_handle_t(c_void_p): + pass + + +class zes_pwr_handle_t(c_void_p): + pass + + +class zes_freq_handle_t(c_void_p): + pass + + +class zes_temp_handle_t(c_void_p): + pass + + +class zes_engine_handle_t(c_void_p): + pass + + +## + +ze_bool_t = c_uint32 +ze_device_property_flags_t = c_uint32 +zes_engine_type_flags_t = c_uint32 +zes_device_property_flags_t = c_uint32 + +ze_device_type_t = c_int32 +ZE_DEVICE_TYPE_GPU = 1 +ZE_DEVICE_TYPE_CPU = 2 +ZE_DEVICE_TYPE_FPGA = 3 +ZE_DEVICE_TYPE_MCA = 4 +ZE_DEVICE_TYPE_VPU = 5 +ZE_DEVICE_TYPE_FORCE_UINT32 = 0x7FFFFFFF + +# Sysman device type enumeration (same values as ze_device_type_t) +zes_device_type_t = c_int32 +ZES_DEVICE_TYPE_GPU = 1 +ZES_DEVICE_TYPE_CPU = 2 +ZES_DEVICE_TYPE_FPGA = 3 +ZES_DEVICE_TYPE_MCA = 4 +ZES_DEVICE_TYPE_VPU = 5 +ZES_DEVICE_TYPE_FORCE_UINT32 = 0x7FFFFFFF + +# Memory type enumeration +zes_mem_type_t = c_int32 +ZES_MEM_TYPE_HBM = 0 +ZES_MEM_TYPE_DDR = 1 +ZES_MEM_TYPE_DDR3 = 2 +ZES_MEM_TYPE_DDR4 = 3 +ZES_MEM_TYPE_DDR5 = 4 +ZES_MEM_TYPE_LPDDR = 5 +ZES_MEM_TYPE_LPDDR3 = 6 +ZES_MEM_TYPE_LPDDR4 = 7 +ZES_MEM_TYPE_LPDDR5 = 8 +ZES_MEM_TYPE_SRAM = 9 +ZES_MEM_TYPE_L1 = 10 +ZES_MEM_TYPE_L3 = 11 +ZES_MEM_TYPE_GRF = 12 +ZES_MEM_TYPE_SLM = 13 +ZES_MEM_TYPE_GDDR4 = 14 +ZES_MEM_TYPE_GDDR5 = 15 +ZES_MEM_TYPE_GDDR5X = 16 +ZES_MEM_TYPE_GDDR6 = 17 +ZES_MEM_TYPE_GDDR6X = 18 +ZES_MEM_TYPE_GDDR7 = 19 +ZES_MEM_TYPE_FORCE_UINT32 = 0x7FFFFFFF + +# Memory location enumeration +zes_mem_loc_t = c_int32 +ZES_MEM_LOC_SYSTEM = 0 +ZES_MEM_LOC_DEVICE = 1 +ZES_MEM_LOC_FORCE_UINT32 = 0x7FFFFFFF + +# Memory health enumeration +zes_mem_health_t = c_int32 +ZES_MEM_HEALTH_UNKNOWN = 0 +ZES_MEM_HEALTH_OK = 1 +ZES_MEM_HEALTH_DEGRADED = 2 +ZES_MEM_HEALTH_CRITICAL = 3 +ZES_MEM_HEALTH_REPLACE = 4 +ZES_MEM_HEALTH_FORCE_UINT32 = 0x7FFFFFFF + +# Power domain enumeration +zes_power_domain_t = c_int32 +ZES_POWER_DOMAIN_UNKNOWN = 0 +ZES_POWER_DOMAIN_CARD = 1 +ZES_POWER_DOMAIN_PACKAGE = 2 +ZES_POWER_DOMAIN_STACK = 3 +ZES_POWER_DOMAIN_MEMORY = 4 +ZES_POWER_DOMAIN_GPU = 5 +ZES_POWER_DOMAIN_FORCE_UINT32 = 0x7FFFFFFF + +## Frequency domain enums ## +zes_freq_domain_t = c_int32 +ZES_FREQ_DOMAIN_GPU = 0 +ZES_FREQ_DOMAIN_MEMORY = 1 +ZES_FREQ_DOMAIN_MEDIA = 2 +ZES_FREQ_DOMAIN_FORCE_UINT32 = 0x7FFFFFFF + +## Frequency throttle reason flags ## +zes_freq_throttle_reason_flags_t = c_uint32 +ZES_FREQ_THROTTLE_REASON_FLAG_AVE_PWR_CAP = 1 << 0 +ZES_FREQ_THROTTLE_REASON_FLAG_BURST_PWR_CAP = 1 << 1 +ZES_FREQ_THROTTLE_REASON_FLAG_CURRENT_LIMIT = 1 << 2 +ZES_FREQ_THROTTLE_REASON_FLAG_THERMAL_LIMIT = 1 << 3 +ZES_FREQ_THROTTLE_REASON_FLAG_PSU_ALERT = 1 << 4 +ZES_FREQ_THROTTLE_REASON_FLAG_SW_RANGE = 1 << 5 +ZES_FREQ_THROTTLE_REASON_FLAG_HW_RANGE = 1 << 6 +ZES_FREQ_THROTTLE_REASON_FLAG_FORCE_UINT32 = 0x7FFFFFFF + +## Temperature sensor enums ## +zes_temp_sensors_t = c_int32 +ZES_TEMP_SENSORS_GLOBAL = 0 +ZES_TEMP_SENSORS_GPU = 1 +ZES_TEMP_SENSORS_MEMORY = 2 +ZES_TEMP_SENSORS_GLOBAL_MIN = 3 +ZES_TEMP_SENSORS_GPU_MIN = 4 +ZES_TEMP_SENSORS_MEMORY_MIN = 5 +ZES_TEMP_SENSORS_FORCE_UINT32 = 0x7FFFFFFF + +## Engine type enums ## +zes_engine_group_t = c_uint32 +ZES_ENGINE_GROUP_ALL = 0 +ZES_ENGINE_GROUP_COMPUTE_ALL = 1 +ZES_ENGINE_GROUP_MEDIA_ALL = 2 +ZES_ENGINE_GROUP_COPY_ALL = 3 +ZES_ENGINE_GROUP_COMPUTE_SINGLE = 4 +ZES_ENGINE_GROUP_RENDER_SINGLE = 5 +ZES_ENGINE_GROUP_MEDIA_DECODE_SINGLE = 6 +ZES_ENGINE_GROUP_MEDIA_ENCODE_SINGLE = 7 +ZES_ENGINE_GROUP_COPY_SINGLE = 8 +ZES_ENGINE_GROUP_MEDIA_ENHANCEMENT_SINGLE = 9 +ZES_ENGINE_GROUP_3D_SINGLE = 10 +ZES_ENGINE_GROUP_3D_RENDER_COMPUTE_ALL = 11 +ZES_ENGINE_GROUP_RENDER_ALL = 12 +ZES_ENGINE_GROUP_3D_ALL = 13 +ZES_ENGINE_GROUP_MEDIA_CODEC_SINGLE = 14 +ZES_ENGINE_GROUP_FORCE_UINT32 = 0x7FFFFFFF + +ze_result_t = c_int32 +ZE_RESULT_SUCCESS = 0 +ZE_RESULT_NOT_READY = 1 +ZE_RESULT_ERROR_DEVICE_LOST = 0x70000001 +ZE_RESULT_ERROR_OUT_OF_HOST_MEMORY = 0x70000002 +ZE_RESULT_ERROR_OUT_OF_DEVICE_MEMORY = 0x70000003 +ZE_RESULT_ERROR_MODULE_BUILD_FAILURE = 0x70000004 +ZE_RESULT_ERROR_MODULE_LINK_FAILURE = 0x70000005 +ZE_RESULT_ERROR_DEVICE_REQUIRES_RESET = 0x70000006 +ZE_RESULT_ERROR_DEVICE_IN_LOW_POWER_STATE = 0x70000007 +ZE_RESULT_EXP_ERROR_DEVICE_IS_NOT_VERTEX = 0x7FF00001 +ZE_RESULT_EXP_ERROR_VERTEX_IS_NOT_DEVICE = 0x7FF00002 +ZE_RESULT_EXP_ERROR_REMOTE_DEVICE = 0x7FF00003 +ZE_RESULT_EXP_ERROR_OPERANDS_INCOMPATIBLE = 0x7FF00004 +ZE_RESULT_EXP_RTAS_BUILD_RETRY = 0x7FF00005 +ZE_RESULT_EXP_RTAS_BUILD_DEFERRED = 0x7FF00006 +ZE_RESULT_ERROR_INSUFFICIENT_PERMISSIONS = 0x70010000 +ZE_RESULT_ERROR_NOT_AVAILABLE = 0x70010001 +ZE_RESULT_ERROR_DEPENDENCY_UNAVAILABLE = 0x70020000 +ZE_RESULT_WARNING_DROPPED_DATA = 0x70020001 +ZE_RESULT_ERROR_UNINITIALIZED = 0x78000001 +ZE_RESULT_ERROR_UNSUPPORTED_VERSION = 0x78000002 +ZE_RESULT_ERROR_UNSUPPORTED_FEATURE = 0x78000003 +ZE_RESULT_ERROR_INVALID_ARGUMENT = 0x78000004 +ZE_RESULT_ERROR_INVALID_NULL_HANDLE = 0x78000005 +ZE_RESULT_ERROR_HANDLE_OBJECT_IN_USE = 0x78000006 +ZE_RESULT_ERROR_INVALID_NULL_POINTER = 0x78000007 +ZE_RESULT_ERROR_INVALID_SIZE = 0x78000008 +ZE_RESULT_ERROR_UNSUPPORTED_SIZE = 0x78000009 +ZE_RESULT_ERROR_UNSUPPORTED_ALIGNMENT = 0x7800000A +ZE_RESULT_ERROR_INVALID_SYNCHRONIZATION_OBJECT = 0x7800000B +ZE_RESULT_ERROR_INVALID_ENUMERATION = 0x7800000C +ZE_RESULT_ERROR_UNSUPPORTED_ENUMERATION = 0x7800000D +ZE_RESULT_ERROR_UNSUPPORTED_IMAGE_FORMAT = 0x7800000E +ZE_RESULT_ERROR_INVALID_NATIVE_BINARY = 0x7800000F +ZE_RESULT_ERROR_INVALID_GLOBAL_NAME = 0x78000010 +ZE_RESULT_ERROR_INVALID_KERNEL_NAME = 0x78000011 +ZE_RESULT_ERROR_INVALID_FUNCTION_NAME = 0x78000012 +ZE_RESULT_ERROR_INVALID_GROUP_SIZE_DIMENSION = 0x78000013 +ZE_RESULT_ERROR_INVALID_GLOBAL_WIDTH_DIMENSION = 0x78000014 +ZE_RESULT_ERROR_INVALID_KERNEL_ARGUMENT_INDEX = 0x78000015 +ZE_RESULT_ERROR_INVALID_KERNEL_ARGUMENT_SIZE = 0x78000016 +ZE_RESULT_ERROR_INVALID_KERNEL_ATTRIBUTE_VALUE = 0x78000017 +ZE_RESULT_ERROR_INVALID_MODULE_UNLINKED = 0x78000018 +ZE_RESULT_ERROR_INVALID_COMMAND_LIST_TYPE = 0x78000019 +ZE_RESULT_ERROR_OVERLAPPING_REGIONS = 0x7800001A +ZE_RESULT_WARNING_ACTION_REQUIRED = 0x7800001B +ZE_RESULT_ERROR_INVALID_KERNEL_HANDLE = 0x7800001C +ZE_RESULT_EXT_RTAS_BUILD_RETRY = 0x7800001D +ZE_RESULT_EXT_RTAS_BUILD_DEFERRED = 0x7800001E +ZE_RESULT_EXT_ERROR_OPERANDS_INCOMPATIBLE = 0x7800001F +ZE_RESULT_ERROR_SURVIVABILITY_MODE_DETECTED = 0x78000020 +ZE_RESULT_ERROR_UNKNOWN = 0x7FFFFFFE +ZE_RESULT_FORCE_UINT32 = 0x7FFFFFFF + +## Constants (subset) ## +ZE_MAX_DEVICE_UUID_SIZE = 16 +ZES_STRING_PROPERTY_SIZE = 64 # from zes_api.h +ZE_MAX_DEVICE_NAME = 256 # from ze_api.h +ZES_MAX_UUID_SIZE = 16 # from zes_api.h (uuid size for zes_uuid_t) + +# Structure type enum values +ZES_STRUCTURE_TYPE_DEVICE_PROPERTIES = 0x1 +ZES_STRUCTURE_TYPE_DEVICE_EXT_PROPERTIES = 0x2D # from zes_structure_type_t +ZES_STRUCTURE_TYPE_SUBDEVICE_EXP_PROPERTIES = 0x2E # Experimental subdevice properties +ZES_STRUCTURE_TYPE_MEM_PROPERTIES = 0xB +ZES_STRUCTURE_TYPE_MEM_STATE = 0x1E +ZES_STRUCTURE_TYPE_FREQ_PROPERTIES = 0x9 +ZES_STRUCTURE_TYPE_FREQ_STATE = 0x1B +ZES_STRUCTURE_TYPE_TEMP_PROPERTIES = 0xA +ZES_STRUCTURE_TYPE_TEMP_CONFIG = 0x1C +ZES_STRUCTURE_TYPE_ENGINE_PROPERTIES = 0x5 + + +## Core ze_device UUID struct ## +class ze_device_uuid_t(_PrintableStructure): + _fields_ = [("id", c_ubyte * ZE_MAX_DEVICE_UUID_SIZE)] + + +## Core ze_device_properties_t ## +class ze_device_properties_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), + ("pNext", c_void_p), + ("type", ze_device_type_t), + ("vendorId", c_uint32), + ("deviceId", c_uint32), + ("flags", ze_device_property_flags_t), + ("subdeviceId", c_uint32), + ("coreClockRate", c_uint32), + ("maxMemAllocSize", c_uint64), + ("maxHardwareContexts", c_uint32), + ("maxCommandQueuePriority", c_uint32), + ("numThreadsPerEU", c_uint32), + ("physicalEUSimdWidth", c_uint32), + ("numEUsPerSubslice", c_uint32), + ("numSubslicesPerSlice", c_uint32), + ("numSlices", c_uint32), + ("timerResolution", c_uint64), + ("timestampValidBits", c_uint32), + ("kernelTimestampValidBits", c_uint32), + ("uuid", ze_device_uuid_t), + ("name", c_char * ZE_MAX_DEVICE_NAME), + ] + + +## Sysman zes_device_properties_t ## +class zes_device_properties_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), + ("pNext", c_void_p), + ("core", ze_device_properties_t), + ("numSubdevices", c_uint32), + ("serialNumber", c_char * ZES_STRING_PROPERTY_SIZE), + ("boardNumber", c_char * ZES_STRING_PROPERTY_SIZE), + ("brandName", c_char * ZES_STRING_PROPERTY_SIZE), + ("modelName", c_char * ZES_STRING_PROPERTY_SIZE), + ("vendorName", c_char * ZES_STRING_PROPERTY_SIZE), + ("driverVersion", c_char * ZES_STRING_PROPERTY_SIZE), + ] + + +## Sysman zes_process_state_t ## +class zes_process_state_t(_PrintableStructure): + _fields_ = [ + ("pid", c_uint32), + ("command", c_char * ZES_STRING_PROPERTY_SIZE), + ("memSize", c_uint64), # in bytes + ("sharedMemSize", c_uint64), # in bytes + ("engineType", zes_engine_type_flags_t), + ("subdeviceId", c_uint32), + ] + _fmt_ = {"memSize": "%d bytes", "sharedMemSize": "%d bytes"} + + +## Sysman zes_uuid_t ## +class zes_uuid_t(_PrintableStructure): + _fields_ = [("id", c_ubyte * ZES_MAX_UUID_SIZE)] + + +## Sysman zes_device_ext_properties_t (extension) ## +class zes_device_ext_properties_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), # ZES_STRUCTURE_TYPE_DEVICE_EXT_PROPERTIES + ("pNext", c_void_p), # optional chain + ("uuid", zes_uuid_t), # same as core/sysman device UUID + ("type", zes_device_type_t), # zes_device_type_t (enum) + ("flags", zes_device_property_flags_t), # zes_device_property_flags_t + ] + + +## Sysman zes_subdevice_exp_properties_t (experimental extension) ## +class zes_subdevice_exp_properties_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), # ZES_STRUCTURE_TYPE_SUBDEVICE_EXP_PROPERTIES + ("pNext", c_void_p), # optional chain + ("subdeviceId", c_uint32), # Sub-device ID + ("uuid", zes_uuid_t), # Sub-device UUID + ] + + +## Memory structures ## +class zes_mem_properties_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), # ZES_STRUCTURE_TYPE_MEM_PROPERTIES + ("pNext", c_void_p), # optional chain + ("type", zes_mem_type_t), # memory type + ("onSubdevice", ze_bool_t), # is on subdevice + ("subdeviceId", c_uint32), # subdevice ID + ("location", zes_mem_loc_t), # memory location + ("physicalSize", c_uint64), # physical memory size in bytes + ("busWidth", c_int32), # memory bus width (-1 if unknown) + ("numChannels", c_int32), # number of memory channels (-1 if unknown) + ] + _fmt_ = { + "physicalSize": "%d bytes", + "busWidth": "%d bits", + "numChannels": "%d channels", + } + + +class zes_mem_state_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), # ZES_STRUCTURE_TYPE_MEM_STATE + ("pNext", c_void_p), # optional chain (const void*) + ("health", zes_mem_health_t), # memory health status + ("free", c_uint64), # free memory in bytes + ("size", c_uint64), # total allocatable memory in bytes + ] + _fmt_ = {"free": "%d bytes", "size": "%d bytes"} + + +class zes_mem_bandwidth_t(_PrintableStructure): + _fields_ = [ + ("readCounter", c_uint64), # total bytes read from memory + ("writeCounter", c_uint64), # total bytes written to memory + ("maxBandwidth", c_uint64), # current maximum bandwidth in bytes/sec + ("timestamp", c_uint64), # timestamp in microseconds + ] + _fmt_ = { + "readCounter": "%d bytes", + "writeCounter": "%d bytes", + "maxBandwidth": "%d bytes/sec", + } + + +## Power structures ## +class zes_power_energy_counter_t(_PrintableStructure): + _fields_ = [ + ("energy", c_uint64), # monotonic energy counter in microjoules + ("timestamp", c_uint64), # microsecond timestamp when energy was captured + ] + _fmt_ = {"energy": "%d", "timestamp": "%d microseconds"} + + +## Frequency structures ## +class zes_freq_state_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), # ZES_STRUCTURE_TYPE_FREQ_STATE + ("pNext", c_void_p), + ("currentVoltage", c_double), # current voltage in Volts + ("request", c_double), # current frequency request in MHz + ("tdp", c_double), # max frequency under current TDP in MHz + ("efficient", c_double), # efficient minimum frequency in MHz + ("actual", c_double), # resolved frequency in MHz + ("throttleReasons", zes_freq_throttle_reason_flags_t), # throttle reasons + ] + _fmt_ = { + "currentVoltage": "%.3f V", + "request": "%.1f MHz", + "tdp": "%.1f MHz", + "efficient": "%.1f MHz", + "actual": "%.1f MHz", + "throttleReasons": "0x%08X", + } + + +## Temperature structures ## +class zes_temp_properties_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), # ZES_STRUCTURE_TYPE_TEMP_PROPERTIES + ("pNext", c_void_p), + ("type", zes_temp_sensors_t), # temperature sensor type + ("onSubdevice", ze_bool_t), # is on subdevice + ("subdeviceId", c_uint32), # subdevice ID + ("maxTemperature", c_double), # maximum temperature in degrees Celsius + ("isCriticalTempSupported", ze_bool_t), # is critical temperature supported + ("isThreshold1Supported", ze_bool_t), # is threshold 1 supported + ("isThreshold2Supported", ze_bool_t), # is threshold 2 supported + ] + _fmt_ = {"maxTemperature": "%.1f °C"} + + +class zes_temp_config_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), # ZES_STRUCTURE_TYPE_TEMP_CONFIG + ("pNext", c_void_p), + ("enableCritical", ze_bool_t), # enable critical temperature event + ("threshold1", c_double), # threshold 1 in degrees Celsius + ("threshold2", c_double), # threshold 2 in degrees Celsius + ] + _fmt_ = {"threshold1": "%.1f °C", "threshold2": "%.1f °C"} + + +## Engine structures ## + + +class zes_engine_properties_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), # ZES_STRUCTURE_TYPE_ENGINE_PROPERTIES + ("pNext", c_void_p), + ("type", zes_engine_group_t), # engine type + ("onSubdevice", ze_bool_t), # is on subdevice + ("subdeviceId", c_uint32), # subdevice ID + ] + _fmt_ = {"type": "0x%08X"} + + +class zes_engine_stats_t(_PrintableStructure): + _fields_ = [ + ("activeTime", c_uint64), # active time + ("timestamp", c_uint64), # timestamp + ] + _fmt_ = {"activeTime": "%d", "timestamp": "%d"} + + +## Function access ## +_getFunctionPointerList: Dict[str, Any] = dict() + + +def getFunctionPointerList(name): + if name in _getFunctionPointerList: + return _getFunctionPointerList[name] + + libLoadLock.acquire() + try: + # ensure library was loaded + if gpuLib is None: + raise Exception("Library not loaded") + else: + _getFunctionPointerList[name] = getattr(gpuLib, name) + return _getFunctionPointerList[name] + finally: + # lock is always freed + libLoadLock.release() + + +## function wrappers ## +def zesInit(flags: int): + funcPtr = getFunctionPointerList("zesInit") + funcPtr.argtypes = [c_uint32] + funcPtr.restype = ze_result_t + retVal = funcPtr(flags) + return retVal + + +def zesDriverGet(pCount, phDrivers): + """Wraps: + ze_result_t zesDriverGet(uint32_t* pCount, zes_driver_handle_t* phDrivers) + pCount: POINTER(c_uint32) + phDrivers: POINTER(zes_driver_handle_t) or None + """ + funcPtr = getFunctionPointerList("zesDriverGet") + funcPtr.argtypes = [POINTER(c_uint32), POINTER(zes_driver_handle_t)] + funcPtr.restype = ze_result_t + retVal = funcPtr(pCount, phDrivers) + return retVal + + +def zesDeviceGet(hDriver, pCount, phDevices): + """Wraps: + ze_result_t zesDeviceGet(zes_driver_handle_t hDriver, uint32_t* pCount, zes_device_handle_t* phDevices) + hDriver: driver handle (void*) + pCount: POINTER(c_uint32) + phDevices: POINTER(zes_device_handle_t) or None + """ + funcPtr = getFunctionPointerList("zesDeviceGet") + funcPtr.argtypes = [ + zes_driver_handle_t, + POINTER(c_uint32), + POINTER(zes_device_handle_t), + ] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDriver, pCount, phDevices) + return retVal + + +def zesDeviceGetProperties(hDevice, pProperties): + """Wraps API: + ze_result_t zesDeviceGetProperties(zes_device_handle_t hDevice, zes_device_properties_t* pProperties) + + Parameters: + hDevice: device handle + pProperties: POINTER(zes_device_properties_t) - properties structure to fill + Returns: + ze_result_t - return code only, properties are filled into pProperties + """ + funcPtr = getFunctionPointerList("zesDeviceGetProperties") + funcPtr.argtypes = [zes_device_handle_t, POINTER(zes_device_properties_t)] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, pProperties) + return retVal + + +def zesDeviceGetSubDevicePropertiesExp(hDevice, pCount, pSubdeviceProps): + """Wraps API: + ze_result_t zesDeviceGetSubDevicePropertiesExp( + zes_device_handle_t hDevice, + uint32_t* pCount, + zes_subdevice_exp_properties_t* pSubdeviceProps) + + Parameters: + hDevice: device handle + pCount: POINTER(c_uint32) - pointer to count of subdevices + pSubdeviceProps: POINTER(zes_subdevice_exp_properties_t) or None - array of subdevice properties + Returns: + ze_result_t - return code only, subdevice properties are filled into pSubdeviceProps + """ + funcPtr = getFunctionPointerList("zesDeviceGetSubDevicePropertiesExp") + funcPtr.argtypes = [ + zes_device_handle_t, + POINTER(c_uint32), + POINTER(zes_subdevice_exp_properties_t), + ] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, pCount, pSubdeviceProps) + return retVal + + +def zesDriverGetDeviceByUuidExp( + hDriver, uuid: "zes_uuid_t", phDevice, onSubdevice, subdeviceId +): + """Wraps API: + ze_result_t zesDriverGetDeviceByUuidExp( + zes_driver_handle_t hDriver, + zes_uuid_t uuid, + zes_device_handle_t *phDevice, + ze_bool_t *onSubdevice, + uint32_t *subdeviceId) + + Parameters: + hDriver: driver handle + uuid: instance of zes_uuid_t passed by value + phDevice: POINTER(zes_device_handle_t) + onSubdevice: POINTER(ze_bool_t) + subdeviceId: POINTER(c_uint32) + """ + funcPtr = getFunctionPointerList("zesDriverGetDeviceByUuidExp") + funcPtr.argtypes = [ + zes_driver_handle_t, + zes_uuid_t, + POINTER(zes_device_handle_t), + POINTER(ze_bool_t), + POINTER(c_uint32), + ] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDriver, uuid, phDevice, onSubdevice, subdeviceId) + return retVal + + +def zesDeviceProcessesGetState(hDevice, pCount, pProcesses): + """Wraps API: + ze_result_t zesDeviceProcessesGetState( + zes_device_handle_t hDevice, + uint32_t* pCount, + zes_process_state_t* pProcesses) + + Parameters: + hDevice: device handle + pCount: POINTER(c_uint32) + pProcesses: POINTER(zes_process_state_t) or None + """ + funcPtr = getFunctionPointerList("zesDeviceProcessesGetState") + funcPtr.argtypes = [ + zes_device_handle_t, + POINTER(c_uint32), + POINTER(zes_process_state_t), + ] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, pCount, pProcesses) + return retVal + + +## Memory management functions ## +def zesDeviceEnumMemoryModules(hDevice, pCount, phMemory): + """Wraps API: + ze_result_t zesDeviceEnumMemoryModules( + zes_device_handle_t hDevice, + uint32_t* pCount, + zes_mem_handle_t* phMemory) + + Parameters: + hDevice: device handle + pCount: POINTER(c_uint32) + phMemory: POINTER(zes_mem_handle_t) or None + """ + funcPtr = getFunctionPointerList("zesDeviceEnumMemoryModules") + funcPtr.argtypes = [ + zes_device_handle_t, + POINTER(c_uint32), + POINTER(zes_mem_handle_t), + ] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, pCount, phMemory) + return retVal + + +def zesMemoryGetProperties(hMemory, pProperties): + """Wraps API: + ze_result_t zesMemoryGetProperties( + zes_mem_handle_t hMemory, + zes_mem_properties_t* pProperties) + + Parameters: + hMemory: memory handle + pProperties: POINTER(zes_mem_properties_t) - properties structure to fill + Returns: + ze_result_t - return code only, properties are filled into pProperties + """ + funcPtr = getFunctionPointerList("zesMemoryGetProperties") + funcPtr.argtypes = [zes_mem_handle_t, POINTER(zes_mem_properties_t)] + funcPtr.restype = ze_result_t + retVal = funcPtr(hMemory, pProperties) + return retVal + + +def zesMemoryGetState(hMemory, pState): + """Wraps API: + ze_result_t zesMemoryGetState( + zes_mem_handle_t hMemory, + zes_mem_state_t* pState) + + Parameters: + hMemory: memory handle + pState: POINTER(zes_mem_state_t) - state structure to fill + Returns: + ze_result_t - return code only, state is filled into pState + """ + funcPtr = getFunctionPointerList("zesMemoryGetState") + funcPtr.argtypes = [zes_mem_handle_t, POINTER(zes_mem_state_t)] + funcPtr.restype = ze_result_t + retVal = funcPtr(hMemory, pState) + return retVal + + +def zesMemoryGetBandwidth(hMemory, pBandwidth): + """Wraps API: + ze_result_t zesMemoryGetBandwidth( + zes_mem_handle_t hMemory, + zes_mem_bandwidth_t* pBandwidth) + + Parameters: + hMemory: memory handle + pBandwidth: POINTER(zes_mem_bandwidth_t) - bandwidth structure to fill + Returns: + ze_result_t - return code only, bandwidth is filled into pBandwidth + """ + funcPtr = getFunctionPointerList("zesMemoryGetBandwidth") + funcPtr.argtypes = [zes_mem_handle_t, POINTER(zes_mem_bandwidth_t)] + funcPtr.restype = ze_result_t + retVal = funcPtr(hMemory, pBandwidth) + return retVal + + +## Power management functions ## +def zesDeviceEnumPowerDomains(hDevice, pCount, phPower): + """Wraps API: + ze_result_t zesDeviceEnumPowerDomains( + zes_device_handle_t hDevice, + uint32_t* pCount, + zes_pwr_handle_t* phPower) + + Parameters: + hDevice: device handle + pCount: POINTER(c_uint32) + phPower: POINTER(zes_pwr_handle_t) or None + """ + funcPtr = getFunctionPointerList("zesDeviceEnumPowerDomains") + funcPtr.argtypes = [ + zes_device_handle_t, + POINTER(c_uint32), + POINTER(zes_pwr_handle_t), + ] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, pCount, phPower) + return retVal + + +def zesPowerGetEnergyCounter(hPower, pEnergy): + """Wraps API: + ze_result_t zesPowerGetEnergyCounter( + zes_pwr_handle_t hPower, + zes_power_energy_counter_t* pEnergy) + + Parameters: + hPower: power handle + pEnergy: POINTER(zes_power_energy_counter_t) - energy counter structure to fill + Returns: + ze_result_t - return code only, energy counter is filled into pEnergy + """ + funcPtr = getFunctionPointerList("zesPowerGetEnergyCounter") + funcPtr.argtypes = [zes_pwr_handle_t, POINTER(zes_power_energy_counter_t)] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hPower, pEnergy) + return retVal + + +## Frequency module functions ## +def zesDeviceEnumFrequencyDomains(hDevice, pCount, phFrequency): + """Wraps API: + ze_result_t zesDeviceEnumFrequencyDomains( + zes_device_handle_t hDevice, + uint32_t* pCount, + zes_freq_handle_t* phFrequency) + + Parameters: + hDevice: device handle + pCount: POINTER(c_uint32) + phFrequency: POINTER(zes_freq_handle_t) or None + """ + funcPtr = getFunctionPointerList("zesDeviceEnumFrequencyDomains") + funcPtr.argtypes = [ + zes_device_handle_t, + POINTER(c_uint32), + POINTER(zes_freq_handle_t), + ] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, pCount, phFrequency) + return retVal + + +def zesFrequencyGetState(hFrequency, pState): + """Wraps API: + ze_result_t zesFrequencyGetState( + zes_freq_handle_t hFrequency, + zes_freq_state_t* pState) + + Parameters: + hFrequency: frequency handle + pState: POINTER(zes_freq_state_t) - state structure to fill + Returns: + ze_result_t - return code only, state is filled into pState + """ + funcPtr = getFunctionPointerList("zesFrequencyGetState") + funcPtr.argtypes = [zes_freq_handle_t, POINTER(zes_freq_state_t)] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hFrequency, pState) + return retVal + + +## Temperature sensor functions ## +def zesDeviceEnumTemperatureSensors(hDevice, pCount, phTemperature): + """Wraps API: + ze_result_t zesDeviceEnumTemperatureSensors( + zes_device_handle_t hDevice, + uint32_t* pCount, + zes_temp_handle_t* phTemperature) + + Parameters: + hDevice: device handle + pCount: POINTER(c_uint32) + phTemperature: POINTER(zes_temp_handle_t) or None + """ + funcPtr = getFunctionPointerList("zesDeviceEnumTemperatureSensors") + funcPtr.argtypes = [ + zes_device_handle_t, + POINTER(c_uint32), + POINTER(zes_temp_handle_t), + ] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, pCount, phTemperature) + return retVal + + +def zesTemperatureGetProperties(hTemperature, pProperties): + """Wraps API: + ze_result_t zesTemperatureGetProperties( + zes_temp_handle_t hTemperature, + zes_temp_properties_t* pProperties) + + Parameters: + hTemperature: temperature handle + pProperties: POINTER(zes_temp_properties_t) - properties structure to fill + Returns: + ze_result_t - return code only, properties are filled into pProperties + """ + funcPtr = getFunctionPointerList("zesTemperatureGetProperties") + funcPtr.argtypes = [zes_temp_handle_t, POINTER(zes_temp_properties_t)] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hTemperature, pProperties) + return retVal + + +def zesTemperatureGetConfig(hTemperature, pConfig): + """Wraps API: + ze_result_t zesTemperatureGetConfig( + zes_temp_handle_t hTemperature, + zes_temp_config_t* pConfig) + + Parameters: + hTemperature: temperature handle + pConfig: POINTER(zes_temp_config_t) - config structure to fill + Returns: + ze_result_t - return code only, config is filled into pConfig + """ + funcPtr = getFunctionPointerList("zesTemperatureGetConfig") + funcPtr.argtypes = [zes_temp_handle_t, POINTER(zes_temp_config_t)] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hTemperature, pConfig) + return retVal + + +def zesTemperatureGetState(hTemperature, pTemperature): + """Wraps API: + ze_result_t zesTemperatureGetState( + zes_temp_handle_t hTemperature, + double* pTemperature) + + Parameters: + hTemperature: temperature handle + pTemperature: POINTER(c_double) - temperature value to fill in degrees Celsius + Returns: + ze_result_t - return code only, temperature is filled into pTemperature + """ + funcPtr = getFunctionPointerList("zesTemperatureGetState") + funcPtr.argtypes = [zes_temp_handle_t, POINTER(c_double)] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hTemperature, pTemperature) + return retVal + + +## Engine functions ## + + +def zesDeviceEnumEngineGroups(hDevice, pCount, phEngine): + """Wraps API: + ze_result_t zesDeviceEnumEngineGroups( + zes_device_handle_t hDevice, + uint32_t* pCount, + zes_engine_handle_t* phEngine) + + Parameters: + hDevice: device handle + pCount: POINTER(c_uint32) + phEngine: POINTER(zes_engine_handle_t) or None + """ + funcPtr = getFunctionPointerList("zesDeviceEnumEngineGroups") + funcPtr.argtypes = [ + zes_device_handle_t, + POINTER(c_uint32), + POINTER(zes_engine_handle_t), + ] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, pCount, phEngine) + return retVal + + +def zesEngineGetProperties(hEngine, pProperties): + """Wraps API: + ze_result_t zesEngineGetProperties( + zes_engine_handle_t hEngine, + zes_engine_properties_t* pProperties) + + Parameters: + hEngine: engine handle + pProperties: POINTER(zes_engine_properties_t) - properties structure to fill + Returns: + ze_result_t - return code only, properties are filled into pProperties + """ + funcPtr = getFunctionPointerList("zesEngineGetProperties") + funcPtr.argtypes = [zes_engine_handle_t, POINTER(zes_engine_properties_t)] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hEngine, pProperties) + return retVal + + +def zesEngineGetActivity(hEngine, pStats): + """Wraps API: + ze_result_t zesEngineGetActivity( + zes_engine_handle_t hEngine, + zes_engine_stats_t* pStats) + + Parameters: + hEngine: engine handle + pStats: POINTER(zes_engine_stats_t) - stats structure to fill + Returns: + ze_result_t - return code only, stats are filled into pStats + """ + funcPtr = getFunctionPointerList("zesEngineGetActivity") + funcPtr.argtypes = [zes_engine_handle_t, POINTER(zes_engine_stats_t)] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hEngine, pStats) + return retVal diff --git a/bindings/sysman/python/test/unit_tests/conftest.py b/bindings/sysman/python/test/unit_tests/conftest.py new file mode 100644 index 00000000..fabaafd5 --- /dev/null +++ b/bindings/sysman/python/test/unit_tests/conftest.py @@ -0,0 +1,42 @@ +## +# Copyright (C) 2025 Intel Corporation +# +# SPDX-License-Identifier: MIT +# +## + +""" +Pytest configuration for unit tests with GPU library mocking. +Unit tests should always use mocked GPU libraries to test logic in isolation. +""" + +import sys +from unittest.mock import Mock, patch + +import pytest + + +@pytest.fixture(autouse=True) +def mock_gpu_library(): + """ + Automatically mock the GPU library loading for all unit tests. + Unit tests should always run with mocked libraries to test logic in isolation. + """ + # Always mock for unit tests - no conditional logic needed + with patch("ctypes.CDLL") as mock_cdll: + # Create a mock library object + mock_lib = Mock() + mock_cdll.return_value = mock_lib + + # Patch the module before it gets imported + if "pyzes" in sys.modules: + # If already imported, patch the existing gpuLib + import pyzes + + original_gpulib = pyzes.gpuLib + pyzes.gpuLib = mock_lib + yield mock_lib + pyzes.gpuLib = original_gpulib + else: + # If not imported yet, let the mock handle the CDLL calls + yield mock_lib diff --git a/bindings/sysman/python/test/unit_tests/test_engine.py b/bindings/sysman/python/test/unit_tests/test_engine.py new file mode 100644 index 00000000..be7f31af --- /dev/null +++ b/bindings/sysman/python/test/unit_tests/test_engine.py @@ -0,0 +1,105 @@ +## +# Copyright (C) 2025 Intel Corporation +# +# SPDX-License-Identifier: MIT +# +## + +import os +import sys +import unittest +from ctypes import * +from unittest.mock import MagicMock, patch + +# Add the source directory to Python path so we can import pyzes +script_dir = os.path.dirname(os.path.abspath(__file__)) +source_dir = os.path.join(script_dir, "..", "..", "source") +source_dir = os.path.abspath(source_dir) +if source_dir not in sys.path: + sys.path.insert(0, source_dir) + + +@patch("pyzes.getFunctionPointerList") +class TestEngineFunctions(unittest.TestCase): + + def setUp(self): + import pyzes + + self.pyzes = pyzes + + def test_GivenValidDeviceHandleWhenCallingZesDeviceEnumEngineGroupsThenCallSucceedsWithValidCount( + self, mock_get_func + ): + mock_engine_count = 4 + + def mock_enum_engine_groups(device_handle, count_ptr, handles_ptr): + count_ptr._obj.value = mock_engine_count + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_enum_engine_groups) + mock_get_func.return_value = mock_func + + device_handle = self.pyzes.zes_device_handle_t() + count = c_uint32(0) + + result = self.pyzes.zesDeviceEnumEngineGroups(device_handle, byref(count), None) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(count.value, mock_engine_count) + mock_get_func.assert_called_with("zesDeviceEnumEngineGroups") + mock_func.assert_called_once() + + def test_GivenValidEngineHandleWhenCallingZesEngineGetPropertiesThenCallSucceedsWithValidProperties( + self, mock_get_func + ): + mock_type = self.pyzes.ZES_ENGINE_GROUP_COMPUTE_SINGLE + mock_subdevice_id = 0 + mock_on_subdevice = False + + def mock_get_properties(engine_handle, properties_ptr): + properties_ptr._obj.type = mock_type + properties_ptr._obj.subdeviceId = mock_subdevice_id + properties_ptr._obj.onSubdevice = mock_on_subdevice + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_properties) + mock_get_func.return_value = mock_func + + engine_handle = self.pyzes.zes_engine_handle_t() + engine_props = self.pyzes.zes_engine_properties_t() + result = self.pyzes.zesEngineGetProperties(engine_handle, byref(engine_props)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(engine_props.type, mock_type) + self.assertEqual(engine_props.subdeviceId, mock_subdevice_id) + self.assertEqual(engine_props.onSubdevice, mock_on_subdevice) + mock_get_func.assert_called_with("zesEngineGetProperties") + mock_func.assert_called_once() + + def test_GivenValidEngineHandleWhenCallingZesEngineGetActivityThenCallSucceedsWithActivity( + self, mock_get_func + ): + mock_active_time = 987654321 + mock_timestamp = 123456789 + + def mock_get_activity(engine_handle, stats_ptr): + stats_ptr._obj.activeTime = mock_active_time + stats_ptr._obj.timestamp = mock_timestamp + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_activity) + mock_get_func.return_value = mock_func + + engine_handle = self.pyzes.zes_engine_handle_t() + engine_stats = self.pyzes.zes_engine_stats_t() + result = self.pyzes.zesEngineGetActivity(engine_handle, byref(engine_stats)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(engine_stats.activeTime, mock_active_time) + self.assertEqual(engine_stats.timestamp, mock_timestamp) + mock_get_func.assert_called_with("zesEngineGetActivity") + mock_func.assert_called_once() + + +if __name__ == "__main__": + unittest.main() diff --git a/bindings/sysman/python/test/unit_tests/test_frequency.py b/bindings/sysman/python/test/unit_tests/test_frequency.py new file mode 100644 index 00000000..2de80205 --- /dev/null +++ b/bindings/sysman/python/test/unit_tests/test_frequency.py @@ -0,0 +1,92 @@ +## +# Copyright (C) 2025 Intel Corporation +# +# SPDX-License-Identifier: MIT +# +## + +import os +import sys +import unittest +from ctypes import * +from unittest.mock import MagicMock, patch + +# Add the source directory to Python path so we can import pyzes +script_dir = os.path.dirname(os.path.abspath(__file__)) +source_dir = os.path.join(script_dir, "..", "..", "source") +source_dir = os.path.abspath(source_dir) +if source_dir not in sys.path: + sys.path.insert(0, source_dir) + + +@patch("pyzes.getFunctionPointerList") +class TestFrequencyFunctions(unittest.TestCase): + + def setUp(self): + import pyzes + + self.pyzes = pyzes + + def test_GivenValidDeviceHandleWhenCallingZesDeviceEnumFrequencyDomainsThenCallSucceedsWithValidCount( + self, mock_get_func + ): + mock_frequency_count = 2 + + def mock_enum_frequency_domains(device_handle, count_ptr, handles_ptr): + count_ptr._obj.value = mock_frequency_count + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_enum_frequency_domains) + mock_get_func.return_value = mock_func + + device_handle = self.pyzes.zes_device_handle_t() + count = c_uint32(0) + + result = self.pyzes.zesDeviceEnumFrequencyDomains( + device_handle, byref(count), None + ) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(count.value, mock_frequency_count) + mock_get_func.assert_called_with("zesDeviceEnumFrequencyDomains") + mock_func.assert_called_once() + + def test_GivenValidFrequencyHandleWhenCallingZesFrequencyGetStateThenCallSucceedsWithState( + self, mock_get_func + ): + mock_current_voltage = 1.2 # 1.2V + mock_request = 1500.0 # 1500 MHz + mock_tdp = 1600.0 # 1600 MHz + mock_efficient = 800.0 # 800 MHz + mock_actual = 1450.0 # 1450 MHz + mock_throttle_reasons = 0 # throttle reason flags + + def mock_get_state(frequency_handle, state_ptr): + state_ptr._obj.currentVoltage = mock_current_voltage + state_ptr._obj.request = mock_request + state_ptr._obj.tdp = mock_tdp + state_ptr._obj.efficient = mock_efficient + state_ptr._obj.actual = mock_actual + state_ptr._obj.throttleReasons = mock_throttle_reasons + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_state) + mock_get_func.return_value = mock_func + + frequency_handle = self.pyzes.zes_freq_handle_t() + freq_state = self.pyzes.zes_freq_state_t() + result = self.pyzes.zesFrequencyGetState(frequency_handle, byref(freq_state)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(freq_state.currentVoltage, mock_current_voltage) + self.assertEqual(freq_state.request, mock_request) + self.assertEqual(freq_state.tdp, mock_tdp) + self.assertEqual(freq_state.efficient, mock_efficient) + self.assertEqual(freq_state.actual, mock_actual) + self.assertEqual(freq_state.throttleReasons, mock_throttle_reasons) + mock_get_func.assert_called_with("zesFrequencyGetState") + mock_func.assert_called_once() + + +if __name__ == "__main__": + unittest.main() diff --git a/bindings/sysman/python/test/unit_tests/test_global_operations.py b/bindings/sysman/python/test/unit_tests/test_global_operations.py new file mode 100644 index 00000000..96138ab0 --- /dev/null +++ b/bindings/sysman/python/test/unit_tests/test_global_operations.py @@ -0,0 +1,306 @@ +## +# Copyright (C) 2025 Intel Corporation +# +# SPDX-License-Identifier: MIT +# +## + +import os +import sys +import unittest +from ctypes import * +from unittest.mock import MagicMock, patch + +# Add the source directory to Python path so we can import pyzes +script_dir = os.path.dirname(os.path.abspath(__file__)) +source_dir = os.path.join(script_dir, "..", "..", "source") +source_dir = os.path.abspath(source_dir) +if source_dir not in sys.path: + sys.path.insert(0, source_dir) + + +@patch("pyzes.getFunctionPointerList") +class TestGlobalOperations(unittest.TestCase): + + def setUp(self): + import pyzes + + self.pyzes = pyzes + + def test_GivenValidDriverHandleWhenCallingZesDeviceGetThenCallSucceedsWithValidCount( + self, mock_get_func + ): + mock_device_count = 2 + + def mock_device_get(driver_handle, count_ptr, devices_ptr): + count_ptr._obj.value = mock_device_count + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_device_get) + mock_get_func.return_value = mock_func + + driver_handle = self.pyzes.zes_driver_handle_t() + count = c_uint32(0) + + result = self.pyzes.zesDeviceGet(driver_handle, byref(count), None) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(count.value, mock_device_count) + mock_get_func.assert_called_with("zesDeviceGet") + mock_func.assert_called_once() + + def test_GivenValidDeviceHandleWhenCallingZesDeviceGetPropertiesThenCallSucceedsWithValidProperties( + self, mock_get_func + ): + mock_num_subdevices = 4 + mock_vendor_id = 0x8086 # Intel + mock_device_id = 0x1234 + mock_core_clock_rate = 1500 # MHz + mock_max_mem_alloc_size = 4294967296 # 4GB + mock_num_slices = 8 + mock_max_hardware_contexts = 512 + mock_num_threads_per_eu = 8 + + def mock_get_properties(device_handle, properties_ptr): + # Set zes_device_properties_t fields + properties_ptr._obj.numSubdevices = mock_num_subdevices + + # Set ze_device_properties_t core fields + properties_ptr._obj.core.vendorId = mock_vendor_id + properties_ptr._obj.core.deviceId = mock_device_id + properties_ptr._obj.core.coreClockRate = mock_core_clock_rate + properties_ptr._obj.core.maxMemAllocSize = mock_max_mem_alloc_size + properties_ptr._obj.core.numSlices = mock_num_slices + properties_ptr._obj.core.maxHardwareContexts = mock_max_hardware_contexts + properties_ptr._obj.core.numThreadsPerEU = mock_num_threads_per_eu + + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_properties) + mock_get_func.return_value = mock_func + + device_handle = self.pyzes.zes_device_handle_t() + device_props = self.pyzes.zes_device_properties_t() + result = self.pyzes.zesDeviceGetProperties(device_handle, byref(device_props)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + + # Validate zes_device_properties_t fields + self.assertEqual(device_props.numSubdevices, mock_num_subdevices) + + # Validate ze_device_properties_t core fields + self.assertEqual(device_props.core.vendorId, mock_vendor_id) + self.assertEqual(device_props.core.deviceId, mock_device_id) + self.assertEqual(device_props.core.coreClockRate, mock_core_clock_rate) + self.assertEqual(device_props.core.maxMemAllocSize, mock_max_mem_alloc_size) + self.assertEqual(device_props.core.numSlices, mock_num_slices) + self.assertEqual( + device_props.core.maxHardwareContexts, mock_max_hardware_contexts + ) + self.assertEqual(device_props.core.numThreadsPerEU, mock_num_threads_per_eu) + + mock_get_func.assert_called_with("zesDeviceGetProperties") + mock_func.assert_called_once() + + def test_GivenValidDeviceHandleWhenCallingZesDeviceProcessesGetStateThenCallSucceedsWithProcessCount( + self, mock_get_func + ): + mock_process_count = 3 + + def mock_get_processes_state(device_handle, count_ptr, processes_ptr): + count_ptr._obj.value = mock_process_count + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_processes_state) + mock_get_func.return_value = mock_func + + device_handle = self.pyzes.zes_device_handle_t() + count = c_uint32(0) + + result = self.pyzes.zesDeviceProcessesGetState( + device_handle, byref(count), None + ) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(count.value, mock_process_count) + mock_get_func.assert_called_with("zesDeviceProcessesGetState") + mock_func.assert_called_once() + + def test_GivenValidDeviceHandleWhenCallingZesDeviceProcessesGetStateWithArrayThenCallSucceedsWithProcessData( + self, mock_get_func + ): + mock_process_count = 2 + mock_pid_1 = 1234 + mock_pid_2 = 5678 + mock_mem_size_1 = 1073741824 # 1GB + mock_mem_size_2 = 2147483648 # 2GB + mock_shared_mem_size_1 = 536870912 # 512MB + mock_shared_mem_size_2 = 1073741824 # 1GB + mock_subdevice_id_1 = 0 + mock_subdevice_id_2 = 1 + + def mock_get_processes_state_with_data(device_handle, count_ptr, processes_ptr): + if processes_ptr: + # Fill in process data for first process + processes_ptr[0].pid = mock_pid_1 + processes_ptr[0].memSize = mock_mem_size_1 + processes_ptr[0].sharedMemSize = mock_shared_mem_size_1 + processes_ptr[0].subdeviceId = mock_subdevice_id_1 + + # Fill in process data for second process + processes_ptr[1].pid = mock_pid_2 + processes_ptr[1].memSize = mock_mem_size_2 + processes_ptr[1].sharedMemSize = mock_shared_mem_size_2 + processes_ptr[1].subdeviceId = mock_subdevice_id_2 + else: + count_ptr._obj.value = mock_process_count + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_processes_state_with_data) + mock_get_func.return_value = mock_func + + device_handle = self.pyzes.zes_device_handle_t() + count = c_uint32(mock_process_count) + + # Create array for process states + process_array = (self.pyzes.zes_process_state_t * mock_process_count)() + + result = self.pyzes.zesDeviceProcessesGetState( + device_handle, byref(count), process_array + ) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + + # Validate first process data + self.assertEqual(process_array[0].pid, mock_pid_1) + self.assertEqual(process_array[0].memSize, mock_mem_size_1) + self.assertEqual(process_array[0].sharedMemSize, mock_shared_mem_size_1) + self.assertEqual(process_array[0].subdeviceId, mock_subdevice_id_1) + + # Validate second process data + self.assertEqual(process_array[1].pid, mock_pid_2) + self.assertEqual(process_array[1].memSize, mock_mem_size_2) + self.assertEqual(process_array[1].sharedMemSize, mock_shared_mem_size_2) + self.assertEqual(process_array[1].subdeviceId, mock_subdevice_id_2) + + mock_get_func.assert_called_with("zesDeviceProcessesGetState") + mock_func.assert_called_once() + + def test_GivenValidDeviceHandleWhenCallingZesDeviceGetSubDevicePropertiesExpThenCallSucceedsWithSubdeviceCount( + self, mock_get_func + ): + mock_subdevice_count = 2 + + def mock_get_subdevice_props(device_handle, count_ptr, props_ptr): + count_ptr._obj.value = mock_subdevice_count + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_subdevice_props) + mock_get_func.return_value = mock_func + + device_handle = self.pyzes.zes_device_handle_t() + count = c_uint32(0) + + result = self.pyzes.zesDeviceGetSubDevicePropertiesExp( + device_handle, byref(count), None + ) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(count.value, mock_subdevice_count) + mock_get_func.assert_called_with("zesDeviceGetSubDevicePropertiesExp") + mock_func.assert_called_once() + + def test_GivenValidDeviceHandleWhenCallingZesDeviceGetSubDevicePropertiesExpWithArrayThenCallSucceedsWithSubdeviceData( + self, mock_get_func + ): + mock_subdevice_count = 2 + mock_subdevice_id_0 = 0 + mock_subdevice_id_1 = 1 + # Mock UUIDs as predefined byte arrays + mock_uuid_0 = [ + 0x01, + 0x23, + 0x45, + 0x67, + 0x89, + 0xAB, + 0xCD, + 0xEF, + 0xFE, + 0xDC, + 0xBA, + 0x98, + 0x76, + 0x54, + 0x32, + 0x10, + ] + mock_uuid_1 = [ + 0x11, + 0x22, + 0x33, + 0x44, + 0x55, + 0x66, + 0x77, + 0x88, + 0x99, + 0xAA, + 0xBB, + 0xCC, + 0xDD, + 0xEE, + 0xFF, + 0x00, + ] + + def mock_get_subdevice_props_with_data(device_handle, count_ptr, props_ptr): + if props_ptr: + # Fill in first subdevice properties + props_ptr[0].subdeviceId = mock_subdevice_id_0 + # Set UUID for first subdevice + for i in range(16): + props_ptr[0].uuid.id[i] = mock_uuid_0[i] + + # Fill in second subdevice properties + props_ptr[1].subdeviceId = mock_subdevice_id_1 + # Set UUID for second subdevice + for i in range(16): + props_ptr[1].uuid.id[i] = mock_uuid_1[i] + else: + count_ptr._obj.value = mock_subdevice_count + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_subdevice_props_with_data) + mock_get_func.return_value = mock_func + + device_handle = self.pyzes.zes_device_handle_t() + count = c_uint32(mock_subdevice_count) + + # Create array for subdevice properties + subdevice_props_array = ( + self.pyzes.zes_subdevice_exp_properties_t * mock_subdevice_count + )() + + result = self.pyzes.zesDeviceGetSubDevicePropertiesExp( + device_handle, byref(count), subdevice_props_array + ) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + + # Validate first subdevice properties + self.assertEqual(subdevice_props_array[0].subdeviceId, mock_subdevice_id_0) + for i in range(16): + self.assertEqual(subdevice_props_array[0].uuid.id[i], mock_uuid_0[i]) + + # Validate second subdevice properties + self.assertEqual(subdevice_props_array[1].subdeviceId, mock_subdevice_id_1) + for i in range(16): + self.assertEqual(subdevice_props_array[1].uuid.id[i], mock_uuid_1[i]) + + mock_get_func.assert_called_with("zesDeviceGetSubDevicePropertiesExp") + mock_func.assert_called_once() + + +if __name__ == "__main__": + unittest.main() diff --git a/bindings/sysman/python/test/unit_tests/test_init.py b/bindings/sysman/python/test/unit_tests/test_init.py new file mode 100644 index 00000000..38d32d34 --- /dev/null +++ b/bindings/sysman/python/test/unit_tests/test_init.py @@ -0,0 +1,224 @@ +## +# Copyright (C) 2025 Intel Corporation +# +# SPDX-License-Identifier: MIT +# +## + +import os +import sys +import unittest +from ctypes import * +from unittest.mock import MagicMock, patch + +# Add the source directory to Python path so we can import pyzes +script_dir = os.path.dirname(os.path.abspath(__file__)) +source_dir = os.path.join(script_dir, "..", "..", "source") +source_dir = os.path.abspath(source_dir) +if source_dir not in sys.path: + sys.path.insert(0, source_dir) + + +@patch("pyzes.getFunctionPointerList") +class TestInitFunctions(unittest.TestCase): + + def setUp(self): + import pyzes + + self.pyzes = pyzes + + def test_GivenvalidPyzesModuleWhenCallingZesInitThenCallSucceeds( + self, mock_get_func + ): + mock_func = MagicMock() + mock_func.return_value = self.pyzes.ZE_RESULT_SUCCESS + mock_get_func.return_value = mock_func + + result = self.pyzes.zesInit(0) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + mock_get_func.assert_called_with("zesInit") + mock_func.assert_called_once() + + def test_GivenValidPyzesMouleWhenCallingZesDriverGetThenCallSucceedsWithValidCount( + self, mock_get_func + ): + mock_driver_count = 1 + + def mock_driver_get(count_ptr, drivers_ptr): + count_ptr._obj.value = mock_driver_count + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_driver_get) + mock_get_func.return_value = mock_func + + count = c_uint32(0) + + result = self.pyzes.zesDriverGet(byref(count), None) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(count.value, mock_driver_count) + mock_get_func.assert_called_with("zesDriverGet") + mock_func.assert_called_once() + + def test_GivenValidDriverHandleWhenCallingZesDriverGetDeviceByUuidExpThenCallSucceedsWithDeviceInfo( + self, mock_get_func + ): + mock_device_handle = 0x12345678 + mock_on_subdevice = False + mock_subdevice_id = 0 + + def mock_get_device_by_uuid( + driver_handle, uuid, device_handle_ptr, on_subdevice_ptr, subdevice_id_ptr + ): + device_handle_ptr._obj.value = mock_device_handle + on_subdevice_ptr._obj.value = mock_on_subdevice + subdevice_id_ptr._obj.value = mock_subdevice_id + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_device_by_uuid) + mock_get_func.return_value = mock_func + + driver_handle = self.pyzes.zes_driver_handle_t() + uuid = self.pyzes.zes_uuid_t() + device_handle = self.pyzes.zes_device_handle_t() + on_subdevice = self.pyzes.ze_bool_t() + subdevice_id = c_uint32() + + result = self.pyzes.zesDriverGetDeviceByUuidExp( + driver_handle, + uuid, + byref(device_handle), + byref(on_subdevice), + byref(subdevice_id), + ) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(device_handle.value, mock_device_handle) + self.assertEqual(on_subdevice.value, mock_on_subdevice) + self.assertEqual(subdevice_id.value, mock_subdevice_id) + mock_get_func.assert_called_with("zesDriverGetDeviceByUuidExp") + mock_func.assert_called_once() + + +class TestInitInfrastructure(unittest.TestCase): + + def setUp(self): + import pyzes + + self.pyzes = pyzes + + def test_GivenLibraryAlreadyLoadedWhenCallingLoadZeLibraryThenReturnsEarly(self): + # Test early return when library already loaded + original_gpuLib = self.pyzes.gpuLib + + # Set gpuLib to non-None to trigger early return + mock_lib = MagicMock() + self.pyzes.gpuLib = mock_lib + + try: + # This should return early without any library loading + with patch("builtins.print") as mock_print: + self.pyzes._LoadZeLibrary() + # Should not print "Loading Linux library" because it returns early + print_calls = [ + call.args[0] for call in mock_print.call_args_list if call.args + ] + self.assertNotIn("Loading Linux library", print_calls) + finally: + # Restore original state + self.pyzes.gpuLib = original_gpuLib + + @patch("pyzes.gpuLib", None) + def test_GivenLibraryNotLoadedWhenCallingGetFunctionPointerListThenRaisesException( + self, + ): + # Test error handling path when library not loaded + with patch("pyzes.libLoadLock") as mock_lock: + mock_acquire = MagicMock() + mock_release = MagicMock() + mock_lock.acquire = mock_acquire + mock_lock.release = mock_release + + with self.assertRaises(Exception) as context: + self.pyzes.getFunctionPointerList("zesInit") + + self.assertIn("Library not loaded", str(context.exception)) + mock_acquire.assert_called_once() + mock_release.assert_called_once() + + def test_GivenFunctionPointerAlreadyCachedWhenCallingGetFunctionPointerListThenReturnsCachedVersion( + self, + ): + # Test caching logic in getFunctionPointerList + mock_func_ptr = MagicMock() + + # Pre-populate cache + original_cache = self.pyzes._getFunctionPointerList.copy() + self.pyzes._getFunctionPointerList["testFunction"] = mock_func_ptr + + try: + # This should return cached version without library access + result = self.pyzes.getFunctionPointerList("testFunction") + self.assertEqual(result, mock_func_ptr) + finally: + # Restore original cache + self.pyzes._getFunctionPointerList.clear() + self.pyzes._getFunctionPointerList.update(original_cache) + + @patch("pyzes.gpuLib") + @patch("pyzes.libLoadLock") + def test_GivenValidLibraryWhenGettingNewFunctionPointerThenCachesAndReturnsFunction( + self, mock_lock, mock_lib + ): + # Test function pointer retrieval and caching + mock_acquire = MagicMock() + mock_release = MagicMock() + mock_lock.acquire = mock_acquire + mock_lock.release = mock_release + + mock_function = MagicMock() + mock_lib.testNewFunction = mock_function + + # Clear cache for this test + original_cache = self.pyzes._getFunctionPointerList.copy() + self.pyzes._getFunctionPointerList.clear() + + try: + result = self.pyzes.getFunctionPointerList("testNewFunction") + + # Verify function was retrieved and cached + self.assertEqual(result, mock_function) + self.assertEqual( + self.pyzes._getFunctionPointerList["testNewFunction"], mock_function + ) + mock_acquire.assert_called_once() + mock_release.assert_called_once() + finally: + # Restore original cache + self.pyzes._getFunctionPointerList.clear() + self.pyzes._getFunctionPointerList.update(original_cache) + + @patch("sys.platform", "win32") + @patch("pyzes.gpuLib", None) + @patch("builtins.print") + @patch("os.path.exists") + @patch("pyzes.CDLL") + def test_GivenWindowsPlatformWhenLoadingLibraryThenLibraryIsLoaded( + self, mock_cdll, mock_exists, mock_print + ): + # Test Windows library loading path with deterministic mocking + mock_exists.return_value = True + mock_lib = MagicMock() + mock_cdll.return_value = mock_lib + + self.pyzes._LoadZeLibrary() + + # Verify Windows-specific print was called + mock_print.assert_any_call("Loading Windows library") + # Verify successful library loading + mock_cdll.assert_called() + + +if __name__ == "__main__": + unittest.main() diff --git a/bindings/sysman/python/test/unit_tests/test_memory.py b/bindings/sysman/python/test/unit_tests/test_memory.py new file mode 100644 index 00000000..f487be32 --- /dev/null +++ b/bindings/sysman/python/test/unit_tests/test_memory.py @@ -0,0 +1,152 @@ +## +# Copyright (C) 2025 Intel Corporation +# +# SPDX-License-Identifier: MIT +# +## + +import os +import sys +import unittest +from ctypes import * +from unittest.mock import MagicMock, patch + +# Add the source directory to Python path so we can import pyzes +script_dir = os.path.dirname(os.path.abspath(__file__)) +source_dir = os.path.join(script_dir, "..", "..", "source") +source_dir = os.path.abspath(source_dir) +if source_dir not in sys.path: + sys.path.insert(0, source_dir) + + +@patch("pyzes.getFunctionPointerList") +class TestMemoryFunctions(unittest.TestCase): + + def setUp(self): + import pyzes + + self.pyzes = pyzes + + def test_GivenValidDeviceHandleWhenCallingZesDeviceEnumMemoryModulesThenCallSucceedsWithValidCount( + self, mock_get_func + ): + mock_module_count = 3 + + def mock_enum_memory_modules(device_handle, count_ptr, handles_ptr): + count_ptr._obj.value = mock_module_count + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_enum_memory_modules) + mock_get_func.return_value = mock_func + + device_handle = self.pyzes.zes_device_handle_t() + count = c_uint32(0) + + result = self.pyzes.zesDeviceEnumMemoryModules( + device_handle, byref(count), None + ) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(count.value, mock_module_count) + mock_get_func.assert_called_with("zesDeviceEnumMemoryModules") + mock_func.assert_called_once() + + def test_GivenValidMemoryHandleWhenCallingZesMemoryGetPropertiesThenCallSucceedsWithValidProperties( + self, mock_get_func + ): + mock_type = self.pyzes.ZES_MEM_TYPE_GDDR6 + mock_on_subdevice = False + mock_subdevice_id = 0 + mock_location = self.pyzes.ZES_MEM_LOC_DEVICE + mock_physical_size = 8589934592 # 8GB + mock_bus_width = 256 + mock_num_channels = 8 + + def mock_get_properties(memory_handle, properties_ptr): + properties_ptr._obj.type = mock_type + properties_ptr._obj.onSubdevice = mock_on_subdevice + properties_ptr._obj.subdeviceId = mock_subdevice_id + properties_ptr._obj.location = mock_location + properties_ptr._obj.physicalSize = mock_physical_size + properties_ptr._obj.busWidth = mock_bus_width + properties_ptr._obj.numChannels = mock_num_channels + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_properties) + mock_get_func.return_value = mock_func + + memory_handle = self.pyzes.zes_mem_handle_t() + mem_props = self.pyzes.zes_mem_properties_t() + result = self.pyzes.zesMemoryGetProperties(memory_handle, byref(mem_props)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(mem_props.type, mock_type) + self.assertEqual(mem_props.onSubdevice, mock_on_subdevice) + self.assertEqual(mem_props.subdeviceId, mock_subdevice_id) + self.assertEqual(mem_props.location, mock_location) + self.assertEqual(mem_props.physicalSize, mock_physical_size) + self.assertEqual(mem_props.busWidth, mock_bus_width) + self.assertEqual(mem_props.numChannels, mock_num_channels) + mock_get_func.assert_called_with("zesMemoryGetProperties") + mock_func.assert_called_once() + + def test_GivenValidMemoryHandleWhenCallingZesMemoryGetStateThenCallSucceedsWithState( + self, mock_get_func + ): + mock_health = self.pyzes.ZES_MEM_HEALTH_OK + mock_free = 4294967296 # 4GB + mock_size = 8589934592 # 8GB + + def mock_get_state(memory_handle, state_ptr): + state_ptr._obj.health = mock_health + state_ptr._obj.free = mock_free + state_ptr._obj.size = mock_size + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_state) + mock_get_func.return_value = mock_func + + memory_handle = self.pyzes.zes_mem_handle_t() + mem_state = self.pyzes.zes_mem_state_t() + result = self.pyzes.zesMemoryGetState(memory_handle, byref(mem_state)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(mem_state.health, mock_health) + self.assertEqual(mem_state.free, mock_free) + self.assertEqual(mem_state.size, mock_size) + mock_get_func.assert_called_with("zesMemoryGetState") + mock_func.assert_called_once() + + def test_GivenValidMemoryHandleWhenCallingZesMemoryGetBandwidthThenCallSucceedsWithBandwidth( + self, mock_get_func + ): + mock_read_counter = 1073741824 # 1GB + mock_write_counter = 536870912 # 512MB + mock_max_bandwidth = 429496729600 # 400GB/s + mock_timestamp = 123456789 + + def mock_get_bandwidth(memory_handle, bandwidth_ptr): + bandwidth_ptr._obj.readCounter = mock_read_counter + bandwidth_ptr._obj.writeCounter = mock_write_counter + bandwidth_ptr._obj.maxBandwidth = mock_max_bandwidth + bandwidth_ptr._obj.timestamp = mock_timestamp + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_bandwidth) + mock_get_func.return_value = mock_func + + memory_handle = self.pyzes.zes_mem_handle_t() + mem_bandwidth = self.pyzes.zes_mem_bandwidth_t() + result = self.pyzes.zesMemoryGetBandwidth(memory_handle, byref(mem_bandwidth)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(mem_bandwidth.readCounter, mock_read_counter) + self.assertEqual(mem_bandwidth.writeCounter, mock_write_counter) + self.assertEqual(mem_bandwidth.maxBandwidth, mock_max_bandwidth) + self.assertEqual(mem_bandwidth.timestamp, mock_timestamp) + mock_get_func.assert_called_with("zesMemoryGetBandwidth") + mock_func.assert_called_once() + + +if __name__ == "__main__": + unittest.main() diff --git a/bindings/sysman/python/test/unit_tests/test_power.py b/bindings/sysman/python/test/unit_tests/test_power.py new file mode 100644 index 00000000..d8fd7c9b --- /dev/null +++ b/bindings/sysman/python/test/unit_tests/test_power.py @@ -0,0 +1,79 @@ +## +# Copyright (C) 2025 Intel Corporation +# +# SPDX-License-Identifier: MIT +# +## + +import os +import sys +import unittest +from ctypes import * +from unittest.mock import MagicMock, patch + +# Add the source directory to Python path so we can import pyzes +script_dir = os.path.dirname(os.path.abspath(__file__)) +source_dir = os.path.join(script_dir, "..", "..", "source") +source_dir = os.path.abspath(source_dir) +if source_dir not in sys.path: + sys.path.insert(0, source_dir) + + +@patch("pyzes.getFunctionPointerList") +class TestPowerFunctions(unittest.TestCase): + + def setUp(self): + import pyzes + + self.pyzes = pyzes + + def test_GivenValidDeviceHandleWhenCallingZesDeviceEnumPowerDomainsThenCallSucceedsWithValidCount( + self, mock_get_func + ): + mock_count = 2 + + def mock_enum_domains(device_handle, count_ptr, handles_ptr): + count_ptr._obj.value = mock_count + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_enum_domains) + mock_get_func.return_value = mock_func + + device_handle = self.pyzes.zes_device_handle_t() + count = c_uint32(0) + + result = self.pyzes.zesDeviceEnumPowerDomains(device_handle, byref(count), None) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(count.value, mock_count) + mock_get_func.assert_called_with("zesDeviceEnumPowerDomains") + mock_func.assert_called_once() + + def test_GivenValidPowerHandleWhenCallingZesPowerGetEnergyCounterThenCallSucceedsWithEnergyData( + self, mock_get_func + ): + mock_energy = 12345 + mock_timestamp = 67890 + + def mock_get_energy(power_handle, energy_ptr): + energy_ptr._obj.energy = mock_energy + energy_ptr._obj.timestamp = mock_timestamp + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_energy) + mock_get_func.return_value = mock_func + + power_handle = self.pyzes.zes_pwr_handle_t() + energy = self.pyzes.zes_power_energy_counter_t() + + result = self.pyzes.zesPowerGetEnergyCounter(power_handle, byref(energy)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(energy.energy, mock_energy) + self.assertEqual(energy.timestamp, mock_timestamp) + mock_get_func.assert_called_with("zesPowerGetEnergyCounter") + mock_func.assert_called_once() + + +if __name__ == "__main__": + unittest.main() diff --git a/bindings/sysman/python/test/unit_tests/test_temperature.py b/bindings/sysman/python/test/unit_tests/test_temperature.py new file mode 100644 index 00000000..0a4007dd --- /dev/null +++ b/bindings/sysman/python/test/unit_tests/test_temperature.py @@ -0,0 +1,149 @@ +## +# Copyright (C) 2025 Intel Corporation +# +# SPDX-License-Identifier: MIT +# +## + +import os +import sys +import unittest +from ctypes import * +from unittest.mock import MagicMock, patch + +# Add the source directory to Python path so we can import pyzes +script_dir = os.path.dirname(os.path.abspath(__file__)) +source_dir = os.path.join(script_dir, "..", "..", "source") +source_dir = os.path.abspath(source_dir) +if source_dir not in sys.path: + sys.path.insert(0, source_dir) + + +@patch("pyzes.getFunctionPointerList") +class TestTemperatureFunctions(unittest.TestCase): + """Test temperature-related function wrappers.""" + + def setUp(self): + """Set up test fixtures before each test method.""" + import pyzes + + self.pyzes = pyzes + + def test_GivenValidDeviceHandleWhenCallingZesDeviceEnumTemperatureSensorsThenCallSucceedsWithValidCount( + self, mock_get_func + ): + mock_sensor_count = 4 + + def mock_enum_temperature_sensors(device_handle, count_ptr, handles_ptr): + count_ptr._obj.value = mock_sensor_count + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_enum_temperature_sensors) + mock_get_func.return_value = mock_func + + device_handle = self.pyzes.zes_device_handle_t() + count = c_uint32(0) + + result = self.pyzes.zesDeviceEnumTemperatureSensors( + device_handle, byref(count), None + ) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(count.value, mock_sensor_count) + mock_get_func.assert_called_with("zesDeviceEnumTemperatureSensors") + mock_func.assert_called_once() + + def test_GivenValidTemperatureHandleWhenCallingZesTemperatureGetPropertiesThenCallSucceedsWithValidProperties( + self, mock_get_func + ): + mock_sensor_type = self.pyzes.ZES_TEMP_SENSORS_GPU + mock_on_subdevice = False + mock_subdevice_id = 0 + mock_max_temperature = 85.0 + mock_is_critical_temp_supported = True + mock_is_threshold1_supported = False + mock_is_threshold2_supported = True + + def mock_get_properties(temp_handle, properties_ptr): + properties_ptr._obj.type = mock_sensor_type + properties_ptr._obj.onSubdevice = mock_on_subdevice + properties_ptr._obj.subdeviceId = mock_subdevice_id + properties_ptr._obj.maxTemperature = mock_max_temperature + properties_ptr._obj.isCriticalTempSupported = ( + mock_is_critical_temp_supported + ) + properties_ptr._obj.isThreshold1Supported = mock_is_threshold1_supported + properties_ptr._obj.isThreshold2Supported = mock_is_threshold2_supported + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_properties) + mock_get_func.return_value = mock_func + + temp_handle = self.pyzes.zes_temp_handle_t() + temp_props = self.pyzes.zes_temp_properties_t() + result = self.pyzes.zesTemperatureGetProperties(temp_handle, byref(temp_props)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(temp_props.type, mock_sensor_type) + self.assertEqual(temp_props.onSubdevice, mock_on_subdevice) + self.assertEqual(temp_props.subdeviceId, mock_subdevice_id) + self.assertEqual(temp_props.maxTemperature, mock_max_temperature) + self.assertEqual( + temp_props.isCriticalTempSupported, mock_is_critical_temp_supported + ) + self.assertEqual(temp_props.isThreshold1Supported, mock_is_threshold1_supported) + self.assertEqual(temp_props.isThreshold2Supported, mock_is_threshold2_supported) + mock_get_func.assert_called_with("zesTemperatureGetProperties") + mock_func.assert_called_once() + + def test_GivenValidTemperatureHandleWhenCallingZesTemperatureGetConfigThenCallSucceedsWithConfig( + self, mock_get_func + ): + mock_enable_critical = True + mock_threshold1 = 75.0 + mock_threshold2 = 80.0 + + def mock_get_config(temp_handle, config_ptr): + config_ptr._obj.enableCritical = mock_enable_critical + config_ptr._obj.threshold1 = mock_threshold1 + config_ptr._obj.threshold2 = mock_threshold2 + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_config) + mock_get_func.return_value = mock_func + + temp_handle = self.pyzes.zes_temp_handle_t() + temp_config = self.pyzes.zes_temp_config_t() + result = self.pyzes.zesTemperatureGetConfig(temp_handle, byref(temp_config)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(temp_config.enableCritical, mock_enable_critical) + self.assertEqual(temp_config.threshold1, mock_threshold1) + self.assertEqual(temp_config.threshold2, mock_threshold2) + mock_get_func.assert_called_with("zesTemperatureGetConfig") + mock_func.assert_called_once() + + def test_GivenValidTemperatureHandleWhenCallingZesTemperatureGetStateThenCallSucceedsWithState( + self, mock_get_func + ): + mock_temperature = 65.5 # Current temperature in Celsius + + def mock_get_state(temp_handle, temperature_ptr): + temperature_ptr._obj.value = mock_temperature + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_state) + mock_get_func.return_value = mock_func + + temp_handle = self.pyzes.zes_temp_handle_t() + temperature = c_double() + result = self.pyzes.zesTemperatureGetState(temp_handle, byref(temperature)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(temperature.value, mock_temperature) + mock_get_func.assert_called_with("zesTemperatureGetState") + mock_func.assert_called_once() + + +if __name__ == "__main__": + unittest.main() diff --git a/bindings/sysman/python/test/unit_tests/test_utils.py b/bindings/sysman/python/test/unit_tests/test_utils.py new file mode 100644 index 00000000..b306fa96 --- /dev/null +++ b/bindings/sysman/python/test/unit_tests/test_utils.py @@ -0,0 +1,66 @@ +## +# Copyright (C) 2025 Intel Corporation +# +# SPDX-License-Identifier: MIT +# +## + +import os +import sys +import unittest +from ctypes import c_int32 + +# Add the source directory to Python path so we can import pyzes +script_dir = os.path.dirname(os.path.abspath(__file__)) +source_dir = os.path.join(script_dir, "..", "..", "source") +source_dir = os.path.abspath(source_dir) +if source_dir not in sys.path: + sys.path.insert(0, source_dir) + + +class TestUtilsInfrastructure(unittest.TestCase): + """Test utility and infrastructure classes""" + + def setUp(self): + import pyzes + + self.pyzes = pyzes + + def test_GivenPrintableStructureWhenConvertedToStringThenProvidesFormattedOutput( + self, + ): + # Test _PrintableStructure string formatting functionality + temp_state = self.pyzes.zes_temp_config_t() + temp_state.enableCritical = True + temp_state.threshold1 = 75.0 + temp_state.threshold2 = 80.0 + + # Test string representation functionality + str_repr = str(temp_state) + self.assertIsInstance(str_repr, str) + self.assertIn("zes_temp_config_t", str_repr) + self.assertIn("enableCritical", str_repr) + self.assertIn("threshold1", str_repr) + + def test_GivenPrintableStructureWithDefaultFormattingWhenConvertedToStringThenUsesDefaultFormat( + self, + ): + # Test _PrintableStructure with format case + # Create a custom structure class with formatting + + class TestStructure(self.pyzes._PrintableStructure): + _fields_ = [("testField", c_int32)] + _fmt_ = { + "": "default_%d" + } # This will trigger default format usage + + test_struct = TestStructure() + test_struct.testField = 42 + + str_repr = str(test_struct) + self.assertIn("default_42", str_repr) # Tests format usage + self.assertIn("TestStructure", str_repr) + + +if __name__ == "__main__": + unittest.main()