diff --git a/CHANGELOG.md b/CHANGELOG.md index ea739a78..93c84731 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fixed leader election to track explicit database-issued leadership terms, reducing handoff flakiness and same-client reacquisition edge cases while making reelection and resign target the current leadership lease instead of a stale one. [PR #1213](https://github.com/riverqueue/river/pull/1213). + ## [0.34.0] - 2026-04-08 ### Added diff --git a/internal/leadership/doc.go b/internal/leadership/doc.go new file mode 100644 index 00000000..63ff7226 --- /dev/null +++ b/internal/leadership/doc.go @@ -0,0 +1,117 @@ +// Package leadership implements leader election for River clients sharing a +// database schema. +// +// The database records at most one current leadership term at a time. The +// elected client runs distributed maintenance work such as queue management, +// job scheduling, and reindexing that should not be duplicated across clients. +// +// # Overview +// +// Leadership is modeled as a database-backed lease with an explicit term +// identity. +// +// A term is identified by: +// - `leader_id`: the stable client identity +// - `elected_at`: the database-issued timestamp for that specific term +// +// The database is authoritative for: +// - which client currently holds the leadership row +// - whether a term can be renewed +// - whether a term has already been replaced +// +// The process uses local time only to bound how long it trusts its last +// successful elect or reelect result. If it cannot renew a term in time, it +// steps down conservatively instead of continuing to act as leader on stale +// information. 
+// +// # State Model +// +// At a high level, an elector alternates between follower and leader states: +// +// Start +// │ +// ▼ +// ┌─────────────────────────────────────┐ +// │ Follower │ +// │ │ +// │ Retries election on timer or wakeup │ +// └──────────────────┬──────────────────┘ +// │ won election +// ▼ +// ┌─────────────────────────────────────┐ +// │ Leader │ +// │ │ +// │ Renews before trust window expires │ +// └──────────────────┬──────────────────┘ +// │ replaced / expired / resign requested / +// │ renewal failed for too long / shutdown +// ▼ +// Follower +// +// Followers attempt election periodically and can wake early when they learn +// that the previous leader resigned. Leaders renew their current term +// periodically. If renewal fails, the term is replaced, or the local trust +// window expires, the process stops acting as leader and returns to follower +// behavior. +// +// # Trust Window +// +// After each successful election or renewal, the elector computes a local +// trust deadline: +// +// trustedUntil = attemptStarted + TTL - safetyMargin +// +// This trust window has two important properties: +// - it is anchored to when the elect or reelect attempt started, so a slow +// successful database round trip cannot stretch leadership longer than the +// attempt budget allows +// - it ends before the database lease should expire, giving the process time +// to step down before it risks acting on a stale term +// +// The local trust window is a conservative stop condition, not an alternative +// source of truth. A client may step down while the database row is still +// present, but it should not continue acting as leader after it no longer +// trusts its last successful renewal. +// +// # Term-Scoped Operations +// +// Renewing and resigning are scoped to the exact term identified by +// `(leader_id, elected_at)`. 
+// +// That means: +// - an old term cannot accidentally renew a newer term for the same client +// - a delayed resign from an old term cannot delete a newer term for the +// same client +// - when the database says a term is gone, the elector can step down without +// ambiguity about which term it held +// +// # Notifications and Subscribers +// +// When a notifier is available, the elector listens for leadership-related +// events so followers can wake promptly and leaders can honor explicit +// resignation requests. +// +// Notification delivery is intentionally non-blocking: +// - wakeups may coalesce, because multiple rapid resignations only need to +// prompt another election attempt +// - polling remains the fallback when notifications are unavailable or missed +// +// Consumers inside the process can subscribe to leadership transitions. Those +// subscriptions preserve ordered `true`/`false` transitions so downstream +// maintenance components can reliably start and stop work, while still keeping +// slow subscribers from blocking the elector itself. +// +// # Failure Handling +// +// The system is intentionally conservative under failures: +// - if renewal errors persist until the trust window is exhausted, the leader +// steps down +// - if the database reports that the current term no longer exists, the +// leader steps down immediately +// - if resignation fails during shutdown or after a local timeout, the +// database lease expiry remains the safety net that eventually allows a new +// election +// +// This design keeps leadership decisions centered on the database while using +// local time only to stop trusting stale state sooner rather than later. 
+package leadership diff --git a/internal/leadership/elector.go b/internal/leadership/elector.go index 53b2d5f9..c9008582 100644 --- a/internal/leadership/elector.go +++ b/internal/leadership/elector.go @@ -20,12 +20,14 @@ import ( "github.com/riverqueue/river/rivershared/util/randutil" "github.com/riverqueue/river/rivershared/util/serviceutil" "github.com/riverqueue/river/rivershared/util/testutil" + "github.com/riverqueue/river/rivertype" ) const ( - electIntervalDefault = 5 * time.Second - electIntervalJitterDefault = 1 * time.Second - electIntervalTTLPaddingDefault = 10 * time.Second + electIntervalDefault = 5 * time.Second + electIntervalJitterDefault = 1 * time.Second + electIntervalTTLPaddingDefault = 10 * time.Second + leaderLocalDeadlineSafetyMargin = 1 * time.Second ) type DBNotification struct { @@ -45,16 +47,29 @@ type Notification struct { Timestamp time.Time } +// Subscription is a client-facing stream of leadership transitions. +// +// The elector publishes every transition (`false -> true -> false -> ...`) to +// each subscription. Delivery is delegated to a subscriptionRelay so the +// elector never blocks on a slow listener. type Subscription struct { creationTime time.Time - ch chan *Notification + relay *subscriptionRelay unlistenOnce *sync.Once e *Elector } func (s *Subscription) C() <-chan *Notification { - return s.ch + return s.relay.C() +} + +func (s *Subscription) enqueue(notification *Notification) { + s.relay.enqueue(notification) +} + +func (s *Subscription) stop() { + s.relay.stop() } func (s *Subscription) Unlisten() { @@ -63,6 +78,111 @@ func (s *Subscription) Unlisten() { }) } +// subscriptionRelay decouples elector publication from subscriber consumption. +// +// The elector may need to publish `true` and `false` transitions promptly while +// maintenance components are still busy reacting to the previous one. A plain +// buffered channel would either block the elector or force us to drop +// transitions when the buffer filled. 
The relay solves that by: +// - appending every Notification to an in-memory FIFO queue +// - waking a dedicated goroutine via `pendingChan` +// - letting that goroutine drain queued notifications into the subscriber's +// public channel `ch` +// +// `pendingChan` is only a wakeup signal. It does not carry the notifications +// themselves, so multiple sends may coalesce while the goroutine is already +// awake. That is safe because the authoritative queue is pendingNotifications. +type subscriptionRelay struct { + ch chan *Notification // public per-subscription delivery channel + done chan struct{} // closes the relay goroutine during Unlisten/stop + pendingChan chan struct{} // coalesced wakeup signal that queued work exists + + // pendingNotifications preserves every leadership transition in order. + // A dedicated goroutine drains it into `ch` so slow subscribers cannot + // block the elector, but consumers like QueueMaintainerLeader still see + // each `false` transition instead of only the latest state. + pendingMu sync.Mutex + pendingNotifications []*Notification +} + +func newSubscriptionRelay() *subscriptionRelay { + relay := &subscriptionRelay{ + ch: make(chan *Notification, 1), + done: make(chan struct{}), + pendingChan: make(chan struct{}, 1), + } + + go relay.run() + + return relay +} + +func (r *subscriptionRelay) C() <-chan *Notification { + return r.ch +} + +// enqueue appends a transition to the pending FIFO, then nudges the relay +// goroutine. The notification argument is the exact transition to preserve in +// order; unlike the notifier wakeup path elsewhere in the elector, these items +// must not be coalesced or replaced. +func (r *subscriptionRelay) enqueue(notification *Notification) { + r.pendingMu.Lock() + r.pendingNotifications = append(r.pendingNotifications, notification) + r.pendingMu.Unlock() + + select { + case r.pendingChan <- struct{}{}: + default: + } +} + +// nextPending pops the next queued notification for the relay goroutine. 
+func (r *subscriptionRelay) nextPending() (*Notification, bool) { + r.pendingMu.Lock() + defer r.pendingMu.Unlock() + + if len(r.pendingNotifications) == 0 { + return nil, false + } + + notification := r.pendingNotifications[0] + r.pendingNotifications = r.pendingNotifications[1:] + return notification, true +} + +// run waits until queued work exists, then drains as many pending +// notifications as possible into the subscriber channel before sleeping again. +// It exits promptly when stop closes done. +func (r *subscriptionRelay) run() { + for { + select { + case <-r.done: + return + + case <-r.pendingChan: + } + + for { + notification, ok := r.nextPending() + if !ok { + break + } + + select { + case <-r.done: + return + case r.ch <- notification: + } + } + } +} + +// stop terminates the relay goroutine. Callers must ensure they stop enqueueing +// through the owning Subscription afterwards. +func (r *subscriptionRelay) stop() { + close(r.done) +} + // Test-only properties. type electorTestSignals struct { DeniedLeadership testsignal.TestSignal[struct{}] // notifies when elector fails to gain leadership @@ -102,16 +222,58 @@ type Elector struct { baseservice.BaseService startstop.BaseStartStop - config *Config - exec riverdriver.Executor - leaderResignedChan chan struct{} - notifier *notifier.Notifier - requestResignChan chan struct{} - testSignals electorTestSignals + config *Config + exec riverdriver.Executor + notifier *notifier.Notifier + testSignals electorTestSignals + wakeupChan chan struct{} + + mu sync.Mutex + isLeader bool + pendingRequestResign bool + subscriptions []*Subscription +} - mu sync.Mutex - isLeader bool - subscriptions []*Subscription +type leadershipTerm struct { + clientID string + electedAt time.Time + trustedUntil time.Time +} + +func newLeadershipTerm(clientID string, electedAt, attemptStarted time.Time, ttl time.Duration) leadershipTerm { + term := leadershipTerm{ + clientID: clientID, + electedAt: electedAt, + } + + trustDuration := 
ttl - leaderLocalDeadlineSafetyMargin + if trustDuration <= 0 { + term.trustedUntil = attemptStarted + return term + } + + term.trustedUntil = attemptStarted.Add(trustDuration) + return term +} + +func (t leadershipTerm) remaining(now time.Time) time.Duration { + if !t.trustedUntil.After(now) { + return 0 + } + + return t.trustedUntil.Sub(now) +} + +func (t leadershipTerm) reelectAttemptTimeout(now time.Time) time.Duration { + remainingDuration := t.remaining(now) + if remainingDuration <= 0 { + return 0 + } + if remainingDuration < deadlineTimeout { + return remainingDuration + } + + return deadlineTimeout } // NewElector returns an Elector using the given adapter. The name should correspond @@ -130,20 +292,26 @@ func NewElector(archetype *baseservice.Archetype, exec riverdriver.Executor, not }) } +func trySendWakeup(ctx context.Context, wakeupChan chan struct{}) { + if ctx.Err() != nil { + return + } + + select { + case <-ctx.Done(): + case wakeupChan <- struct{}{}: + default: + } +} + func (e *Elector) Start(ctx context.Context) error { ctx, shouldStart, started, stopped := e.StartInit(ctx) if !shouldStart { return nil } - // We'll send to this channel anytime a leader resigns on the key with `name`. - // Buffered to 1 so a send doesn't block if the receiving loop hasn't entered - // its select yet (e.g. between gaining and maintaining leadership). - e.leaderResignedChan = make(chan struct{}, 1) - - // Buffered to 1 so a send from handleLeadershipNotification doesn't block - // if keepLeadershipLoop hasn't entered its select yet. - e.requestResignChan = make(chan struct{}, 1) + // Buffered to 1 so notifications coalesce instead of blocking the elector. 
+ e.wakeupChan = make(chan struct{}, 1) var sub *notifier.Subscription if e.notifier == nil { @@ -175,7 +343,8 @@ func (e *Elector) Start(ctx context.Context) error { } for { - if err := e.attemptGainLeadershipLoop(ctx); err != nil { + term, err := e.runFollowerState(ctx) + if err != nil { // Function above only returns an error if context was cancelled // or overall context is done. if !errors.Is(err, context.Canceled) && ctx.Err() == nil { @@ -184,19 +353,16 @@ return } + e.publishLeadershipState(true) e.Logger.DebugContext(ctx, e.Name+": Gained leadership", "client_id", e.config.ClientID) e.testSignals.GainedLeadership.Signal(struct{}{}) - err := e.keepLeadershipLoop(ctx) + err = e.runLeaderState(ctx, term) if err != nil { if errors.Is(err, context.Canceled) { return } - if errors.Is(err, errLostLeadershipReelection) { - continue // lost leadership reelection; unusual but not a problem; don't log - } - e.Logger.ErrorContext(ctx, e.Name+": Error keeping leadership", "client_id", e.config.ClientID, "err", err) } } @@ -205,13 +371,18 @@ return nil } -func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error { +// runFollowerState is the follower side of the elector state machine. It keeps +// attempting election until this client becomes leader or the elector stops. +func (e *Elector) runFollowerState(ctx context.Context) (leadershipTerm, error) { var attempt int for { attempt++ e.Logger.DebugContext(ctx, e.Name+": Attempting to gain leadership", "client_id", e.config.ClientID) + // Use the local monotonic-bearing clock for the trust window. The + // DB-facing timestamp path stays on NowOrNil below. 
+ attemptStarted := e.Time.Now() - elected, err := attemptElectOrReelect(ctx, e.exec, false, &riverdriver.LeaderElectParams{ + leader, err := attemptElect(ctx, e.exec, &riverdriver.LeaderElectParams{ LeaderID: e.config.ClientID, Now: e.Time.NowOrNil(), Schema: e.config.Schema, @@ -219,16 +390,18 @@ func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error { }) if err != nil { if errors.Is(err, context.Canceled) || ctx.Err() != nil { - return err + return leadershipTerm{}, err + } + if !errors.Is(err, rivertype.ErrNotFound) { + sleepDuration := serviceutil.ExponentialBackoff(attempt, serviceutil.MaxAttemptsBeforeResetDefault) + e.Logger.ErrorContext(ctx, e.Name+": Error attempting to elect", e.errorSlogArgs(err, attempt, sleepDuration)...) + serviceutil.CancellableSleep(ctx, sleepDuration) + continue } - - sleepDuration := serviceutil.ExponentialBackoff(attempt, serviceutil.MaxAttemptsBeforeResetDefault) - e.Logger.ErrorContext(ctx, e.Name+": Error attempting to elect", e.errorSlogArgs(err, attempt, sleepDuration)...) - serviceutil.CancellableSleep(ctx, sleepDuration) - continue } - if elected { - return nil + + if leader != nil { + return newLeadershipTerm(leader.LeaderID, leader.ElectedAt, attemptStarted, e.leaderTTL()), nil } attempt = 0 @@ -237,16 +410,12 @@ func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error { e.testSignals.DeniedLeadership.Signal(struct{}{}) select { - // TODO: This could potentially leak memory / timers if we're seeing a ton - // of resignations. May want to make this reusable & cancel it when retrying? - // We may also want to consider a specialized ticker utility that can tick - // within a random range. 
case <-serviceutil.CancellableSleepC(ctx, randutil.DurationBetween(e.config.ElectInterval, e.config.ElectInterval+e.config.ElectIntervalJitter)): if ctx.Err() != nil { // context done - return ctx.Err() + return leadershipTerm{}, ctx.Err() } - case <-e.leaderResignedChan: + case <-e.wakeupChan: // Somebody just resigned, try to win the next election after a very // short random interval (to prevent all clients from bidding at once). serviceutil.CancellableSleep(ctx, randutil.DurationBetween(0, 50*time.Millisecond)) @@ -278,56 +447,28 @@ func (e *Elector) handleLeadershipNotification(ctx context.Context, topic notifi switch notification.Action { case DBNotificationKindRequestResign: - e.mu.Lock() - isLeader := e.isLeader - e.mu.Unlock() - - if !isLeader { + if !e.markPendingRequestResign() { return } - select { - case <-ctx.Done(): - case e.requestResignChan <- struct{}{}: - default: - // if context is not done and requestResignChan has an item in it - // already, do nothing - } + trySendWakeup(ctx, e.wakeupChan) case DBNotificationKindResigned: // If this a resignation from _this_ client, ignore the change. if notification.LeaderID == e.config.ClientID { return } - select { - case <-ctx.Done(): - case e.leaderResignedChan <- struct{}{}: - } + trySendWakeup(ctx, e.wakeupChan) } } -var errLostLeadershipReelection = errors.New("lost leadership with no error") +// runLeaderState is the leader side of the elector state machine. It waits for +// either a reelection interval, a forced resignation, or shutdown. +func (e *Elector) runLeaderState(ctx context.Context, term leadershipTerm) error { + defer e.clearPendingRequestResign() + defer e.publishLeadershipState(false) -func (e *Elector) keepLeadershipLoop(ctx context.Context) error { - // notify all subscribers that we're the leader - e.notifySubscribers(true) - - // On the way out clear any another item that may have been added to - // requestResignChan. 
Having isLeader set to false will prevent additional - // items from being queued after this one. - defer func() { - select { - case <-e.requestResignChan: - default: - } - }() - - // Defer is LIFO. This will run after the resign below. - // - // This also sets e.isLeader = false. - defer e.notifySubscribers(false) - - var lostLeadership bool + shouldResign := true // Before the elector returns, run a delete with NOTIFY to give up any // leadership that we have. If we do that here, we guarantee that any locks @@ -336,88 +477,96 @@ func (e *Elector) keepLeadershipLoop(ctx context.Context) error { // // This doesn't use ctx because it runs *after* the ctx is done. defer func() { - if !lostLeadership { - e.attemptResignLoop(ctx) // will resign using WithoutCancel context, but ctx sent for logging + if shouldResign { + e.attemptResignLoop(ctx, term) // will resign using WithoutCancel context, but ctx sent for logging } }() - const maxNumErrors = 5 + timer := time.NewTimer(0) + defer timer.Stop() - var ( - numErrors = 0 - timer = time.NewTimer(0) // reset immediately below - ) - <-timer.C + numErrors := 0 + waitDuration := e.config.ElectInterval for { - timer.Reset(e.config.ElectInterval) + resetTimer(timer, waitDuration) select { case <-ctx.Done(): - if !timer.Stop() { - <-timer.C - } - return ctx.Err() - case <-e.requestResignChan: - // Receive a notification telling current leader to resign. + case <-e.wakeupChan: + if !e.takePendingRequestResign() { + continue + } e.Logger.InfoContext(ctx, e.Name+": Current leader received forced resignation", "client_id", e.config.ClientID) - if !timer.Stop() { - <-timer.C - } - // This client may win leadership again, but drop out of this // function and make it start all over. return nil - case <-e.leaderResignedChan: - // Used only in tests for force an immediately reelect attempt. - // - // This differs from the case above in that it drops through to - // attempting to reelect instead of returning from the function. 
- - if !timer.Stop() { - <-timer.C - } - case <-timer.C: // Reelect timer expired; attempt reelection below. } e.Logger.DebugContext(ctx, e.Name+": Current leader attempting reelect", "client_id", e.config.ClientID) - reelected, err := attemptElectOrReelect(ctx, e.exec, true, &riverdriver.LeaderElectParams{ - LeaderID: e.config.ClientID, - Now: e.Time.NowOrNil(), - Schema: e.config.Schema, - TTL: e.leaderTTL(), - }) + // Use the local monotonic-bearing clock for the trust window. The + // DB-facing timestamp path stays on NowOrNil below. + attemptStarted := e.Time.Now() + attemptTimeout := term.reelectAttemptTimeout(attemptStarted) + if attemptTimeout <= 0 { + e.Logger.WarnContext(ctx, e.Name+": Current leader stepping down because the reelection deadline elapsed", "client_id", e.config.ClientID) + e.testSignals.LostLeadership.Signal(struct{}{}) + return nil + } + + leader, err := attemptReelectWithTimeout(ctx, e.exec, &riverdriver.LeaderReelectParams{ + ElectedAt: term.electedAt, + LeaderID: term.clientID, + Now: e.Time.NowOrNil(), + Schema: e.config.Schema, + TTL: e.leaderTTL(), + }, attemptTimeout) if err != nil { if errors.Is(err, context.Canceled) { return err } + if errors.Is(err, rivertype.ErrNotFound) { + shouldResign = false + e.testSignals.LostLeadership.Signal(struct{}{}) + return nil + } numErrors++ - if numErrors >= maxNumErrors { - return err + sleepDuration := serviceutil.ExponentialBackoff(numErrors, 3) + remainingDuration := term.remaining(e.Time.Now()) + if remainingDuration <= 0 { + e.Logger.WarnContext(ctx, e.Name+": Current leader stepping down because the reelection deadline elapsed after an error", "client_id", e.config.ClientID) + e.testSignals.LostLeadership.Signal(struct{}{}) + return nil } - sleepDuration := serviceutil.ExponentialBackoff(numErrors, 3) e.Logger.ErrorContext(ctx, e.Name+": Error attempting reelection", e.errorSlogArgs(err, numErrors, sleepDuration)...) 
+ if remainingDuration < sleepDuration { + sleepDuration = remainingDuration + } serviceutil.CancellableSleep(ctx, sleepDuration) + if ctx.Err() != nil { + return ctx.Err() + } + + // Retry immediately after the backoff because the time budget for this + // lease has already been reduced by the failed attempt above. + waitDuration = 0 continue } - if !reelected { - lostLeadership = true - e.testSignals.LostLeadership.Signal(struct{}{}) - return errLostLeadershipReelection - } numErrors = 0 + term = newLeadershipTerm(leader.LeaderID, leader.ElectedAt, attemptStarted, e.leaderTTL()) e.testSignals.MaintainedLeadership.Signal(struct{}{}) + waitDuration = e.config.ElectInterval } } @@ -427,7 +576,7 @@ func (e *Elector) keepLeadershipLoop(ctx context.Context) error { // makes use of a background context to try and guarantee that leadership is // always surrendered in a timely manner so it can be picked up quickly by // another client, even in the event of a cancellation. -func (e *Elector) attemptResignLoop(ctx context.Context) { +func (e *Elector) attemptResignLoop(ctx context.Context, term leadershipTerm) { e.Logger.DebugContext(ctx, e.Name+": Attempting to resign leadership", "client_id", e.config.ClientID) // Make a good faith attempt to resign, even in the presence of errors, but @@ -442,7 +591,7 @@ func (e *Elector) attemptResignLoop(ctx context.Context) { ctx = context.WithoutCancel(ctx) for attempt := 1; attempt <= maxNumErrors; attempt++ { - if err := e.attemptResign(ctx, attempt); err != nil { + if err := e.attemptResign(ctx, attempt, term); err != nil { sleepDuration := serviceutil.ExponentialBackoff(attempt, maxNumErrors) e.Logger.ErrorContext(ctx, e.Name+": Error attempting to resign", e.errorSlogArgs(err, attempt, sleepDuration)...) 
serviceutil.CancellableSleep(ctx, sleepDuration) @@ -456,7 +605,7 @@ func (e *Elector) attemptResignLoop(ctx context.Context) { // attemptResign attempts to resign any currently held leaderships for the // elector's name and leader ID. -func (e *Elector) attemptResign(ctx context.Context, attempt int) error { +func (e *Elector) attemptResign(ctx context.Context, attempt int, term leadershipTerm) error { // Wait one second longer each time we try to resign: timeout := time.Duration(attempt) * time.Second @@ -464,7 +613,8 @@ func (e *Elector) attemptResign(ctx context.Context, attempt int) error { defer cancel() resigned, err := e.exec.LeaderResign(ctx, &riverdriver.LeaderResignParams{ - LeaderID: e.config.ClientID, + ElectedAt: term.electedAt, + LeaderID: term.clientID, LeadershipTopic: string(notifier.NotificationTopicLeadership), Schema: e.config.Schema, }) @@ -496,8 +646,8 @@ func (e *Elector) errorSlogArgs(err error, attempt int, sleepDuration time.Durat func (e *Elector) Listen() *Subscription { sub := &Subscription{ creationTime: time.Now().UTC(), - ch: make(chan *Notification, 1), e: e, + relay: newSubscriptionRelay(), unlistenOnce: &sync.Once{}, } @@ -508,7 +658,7 @@ func (e *Elector) Listen() *Subscription { IsLeader: e.isLeader, Timestamp: sub.creationTime, } - sub.ch <- initialNotification + sub.enqueue(initialNotification) e.subscriptions = append(e.subscriptions, sub) return sub @@ -519,6 +669,8 @@ func (e *Elector) unlisten(sub *Subscription) { if !success { panic("BUG: tried to unlisten for subscription not in list") } + + sub.stop() } // needs to be in a separate method so the defer will cleanly unlock the mutex, @@ -528,7 +680,7 @@ func (e *Elector) tryUnlisten(sub *Subscription) bool { defer e.mu.Unlock() for i, s := range e.subscriptions { - if s.creationTime.Equal(sub.creationTime) { + if s == sub { e.subscriptions = append(e.subscriptions[:i], e.subscriptions[i+1:]...) 
return true } @@ -543,33 +695,79 @@ func (e *Elector) leaderTTL() time.Duration { return e.config.ElectInterval + electIntervalTTLPaddingDefault } -func (e *Elector) notifySubscribers(isLeader bool) { +func (e *Elector) markPendingRequestResign() bool { + e.mu.Lock() + defer e.mu.Unlock() + + if !e.isLeader { + return false + } + + e.pendingRequestResign = true + return true +} + +func (e *Elector) publishLeadershipState(isLeader bool) { notifyTime := time.Now().UTC() e.mu.Lock() defer e.mu.Unlock() e.isLeader = isLeader + if !isLeader { + e.pendingRequestResign = false + } + + notification := &Notification{ + IsLeader: isLeader, + Timestamp: notifyTime, + } for _, s := range e.subscriptions { - s.ch <- &Notification{ - IsLeader: isLeader, - Timestamp: notifyTime, - } + s.enqueue(notification) } } +func (e *Elector) clearPendingRequestResign() { + e.mu.Lock() + defer e.mu.Unlock() + + e.pendingRequestResign = false +} + +func (e *Elector) takePendingRequestResign() bool { + e.mu.Lock() + defer e.mu.Unlock() + + if !e.pendingRequestResign { + return false + } + + e.pendingRequestResign = false + return true +} + const deadlineTimeout = 5 * time.Second -// attemptElectOrReelect attempts to elect a leader for the given name. The -// bool alreadyElected indicates whether this is a potential reelection of -// an already-elected leader. If the election is successful because there is -// no leader or the previous leader expired, the provided leaderID will be -// set as the new leader with a TTL of ttl. -// -// Returns whether this leader was successfully elected or an error if one -// occurred. 
-func attemptElectOrReelect(ctx context.Context, exec riverdriver.Executor, alreadyElected bool, params *riverdriver.LeaderElectParams) (bool, error) { - ctx, cancel := context.WithTimeout(ctx, deadlineTimeout) +func resetTimer(timer *time.Timer, duration time.Duration) { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + + timer.Reset(duration) +} + +// attemptElect attempts to elect a leader for the given name. If there is no +// current leader or the previous leader expired, the provided leader ID is set +// as the new leader with a TTL of `params.TTL`. +func attemptElect(ctx context.Context, exec riverdriver.Executor, params *riverdriver.LeaderElectParams) (*riverdriver.Leader, error) { + return attemptElectWithTimeout(ctx, exec, params, deadlineTimeout) +} + +func attemptElectWithTimeout(ctx context.Context, exec riverdriver.Executor, params *riverdriver.LeaderElectParams, timeout time.Duration) (*riverdriver.Leader, error) { + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() execTx, err := exec.Begin(ctx) @@ -579,7 +777,7 @@ func attemptElectOrReelect(ctx context.Context, exec riverdriver.Executor, alrea additionalDetail = " (a common cause of this is a database pool that's at its connection limit; you may need to increase maximum connections)" } - return false, fmt.Errorf("error beginning transaction: %w%s", err, additionalDetail) + return nil, fmt.Errorf("error beginning transaction: %w%s", err, additionalDetail) } defer dbutil.RollbackWithoutCancel(ctx, execTx) @@ -587,22 +785,26 @@ func attemptElectOrReelect(ctx context.Context, exec riverdriver.Executor, alrea Now: params.Now, Schema: params.Schema, }); err != nil { - return false, err + return nil, err } - var elected bool - if alreadyElected { - elected, err = execTx.LeaderAttemptReelect(ctx, params) - } else { - elected, err = execTx.LeaderAttemptElect(ctx, params) + leader, err := execTx.LeaderAttemptElect(ctx, params) + if err != nil && !errors.Is(err, 
rivertype.ErrNotFound) { + return nil, err + } + if err := execTx.Commit(ctx); err != nil { + return nil, fmt.Errorf("error committing transaction: %w", err) } if err != nil { - return false, err + return nil, err } - if err := execTx.Commit(ctx); err != nil { - return false, fmt.Errorf("error committing transaction: %w", err) - } + return leader, nil +} + +func attemptReelectWithTimeout(ctx context.Context, exec riverdriver.Executor, params *riverdriver.LeaderReelectParams, timeout time.Duration) (*riverdriver.Leader, error) { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() - return elected, nil + return exec.LeaderAttemptReelect(ctx, params) } diff --git a/internal/leadership/elector_test.go b/internal/leadership/elector_test.go index 481471b8..f0255063 100644 --- a/internal/leadership/elector_test.go +++ b/internal/leadership/elector_test.go @@ -3,12 +3,11 @@ package leadership import ( "context" "encoding/json" - "log/slog" + "errors" "testing" "time" "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgxpool" "github.com/stretchr/testify/require" "github.com/riverqueue/river/internal/notifier" @@ -20,10 +19,74 @@ import ( "github.com/riverqueue/river/rivershared/riversharedtest" "github.com/riverqueue/river/rivershared/startstoptest" "github.com/riverqueue/river/rivershared/testfactory" + "github.com/riverqueue/river/rivershared/util/dbutil" "github.com/riverqueue/river/rivershared/util/ptrutil" "github.com/riverqueue/river/rivertype" ) +type leaderAttemptScriptExecutorMock struct { + riverdriver.Executor + + LeaderAttemptElectFunc func(ctx context.Context, execTx riverdriver.ExecutorTx, params *riverdriver.LeaderElectParams) (*riverdriver.Leader, error) +} + +func (m *leaderAttemptScriptExecutorMock) Begin(ctx context.Context) (riverdriver.ExecutorTx, error) { + tx, err := m.Executor.Begin(ctx) + if err != nil { + return nil, err + } + + return &leaderAttemptScriptExecutorTxMock{ + ExecutorTx: tx, + mock: m, + }, nil +} + +type 
leaderAttemptScriptExecutorTxMock struct { + riverdriver.ExecutorTx + + mock *leaderAttemptScriptExecutorMock +} + +func (m *leaderAttemptScriptExecutorTxMock) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (*riverdriver.Leader, error) { + if m.mock.LeaderAttemptElectFunc == nil { + return m.ExecutorTx.LeaderAttemptElect(ctx, params) + } + + return m.mock.LeaderAttemptElectFunc(ctx, m.ExecutorTx, params) +} + +type leaderReelectExecutorMock struct { + riverdriver.Executor + + LeaderAttemptReelectFunc func(ctx context.Context, params *riverdriver.LeaderReelectParams) (*riverdriver.Leader, error) +} + +func (m *leaderReelectExecutorMock) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderReelectParams) (*riverdriver.Leader, error) { + if m.LeaderAttemptReelectFunc == nil { + return m.Executor.LeaderAttemptReelect(ctx, params) + } + + return m.LeaderAttemptReelectFunc(ctx, params) +} + +type localNowTimeGeneratorStub struct { + now time.Time +} + +func (g *localNowTimeGeneratorStub) Now() time.Time { + return g.now +} + +func (g *localNowTimeGeneratorStub) NowOrNil() *time.Time { + return nil +} + +func (g *localNowTimeGeneratorStub) StubNow(now time.Time) time.Time { + g.now = now + return now +} + func TestElector_PollOnly(t *testing.T) { t.Parallel() @@ -40,15 +103,13 @@ func TestElector_PollOnly(t *testing.T) { func(ctx context.Context, t *testing.T, stress bool) *electorBundle { t.Helper() - tx := riverdbtest.TestTxPgx(ctx, t) + tx, _ := riverdbtest.TestTxPgxDriver(ctx, t, riverpgxv5.New(riversharedtest.DBPool(ctx, t)), &riverdbtest.TestTxOpts{ + DisableSchemaSharing: true, + }) - // We'll put multiple electors on one transaction. Make sure they can - // live with each other in relative harmony. 
tx = sharedtx.NewSharedTx(tx) - return &electorBundle{ - tx: tx, - } + return &electorBundle{tx: tx} }, func(t *testing.T, electorBundle *electorBundle) *Elector { t.Helper() @@ -57,11 +118,334 @@ func TestElector_PollOnly(t *testing.T) { riversharedtest.BaseServiceArchetype(t), driver.UnwrapExecutor(electorBundle.tx), nil, - &Config{ - ClientID: "test_client_id", - }, + &Config{ClientID: "test_client_id"}, ) + }, + ) +} + +func TestElectorHandleLeadershipNotification(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + driver *riverpgxv5.Driver + tx pgx.Tx + } + + setup := func(t *testing.T) (*Elector, *testBundle) { + t.Helper() + + tx := riverdbtest.TestTxPgx(ctx, t) + driver := riverpgxv5.New(nil) + + elector := NewElector( + riversharedtest.BaseServiceArchetype(t), + driver.UnwrapExecutor(tx), + nil, + &Config{ClientID: "test_client_id"}, + ) + elector.wakeupChan = make(chan struct{}, 1) + + return elector, &testBundle{ + driver: driver, + tx: tx, + } + } + + mustMarshalJSON := func(t *testing.T, val any) []byte { + t.Helper() + + data, err := json.Marshal(val) + require.NoError(t, err) + return data + } + + validLeadershipChange := func() *DBNotification { + t.Helper() + + return &DBNotification{ + Action: DBNotificationKindResigned, + LeaderID: "other-client-id", + } + } + + t.Run("IgnoresNonResignedAction", func(t *testing.T) { + t.Parallel() + + elector, _ := setup(t) + + change := validLeadershipChange() + change.Action = "not_resigned" + + elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, change))) + + require.Empty(t, elector.wakeupChan) + }) + + t.Run("IgnoresSameClientID", func(t *testing.T) { + t.Parallel() + + elector, _ := setup(t) + + change := validLeadershipChange() + change.LeaderID = elector.config.ClientID + + elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, change))) + + require.Empty(t, 
elector.wakeupChan) + }) + + t.Run("SignalsLeadershipChange", func(t *testing.T) { + t.Parallel() + + elector, _ := setup(t) + + elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, validLeadershipChange()))) + + riversharedtest.WaitOrTimeout(t, elector.wakeupChan) + }) + + t.Run("SignalsLeadershipChangeDoesNotBlockOnFullWakeup", func(t *testing.T) { + t.Parallel() + + elector, _ := setup(t) + elector.wakeupChan <- struct{}{} + + done := make(chan struct{}) + + go func() { + defer close(done) + elector.handleLeadershipNotification(context.Background(), notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, validLeadershipChange()))) + }() + + select { + case <-done: + case <-time.After(100 * time.Millisecond): + require.Fail(t, "expected leadership notification to coalesce the wakeup instead of blocking") + } + + require.Len(t, elector.wakeupChan, 1) + }) + + t.Run("StopsOnContextDone", func(t *testing.T) { + t.Parallel() + + elector, _ := setup(t) + + ctx, cancel := context.WithCancel(ctx) + cancel() + + elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, validLeadershipChange()))) + + require.Empty(t, elector.wakeupChan) + }) +} + +func TestElectorRunLeaderState(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + setup := func(t *testing.T) (*Elector, riverdriver.Executor) { + t.Helper() + + driver := riverpgxv5.New(nil) + exec := driver.UnwrapExecutor(riverdbtest.TestTxPgx(ctx, t)) + + elector := NewElector( + riversharedtest.BaseServiceArchetype(t), + exec, + nil, + &Config{ClientID: "test_client_id"}, + ) + elector.config.ElectInterval = 10 * time.Millisecond + elector.config.ElectIntervalJitter = time.Millisecond + elector.testSignals.Init(t) + + return elector, exec + } + + t.Run("ResignsCurrentTermAfterReelectErrorsExhaustTrust", func(t *testing.T) { + t.Parallel() + + elector, exec := setup(t) + initialNow := 
elector.Time.StubNow(time.Now().UTC()) + elector.publishLeadershipState(true) + + leader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ + ElectedAt: ptrutil.Ptr(initialNow), + ExpiresAt: ptrutil.Ptr(initialNow.Add(elector.leaderTTL())), + LeaderID: ptrutil.Ptr(elector.config.ClientID), }) + + elector.exec = &leaderReelectExecutorMock{ + Executor: exec, + LeaderAttemptReelectFunc: func(ctx context.Context, params *riverdriver.LeaderReelectParams) (*riverdriver.Leader, error) { + elector.Time.StubNow(initialNow.Add(elector.leaderTTL())) + return nil, errors.New("reelection error") + }, + } + + runCtx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- elector.runLeaderState(runCtx, newLeadershipTerm(elector.config.ClientID, leader.ElectedAt, initialNow, elector.leaderTTL())) + }() + + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(time.Second): + require.Fail(t, "timed out waiting for leader state to exit") + } + + elector.testSignals.LostLeadership.WaitOrTimeout() + elector.testSignals.ResignedLeadership.WaitOrTimeout() + + _, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) + require.ErrorIs(t, err, rivertype.ErrNotFound) + }) + + t.Run("SlowSuccessfulReelectDoesNotExtendTrustWindow", func(t *testing.T) { + t.Parallel() + + elector, exec := setup(t) + initialNow := elector.Time.StubNow(time.Now().UTC()) + elector.publishLeadershipState(true) + + leader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ + ElectedAt: ptrutil.Ptr(initialNow), + ExpiresAt: ptrutil.Ptr(initialNow.Add(elector.leaderTTL())), + LeaderID: ptrutil.Ptr(elector.config.ClientID), + }) + + var numAttempts int + elector.exec = &leaderReelectExecutorMock{ + Executor: exec, + LeaderAttemptReelectFunc: func(ctx context.Context, params *riverdriver.LeaderReelectParams) (*riverdriver.Leader, error) { + numAttempts++ + + switch numAttempts { + 
case 1: + elector.Time.StubNow(initialNow.Add(elector.leaderTTL())) + return exec.LeaderAttemptReelect(ctx, params) + default: + return nil, errors.New("unexpected reelection attempt after trust window elapsed") + } + }, + } + + runCtx, cancel := context.WithTimeout(ctx, 250*time.Millisecond) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- elector.runLeaderState(runCtx, newLeadershipTerm(elector.config.ClientID, leader.ElectedAt, initialNow, elector.leaderTTL())) + }() + + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(time.Second): + require.Fail(t, "timed out waiting for leader state to exit") + } + + require.Equal(t, 1, numAttempts) + elector.testSignals.LostLeadership.WaitOrTimeout() + elector.testSignals.ResignedLeadership.WaitOrTimeout() + + _, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) + require.ErrorIs(t, err, rivertype.ErrNotFound) + }) + + t.Run("UsesNowForLocalDeadlineChecks", func(t *testing.T) { + t.Parallel() + + driver := riverpgxv5.New(nil) + exec := driver.UnwrapExecutor(riverdbtest.TestTxPgx(ctx, t)) + + timeGenerator := &localNowTimeGeneratorStub{now: time.Now()} + archetype := riversharedtest.BaseServiceArchetype(t) + archetype.Time = timeGenerator + + elector := NewElector( + archetype, + &leaderReelectExecutorMock{ + Executor: exec, + LeaderAttemptReelectFunc: func(ctx context.Context, params *riverdriver.LeaderReelectParams) (*riverdriver.Leader, error) { + require.Fail(t, "unexpected reelection attempt") + panic("unreachable") + }, + }, + nil, + &Config{ClientID: "test_client_id"}, + ) + elector.config.ElectInterval = 10 * time.Millisecond + elector.testSignals.Init(t) + + elector.publishLeadershipState(true) + + runCtx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + + err := elector.runLeaderState(runCtx, leadershipTerm{ + clientID: elector.config.ClientID, + electedAt: time.Now(), + trustedUntil: 
timeGenerator.now.Add(-time.Millisecond), + }) + require.NoError(t, err) + + elector.testSignals.LostLeadership.WaitOrTimeout() + }) +} + +func TestElectorSubscriptions(t *testing.T) { + t.Parallel() + + setup := func(t *testing.T) *Elector { + t.Helper() + + return NewElector( + riversharedtest.BaseServiceArchetype(t), + nil, + nil, + &Config{ClientID: "test_client_id"}, + ) + } + + t.Run("SlowSubscribersDoNotBlockAndReceiveTransitionsInOrder", func(t *testing.T) { + t.Parallel() + + elector := setup(t) + sub := elector.Listen() + t.Cleanup(sub.Unlisten) + + done := make(chan struct{}) + go func() { + defer close(done) + elector.publishLeadershipState(true) + elector.publishLeadershipState(false) + }() + + select { + case <-done: + case <-time.After(100 * time.Millisecond): + require.Fail(t, "expected leadership publication to queue without blocking") + } + + notification := riversharedtest.WaitOrTimeout(t, sub.C()) + require.False(t, notification.IsLeader) + + notification = riversharedtest.WaitOrTimeout(t, sub.C()) + require.True(t, notification.IsLeader) + + notification = riversharedtest.WaitOrTimeout(t, sub.C()) + require.False(t, notification.IsLeader) + }) } func TestElector_WithNotifier(t *testing.T) { @@ -80,12 +464,7 @@ func TestElector_WithNotifier(t *testing.T) { func(ctx context.Context, t *testing.T, stress bool) *electorBundle { t.Helper() - var dbPool *pgxpool.Pool - if stress { - dbPool = riversharedtest.DBPoolClone(ctx, t) - } else { - dbPool = riversharedtest.DBPool(ctx, t) - } + dbPool := riversharedtest.DBPoolClone(ctx, t) var ( driver = riverpgxv5.New(dbPool) @@ -93,16 +472,14 @@ func TestElector_WithNotifier(t *testing.T) { archetype = riversharedtest.BaseServiceArchetype(t) ) - notifier := notifier.New(archetype, driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema})) - { - require.NoError(t, notifier.Start(ctx)) - t.Cleanup(notifier.Stop) - } + notifierSvc := notifier.New(archetype, 
driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema})) + require.NoError(t, notifierSvc.Start(ctx)) + t.Cleanup(notifierSvc.Stop) return &electorBundle{ archetype: archetype, exec: driver.GetExecutor(), - notifier: notifier, + notifier: notifierSvc, schema: schema, } }, @@ -118,12 +495,10 @@ func TestElector_WithNotifier(t *testing.T) { Schema: electorBundle.schema, }, ) - }) + }, + ) } -// This system of "elector bundles" may appear to be a little convoluted, but -// it's built so that we can initialize multiple electors against a single -// database or transaction. func testElector[TElectorBundle any]( ctx context.Context, t *testing.T, @@ -149,7 +524,6 @@ func testElector[TElectorBundle any]( } electorBundle := makeElectorBundle(ctx, t, opts.stress) - elector := makeElector(t, electorBundle) elector.testSignals.Init(t) @@ -161,187 +535,247 @@ func testElector[TElectorBundle any]( startElector := func(ctx context.Context, t *testing.T, elector *Elector) { t.Helper() - t.Logf("Starting %s", elector.config.ClientID) + require.NoError(t, elector.Start(ctx)) t.Cleanup(elector.Stop) } - t.Run("StartsGainsLeadershipAndStops", func(t *testing.T) { - t.Parallel() - - elector, bundle := setup(t, nil) - - startElector(ctx, t, elector) - - elector.testSignals.GainedLeadership.WaitOrTimeout() + signalLeaderResigned := func(ctx context.Context, t *testing.T, elector *Elector, leaderID string) { + t.Helper() - leader, err := bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ - Schema: elector.config.Schema, + payload, err := json.Marshal(DBNotification{ + Action: DBNotificationKindResigned, + LeaderID: leaderID, }) require.NoError(t, err) - require.Equal(t, elector.config.ClientID, leader.LeaderID) - elector.Stop() + elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(payload)) + } - elector.testSignals.ResignedLeadership.WaitOrTimeout() + signalRequestResign := func(ctx context.Context, t 
*testing.T, elector *Elector) { + t.Helper() + + payload, err := json.Marshal(DBNotification{Action: DBNotificationKindRequestResign}) + require.NoError(t, err) - _, err = bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ - Schema: elector.config.Schema, - }) - require.ErrorIs(t, err, rivertype.ErrNotFound) - }) + elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(payload)) + } - t.Run("NotifiesSubscribers", func(t *testing.T) { + t.Run("CoalescesResignedWakeups", func(t *testing.T) { t.Parallel() elector, _ := setup(t, nil) + elector.config.ElectInterval = 100 * time.Millisecond + elector.config.ElectIntervalJitter = 10 * time.Millisecond + elector.wakeupChan = make(chan struct{}, 1) - sub := elector.Listen() - t.Cleanup(func() { elector.unlisten(sub) }) + var ( + attempt int + attemptTimes []time.Time + ) - // Drain an initial notification that occurs on Listen. - notification := riversharedtest.WaitOrTimeout(t, sub.ch) - require.False(t, notification.IsLeader) + elector.exec = &leaderAttemptScriptExecutorMock{ + Executor: elector.exec, + LeaderAttemptElectFunc: func(ctx context.Context, execTx riverdriver.ExecutorTx, params *riverdriver.LeaderElectParams) (*riverdriver.Leader, error) { + attempt++ + attemptTimes = append(attemptTimes, time.Now()) + + switch attempt { + case 1, 2: + return nil, rivertype.ErrNotFound + case 3: + return execTx.LeaderInsert(ctx, &riverdriver.LeaderInsertParams{ + LeaderID: params.LeaderID, + Now: params.Now, + Schema: params.Schema, + TTL: params.TTL, + }) + default: + require.FailNowf(t, "unexpected election attempt", "attempt %d", attempt) + panic("unreachable") + } + }, + } - startElector(ctx, t, elector) + signalLeaderResigned(ctx, t, elector, "leader-1") - elector.testSignals.GainedLeadership.WaitOrTimeout() + secondNotificationDone := make(chan struct{}) + go func() { + defer close(secondNotificationDone) + signalLeaderResigned(ctx, t, elector, "leader-2") + 
}() - notification = riversharedtest.WaitOrTimeout(t, sub.ch) - require.True(t, notification.IsLeader) + select { + case <-secondNotificationDone: + case <-time.After(50 * time.Millisecond): + require.Fail(t, "expected second resignation notification to be coalesced instead of blocking") + } - elector.Stop() + attemptCtx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() - elector.testSignals.ResignedLeadership.WaitOrTimeout() + start := time.Now() + term, err := elector.runFollowerState(attemptCtx) + require.NoError(t, err) + require.Equal(t, elector.config.ClientID, term.clientID) + require.False(t, term.electedAt.IsZero()) + require.GreaterOrEqual(t, time.Since(start), 75*time.Millisecond) - notification = riversharedtest.WaitOrTimeout(t, sub.ch) - require.False(t, notification.IsLeader) + require.Equal(t, 3, attempt) + require.Len(t, attemptTimes, 3) + require.GreaterOrEqual(t, attemptTimes[2].Sub(attemptTimes[1]), 75*time.Millisecond) }) - t.Run("SustainsLeadership", func(t *testing.T) { + t.Run("CompetingElectors", func(t *testing.T) { t.Parallel() - elector, _ := setup(t, nil) + elector1, bundle := setup(t, nil) + elector1.config.ClientID = "elector1" - startElector(ctx, t, elector) + startElector(ctx, t, elector1) + elector1.testSignals.GainedLeadership.WaitOrTimeout() - elector.testSignals.GainedLeadership.WaitOrTimeout() + leader, err := bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ + Schema: elector1.config.Schema, + }) + require.NoError(t, err) + require.Equal(t, elector1.config.ClientID, leader.LeaderID) - // The leadership maintenance loop also listens on the leadership - // notification channel. Take advantage of that to cause an - // immediate reelect attempt with no sleep. 
- elector.leaderResignedChan <- struct{}{} - elector.testSignals.MaintainedLeadership.WaitOrTimeout() + elector2 := makeElector(t, bundle.electorBundle) + elector2.config.ClientID = "elector2" + elector2.config.ElectInterval = 10 * time.Millisecond + elector2.config.ElectIntervalJitter = time.Millisecond + elector2.exec = elector1.exec + elector2.testSignals.Init(t) - elector.leaderResignedChan <- struct{}{} - elector.testSignals.MaintainedLeadership.WaitOrTimeout() + startElector(ctx, t, elector2) - elector.leaderResignedChan <- struct{}{} - elector.testSignals.MaintainedLeadership.WaitOrTimeout() + elector2.testSignals.DeniedLeadership.WaitOrTimeout() - elector.Stop() + elector1.Stop() + elector1.testSignals.ResignedLeadership.WaitOrTimeout() - elector.testSignals.ResignedLeadership.WaitOrTimeout() + elector2.testSignals.GainedLeadership.WaitOrTimeout() + + elector2.Stop() + elector2.testSignals.ResignedLeadership.WaitOrTimeout() + + _, err = bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ + Schema: elector1.config.Schema, + }) + require.ErrorIs(t, err, rivertype.ErrNotFound) }) - t.Run("LosesLeadership", func(t *testing.T) { + t.Run("IndependentBundlesAreIsolated", func(t *testing.T) { t.Parallel() - elector, bundle := setup(t, nil) + elector1, bundle1 := setup(t, nil) + elector1.config.ClientID = "elector1" - startElector(ctx, t, elector) + elector2, bundle2 := setup(t, nil) + elector2.config.ClientID = "elector2" - elector.testSignals.GainedLeadership.WaitOrTimeout() + startElector(ctx, t, elector1) + startElector(ctx, t, elector2) - t.Logf("Force resigning %s", elector.config.ClientID) + elector1.testSignals.GainedLeadership.WaitOrTimeout() + elector2.testSignals.GainedLeadership.WaitOrTimeout() - // Artificially force resign the elector and add a new leader record - // so that it can't be elected again. 
- _, err := bundle.exec.LeaderResign(ctx, &riverdriver.LeaderResignParams{ - LeaderID: elector.config.ClientID, - LeadershipTopic: string(notifier.NotificationTopicLeadership), - Schema: elector.config.Schema, + leader1, err := bundle1.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ + Schema: elector1.config.Schema, }) require.NoError(t, err) + require.Equal(t, elector1.config.ClientID, leader1.LeaderID) - _ = testfactory.Leader(ctx, t, bundle.exec, &testfactory.LeaderOpts{ - LeaderID: ptrutil.Ptr("other-client-id"), - Schema: elector.config.Schema, + leader2, err := bundle2.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ + Schema: elector2.config.Schema, }) - - elector.leaderResignedChan <- struct{}{} - elector.testSignals.LostLeadership.WaitOrTimeout() - - // Wait for the elector to try and fail to gain leadership so we - // don't finish the test while it's still operating. - elector.testSignals.DeniedLeadership.WaitOrTimeout() - - elector.Stop() + require.NoError(t, err) + require.Equal(t, elector2.config.ClientID, leader2.LeaderID) }) - t.Run("CompetingElectors", func(t *testing.T) { + t.Run("LosesLeadershipWhenSameLeaderIDTermIsReplaced", func(t *testing.T) { t.Parallel() - elector1, bundle := setup(t, nil) - elector1.config.ClientID = "elector1" - - { - startElector(ctx, t, elector1) - - // next to avoid any raciness. 
- t.Logf("Waiting for %s to gain leadership", elector1.config.ClientID) - elector1.testSignals.GainedLeadership.WaitOrTimeout() + elector, bundle := setup(t, nil) + elector.config.ElectInterval = 25 * time.Millisecond + elector.config.ElectIntervalJitter = time.Millisecond - leader, err := bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ - Schema: elector1.config.Schema, - }) - require.NoError(t, err) - require.Equal(t, elector1.config.ClientID, leader.LeaderID) - } + sub := elector.Listen() + t.Cleanup(sub.Unlisten) - // Make another elector and make sure it's using the same executor. - elector2 := makeElector(t, bundle.electorBundle) - elector2.config.ClientID = "elector2" - elector2.exec = elector1.exec - elector2.testSignals.Init(t) + require.False(t, riversharedtest.WaitOrTimeout(t, sub.C()).IsLeader) - { - startElector(ctx, t, elector2) + startElector(ctx, t, elector) + elector.testSignals.GainedLeadership.WaitOrTimeout() + require.True(t, riversharedtest.WaitOrTimeout(t, sub.C()).IsLeader) - elector2.testSignals.DeniedLeadership.WaitOrTimeout() + leader, err := bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ + Schema: elector.config.Schema, + }) + require.NoError(t, err) - t.Logf("Stopping %s", elector1.config.ClientID) - elector1.Stop() - elector1.testSignals.ResignedLeadership.WaitOrTimeout() + newElectedAt := leader.ElectedAt.Add(time.Second) + newExpiresAt := newElectedAt.Add(elector.leaderTTL()) - // Cheat if we're in poll only by notifying leadership channel to - // wake the elector from sleep. 
- if elector2.notifier == nil { - elector2.leaderResignedChan <- struct{}{} + require.NoError(t, dbutil.WithTx(ctx, bundle.exec, func(ctx context.Context, execTx riverdriver.ExecutorTx) error { + resigned, err := execTx.LeaderResign(ctx, &riverdriver.LeaderResignParams{ + ElectedAt: leader.ElectedAt, + LeaderID: leader.LeaderID, + LeadershipTopic: string(notifier.NotificationTopicLeadership), + Schema: elector.config.Schema, + }) + if err != nil { + return err + } + if !resigned { + return errors.New("expected leader replacement to resign current term") } - t.Logf("Waiting for %s to gain leadership", elector2.config.ClientID) - elector2.testSignals.GainedLeadership.WaitOrTimeout() + _, err = execTx.LeaderInsert(ctx, &riverdriver.LeaderInsertParams{ + ElectedAt: &newElectedAt, + ExpiresAt: &newExpiresAt, + LeaderID: leader.LeaderID, + Schema: elector.config.Schema, + TTL: elector.leaderTTL(), + }) + return err + })) - t.Logf("Stopping %s", elector2.config.ClientID) - elector2.Stop() - elector2.testSignals.ResignedLeadership.WaitOrTimeout() - } + elector.testSignals.LostLeadership.WaitOrTimeout() + require.False(t, riversharedtest.WaitOrTimeout(t, sub.C()).IsLeader) + elector.testSignals.DeniedLeadership.WaitOrTimeout() - _, err := bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ - Schema: elector1.config.Schema, + leaderFromDB, err := bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ + Schema: elector.config.Schema, }) - require.ErrorIs(t, err, rivertype.ErrNotFound) + require.NoError(t, err) + require.Equal(t, leader.LeaderID, leaderFromDB.LeaderID) + require.Equal(t, newElectedAt, leaderFromDB.ElectedAt) }) - t.Run("StartStopStress", func(t *testing.T) { + t.Run("NotifiesSubscribers", func(t *testing.T) { t.Parallel() - elector, _ := setup(t, &testOpts{stress: true}) - elector.Logger = riversharedtest.LoggerWarn(t) // loop started/stop log is very noisy; suppress - elector.testSignals = 
electorTestSignals{} // deinit so channels don't fill + elector, _ := setup(t, nil) - startstoptest.Stress(ctx, t, elector) + sub := elector.Listen() + t.Cleanup(sub.Unlisten) + + notification := riversharedtest.WaitOrTimeout(t, sub.C()) + require.False(t, notification.IsLeader) + + startElector(ctx, t, elector) + elector.testSignals.GainedLeadership.WaitOrTimeout() + + notification = riversharedtest.WaitOrTimeout(t, sub.C()) + require.True(t, notification.IsLeader) + + elector.Stop() + elector.testSignals.ResignedLeadership.WaitOrTimeout() + + notification = riversharedtest.WaitOrTimeout(t, sub.C()) + require.False(t, notification.IsLeader) }) t.Run("RequestResignImmediatelyAfterElection", func(t *testing.T) { @@ -350,20 +784,9 @@ func testElector[TElectorBundle any]( elector, _ := setup(t, nil) startElector(ctx, t, elector) - elector.testSignals.GainedLeadership.WaitOrTimeout() - // Send a resign request immediately after gaining leadership. - // GainedLeadership is signaled _before_ keepLeadershipLoop is - // entered, so the resign request arrives before the loop's - // select. This only works if requestResignChan is buffered; - // with an unbuffered channel the send would be dropped by the - // default case since nobody is receiving yet. 
- payload, err := json.Marshal(DBNotification{ - Action: DBNotificationKindRequestResign, - }) - require.NoError(t, err) - elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(payload)) + signalRequestResign(ctx, t, elector) elector.testSignals.ResignedLeadership.WaitOrTimeout() elector.testSignals.GainedLeadership.WaitOrTimeout() @@ -375,7 +798,6 @@ func testElector[TElectorBundle any]( elector, bundle := setup(t, nil) startElector(ctx, t, elector) - elector.testSignals.GainedLeadership.WaitOrTimeout() leader, err := bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ @@ -384,21 +806,8 @@ func testElector[TElectorBundle any]( require.NoError(t, err) require.Equal(t, elector.config.ClientID, leader.LeaderID) - payload, err := json.Marshal(DBNotification{ - Action: DBNotificationKindRequestResign, - }) - require.NoError(t, err) - - // Send a bunch of resign requests consecutively. Only one will have an - // effect as the channel's size is only 1, and any extra items in it - // will be drained as the leader resigns, making it start empty the next - // time the leader starts up again. - // - // Wait for the elector to resign leadership, gain leadership again, - // then stop the elector and wait for it to resign one more time. 
for range 5 { - t.Log("Requesting leadership resign") - elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(payload)) + signalRequestResign(ctx, t, elector) } elector.testSignals.ResignedLeadership.WaitOrTimeout() elector.testSignals.GainedLeadership.WaitOrTimeout() @@ -411,208 +820,73 @@ func testElector[TElectorBundle any]( }) require.ErrorIs(t, err, rivertype.ErrNotFound) }) -} - -func TestAttemptElectOrReelect(t *testing.T) { - t.Parallel() - - const ( - clientID = "client-id" - leaderInstanceName = "default" - leaderTTL = 10 * time.Second - ) - - ctx := context.Background() - - type testBundle struct { - exec riverdriver.Executor - logger *slog.Logger - } - - setup := func(t *testing.T) *testBundle { - t.Helper() - - driver := riverpgxv5.New(nil) - - return &testBundle{ - exec: driver.UnwrapExecutor(riverdbtest.TestTxPgx(ctx, t)), - logger: riversharedtest.Logger(t), - } - } - t.Run("ElectsLeader", func(t *testing.T) { + t.Run("RequestResignWhileLeader", func(t *testing.T) { t.Parallel() - bundle := setup(t) - - elected, err := attemptElectOrReelect(ctx, bundle.exec, false, &riverdriver.LeaderElectParams{ - LeaderID: clientID, - TTL: leaderTTL, - Schema: "", - }) - require.NoError(t, err) - require.True(t, elected) // won election - - leader, err := bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ - Schema: "", - }) - require.NoError(t, err) - require.WithinDuration(t, time.Now(), leader.ElectedAt, 1*time.Second) - require.WithinDuration(t, time.Now().Add(leaderTTL), leader.ExpiresAt, 1*time.Second) - }) - - t.Run("ReelectsSameLeader", func(t *testing.T) { - t.Parallel() + elector, _ := setup(t, nil) + elector.config.ElectInterval = 10 * time.Millisecond + elector.config.ElectIntervalJitter = time.Millisecond - bundle := setup(t) + startElector(ctx, t, elector) - leader := testfactory.Leader(ctx, t, bundle.exec, &testfactory.LeaderOpts{ - LeaderID: ptrutil.Ptr(clientID), - Schema: "", - }) + 
elector.testSignals.GainedLeadership.WaitOrTimeout() + elector.testSignals.MaintainedLeadership.WaitOrTimeout() - // Re-elect the same leader. Use a larger TTL to see if time is updated, - // because we are in a test transaction and the time is frozen at the start of - // the transaction. - elected, err := attemptElectOrReelect(ctx, bundle.exec, true, &riverdriver.LeaderElectParams{ - LeaderID: clientID, - TTL: 30 * time.Second, - Schema: "", - }) - require.NoError(t, err) - require.True(t, elected) // won re-election + signalRequestResign(ctx, t, elector) - // expires_at should be incremented because this is the same leader that won - // previously and we specified that we're already elected: - updatedLeader, err := bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ - Schema: "", - }) - require.NoError(t, err) - require.Greater(t, updatedLeader.ExpiresAt, leader.ExpiresAt) + elector.testSignals.ResignedLeadership.WaitOrTimeout() + elector.testSignals.GainedLeadership.WaitOrTimeout() }) - t.Run("CannotElectDifferentLeader", func(t *testing.T) { + t.Run("StartsGainsLeadershipAndStops", func(t *testing.T) { t.Parallel() - bundle := setup(t) - - leader := testfactory.Leader(ctx, t, bundle.exec, &testfactory.LeaderOpts{ - LeaderID: ptrutil.Ptr(clientID), - Schema: "", - }) + elector, bundle := setup(t, nil) - elected, err := attemptElectOrReelect(ctx, bundle.exec, true, &riverdriver.LeaderElectParams{ - LeaderID: "different-client-id", - TTL: leaderTTL, - Schema: "", - }) - require.NoError(t, err) - require.False(t, elected) // lost election + startElector(ctx, t, elector) + elector.testSignals.GainedLeadership.WaitOrTimeout() - // The time should not have changed because we specified that we were not - // already elected, and the elect query is a no-op if there's already a - // updatedLeader: - updatedLeader, err := bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ - Schema: "", + leader, err := 
bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ + Schema: elector.config.Schema, }) require.NoError(t, err) - require.Equal(t, leader.ExpiresAt, updatedLeader.ExpiresAt) - }) -} - -func TestElectorHandleLeadershipNotification(t *testing.T) { - t.Parallel() - - var ( - ctx = context.Background() - driver = riverpgxv5.New(nil) - ) - - type testBundle struct{} - - setup := func(t *testing.T) (*Elector, *testBundle) { - t.Helper() - - tx := riverdbtest.TestTxPgx(ctx, t) - - elector := NewElector( - riversharedtest.BaseServiceArchetype(t), - driver.UnwrapExecutor(tx), - nil, - &Config{ClientID: "test_client_id"}, - ) - - // These channels are normally only initialized on start, so we need to - // create it manually here. - elector.requestResignChan = make(chan struct{}, 1) - elector.leaderResignedChan = make(chan struct{}, 1) - - return elector, &testBundle{} - } - - mustMarshalJSON := func(t *testing.T, val any) []byte { - t.Helper() - - data, err := json.Marshal(val) - require.NoError(t, err) - return data - } - - validLeadershipChange := func() *DBNotification { - t.Helper() - - return &DBNotification{ - Action: DBNotificationKindResigned, - LeaderID: "other-client-id", - } - } - - t.Run("SignalsLeadershipChange", func(t *testing.T) { - t.Parallel() - - elector, _ := setup(t) - - elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, validLeadershipChange()))) - - riversharedtest.WaitOrTimeout(t, elector.leaderResignedChan) - }) - - t.Run("StopsOnContextDone", func(t *testing.T) { - t.Parallel() - - elector, _ := setup(t) - - ctx, cancel := context.WithCancel(ctx) - cancel() // cancel immediately + require.Equal(t, elector.config.ClientID, leader.LeaderID) - elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, validLeadershipChange()))) + elector.Stop() + elector.testSignals.ResignedLeadership.WaitOrTimeout() - require.Empty(t, 
elector.leaderResignedChan) + _, err = bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ + Schema: elector.config.Schema, + }) + require.ErrorIs(t, err, rivertype.ErrNotFound) }) - t.Run("IgnoresNonResignedAction", func(t *testing.T) { + t.Run("StartStopStress", func(t *testing.T) { t.Parallel() - elector, _ := setup(t) - - change := validLeadershipChange() - change.Action = "not_resigned" - - elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, change))) + elector, _ := setup(t, &testOpts{stress: true}) + elector.Logger = riversharedtest.LoggerWarn(t) + elector.testSignals = electorTestSignals{} - require.Empty(t, elector.leaderResignedChan) + startstoptest.Stress(ctx, t, elector) }) - t.Run("IgnoresSameClientID", func(t *testing.T) { + t.Run("SustainsLeadership", func(t *testing.T) { t.Parallel() - elector, _ := setup(t) + elector, _ := setup(t, nil) + elector.config.ElectInterval = 10 * time.Millisecond + elector.config.ElectIntervalJitter = time.Millisecond - change := validLeadershipChange() - change.LeaderID = elector.config.ClientID + startElector(ctx, t, elector) + elector.testSignals.GainedLeadership.WaitOrTimeout() - elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, change))) + elector.testSignals.MaintainedLeadership.WaitOrTimeout() + elector.testSignals.MaintainedLeadership.WaitOrTimeout() + elector.testSignals.MaintainedLeadership.WaitOrTimeout() - require.Empty(t, elector.leaderResignedChan) + elector.Stop() + elector.testSignals.ResignedLeadership.WaitOrTimeout() }) } diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 43eacb26..8727a6e3 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -224,8 +224,8 @@ type Executor interface { JobSetStateIfRunningMany(ctx context.Context, params *JobSetStateIfRunningManyParams) 
([]*rivertype.JobRow, error) JobUpdate(ctx context.Context, params *JobUpdateParams) (*rivertype.JobRow, error) JobUpdateFull(ctx context.Context, params *JobUpdateFullParams) (*rivertype.JobRow, error) - LeaderAttemptElect(ctx context.Context, params *LeaderElectParams) (bool, error) - LeaderAttemptReelect(ctx context.Context, params *LeaderElectParams) (bool, error) + LeaderAttemptElect(ctx context.Context, params *LeaderElectParams) (*Leader, error) + LeaderAttemptReelect(ctx context.Context, params *LeaderReelectParams) (*Leader, error) LeaderDeleteExpired(ctx context.Context, params *LeaderDeleteExpiredParams) (int, error) LeaderGetElectedLeader(ctx context.Context, params *LeaderGetElectedLeaderParams) (*Leader, error) LeaderInsert(ctx context.Context, params *LeaderInsertParams) (*Leader, error) @@ -701,7 +701,16 @@ type LeaderElectParams struct { TTL time.Duration } +type LeaderReelectParams struct { + ElectedAt time.Time + LeaderID string + Now *time.Time + Schema string + TTL time.Duration +} + type LeaderResignParams struct { + ElectedAt time.Time LeaderID string LeadershipTopic string Schema string diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go index b3b63848..ea4dc674 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go @@ -11,7 +11,7 @@ import ( "time" ) -const leaderAttemptElect = `-- name: LeaderAttemptElect :execrows +const leaderAttemptElect = `-- name: LeaderAttemptElect :one INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, @@ -24,6 +24,7 @@ INSERT INTO /* TEMPLATE: schema */river_leader ( ) ON CONFLICT (name) DO NOTHING +RETURNING elected_at, expires_at, leader_id, name ` type LeaderAttemptElectParams struct { @@ -32,43 +33,50 @@ type LeaderAttemptElectParams struct { TTL float64 } -func (q *Queries) LeaderAttemptElect(ctx 
context.Context, db DBTX, arg *LeaderAttemptElectParams) (int64, error) { - result, err := db.ExecContext(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) - if err != nil { - return 0, err - } - return result.RowsAffected() +func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (*RiverLeader, error) { + row := db.QueryRowContext(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) + var i RiverLeader + err := row.Scan( + &i.ElectedAt, + &i.ExpiresAt, + &i.LeaderID, + &i.Name, + ) + return &i, err } -const leaderAttemptReelect = `-- name: LeaderAttemptReelect :execrows -INSERT INTO /* TEMPLATE: schema */river_leader ( - leader_id, - elected_at, - expires_at -) VALUES ( - $1, - coalesce($2::timestamptz, now()), - coalesce($2::timestamptz, now()) + make_interval(secs => $3) -) -ON CONFLICT (name) - DO UPDATE SET - expires_at = EXCLUDED.expires_at - WHERE - river_leader.leader_id = $1 +const leaderAttemptReelect = `-- name: LeaderAttemptReelect :one +UPDATE /* TEMPLATE: schema */river_leader +SET expires_at = coalesce($1::timestamptz, now()) + make_interval(secs => $2) +WHERE + elected_at = $3::timestamptz + AND expires_at >= coalesce($1::timestamptz, now()) + AND leader_id = $4 +RETURNING elected_at, expires_at, leader_id, name ` type LeaderAttemptReelectParams struct { - LeaderID string - Now *time.Time - TTL float64 + Now *time.Time + TTL float64 + ElectedAt time.Time + LeaderID string } -func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (int64, error) { - result, err := db.ExecContext(ctx, leaderAttemptReelect, arg.LeaderID, arg.Now, arg.TTL) - if err != nil { - return 0, err - } - return result.RowsAffected() +func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (*RiverLeader, error) { + row := db.QueryRowContext(ctx, leaderAttemptReelect, + arg.Now, + arg.TTL, + arg.ElectedAt, + arg.LeaderID, + ) + var i 
RiverLeader + err := row.Scan( + &i.ElectedAt, + &i.ExpiresAt, + &i.LeaderID, + &i.Name, + ) + return &i, err } const leaderDeleteExpired = `-- name: LeaderDeleteExpired :execrows @@ -143,12 +151,14 @@ const leaderResign = `-- name: LeaderResign :execrows WITH currently_held_leaders AS ( SELECT elected_at, expires_at, leader_id, name FROM /* TEMPLATE: schema */river_leader - WHERE leader_id = $1::text + WHERE + elected_at = $1::timestamptz + AND leader_id = $2::text FOR UPDATE ), notified_resignations AS ( SELECT pg_notify( - concat(coalesce($2::text, current_schema()), '.', $3::text), + concat(coalesce($3::text, current_schema()), '.', $4::text), json_build_object('leader_id', leader_id, 'action', 'resigned')::text ) FROM currently_held_leaders @@ -157,13 +167,19 @@ DELETE FROM /* TEMPLATE: schema */river_leader USING notified_resignations ` type LeaderResignParams struct { + ElectedAt time.Time LeaderID string Schema sql.NullString LeadershipTopic string } func (q *Queries) LeaderResign(ctx context.Context, db DBTX, arg *LeaderResignParams) (int64, error) { - result, err := db.ExecContext(ctx, leaderResign, arg.LeaderID, arg.Schema, arg.LeadershipTopic) + result, err := db.ExecContext(ctx, leaderResign, + arg.ElectedAt, + arg.LeaderID, + arg.Schema, + arg.LeadershipTopic, + ) if err != nil { return 0, err } diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index ef41a525..d8a2d445 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -716,28 +716,29 @@ func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpd return jobRowFromInternal(job) } -func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { - numElectionsWon, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, 
&dbsqlc.LeaderAttemptElectParams{ +func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (*riverdriver.Leader, error) { + leader, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ LeaderID: params.LeaderID, Now: params.Now, TTL: params.TTL.Seconds(), }) if err != nil { - return false, interpretError(err) + return nil, interpretError(err) } - return numElectionsWon > 0, nil + return leaderFromInternal(leader), nil } -func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { - numElectionsWon, err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ - LeaderID: params.LeaderID, - Now: params.Now, - TTL: params.TTL.Seconds(), +func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderReelectParams) (*riverdriver.Leader, error) { + leader, err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ + ElectedAt: params.ElectedAt, + LeaderID: params.LeaderID, + Now: params.Now, + TTL: params.TTL.Seconds(), }) if err != nil { - return false, interpretError(err) + return nil, interpretError(err) } - return numElectionsWon > 0, nil + return leaderFromInternal(leader), nil } func (e *Executor) LeaderDeleteExpired(ctx context.Context, params *riverdriver.LeaderDeleteExpiredParams) (int, error) { @@ -772,6 +773,7 @@ func (e *Executor) LeaderInsert(ctx context.Context, params *riverdriver.LeaderI func (e *Executor) LeaderResign(ctx context.Context, params *riverdriver.LeaderResignParams) (bool, error) { numResigned, err := dbsqlc.New().LeaderResign(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderResignParams{ + ElectedAt: params.ElectedAt, LeaderID: params.LeaderID, LeadershipTopic: params.LeadershipTopic, Schema: 
sql.NullString{String: params.Schema, Valid: params.Schema != ""}, diff --git a/riverdriver/riverdrivertest/leader.go b/riverdriver/riverdrivertest/leader.go index 23d0fbdd..f81904f8 100644 --- a/riverdriver/riverdrivertest/leader.go +++ b/riverdriver/riverdrivertest/leader.go @@ -11,6 +11,7 @@ import ( "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/testfactory" "github.com/riverqueue/river/rivershared/util/ptrutil" + "github.com/riverqueue/river/rivertype" ) func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx func(ctx context.Context, t *testing.T) (riverdriver.Executor, riverdriver.Driver[TTx])) { @@ -48,18 +49,21 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f now := time.Now().UTC() - elected, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ + leader, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ LeaderID: testClientID, Now: &now, TTL: leaderTTL, }) require.NoError(t, err) - require.True(t, elected) // won election - - leader, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) - require.NoError(t, err) require.WithinDuration(t, now, leader.ElectedAt, bundle.driver.TimePrecision()) require.WithinDuration(t, now.Add(leaderTTL), leader.ExpiresAt, bundle.driver.TimePrecision()) + require.Equal(t, testClientID, leader.LeaderID) + + leaderFromDB, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) + require.NoError(t, err) + require.WithinDuration(t, now, leaderFromDB.ElectedAt, bundle.driver.TimePrecision()) + require.WithinDuration(t, now.Add(leaderTTL), leaderFromDB.ExpiresAt, bundle.driver.TimePrecision()) + require.Equal(t, testClientID, leaderFromDB.LeaderID) }) t.Run("CannotElectTwiceInARow", func(t *testing.T) { @@ -71,12 +75,12 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f LeaderID: ptrutil.Ptr(testClientID), }) - 
elected, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ + leaderAttempt, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ LeaderID: "different-client-id", TTL: leaderTTL, }) - require.NoError(t, err) - require.False(t, elected) // lost election + require.ErrorIs(t, err, rivertype.ErrNotFound) + require.Nil(t, leaderAttempt) // The time should not have changed because we specified that we were not // already elected, and the elect query is a no-op if there's already a @@ -91,48 +95,50 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f exec, _ := setup(ctx, t) - elected, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ + leader, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ LeaderID: testClientID, TTL: leaderTTL, }) require.NoError(t, err) - require.True(t, elected) // won election + require.Equal(t, testClientID, leader.LeaderID) - leader, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) + leaderFromDB, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) require.NoError(t, err) - require.WithinDuration(t, time.Now(), leader.ElectedAt, veryGenerousTimeCompareTolerance) - require.WithinDuration(t, time.Now().Add(leaderTTL), leader.ExpiresAt, veryGenerousTimeCompareTolerance) + require.WithinDuration(t, time.Now(), leaderFromDB.ElectedAt, veryGenerousTimeCompareTolerance) + require.WithinDuration(t, time.Now().Add(leaderTTL), leaderFromDB.ExpiresAt, veryGenerousTimeCompareTolerance) }) }) t.Run("LeaderAttemptReelect", func(t *testing.T) { t.Parallel() - t.Run("ElectsLeader", func(t *testing.T) { + t.Run("DoesNotReelectDifferentLeaderID", func(t *testing.T) { t.Parallel() - exec, bundle := setup(ctx, t) + exec, _ := setup(ctx, t) - now := time.Now().UTC() + leader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ + LeaderID: ptrutil.Ptr("other-client-id"), + }) - elected, err := 
exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderElectParams{ - LeaderID: testClientID, - Now: &now, - TTL: leaderTTL, + updatedLeader, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderReelectParams{ + ElectedAt: leader.ElectedAt, + LeaderID: testClientID, + TTL: leaderTTL, }) - require.NoError(t, err) - require.True(t, elected) // won election + require.ErrorIs(t, err, rivertype.ErrNotFound) + require.Nil(t, updatedLeader) - leader, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) + leaderFromDB, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) require.NoError(t, err) - require.WithinDuration(t, now, leader.ElectedAt, bundle.driver.TimePrecision()) - require.WithinDuration(t, now.Add(leaderTTL), leader.ExpiresAt, bundle.driver.TimePrecision()) + require.Equal(t, leader.LeaderID, leaderFromDB.LeaderID) + require.Equal(t, leader.ElectedAt, leaderFromDB.ElectedAt) }) t.Run("ReelectsSameLeader", func(t *testing.T) { t.Parallel() - exec, _ := setup(ctx, t) + exec, bundle := setup(ctx, t) leader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ LeaderID: ptrutil.Ptr(testClientID), @@ -141,59 +147,93 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f // Re-elect the same leader. Use a larger TTL to see if time is updated, // because we are in a test transaction and the time is frozen at the start of // the transaction. 
- elected, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderElectParams{ - LeaderID: testClientID, - TTL: 30 * time.Second, + updatedLeader, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderReelectParams{ + ElectedAt: leader.ElectedAt, + LeaderID: testClientID, + TTL: 30 * time.Second, }) require.NoError(t, err) - require.True(t, elected) // won re-election + require.Equal(t, testClientID, updatedLeader.LeaderID) + require.WithinDuration(t, leader.ElectedAt, updatedLeader.ElectedAt, bundle.driver.TimePrecision()) // expires_at should be incremented because this is the same leader that won // previously and we specified that we're already elected: - updatedLeader, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) + leaderFromDB, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) require.NoError(t, err) - require.Greater(t, updatedLeader.ExpiresAt, leader.ExpiresAt) + require.Greater(t, leaderFromDB.ExpiresAt, leader.ExpiresAt) + require.WithinDuration(t, updatedLeader.ElectedAt, leaderFromDB.ElectedAt, bundle.driver.TimePrecision()) }) - t.Run("DoesNotReelectDifferentLeader", func(t *testing.T) { + t.Run("DoesNotReelectExpiredRowThatIsNotYetDeleted", func(t *testing.T) { t.Parallel() exec, _ := setup(ctx, t) + now := time.Now().UTC() leader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ - LeaderID: ptrutil.Ptr(testClientID), + ElectedAt: ptrutil.Ptr(now.Add(-2 * time.Hour)), + ExpiresAt: ptrutil.Ptr(now.Add(-1 * time.Hour)), + LeaderID: ptrutil.Ptr(testClientID), }) - elected, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderElectParams{ - LeaderID: "different-client", - TTL: 30 * time.Second, + updatedLeader, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderReelectParams{ + ElectedAt: leader.ElectedAt, + LeaderID: testClientID, + TTL: 30 * time.Second, }) + require.ErrorIs(t, err, rivertype.ErrNotFound) + require.Nil(t, updatedLeader) + + leaderFromDB, 
err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) require.NoError(t, err) - require.False(t, elected) // did not win re-election + require.Equal(t, leader.ElectedAt, leaderFromDB.ElectedAt) + require.Equal(t, leader.ExpiresAt, leaderFromDB.ExpiresAt) + }) - // expires_at should be incremented because this is the same leader that won - // previously and we specified that we're already elected: - updatedLeader, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) + t.Run("DoesNotReelectMismatchedElectedAt", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + leader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ + LeaderID: ptrutil.Ptr(testClientID), + }) + + updatedLeader, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderReelectParams{ + ElectedAt: leader.ElectedAt.Add(-time.Second), + LeaderID: testClientID, + TTL: 30 * time.Second, + }) + require.ErrorIs(t, err, rivertype.ErrNotFound) + require.Nil(t, updatedLeader) + + leaderFromDB, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) require.NoError(t, err) - require.Equal(t, leader.LeaderID, updatedLeader.LeaderID) + require.Equal(t, leader.ElectedAt, leaderFromDB.ElectedAt) }) t.Run("WithoutNow", func(t *testing.T) { t.Parallel() - exec, _ := setup(ctx, t) + exec, bundle := setup(ctx, t) - elected, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderElectParams{ - LeaderID: testClientID, - TTL: leaderTTL, + leader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ + LeaderID: ptrutil.Ptr(testClientID), + }) + + updatedLeader, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderReelectParams{ + ElectedAt: leader.ElectedAt, + LeaderID: testClientID, + TTL: leaderTTL, }) require.NoError(t, err) - require.True(t, elected) // won election + require.Equal(t, testClientID, updatedLeader.LeaderID) + require.WithinDuration(t, leader.ElectedAt, 
updatedLeader.ElectedAt, bundle.driver.TimePrecision()) - leader, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) + leaderFromDB, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) require.NoError(t, err) - require.WithinDuration(t, time.Now(), leader.ElectedAt, veryGenerousTimeCompareTolerance) - require.WithinDuration(t, time.Now().Add(leaderTTL), leader.ExpiresAt, veryGenerousTimeCompareTolerance) + require.WithinDuration(t, leader.ElectedAt, leaderFromDB.ElectedAt, bundle.driver.TimePrecision()) + require.WithinDuration(t, time.Now().Add(leaderTTL), leaderFromDB.ExpiresAt, veryGenerousTimeCompareTolerance) }) }) @@ -318,6 +358,7 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f { resigned, err := exec.LeaderResign(ctx, &riverdriver.LeaderResignParams{ + ElectedAt: time.Now().UTC(), LeaderID: testClientID, LeadershipTopic: string(notifier.NotificationTopicLeadership), }) @@ -325,12 +366,13 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f require.False(t, resigned) } - _ = testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ + leader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ LeaderID: ptrutil.Ptr(testClientID), }) { resigned, err := exec.LeaderResign(ctx, &riverdriver.LeaderResignParams{ + ElectedAt: leader.ElectedAt, LeaderID: testClientID, LeadershipTopic: string(notifier.NotificationTopicLeadership), }) @@ -344,16 +386,53 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f exec, _ := setup(ctx, t) - _ = testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ + leader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ LeaderID: ptrutil.Ptr("other-client-id"), }) resigned, err := exec.LeaderResign(ctx, &riverdriver.LeaderResignParams{ + ElectedAt: leader.ElectedAt, LeaderID: testClientID, LeadershipTopic: string(notifier.NotificationTopicLeadership), }) 
require.NoError(t, err) require.False(t, resigned) }) + + t.Run("DoesNotResignNewerTermForSameLeaderID", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + + oldLeader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ + ElectedAt: ptrutil.Ptr(now.Add(-2 * time.Hour)), + ExpiresAt: ptrutil.Ptr(now.Add(-1 * time.Hour)), + LeaderID: ptrutil.Ptr(testClientID), + }) + + numDeleted, err := exec.LeaderDeleteExpired(ctx, &riverdriver.LeaderDeleteExpiredParams{Now: &now}) + require.NoError(t, err) + require.Equal(t, 1, numDeleted) + + newLeader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ + ElectedAt: ptrutil.Ptr(now), + LeaderID: ptrutil.Ptr(testClientID), + }) + + resigned, err := exec.LeaderResign(ctx, &riverdriver.LeaderResignParams{ + ElectedAt: oldLeader.ElectedAt, + LeaderID: testClientID, + LeadershipTopic: string(notifier.NotificationTopicLeadership), + }) + require.NoError(t, err) + require.False(t, resigned) + + leaderFromDB, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) + require.NoError(t, err) + require.Equal(t, newLeader.LeaderID, leaderFromDB.LeaderID) + require.Equal(t, newLeader.ElectedAt, leaderFromDB.ElectedAt) + }) }) } diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql index e2417d86..cea2195f 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql @@ -7,7 +7,7 @@ CREATE UNLOGGED TABLE river_leader( CONSTRAINT leader_id_length CHECK (char_length(leader_id) > 0 AND char_length(leader_id) < 128) ); --- name: LeaderAttemptElect :execrows +-- name: LeaderAttemptElect :one INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, @@ -19,23 +19,17 @@ INSERT INTO /* TEMPLATE: schema */river_leader ( coalesce(sqlc.narg('now')::timestamptz, now()) + make_interval(secs => @ttl) ) ON CONFLICT 
(name) - DO NOTHING; + DO NOTHING +RETURNING *; --- name: LeaderAttemptReelect :execrows -INSERT INTO /* TEMPLATE: schema */river_leader ( - leader_id, - elected_at, - expires_at -) VALUES ( - @leader_id, - coalesce(sqlc.narg('now')::timestamptz, now()), - coalesce(sqlc.narg('now')::timestamptz, now()) + make_interval(secs => @ttl) -) -ON CONFLICT (name) - DO UPDATE SET - expires_at = EXCLUDED.expires_at - WHERE - river_leader.leader_id = @leader_id; +-- name: LeaderAttemptReelect :one +UPDATE /* TEMPLATE: schema */river_leader +SET expires_at = coalesce(sqlc.narg('now')::timestamptz, now()) + make_interval(secs => @ttl) +WHERE + elected_at = @elected_at::timestamptz + AND expires_at >= coalesce(sqlc.narg('now')::timestamptz, now()) + AND leader_id = @leader_id +RETURNING *; -- name: LeaderDeleteExpired :execrows DELETE FROM /* TEMPLATE: schema */river_leader @@ -60,7 +54,9 @@ INSERT INTO /* TEMPLATE: schema */river_leader( WITH currently_held_leaders AS ( SELECT * FROM /* TEMPLATE: schema */river_leader - WHERE leader_id = @leader_id::text + WHERE + elected_at = @elected_at::timestamptz + AND leader_id = @leader_id::text FOR UPDATE ), notified_resignations AS ( @@ -70,4 +66,4 @@ notified_resignations AS ( ) FROM currently_held_leaders ) -DELETE FROM /* TEMPLATE: schema */river_leader USING notified_resignations; \ No newline at end of file +DELETE FROM /* TEMPLATE: schema */river_leader USING notified_resignations; diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go index 389ef8b1..4fcea1cf 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go @@ -12,7 +12,7 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) -const leaderAttemptElect = `-- name: LeaderAttemptElect :execrows +const leaderAttemptElect = `-- name: LeaderAttemptElect :one INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, 
@@ -25,6 +25,7 @@ INSERT INTO /* TEMPLATE: schema */river_leader ( ) ON CONFLICT (name) DO NOTHING +RETURNING elected_at, expires_at, leader_id, name ` type LeaderAttemptElectParams struct { @@ -33,43 +34,50 @@ type LeaderAttemptElectParams struct { TTL float64 } -func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (int64, error) { - result, err := db.Exec(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) - if err != nil { - return 0, err - } - return result.RowsAffected(), nil +func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (*RiverLeader, error) { + row := db.QueryRow(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) + var i RiverLeader + err := row.Scan( + &i.ElectedAt, + &i.ExpiresAt, + &i.LeaderID, + &i.Name, + ) + return &i, err } -const leaderAttemptReelect = `-- name: LeaderAttemptReelect :execrows -INSERT INTO /* TEMPLATE: schema */river_leader ( - leader_id, - elected_at, - expires_at -) VALUES ( - $1, - coalesce($2::timestamptz, now()), - coalesce($2::timestamptz, now()) + make_interval(secs => $3) -) -ON CONFLICT (name) - DO UPDATE SET - expires_at = EXCLUDED.expires_at - WHERE - river_leader.leader_id = $1 +const leaderAttemptReelect = `-- name: LeaderAttemptReelect :one +UPDATE /* TEMPLATE: schema */river_leader +SET expires_at = coalesce($1::timestamptz, now()) + make_interval(secs => $2) +WHERE + elected_at = $3::timestamptz + AND expires_at >= coalesce($1::timestamptz, now()) + AND leader_id = $4 +RETURNING elected_at, expires_at, leader_id, name ` type LeaderAttemptReelectParams struct { - LeaderID string - Now *time.Time - TTL float64 + Now *time.Time + TTL float64 + ElectedAt time.Time + LeaderID string } -func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (int64, error) { - result, err := db.Exec(ctx, leaderAttemptReelect, arg.LeaderID, arg.Now, arg.TTL) - if err != nil { - return 0, 
err - } - return result.RowsAffected(), nil +func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (*RiverLeader, error) { + row := db.QueryRow(ctx, leaderAttemptReelect, + arg.Now, + arg.TTL, + arg.ElectedAt, + arg.LeaderID, + ) + var i RiverLeader + err := row.Scan( + &i.ElectedAt, + &i.ExpiresAt, + &i.LeaderID, + &i.Name, + ) + return &i, err } const leaderDeleteExpired = `-- name: LeaderDeleteExpired :execrows @@ -144,12 +152,14 @@ const leaderResign = `-- name: LeaderResign :execrows WITH currently_held_leaders AS ( SELECT elected_at, expires_at, leader_id, name FROM /* TEMPLATE: schema */river_leader - WHERE leader_id = $1::text + WHERE + elected_at = $1::timestamptz + AND leader_id = $2::text FOR UPDATE ), notified_resignations AS ( SELECT pg_notify( - concat(coalesce($2::text, current_schema()), '.', $3::text), + concat(coalesce($3::text, current_schema()), '.', $4::text), json_build_object('leader_id', leader_id, 'action', 'resigned')::text ) FROM currently_held_leaders @@ -158,13 +168,19 @@ DELETE FROM /* TEMPLATE: schema */river_leader USING notified_resignations ` type LeaderResignParams struct { + ElectedAt time.Time LeaderID string Schema pgtype.Text LeadershipTopic string } func (q *Queries) LeaderResign(ctx context.Context, db DBTX, arg *LeaderResignParams) (int64, error) { - result, err := db.Exec(ctx, leaderResign, arg.LeaderID, arg.Schema, arg.LeadershipTopic) + result, err := db.Exec(ctx, leaderResign, + arg.ElectedAt, + arg.LeaderID, + arg.Schema, + arg.LeadershipTopic, + ) if err != nil { return 0, err } diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 9d3fed61..e0201a0e 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -701,28 +701,29 @@ func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpd return jobRowFromInternal(job) } -func (e *Executor) 
LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { - numElectionsWon, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ +func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (*riverdriver.Leader, error) { + leader, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ LeaderID: params.LeaderID, Now: params.Now, TTL: params.TTL.Seconds(), }) if err != nil { - return false, interpretError(err) + return nil, interpretError(err) } - return numElectionsWon > 0, nil + return leaderFromInternal(leader), nil } -func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { - numElectionsWon, err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ - LeaderID: params.LeaderID, - Now: params.Now, - TTL: params.TTL.Seconds(), +func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderReelectParams) (*riverdriver.Leader, error) { + leader, err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ + ElectedAt: params.ElectedAt, + LeaderID: params.LeaderID, + Now: params.Now, + TTL: params.TTL.Seconds(), }) if err != nil { - return false, interpretError(err) + return nil, interpretError(err) } - return numElectionsWon > 0, nil + return leaderFromInternal(leader), nil } func (e *Executor) LeaderDeleteExpired(ctx context.Context, params *riverdriver.LeaderDeleteExpiredParams) (int, error) { @@ -757,6 +758,7 @@ func (e *Executor) LeaderInsert(ctx context.Context, params *riverdriver.LeaderI func (e *Executor) LeaderResign(ctx context.Context, params *riverdriver.LeaderResignParams) (bool, error) { numResigned, err := 
dbsqlc.New().LeaderResign(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderResignParams{ + ElectedAt: params.ElectedAt, LeaderID: params.LeaderID, LeadershipTopic: params.LeadershipTopic, Schema: pgtype.Text{String: params.Schema, Valid: params.Schema != ""}, diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql b/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql index c5da22cb..398eadf9 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql @@ -7,7 +7,7 @@ CREATE TABLE river_leader ( CONSTRAINT leader_id_length CHECK (length(leader_id) > 0 AND length(leader_id) < 128) ); --- name: LeaderAttemptElect :execrows +-- name: LeaderAttemptElect :one INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, @@ -18,23 +18,17 @@ INSERT INTO /* TEMPLATE: schema */river_leader ( datetime(coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), 'subsec', cast(@ttl as text)) ) ON CONFLICT (name) - DO NOTHING; + DO NOTHING +RETURNING *; --- name: LeaderAttemptReelect :execrows -INSERT INTO /* TEMPLATE: schema */river_leader ( - leader_id, - elected_at, - expires_at -) VALUES ( - @leader_id, - coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), - datetime(coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), 'subsec', cast(@ttl as text)) -) -ON CONFLICT (name) - DO UPDATE SET - expires_at = EXCLUDED.expires_at - WHERE - leader_id = EXCLUDED.leader_id; +-- name: LeaderAttemptReelect :one +UPDATE /* TEMPLATE: schema */river_leader +SET expires_at = datetime(coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), 'subsec', cast(@ttl as text)) +WHERE + unixepoch(elected_at, 'subsec') = unixepoch(cast(@elected_at AS text), 'subsec') + AND expires_at >= coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')) + AND leader_id = @leader_id +RETURNING *; -- name: LeaderDeleteExpired :execrows 
DELETE FROM /* TEMPLATE: schema */river_leader @@ -58,4 +52,6 @@ INSERT INTO /* TEMPLATE: schema */river_leader( -- name: LeaderResign :execrows DELETE FROM /* TEMPLATE: schema */river_leader -WHERE leader_id = @leader_id; \ No newline at end of file +WHERE + unixepoch(elected_at, 'subsec') = unixepoch(cast(@elected_at AS text), 'subsec') + AND leader_id = @leader_id; diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql.go index 6e452d0e..a43227f0 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql.go +++ b/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql.go @@ -9,7 +9,7 @@ import ( "context" ) -const leaderAttemptElect = `-- name: LeaderAttemptElect :execrows +const leaderAttemptElect = `-- name: LeaderAttemptElect :one INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, @@ -21,6 +21,7 @@ INSERT INTO /* TEMPLATE: schema */river_leader ( ) ON CONFLICT (name) DO NOTHING +RETURNING elected_at, expires_at, leader_id, name ` type LeaderAttemptElectParams struct { @@ -29,43 +30,50 @@ type LeaderAttemptElectParams struct { TTL string } -func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (int64, error) { - result, err := db.ExecContext(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) - if err != nil { - return 0, err - } - return result.RowsAffected() +func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (*RiverLeader, error) { + row := db.QueryRowContext(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) + var i RiverLeader + err := row.Scan( + &i.ElectedAt, + &i.ExpiresAt, + &i.LeaderID, + &i.Name, + ) + return &i, err } -const leaderAttemptReelect = `-- name: LeaderAttemptReelect :execrows -INSERT INTO /* TEMPLATE: schema */river_leader ( - leader_id, - elected_at, - expires_at -) VALUES ( - ?1, - coalesce(cast(?2 AS text), datetime('now', 
'subsec')), - datetime(coalesce(cast(?2 AS text), datetime('now', 'subsec')), 'subsec', cast(?3 as text)) -) -ON CONFLICT (name) - DO UPDATE SET - expires_at = EXCLUDED.expires_at - WHERE - leader_id = EXCLUDED.leader_id +const leaderAttemptReelect = `-- name: LeaderAttemptReelect :one +UPDATE /* TEMPLATE: schema */river_leader +SET expires_at = datetime(coalesce(cast(?1 AS text), datetime('now', 'subsec')), 'subsec', cast(?2 as text)) +WHERE + unixepoch(elected_at, 'subsec') = unixepoch(cast(?3 AS text), 'subsec') + AND expires_at >= coalesce(cast(?1 AS text), datetime('now', 'subsec')) + AND leader_id = ?4 +RETURNING elected_at, expires_at, leader_id, name ` type LeaderAttemptReelectParams struct { - LeaderID string - Now *string - TTL string + Now *string + TTL string + ElectedAt string + LeaderID string } -func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (int64, error) { - result, err := db.ExecContext(ctx, leaderAttemptReelect, arg.LeaderID, arg.Now, arg.TTL) - if err != nil { - return 0, err - } - return result.RowsAffected() +func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (*RiverLeader, error) { + row := db.QueryRowContext(ctx, leaderAttemptReelect, + arg.Now, + arg.TTL, + arg.ElectedAt, + arg.LeaderID, + ) + var i RiverLeader + err := row.Scan( + &i.ElectedAt, + &i.ExpiresAt, + &i.LeaderID, + &i.Name, + ) + return &i, err } const leaderDeleteExpired = `-- name: LeaderDeleteExpired :execrows @@ -139,11 +147,18 @@ func (q *Queries) LeaderInsert(ctx context.Context, db DBTX, arg *LeaderInsertPa const leaderResign = `-- name: LeaderResign :execrows DELETE FROM /* TEMPLATE: schema */river_leader -WHERE leader_id = ?1 +WHERE + unixepoch(elected_at, 'subsec') = unixepoch(cast(?1 AS text), 'subsec') + AND leader_id = ?2 ` -func (q *Queries) LeaderResign(ctx context.Context, db DBTX, leaderID string) (int64, error) { - result, err := db.ExecContext(ctx, 
leaderResign, leaderID) +type LeaderResignParams struct { + ElectedAt string + LeaderID string +} + +func (q *Queries) LeaderResign(ctx context.Context, db DBTX, arg *LeaderResignParams) (int64, error) { + result, err := db.ExecContext(ctx, leaderResign, arg.ElectedAt, arg.LeaderID) if err != nil { return 0, err } diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index 1eda9f14..586fdc53 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -1099,28 +1099,29 @@ func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpd return jobRowFromInternal(job) } -func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { - numElectionsWon, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ +func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (*riverdriver.Leader, error) { + leader, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ LeaderID: params.LeaderID, Now: timeStringNullable(params.Now), TTL: durationAsString(params.TTL), }) if err != nil { - return false, interpretError(err) + return nil, interpretError(err) } - return numElectionsWon > 0, nil + return leaderFromInternal(leader), nil } -func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { - numElectionsWon, err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ - LeaderID: params.LeaderID, - Now: timeStringNullable(params.Now), - TTL: durationAsString(params.TTL), +func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderReelectParams) (*riverdriver.Leader, error) { + leader, 
err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ + ElectedAt: timeString(params.ElectedAt), + LeaderID: params.LeaderID, + Now: timeStringNullable(params.Now), + TTL: durationAsString(params.TTL), }) if err != nil { - return false, interpretError(err) + return nil, interpretError(err) } - return numElectionsWon > 0, nil + return leaderFromInternal(leader), nil } func (e *Executor) LeaderDeleteExpired(ctx context.Context, params *riverdriver.LeaderDeleteExpiredParams) (int, error) { @@ -1154,7 +1155,10 @@ func (e *Executor) LeaderInsert(ctx context.Context, params *riverdriver.LeaderI } func (e *Executor) LeaderResign(ctx context.Context, params *riverdriver.LeaderResignParams) (bool, error) { - numResigned, err := dbsqlc.New().LeaderResign(schemaTemplateParam(ctx, params.Schema), e.dbtx, params.LeaderID) + numResigned, err := dbsqlc.New().LeaderResign(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderResignParams{ + ElectedAt: timeString(params.ElectedAt), + LeaderID: params.LeaderID, + }) if err != nil { return false, interpretError(err) }