From 4c7d5a29454c80ac198bd6e807a7435f00c44c4e Mon Sep 17 00:00:00 2001 From: Yash Date: Thu, 25 Jun 2026 15:00:43 +1000 Subject: [PATCH 1/5] [Key Vault] Add External Key Manager (EKM) support for Managed HSM (Preview) - Add 'az keyvault ekm-connection' (create/update/show/check/delete + certificate show) as preview. - Add --external-key-id on 'az keyvault key create' to create EKM-backed external keys (preview). - Use public SDKs: azure-keyvault-keys==4.12.0b2 (create_external_key + public ExternalKey) and azure-keyvault-administration==4.8.0b1. - Enforce external-key-id <=64 chars per public SDK contract. - Add --server-cn alias to satisfy option-length linter. --- .../keyvault/_client_factory.py | 20 ++ .../cli/command_modules/keyvault/_help.py | 40 ++++ .../cli/command_modules/keyvault/_params.py | 42 ++++ .../command_modules/keyvault/_transformers.py | 20 +- .../command_modules/keyvault/_validators.py | 143 +++++++++++- .../cli/command_modules/keyvault/commands.py | 20 +- .../cli/command_modules/keyvault/custom.py | 204 ++++++++++++++++-- .../tests/latest/test_keyvault_commands.py | 147 +++++++++++++ src/azure-cli/requirements.py3.Darwin.txt | 4 +- src/azure-cli/requirements.py3.Linux.txt | 4 +- src/azure-cli/requirements.py3.windows.txt | 4 +- src/azure-cli/setup.py | 4 +- 12 files changed, 624 insertions(+), 28 deletions(-) diff --git a/src/azure-cli/azure/cli/command_modules/keyvault/_client_factory.py b/src/azure-cli/azure/cli/command_modules/keyvault/_client_factory.py index adec3e01d86..6d40452430a 100644 --- a/src/azure-cli/azure/cli/command_modules/keyvault/_client_factory.py +++ b/src/azure-cli/azure/cli/command_modules/keyvault/_client_factory.py @@ -244,6 +244,26 @@ def data_plane_azure_keyvault_security_domain_client(cli_ctx, command_args): verify_challenge_resource=False, **client_kwargs) +def data_plane_azure_keyvault_ekm_client(cli_ctx, command_args): + from azure.keyvault.administration import KeyVaultEkmClient + + # Reuse the existing login + URL resolution behavior. + vault_url, credential, _ = _prepare_data_plane_azure_keyvault_client( + cli_ctx, command_args, ResourceType.DATA_KEYVAULT_ADMINISTRATION_SETTING) + + command_args.pop('hsm_name', None) + command_args.pop('vault_base_url', None) + command_args.pop('identifier', None) + + client_kwargs = prepare_client_kwargs_track2(cli_ctx) + client_kwargs.pop('http_logging_policy') + return KeyVaultEkmClient( + vault_url=vault_url, + credential=credential, + verify_challenge_resource=False, + **client_kwargs) + + def _prepare_data_plane_azure_keyvault_client(cli_ctx, command_args, resource_type): version = str(get_api_version(cli_ctx, resource_type)) profile = Profile(cli_ctx=cli_ctx) diff --git a/src/azure-cli/azure/cli/command_modules/keyvault/_help.py b/src/azure-cli/azure/cli/command_modules/keyvault/_help.py index 525b4c5b159..a6a6df2d03e 100644 --- a/src/azure-cli/azure/cli/command_modules/keyvault/_help.py +++ b/src/azure-cli/azure/cli/command_modules/keyvault/_help.py @@ -975,6 +975,46 @@ az keyvault wait-hsm --hsm-name MyHSM --created """ +helps['keyvault ekm-connection'] = """ +type: group +short-summary: Manage External Key Manager (EKM) connection for a Managed HSM. +""" + +helps['keyvault ekm-connection create'] = """ +type: command +short-summary: Create the EKM connection. +""" + +helps['keyvault ekm-connection update'] = """ +type: command +short-summary: Update the EKM connection. +""" + +helps['keyvault ekm-connection show'] = """ +type: command +short-summary: Show the EKM connection. +""" + +helps['keyvault ekm-connection check'] = """ +type: command +short-summary: Check connectivity and authentication with the EKM proxy. +""" + +helps['keyvault ekm-connection delete'] = """ +type: command +short-summary: Delete the EKM connection. +""" + +helps['keyvault ekm-connection certificate'] = """ +type: group +short-summary: Manage EKM proxy certificate information. +""" + +helps['keyvault ekm-connection certificate show'] = """ +type: command +short-summary: Show the EKM proxy client certificate. +""" + helps['keyvault security-domain'] = """ type: group short-summary: Manage security domain operations. diff --git a/src/azure-cli/azure/cli/command_modules/keyvault/_params.py b/src/azure-cli/azure/cli/command_modules/keyvault/_params.py index 5718281f2b4..b34d2174f10 100644 --- a/src/azure-cli/azure/cli/command_modules/keyvault/_params.py +++ b/src/azure-cli/azure/cli/command_modules/keyvault/_params.py @@ -360,6 +360,9 @@ class CLISecurityDomainOperation(str, Enum): help='The type of key to create. For valid values, see: https://learn.microsoft.com/rest/api/keyvault/keys/create-key/create-key#jsonwebkeytype') c.argument('curve', arg_type=get_enum_type(KeyCurveName), help='Elliptic curve name. For valid values, see: https://learn.microsoft.com/rest/api/keyvault/keys/create-key/create-key#jsonwebkeycurvename') + c.extra('external_key_id', options_list=['--external-key-id'], arg_group='External Key', + is_preview=True, + help='Create an external Managed HSM key backed by an External Key Manager (EKM) key id.') with self.argument_context('keyvault key import') as c: c.argument('kty', arg_type=get_enum_type(CLIKeyTypeForBYOKImport), validator=validate_key_import_type, @@ -616,6 +619,45 @@ class CLISecurityDomainOperation(str, Enum): help='Target operation that needs waiting.') # endregion + # region keyvault ekm-connection + for scope in ['create', 'update', 'show', 'check', 'delete']: + with self.argument_context('keyvault ekm-connection {}'.format(scope), arg_group='HSM Id') as c: + c.extra('hsm_name', hsm_url_type, required=False, + help='Name of the HSM. Can be omitted if --id is specified.') + c.extra('identifier', options_list=['--id'], validator=validate_vault_or_hsm, + help='Full URI of the HSM.') + c.ignore('vault_base_url') + + with self.argument_context('keyvault ekm-connection create', arg_group='EKM Connection') as c: + c.argument('host', options_list=['--host'], required=True, + help='EKM proxy host (FQDN or FQDN:port). If port is omitted, 443 is assumed.') + c.extra('path_prefix', options_list=['--path-prefix'], + help='Optional path prefix to append to EKM proxy requests. Must start with "/".') + c.extra('server_ca_certificates', options_list=['--server-ca-certificate'], nargs='+', type=file_type, + completer=FilesCompleter(), + help='Path(s) to server CA certificate(s) in PEM or DER format.') + c.extra('server_subject_common_name', options_list=['--server-subject-common-name', '--server-cn'], + help='Optional expected Common Name (CN) for the EKM proxy server certificate.') + + with self.argument_context('keyvault ekm-connection update', arg_group='EKM Connection') as c: + c.argument('host', options_list=['--host'], required=False, + help='EKM proxy host (FQDN or FQDN:port). If port is omitted, 443 is assumed.') + c.extra('path_prefix', options_list=['--path-prefix'], + help='Optional path prefix to append to EKM proxy requests. Must start with "/".') + c.extra('server_ca_certificates', options_list=['--server-ca-certificate'], nargs='+', type=file_type, + completer=FilesCompleter(), + help='Path(s) to server CA certificate(s) in PEM or DER format.') + c.extra('server_subject_common_name', options_list=['--server-subject-common-name', '--server-cn'], + help='Optional expected Common Name (CN) for the EKM proxy server certificate.') + + with self.argument_context('keyvault ekm-connection certificate show', arg_group='HSM Id') as c: + c.extra('hsm_name', hsm_url_type, required=False, + help='Name of the HSM. Can be omitted if --id is specified.') + c.extra('identifier', options_list=['--id'], validator=validate_vault_or_hsm, + help='Full URI of the HSM.') + c.ignore('vault_base_url') + # endregion + # region keyvault backup/restore for item in ['backup', 'restore']: for scope in ['start']: # TODO add 'status' when SDK is ready diff --git a/src/azure-cli/azure/cli/command_modules/keyvault/_transformers.py b/src/azure-cli/azure/cli/command_modules/keyvault/_transformers.py index 832b2f1956a..14604cb8804 100644 --- a/src/azure-cli/azure/cli/command_modules/keyvault/_transformers.py +++ b/src/azure-cli/azure/cli/command_modules/keyvault/_transformers.py @@ -60,9 +60,9 @@ def transform_key_encryption_output(result, **command_args): # pylint: disable= 'kid': result.key_id, 'result': base64.b64encode(result.ciphertext).decode('utf-8'), 'algorithm': result.algorithm, - 'iv': binascii.hexlify(result.iv) if result.iv else None, - 'tag': binascii.hexlify(result.tag) if result.tag else None, - 'aad': binascii.hexlify(result.aad) if result.aad else None + 'iv': binascii.hexlify(result.iv).decode('ascii') if result.iv else None, + 'tag': binascii.hexlify(result.tag).decode('ascii') if result.tag else None, + 'aad': binascii.hexlify(result.aad).decode('ascii') if result.aad else None } return output @@ -105,6 +105,13 @@ def transform_key_list_output(result, **command_args): # pylint: disable=unused k['managed'] = key.managed k['tags'] = key.tags k['releasePolicy'] = key.release_policy + + # External key (EKM) is a preview property and may not exist on all SDK versions. + external_key = getattr(key, 'external_key', None) + external_key_id = getattr(external_key, 'id', None) if external_key else None + if external_key_id: + k['externalKeyId'] = external_key_id + output.append(k) return output @@ -141,6 +148,13 @@ def transform_key_output(result, **command_args): 'tags': result.properties.tags, 'releasePolicy': result.properties.release_policy } + + # External key (EKM) is a preview property and may not exist on all SDK versions. + external_key = getattr(result.properties, 'external_key', None) + external_key_id = getattr(external_key, 'id', None) if external_key else None + if external_key_id: + output['externalKeyId'] = external_key_id + if isinstance(result, DeletedKey): output['deletedDate'] = result.deleted_date output['scheduledPurgeDate'] = result.scheduled_purge_date diff --git a/src/azure-cli/azure/cli/command_modules/keyvault/_validators.py b/src/azure-cli/azure/cli/command_modules/keyvault/_validators.py index c77a43bb8b7..5850e461064 100644 --- a/src/azure-cli/azure/cli/command_modules/keyvault/_validators.py +++ b/src/azure-cli/azure/cli/command_modules/keyvault/_validators.py @@ -732,10 +732,151 @@ def validate_key_create(cmd, ns): validate_tags(ns) set_vault_base_url(ns) validate_keyvault_resource_id('key')(ns) - validate_key_type(ns) + validate_external_key_id(ns) + + # External keys are backed by EKM and the service rejects client-specified key type/size/curve. + # Avoid the defaulting behavior in validate_key_type (RSA) when --external-key-id is present. + if getattr(ns, 'external_key_id', None): + setattr(ns, 'kty', None) + setattr(ns, 'key_size', None) + setattr(ns, 'curve', None) + setattr(ns, 'protection', None) + else: + validate_key_type(ns) + process_key_release_policy(cmd, ns) +def validate_external_key_id(ns): + external_key_id = getattr(ns, 'external_key_id', None) + if not external_key_id: + return + if len(external_key_id) > 64: + raise CLIError('--external-key-id must be at most 64 characters.') + if not re.match(r'^[0-9A-Za-z-]+$', external_key_id): + raise CLIError('--external-key-id may contain only letters, digits, and hyphens.') + + +def _validate_ekm_path_prefix(path_prefix=None): + if path_prefix is None: + return + if not path_prefix.startswith('/'): + raise CLIError('--path-prefix must start with "/".') + if path_prefix.endswith('/'): + raise CLIError('--path-prefix must not end with "/".') + if len(path_prefix) > 64: + raise CLIError('--path-prefix must be at most 64 characters.') + if not re.match(r'^[A-Za-z0-9/-]+$', path_prefix): + raise CLIError('--path-prefix may contain only letters, digits, "/" and "-".') + + +def _normalize_ekm_host(host: str): + host = (host or '').strip() + if not host: + raise CLIError('--host cannot be empty.') + if '://' in host: + raise CLIError('--host must not include a URL scheme (use FQDN or FQDN:port).') + if '/' in host: + raise CLIError('--host must not include a path (use FQDN or FQDN:port).') + + if ':' not in host: + return f'{host}:443' + + # Avoid ambiguous parsing for IPv6 literals. + if host.count(':') != 1: + raise CLIError('--host must be in the form FQDN or FQDN:port.') + + hostname, port_str = host.split(':', 1) + if not hostname: + raise CLIError('--host must be in the form FQDN or FQDN:port.') + try: + port = int(port_str) + except ValueError as ex: + raise CLIError('--host port must be an integer.') from ex + if port < 1 or port > 65535: + raise CLIError('--host port must be between 1 and 65535.') + return f'{hostname}:{port}' + + +def _flatten_list(value): + if value is None: + return None + if isinstance(value, list) and value and isinstance(value[0], list): + flattened = [] + for item in value: + flattened.extend(item) + return flattened + return value + + +def _load_certificates_as_der_bytes(cert_paths): + import os + import ssl + + cert_paths = _flatten_list(cert_paths) + if not cert_paths: + return [] + + der_certs = [] + for cert_path in cert_paths: + if not cert_path: + continue + expanded = os.path.expanduser(cert_path) + with open(expanded, 'rb') as f: + raw = f.read() + + # PEM may contain multiple cert blocks. + if b'-----BEGIN CERTIFICATE-----' in raw: + text = raw.decode('utf-8', errors='ignore') + begin = '-----BEGIN CERTIFICATE-----' + end = '-----END CERTIFICATE-----' + start = 0 + found_any = False + while True: + b_idx = text.find(begin, start) + if b_idx == -1: + break + e_idx = text.find(end, b_idx) + if e_idx == -1: + raise CLIError(f'Invalid PEM certificate in {cert_path}.') + block = text[b_idx:e_idx + len(end)] + der_certs.append(ssl.PEM_cert_to_DER_cert(block)) + found_any = True + start = e_idx + len(end) + if not found_any: + raise CLIError(f'Invalid PEM certificate in {cert_path}.') + else: + # Assume DER. + der_certs.append(raw) + + return der_certs + + +def validate_ekm_connection_base(cmd, ns): # pylint: disable=unused-argument + set_vault_base_url(ns) + if not getattr(ns, 'hsm_name', None) and not getattr(ns, 'identifier', None): + raise CLIError('Please specify --hsm-name or --id.') + + +def validate_ekm_connection_create(cmd, ns): + validate_ekm_connection_base(cmd, ns) + ns.host = _normalize_ekm_host(ns.host) + _validate_ekm_path_prefix(getattr(ns, 'path_prefix', None)) + server_ca_certificates = _load_certificates_as_der_bytes(getattr(ns, 'server_ca_certificates', None)) + if not server_ca_certificates: + raise CLIError('Please specify at least one --server-ca-certificate for EKM connection creation.') + ns.server_ca_certificates = server_ca_certificates + + +def validate_ekm_connection_update(cmd, ns): + validate_ekm_connection_base(cmd, ns) + if getattr(ns, 'host', None): + ns.host = _normalize_ekm_host(ns.host) + _validate_ekm_path_prefix(getattr(ns, 'path_prefix', None)) + if getattr(ns, 'server_ca_certificates', None): + ns.server_ca_certificates = _load_certificates_as_der_bytes(ns.server_ca_certificates) + + # pylint: disable=line-too-long, too-many-locals def process_certificate_policy(cmd, ns): policy = getattr(ns, 'policy', None) diff --git a/src/azure-cli/azure/cli/command_modules/keyvault/commands.py b/src/azure-cli/azure/cli/command_modules/keyvault/commands.py index c70ae0b600c..5db91e6d7c5 100644 --- a/src/azure-cli/azure/cli/command_modules/keyvault/commands.py +++ b/src/azure-cli/azure/cli/command_modules/keyvault/commands.py @@ -9,7 +9,7 @@ from azure.cli.core.profiles import ResourceType from azure.cli.command_modules.keyvault._client_factory import ( - get_client, get_client_factory, Clients) + get_client, get_client_factory, Clients, data_plane_azure_keyvault_ekm_client) from azure.cli.command_modules.keyvault._transformers import ( filter_out_managed_resources, @@ -26,7 +26,8 @@ from azure.cli.command_modules.keyvault._validators import ( process_secret_set_namespace, validate_key_create, - validate_private_endpoint_connection_id, validate_role_assignment_args) + validate_private_endpoint_connection_id, validate_role_assignment_args, + validate_ekm_connection_base, validate_ekm_connection_create, validate_ekm_connection_update) def transform_assignment_list(result): @@ -65,6 +66,11 @@ def load_command_table(self, _): operations_tmpl='azure.cli.command_modules.keyvault.custom#{}', client_factory=get_client_factory(ResourceType.MGMT_KEYVAULT, Clients.managed_hsms) ) + + data_ekm_custom = CliCommandType( + operations_tmpl='azure.cli.command_modules.keyvault.custom#{}', + client_factory=data_plane_azure_keyvault_ekm_client + ) # endregion # Management Plane Commands @@ -137,6 +143,16 @@ def load_command_table(self, _): g.keyvault_custom('download', 'security_domain_download', supports_no_wait=True) g.keyvault_custom('wait', '_wait_security_domain_operation') + with self.command_group('keyvault ekm-connection', command_type=data_ekm_custom, is_preview=True) as g: + g.keyvault_custom('create', 'create_ekm_connection', validator=validate_ekm_connection_create) + g.keyvault_custom('update', 'update_ekm_connection', validator=validate_ekm_connection_update) + g.keyvault_custom('show', 'get_ekm_connection', validator=validate_ekm_connection_base) + g.keyvault_custom('check', 'check_ekm_connection', validator=validate_ekm_connection_base) + g.keyvault_custom('delete', 'delete_ekm_connection', validator=validate_ekm_connection_base) + + with self.command_group('keyvault ekm-connection certificate', command_type=data_ekm_custom, is_preview=True) as g: + g.keyvault_custom('show', 'get_ekm_certificate', validator=validate_ekm_connection_base) + with self.command_group('keyvault key', data_key_entity.command_type) as g: g.keyvault_custom('create', 'create_key', transform=transform_key_output, validator=validate_key_create) g.keyvault_command('set-attributes', 'update_key_properties', transform=transform_key_output) diff --git a/src/azure-cli/azure/cli/command_modules/keyvault/custom.py b/src/azure-cli/azure/cli/command_modules/keyvault/custom.py index fb345d93677..193ceb8b927 100644 --- a/src/azure-cli/azure/cli/command_modules/keyvault/custom.py +++ b/src/azure-cli/azure/cli/command_modules/keyvault/custom.py @@ -32,7 +32,7 @@ from cryptography.hazmat.primitives.asymmetric import rsa, ec from cryptography.hazmat.primitives.serialization import load_pem_private_key, Encoding, PublicFormat from cryptography.exceptions import UnsupportedAlgorithm -from cryptography.x509 import load_pem_x509_certificate +from cryptography.x509 import load_pem_x509_certificate, load_der_x509_certificate from knack.log import get_logger from knack.util import CLIError, todict @@ -1089,19 +1089,195 @@ def delete_policy(cmd, client, resource_group_name, vault_name, # region KeyVault Key def create_key(client, name=None, protection=None, # pylint: disable=unused-argument key_size=None, key_ops=None, disabled=False, expires=None, - not_before=None, tags=None, kty=None, curve=None, exportable=None, release_policy=None): - - return client.create_key(name=name, - key_type=kty, - size=key_size, - key_operations=key_ops, - enabled=not disabled, - not_before=not_before, - expires_on=expires, - tags=tags, - curve=curve, - exportable=exportable, - release_policy=release_policy) + not_before=None, tags=None, kty=None, curve=None, exportable=None, release_policy=None, + external_key_id=None): + + # External keys are backed by an External Key Manager (EKM). They use a dedicated SDK method, + # and the service rejects client-specified key type/size/curve/operations for them. + if external_key_id: + from azure.keyvault.keys import ExternalKey + return client.create_external_key( + name=name, + external_key=ExternalKey(id=external_key_id), + enabled=not disabled, + not_before=not_before, + expires_on=expires, + tags=tags, + release_policy=release_policy) + + return client.create_key( + name=name, + key_type=kty, + size=key_size, + curve=curve, + key_operations=key_ops, + enabled=not disabled, + not_before=not_before, + expires_on=expires, + tags=tags, + exportable=exportable, + release_policy=release_policy) + + +# region KeyVault EKM Connection +def get_ekm_connection(client): + return client.get_ekm_connection() + + +def get_ekm_certificate(client): + certificate = client.get_ekm_certificate() + + # Latest preview SDK mirrors the connection payload (subject_common_name + ca_certificates list). + subject_common_name = getattr(certificate, 'subject_common_name', None) + ca_certificates = getattr(certificate, 'ca_certificates', None) + if isinstance(ca_certificates, (list, tuple)) and ca_certificates: + encoded_certs = [] + for cert_bytes in ca_certificates: + if isinstance(cert_bytes, (bytes, bytearray, memoryview)): + encoded_certs.append(base64.b64encode(bytes(cert_bytes)).decode('ascii')) + if encoded_certs or subject_common_name: + return { + 'subjectCommonName': subject_common_name, + 'caCertificates': encoded_certs + } + + def _extract_der_bytes(obj): + if isinstance(obj, (bytes, bytearray, memoryview)): + return bytes(obj) + + # Known/likely shapes across SDK iterations. + for attr in ('cer', 'certificate', 'cert', 'der', 'value', 'data'): + if hasattr(obj, attr): + value = getattr(obj, attr) + if isinstance(value, (bytes, bytearray, memoryview)): + return bytes(value) + + if isinstance(obj, dict): + for key in ('cer', 'certificate', 'cert', 'der', 'value', 'data'): + value = obj.get(key) + if isinstance(value, (bytes, bytearray, memoryview)): + return bytes(value) + + return None + + def _find_bytes_anywhere(obj, *, _seen=None, _depth=0, _max_depth=4): # pylint: disable=too-many-return-statements + if obj is None: + return None + if isinstance(obj, (bytes, bytearray, memoryview)): + return bytes(obj) + if _depth >= _max_depth: + return None + + if _seen is None: + _seen = set() + obj_id = id(obj) + if obj_id in _seen: + return None + _seen.add(obj_id) + + if isinstance(obj, dict): + for v in obj.values(): + found = _find_bytes_anywhere(v, _seen=_seen, _depth=_depth + 1, _max_depth=_max_depth) + if found is not None: + return found + return None + + if isinstance(obj, (list, tuple, set)): + for v in obj: + found = _find_bytes_anywhere(v, _seen=_seen, _depth=_depth + 1, _max_depth=_max_depth) + if found is not None: + return found + return None + + # SDK model objects often keep fields in __dict__. + if hasattr(obj, '__dict__') and isinstance(obj.__dict__, dict): + for v in obj.__dict__.values(): + found = _find_bytes_anywhere(v, _seen=_seen, _depth=_depth + 1, _max_depth=_max_depth) + if found is not None: + return found + + # Last resort: materialize to dict and scan. + try: + as_dict = todict(obj) + except Exception: # pylint: disable=broad-except + return None + return _find_bytes_anywhere(as_dict, _seen=_seen, _depth=_depth + 1, _max_depth=_max_depth) + + def _json_safe(obj): + if isinstance(obj, (bytes, bytearray, memoryview)): + return base64.b64encode(bytes(obj)).decode('ascii') + if isinstance(obj, dict): + return {k: _json_safe(v) for k, v in obj.items()} + if isinstance(obj, (list, tuple, set)): + return [_json_safe(v) for v in obj] + + # Try to materialize SDK models into primitives. + try: + as_dict = todict(obj) + except Exception: # pylint: disable=broad-except + return obj + return _json_safe(as_dict) + + der_bytes = _extract_der_bytes(certificate) + if der_bytes is None: + der_bytes = _find_bytes_anywhere(certificate) + + if der_bytes is None: + # Ensure we never return raw bytes anywhere in the payload. + return _json_safe(certificate) + + pem = None + # Try to decode as PEM first (some services return PEM bytes). + try: + if der_bytes.lstrip().startswith(b'-----BEGIN CERTIFICATE-----'): + pem = der_bytes.decode('ascii', errors='strict') + der_bytes = load_pem_x509_certificate(der_bytes).public_bytes(Encoding.DER) + else: + pem = load_der_x509_certificate(der_bytes).public_bytes(Encoding.PEM).decode('ascii') + except Exception: # pylint: disable=broad-except + # Bytes found, but not a certificate. Fall back to JSON-safe representation. + return _json_safe(certificate) + + return { + 'format': 'der', + 'cer': base64.b64encode(der_bytes).decode('ascii'), + 'pem': pem + } + + +def check_ekm_connection(client): + return client.check_ekm_connection() + + +def delete_ekm_connection(client): + return client.delete_ekm_connection() + + +def create_ekm_connection(client, host, path_prefix=None, server_ca_certificates=None, server_subject_common_name=None): + from azure.keyvault.administration import KeyVaultEkmConnection + + ekm_connection = KeyVaultEkmConnection( + host=host, + path_prefix=path_prefix, + server_ca_certificates=server_ca_certificates, + server_subject_common_name=server_subject_common_name + ) + return client.create_ekm_connection(ekm_connection) + + +def update_ekm_connection(client, host=None, path_prefix=None, server_ca_certificates=None, + server_subject_common_name=None): + existing = client.get_ekm_connection() + if host is not None: + existing.host = host + if path_prefix is not None: + existing.path_prefix = path_prefix + if server_ca_certificates is not None: + existing.server_ca_certificates = server_ca_certificates + if server_subject_common_name is not None: + existing.server_subject_common_name = server_subject_common_name + return client.update_ekm_connection(existing) +# endregion def list_keys(client, maxresults=None, include_managed=False): diff --git a/src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/test_keyvault_commands.py b/src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/test_keyvault_commands.py index 3b47dd5322d..f61d66411b3 100644 --- a/src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/test_keyvault_commands.py +++ b/src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/test_keyvault_commands.py @@ -5,6 +5,7 @@ import json import os +import argparse import pytest import tempfile import time @@ -154,6 +155,152 @@ def fake_get_models(name, **kwargs): "to satisfy Azure Policy checks") +class KeyVaultEkmValidatorUnitTest(unittest.TestCase): + def test_validate_external_key_id_valid(self): + from azure.cli.command_modules.keyvault._validators import validate_external_key_id + + ns = argparse.Namespace(external_key_id='test-aes-key') + validate_external_key_id(ns) + + def test_validate_external_key_id_invalid_chars(self): + from azure.cli.command_modules.keyvault._validators import validate_external_key_id + + ns = argparse.Namespace(external_key_id='bad_id') + with self.assertRaises(CLIError): + validate_external_key_id(ns) + + def test_validate_external_key_id_too_long(self): + from azure.cli.command_modules.keyvault._validators import validate_external_key_id + + ns = argparse.Namespace(external_key_id='a' * 65) + with self.assertRaises(CLIError): + validate_external_key_id(ns) + + def test_validate_external_key_id_max_length(self): + from azure.cli.command_modules.keyvault._validators import validate_external_key_id + + ns = argparse.Namespace(external_key_id='a' * 64) + validate_external_key_id(ns) + + def test_validate_ekm_path_prefix_rules(self): + from azure.cli.command_modules.keyvault._validators import _validate_ekm_path_prefix + + _validate_ekm_path_prefix('/api/v1') + with self.assertRaises(CLIError): + _validate_ekm_path_prefix('api/v1') + with self.assertRaises(CLIError): + _validate_ekm_path_prefix('/api/v1/') + + def test_normalize_ekm_host_rules(self): + from azure.cli.command_modules.keyvault._validators import _normalize_ekm_host + + self.assertEqual(_normalize_ekm_host('example.com'), 'example.com:443') + self.assertEqual(_normalize_ekm_host('example.com:443'), 'example.com:443') + with self.assertRaises(CLIError): + _normalize_ekm_host('https://example.com') + with self.assertRaises(CLIError): + _normalize_ekm_host('example.com/path') + with self.assertRaises(CLIError): + _normalize_ekm_host('example.com:abc') + + def test_load_certificates_as_der_bytes_from_pem(self): + from azure.cli.command_modules.keyvault._validators import _load_certificates_as_der_bytes + + pem_path = os.path.join(TEST_DIR, 'certs', 'cert_0.cer') + certs = _load_certificates_as_der_bytes([pem_path]) + self.assertTrue(certs) + self.assertIsInstance(certs[0], (bytes, bytearray)) + + +class KeyVaultEkmCertificateSerializationUnitTest(unittest.TestCase): + def test_get_ekm_certificate_serializes_der_bytes(self): + from azure.cli.command_modules.keyvault._validators import _load_certificates_as_der_bytes + from azure.cli.command_modules.keyvault.custom import get_ekm_certificate + + pem_path = os.path.join(TEST_DIR, 'certs', 'cert_0.cer') + der_bytes = _load_certificates_as_der_bytes([pem_path])[0] + + class DummyClient: + def get_ekm_certificate(self): + return der_bytes + + result = get_ekm_certificate(DummyClient()) + self.assertIsInstance(result, dict) + self.assertEqual(result.get('format'), 'der') + self.assertIsInstance(result.get('cer'), str) + # PEM is best-effort; if present, it should be a string with header. + if result.get('pem') is not None: + self.assertIsInstance(result.get('pem'), str) + self.assertIn('BEGIN CERTIFICATE', result.get('pem')) + + def test_get_ekm_certificate_serializes_model_value_bytes(self): + from azure.cli.command_modules.keyvault._validators import _load_certificates_as_der_bytes + from azure.cli.command_modules.keyvault.custom import get_ekm_certificate + + pem_path = os.path.join(TEST_DIR, 'certs', 'cert_0.cer') + der_bytes = _load_certificates_as_der_bytes([pem_path])[0] + + class CertModel: + def __init__(self, value): + self.value = value + + class DummyClient: + def get_ekm_certificate(self): + return CertModel(der_bytes) + + result = get_ekm_certificate(DummyClient()) + self.assertIsInstance(result, dict) + self.assertEqual(result.get('format'), 'der') + self.assertIsInstance(result.get('cer'), str) + + def test_get_ekm_certificate_handles_subject_cn_and_ca_list(self): + from azure.cli.command_modules.keyvault.custom import get_ekm_certificate + + class SdkModel: + def __init__(self, subject_common_name, ca_certificates): + self.subject_common_name = subject_common_name + self.ca_certificates = ca_certificates + + class DummyClient: + def get_ekm_certificate(self): + return SdkModel('*.managedhsm-int.azure-int.net', [b'\x01\x02']) + + result = get_ekm_certificate(DummyClient()) + self.assertEqual(result.get('subjectCommonName'), '*.managedhsm-int.azure-int.net') + self.assertEqual(result.get('caCertificates'), ['AQI=']) + + def test_get_ekm_certificate_fallback_json_safe_dict(self): + from azure.cli.command_modules.keyvault.custom import get_ekm_certificate + + class DummyClient: + def get_ekm_certificate(self): + return {'someField': b'\x01\x02\x03'} + + result = get_ekm_certificate(DummyClient()) + self.assertIsInstance(result, dict) + self.assertIsInstance(result.get('someField'), str) + + def test_get_ekm_certificate_finds_bytes_in_unknown_field(self): + from azure.cli.command_modules.keyvault._validators import _load_certificates_as_der_bytes + from azure.cli.command_modules.keyvault.custom import get_ekm_certificate + + pem_path = os.path.join(TEST_DIR, 'certs', 'cert_0.cer') + der_bytes = _load_certificates_as_der_bytes([pem_path])[0] + + class WeirdModel: + def __init__(self): + self.notCer = der_bytes + + class DummyClient: + def get_ekm_certificate(self): + return WeirdModel() + + result = get_ekm_certificate(DummyClient()) + self.assertIsInstance(result, dict) + self.assertEqual(result.get('format'), 'der') + self.assertIsInstance(result.get('cer'), str) + + class KeyVaultPrivateLinkResourceScenarioTest(ScenarioTest): @ResourceGroupPreparer(name_prefix='cli_test_keyvault_plr') @KeyVaultPreparer(name_prefix='cli-test-kv-plr-', location='eastus2') diff --git a/src/azure-cli/requirements.py3.Darwin.txt b/src/azure-cli/requirements.py3.Darwin.txt index c6c99a7cb3f..f4496e0e1a3 100644 --- a/src/azure-cli/requirements.py3.Darwin.txt +++ b/src/azure-cli/requirements.py3.Darwin.txt @@ -12,9 +12,9 @@ azure-core==1.39.0 azure-cosmos==3.2.0 azure-data-tables==12.4.0 azure-datalake-store==1.0.1 -azure-keyvault-administration==4.4.0 +azure-keyvault-administration==4.8.0b1 azure-keyvault-certificates==4.7.0 -azure-keyvault-keys==4.12.0b1 +azure-keyvault-keys==4.12.0b2 azure-keyvault-secrets==4.7.0 azure-keyvault-securitydomain==1.0.0b1 azure-mgmt-advisor==9.0.0 diff --git a/src/azure-cli/requirements.py3.Linux.txt b/src/azure-cli/requirements.py3.Linux.txt index f845fbd167a..1cccaa6d448 100644 --- a/src/azure-cli/requirements.py3.Linux.txt +++ b/src/azure-cli/requirements.py3.Linux.txt @@ -12,9 +12,9 @@ azure-core==1.39.0 azure-cosmos==3.2.0 azure-data-tables==12.4.0 azure-datalake-store==1.0.1 -azure-keyvault-administration==4.4.0 +azure-keyvault-administration==4.8.0b1 azure-keyvault-certificates==4.7.0 -azure-keyvault-keys==4.12.0b1 +azure-keyvault-keys==4.12.0b2 azure-keyvault-secrets==4.7.0 azure-keyvault-securitydomain==1.0.0b1 azure-mgmt-advisor==9.0.0 diff --git a/src/azure-cli/requirements.py3.windows.txt b/src/azure-cli/requirements.py3.windows.txt index 12eef81d927..e1f385aa3b5 100644 --- a/src/azure-cli/requirements.py3.windows.txt +++ b/src/azure-cli/requirements.py3.windows.txt @@ -12,9 +12,9 @@ azure-core==1.39.0 azure-cosmos==3.2.0 azure-data-tables==12.4.0 azure-datalake-store==1.0.1 -azure-keyvault-administration==4.4.0 +azure-keyvault-administration==4.8.0b1 azure-keyvault-certificates==4.7.0 -azure-keyvault-keys==4.12.0b1 +azure-keyvault-keys==4.12.0b2 azure-keyvault-secrets==4.7.0 azure-keyvault-securitydomain==1.0.0b1 azure-mgmt-advisor==9.0.0 diff --git a/src/azure-cli/setup.py b/src/azure-cli/setup.py index b515234d771..a4b5e5149e6 100644 --- a/src/azure-cli/setup.py +++ b/src/azure-cli/setup.py @@ -59,9 +59,9 @@ 'azure-cosmos~=3.0,>=3.0.2', 'azure-data-tables==12.4.0', 'azure-datalake-store~=1.0.1', - 'azure-keyvault-administration==4.4.0', + 'azure-keyvault-administration==4.8.0b1', 'azure-keyvault-certificates==4.7.0', - 'azure-keyvault-keys==4.12.0b1', + 'azure-keyvault-keys==4.12.0b2', 'azure-keyvault-secrets==4.7.0', 'azure-keyvault-securitydomain==1.0.0b1', 'azure-mgmt-advisor==9.0.0', From 014f06b3c6f494ed25f83b990fa5110bce43a76d Mon Sep 17 00:00:00 2001 From: Yash Date: Thu, 25 Jun 2026 19:55:15 +1000 Subject: [PATCH 2/5] [Key Vault] EKM: address review feedback (fail-fast on incompatible key args, wrap cert IO errors) - key create: when --external-key-id is set, fail fast with a clear error if key-shape args (--kty/--size/--curve/--ops/--protection/--exportable) are provided, instead of silently ignoring them. - Remove arg-level validate_key_type validator from --kty so an explicitly provided value is detectable (no more default-then-undo). - _load_certificates_as_der_bytes: wrap file IO in CLIError for missing/unreadable certificate files. --- .../cli/command_modules/keyvault/_params.py | 4 +-- .../command_modules/keyvault/_validators.py | 28 +++++++++++++------ 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/src/azure-cli/azure/cli/command_modules/keyvault/_params.py b/src/azure-cli/azure/cli/command_modules/keyvault/_params.py index b34d2174f10..3fbb9aae405 100644 --- a/src/azure-cli/azure/cli/command_modules/keyvault/_params.py +++ b/src/azure-cli/azure/cli/command_modules/keyvault/_params.py @@ -21,7 +21,7 @@ from azure.cli.command_modules.keyvault._validators import ( datetime_type, certificate_type, validate_retention_days_on_creation, get_vault_base_url_type, get_hsm_base_url_type, validate_key_import_type, - validate_key_import_source, validate_key_type, validate_policy_permissions, validate_principal, + validate_key_import_source, validate_policy_permissions, validate_principal, validate_resource_group_name, validate_x509_certificate_chain, secret_text_encoding_values, secret_binary_encoding_values, validate_subnet, validate_ip_address, validate_vault_or_hsm, @@ -356,7 +356,7 @@ class CLISecurityDomainOperation(str, Enum): 'Release policies are mutable by default.') with self.argument_context('keyvault key create') as c: - c.argument('kty', arg_type=get_enum_type(JsonWebKeyType), validator=validate_key_type, + c.argument('kty', arg_type=get_enum_type(JsonWebKeyType), help='The type of key to create. For valid values, see: https://learn.microsoft.com/rest/api/keyvault/keys/create-key/create-key#jsonwebkeytype') c.argument('curve', arg_type=get_enum_type(KeyCurveName), help='Elliptic curve name. For valid values, see: https://learn.microsoft.com/rest/api/keyvault/keys/create-key/create-key#jsonwebkeycurvename') diff --git a/src/azure-cli/azure/cli/command_modules/keyvault/_validators.py b/src/azure-cli/azure/cli/command_modules/keyvault/_validators.py index 5850e461064..668b83ae9a0 100644 --- a/src/azure-cli/azure/cli/command_modules/keyvault/_validators.py +++ b/src/azure-cli/azure/cli/command_modules/keyvault/_validators.py @@ -734,13 +734,22 @@ def validate_key_create(cmd, ns): validate_keyvault_resource_id('key')(ns) validate_external_key_id(ns) - # External keys are backed by EKM and the service rejects client-specified key type/size/curve. - # Avoid the defaulting behavior in validate_key_type (RSA) when --external-key-id is present. if getattr(ns, 'external_key_id', None): - setattr(ns, 'kty', None) - setattr(ns, 'key_size', None) - setattr(ns, 'curve', None) - setattr(ns, 'protection', None) + # External keys are backed by an External Key Manager (EKM); the service controls the + # key material, so client-specified key-shape arguments are not supported. Fail fast with + # a clear error instead of silently ignoring them. + incompatible = [opt for opt, val in ( + ('--kty', getattr(ns, 'kty', None)), + ('--size', getattr(ns, 'key_size', None)), + ('--curve', getattr(ns, 'curve', None)), + ('--ops', getattr(ns, 'key_ops', None)), + ('--protection', getattr(ns, 'protection', None)), + ('--exportable', getattr(ns, 'exportable', None)), + ) if val is not None] + if incompatible: + raise CLIError( + '{} cannot be used with --external-key-id. External keys are backed by an External ' + 'Key Manager and the service controls the key material.'.format(', '.join(incompatible))) else: validate_key_type(ns) @@ -822,8 +831,11 @@ def _load_certificates_as_der_bytes(cert_paths): if not cert_path: continue expanded = os.path.expanduser(cert_path) - with open(expanded, 'rb') as f: - raw = f.read() + try: + with open(expanded, 'rb') as f: + raw = f.read() + except OSError as ex: + raise CLIError("Unable to load certificate file '{}': {}.".format(cert_path, ex.strerror)) from ex # PEM may contain multiple cert blocks. if b'-----BEGIN CERTIFICATE-----' in raw: From a5a504c040adc372191a91768cdc6999b3d84431 Mon Sep 17 00:00:00 2001 From: Yash Date: Thu, 25 Jun 2026 20:23:29 +1000 Subject: [PATCH 3/5] [Key Vault] EKM: add live scenario test for connection + external key lifecycle Live-only, env-var-gated scenario test (KeyVaultEkmScenarioTest) covering ekm-connection create/show/check/certificate-show, external key create/show/list-versions/delete, the fail-fast guard, and a normal-key regression. Skips unless AZURE_CLI_TEST_EKM_* env vars are set, so it is safe in CI playback and ready to run/record against a real MHSM+EKM proxy. --- .../tests/latest/test_keyvault_commands.py | 64 ++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/test_keyvault_commands.py b/src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/test_keyvault_commands.py index f61d66411b3..21edcbfc93a 100644 --- a/src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/test_keyvault_commands.py +++ b/src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/test_keyvault_commands.py @@ -18,7 +18,7 @@ from azure.cli.testsdk.decorators import serial_test from azure.cli.testsdk.scenario_tests import AllowLargeResponse, record_only from azure.cli.testsdk.scenario_tests import RecordingProcessor -from azure.cli.testsdk import ResourceGroupPreparer, StorageAccountPreparer, KeyVaultPreparer, ManagedHSMPreparer, ScenarioTest +from azure.cli.testsdk import ResourceGroupPreparer, StorageAccountPreparer, KeyVaultPreparer, ManagedHSMPreparer, ScenarioTest, live_only from azure.core.exceptions import HttpResponseError from knack.util import CLIError @@ -301,6 +301,68 @@ def get_ekm_certificate(self): self.assertIsInstance(result.get('cer'), str) +@live_only() +class KeyVaultEkmScenarioTest(ScenarioTest): + """Live EKM scenario test. + + EKM (External Key Manager) requires a Managed HSM that is already wired to an external + key-manager proxy. That infrastructure cannot be provisioned by a preparer or captured in a + recording, so this test is live-only and reads the target from environment variables. It is + skipped unless all of them are set: + + AZURE_CLI_TEST_EKM_MHSM_URL - Managed HSM URL (https://.managedhsm.azure.net) + AZURE_CLI_TEST_EKM_HOST - EKM proxy host (FQDN or FQDN:port) + AZURE_CLI_TEST_EKM_CA_CERT - path to the EKM proxy server CA certificate (PEM/DER) + AZURE_CLI_TEST_EKM_EXTERNAL_KEY_ID - id of a key that already exists in the EKM proxy + """ + + def test_keyvault_ekm_connection_and_external_key(self): + mhsm_url = os.environ.get('AZURE_CLI_TEST_EKM_MHSM_URL') + ekm_host = os.environ.get('AZURE_CLI_TEST_EKM_HOST') + ca_cert = os.environ.get('AZURE_CLI_TEST_EKM_CA_CERT') + ext_key_id = os.environ.get('AZURE_CLI_TEST_EKM_EXTERNAL_KEY_ID') + if not all([mhsm_url, ekm_host, ca_cert, ext_key_id]): + self.skipTest('Set AZURE_CLI_TEST_EKM_MHSM_URL / _HOST / _CA_CERT / _EXTERNAL_KEY_ID ' + 'to run the EKM live scenario test.') + + self.kwargs.update({ + 'mhsm': mhsm_url, + 'host': ekm_host, + 'ca_cert': ca_cert, + 'ext_key_id': ext_key_id, + 'key_name': self.create_random_name('cli-ekm-key-', 24), + 'normal_key': self.create_random_name('cli-norm-key-', 24), + }) + + # --- EKM connection lifecycle --- + self.cmd('keyvault ekm-connection create --id {mhsm} --host {host} ' + '--server-ca-certificate "{ca_cert}"') + show = self.cmd('keyvault ekm-connection show --id {mhsm}').get_output_in_json() + self.assertIn('host', show) + self.cmd('keyvault ekm-connection check --id {mhsm}') + cert = self.cmd('keyvault ekm-connection certificate show --id {mhsm}').get_output_in_json() + self.assertIsInstance(cert, dict) + + # --- External (EKM-backed) key create/show/list-versions/delete --- + created = self.cmd('keyvault key create --id {mhsm}/keys/{key_name} ' + '--external-key-id {ext_key_id}').get_output_in_json() + self.assertEqual(created.get('externalKeyId'), ext_key_id) + self.cmd('keyvault key show --id {mhsm}/keys/{key_name}') + self.cmd('keyvault key list-versions --id {mhsm}/keys/{key_name}') + self.cmd('keyvault key delete --id {mhsm}/keys/{key_name}') + + # --- Fail fast: key-shape args are incompatible with --external-key-id --- + self.cmd('keyvault key create --id {mhsm}/keys/{key_name} ' + '--external-key-id {ext_key_id} --size 2048', expect_failure=True) + + # --- Regression: a normal HSM key still works --- + self.cmd('keyvault key create --id {mhsm}/keys/{normal_key} --kty RSA --size 2048 --protection hsm') + self.cmd('keyvault key delete --id {mhsm}/keys/{normal_key}') + + # --- Clean up the connection --- + self.cmd('keyvault ekm-connection delete --id {mhsm}') + + class KeyVaultPrivateLinkResourceScenarioTest(ScenarioTest): @ResourceGroupPreparer(name_prefix='cli_test_keyvault_plr') @KeyVaultPreparer(name_prefix='cli-test-kv-plr-', location='eastus2') From ed1bf78685398c6ea18a7fb4987cd8cd90d92de0 Mon Sep 17 00:00:00 2001 From: Yash Date: Fri, 26 Jun 2026 09:31:36 +1000 Subject: [PATCH 4/5] [Key Vault] EKM: make live scenario test runnable (path-prefix + clean-slate) Validated the scenario test by actually running it live against a real MHSM + EKM proxy. Two fixes from that run: - Support optional AZURE_CLI_TEST_EKM_PATH_PREFIX and pass --path-prefix; without it the connection check fails on proxies that require a path prefix (e.g. /api/v1). - Delete any pre-existing connection before create: the service returns 'EKM connection is already setup' if one exists, so start from a clean slate. Test now passes end-to-end live and still skips cleanly in CI. --- .../tests/latest/test_keyvault_commands.py | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/test_keyvault_commands.py b/src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/test_keyvault_commands.py index 21edcbfc93a..23f5abd923a 100644 --- a/src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/test_keyvault_commands.py +++ b/src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/test_keyvault_commands.py @@ -308,12 +308,17 @@ class KeyVaultEkmScenarioTest(ScenarioTest): EKM (External Key Manager) requires a Managed HSM that is already wired to an external key-manager proxy. That infrastructure cannot be provisioned by a preparer or captured in a recording, so this test is live-only and reads the target from environment variables. It is - skipped unless all of them are set: + skipped unless the required ones are set: AZURE_CLI_TEST_EKM_MHSM_URL - Managed HSM URL (https://.managedhsm.azure.net) AZURE_CLI_TEST_EKM_HOST - EKM proxy host (FQDN or FQDN:port) AZURE_CLI_TEST_EKM_CA_CERT - path to the EKM proxy server CA certificate (PEM/DER) AZURE_CLI_TEST_EKM_EXTERNAL_KEY_ID - id of a key that already exists in the EKM proxy + AZURE_CLI_TEST_EKM_PATH_PREFIX - optional proxy path prefix (e.g. /api/v1); some + proxies require it for the connection check to succeed + + NOTE: this test creates AND deletes the EKM connection on the target MHSM, so run it against a + dedicated/disposable Managed HSM, not a shared pool with a connection you need to keep. """ def test_keyvault_ekm_connection_and_external_key(self): @@ -321,6 +326,7 @@ def test_keyvault_ekm_connection_and_external_key(self): ekm_host = os.environ.get('AZURE_CLI_TEST_EKM_HOST') ca_cert = os.environ.get('AZURE_CLI_TEST_EKM_CA_CERT') ext_key_id = os.environ.get('AZURE_CLI_TEST_EKM_EXTERNAL_KEY_ID') + path_prefix = os.environ.get('AZURE_CLI_TEST_EKM_PATH_PREFIX') if not all([mhsm_url, ekm_host, ca_cert, ext_key_id]): self.skipTest('Set AZURE_CLI_TEST_EKM_MHSM_URL / _HOST / _CA_CERT / _EXTERNAL_KEY_ID ' 'to run the EKM live scenario test.') @@ -335,8 +341,19 @@ def test_keyvault_ekm_connection_and_external_key(self): }) # --- EKM connection lifecycle --- - self.cmd('keyvault ekm-connection create --id {mhsm} --host {host} ' - '--server-ca-certificate "{ca_cert}"') + # The service rejects 'create' when a connection already exists, so start from a clean + # slate (best-effort delete; ignored if there is no pre-existing connection). + try: + self.cmd('keyvault ekm-connection delete --id {mhsm}') + except Exception: # pylint: disable=broad-except + pass + + create_cmd = ('keyvault ekm-connection create --id {mhsm} --host {host} ' + '--server-ca-certificate "{ca_cert}"') + if path_prefix: + self.kwargs['path_prefix'] = path_prefix + create_cmd += ' --path-prefix {path_prefix}' + self.cmd(create_cmd) show = self.cmd('keyvault ekm-connection show --id {mhsm}').get_output_in_json() self.assertIn('host', show) self.cmd('keyvault ekm-connection check --id {mhsm}') From 597275ca6bd21c2ad5b1d6e6e475e322b396fbab Mon Sep 17 00:00:00 2001 From: Yash Date: Fri, 26 Jun 2026 09:47:49 +1000 Subject: [PATCH 5/5] [Key Vault] EKM: make --server-ca-certificate required on create; verify multi-cert input Addresses review feedback from Chandan on the EKM connection commands: - --server-ca-certificate is now a required argument on 'ekm-connection create' so help shows [Required]. The validator's non-empty check is kept as a safety net for files that parse to zero certs. Left optional on 'update'. - Clarified the help text to document both supported input forms. - Added unit tests proving both certificate input combinations: (1) a single file containing a multi-cert PEM chain is split into separate DER blobs, and (2) multiple space-separated file paths (PEM and/or DER, each possibly multi-block) are all loaded in order. Also added a test that create rejects a missing certificate. Both forms were additionally verified live end-to-end against a Managed HSM + EKM proxy (create -> show shows 2 CAs -> check passes), then the connection was restored. --- .../cli/command_modules/keyvault/_params.py | 10 ++- .../tests/latest/test_keyvault_commands.py | 74 +++++++++++++++++++ 2 files changed, 81 insertions(+), 3 deletions(-) diff --git a/src/azure-cli/azure/cli/command_modules/keyvault/_params.py b/src/azure-cli/azure/cli/command_modules/keyvault/_params.py index 3fbb9aae405..a7180cd850c 100644 --- a/src/azure-cli/azure/cli/command_modules/keyvault/_params.py +++ b/src/azure-cli/azure/cli/command_modules/keyvault/_params.py @@ -634,8 +634,10 @@ class CLISecurityDomainOperation(str, Enum): c.extra('path_prefix', options_list=['--path-prefix'], help='Optional path prefix to append to EKM proxy requests. Must start with "/".') c.extra('server_ca_certificates', options_list=['--server-ca-certificate'], nargs='+', type=file_type, - completer=FilesCompleter(), - help='Path(s) to server CA certificate(s) in PEM or DER format.') + required=True, completer=FilesCompleter(), + help='Path(s) to server CA certificate(s) in PEM or DER format. ' + 'Pass a single file containing a PEM chain (multiple certificate blocks), ' + 'or multiple space-separated file paths (each PEM or DER).') c.extra('server_subject_common_name', options_list=['--server-subject-common-name', '--server-cn'], help='Optional expected Common Name (CN) for the EKM proxy server certificate.') @@ -646,7 +648,9 @@ class CLISecurityDomainOperation(str, Enum): help='Optional path prefix to append to EKM proxy requests. Must start with "/".') c.extra('server_ca_certificates', options_list=['--server-ca-certificate'], nargs='+', type=file_type, completer=FilesCompleter(), - help='Path(s) to server CA certificate(s) in PEM or DER format.') + help='Path(s) to server CA certificate(s) in PEM or DER format. ' + 'Pass a single file containing a PEM chain (multiple certificate blocks), ' + 'or multiple space-separated file paths (each PEM or DER).') c.extra('server_subject_common_name', options_list=['--server-subject-common-name', '--server-cn'], help='Optional expected Common Name (CN) for the EKM proxy server certificate.') diff --git a/src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/test_keyvault_commands.py b/src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/test_keyvault_commands.py index 23f5abd923a..c96ebf92ef5 100644 --- a/src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/test_keyvault_commands.py +++ b/src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/test_keyvault_commands.py @@ -211,6 +211,80 @@ def test_load_certificates_as_der_bytes_from_pem(self): self.assertTrue(certs) self.assertIsInstance(certs[0], (bytes, bytearray)) + def test_load_certificates_single_file_with_multiple_pem_blocks(self): + # Scenario 1: one file containing a PEM "chain" (multiple BEGIN/END CERTIFICATE blocks). + # The validator must split it and return each certificate as a separate DER blob. + from azure.cli.command_modules.keyvault._validators import _load_certificates_as_der_bytes + + cert_paths = [os.path.join(TEST_DIR, 'certs', name) for name in ('cert_0.cer', 'cert_1.cer', 'cert_2.cer')] + chain = b'\n'.join(open(p, 'rb').read() for p in cert_paths) + with tempfile.NamedTemporaryFile(suffix='.pem', delete=False) as tmp: + tmp.write(chain) + chain_path = tmp.name + try: + certs = _load_certificates_as_der_bytes([chain_path]) + finally: + os.remove(chain_path) + self.assertEqual(len(certs), 3) + for der in certs: + self.assertIsInstance(der, (bytes, bytearray)) + # Each split block must match the DER of the individual source certificate. + individual = [_load_certificates_as_der_bytes([p])[0] for p in cert_paths] + self.assertEqual(certs, individual) + + def test_load_certificates_multiple_files_space_separated(self): + # Scenario 2: multiple file paths passed to the same flag (nargs='+', space-separated). + # Each file contributes its certificate(s); order is preserved. + from azure.cli.command_modules.keyvault._validators import _load_certificates_as_der_bytes + + cert_paths = [os.path.join(TEST_DIR, 'certs', name) for name in ('cert_0.cer', 'cert_1.cer', 'cert_2.cer')] + certs = _load_certificates_as_der_bytes(cert_paths) + self.assertEqual(len(certs), 3) + individual = [_load_certificates_as_der_bytes([p])[0] for p in cert_paths] + self.assertEqual(certs, individual) + + def test_load_certificates_multiple_files_with_chain_and_der(self): + # Mixed: one file is a multi-cert PEM chain, another is single-cert DER. + # Total certs returned must equal the sum across all files. + import ssl + from azure.cli.command_modules.keyvault._validators import _load_certificates_as_der_bytes + + cert_0 = os.path.join(TEST_DIR, 'certs', 'cert_0.cer') + cert_1 = os.path.join(TEST_DIR, 'certs', 'cert_1.cer') + cert_2 = os.path.join(TEST_DIR, 'certs', 'cert_2.cer') + + # Build a 2-cert PEM chain file. + chain = open(cert_0, 'rb').read() + b'\n' + open(cert_1, 'rb').read() + # Build a DER-encoded file from cert_2 (no PEM headers). + der_2 = ssl.PEM_cert_to_DER_cert(open(cert_2, 'r', encoding='utf-8').read()) + + with tempfile.NamedTemporaryFile(suffix='.pem', delete=False) as tmp_pem: + tmp_pem.write(chain) + chain_path = tmp_pem.name + with tempfile.NamedTemporaryFile(suffix='.der', delete=False) as tmp_der: + tmp_der.write(der_2) + der_path = tmp_der.name + try: + certs = _load_certificates_as_der_bytes([chain_path, der_path]) + finally: + os.remove(chain_path) + os.remove(der_path) + self.assertEqual(len(certs), 3) + # The DER file must round-trip unchanged as the last entry. + self.assertEqual(certs[-1], der_2) + + def test_validate_ekm_connection_create_requires_certificate(self): + # Chandan's feedback: --server-ca-certificate is mandatory for create. + # Even if the param-level required check is bypassed, the validator must reject empty certs. + from azure.cli.command_modules.keyvault._validators import validate_ekm_connection_create + + ns = argparse.Namespace( + hsm_name=None, identifier='https://example.managedhsm.azure.net', + host='example.com:443', path_prefix='/api/v1', server_ca_certificates=None) + with mock.patch('azure.cli.command_modules.keyvault._validators.set_vault_base_url'): + with self.assertRaises(CLIError): + validate_ekm_connection_create(None, ns) + class KeyVaultEkmCertificateSerializationUnitTest(unittest.TestCase): def test_get_ekm_certificate_serializes_der_bytes(self):