Skip to content

Commit 843e6f7

Browse files
committed
FIX: Improve PRNG seeding on Windows to ensure uniqueness of generated numbers.
Fix for the pseudo-random seed on Windows. The function `rand_r` isn't present on Windows and the global seed wasn't based on the current microseconds and thread id. Also it wasn't called on every thread as required on this platform but only once per process. The fix allows on this platform the uniqueness of client side member id generation in next-generation consumer group protocol. Happening since 1.x
1 parent 477eb2e commit 843e6f7

File tree

10 files changed

+105
-27
lines changed

10 files changed

+105
-27
lines changed

CHANGELOG.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,33 @@
1+
# librdkafka v2.13.0
2+
3+
librdkafka v2.13.0 is a feature release:
4+
5+
* Fix memory management for interceptors in rd_kafka_conf to prevent
6+
double-free errors (#5240).
7+
* Fix for the pseudo-random generator seed on Windows involving as well
8+
the uniqueness of the new consumer group protocol member id (#).
9+
10+
11+
## Fixes
12+
13+
### General fixes
14+
15+
* Issues: #4142.
16+
Fix memory management for interceptors in rd_kafka_conf to prevent double-free errors.
17+
In case the client instance fails the users needs to destroy the configuration
18+
data structure, it was causing a double-free because the interceptors were
19+
already freed in the constructor.
20+
Happening since 1.x (#5240).
21+
* Issues: #.
22+
Fix for the pseudo-random seed on Windows. The function `rand_r` isn't present
23+
on Windows and the global seed wasn't based on the current microseconds and thread
24+
id. Also it wasn't called on every thread as required on this platform but
25+
only once per process. The fix allows on this platform the uniqueness of client side
26+
member id generation in next-generation consumer group protocol.
27+
Happening since 1.x (#).
28+
29+
30+
131
# librdkafka v2.12.1
232

333
librdkafka v2.12.1 is a maintenance release:

src/rdkafka.c

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
#include "rdkafka_interceptor.h"
5858
#include "rdkafka_idempotence.h"
5959
#include "rdkafka_sasl_oauthbearer.h"
60+
#include "rdmurmur2.h"
6061
#if WITH_OAUTHBEARER_OIDC
6162
#include "rdkafka_sasl_oauthbearer_oidc.h"
6263
#endif
@@ -82,8 +83,14 @@
8283
#endif
8384

8485

85-
static once_flag rd_kafka_global_init_once = ONCE_FLAG_INIT;
86-
static once_flag rd_kafka_global_srand_once = ONCE_FLAG_INIT;
86+
static once_flag rd_kafka_global_init_once = ONCE_FLAG_INIT;
87+
#ifdef _WIN32
88+
/* On Windows srand needs to be called on each thread. */
89+
static RD_TLS once_flag rd_kafka_srand_once = ONCE_FLAG_INIT;
90+
#else
91+
static once_flag rd_kafka_srand_once = ONCE_FLAG_INIT;
92+
#endif
93+
8794

8895
/**
8996
* @brief Global counter+lock for all active librdkafka instances
@@ -130,6 +137,22 @@ void rd_kafka_set_thread_name(const char *fmt, ...) {
130137
*/
131138
static char RD_TLS rd_kafka_thread_sysname[16] = "app";
132139

140+
/**
141+
* @brief Seed the PRNG with current microseconds and thread ID.
142+
*/
143+
static void rd_kafka_srand(void) {
144+
unsigned int seed = 0;
145+
struct timeval tv;
146+
rd_gettimeofday(&tv, NULL);
147+
seed = (unsigned int)(tv.tv_usec);
148+
seed ^= thrd_current_id();
149+
150+
/* Apply the murmur2 hash to distribute entropy to
151+
* the whole seed. */
152+
seed = (unsigned int)rd_murmur2(&seed, sizeof(seed));
153+
srand(seed);
154+
}
155+
133156
void rd_kafka_set_thread_sysname(const char *fmt, ...) {
134157
va_list ap;
135158

@@ -141,6 +164,30 @@ void rd_kafka_set_thread_sysname(const char *fmt, ...) {
141164
thrd_setname(rd_kafka_thread_sysname);
142165
}
143166

167+
/**
168+
* @brief Seed the PRNG for the current thread or for the whole process.
169+
* Depending on the platform implementation of srand() the seed can
170+
* be a thread local or global one. In case it's thread local we
171+
* need to call it on each thread.
172+
*
173+
* @param rk Client instance.
174+
* @param internal_thread If true, seed the PRNG if
175+
* it's required per-thread.
176+
*/
177+
void rd_kafka_thread_srand(rd_kafka_t *rk, rd_bool_t internal_thread) {
178+
#ifdef _WIN32
179+
rd_bool_t required_per_thread = rd_true;
180+
#else
181+
rd_bool_t required_per_thread = rd_false;
182+
#endif
183+
if ((required_per_thread &&
184+
(rk->rk_conf.enable_random_seed || internal_thread)) ||
185+
(!required_per_thread && rk->rk_conf.enable_random_seed &&
186+
!internal_thread)) {
187+
call_once(&rd_kafka_srand_once, rd_kafka_srand);
188+
}
189+
}
190+
144191
static void rd_kafka_global_init0(void) {
145192
cJSON_Hooks json_hooks = {.malloc_fn = rd_malloc, .free_fn = rd_free};
146193

@@ -171,18 +218,6 @@ void rd_kafka_global_init(void) {
171218
}
172219

173220

174-
/**
175-
* @brief Seed the PRNG with current_time.milliseconds
176-
*/
177-
static void rd_kafka_global_srand(void) {
178-
struct timeval tv;
179-
180-
rd_gettimeofday(&tv, NULL);
181-
182-
srand((unsigned int)(tv.tv_usec / 1000));
183-
}
184-
185-
186221
/**
187222
* @returns the current number of active librdkafka instances
188223
*/
@@ -2218,6 +2253,7 @@ static int rd_kafka_thread_main(void *arg) {
22182253

22192254
rd_kafka_set_thread_name("main");
22202255
rd_kafka_set_thread_sysname("rdk:main");
2256+
rd_kafka_thread_srand(rk, rd_true /* we're in an internal thread */);
22212257

22222258
rd_kafka_interceptors_on_thread_start(rk, RD_KAFKA_THREAD_MAIN);
22232259

@@ -2367,10 +2403,7 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
23672403
* freed from rd_kafka_destroy_internal()
23682404
* as the rk itself is destroyed. */
23692405

2370-
/* Seed PRNG, don't bother about HAVE_RAND_R, since it is pretty cheap.
2371-
*/
2372-
if (rk->rk_conf.enable_random_seed)
2373-
call_once(&rd_kafka_global_srand_once, rd_kafka_global_srand);
2406+
rd_kafka_thread_srand(rk, rd_false /* we're on an app thread */);
23742407

23752408
/* Call on_new() interceptors */
23762409
rd_kafka_interceptors_on_new(rk, &rk->rk_conf);

src/rdkafka_background.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ int rd_kafka_background_thread_main(void *arg) {
111111

112112
rd_kafka_set_thread_name("background");
113113
rd_kafka_set_thread_sysname("rdk:bg");
114+
rd_kafka_thread_srand(rk, rd_true /* we're in an internal thread */);
114115

115116
rd_kafka_interceptors_on_thread_start(rk, RD_KAFKA_THREAD_BACKGROUND);
116117

src/rdkafka_broker.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4512,6 +4512,7 @@ static int rd_kafka_broker_thread_main(void *arg) {
45124512

45134513
rd_kafka_set_thread_name("%s", rkb->rkb_name);
45144514
rd_kafka_set_thread_sysname("rdk:broker%" PRId32, rkb->rkb_nodeid);
4515+
rd_kafka_thread_srand(rk, rd_true /* we're in an internal thread */);
45154516

45164517
rd_kafka_interceptors_on_thread_start(rk, RD_KAFKA_THREAD_BROKER);
45174518

src/rdkafka_int.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,6 +1122,7 @@ extern char RD_TLS rd_kafka_thread_name[64];
11221122

11231123
void rd_kafka_set_thread_name(const char *fmt, ...) RD_FORMAT(printf, 1, 2);
11241124
void rd_kafka_set_thread_sysname(const char *fmt, ...) RD_FORMAT(printf, 1, 2);
1125+
void rd_kafka_thread_srand(rd_kafka_t *rk, rd_bool_t internal_thread);
11251126

11261127
int rd_kafka_path_is_dir(const char *path);
11271128
rd_bool_t rd_kafka_dir_is_empty(const char *path);

src/rdkafka_mock.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1490,6 +1490,8 @@ static int rd_kafka_mock_cluster_thread_main(void *arg) {
14901490

14911491
rd_kafka_set_thread_name("mock");
14921492
rd_kafka_set_thread_sysname("rdk:mock");
1493+
rd_kafka_thread_srand(mcluster->rk,
1494+
rd_true /* we're in an internal thread */);
14931495
rd_kafka_interceptors_on_thread_start(mcluster->rk,
14941496
RD_KAFKA_THREAD_BACKGROUND);
14951497
rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1);

src/rdkafka_ssl.c

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1993,14 +1993,7 @@ rd_kafka_transport_ssl_lock_cb(int mode, int i, const char *file, int line) {
19931993
#endif
19941994

19951995
static RD_UNUSED unsigned long rd_kafka_transport_ssl_threadid_cb(void) {
1996-
#ifdef _WIN32
1997-
/* Windows makes a distinction between thread handle
1998-
* and thread id, which means we can't use the
1999-
* thrd_current() API that returns the handle. */
2000-
return (unsigned long)GetCurrentThreadId();
2001-
#else
2002-
return (unsigned long)(intptr_t)thrd_current();
2003-
#endif
1996+
return thrd_current_id();
20041997
}
20051998

20061999
#ifdef HAVE_OPENSSL_CRYPTO_THREADID_SET_CALLBACK

src/rdrand.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ int rd_jitter(int low, int high) {
4242
struct timeval tv;
4343
rd_gettimeofday(&tv, NULL);
4444
seed = (unsigned int)(tv.tv_usec);
45-
seed ^= (unsigned int)(intptr_t)thrd_current();
45+
seed ^= thrd_current_id();
4646

4747
/* When many threads are created at the same time and the
4848
* thread id is different only by a few bits it's possible that

src/tinycthread_extra.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,17 @@ int thrd_is_current(thrd_t thr) {
5959
#endif
6060
}
6161

62+
unsigned long thrd_current_id(void) {
63+
#ifdef _WIN32
64+
/* Windows makes a distinction between thread handle
65+
* and thread id, which means we can't use the
66+
* thrd_current() API that returns the handle. */
67+
return (unsigned long)GetCurrentThreadId();
68+
#else
69+
return (unsigned long)(intptr_t)thrd_current();
70+
#endif
71+
}
72+
6273

6374
#ifdef _WIN32
6475
void cnd_wait_enter(cnd_t *cond) {

src/tinycthread_extra.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ int thrd_setname(const char *name);
5555
*/
5656
int thrd_is_current(thrd_t thr);
5757

58+
/**
59+
* @brief Get current thread ID as an unsigned long.
60+
* @return Current thread ID.
61+
*/
62+
unsigned long thrd_current_id(void);
63+
5864

5965
#ifdef _WIN32
6066
/**

0 commit comments

Comments
 (0)