Skip to content

Commit 104f627

Browse files
authored
rmap: fix SetAndWait race (#62)
* rmap: fix SetAndWait race * pool: make jump hash concurrency-safe
1 parent a2ebdc3 commit 104f627

File tree

2 files changed

+13
-9
lines changed

2 files changed

+13
-9
lines changed

pool/node.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ type (
7272

7373
// jumpHash implement Jump Consistent Hash.
7474
jumpHash struct {
75-
h hash.Hash64
75+
mu sync.Mutex
76+
h hash.Hash64
7677
}
7778
)
7879

@@ -245,7 +246,7 @@ func AddNode(ctx context.Context, poolName string, rdb *redis.Client, opts ...No
245246
workerTTL: o.workerTTL,
246247
workerShutdownTTL: o.workerShutdownTTL,
247248
ackGracePeriod: o.ackGracePeriod,
248-
h: jumpHash{crc64.New(crc64.MakeTable(crc64.ECMA))},
249+
h: &jumpHash{h: crc64.New(crc64.MakeTable(crc64.ECMA))},
249250
stop: make(chan struct{}),
250251
closed: closed,
251252
rdb: rdb,
@@ -1404,13 +1405,18 @@ func (node *Node) maps() []*rmap.Map {
14041405

14051406
// Hash implements the Jump Consistent Hash algorithm.
14061407
// See https://arxiv.org/ftp/arxiv/papers/1406/1406.2294.pdf for details.
1407-
func (jh jumpHash) Hash(key string, numBuckets int64) int64 {
1408+
func (jh *jumpHash) Hash(key string, numBuckets int64) int64 {
14081409
var b int64 = -1
14091410
var j int64
14101411

1412+
jh.mu.Lock()
14111413
jh.h.Reset()
1412-
io.WriteString(jh.h, key) // nolint: errcheck
1414+
_, err := io.WriteString(jh.h, key)
14131415
sum := jh.h.Sum64()
1416+
jh.mu.Unlock()
1417+
if err != nil {
1418+
panic(fmt.Errorf("jumpHash: write key: %w", err))
1419+
}
14141420

14151421
for j < numBuckets {
14161422
b = j

rmap/map.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -258,9 +258,8 @@ func (sm *Map) SetAndWait(ctx context.Context, key, value string) (string, error
258258
cancel: cancel,
259259
}
260260

261-
// First mark the channel as closing before removing from waiters
262261
defer func() {
263-
// Cancel first to prevent new sends
262+
// Cancel first so notifiers prefer to drop any late sends.
264263
cancel()
265264

266265
// Remove waiter under lock
@@ -280,9 +279,6 @@ func (sm *Map) SetAndWait(ctx context.Context, key, value string) (string, error
280279
}
281280
}
282281
sm.wlock.Unlock()
283-
284-
// Now safe to close channel as no more sends will occur
285-
close(notifyCh)
286282
}()
287283

288284
// Prepare new waiters list under lock
@@ -681,6 +677,8 @@ func (sm *Map) run() {
681677
case <-waiter.ctx.Done():
682678
// Waiter was cancelled or timed out
683679
continue
680+
default:
681+
// Non-blocking to avoid stalling the map loop if the waiter is no longer receiving.
684682
}
685683
}
686684
}

0 commit comments

Comments
 (0)