From b40f2074e9244ac07a761f0ca8b3eaa59fdddb56 Mon Sep 17 00:00:00 2001 From: Ojasva Jain Date: Fri, 7 Nov 2025 10:58:47 +0530 Subject: [PATCH 1/5] dev changes to allow message object to be instantiated --- src/confluent_kafka/cimpl.pyi | 21 ++++- src/confluent_kafka/src/confluent_kafka.c | 109 +++++++++++++++++++++- 2 files changed, 122 insertions(+), 8 deletions(-) diff --git a/src/confluent_kafka/cimpl.pyi b/src/confluent_kafka/cimpl.pyi index 4fd2bef53..f8d7b6146 100644 --- a/src/confluent_kafka/cimpl.pyi +++ b/src/confluent_kafka/cimpl.pyi @@ -76,9 +76,24 @@ class KafkaException(Exception): args: Tuple[Any, ...] class Message: - def topic(self) -> str: ... - def partition(self) -> int: ... - def offset(self) -> int: ... + def __init__(self, topic: Optional[str] = ..., partition: Optional[int] = ..., offset: Optional[int] = ..., + key: Optional[bytes] = ..., value: Optional[bytes] = ..., + headers: Optional[HeadersType] = ..., error: Optional[KafkaError] = ..., + timestamp: Optional[Tuple[int, int]] = ..., latency: Optional[float] = ..., + leader_epoch: Optional[int] = ...) -> None: ... + topic: Optional[str] + partition: Optional[int] + offset: Optional[int] + key: Optional[bytes] + value: Optional[bytes] + headers: Optional[HeadersType] + error: Optional[KafkaError] + timestamp: Tuple[int, int] + latency: Optional[float] + leader_epoch: Optional[int] + def topic(self) -> Optional[str]: ... + def partition(self) -> Optional[int]: ... + def offset(self) -> Optional[int]: ... def key(self) -> Optional[bytes]: ... def value(self) -> Optional[bytes]: ... def headers(self) -> Optional[HeadersType]: ... diff --git a/src/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c index 632ade6c5..470b40dea 100644 --- a/src/confluent_kafka/src/confluent_kafka.c +++ b/src/confluent_kafka/src/confluent_kafka.c @@ -689,13 +689,113 @@ static int Message_clear (Message *self) { return 0; } - static void Message_dealloc (Message *self) { Message_clear(self); PyObject_GC_UnTrack(self); Py_TYPE(self)->tp_free((PyObject *)self); } +static int Message_init (PyObject *self0, PyObject *args, PyObject *kwargs) { + Message *self = (Message *)self0; + PyObject *topic = NULL; + int32_t partition = RD_KAFKA_PARTITION_UA; + int64_t offset = RD_KAFKA_OFFSET_INVALID; + PyObject *key = NULL; + PyObject *value = NULL; + PyObject *headers = NULL; + PyObject *error = NULL; + PyObject *timestamp = NULL; + double latency = -1; + int32_t leader_epoch = -1; + + static char *kws[] = { "topic", + "partition", + "offset", + "key", + "value", + "headers", + "error", + "timestamp", + "latency", + "leader_epoch", + NULL }; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OiLOOOOOdi", kws, + &topic, &partition, &offset, + &key, &value, &headers, &error, + ×tamp, &latency, &leader_epoch)) { + return -1; + } + + /* Initialize all PyObject fields to NULL first */ + self->topic = NULL; + self->value = NULL; + self->key = NULL; + self->headers = NULL; +#ifdef RD_KAFKA_V_HEADERS + self->c_headers = NULL; +#endif + self->error = NULL; + + /* Set topic (string) */ + if (topic && topic != Py_None) { + Py_INCREF(topic); + self->topic = topic; + } + + /* Set key (bytes) */ + if (key && key != Py_None) { + Py_INCREF(key); + self->key = key; + } + + /* Set value (bytes) */ + if (value && value != Py_None) { + Py_INCREF(value); + self->value = value; + } + + /* Set headers (list of tuples) */ + if (headers && headers != Py_None) { + Py_INCREF(headers); + self->headers = headers; + } + + /* Set error (KafkaError) - only if provided and not None */ + if (error && error != Py_None) { + Py_INCREF(error); + self->error = error; + } + + if (timestamp && timestamp != Py_None) { + if (!PyTuple_Check(timestamp) || PyTuple_Size(timestamp) != 2) { + PyErr_SetString(PyExc_TypeError, + "timestamp must be a tuple of (int, int)"); + return -1; + } + self->tstype = (rd_kafka_timestamp_type_t)cfl_PyInt_AsInt(PyTuple_GET_ITEM(timestamp, 0)); + self->timestamp = PyLong_AsLongLong(PyTuple_GET_ITEM(timestamp, 1)); + } + else { + self->tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE; + self->timestamp = 0; + } + + self->partition = partition < 0 ? RD_KAFKA_PARTITION_UA: partition; + self->offset = offset < 0 ? RD_KAFKA_OFFSET_INVALID: offset; + self->leader_epoch = leader_epoch < 0 ? -1: leader_epoch; + self->latency = (int64_t)(latency < 0 ? -1: latency * 1000000.0); + + return 0; +} + + + +static PyObject *Message_new (PyTypeObject *type, PyObject *args, + PyObject *kwargs) { + return type->tp_alloc(type, 0); +} + static int Message_traverse (Message *self, visitproc visit, void *arg) { @@ -749,8 +849,6 @@ PyTypeObject MessageType = { "object is a proper message (error() returns None) or an " "error/event.\n" "\n" - "This class is not user-instantiable.\n" - "\n" ".. py:function:: len()\n" "\n" " :returns: Message value (payload) size in bytes\n" @@ -770,8 +868,9 @@ PyTypeObject MessageType = { 0, /* tp_descr_get */ 0, /* tp_descr_set */ 0, /* tp_dictoffset */ - 0, /* tp_init */ - 0 /* tp_alloc */ + Message_init, /* tp_init */ + 0, /* tp_alloc */ + Message_new /* tp_new */ }; /** From ab7b8dfc147944354117105fb647ebb3b535c45d Mon Sep 17 00:00:00 2001 From: Ojasva Jain Date: Fri, 7 Nov 2025 13:07:59 +0530 Subject: [PATCH 2/5] add test cases for message class --- tests/test_message.py | 56 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 tests/test_message.py diff --git a/tests/test_message.py b/tests/test_message.py new file mode 100644 index 000000000..ebfac5358 --- /dev/null +++ b/tests/test_message.py @@ -0,0 +1,56 @@ +# #!/usr/bin/env python + +from confluent_kafka import Message, KafkaError + + +def test_init_no_params(): + m = Message() + assert m.topic() is None + assert m.partition() is None + assert m.offset() is None + assert m.key() is None + assert m.value() is None + assert m.headers() is None + assert m.error() is None + assert m.timestamp() == (0, 0) + assert m.latency() is None + assert m.leader_epoch() is None + +def test_init_all_params(): + m = Message(topic="test", partition=1, offset=2, key=b"key", value=b"value", headers=[("h1", "v1")], error=KafkaError(0), + timestamp=(1, 1762499956), latency=0.05, leader_epoch=1762499956) + assert m.topic() == "test" + assert m.partition() == 1 + assert m.offset() == 2 + assert m.key() == b"key" + assert m.value() == b"value" + assert m.headers() == [("h1", "v1")] + assert m.error() == KafkaError(0) + assert m.timestamp() == (1, 1762499956) + assert m.latency() == 0.05 + assert m.leader_epoch() == 1762499956 + +def test_init_negative_param_values(): + m = Message(partition=-1, offset=-1, latency=-1.0, leader_epoch=-1762499956) + assert m.partition() is None + assert m.offset() is None + assert m.latency() is None + assert m.leader_epoch() is None + + +def test_set_headers(): + m = Message() + m.set_headers([("h1", "v1")]) + assert m.headers() == [("h1", "v1")] + m.set_headers([("h2", "v2")]) + assert m.headers() == [("h2", "v2")] + +def test_set_key(): + m = Message() + m.set_key(b"key") + assert m.key() == b"key" + +def test_set_value(): + m = Message() + m.set_value(b"value") + assert m.value() == b"value" From 58511480575102ad1601cd1bf1d69195e0576573 Mon Sep 17 00:00:00 2001 From: Ojasva Jain Date: Mon, 10 Nov 2025 12:35:19 +0530 Subject: [PATCH 3/5] modified some comments --- src/confluent_kafka/src/confluent_kafka.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c index 470b40dea..09c57efa9 100644 --- a/src/confluent_kafka/src/confluent_kafka.c +++ b/src/confluent_kafka/src/confluent_kafka.c @@ -761,12 +761,13 @@ static int Message_init (PyObject *self0, PyObject *args, PyObject *kwargs) { self->headers = headers; } - /* Set error (KafkaError) - only if provided and not None */ + /* Set error (KafkaError) */ if (error && error != Py_None) { Py_INCREF(error); self->error = error; } + /* Set timestamp (tuple of (timestamp_type, timestamp)) */ if (timestamp && timestamp != Py_None) { if (!PyTuple_Check(timestamp) || PyTuple_Size(timestamp) != 2) { PyErr_SetString(PyExc_TypeError, From 586a9f15bc79a6b301977d4b9ea3048ccd557d48 Mon Sep 17 00:00:00 2001 From: Ojasva Jain Date: Mon, 10 Nov 2025 13:03:55 +0530 Subject: [PATCH 4/5] remove data member definition from type hints --- src/confluent_kafka/cimpl.pyi | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/confluent_kafka/cimpl.pyi b/src/confluent_kafka/cimpl.pyi index f8d7b6146..a5ee21b80 100644 --- a/src/confluent_kafka/cimpl.pyi +++ b/src/confluent_kafka/cimpl.pyi @@ -81,16 +81,6 @@ class Message: headers: Optional[HeadersType] = ..., error: Optional[KafkaError] = ..., timestamp: Optional[Tuple[int, int]] = ..., latency: Optional[float] = ..., leader_epoch: Optional[int] = ...) -> None: ... - topic: Optional[str] - partition: Optional[int] - offset: Optional[int] - key: Optional[bytes] - value: Optional[bytes] - headers: Optional[HeadersType] - error: Optional[KafkaError] - timestamp: Tuple[int, int] - latency: Optional[float] - leader_epoch: Optional[int] def topic(self) -> Optional[str]: ... def partition(self) -> Optional[int]: ... def offset(self) -> Optional[int]: ... From 81812ae4198639d95dbcc8aab0097800d9a54095 Mon Sep 17 00:00:00 2001 From: Ojasva Jain Date: Mon, 10 Nov 2025 13:27:47 +0530 Subject: [PATCH 5/5] reformat test_message.py --- tests/test_message.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/test_message.py b/tests/test_message.py index ebfac5358..88a342a6e 100644 --- a/tests/test_message.py +++ b/tests/test_message.py @@ -16,8 +16,10 @@ def test_init_no_params(): assert m.latency() is None assert m.leader_epoch() is None + def test_init_all_params(): - m = Message(topic="test", partition=1, offset=2, key=b"key", value=b"value", headers=[("h1", "v1")], error=KafkaError(0), + m = Message(topic="test", partition=1, offset=2, key=b"key", value=b"value", headers=[("h1", "v1")], + error=KafkaError(0), timestamp=(1, 1762499956), latency=0.05, leader_epoch=1762499956) assert m.topic() == "test" assert m.partition() == 1 @@ -30,6 +32,7 @@ def test_init_all_params(): assert m.latency() == 0.05 assert m.leader_epoch() == 1762499956 + def test_init_negative_param_values(): m = Message(partition=-1, offset=-1, latency=-1.0, leader_epoch=-1762499956) assert m.partition() is None @@ -45,11 +48,13 @@ def test_set_headers(): m.set_headers([("h2", "v2")]) assert m.headers() == [("h2", "v2")] + def test_set_key(): m = Message() m.set_key(b"key") assert m.key() == b"key" + def test_set_value(): m = Message() m.set_value(b"value")