Skip to content

Commit a091266

Browse files
committed
Test commit
1 parent 07b68df commit a091266

File tree

2 files changed

+14
-6
lines changed

2 files changed

+14
-6
lines changed

src/confluent_kafka/src/Producer.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,11 @@ static PyObject *Producer_close(Handle *self, PyObject *args, PyObject *kwargs)
438438

439439
// Flush any remaining messages before closing if possible
440440
err = rd_kafka_flush(self->rk, cfl_timeout_ms(tmout));
441+
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
442+
PyErr_WarnFormat(PyExc_RuntimeWarning, 1,
443+
"Producer flush failed during close: %s",
444+
rd_kafka_err2str(err));
445+
}
441446
rd_kafka_destroy(self->rk);
442447
rd_kafka_log_print(self->rk, CK_LOG_INFO, "CLOSEINF", "Producer destroy requested");
443448

tests/test_Producer.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
33
import gc
4-
import json
54
import pytest
65
import threading
76
import time
@@ -1403,6 +1402,7 @@ def __init__(self, config):
14031402
# Test __len__() - should return 0 for closed producer (safe, no crash)
14041403
assert len(producer) == 0
14051404

1405+
14061406
def test_producer_close():
14071407
"""
14081408
Ensures the producer close can be requested on demand
@@ -1414,9 +1414,11 @@ def test_producer_close():
14141414
'message.timeout.ms': 10
14151415
}
14161416
producer = Producer(conf)
1417-
msg = {"test": "test"}
1418-
producer.produce(json.dumps(msg))
1417+
producer.produce('mytopic', value='somedata', key='a key')
14191418
assert producer.close(), "The producer could not be closed on demand"
1419+
# Ensure no messages remain in the flush buffer after close
1420+
assert len(producer) == 0
1421+
14201422

14211423
def test_producer_close_with_timeout():
14221424
"""
@@ -1429,6 +1431,7 @@ def test_producer_close_with_timeout():
14291431
'message.timeout.ms': 10
14301432
}
14311433
producer = Producer(conf)
1432-
msg = {"test": "test"}
1433-
producer.produce(json.dumps(msg))
1434-
assert producer.close(0.1), "The producer could not be closed on demand with timeout"
1434+
producer.produce('mytopic', value='somedata', key='a key')
1435+
assert producer.close(0.1), "The producer could not be closed on demand with timeout"
1436+
# Ensure no messages remain in the flush buffer after close
1437+
assert len(producer) == 0

0 commit comments

Comments
 (0)