diff --git a/src/confluent_kafka/cimpl.pyi b/src/confluent_kafka/cimpl.pyi
index 4fd2bef53..a5ee21b80 100644
--- a/src/confluent_kafka/cimpl.pyi
+++ b/src/confluent_kafka/cimpl.pyi
@@ -76,9 +76,14 @@ class KafkaException(Exception):
     args: Tuple[Any, ...]
 
 class Message:
-    def topic(self) -> str: ...
-    def partition(self) -> int: ...
-    def offset(self) -> int: ...
+    def __init__(self, topic: Optional[str] = ..., partition: Optional[int] = ..., offset: Optional[int] = ...,
+                 key: Optional[bytes] = ..., value: Optional[bytes] = ...,
+                 headers: Optional[HeadersType] = ..., error: Optional[KafkaError] = ...,
+                 timestamp: Optional[Tuple[int, int]] = ..., latency: Optional[float] = ...,
+                 leader_epoch: Optional[int] = ...) -> None: ...
+    def topic(self) -> Optional[str]: ...
+    def partition(self) -> Optional[int]: ...
+    def offset(self) -> Optional[int]: ...
     def key(self) -> Optional[bytes]: ...
     def value(self) -> Optional[bytes]: ...
     def headers(self) -> Optional[HeadersType]: ...
diff --git a/src/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c
index 632ade6c5..09c57efa9 100644
--- a/src/confluent_kafka/src/confluent_kafka.c
+++ b/src/confluent_kafka/src/confluent_kafka.c
@@ -689,13 +689,114 @@ static int Message_clear (Message *self) {
         return 0;
 }
 
-
 static void Message_dealloc (Message *self) {
         Message_clear(self);
         PyObject_GC_UnTrack(self);
         Py_TYPE(self)->tp_free((PyObject *)self);
 }
 
+static int Message_init (PyObject *self0, PyObject *args, PyObject *kwargs) {
+        Message *self = (Message *)self0;
+        PyObject *topic = NULL;
+        int32_t partition = RD_KAFKA_PARTITION_UA;
+        int64_t offset = RD_KAFKA_OFFSET_INVALID;
+        PyObject *key = NULL;
+        PyObject *value = NULL;
+        PyObject *headers = NULL;
+        PyObject *error = NULL;
+        PyObject *timestamp = NULL;
+        double latency = -1;
+        int32_t leader_epoch = -1;
+
+        static char *kws[] = { "topic",
+                               "partition",
+                               "offset",
+                               "key",
+                               "value",
+                               "headers",
+                               "error",
+                               "timestamp",
+                               "latency",
+                               "leader_epoch",
+                               NULL };
+
+        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OiLOOOOOdi", kws,
+                                         &topic, &partition, &offset,
+                                         &key, &value, &headers, &error,
+                                         &timestamp, &latency, &leader_epoch)) {
+                return -1;
+        }
+
+        /* Initialize all PyObject fields to NULL first */
+        self->topic = NULL;
+        self->value = NULL;
+        self->key = NULL;
+        self->headers = NULL;
+#ifdef RD_KAFKA_V_HEADERS
+        self->c_headers = NULL;
+#endif
+        self->error = NULL;
+
+        /* Set topic (string) */
+        if (topic && topic != Py_None) {
+                Py_INCREF(topic);
+                self->topic = topic;
+        }
+
+        /* Set key (bytes) */
+        if (key && key != Py_None) {
+                Py_INCREF(key);
+                self->key = key;
+        }
+
+        /* Set value (bytes) */
+        if (value && value != Py_None) {
+                Py_INCREF(value);
+                self->value = value;
+        }
+
+        /* Set headers (list of tuples) */
+        if (headers && headers != Py_None) {
+                Py_INCREF(headers);
+                self->headers = headers;
+        }
+
+        /* Set error (KafkaError) */
+        if (error && error != Py_None) {
+                Py_INCREF(error);
+                self->error = error;
+        }
+
+        /* Set timestamp (tuple of (timestamp_type, timestamp)) */
+        if (timestamp && timestamp != Py_None) {
+                if (!PyTuple_Check(timestamp) || PyTuple_Size(timestamp) != 2) {
+                        PyErr_SetString(PyExc_TypeError,
+                                        "timestamp must be a tuple of (int, int)");
+                        return -1;
+                }
+                self->tstype = (rd_kafka_timestamp_type_t)cfl_PyInt_AsInt(PyTuple_GET_ITEM(timestamp, 0));
+                self->timestamp = PyLong_AsLongLong(PyTuple_GET_ITEM(timestamp, 1));
+        }
+        else {
+                self->tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
+                self->timestamp = 0;
+        }
+
+        self->partition = partition < 0 ? RD_KAFKA_PARTITION_UA: partition;
+        self->offset = offset < 0 ? RD_KAFKA_OFFSET_INVALID: offset;
+        self->leader_epoch = leader_epoch < 0 ? -1: leader_epoch;
+        self->latency = (int64_t)(latency < 0 ?
+                                  -1: latency * 1000000.0);
+
+        return 0;
+}
+
+
+
+static PyObject *Message_new (PyTypeObject *type, PyObject *args,
+                              PyObject *kwargs) {
+        return type->tp_alloc(type, 0);
+}
 
 
 static int Message_traverse (Message *self,
                              visitproc visit, void *arg) {
@@ -749,8 +850,6 @@ PyTypeObject MessageType = {
         "object is a proper message (error() returns None) or an "
         "error/event.\n"
         "\n"
-        "This class is not user-instantiable.\n"
-        "\n"
         ".. py:function:: len()\n"
         "\n"
         "  :returns: Message value (payload) size in bytes\n"
@@ -770,8 +869,9 @@ PyTypeObject MessageType = {
         0, /* tp_descr_get */
         0, /* tp_descr_set */
         0, /* tp_dictoffset */
-        0, /* tp_init */
-        0 /* tp_alloc */
+        Message_init, /* tp_init */
+        0, /* tp_alloc */
+        Message_new /* tp_new */
 };
 
 /**
diff --git a/tests/test_message.py b/tests/test_message.py
new file mode 100644
index 000000000..88a342a6e
--- /dev/null
+++ b/tests/test_message.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+
+from confluent_kafka import Message, KafkaError
+
+
+def test_init_no_params():
+    m = Message()
+    assert m.topic() is None
+    assert m.partition() is None
+    assert m.offset() is None
+    assert m.key() is None
+    assert m.value() is None
+    assert m.headers() is None
+    assert m.error() is None
+    assert m.timestamp() == (0, 0)
+    assert m.latency() is None
+    assert m.leader_epoch() is None
+
+
+def test_init_all_params():
+    m = Message(topic="test", partition=1, offset=2, key=b"key", value=b"value", headers=[("h1", "v1")],
+                error=KafkaError(0),
+                timestamp=(1, 1762499956), latency=0.05, leader_epoch=1762499956)
+    assert m.topic() == "test"
+    assert m.partition() == 1
+    assert m.offset() == 2
+    assert m.key() == b"key"
+    assert m.value() == b"value"
+    assert m.headers() == [("h1", "v1")]
+    assert m.error() == KafkaError(0)
+    assert m.timestamp() == (1, 1762499956)
+    assert m.latency() == 0.05
+    assert m.leader_epoch() == 1762499956
+
+
+def test_init_negative_param_values():
+    m = Message(partition=-1, offset=-1, latency=-1.0, leader_epoch=-1762499956)
+    assert m.partition() is None
+    assert m.offset() is None
+    assert m.latency() is None
+    assert m.leader_epoch() is None
+
+
+def test_set_headers():
+    m = Message()
+    m.set_headers([("h1", "v1")])
+    assert m.headers() == [("h1", "v1")]
+    m.set_headers([("h2", "v2")])
+    assert m.headers() == [("h2", "v2")]
+
+
+def test_set_key():
+    m = Message()
+    m.set_key(b"key")
+    assert m.key() == b"key"
+
+
+def test_set_value():
+    m = Message()
+    m.set_value(b"value")
+    assert m.value() == b"value"