From 0fbb9b2489a3e4cf4e47e5e973013f5039e00eb8 Mon Sep 17 00:00:00 2001 From: Johnathan W Date: Thu, 7 May 2026 15:29:34 -0400 Subject: [PATCH 1/7] Bound AMQP compound element count during decode --- .../azure/eventhub/_pyamqp/_decode.py | 21 +++++- .../unittest/test_decode_bounds.py | 68 +++++++++++++++++++ .../azure/servicebus/_pyamqp/_decode.py | 21 +++++- .../tests/unittests/test_decode_bounds.py | 68 +++++++++++++++++++ 4 files changed, 176 insertions(+), 2 deletions(-) create mode 100644 sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_decode_bounds.py create mode 100644 sdk/servicebus/azure-servicebus/tests/unittests/test_decode_bounds.py diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_decode.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_decode.py index afe58f311a0c..08feccaa93bb 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_decode.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_decode.py @@ -29,6 +29,12 @@ _LOGGER = logging.getLogger(__name__) _HEADER_PREFIX = memoryview(b"AMQP") + +# Maximum number of elements permitted in any AMQP compound type (list, array, map). +# Mirrors MAX_AMQPVALUE_ITEM_COUNT (65536) in azure-uamqp-c. Without this cap an +# unauthenticated peer can force a multi-gigabyte allocation by sending a crafted +# compound type whose COUNT field is taken straight from the wire. +_MAX_COMPOUND_COUNT = 65536 _COMPOSITES = { 35: "received", 36: "accepted", @@ -222,6 +228,10 @@ def _decode_list_small(buffer: memoryview) -> Tuple[memoryview, List[Any]]: def _decode_list_large(buffer: memoryview) -> Tuple[memoryview, List[Any]]: count = c_unsigned_long.unpack(buffer[4:8])[0] + if count > _MAX_COMPOUND_COUNT: + raise ValueError( + f"AMQP list element count {count} exceeds maximum {_MAX_COMPOUND_COUNT}" + ) buffer = buffer[8:] values = [None] * count for i in range(count): @@ -241,7 +251,12 @@ def _decode_map_small(buffer: memoryview) -> Tuple[memoryview, Dict[Any, Any]]: def _decode_map_large(buffer: memoryview) -> Tuple[memoryview, Dict[Any, Any]]: - count = int(c_unsigned_long.unpack(buffer[4:8])[0] / 2) + raw_count = c_unsigned_long.unpack(buffer[4:8])[0] + if raw_count > _MAX_COMPOUND_COUNT: + raise ValueError( + f"AMQP map element count {raw_count} exceeds maximum {_MAX_COMPOUND_COUNT}" + ) + count = int(raw_count / 2) buffer = buffer[8:] values = {} for _ in range(count): @@ -265,6 +280,10 @@ def _decode_array_small(buffer: memoryview) -> Tuple[memoryview, List[Any]]: def _decode_array_large(buffer: memoryview) -> Tuple[memoryview, List[Any]]: count = c_unsigned_long.unpack(buffer[4:8])[0] + if count > _MAX_COMPOUND_COUNT: + raise ValueError( + f"AMQP array element count {count} exceeds maximum {_MAX_COMPOUND_COUNT}" + ) if count: subconstructor = buffer[8] buffer = buffer[9:] diff --git a/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_decode_bounds.py b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_decode_bounds.py new file mode 100644 index 000000000000..50617da397d4 --- /dev/null +++ b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_decode_bounds.py @@ -0,0 +1,68 @@ +import pytest + +from azure.eventhub._pyamqp._decode import ( + _decode_array_large, + _decode_list_large, + _decode_map_large, + _MAX_COMPOUND_COUNT, +) + + +def _header(count: int) -> bytes: + # 4 bytes size (unused by the decoder beyond slicing) + 4 bytes big-endian count + return b"\x00\x00\x00\x00" + count.to_bytes(4, "big") + + +HUGE_COUNT = 0x7FFFFFFF +JUST_OVER = _MAX_COMPOUND_COUNT + 1 + + +@pytest.mark.parametrize("count", [HUGE_COUNT, JUST_OVER]) +def test_decode_array_large_rejects_oversized_count(count): + buffer = memoryview(_header(count)) + with pytest.raises(ValueError, match="exceeds maximum"): + _decode_array_large(buffer) + + +@pytest.mark.parametrize("count", [HUGE_COUNT, JUST_OVER]) +def test_decode_list_large_rejects_oversized_count(count): + buffer = memoryview(_header(count)) + with pytest.raises(ValueError, match="exceeds maximum"): + _decode_list_large(buffer) + + +@pytest.mark.parametrize("count", [HUGE_COUNT, JUST_OVER]) +def test_decode_map_large_rejects_oversized_count(count): + buffer = memoryview(_header(count)) + with pytest.raises(ValueError, match="exceeds maximum"): + _decode_map_large(buffer) + + +def test_decode_array_large_accepts_boundary(): + # COUNT exactly at the cap with a null subconstructor (0x40). _decode_null + # consumes no bytes, so the result is a list of _MAX_COMPOUND_COUNT Nones. + buffer = memoryview(_header(_MAX_COMPOUND_COUNT) + b"\x40") + remaining, values = _decode_array_large(buffer) + assert len(values) == _MAX_COMPOUND_COUNT + assert all(v is None for v in values) + assert bytes(remaining) == b"" + + +def test_decode_list_large_accepts_boundary(): + # Each element carries its own constructor byte; _MAX_COMPOUND_COUNT nulls. + body = b"\x40" * _MAX_COMPOUND_COUNT + buffer = memoryview(_header(_MAX_COMPOUND_COUNT) + body) + remaining, values = _decode_list_large(buffer) + assert len(values) == _MAX_COMPOUND_COUNT + assert all(v is None for v in values) + assert bytes(remaining) == b"" + + +def test_decode_map_large_accepts_boundary(): + # COUNT counts entries (keys + values); pairs = count // 2. + body = b"\x40" * _MAX_COMPOUND_COUNT + buffer = memoryview(_header(_MAX_COMPOUND_COUNT) + body) + remaining, values = _decode_map_large(buffer) + # All keys collapse to None, so the dict has a single entry. + assert values == {None: None} + assert bytes(remaining) == b"" diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_decode.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_decode.py index afe58f311a0c..08feccaa93bb 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_decode.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_decode.py @@ -29,6 +29,12 @@ _LOGGER = logging.getLogger(__name__) _HEADER_PREFIX = memoryview(b"AMQP") + +# Maximum number of elements permitted in any AMQP compound type (list, array, map). +# Mirrors MAX_AMQPVALUE_ITEM_COUNT (65536) in azure-uamqp-c. Without this cap an +# unauthenticated peer can force a multi-gigabyte allocation by sending a crafted +# compound type whose COUNT field is taken straight from the wire. +_MAX_COMPOUND_COUNT = 65536 _COMPOSITES = { 35: "received", 36: "accepted", @@ -222,6 +228,10 @@ def _decode_list_small(buffer: memoryview) -> Tuple[memoryview, List[Any]]: def _decode_list_large(buffer: memoryview) -> Tuple[memoryview, List[Any]]: count = c_unsigned_long.unpack(buffer[4:8])[0] + if count > _MAX_COMPOUND_COUNT: + raise ValueError( + f"AMQP list element count {count} exceeds maximum {_MAX_COMPOUND_COUNT}" + ) buffer = buffer[8:] values = [None] * count for i in range(count): @@ -241,7 +251,12 @@ def _decode_map_small(buffer: memoryview) -> Tuple[memoryview, Dict[Any, Any]]: def _decode_map_large(buffer: memoryview) -> Tuple[memoryview, Dict[Any, Any]]: - count = int(c_unsigned_long.unpack(buffer[4:8])[0] / 2) + raw_count = c_unsigned_long.unpack(buffer[4:8])[0] + if raw_count > _MAX_COMPOUND_COUNT: + raise ValueError( + f"AMQP map element count {raw_count} exceeds maximum {_MAX_COMPOUND_COUNT}" + ) + count = int(raw_count / 2) buffer = buffer[8:] values = {} for _ in range(count): @@ -265,6 +280,10 @@ def _decode_array_small(buffer: memoryview) -> Tuple[memoryview, List[Any]]: def _decode_array_large(buffer: memoryview) -> Tuple[memoryview, List[Any]]: count = c_unsigned_long.unpack(buffer[4:8])[0] + if count > _MAX_COMPOUND_COUNT: + raise ValueError( + f"AMQP array element count {count} exceeds maximum {_MAX_COMPOUND_COUNT}" + ) if count: subconstructor = buffer[8] buffer = buffer[9:] diff --git a/sdk/servicebus/azure-servicebus/tests/unittests/test_decode_bounds.py b/sdk/servicebus/azure-servicebus/tests/unittests/test_decode_bounds.py new file mode 100644 index 000000000000..6f752876daaa --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/unittests/test_decode_bounds.py @@ -0,0 +1,68 @@ +import pytest + +from azure.servicebus._pyamqp._decode import ( + _decode_array_large, + _decode_list_large, + _decode_map_large, + _MAX_COMPOUND_COUNT, +) + + +def _header(count: int) -> bytes: + # 4 bytes size (unused by the decoder beyond slicing) + 4 bytes big-endian count + return b"\x00\x00\x00\x00" + count.to_bytes(4, "big") + + +HUGE_COUNT = 0x7FFFFFFF +JUST_OVER = _MAX_COMPOUND_COUNT + 1 + + +@pytest.mark.parametrize("count", [HUGE_COUNT, JUST_OVER]) +def test_decode_array_large_rejects_oversized_count(count): + buffer = memoryview(_header(count)) + with pytest.raises(ValueError, match="exceeds maximum"): + _decode_array_large(buffer) + + +@pytest.mark.parametrize("count", [HUGE_COUNT, JUST_OVER]) +def test_decode_list_large_rejects_oversized_count(count): + buffer = memoryview(_header(count)) + with pytest.raises(ValueError, match="exceeds maximum"): + _decode_list_large(buffer) + + +@pytest.mark.parametrize("count", [HUGE_COUNT, JUST_OVER]) +def test_decode_map_large_rejects_oversized_count(count): + buffer = memoryview(_header(count)) + with pytest.raises(ValueError, match="exceeds maximum"): + _decode_map_large(buffer) + + +def test_decode_array_large_accepts_boundary(): + # COUNT exactly at the cap with a null subconstructor (0x40). _decode_null + # consumes no bytes, so the result is a list of _MAX_COMPOUND_COUNT Nones. + buffer = memoryview(_header(_MAX_COMPOUND_COUNT) + b"\x40") + remaining, values = _decode_array_large(buffer) + assert len(values) == _MAX_COMPOUND_COUNT + assert all(v is None for v in values) + assert bytes(remaining) == b"" + + +def test_decode_list_large_accepts_boundary(): + # Each element carries its own constructor byte; _MAX_COMPOUND_COUNT nulls. + body = b"\x40" * _MAX_COMPOUND_COUNT + buffer = memoryview(_header(_MAX_COMPOUND_COUNT) + body) + remaining, values = _decode_list_large(buffer) + assert len(values) == _MAX_COMPOUND_COUNT + assert all(v is None for v in values) + assert bytes(remaining) == b"" + + +def test_decode_map_large_accepts_boundary(): + # COUNT counts entries (keys + values); pairs = count // 2. + body = b"\x40" * _MAX_COMPOUND_COUNT + buffer = memoryview(_header(_MAX_COMPOUND_COUNT) + body) + remaining, values = _decode_map_large(buffer) + # All keys collapse to None, so the dict has a single entry. + assert values == {None: None} + assert bytes(remaining) == b"" From 3c2d5d1d96760d9351b971d562979f18656cf65a Mon Sep 17 00:00:00 2001 From: Johnathan W Date: Sat, 9 May 2026 10:31:20 -0400 Subject: [PATCH 2/7] Document AMQP compound count bound and per-decoder rationale --- .../azure/eventhub/_pyamqp/_decode.py | 27 ++++++++++++++++--- .../azure/servicebus/_pyamqp/_decode.py | 27 ++++++++++++++++--- 2 files changed, 48 insertions(+), 6 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_decode.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_decode.py index 08feccaa93bb..7ea5e82fd59d 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_decode.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_decode.py @@ -31,9 +31,21 @@ _HEADER_PREFIX = memoryview(b"AMQP") # Maximum number of elements permitted in any AMQP compound type (list, array, map). -# Mirrors MAX_AMQPVALUE_ITEM_COUNT (65536) in azure-uamqp-c. Without this cap an -# unauthenticated peer can force a multi-gigabyte allocation by sending a crafted -# compound type whose COUNT field is taken straight from the wire. +# +# The AMQP 1.0 wire format encodes compound element counts as either a 1-byte +# field (the *_small variants, naturally bounded at 255) or a 4-byte field +# (the *_large variants, wire-level maximum 0xFFFFFFFF). The large variants +# allocate a Python list/dict sized directly from this count, so without an +# upper bound a small frame can demand a multi-gigabyte allocation. This cap +# is applied at every large-variant decode site to keep allocation sizes +# proportional to the bytes actually delivered. +# +# The value mirrors MAX_AMQPVALUE_ITEM_COUNT (65536) in the C reference +# implementation, azure-uamqp-c, and the equivalent MAX_COMPOUND_COUNT bound +# applied across list/array/map decode sites in the Java AMQP codec. Real +# AMQP traffic does not approach this many elements in a single compound +# value, so the bound functions as a hard ceiling rather than a practical +# constraint on legitimate workloads. _MAX_COMPOUND_COUNT = 65536 _COMPOSITES = { 35: "received", @@ -228,6 +240,8 @@ def _decode_list_small(buffer: memoryview) -> Tuple[memoryview, List[Any]]: def _decode_list_large(buffer: memoryview) -> Tuple[memoryview, List[Any]]: count = c_unsigned_long.unpack(buffer[4:8])[0] + # Validate the wire-supplied count before allocating `[None] * count`, + # which would otherwise scale linearly with an untrusted 32-bit value. if count > _MAX_COMPOUND_COUNT: raise ValueError( f"AMQP list element count {count} exceeds maximum {_MAX_COMPOUND_COUNT}" @@ -251,6 +265,10 @@ def _decode_map_small(buffer: memoryview) -> Tuple[memoryview, Dict[Any, Any]]: def _decode_map_large(buffer: memoryview) -> Tuple[memoryview, Dict[Any, Any]]: + # Validate the raw on-wire count *before* halving it (the AMQP encoding + # stores total entries; pairs = entries / 2). Checking pre-halve catches + # hostile odd values and keeps the comparison aligned with the bound used + # by _decode_list_large / _decode_array_large. raw_count = c_unsigned_long.unpack(buffer[4:8])[0] if raw_count > _MAX_COMPOUND_COUNT: raise ValueError( @@ -280,6 +298,9 @@ def _decode_array_small(buffer: memoryview) -> Tuple[memoryview, List[Any]]: def _decode_array_large(buffer: memoryview) -> Tuple[memoryview, List[Any]]: count = c_unsigned_long.unpack(buffer[4:8])[0] + # Validate the wire-supplied count before allocating `[None] * count`. + # An Array32 frame's COUNT is read directly from the network and would + # otherwise drive a Python list allocation of arbitrary size. if count > _MAX_COMPOUND_COUNT: raise ValueError( f"AMQP array element count {count} exceeds maximum {_MAX_COMPOUND_COUNT}" diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_decode.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_decode.py index 08feccaa93bb..7ea5e82fd59d 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_decode.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_decode.py @@ -31,9 +31,21 @@ _HEADER_PREFIX = memoryview(b"AMQP") # Maximum number of elements permitted in any AMQP compound type (list, array, map). -# Mirrors MAX_AMQPVALUE_ITEM_COUNT (65536) in azure-uamqp-c. Without this cap an -# unauthenticated peer can force a multi-gigabyte allocation by sending a crafted -# compound type whose COUNT field is taken straight from the wire. +# +# The AMQP 1.0 wire format encodes compound element counts as either a 1-byte +# field (the *_small variants, naturally bounded at 255) or a 4-byte field +# (the *_large variants, wire-level maximum 0xFFFFFFFF). The large variants +# allocate a Python list/dict sized directly from this count, so without an +# upper bound a small frame can demand a multi-gigabyte allocation. This cap +# is applied at every large-variant decode site to keep allocation sizes +# proportional to the bytes actually delivered. +# +# The value mirrors MAX_AMQPVALUE_ITEM_COUNT (65536) in the C reference +# implementation, azure-uamqp-c, and the equivalent MAX_COMPOUND_COUNT bound +# applied across list/array/map decode sites in the Java AMQP codec. Real +# AMQP traffic does not approach this many elements in a single compound +# value, so the bound functions as a hard ceiling rather than a practical +# constraint on legitimate workloads. _MAX_COMPOUND_COUNT = 65536 _COMPOSITES = { 35: "received", @@ -228,6 +240,8 @@ def _decode_list_small(buffer: memoryview) -> Tuple[memoryview, List[Any]]: def _decode_list_large(buffer: memoryview) -> Tuple[memoryview, List[Any]]: count = c_unsigned_long.unpack(buffer[4:8])[0] + # Validate the wire-supplied count before allocating `[None] * count`, + # which would otherwise scale linearly with an untrusted 32-bit value. if count > _MAX_COMPOUND_COUNT: raise ValueError( f"AMQP list element count {count} exceeds maximum {_MAX_COMPOUND_COUNT}" @@ -251,6 +265,10 @@ def _decode_map_small(buffer: memoryview) -> Tuple[memoryview, Dict[Any, Any]]: def _decode_map_large(buffer: memoryview) -> Tuple[memoryview, Dict[Any, Any]]: + # Validate the raw on-wire count *before* halving it (the AMQP encoding + # stores total entries; pairs = entries / 2). Checking pre-halve catches + # hostile odd values and keeps the comparison aligned with the bound used + # by _decode_list_large / _decode_array_large. raw_count = c_unsigned_long.unpack(buffer[4:8])[0] if raw_count > _MAX_COMPOUND_COUNT: raise ValueError( @@ -280,6 +298,9 @@ def _decode_array_small(buffer: memoryview) -> Tuple[memoryview, List[Any]]: def _decode_array_large(buffer: memoryview) -> Tuple[memoryview, List[Any]]: count = c_unsigned_long.unpack(buffer[4:8])[0] + # Validate the wire-supplied count before allocating `[None] * count`. + # An Array32 frame's COUNT is read directly from the network and would + # otherwise drive a Python list allocation of arbitrary size. if count > _MAX_COMPOUND_COUNT: raise ValueError( f"AMQP array element count {count} exceeds maximum {_MAX_COMPOUND_COUNT}" From c419f6571979e5052990961c583b4829a34e8c31 Mon Sep 17 00:00:00 2001 From: Johnathan W Date: Tue, 19 May 2026 07:51:52 -0500 Subject: [PATCH 3/7] fix(pyamqp): bound list32 element count in decode_frame() decode_frame() decoded the list32 COUNT field as a signed int and skipped the _MAX_COMPOUND_COUNT cap that already protects the per-type _decode_list_large / _decode_map_large / _decode_array_large sites. A malicious peer could send a frame advertising a multi-billion field count, driving `fields = [None] * count` into a multi-gigabyte allocation before the body is even consumed. Switch to c_unsigned_long (the AMQP 1.0 wire definition for list32 COUNT) and apply the existing _MAX_COMPOUND_COUNT cap on the same path. Add negative-path coverage to both vendored copies. --- .../azure/eventhub/_pyamqp/_decode.py | 13 ++++++++++--- .../pyamqp_tests/unittest/test_decode_bounds.py | 15 +++++++++++++++ .../azure/servicebus/_pyamqp/_decode.py | 13 ++++++++++--- .../tests/unittests/test_decode_bounds.py | 15 +++++++++++++++ 4 files changed, 50 insertions(+), 6 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_decode.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_decode.py index 7ea5e82fd59d..fe75851c240a 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_decode.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_decode.py @@ -373,11 +373,18 @@ def decode_frame(data: memoryview) -> Tuple[int, List[Any]]: frame_type = data[2] compound_list_type = data[3] if compound_list_type == 0xD0: - # list32 0xd0: data[4:8] is size, data[8:12] is count - count = c_signed_int.unpack(data[8:12])[0] + # list32 0xd0: data[4:8] is size, data[8:12] is count. The AMQP 1.0 + # wire format defines COUNT as an unsigned 32-bit field; decoding it + # as a signed int and skipping the cap would let a malicious peer + # request a multi-gigabyte field-list allocation below. + count = c_unsigned_long.unpack(data[8:12])[0] + if count > _MAX_COMPOUND_COUNT: + raise ValueError( + f"AMQP frame field count {count} exceeds maximum {_MAX_COMPOUND_COUNT}" + ) buffer = data[12:] else: - # list8 0xc0: data[4] is size, data[5] is count + # list8 0xc0: data[4] is size, data[5] is count (1 byte, bounded at 255). count = data[5] buffer = data[6:] fields: List[Optional[memoryview]] = [None] * count diff --git a/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_decode_bounds.py b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_decode_bounds.py index 50617da397d4..733c1d4bd374 100644 --- a/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_decode_bounds.py +++ b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_decode_bounds.py @@ -4,6 +4,7 @@ _decode_array_large, _decode_list_large, _decode_map_large, + decode_frame, _MAX_COMPOUND_COUNT, ) @@ -66,3 +67,17 @@ def test_decode_map_large_accepts_boundary(): # All keys collapse to None, so the dict has a single entry. assert values == {None: None} assert bytes(remaining) == b"" + + +def _frame_list32(count: int) -> bytes: + # decode_frame skips data[0:2] (described/ulong constructors), reads + # frame_type at data[2], the compound marker at data[3], size at + # data[4:8], and the COUNT under test at data[8:12]. + return b"\x00\x53\x00\xd0" + b"\x00\x00\x00\x00" + count.to_bytes(4, "big") + + +@pytest.mark.parametrize("count", [HUGE_COUNT, JUST_OVER]) +def test_decode_frame_rejects_oversized_list32_count(count): + buffer = memoryview(_frame_list32(count)) + with pytest.raises(ValueError, match="exceeds maximum"): + decode_frame(buffer) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_decode.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_decode.py index 7ea5e82fd59d..fe75851c240a 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_decode.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_decode.py @@ -373,11 +373,18 @@ def decode_frame(data: memoryview) -> Tuple[int, List[Any]]: frame_type = data[2] compound_list_type = data[3] if compound_list_type == 0xD0: - # list32 0xd0: data[4:8] is size, data[8:12] is count - count = c_signed_int.unpack(data[8:12])[0] + # list32 0xd0: data[4:8] is size, data[8:12] is count. The AMQP 1.0 + # wire format defines COUNT as an unsigned 32-bit field; decoding it + # as a signed int and skipping the cap would let a malicious peer + # request a multi-gigabyte field-list allocation below. + count = c_unsigned_long.unpack(data[8:12])[0] + if count > _MAX_COMPOUND_COUNT: + raise ValueError( + f"AMQP frame field count {count} exceeds maximum {_MAX_COMPOUND_COUNT}" + ) buffer = data[12:] else: - # list8 0xc0: data[4] is size, data[5] is count + # list8 0xc0: data[4] is size, data[5] is count (1 byte, bounded at 255). count = data[5] buffer = data[6:] fields: List[Optional[memoryview]] = [None] * count diff --git a/sdk/servicebus/azure-servicebus/tests/unittests/test_decode_bounds.py b/sdk/servicebus/azure-servicebus/tests/unittests/test_decode_bounds.py index 6f752876daaa..6dcfd0cb4d2c 100644 --- a/sdk/servicebus/azure-servicebus/tests/unittests/test_decode_bounds.py +++ b/sdk/servicebus/azure-servicebus/tests/unittests/test_decode_bounds.py @@ -4,6 +4,7 @@ _decode_array_large, _decode_list_large, _decode_map_large, + decode_frame, _MAX_COMPOUND_COUNT, ) @@ -66,3 +67,17 @@ def test_decode_map_large_accepts_boundary(): # All keys collapse to None, so the dict has a single entry. assert values == {None: None} assert bytes(remaining) == b"" + + +def _frame_list32(count: int) -> bytes: + # decode_frame skips data[0:2] (described/ulong constructors), reads + # frame_type at data[2], the compound marker at data[3], size at + # data[4:8], and the COUNT under test at data[8:12]. + return b"\x00\x53\x00\xd0" + b"\x00\x00\x00\x00" + count.to_bytes(4, "big") + + +@pytest.mark.parametrize("count", [HUGE_COUNT, JUST_OVER]) +def test_decode_frame_rejects_oversized_list32_count(count): + buffer = memoryview(_frame_list32(count)) + with pytest.raises(ValueError, match="exceeds maximum"): + decode_frame(buffer) From 363898e5f13413956c21ef09efcc3f748202ebb6 Mon Sep 17 00:00:00 2001 From: Johnathan W Date: Tue, 19 May 2026 07:53:13 -0500 Subject: [PATCH 4/7] fix(pyamqp): reject odd-count maps and use integer division _decode_map_small and _decode_map_large both used `int(raw_count / 2)`, which silently floors odd counts and leaves a trailing key with no value. The half-decoded pair leaks bytes into the next decoder and corrupts subsequent values on the wire. Reject odd raw counts explicitly and switch to integer division. The cap in _decode_map_large already protects against resource exhaustion; this patch is purely a correctness fix for the small variant and a robustness fix for the large variant. Negative-path tests added to both vendored copies. --- .../azure/eventhub/_pyamqp/_decode.py | 21 ++++++++++++++----- .../unittest/test_decode_bounds.py | 17 +++++++++++++++ .../azure/servicebus/_pyamqp/_decode.py | 21 ++++++++++++++----- .../tests/unittests/test_decode_bounds.py | 17 +++++++++++++++ 4 files changed, 66 insertions(+), 10 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_decode.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_decode.py index fe75851c240a..c3c72afbb8c2 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_decode.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_decode.py @@ -254,7 +254,12 @@ def _decode_list_large(buffer: memoryview) -> Tuple[memoryview, List[Any]]: def _decode_map_small(buffer: memoryview) -> Tuple[memoryview, Dict[Any, Any]]: - count = int(buffer[1] / 2) + raw_count = buffer[1] + if raw_count % 2 != 0: + raise ValueError( + f"AMQP map element count {raw_count} must be even (key/value pairs)" + ) + count = raw_count // 2 buffer = buffer[2:] values = {} for _ in range(count): @@ -266,15 +271,21 @@ def _decode_map_small(buffer: memoryview) -> Tuple[memoryview, Dict[Any, Any]]: def _decode_map_large(buffer: memoryview) -> Tuple[memoryview, Dict[Any, Any]]: # Validate the raw on-wire count *before* halving it (the AMQP encoding - # stores total entries; pairs = entries / 2). Checking pre-halve catches - # hostile odd values and keeps the comparison aligned with the bound used - # by _decode_list_large / _decode_array_large. + # stores total entries; pairs = entries / 2). Checking pre-halve keeps + # the comparison aligned with the bound used by _decode_list_large / + # _decode_array_large. Odd counts are rejected explicitly: silently + # flooring to (raw_count - 1) // 2 would leave a trailing key with no + # value, leaking bytes into the next decoder. raw_count = c_unsigned_long.unpack(buffer[4:8])[0] if raw_count > _MAX_COMPOUND_COUNT: raise ValueError( f"AMQP map element count {raw_count} exceeds maximum {_MAX_COMPOUND_COUNT}" ) - count = int(raw_count / 2) + if raw_count % 2 != 0: + raise ValueError( + f"AMQP map element count {raw_count} must be even (key/value pairs)" + ) + count = raw_count // 2 buffer = buffer[8:] values = {} for _ in range(count): diff --git a/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_decode_bounds.py b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_decode_bounds.py index 733c1d4bd374..30cb90582297 100644 --- a/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_decode_bounds.py +++ b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_decode_bounds.py @@ -4,6 +4,7 @@ _decode_array_large, _decode_list_large, _decode_map_large, + _decode_map_small, decode_frame, _MAX_COMPOUND_COUNT, ) @@ -81,3 +82,19 @@ def test_decode_frame_rejects_oversized_list32_count(count): buffer = memoryview(_frame_list32(count)) with pytest.raises(ValueError, match="exceeds maximum"): decode_frame(buffer) + + +def test_decode_map_large_rejects_odd_count(): + # An odd raw COUNT would silently floor to pairs = (count - 1) // 2 and + # leave a trailing key with no value, corrupting subsequent decoding. + buffer = memoryview(_header(3)) + with pytest.raises(ValueError, match="must be even"): + _decode_map_large(buffer) + + +def test_decode_map_small_rejects_odd_count(): + # _decode_map_small reads the COUNT from buffer[1] (1 byte, 0-255). An + # odd value has the same trailing-key problem as the large variant. + buffer = memoryview(b"\x00\x03") + with pytest.raises(ValueError, match="must be even"): + _decode_map_small(buffer) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_decode.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_decode.py index fe75851c240a..c3c72afbb8c2 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_decode.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_decode.py @@ -254,7 +254,12 @@ def _decode_list_large(buffer: memoryview) -> Tuple[memoryview, List[Any]]: def _decode_map_small(buffer: memoryview) -> Tuple[memoryview, Dict[Any, Any]]: - count = int(buffer[1] / 2) + raw_count = buffer[1] + if raw_count % 2 != 0: + raise ValueError( + f"AMQP map element count {raw_count} must be even (key/value pairs)" + ) + count = raw_count // 2 buffer = buffer[2:] values = {} for _ in range(count): @@ -266,15 +271,21 @@ def _decode_map_small(buffer: memoryview) -> Tuple[memoryview, Dict[Any, Any]]: def _decode_map_large(buffer: memoryview) -> Tuple[memoryview, Dict[Any, Any]]: # Validate the raw on-wire count *before* halving it (the AMQP encoding - # stores total entries; pairs = entries / 2). Checking pre-halve catches - # hostile odd values and keeps the comparison aligned with the bound used - # by _decode_list_large / _decode_array_large. + # stores total entries; pairs = entries / 2). Checking pre-halve keeps + # the comparison aligned with the bound used by _decode_list_large / + # _decode_array_large. Odd counts are rejected explicitly: silently + # flooring to (raw_count - 1) // 2 would leave a trailing key with no + # value, leaking bytes into the next decoder. raw_count = c_unsigned_long.unpack(buffer[4:8])[0] if raw_count > _MAX_COMPOUND_COUNT: raise ValueError( f"AMQP map element count {raw_count} exceeds maximum {_MAX_COMPOUND_COUNT}" ) - count = int(raw_count / 2) + if raw_count % 2 != 0: + raise ValueError( + f"AMQP map element count {raw_count} must be even (key/value pairs)" + ) + count = raw_count // 2 buffer = buffer[8:] values = {} for _ in range(count): diff --git a/sdk/servicebus/azure-servicebus/tests/unittests/test_decode_bounds.py b/sdk/servicebus/azure-servicebus/tests/unittests/test_decode_bounds.py index 6dcfd0cb4d2c..d0892e871f3e 100644 --- a/sdk/servicebus/azure-servicebus/tests/unittests/test_decode_bounds.py +++ b/sdk/servicebus/azure-servicebus/tests/unittests/test_decode_bounds.py @@ -4,6 +4,7 @@ _decode_array_large, _decode_list_large, _decode_map_large, + _decode_map_small, decode_frame, _MAX_COMPOUND_COUNT, ) @@ -81,3 +82,19 @@ def test_decode_frame_rejects_oversized_list32_count(count): buffer = memoryview(_frame_list32(count)) with pytest.raises(ValueError, match="exceeds maximum"): decode_frame(buffer) + + +def test_decode_map_large_rejects_odd_count(): + # An odd raw COUNT would silently floor to pairs = (count - 1) // 2 and + # leave a trailing key with no value, corrupting subsequent decoding. + buffer = memoryview(_header(3)) + with pytest.raises(ValueError, match="must be even"): + _decode_map_large(buffer) + + +def test_decode_map_small_rejects_odd_count(): + # _decode_map_small reads the COUNT from buffer[1] (1 byte, 0-255). An + # odd value has the same trailing-key problem as the large variant. + buffer = memoryview(b"\x00\x03") + with pytest.raises(ValueError, match="must be even"): + _decode_map_small(buffer) From ecb9321d1c5ebe02a455b58dc961cdea612dcd74 Mon Sep 17 00:00:00 2001 From: Johnathan Walker Date: Tue, 19 May 2026 09:29:33 -0500 Subject: [PATCH 5/7] [EventHub] Fix mypy errors in _common, _transport, and _utils Resolves 16 mypy errors that surface in CI (e.g., PR #46953) without changing runtime behavior: - _common.py: widen message_id/content_type/correlation_id setters to Optional[str] so EventData.__init__ can clear them, narrow Optional accesses on annotations/application_properties, and silence one residual update() arg-type with a typed ignore. - _transport/_base.py, aio/_transport/_base_async.py: keep_alive_interval is Optional[int] (None means 'no keep-alive') and create_source.offset is Optional[Union[int, str, datetime.datetime]] (matches event_position). - _transport/_pyamqp_transport.py, _uamqp_transport.py, and async equivalents: align keep_alive_interval signatures with the new abstract type to satisfy Liskov. - _utils.event_position_selector: accept Optional value (already handled at runtime via the else branch). No source files imported by user-facing API surfaces change behavior. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../azure-eventhub/azure/eventhub/_common.py | 40 +++++++++++-------- .../azure/eventhub/_transport/_base.py | 5 ++- .../eventhub/_transport/_pyamqp_transport.py | 2 +- .../eventhub/_transport/_uamqp_transport.py | 2 +- .../azure-eventhub/azure/eventhub/_utils.py | 2 +- .../eventhub/aio/_transport/_base_async.py | 5 ++- .../aio/_transport/_pyamqp_transport_async.py | 2 +- .../aio/_transport/_uamqp_transport_async.py | 2 +- 8 files changed, 34 insertions(+), 26 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py index 0e4ebfee4a68..77a2e1311db2 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py @@ -318,6 +318,8 @@ def sequence_number(self) -> Optional[int]: :rtype: int or None """ + if self._raw_amqp_message.annotations is None: + return None return self._raw_amqp_message.annotations.get(PROP_SEQ_NUMBER, None) @property @@ -327,8 +329,8 @@ def offset(self) -> Optional[str]: :rtype: str or None """ try: - return self._raw_amqp_message.annotations[PROP_OFFSET].decode("UTF-8") - except (KeyError, AttributeError): + return self._raw_amqp_message.annotations[PROP_OFFSET].decode("UTF-8") # type: ignore[index] + except (KeyError, AttributeError, TypeError): return None @property @@ -337,7 +339,8 @@ def enqueued_time(self) -> Optional[datetime.datetime]: :rtype: datetime.datetime or None """ - timestamp = self._raw_amqp_message.annotations.get(PROP_TIMESTAMP, None) + annotations = self._raw_amqp_message.annotations or {} + timestamp = annotations.get(PROP_TIMESTAMP, None) if timestamp: return utc_from_timestamp(float(timestamp) / 1000) return None @@ -348,6 +351,8 @@ def partition_key(self) -> Optional[bytes]: :rtype: bytes or None """ + if self._raw_amqp_message.annotations is None: + return None return self._raw_amqp_message.annotations.get(PROP_PARTITION_KEY, None) @property @@ -356,7 +361,7 @@ def properties(self) -> Dict[Union[str, bytes], Any]: :rtype: dict[str, any] or dict[bytes, any] """ - return self._raw_amqp_message.application_properties + return self._raw_amqp_message.application_properties or {} @properties.setter def properties(self, value: Dict[Union[str, bytes], Any]) -> None: @@ -402,7 +407,8 @@ def system_properties(self) -> Dict[bytes, Any]: value = getattr(self._raw_amqp_message.properties, prop_name, None) if value: self._sys_properties[key] = value - self._sys_properties.update(self._raw_amqp_message.annotations) + if self._raw_amqp_message.annotations: + self._sys_properties.update(self._raw_amqp_message.annotations) # type: ignore[arg-type] return self._sys_properties @property @@ -483,10 +489,10 @@ def content_type(self) -> Optional[str]: return self._raw_amqp_message.properties.content_type @content_type.setter - def content_type(self, value: str) -> None: - if not self._raw_amqp_message.properties: - self._raw_amqp_message.properties = AmqpMessageProperties() - self._raw_amqp_message.properties.content_type = value + def content_type(self, value: Optional[str]) -> None: + properties = self._raw_amqp_message.properties or AmqpMessageProperties() + properties.content_type = value + self._raw_amqp_message.properties = properties @property def correlation_id(self) -> Optional[str]: @@ -503,10 +509,10 @@ def correlation_id(self) -> Optional[str]: return self._raw_amqp_message.properties.correlation_id @correlation_id.setter - def correlation_id(self, value: str) -> None: - if not self._raw_amqp_message.properties: - self._raw_amqp_message.properties = AmqpMessageProperties() - self._raw_amqp_message.properties.correlation_id = value + def correlation_id(self, value: Optional[str]) -> None: + properties = self._raw_amqp_message.properties or AmqpMessageProperties() + properties.correlation_id = value + self._raw_amqp_message.properties = properties @property def message_id(self) -> Optional[str]: @@ -525,10 +531,10 @@ def message_id(self) -> Optional[str]: return self._raw_amqp_message.properties.message_id @message_id.setter - def message_id(self, value: str) -> None: - if not self._raw_amqp_message.properties: - self._raw_amqp_message.properties = AmqpMessageProperties() - self._raw_amqp_message.properties.message_id = value + def message_id(self, value: Optional[str]) -> None: + properties = self._raw_amqp_message.properties or AmqpMessageProperties() + properties.message_id = value + self._raw_amqp_message.properties = properties class EventDataBatch: diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_base.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_base.py index a297c111eef6..ec444dec0f2d 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_base.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_base.py @@ -3,6 +3,7 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- from __future__ import annotations +import datetime from typing import List, Tuple, Union, TYPE_CHECKING, Optional, Any, Dict, Callable from abc import ABC, abstractmethod @@ -209,7 +210,7 @@ def create_send_client( idle_timeout: Optional[float], network_trace: bool, retry_policy: Any, - keep_alive_interval: int, + keep_alive_interval: Optional[int], client_name: str, link_properties: Optional[Dict[str, Any]], properties: Optional[Dict[str, Any]], @@ -270,7 +271,7 @@ def add_batch( @staticmethod @abstractmethod - def create_source(source: Union["uamqp_Source", "pyamqp_Source"], offset: int, selector: bytes): + def create_source(source: Union["uamqp_Source", "pyamqp_Source"], offset: Optional[Union[int, str, datetime.datetime]], selector: bytes): """ Creates and returns the Source. diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_pyamqp_transport.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_pyamqp_transport.py index d9b8c5211253..450e959122fa 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_pyamqp_transport.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_pyamqp_transport.py @@ -304,7 +304,7 @@ def create_send_client( idle_timeout: Optional[float], network_trace: bool, retry_policy: Any, - keep_alive_interval: int, + keep_alive_interval: Optional[int], client_name: str, link_properties: Optional[Dict[str, Any]] = None, properties: Optional[Dict[str, Any]] = None, diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_uamqp_transport.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_uamqp_transport.py index e04aa072686e..af915a02bc66 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_uamqp_transport.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_uamqp_transport.py @@ -365,7 +365,7 @@ def create_send_client( idle_timeout: Optional[float], network_trace: bool, retry_policy: Any, - keep_alive_interval: int, + keep_alive_interval: Optional[int], client_name: str, link_properties: Optional[Dict[str, Any]] = None, properties: Optional[Dict[str, Any]] = None, diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_utils.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_utils.py index e347ae2e3fc5..6711545dd264 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_utils.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_utils.py @@ -120,7 +120,7 @@ def set_event_partition_key( raw_message.header.durable = True -def event_position_selector(value: Union[str, int, datetime.datetime], inclusive: bool = False) -> bytes: +def event_position_selector(value: Optional[Union[str, int, datetime.datetime]], inclusive: bool = False) -> bytes: """Creates a selector expression of the offset. :param int or str or datetime.datetime value: The offset value to use for the offset. diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_base_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_base_async.py index aaad055c7bbd..440b851796e1 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_base_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_base_async.py @@ -3,6 +3,7 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- from __future__ import annotations +import datetime from abc import ABC, abstractmethod from typing import List, Tuple, Union, TYPE_CHECKING, Optional, Any, Dict, Callable from typing_extensions import Literal @@ -201,7 +202,7 @@ def create_send_client( idle_timeout: Optional[float], network_trace: bool, retry_policy: Any, - keep_alive_interval: int, + keep_alive_interval: Optional[int], client_name: str, link_properties: Optional[Dict[str, Any]], properties: Optional[Dict[str, Any]], @@ -249,7 +250,7 @@ def set_message_partition_key( @staticmethod @abstractmethod - def create_source(source: str, offset: int, selector: bytes) -> Union["uamqp_Source", "pyamqp_Source"]: + def create_source(source: str, offset: Optional[Union[int, str, datetime.datetime]], selector: bytes) -> Union["uamqp_Source", "pyamqp_Source"]: """ Creates and returns the Source. diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_pyamqp_transport_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_pyamqp_transport_async.py index 64c56e0b71d4..fd49d6851642 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_pyamqp_transport_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_pyamqp_transport_async.py @@ -96,7 +96,7 @@ def create_send_client( idle_timeout: Optional[float], network_trace: bool, retry_policy: Any, - keep_alive_interval: int, + keep_alive_interval: Optional[int], client_name: str, link_properties: Optional[Dict[str, Any]], properties: Optional[Dict[str, Any]] = None, diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_uamqp_transport_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_uamqp_transport_async.py index c4fae70a0542..e6ef72d030a0 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_uamqp_transport_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_uamqp_transport_async.py @@ -119,7 +119,7 @@ def create_send_client( idle_timeout: Optional[float], network_trace: bool, retry_policy: Any, - keep_alive_interval: int, + keep_alive_interval: Optional[int], client_name: str, link_properties: Optional[Dict[str, Any]] = None, properties: Optional[Dict[str, Any]] = None, From 72f6fff3b30b84b047ef94243f9cc044d524ac7d Mon Sep 17 00:00:00 2001 From: Johnathan Walker Date: Tue, 19 May 2026 10:42:41 -0500 Subject: [PATCH 6/7] Wrap long create_source signatures to satisfy pylint line-too-long Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../azure-eventhub/azure/eventhub/_transport/_base.py | 6 +++++- .../azure/eventhub/aio/_transport/_base_async.py | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_base.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_base.py index ec444dec0f2d..cfbc284b38cd 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_base.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_transport/_base.py @@ -271,7 +271,11 @@ def add_batch( @staticmethod @abstractmethod - def create_source(source: Union["uamqp_Source", "pyamqp_Source"], offset: Optional[Union[int, str, datetime.datetime]], selector: bytes): + def create_source( + source: Union["uamqp_Source", "pyamqp_Source"], + offset: Optional[Union[int, str, datetime.datetime]], + selector: bytes, + ): """ Creates and returns the Source. diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_base_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_base_async.py index 440b851796e1..895c3abcd88e 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_base_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_base_async.py @@ -250,7 +250,11 @@ def set_message_partition_key( @staticmethod @abstractmethod - def create_source(source: str, offset: Optional[Union[int, str, datetime.datetime]], selector: bytes) -> Union["uamqp_Source", "pyamqp_Source"]: + def create_source( + source: str, + offset: Optional[Union[int, str, datetime.datetime]], + selector: bytes, + ) -> Union["uamqp_Source", "pyamqp_Source"]: """ Creates and returns the Source. From 466e8de383f1bc6a3153967ebffc5eedcc71afd4 Mon Sep 17 00:00:00 2001 From: Johnathan Walker Date: Tue, 19 May 2026 10:55:13 -0500 Subject: [PATCH 7/7] Address review: preserve backing dict for EventData.properties Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- sdk/eventhub/azure-eventhub/azure/eventhub/_common.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py index 77a2e1311db2..290a6268a556 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py @@ -361,7 +361,9 @@ def properties(self) -> Dict[Union[str, bytes], Any]: :rtype: dict[str, any] or dict[bytes, any] """ - return self._raw_amqp_message.application_properties or {} + if self._raw_amqp_message.application_properties is None: + self._raw_amqp_message.application_properties = {} + return self._raw_amqp_message.application_properties @properties.setter def properties(self, value: Dict[Union[str, bytes], Any]) -> None: