Skip to content

Commit de2fc04

Browse files
committed
Adjusted to reuse exit logic
1 parent a091266 commit de2fc04

File tree

3 files changed

+22
-83
lines changed

3 files changed

+22
-83
lines changed

DEVELOPER.md

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -178,26 +178,6 @@ Integration tests (may require local/CI Kafka cluster; see tests/README.md):
178178
pytest -q tests/integration
179179
```
180180
181-
## Local Setup with UV
182-
183-
Tested with python 3.11
184-
185-
```bash
186-
# Modify pyproject.toml to require python version >=3.11
187-
# This fixes the cel-python dependency conflict
188-
uv venv --python 3.11
189-
source .venv/bin/activate
190-
191-
uv sync --extra dev --extra tests
192-
uv pip install trivup setuptools
193-
pytest tests/
194-
195-
# When making changes, change project.version in pyproject.toml before re-running:
196-
uv sync --extra dev --extra tests
197-
198-
```
199-
200-
201181
## Tests
202182
203183
See [tests/README.md](tests/README.md) for instructions on how to run tests.

src/confluent_kafka/src/Producer.c

Lines changed: 17 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -422,37 +422,32 @@ static PyObject *Producer_flush (Handle *self, PyObject *args,
422422

423423

424424
static PyObject *Producer_close(Handle *self, PyObject *args, PyObject *kwargs) {
425-
426-
double tmout = 1; // Default timeout is 1 second for close to clear rather than indefinitely
427-
static char *kws[] = { "timeout", NULL };
428425
rd_kafka_resp_err_t err;
429426
CallState cs;
430427

431-
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout))
432-
return NULL;
433-
434428
if (!self->rk)
435429
Py_RETURN_TRUE;
436430

437431
CallState_begin(self, &cs);
438432

439-
// Flush any remaining messages before closing if possible
440-
err = rd_kafka_flush(self->rk, cfl_timeout_ms(tmout));
441-
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
442-
PyErr_WarnFormat(PyExc_RuntimeWarning, 1,
443-
"Producer flush failed during close: %s",
444-
rd_kafka_err2str(err));
445-
}
446-
rd_kafka_destroy(self->rk);
447-
rd_kafka_log_print(self->rk, CK_LOG_INFO, "CLOSEINF", "Producer destroy requested");
433+
/* Flush any pending messages (wait indefinitely to ensure delivery) */
434+
err = rd_kafka_flush(self->rk, -1);
448435

449-
self->rk = NULL;
436+
/* Destroy the producer (even if flush had issues) */
437+
rd_kafka_destroy(self->rk);
438+
self->rk = NULL;
450439

451-
if (!CallState_end(self, &cs))
452-
return NULL;
440+
if (!CallState_end(self, &cs))
441+
return NULL;
453442

454-
Py_RETURN_TRUE;
443+
/* If flush failed, warn but don't suppress original exception */
444+
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
445+
PyErr_WarnFormat(PyExc_RuntimeWarning, 1,
446+
"Producer flush failed during close: %s",
447+
rd_kafka_err2str(err));
448+
}
455449

450+
Py_RETURN_TRUE;
456451
}
457452

458453

@@ -942,27 +937,7 @@ static PyObject *Producer_exit (Handle *self, PyObject *args) {
942937
&exc_type, &exc_value, &exc_traceback))
943938
return NULL;
944939

945-
/* Cleanup: flush pending messages and destroy producer */
946-
if (self->rk) {
947-
CallState_begin(self, &cs);
948-
949-
/* Flush any pending messages (wait indefinitely to ensure delivery) */
950-
err = rd_kafka_flush(self->rk, -1);
951-
952-
/* Destroy the producer (even if flush had issues) */
953-
rd_kafka_destroy(self->rk);
954-
self->rk = NULL;
955-
956-
if (!CallState_end(self, &cs))
957-
return NULL;
958-
959-
/* If flush failed, warn but don't suppress original exception */
960-
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
961-
PyErr_WarnFormat(PyExc_RuntimeWarning, 1,
962-
"Producer flush failed during context exit: %s",
963-
rd_kafka_err2str(err));
964-
}
965-
}
940+
Producer_close(self, (PyObject *)NULL, (PyObject *)NULL);
966941

967942
/* Return None to propagate any exceptions from the with block */
968943
Py_RETURN_NONE;
@@ -1020,11 +995,10 @@ static PyMethodDef Producer_methods[] = {
1020995
"\n"
1021996
},
1022997
{ "close", (PyCFunction)Producer_close, METH_VARARGS|METH_KEYWORDS,
1023-
".. py:function:: close([timeout])\n"
998+
".. py:function:: close()\n"
1024999
"\n"
1025-
" Request to close the producer on demand with an optional timeout.\n"
1000+
" Request to close the producer on demand.\n"
10261001
"\n"
1027-
" :param: float timeout: Maximum time to block (default 1 second). (Seconds)\n"
10281002
" :rtype: bool\n"
10291003
" :returns: True if producer close requested successfully, False otherwise\n"
10301004
"\n"

tests/test_Producer.py

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1414,24 +1414,9 @@ def test_producer_close():
14141414
'message.timeout.ms': 10
14151415
}
14161416
producer = Producer(conf)
1417-
producer.produce('mytopic', value='somedata', key='a key')
1417+
cb_detector = {"on_delivery_called": False}
1418+
def on_delivery(err, msg):
1419+
cb_detector["on_delivery_called"] = True
1420+
producer.produce('mytopic', value='somedata', key='a key', callback=on_delivery)
14181421
assert producer.close(), "The producer could not be closed on demand"
1419-
# Ensure no messages remain in the flush buffer after close
1420-
assert len(producer) == 0
1421-
1422-
1423-
def test_producer_close_with_timeout():
1424-
"""
1425-
Ensures the producer close can be requested on demand
1426-
"""
1427-
conf = {
1428-
'debug': 'all',
1429-
'socket.timeout.ms': 10,
1430-
'error_cb': error_cb,
1431-
'message.timeout.ms': 10
1432-
}
1433-
producer = Producer(conf)
1434-
producer.produce('mytopic', value='somedata', key='a key')
1435-
assert producer.close(0.1), "The producer could not be closed on demand with timeout"
1436-
# Ensure no messages remain in the flush buffer after close
1437-
assert len(producer) == 0
1422+
assert cb_detector["on_delivery_called"], "The delivery callback should have been called by flushing during close"

0 commit comments

Comments
 (0)