diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py index 0e4ebfee4a68..290a6268a556 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py @@ -318,6 +318,8 @@ def sequence_number(self) -> Optional[int]: :rtype: int or None """ + if self._raw_amqp_message.annotations is None: + return None return self._raw_amqp_message.annotations.get(PROP_SEQ_NUMBER, None) @property @@ -327,8 +329,8 @@ def offset(self) -> Optional[str]: :rtype: str or None """ try: - return self._raw_amqp_message.annotations[PROP_OFFSET].decode("UTF-8") - except (KeyError, AttributeError): + return self._raw_amqp_message.annotations[PROP_OFFSET].decode("UTF-8") # type: ignore[index] + except (KeyError, AttributeError, TypeError): return None @property @@ -337,7 +339,8 @@ def enqueued_time(self) -> Optional[datetime.datetime]: :rtype: datetime.datetime or None """ - timestamp = self._raw_amqp_message.annotations.get(PROP_TIMESTAMP, None) + annotations = self._raw_amqp_message.annotations or {} + timestamp = annotations.get(PROP_TIMESTAMP, None) if timestamp: return utc_from_timestamp(float(timestamp) / 1000) return None @@ -348,6 +351,8 @@ def partition_key(self) -> Optional[bytes]: :rtype: bytes or None """ + if self._raw_amqp_message.annotations is None: + return None return self._raw_amqp_message.annotations.get(PROP_PARTITION_KEY, None) @property @@ -356,6 +361,8 @@ def properties(self) -> Dict[Union[str, bytes], Any]: :rtype: dict[str, any] or dict[bytes, any] """ + if self._raw_amqp_message.application_properties is None: + self._raw_amqp_message.application_properties = {} return self._raw_amqp_message.application_properties @properties.setter @@ -402,7 +409,8 @@ def system_properties(self) -> Dict[bytes, Any]: value = getattr(self._raw_amqp_message.properties, prop_name, None) if value: self._sys_properties[key] = value - self._sys_properties.update(self._raw_amqp_message.annotations) + if self._raw_amqp_message.annotations: + self._sys_properties.update(self._raw_amqp_message.annotations) # type: ignore[arg-type] return self._sys_properties @property @@ -483,10 +491,10 @@ def content_type(self) -> Optional[str]: return self._raw_amqp_message.properties.content_type @content_type.setter - def content_type(self, value: str) -> None: - if not self._raw_amqp_message.properties: - self._raw_amqp_message.properties = AmqpMessageProperties() - self._raw_amqp_message.properties.content_type = value + def content_type(self, value: Optional[str]) -> None: + properties = self._raw_amqp_message.properties or AmqpMessageProperties() + properties.content_type = value + self._raw_amqp_message.properties = properties @property def correlation_id(self) -> Optional[str]: @@ -503,10 +511,10 @@ def correlation_id(self) -> Optional[str]: return self._raw_amqp_message.properties.correlation_id @correlation_id.setter - def correlation_id(self, value: str) -> None: - if not self._raw_amqp_message.properties: - self._raw_amqp_message.properties = AmqpMessageProperties() - self._raw_amqp_message.properties.correlation_id = value + def correlation_id(self, value: Optional[str]) -> None: + properties = self._raw_amqp_message.properties or AmqpMessageProperties() + properties.correlation_id = value + self._raw_amqp_message.properties = properties @property def message_id(self) -> Optional[str]: @@ -525,10 +533,10 @@ def message_id(self) -> Optional[str]: return self._raw_amqp_message.properties.message_id @message_id.setter - def message_id(self, value: str) -> None: - if not self._raw_amqp_message.properties: - self._raw_amqp_message.properties = AmqpMessageProperties() - self._raw_amqp_message.properties.message_id = value + def message_id(self, value: Optional[str]) -> None: + properties = self._raw_amqp_message.properties or AmqpMessageProperties() + properties.message_id = value + self._raw_amqp_message.properties = properties class EventDataBatch: diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_base.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_base.py index a297c111eef6..cfbc284b38cd 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_base.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_base.py @@ -3,6 +3,7 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- from __future__ import annotations +import datetime from typing import List, Tuple, Union, TYPE_CHECKING, Optional, Any, Dict, Callable from abc import ABC, abstractmethod @@ -209,7 +210,7 @@ def create_send_client( idle_timeout: Optional[float], network_trace: bool, retry_policy: Any, - keep_alive_interval: int, + keep_alive_interval: Optional[int], client_name: str, link_properties: Optional[Dict[str, Any]], properties: Optional[Dict[str, Any]], @@ -270,7 +271,11 @@ def add_batch( @staticmethod @abstractmethod - def create_source(source: Union["uamqp_Source", "pyamqp_Source"], offset: int, selector: bytes): + def create_source( + source: Union["uamqp_Source", "pyamqp_Source"], + offset: Optional[Union[int, str, datetime.datetime]], + selector: bytes, + ): """ Creates and returns the Source. diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_pyamqp_transport.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_pyamqp_transport.py index d9b8c5211253..450e959122fa 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_pyamqp_transport.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_pyamqp_transport.py @@ -304,7 +304,7 @@ def create_send_client( idle_timeout: Optional[float], network_trace: bool, retry_policy: Any, - keep_alive_interval: int, + keep_alive_interval: Optional[int], client_name: str, link_properties: Optional[Dict[str, Any]] = None, properties: Optional[Dict[str, Any]] = None, diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_uamqp_transport.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_uamqp_transport.py index e04aa072686e..af915a02bc66 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_uamqp_transport.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_uamqp_transport.py @@ -365,7 +365,7 @@ def create_send_client( idle_timeout: Optional[float], network_trace: bool, retry_policy: Any, - keep_alive_interval: int, + keep_alive_interval: Optional[int], client_name: str, link_properties: Optional[Dict[str, Any]] = None, properties: Optional[Dict[str, Any]] = None, diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_utils.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_utils.py index e347ae2e3fc5..6711545dd264 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_utils.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_utils.py @@ -120,7 +120,7 @@ def set_event_partition_key( raw_message.header.durable = True -def event_position_selector(value: Union[str, int, datetime.datetime], inclusive: bool = False) -> bytes: +def event_position_selector(value: Optional[Union[str, int, datetime.datetime]], inclusive: bool = False) -> bytes: """Creates a selector expression of the offset. :param int or str or datetime.datetime value: The offset value to use for the offset. diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_base_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_base_async.py index aaad055c7bbd..895c3abcd88e 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_base_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_base_async.py @@ -3,6 +3,7 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- from __future__ import annotations +import datetime from abc import ABC, abstractmethod from typing import List, Tuple, Union, TYPE_CHECKING, Optional, Any, Dict, Callable from typing_extensions import Literal @@ -201,7 +202,7 @@ def create_send_client( idle_timeout: Optional[float], network_trace: bool, retry_policy: Any, - keep_alive_interval: int, + keep_alive_interval: Optional[int], client_name: str, link_properties: Optional[Dict[str, Any]], properties: Optional[Dict[str, Any]], @@ -249,7 +250,11 @@ def set_message_partition_key( @staticmethod @abstractmethod - def create_source(source: str, offset: int, selector: bytes) -> Union["uamqp_Source", "pyamqp_Source"]: + def create_source( + source: str, + offset: Optional[Union[int, str, datetime.datetime]], + selector: bytes, + ) -> Union["uamqp_Source", "pyamqp_Source"]: """ Creates and returns the Source. diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_pyamqp_transport_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_pyamqp_transport_async.py index 64c56e0b71d4..fd49d6851642 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_pyamqp_transport_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_pyamqp_transport_async.py @@ -96,7 +96,7 @@ def create_send_client( idle_timeout: Optional[float], network_trace: bool, retry_policy: Any, - keep_alive_interval: int, + keep_alive_interval: Optional[int], client_name: str, link_properties: Optional[Dict[str, Any]], properties: Optional[Dict[str, Any]] = None, diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_uamqp_transport_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_uamqp_transport_async.py index c4fae70a0542..e6ef72d030a0 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_uamqp_transport_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_uamqp_transport_async.py @@ -119,7 +119,7 @@ def create_send_client( idle_timeout: Optional[float], network_trace: bool, retry_policy: Any, - keep_alive_interval: int, + keep_alive_interval: Optional[int], client_name: str, link_properties: Optional[Dict[str, Any]] = None, properties: Optional[Dict[str, Any]] = None,