From 94f94fa3ea6e9d2e16f7d42b0d61f5b87ecaec40 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Wed, 15 Apr 2026 17:02:46 -0500 Subject: [PATCH] refactor leader election around DB-issued terms The old elector treated leadership as a lease held by one client ID and renewed over time. That was simple, but it left too much implicit. One leadership term was not clearly separated from the next, so reelection and resignation were not scoped to a specific term. That made same- client reacquisition harder to reason about, made it easy for stale work or cleanup to target the wrong lease, and left the elector carrying more responsibility for edge cases than it should have. This change makes the database issue explicit leadership terms using the columns we already have. `leader_id` remains the stable client ID, while `(leader_id, elected_at)` identifies one specific term. Elect, reelect, and resign now all operate on that exact term and return the leader row from the database. The elector keeps a bounded local trust window for its last successful confirmation, but that window is anchored to the attempt that produced it, not to when the response happened to arrive. That keeps slow successful reelections from stretching leadership past its real lease budget while still avoiding direct app-vs-database clock comparisons in the state machine. The notification and test story is also clearer after the rewrite. Slow subscribers now receive each leadership transition in order without blocking the elector, resignation wakeups are coalesced safely, and the poll-only coverage uses isolated fixtures so it can exercise real handoff behavior without shared-schema flakiness. The shared driver suite now covers term-scoped elect, reelect, and resign behavior across PostgreSQL and both SQLite backends, including same-client term replacement and stale-term rejection. 
The elector tests focus on the observable behaviors that matter: gaining leadership, handing it off, responding to resign requests, and stepping down cleanly when its trust window expires. This also rolls up the branch's earlier flake investigation and keeps the original CI reference for the shared-schema failures that led to the redesign: https://github.com/riverqueue/river/actions/runs/24406465152 --- CHANGELOG.md | 4 + internal/leadership/doc.go | 117 +++ internal/leadership/elector.go | 518 +++++++--- internal/leadership/elector_test.go | 964 +++++++++++------- riverdriver/river_driver_interface.go | 13 +- .../internal/dbsqlc/river_leader.sql.go | 84 +- .../river_database_sql_driver.go | 24 +- riverdriver/riverdrivertest/leader.go | 183 +++- .../internal/dbsqlc/river_leader.sql | 34 +- .../internal/dbsqlc/river_leader.sql.go | 84 +- riverdriver/riverpgxv5/river_pgx_v5_driver.go | 24 +- .../internal/dbsqlc/river_leader.sql | 32 +- .../internal/dbsqlc/river_leader.sql.go | 83 +- .../riversqlite/river_sqlite_driver.go | 28 +- 14 files changed, 1462 insertions(+), 730 deletions(-) create mode 100644 internal/leadership/doc.go diff --git a/CHANGELOG.md b/CHANGELOG.md index ea739a78..93c84731 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fixed leader election to track explicit database-issued leadership terms, reducing handoff flakiness and same-client reacquisition edge cases while making reelection and resign target the current leadership lease instead of a stale one. [PR #1213](https://github.com/riverqueue/river/pull/1213). + ## [0.34.0] - 2026-04-08 ### Added diff --git a/internal/leadership/doc.go b/internal/leadership/doc.go new file mode 100644 index 00000000..63ff7226 --- /dev/null +++ b/internal/leadership/doc.go @@ -0,0 +1,117 @@ +// Package leadership implements leader election for River clients sharing a +// database schema. 
+// +// The database records at most one current leadership term at a time. The +// elected client runs distributed maintenance work such as queue management, +// job scheduling, and reindexing that should not be duplicated across clients. +// +// # Overview +// +// Leadership is modeled as a database-backed lease with an explicit term +// identity. +// +// A term is identified by: +// - `leader_id`: the stable client identity +// - `elected_at`: the database-issued timestamp for that specific term +// +// The database is authoritative for: +// - which client currently holds the leadership row +// - whether a term can be renewed +// - whether a term has already been replaced +// +// The process uses local time only to bound how long it trusts its last +// successful elect or reelect result. If it cannot renew a term in time, it +// steps down conservatively instead of continuing to act as leader on stale +// information. +// +// # State Model +// +// At a high level, an elector alternates between follower and leader states: +// +// Start +// │ +// ▼ +// ┌─────────────────────────────────────┐ +// │ Follower │ +// │ │ +// │ Retries election on timer or wakeup │ +// └──────────────────┬──────────────────┘ +// │ won election +// ▼ +// ┌─────────────────────────────────────┐ +// │ Leader │ +// │ │ +// │ Renews before trust window expires │ +// └──────────────────┬──────────────────┘ +// │ replaced / expired / resign requested / +// │ renewal failed for too long / shutdown +// ▼ +// Follower +// +// Followers attempt election periodically and can wake early when they learn +// that the previous leader resigned. Leaders renew their current term +// periodically. If renewal fails, the term is replaced, or the local trust +// window expires, the process stops acting as leader and returns to follower +// behavior. 
+// +// # Trust Window +// +// After each successful election or renewal, the elector computes a local +// trust deadline: +// +// trustedUntil = attemptStarted + TTL - safetyMargin +// +// This trust window has two important properties: +// - it is anchored to when the elect or reelect attempt started, so a slow +// successful database round trip cannot stretch leadership longer than the +// attempt budget allows +// - it ends before the database lease should expire, giving the process time +// to step down before it risks acting on a stale term +// +// The local trust window is a conservative stop condition, not an alternative +// source of truth. A client may step down while the database row is still +// present, but it should not continue acting as leader after it no longer +// trusts its last successful renewal. +// +// # Term-Scoped Operations +// +// Renewing and resigning are scoped to the exact term identified by +// `(leader_id, elected_at)`. +// +// That means: +// - an old term cannot accidentally renew a newer term for the same client +// - a delayed resign from an old term cannot delete a newer term for the +// same client +// - when the database says a term is gone, the elector can step down without +// ambiguity about which term it held +// +// # Notifications and Subscribers +// +// When a notifier is available, the elector listens for leadership-related +// events so followers can wake promptly and leaders can honor explicit +// resignation requests. +// +// Notification delivery is intentionally non-blocking: +// - wakeups may coalesce, because multiple rapid resignations only need to +// prompt another election attempt +// - polling remains the fallback when notifications are unavailable or missed +// +// Consumers inside the process can subscribe to leadership transitions. 
Those +// subscriptions preserve ordered `true`/`false` transitions so downstream +// maintenance components can reliably start and stop work, while still keeping +// slow subscribers from blocking the elector itself. +// +// # Failure Handling +// +// The system is intentionally conservative under failures: +// - if renewal errors persist until the trust window is exhausted, the leader +// steps down +// - if the database reports that the current term no longer exists, the +// leader steps down immediately +// - if resignation fails during shutdown or after a local timeout, the +// database lease expiry remains the safety net that eventually allows a new +// election +// +// This design keeps leadership decisions centered on the database while using +// local time only to stop trusting stale state sooner rather than later. +package leadership diff --git a/internal/leadership/elector.go b/internal/leadership/elector.go index 53b2d5f9..c9008582 100644 --- a/internal/leadership/elector.go +++ b/internal/leadership/elector.go @@ -20,12 +20,14 @@ import ( "github.com/riverqueue/river/rivershared/util/randutil" "github.com/riverqueue/river/rivershared/util/serviceutil" "github.com/riverqueue/river/rivershared/util/testutil" + "github.com/riverqueue/river/rivertype" ) const ( - electIntervalDefault = 5 * time.Second - electIntervalJitterDefault = 1 * time.Second - electIntervalTTLPaddingDefault = 10 * time.Second + electIntervalDefault = 5 * time.Second + electIntervalJitterDefault = 1 * time.Second + electIntervalTTLPaddingDefault = 10 * time.Second + leaderLocalDeadlineSafetyMargin = 1 * time.Second ) type DBNotification struct { @@ -45,16 +47,29 @@ type Notification struct { Timestamp time.Time } +// Subscription is a client-facing stream of leadership transitions. +// +// The elector publishes every transition (`false -> true -> false -> ...`) to +// each subscription. Delivery is delegated to a subscriptionRelay so the +// elector never blocks on a slow listener. 
type Subscription struct { creationTime time.Time - ch chan *Notification + relay *subscriptionRelay unlistenOnce *sync.Once e *Elector } func (s *Subscription) C() <-chan *Notification { - return s.ch + return s.relay.C() +} + +func (s *Subscription) enqueue(notification *Notification) { + s.relay.enqueue(notification) +} + +func (s *Subscription) stop() { + s.relay.stop() } func (s *Subscription) Unlisten() { @@ -63,6 +78,111 @@ func (s *Subscription) Unlisten() { }) } +// subscriptionRelay decouples elector publication from subscriber consumption. +// +// The elector may need to publish `true` and `false` transitions promptly while +// maintenance components are still busy reacting to the previous one. A plain +// buffered channel would either block the elector or force us to drop +// transitions when the buffer filled. The relay solves that by: +// - appending every Notification to an in-memory FIFO queue +// - waking a dedicated goroutine via `pendingChan` +// - letting that goroutine drain queued notifications into the subscriber's +// public channel `ch` +// +// `pendingChan` is only a wakeup signal. It does not carry the notifications +// themselves, so multiple sends may coalesce while the goroutine is already +// awake. That is safe because the authoritative queue is pendingNotifications. +type subscriptionRelay struct { + ch chan *Notification // public per-subscription delivery channel + done chan struct{} // closes the relay goroutine during Unlisten/stop + pendingChan chan struct{} // coalesced wakeup signal that queued work exists + + // pendingNotifications preserves every leadership transition in order. + // A dedicated goroutine drains it into `ch` so slow subscribers cannot + // block the elector, but consumers like QueueMaintainerLeader still see + // each `false` transition instead of only the latest state. 
+ pendingMu sync.Mutex + pendingNotifications []*Notification +} + +func newSubscriptionRelay() *subscriptionRelay { + relay := &subscriptionRelay{ + ch: make(chan *Notification, 1), + done: make(chan struct{}), + pendingChan: make(chan struct{}, 1), + } + + go relay.run() + + return relay +} + +func (r *subscriptionRelay) C() <-chan *Notification { + return r.ch +} + +// enqueue appends a transition to the pending FIFO, then nudges the relay +// goroutine. The notification argument is the exact transition to preserve in +// order; unlike the notifier wakeup path elsewhere in the elector, these items +// must not be coalesced or replaced. +func (r *subscriptionRelay) enqueue(notification *Notification) { + r.pendingMu.Lock() + r.pendingNotifications = append(r.pendingNotifications, notification) + r.pendingMu.Unlock() + + select { + case r.pendingChan <- struct{}{}: + default: + } +} + +// nextPending pops the next queued notification for the relay goroutine. +func (r *subscriptionRelay) nextPending() (*Notification, bool) { + r.pendingMu.Lock() + defer r.pendingMu.Unlock() + + if len(r.pendingNotifications) == 0 { + return nil, false + } + + notification := r.pendingNotifications[0] + r.pendingNotifications = r.pendingNotifications[1:] + return notification, true +} + +// run waits until queued work exists, then drains as many pending +// notifications as possible into the subscriber channel before sleeping again. +// It exits promptly when stop closes done. +func (r *subscriptionRelay) run() { + for { + select { + case <-r.done: + return + + case <-r.pendingChan: + } + + for { + notification, ok := r.nextPending() + if !ok { + break + } + + select { + case <-r.done: + return + case r.ch <- notification: + } + } + } +} + +// stop terminates the relay goroutine. Callers must ensure they stop enqueueing +// through the owning Subscription afterwards. +func (r *subscriptionRelay) stop() { + close(r.done) +} + // Test-only properties. 
type electorTestSignals struct { DeniedLeadership testsignal.TestSignal[struct{}] // notifies when elector fails to gain leadership @@ -102,16 +222,58 @@ type Elector struct { baseservice.BaseService startstop.BaseStartStop - config *Config - exec riverdriver.Executor - leaderResignedChan chan struct{} - notifier *notifier.Notifier - requestResignChan chan struct{} - testSignals electorTestSignals + config *Config + exec riverdriver.Executor + notifier *notifier.Notifier + testSignals electorTestSignals + wakeupChan chan struct{} + + mu sync.Mutex + isLeader bool + pendingRequestResign bool + subscriptions []*Subscription +} - mu sync.Mutex - isLeader bool - subscriptions []*Subscription +type leadershipTerm struct { + clientID string + electedAt time.Time + trustedUntil time.Time +} + +func newLeadershipTerm(clientID string, electedAt, attemptStarted time.Time, ttl time.Duration) leadershipTerm { + term := leadershipTerm{ + clientID: clientID, + electedAt: electedAt, + } + + trustDuration := ttl - leaderLocalDeadlineSafetyMargin + if trustDuration <= 0 { + term.trustedUntil = attemptStarted + return term + } + + term.trustedUntil = attemptStarted.Add(trustDuration) + return term +} + +func (t leadershipTerm) remaining(now time.Time) time.Duration { + if !t.trustedUntil.After(now) { + return 0 + } + + return t.trustedUntil.Sub(now) +} + +func (t leadershipTerm) reelectAttemptTimeout(now time.Time) time.Duration { + remainingDuration := t.remaining(now) + if remainingDuration <= 0 { + return 0 + } + if remainingDuration < deadlineTimeout { + return remainingDuration + } + + return deadlineTimeout } // NewElector returns an Elector using the given adapter. 
The name should correspond @@ -130,20 +292,26 @@ func NewElector(archetype *baseservice.Archetype, exec riverdriver.Executor, not }) } +func trySendWakeup(ctx context.Context, wakeupChan chan struct{}) { + if ctx.Err() != nil { + return + } + + select { + case <-ctx.Done(): + case wakeupChan <- struct{}{}: + default: + } +} + func (e *Elector) Start(ctx context.Context) error { ctx, shouldStart, started, stopped := e.StartInit(ctx) if !shouldStart { return nil } - // We'll send to this channel anytime a leader resigns on the key with `name`. - // Buffered to 1 so a send doesn't block if the receiving loop hasn't entered - // its select yet (e.g. between gaining and maintaining leadership). - e.leaderResignedChan = make(chan struct{}, 1) - - // Buffered to 1 so a send from handleLeadershipNotification doesn't block - // if keepLeadershipLoop hasn't entered its select yet. - e.requestResignChan = make(chan struct{}, 1) + // Buffered to 1 so notifications coalesce instead of blocking the elector. + e.wakeupChan = make(chan struct{}, 1) var sub *notifier.Subscription if e.notifier == nil { @@ -175,7 +343,8 @@ func (e *Elector) Start(ctx context.Context) error { } for { - if err := e.attemptGainLeadershipLoop(ctx); err != nil { + term, err := e.runFollowerState(ctx) + if err != nil { // Function above only returns an error if context was cancelled // or overall context is done. 
if !errors.Is(err, context.Canceled) && ctx.Err() == nil { @@ -184,19 +353,16 @@ func (e *Elector) Start(ctx context.Context) error { return } + e.publishLeadershipState(true) e.Logger.DebugContext(ctx, e.Name+": Gained leadership", "client_id", e.config.ClientID) e.testSignals.GainedLeadership.Signal(struct{}{}) - err := e.keepLeadershipLoop(ctx) + err = e.runLeaderState(ctx, term) if err != nil { if errors.Is(err, context.Canceled) { return } - if errors.Is(err, errLostLeadershipReelection) { - continue // lost leadership reelection; unusual but not a problem; don't log - } - e.Logger.ErrorContext(ctx, e.Name+": Error keeping leadership", "client_id", e.config.ClientID, "err", err) } } @@ -205,13 +371,18 @@ func (e *Elector) Start(ctx context.Context) error { return nil } -func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error { +// runFollowerState is the follower side of the elector state machine. It keeps +// attempting election until this client becomes leader or the elector stops. +func (e *Elector) runFollowerState(ctx context.Context) (leadershipTerm, error) { var attempt int for { attempt++ e.Logger.DebugContext(ctx, e.Name+": Attempting to gain leadership", "client_id", e.config.ClientID) + // Use the local monotonic-bearing clock for the trust window. The + // DB-facing timestamp path stays on NowOrNil below. 
+ attemptStarted := e.Time.Now() - elected, err := attemptElectOrReelect(ctx, e.exec, false, &riverdriver.LeaderElectParams{ + leader, err := attemptElect(ctx, e.exec, &riverdriver.LeaderElectParams{ LeaderID: e.config.ClientID, Now: e.Time.NowOrNil(), Schema: e.config.Schema, @@ -219,16 +390,18 @@ func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error { }) if err != nil { if errors.Is(err, context.Canceled) || ctx.Err() != nil { - return err + return leadershipTerm{}, err + } + if !errors.Is(err, rivertype.ErrNotFound) { + sleepDuration := serviceutil.ExponentialBackoff(attempt, serviceutil.MaxAttemptsBeforeResetDefault) + e.Logger.ErrorContext(ctx, e.Name+": Error attempting to elect", e.errorSlogArgs(err, attempt, sleepDuration)...) + serviceutil.CancellableSleep(ctx, sleepDuration) + continue } - - sleepDuration := serviceutil.ExponentialBackoff(attempt, serviceutil.MaxAttemptsBeforeResetDefault) - e.Logger.ErrorContext(ctx, e.Name+": Error attempting to elect", e.errorSlogArgs(err, attempt, sleepDuration)...) - serviceutil.CancellableSleep(ctx, sleepDuration) - continue } - if elected { - return nil + + if leader != nil { + return newLeadershipTerm(leader.LeaderID, leader.ElectedAt, attemptStarted, e.leaderTTL()), nil } attempt = 0 @@ -237,16 +410,12 @@ func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error { e.testSignals.DeniedLeadership.Signal(struct{}{}) select { - // TODO: This could potentially leak memory / timers if we're seeing a ton - // of resignations. May want to make this reusable & cancel it when retrying? - // We may also want to consider a specialized ticker utility that can tick - // within a random range. 
case <-serviceutil.CancellableSleepC(ctx, randutil.DurationBetween(e.config.ElectInterval, e.config.ElectInterval+e.config.ElectIntervalJitter)): if ctx.Err() != nil { // context done - return ctx.Err() + return leadershipTerm{}, ctx.Err() } - case <-e.leaderResignedChan: + case <-e.wakeupChan: // Somebody just resigned, try to win the next election after a very // short random interval (to prevent all clients from bidding at once). serviceutil.CancellableSleep(ctx, randutil.DurationBetween(0, 50*time.Millisecond)) @@ -278,56 +447,28 @@ func (e *Elector) handleLeadershipNotification(ctx context.Context, topic notifi switch notification.Action { case DBNotificationKindRequestResign: - e.mu.Lock() - isLeader := e.isLeader - e.mu.Unlock() - - if !isLeader { + if !e.markPendingRequestResign() { return } - select { - case <-ctx.Done(): - case e.requestResignChan <- struct{}{}: - default: - // if context is not done and requestResignChan has an item in it - // already, do nothing - } + trySendWakeup(ctx, e.wakeupChan) case DBNotificationKindResigned: // If this a resignation from _this_ client, ignore the change. if notification.LeaderID == e.config.ClientID { return } - select { - case <-ctx.Done(): - case e.leaderResignedChan <- struct{}{}: - } + trySendWakeup(ctx, e.wakeupChan) } } -var errLostLeadershipReelection = errors.New("lost leadership with no error") +// runLeaderState is the leader side of the elector state machine. It waits for +// either a reelection interval, a forced resignation, or shutdown. +func (e *Elector) runLeaderState(ctx context.Context, term leadershipTerm) error { + defer e.clearPendingRequestResign() + defer e.publishLeadershipState(false) -func (e *Elector) keepLeadershipLoop(ctx context.Context) error { - // notify all subscribers that we're the leader - e.notifySubscribers(true) - - // On the way out clear any another item that may have been added to - // requestResignChan. 
Having isLeader set to false will prevent additional - // items from being queued after this one. - defer func() { - select { - case <-e.requestResignChan: - default: - } - }() - - // Defer is LIFO. This will run after the resign below. - // - // This also sets e.isLeader = false. - defer e.notifySubscribers(false) - - var lostLeadership bool + shouldResign := true // Before the elector returns, run a delete with NOTIFY to give up any // leadership that we have. If we do that here, we guarantee that any locks @@ -336,88 +477,96 @@ func (e *Elector) keepLeadershipLoop(ctx context.Context) error { // // This doesn't use ctx because it runs *after* the ctx is done. defer func() { - if !lostLeadership { - e.attemptResignLoop(ctx) // will resign using WithoutCancel context, but ctx sent for logging + if shouldResign { + e.attemptResignLoop(ctx, term) // will resign using WithoutCancel context, but ctx sent for logging } }() - const maxNumErrors = 5 + timer := time.NewTimer(0) + defer timer.Stop() - var ( - numErrors = 0 - timer = time.NewTimer(0) // reset immediately below - ) - <-timer.C + numErrors := 0 + waitDuration := e.config.ElectInterval for { - timer.Reset(e.config.ElectInterval) + resetTimer(timer, waitDuration) select { case <-ctx.Done(): - if !timer.Stop() { - <-timer.C - } - return ctx.Err() - case <-e.requestResignChan: - // Receive a notification telling current leader to resign. + case <-e.wakeupChan: + if !e.takePendingRequestResign() { + continue + } e.Logger.InfoContext(ctx, e.Name+": Current leader received forced resignation", "client_id", e.config.ClientID) - if !timer.Stop() { - <-timer.C - } - // This client may win leadership again, but drop out of this // function and make it start all over. return nil - case <-e.leaderResignedChan: - // Used only in tests for force an immediately reelect attempt. - // - // This differs from the case above in that it drops through to - // attempting to reelect instead of returning from the function. 
- - if !timer.Stop() { - <-timer.C - } - case <-timer.C: // Reelect timer expired; attempt reelection below. } e.Logger.DebugContext(ctx, e.Name+": Current leader attempting reelect", "client_id", e.config.ClientID) - reelected, err := attemptElectOrReelect(ctx, e.exec, true, &riverdriver.LeaderElectParams{ - LeaderID: e.config.ClientID, - Now: e.Time.NowOrNil(), - Schema: e.config.Schema, - TTL: e.leaderTTL(), - }) + // Use the local monotonic-bearing clock for the trust window. The + // DB-facing timestamp path stays on NowOrNil below. + attemptStarted := e.Time.Now() + attemptTimeout := term.reelectAttemptTimeout(attemptStarted) + if attemptTimeout <= 0 { + e.Logger.WarnContext(ctx, e.Name+": Current leader stepping down because the reelection deadline elapsed", "client_id", e.config.ClientID) + e.testSignals.LostLeadership.Signal(struct{}{}) + return nil + } + + leader, err := attemptReelectWithTimeout(ctx, e.exec, &riverdriver.LeaderReelectParams{ + ElectedAt: term.electedAt, + LeaderID: term.clientID, + Now: e.Time.NowOrNil(), + Schema: e.config.Schema, + TTL: e.leaderTTL(), + }, attemptTimeout) if err != nil { if errors.Is(err, context.Canceled) { return err } + if errors.Is(err, rivertype.ErrNotFound) { + shouldResign = false + e.testSignals.LostLeadership.Signal(struct{}{}) + return nil + } numErrors++ - if numErrors >= maxNumErrors { - return err + sleepDuration := serviceutil.ExponentialBackoff(numErrors, 3) + remainingDuration := term.remaining(e.Time.Now()) + if remainingDuration <= 0 { + e.Logger.WarnContext(ctx, e.Name+": Current leader stepping down because the reelection deadline elapsed after an error", "client_id", e.config.ClientID) + e.testSignals.LostLeadership.Signal(struct{}{}) + return nil } - sleepDuration := serviceutil.ExponentialBackoff(numErrors, 3) e.Logger.ErrorContext(ctx, e.Name+": Error attempting reelection", e.errorSlogArgs(err, numErrors, sleepDuration)...) 
+ if remainingDuration < sleepDuration { + sleepDuration = remainingDuration + } serviceutil.CancellableSleep(ctx, sleepDuration) + if ctx.Err() != nil { + return ctx.Err() + } + + // Retry immediately after the backoff because the time budget for this + // lease has already been reduced by the failed attempt above. + waitDuration = 0 continue } - if !reelected { - lostLeadership = true - e.testSignals.LostLeadership.Signal(struct{}{}) - return errLostLeadershipReelection - } numErrors = 0 + term = newLeadershipTerm(leader.LeaderID, leader.ElectedAt, attemptStarted, e.leaderTTL()) e.testSignals.MaintainedLeadership.Signal(struct{}{}) + waitDuration = e.config.ElectInterval } } @@ -427,7 +576,7 @@ func (e *Elector) keepLeadershipLoop(ctx context.Context) error { // makes use of a background context to try and guarantee that leadership is // always surrendered in a timely manner so it can be picked up quickly by // another client, even in the event of a cancellation. -func (e *Elector) attemptResignLoop(ctx context.Context) { +func (e *Elector) attemptResignLoop(ctx context.Context, term leadershipTerm) { e.Logger.DebugContext(ctx, e.Name+": Attempting to resign leadership", "client_id", e.config.ClientID) // Make a good faith attempt to resign, even in the presence of errors, but @@ -442,7 +591,7 @@ func (e *Elector) attemptResignLoop(ctx context.Context) { ctx = context.WithoutCancel(ctx) for attempt := 1; attempt <= maxNumErrors; attempt++ { - if err := e.attemptResign(ctx, attempt); err != nil { + if err := e.attemptResign(ctx, attempt, term); err != nil { sleepDuration := serviceutil.ExponentialBackoff(attempt, maxNumErrors) e.Logger.ErrorContext(ctx, e.Name+": Error attempting to resign", e.errorSlogArgs(err, attempt, sleepDuration)...) 
serviceutil.CancellableSleep(ctx, sleepDuration) @@ -456,7 +605,7 @@ func (e *Elector) attemptResignLoop(ctx context.Context) { // attemptResign attempts to resign any currently held leaderships for the // elector's name and leader ID. -func (e *Elector) attemptResign(ctx context.Context, attempt int) error { +func (e *Elector) attemptResign(ctx context.Context, attempt int, term leadershipTerm) error { // Wait one second longer each time we try to resign: timeout := time.Duration(attempt) * time.Second @@ -464,7 +613,8 @@ func (e *Elector) attemptResign(ctx context.Context, attempt int) error { defer cancel() resigned, err := e.exec.LeaderResign(ctx, &riverdriver.LeaderResignParams{ - LeaderID: e.config.ClientID, + ElectedAt: term.electedAt, + LeaderID: term.clientID, LeadershipTopic: string(notifier.NotificationTopicLeadership), Schema: e.config.Schema, }) @@ -496,8 +646,8 @@ func (e *Elector) errorSlogArgs(err error, attempt int, sleepDuration time.Durat func (e *Elector) Listen() *Subscription { sub := &Subscription{ creationTime: time.Now().UTC(), - ch: make(chan *Notification, 1), e: e, + relay: newSubscriptionRelay(), unlistenOnce: &sync.Once{}, } @@ -508,7 +658,7 @@ func (e *Elector) Listen() *Subscription { IsLeader: e.isLeader, Timestamp: sub.creationTime, } - sub.ch <- initialNotification + sub.enqueue(initialNotification) e.subscriptions = append(e.subscriptions, sub) return sub @@ -519,6 +669,8 @@ func (e *Elector) unlisten(sub *Subscription) { if !success { panic("BUG: tried to unlisten for subscription not in list") } + + sub.stop() } // needs to be in a separate method so the defer will cleanly unlock the mutex, @@ -528,7 +680,7 @@ func (e *Elector) tryUnlisten(sub *Subscription) bool { defer e.mu.Unlock() for i, s := range e.subscriptions { - if s.creationTime.Equal(sub.creationTime) { + if s == sub { e.subscriptions = append(e.subscriptions[:i], e.subscriptions[i+1:]...) 
return true } @@ -543,33 +695,79 @@ func (e *Elector) leaderTTL() time.Duration { return e.config.ElectInterval + electIntervalTTLPaddingDefault } -func (e *Elector) notifySubscribers(isLeader bool) { +func (e *Elector) markPendingRequestResign() bool { + e.mu.Lock() + defer e.mu.Unlock() + + if !e.isLeader { + return false + } + + e.pendingRequestResign = true + return true +} + +func (e *Elector) publishLeadershipState(isLeader bool) { notifyTime := time.Now().UTC() e.mu.Lock() defer e.mu.Unlock() e.isLeader = isLeader + if !isLeader { + e.pendingRequestResign = false + } + + notification := &Notification{ + IsLeader: isLeader, + Timestamp: notifyTime, + } for _, s := range e.subscriptions { - s.ch <- &Notification{ - IsLeader: isLeader, - Timestamp: notifyTime, - } + s.enqueue(notification) } } +func (e *Elector) clearPendingRequestResign() { + e.mu.Lock() + defer e.mu.Unlock() + + e.pendingRequestResign = false +} + +func (e *Elector) takePendingRequestResign() bool { + e.mu.Lock() + defer e.mu.Unlock() + + if !e.pendingRequestResign { + return false + } + + e.pendingRequestResign = false + return true +} + const deadlineTimeout = 5 * time.Second -// attemptElectOrReelect attempts to elect a leader for the given name. The -// bool alreadyElected indicates whether this is a potential reelection of -// an already-elected leader. If the election is successful because there is -// no leader or the previous leader expired, the provided leaderID will be -// set as the new leader with a TTL of ttl. -// -// Returns whether this leader was successfully elected or an error if one -// occurred. 
-func attemptElectOrReelect(ctx context.Context, exec riverdriver.Executor, alreadyElected bool, params *riverdriver.LeaderElectParams) (bool, error) { - ctx, cancel := context.WithTimeout(ctx, deadlineTimeout) +func resetTimer(timer *time.Timer, duration time.Duration) { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + + timer.Reset(duration) +} + +// attemptElect attempts to elect a leader for the given name. If there is no +// current leader or the previous leader expired, the provided leader ID is set +// as the new leader with a TTL of `params.TTL`. +func attemptElect(ctx context.Context, exec riverdriver.Executor, params *riverdriver.LeaderElectParams) (*riverdriver.Leader, error) { + return attemptElectWithTimeout(ctx, exec, params, deadlineTimeout) +} + +func attemptElectWithTimeout(ctx context.Context, exec riverdriver.Executor, params *riverdriver.LeaderElectParams, timeout time.Duration) (*riverdriver.Leader, error) { + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() execTx, err := exec.Begin(ctx) @@ -579,7 +777,7 @@ func attemptElectOrReelect(ctx context.Context, exec riverdriver.Executor, alrea additionalDetail = " (a common cause of this is a database pool that's at its connection limit; you may need to increase maximum connections)" } - return false, fmt.Errorf("error beginning transaction: %w%s", err, additionalDetail) + return nil, fmt.Errorf("error beginning transaction: %w%s", err, additionalDetail) } defer dbutil.RollbackWithoutCancel(ctx, execTx) @@ -587,22 +785,26 @@ func attemptElectOrReelect(ctx context.Context, exec riverdriver.Executor, alrea Now: params.Now, Schema: params.Schema, }); err != nil { - return false, err + return nil, err } - var elected bool - if alreadyElected { - elected, err = execTx.LeaderAttemptReelect(ctx, params) - } else { - elected, err = execTx.LeaderAttemptElect(ctx, params) + leader, err := execTx.LeaderAttemptElect(ctx, params) + if err != nil && !errors.Is(err, 
rivertype.ErrNotFound) { + return nil, err + } + if err := execTx.Commit(ctx); err != nil { + return nil, fmt.Errorf("error committing transaction: %w", err) } if err != nil { - return false, err + return nil, err } - if err := execTx.Commit(ctx); err != nil { - return false, fmt.Errorf("error committing transaction: %w", err) - } + return leader, nil +} + +func attemptReelectWithTimeout(ctx context.Context, exec riverdriver.Executor, params *riverdriver.LeaderReelectParams, timeout time.Duration) (*riverdriver.Leader, error) { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() - return elected, nil + return exec.LeaderAttemptReelect(ctx, params) } diff --git a/internal/leadership/elector_test.go b/internal/leadership/elector_test.go index 481471b8..f0255063 100644 --- a/internal/leadership/elector_test.go +++ b/internal/leadership/elector_test.go @@ -3,12 +3,11 @@ package leadership import ( "context" "encoding/json" - "log/slog" + "errors" "testing" "time" "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgxpool" "github.com/stretchr/testify/require" "github.com/riverqueue/river/internal/notifier" @@ -20,10 +19,74 @@ import ( "github.com/riverqueue/river/rivershared/riversharedtest" "github.com/riverqueue/river/rivershared/startstoptest" "github.com/riverqueue/river/rivershared/testfactory" + "github.com/riverqueue/river/rivershared/util/dbutil" "github.com/riverqueue/river/rivershared/util/ptrutil" "github.com/riverqueue/river/rivertype" ) +type leaderAttemptScriptExecutorMock struct { + riverdriver.Executor + + LeaderAttemptElectFunc func(ctx context.Context, execTx riverdriver.ExecutorTx, params *riverdriver.LeaderElectParams) (*riverdriver.Leader, error) +} + +func (m *leaderAttemptScriptExecutorMock) Begin(ctx context.Context) (riverdriver.ExecutorTx, error) { + tx, err := m.Executor.Begin(ctx) + if err != nil { + return nil, err + } + + return &leaderAttemptScriptExecutorTxMock{ + ExecutorTx: tx, + mock: m, + }, nil +} + +type 
leaderAttemptScriptExecutorTxMock struct { + riverdriver.ExecutorTx + + mock *leaderAttemptScriptExecutorMock +} + +func (m *leaderAttemptScriptExecutorTxMock) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (*riverdriver.Leader, error) { + if m.mock.LeaderAttemptElectFunc == nil { + return m.ExecutorTx.LeaderAttemptElect(ctx, params) + } + + return m.mock.LeaderAttemptElectFunc(ctx, m.ExecutorTx, params) +} + +type leaderReelectExecutorMock struct { + riverdriver.Executor + + LeaderAttemptReelectFunc func(ctx context.Context, params *riverdriver.LeaderReelectParams) (*riverdriver.Leader, error) +} + +func (m *leaderReelectExecutorMock) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderReelectParams) (*riverdriver.Leader, error) { + if m.LeaderAttemptReelectFunc == nil { + return m.Executor.LeaderAttemptReelect(ctx, params) + } + + return m.LeaderAttemptReelectFunc(ctx, params) +} + +type localNowTimeGeneratorStub struct { + now time.Time +} + +func (g *localNowTimeGeneratorStub) Now() time.Time { + return g.now +} + +func (g *localNowTimeGeneratorStub) NowOrNil() *time.Time { + return nil +} + +func (g *localNowTimeGeneratorStub) StubNow(now time.Time) time.Time { + g.now = now + return now +} + func TestElector_PollOnly(t *testing.T) { t.Parallel() @@ -40,15 +103,13 @@ func TestElector_PollOnly(t *testing.T) { func(ctx context.Context, t *testing.T, stress bool) *electorBundle { t.Helper() - tx := riverdbtest.TestTxPgx(ctx, t) + tx, _ := riverdbtest.TestTxPgxDriver(ctx, t, riverpgxv5.New(riversharedtest.DBPool(ctx, t)), &riverdbtest.TestTxOpts{ + DisableSchemaSharing: true, + }) - // We'll put multiple electors on one transaction. Make sure they can - // live with each other in relative harmony. 
tx = sharedtx.NewSharedTx(tx) - return &electorBundle{ - tx: tx, - } + return &electorBundle{tx: tx} }, func(t *testing.T, electorBundle *electorBundle) *Elector { t.Helper() @@ -57,11 +118,334 @@ func TestElector_PollOnly(t *testing.T) { riversharedtest.BaseServiceArchetype(t), driver.UnwrapExecutor(electorBundle.tx), nil, - &Config{ - ClientID: "test_client_id", - }, + &Config{ClientID: "test_client_id"}, ) + }, + ) +} + +func TestElectorHandleLeadershipNotification(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + driver *riverpgxv5.Driver + tx pgx.Tx + } + + setup := func(t *testing.T) (*Elector, *testBundle) { + t.Helper() + + tx := riverdbtest.TestTxPgx(ctx, t) + driver := riverpgxv5.New(nil) + + elector := NewElector( + riversharedtest.BaseServiceArchetype(t), + driver.UnwrapExecutor(tx), + nil, + &Config{ClientID: "test_client_id"}, + ) + elector.wakeupChan = make(chan struct{}, 1) + + return elector, &testBundle{ + driver: driver, + tx: tx, + } + } + + mustMarshalJSON := func(t *testing.T, val any) []byte { + t.Helper() + + data, err := json.Marshal(val) + require.NoError(t, err) + return data + } + + validLeadershipChange := func() *DBNotification { + t.Helper() + + return &DBNotification{ + Action: DBNotificationKindResigned, + LeaderID: "other-client-id", + } + } + + t.Run("IgnoresNonResignedAction", func(t *testing.T) { + t.Parallel() + + elector, _ := setup(t) + + change := validLeadershipChange() + change.Action = "not_resigned" + + elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, change))) + + require.Empty(t, elector.wakeupChan) + }) + + t.Run("IgnoresSameClientID", func(t *testing.T) { + t.Parallel() + + elector, _ := setup(t) + + change := validLeadershipChange() + change.LeaderID = elector.config.ClientID + + elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, change))) + + require.Empty(t, 
elector.wakeupChan) + }) + + t.Run("SignalsLeadershipChange", func(t *testing.T) { + t.Parallel() + + elector, _ := setup(t) + + elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, validLeadershipChange()))) + + riversharedtest.WaitOrTimeout(t, elector.wakeupChan) + }) + + t.Run("SignalsLeadershipChangeDoesNotBlockOnFullWakeup", func(t *testing.T) { + t.Parallel() + + elector, _ := setup(t) + elector.wakeupChan <- struct{}{} + + done := make(chan struct{}) + + go func() { + defer close(done) + elector.handleLeadershipNotification(context.Background(), notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, validLeadershipChange()))) + }() + + select { + case <-done: + case <-time.After(100 * time.Millisecond): + require.Fail(t, "expected leadership notification to coalesce the wakeup instead of blocking") + } + + require.Len(t, elector.wakeupChan, 1) + }) + + t.Run("StopsOnContextDone", func(t *testing.T) { + t.Parallel() + + elector, _ := setup(t) + + ctx, cancel := context.WithCancel(ctx) + cancel() + + elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, validLeadershipChange()))) + + require.Empty(t, elector.wakeupChan) + }) +} + +func TestElectorRunLeaderState(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + setup := func(t *testing.T) (*Elector, riverdriver.Executor) { + t.Helper() + + driver := riverpgxv5.New(nil) + exec := driver.UnwrapExecutor(riverdbtest.TestTxPgx(ctx, t)) + + elector := NewElector( + riversharedtest.BaseServiceArchetype(t), + exec, + nil, + &Config{ClientID: "test_client_id"}, + ) + elector.config.ElectInterval = 10 * time.Millisecond + elector.config.ElectIntervalJitter = time.Millisecond + elector.testSignals.Init(t) + + return elector, exec + } + + t.Run("ResignsCurrentTermAfterReelectErrorsExhaustTrust", func(t *testing.T) { + t.Parallel() + + elector, exec := setup(t) + initialNow := 
elector.Time.StubNow(time.Now().UTC()) + elector.publishLeadershipState(true) + + leader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ + ElectedAt: ptrutil.Ptr(initialNow), + ExpiresAt: ptrutil.Ptr(initialNow.Add(elector.leaderTTL())), + LeaderID: ptrutil.Ptr(elector.config.ClientID), }) + + elector.exec = &leaderReelectExecutorMock{ + Executor: exec, + LeaderAttemptReelectFunc: func(ctx context.Context, params *riverdriver.LeaderReelectParams) (*riverdriver.Leader, error) { + elector.Time.StubNow(initialNow.Add(elector.leaderTTL())) + return nil, errors.New("reelection error") + }, + } + + runCtx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- elector.runLeaderState(runCtx, newLeadershipTerm(elector.config.ClientID, leader.ElectedAt, initialNow, elector.leaderTTL())) + }() + + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(time.Second): + require.Fail(t, "timed out waiting for leader state to exit") + } + + elector.testSignals.LostLeadership.WaitOrTimeout() + elector.testSignals.ResignedLeadership.WaitOrTimeout() + + _, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) + require.ErrorIs(t, err, rivertype.ErrNotFound) + }) + + t.Run("SlowSuccessfulReelectDoesNotExtendTrustWindow", func(t *testing.T) { + t.Parallel() + + elector, exec := setup(t) + initialNow := elector.Time.StubNow(time.Now().UTC()) + elector.publishLeadershipState(true) + + leader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ + ElectedAt: ptrutil.Ptr(initialNow), + ExpiresAt: ptrutil.Ptr(initialNow.Add(elector.leaderTTL())), + LeaderID: ptrutil.Ptr(elector.config.ClientID), + }) + + var numAttempts int + elector.exec = &leaderReelectExecutorMock{ + Executor: exec, + LeaderAttemptReelectFunc: func(ctx context.Context, params *riverdriver.LeaderReelectParams) (*riverdriver.Leader, error) { + numAttempts++ + + switch numAttempts { + 
case 1: + elector.Time.StubNow(initialNow.Add(elector.leaderTTL())) + return exec.LeaderAttemptReelect(ctx, params) + default: + return nil, errors.New("unexpected reelection attempt after trust window elapsed") + } + }, + } + + runCtx, cancel := context.WithTimeout(ctx, 250*time.Millisecond) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- elector.runLeaderState(runCtx, newLeadershipTerm(elector.config.ClientID, leader.ElectedAt, initialNow, elector.leaderTTL())) + }() + + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(time.Second): + require.Fail(t, "timed out waiting for leader state to exit") + } + + require.Equal(t, 1, numAttempts) + elector.testSignals.LostLeadership.WaitOrTimeout() + elector.testSignals.ResignedLeadership.WaitOrTimeout() + + _, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) + require.ErrorIs(t, err, rivertype.ErrNotFound) + }) + + t.Run("UsesNowForLocalDeadlineChecks", func(t *testing.T) { + t.Parallel() + + driver := riverpgxv5.New(nil) + exec := driver.UnwrapExecutor(riverdbtest.TestTxPgx(ctx, t)) + + timeGenerator := &localNowTimeGeneratorStub{now: time.Now()} + archetype := riversharedtest.BaseServiceArchetype(t) + archetype.Time = timeGenerator + + elector := NewElector( + archetype, + &leaderReelectExecutorMock{ + Executor: exec, + LeaderAttemptReelectFunc: func(ctx context.Context, params *riverdriver.LeaderReelectParams) (*riverdriver.Leader, error) { + require.Fail(t, "unexpected reelection attempt") + panic("unreachable") + }, + }, + nil, + &Config{ClientID: "test_client_id"}, + ) + elector.config.ElectInterval = 10 * time.Millisecond + elector.testSignals.Init(t) + + elector.publishLeadershipState(true) + + runCtx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + + err := elector.runLeaderState(runCtx, leadershipTerm{ + clientID: elector.config.ClientID, + electedAt: time.Now(), + trustedUntil: 
timeGenerator.now.Add(-time.Millisecond), + }) + require.NoError(t, err) + + elector.testSignals.LostLeadership.WaitOrTimeout() + }) +} + +func TestElectorSubscriptions(t *testing.T) { + t.Parallel() + + setup := func(t *testing.T) *Elector { + t.Helper() + + return NewElector( + riversharedtest.BaseServiceArchetype(t), + nil, + nil, + &Config{ClientID: "test_client_id"}, + ) + } + + t.Run("SlowSubscribersDoNotBlockAndReceiveTransitionsInOrder", func(t *testing.T) { + t.Parallel() + + elector := setup(t) + sub := elector.Listen() + t.Cleanup(sub.Unlisten) + + done := make(chan struct{}) + go func() { + defer close(done) + elector.publishLeadershipState(true) + elector.publishLeadershipState(false) + }() + + select { + case <-done: + case <-time.After(100 * time.Millisecond): + require.Fail(t, "expected leadership publication to queue without blocking") + } + + notification := riversharedtest.WaitOrTimeout(t, sub.C()) + require.False(t, notification.IsLeader) + + notification = riversharedtest.WaitOrTimeout(t, sub.C()) + require.True(t, notification.IsLeader) + + notification = riversharedtest.WaitOrTimeout(t, sub.C()) + require.False(t, notification.IsLeader) + }) } func TestElector_WithNotifier(t *testing.T) { @@ -80,12 +464,7 @@ func TestElector_WithNotifier(t *testing.T) { func(ctx context.Context, t *testing.T, stress bool) *electorBundle { t.Helper() - var dbPool *pgxpool.Pool - if stress { - dbPool = riversharedtest.DBPoolClone(ctx, t) - } else { - dbPool = riversharedtest.DBPool(ctx, t) - } + dbPool := riversharedtest.DBPoolClone(ctx, t) var ( driver = riverpgxv5.New(dbPool) @@ -93,16 +472,14 @@ func TestElector_WithNotifier(t *testing.T) { archetype = riversharedtest.BaseServiceArchetype(t) ) - notifier := notifier.New(archetype, driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema})) - { - require.NoError(t, notifier.Start(ctx)) - t.Cleanup(notifier.Stop) - } + notifierSvc := notifier.New(archetype, 
driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema})) + require.NoError(t, notifierSvc.Start(ctx)) + t.Cleanup(notifierSvc.Stop) return &electorBundle{ archetype: archetype, exec: driver.GetExecutor(), - notifier: notifier, + notifier: notifierSvc, schema: schema, } }, @@ -118,12 +495,10 @@ func TestElector_WithNotifier(t *testing.T) { Schema: electorBundle.schema, }, ) - }) + }, + ) } -// This system of "elector bundles" may appear to be a little convoluted, but -// it's built so that we can initialize multiple electors against a single -// database or transaction. func testElector[TElectorBundle any]( ctx context.Context, t *testing.T, @@ -149,7 +524,6 @@ func testElector[TElectorBundle any]( } electorBundle := makeElectorBundle(ctx, t, opts.stress) - elector := makeElector(t, electorBundle) elector.testSignals.Init(t) @@ -161,187 +535,247 @@ func testElector[TElectorBundle any]( startElector := func(ctx context.Context, t *testing.T, elector *Elector) { t.Helper() - t.Logf("Starting %s", elector.config.ClientID) + require.NoError(t, elector.Start(ctx)) t.Cleanup(elector.Stop) } - t.Run("StartsGainsLeadershipAndStops", func(t *testing.T) { - t.Parallel() - - elector, bundle := setup(t, nil) - - startElector(ctx, t, elector) - - elector.testSignals.GainedLeadership.WaitOrTimeout() + signalLeaderResigned := func(ctx context.Context, t *testing.T, elector *Elector, leaderID string) { + t.Helper() - leader, err := bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ - Schema: elector.config.Schema, + payload, err := json.Marshal(DBNotification{ + Action: DBNotificationKindResigned, + LeaderID: leaderID, }) require.NoError(t, err) - require.Equal(t, elector.config.ClientID, leader.LeaderID) - elector.Stop() + elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(payload)) + } - elector.testSignals.ResignedLeadership.WaitOrTimeout() + signalRequestResign := func(ctx context.Context, t 
*testing.T, elector *Elector) { + t.Helper() + + payload, err := json.Marshal(DBNotification{Action: DBNotificationKindRequestResign}) + require.NoError(t, err) - _, err = bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ - Schema: elector.config.Schema, - }) - require.ErrorIs(t, err, rivertype.ErrNotFound) - }) + elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(payload)) + } - t.Run("NotifiesSubscribers", func(t *testing.T) { + t.Run("CoalescesResignedWakeups", func(t *testing.T) { t.Parallel() elector, _ := setup(t, nil) + elector.config.ElectInterval = 100 * time.Millisecond + elector.config.ElectIntervalJitter = 10 * time.Millisecond + elector.wakeupChan = make(chan struct{}, 1) - sub := elector.Listen() - t.Cleanup(func() { elector.unlisten(sub) }) + var ( + attempt int + attemptTimes []time.Time + ) - // Drain an initial notification that occurs on Listen. - notification := riversharedtest.WaitOrTimeout(t, sub.ch) - require.False(t, notification.IsLeader) + elector.exec = &leaderAttemptScriptExecutorMock{ + Executor: elector.exec, + LeaderAttemptElectFunc: func(ctx context.Context, execTx riverdriver.ExecutorTx, params *riverdriver.LeaderElectParams) (*riverdriver.Leader, error) { + attempt++ + attemptTimes = append(attemptTimes, time.Now()) + + switch attempt { + case 1, 2: + return nil, rivertype.ErrNotFound + case 3: + return execTx.LeaderInsert(ctx, &riverdriver.LeaderInsertParams{ + LeaderID: params.LeaderID, + Now: params.Now, + Schema: params.Schema, + TTL: params.TTL, + }) + default: + require.FailNowf(t, "unexpected election attempt", "attempt %d", attempt) + panic("unreachable") + } + }, + } - startElector(ctx, t, elector) + signalLeaderResigned(ctx, t, elector, "leader-1") - elector.testSignals.GainedLeadership.WaitOrTimeout() + secondNotificationDone := make(chan struct{}) + go func() { + defer close(secondNotificationDone) + signalLeaderResigned(ctx, t, elector, "leader-2") + 
}() - notification = riversharedtest.WaitOrTimeout(t, sub.ch) - require.True(t, notification.IsLeader) + select { + case <-secondNotificationDone: + case <-time.After(50 * time.Millisecond): + require.Fail(t, "expected second resignation notification to be coalesced instead of blocking") + } - elector.Stop() + attemptCtx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() - elector.testSignals.ResignedLeadership.WaitOrTimeout() + start := time.Now() + term, err := elector.runFollowerState(attemptCtx) + require.NoError(t, err) + require.Equal(t, elector.config.ClientID, term.clientID) + require.False(t, term.electedAt.IsZero()) + require.GreaterOrEqual(t, time.Since(start), 75*time.Millisecond) - notification = riversharedtest.WaitOrTimeout(t, sub.ch) - require.False(t, notification.IsLeader) + require.Equal(t, 3, attempt) + require.Len(t, attemptTimes, 3) + require.GreaterOrEqual(t, attemptTimes[2].Sub(attemptTimes[1]), 75*time.Millisecond) }) - t.Run("SustainsLeadership", func(t *testing.T) { + t.Run("CompetingElectors", func(t *testing.T) { t.Parallel() - elector, _ := setup(t, nil) + elector1, bundle := setup(t, nil) + elector1.config.ClientID = "elector1" - startElector(ctx, t, elector) + startElector(ctx, t, elector1) + elector1.testSignals.GainedLeadership.WaitOrTimeout() - elector.testSignals.GainedLeadership.WaitOrTimeout() + leader, err := bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ + Schema: elector1.config.Schema, + }) + require.NoError(t, err) + require.Equal(t, elector1.config.ClientID, leader.LeaderID) - // The leadership maintenance loop also listens on the leadership - // notification channel. Take advantage of that to cause an - // immediate reelect attempt with no sleep. 
- elector.leaderResignedChan <- struct{}{} - elector.testSignals.MaintainedLeadership.WaitOrTimeout() + elector2 := makeElector(t, bundle.electorBundle) + elector2.config.ClientID = "elector2" + elector2.config.ElectInterval = 10 * time.Millisecond + elector2.config.ElectIntervalJitter = time.Millisecond + elector2.exec = elector1.exec + elector2.testSignals.Init(t) - elector.leaderResignedChan <- struct{}{} - elector.testSignals.MaintainedLeadership.WaitOrTimeout() + startElector(ctx, t, elector2) - elector.leaderResignedChan <- struct{}{} - elector.testSignals.MaintainedLeadership.WaitOrTimeout() + elector2.testSignals.DeniedLeadership.WaitOrTimeout() - elector.Stop() + elector1.Stop() + elector1.testSignals.ResignedLeadership.WaitOrTimeout() - elector.testSignals.ResignedLeadership.WaitOrTimeout() + elector2.testSignals.GainedLeadership.WaitOrTimeout() + + elector2.Stop() + elector2.testSignals.ResignedLeadership.WaitOrTimeout() + + _, err = bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ + Schema: elector1.config.Schema, + }) + require.ErrorIs(t, err, rivertype.ErrNotFound) }) - t.Run("LosesLeadership", func(t *testing.T) { + t.Run("IndependentBundlesAreIsolated", func(t *testing.T) { t.Parallel() - elector, bundle := setup(t, nil) + elector1, bundle1 := setup(t, nil) + elector1.config.ClientID = "elector1" - startElector(ctx, t, elector) + elector2, bundle2 := setup(t, nil) + elector2.config.ClientID = "elector2" - elector.testSignals.GainedLeadership.WaitOrTimeout() + startElector(ctx, t, elector1) + startElector(ctx, t, elector2) - t.Logf("Force resigning %s", elector.config.ClientID) + elector1.testSignals.GainedLeadership.WaitOrTimeout() + elector2.testSignals.GainedLeadership.WaitOrTimeout() - // Artificially force resign the elector and add a new leader record - // so that it can't be elected again. 
- _, err := bundle.exec.LeaderResign(ctx, &riverdriver.LeaderResignParams{ - LeaderID: elector.config.ClientID, - LeadershipTopic: string(notifier.NotificationTopicLeadership), - Schema: elector.config.Schema, + leader1, err := bundle1.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ + Schema: elector1.config.Schema, }) require.NoError(t, err) + require.Equal(t, elector1.config.ClientID, leader1.LeaderID) - _ = testfactory.Leader(ctx, t, bundle.exec, &testfactory.LeaderOpts{ - LeaderID: ptrutil.Ptr("other-client-id"), - Schema: elector.config.Schema, + leader2, err := bundle2.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ + Schema: elector2.config.Schema, }) - - elector.leaderResignedChan <- struct{}{} - elector.testSignals.LostLeadership.WaitOrTimeout() - - // Wait for the elector to try and fail to gain leadership so we - // don't finish the test while it's still operating. - elector.testSignals.DeniedLeadership.WaitOrTimeout() - - elector.Stop() + require.NoError(t, err) + require.Equal(t, elector2.config.ClientID, leader2.LeaderID) }) - t.Run("CompetingElectors", func(t *testing.T) { + t.Run("LosesLeadershipWhenSameLeaderIDTermIsReplaced", func(t *testing.T) { t.Parallel() - elector1, bundle := setup(t, nil) - elector1.config.ClientID = "elector1" - - { - startElector(ctx, t, elector1) - - // next to avoid any raciness. 
- t.Logf("Waiting for %s to gain leadership", elector1.config.ClientID) - elector1.testSignals.GainedLeadership.WaitOrTimeout() + elector, bundle := setup(t, nil) + elector.config.ElectInterval = 25 * time.Millisecond + elector.config.ElectIntervalJitter = time.Millisecond - leader, err := bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ - Schema: elector1.config.Schema, - }) - require.NoError(t, err) - require.Equal(t, elector1.config.ClientID, leader.LeaderID) - } + sub := elector.Listen() + t.Cleanup(sub.Unlisten) - // Make another elector and make sure it's using the same executor. - elector2 := makeElector(t, bundle.electorBundle) - elector2.config.ClientID = "elector2" - elector2.exec = elector1.exec - elector2.testSignals.Init(t) + require.False(t, riversharedtest.WaitOrTimeout(t, sub.C()).IsLeader) - { - startElector(ctx, t, elector2) + startElector(ctx, t, elector) + elector.testSignals.GainedLeadership.WaitOrTimeout() + require.True(t, riversharedtest.WaitOrTimeout(t, sub.C()).IsLeader) - elector2.testSignals.DeniedLeadership.WaitOrTimeout() + leader, err := bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ + Schema: elector.config.Schema, + }) + require.NoError(t, err) - t.Logf("Stopping %s", elector1.config.ClientID) - elector1.Stop() - elector1.testSignals.ResignedLeadership.WaitOrTimeout() + newElectedAt := leader.ElectedAt.Add(time.Second) + newExpiresAt := newElectedAt.Add(elector.leaderTTL()) - // Cheat if we're in poll only by notifying leadership channel to - // wake the elector from sleep. 
- if elector2.notifier == nil { - elector2.leaderResignedChan <- struct{}{} + require.NoError(t, dbutil.WithTx(ctx, bundle.exec, func(ctx context.Context, execTx riverdriver.ExecutorTx) error { + resigned, err := execTx.LeaderResign(ctx, &riverdriver.LeaderResignParams{ + ElectedAt: leader.ElectedAt, + LeaderID: leader.LeaderID, + LeadershipTopic: string(notifier.NotificationTopicLeadership), + Schema: elector.config.Schema, + }) + if err != nil { + return err + } + if !resigned { + return errors.New("expected leader replacement to resign current term") } - t.Logf("Waiting for %s to gain leadership", elector2.config.ClientID) - elector2.testSignals.GainedLeadership.WaitOrTimeout() + _, err = execTx.LeaderInsert(ctx, &riverdriver.LeaderInsertParams{ + ElectedAt: &newElectedAt, + ExpiresAt: &newExpiresAt, + LeaderID: leader.LeaderID, + Schema: elector.config.Schema, + TTL: elector.leaderTTL(), + }) + return err + })) - t.Logf("Stopping %s", elector2.config.ClientID) - elector2.Stop() - elector2.testSignals.ResignedLeadership.WaitOrTimeout() - } + elector.testSignals.LostLeadership.WaitOrTimeout() + require.False(t, riversharedtest.WaitOrTimeout(t, sub.C()).IsLeader) + elector.testSignals.DeniedLeadership.WaitOrTimeout() - _, err := bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ - Schema: elector1.config.Schema, + leaderFromDB, err := bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ + Schema: elector.config.Schema, }) - require.ErrorIs(t, err, rivertype.ErrNotFound) + require.NoError(t, err) + require.Equal(t, leader.LeaderID, leaderFromDB.LeaderID) + require.Equal(t, newElectedAt, leaderFromDB.ElectedAt) }) - t.Run("StartStopStress", func(t *testing.T) { + t.Run("NotifiesSubscribers", func(t *testing.T) { t.Parallel() - elector, _ := setup(t, &testOpts{stress: true}) - elector.Logger = riversharedtest.LoggerWarn(t) // loop started/stop log is very noisy; suppress - elector.testSignals = 
electorTestSignals{} // deinit so channels don't fill + elector, _ := setup(t, nil) - startstoptest.Stress(ctx, t, elector) + sub := elector.Listen() + t.Cleanup(sub.Unlisten) + + notification := riversharedtest.WaitOrTimeout(t, sub.C()) + require.False(t, notification.IsLeader) + + startElector(ctx, t, elector) + elector.testSignals.GainedLeadership.WaitOrTimeout() + + notification = riversharedtest.WaitOrTimeout(t, sub.C()) + require.True(t, notification.IsLeader) + + elector.Stop() + elector.testSignals.ResignedLeadership.WaitOrTimeout() + + notification = riversharedtest.WaitOrTimeout(t, sub.C()) + require.False(t, notification.IsLeader) }) t.Run("RequestResignImmediatelyAfterElection", func(t *testing.T) { @@ -350,20 +784,9 @@ func testElector[TElectorBundle any]( elector, _ := setup(t, nil) startElector(ctx, t, elector) - elector.testSignals.GainedLeadership.WaitOrTimeout() - // Send a resign request immediately after gaining leadership. - // GainedLeadership is signaled _before_ keepLeadershipLoop is - // entered, so the resign request arrives before the loop's - // select. This only works if requestResignChan is buffered; - // with an unbuffered channel the send would be dropped by the - // default case since nobody is receiving yet. 
- payload, err := json.Marshal(DBNotification{ - Action: DBNotificationKindRequestResign, - }) - require.NoError(t, err) - elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(payload)) + signalRequestResign(ctx, t, elector) elector.testSignals.ResignedLeadership.WaitOrTimeout() elector.testSignals.GainedLeadership.WaitOrTimeout() @@ -375,7 +798,6 @@ func testElector[TElectorBundle any]( elector, bundle := setup(t, nil) startElector(ctx, t, elector) - elector.testSignals.GainedLeadership.WaitOrTimeout() leader, err := bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ @@ -384,21 +806,8 @@ func testElector[TElectorBundle any]( require.NoError(t, err) require.Equal(t, elector.config.ClientID, leader.LeaderID) - payload, err := json.Marshal(DBNotification{ - Action: DBNotificationKindRequestResign, - }) - require.NoError(t, err) - - // Send a bunch of resign requests consecutively. Only one will have an - // effect as the channel's size is only 1, and any extra items in it - // will be drained as the leader resigns, making it start empty the next - // time the leader starts up again. - // - // Wait for the elector to resign leadership, gain leadership again, - // then stop the elector and wait for it to resign one more time. 
for range 5 { - t.Log("Requesting leadership resign") - elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(payload)) + signalRequestResign(ctx, t, elector) } elector.testSignals.ResignedLeadership.WaitOrTimeout() elector.testSignals.GainedLeadership.WaitOrTimeout() @@ -411,208 +820,73 @@ func testElector[TElectorBundle any]( }) require.ErrorIs(t, err, rivertype.ErrNotFound) }) -} - -func TestAttemptElectOrReelect(t *testing.T) { - t.Parallel() - - const ( - clientID = "client-id" - leaderInstanceName = "default" - leaderTTL = 10 * time.Second - ) - - ctx := context.Background() - - type testBundle struct { - exec riverdriver.Executor - logger *slog.Logger - } - - setup := func(t *testing.T) *testBundle { - t.Helper() - - driver := riverpgxv5.New(nil) - - return &testBundle{ - exec: driver.UnwrapExecutor(riverdbtest.TestTxPgx(ctx, t)), - logger: riversharedtest.Logger(t), - } - } - t.Run("ElectsLeader", func(t *testing.T) { + t.Run("RequestResignWhileLeader", func(t *testing.T) { t.Parallel() - bundle := setup(t) - - elected, err := attemptElectOrReelect(ctx, bundle.exec, false, &riverdriver.LeaderElectParams{ - LeaderID: clientID, - TTL: leaderTTL, - Schema: "", - }) - require.NoError(t, err) - require.True(t, elected) // won election - - leader, err := bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ - Schema: "", - }) - require.NoError(t, err) - require.WithinDuration(t, time.Now(), leader.ElectedAt, 1*time.Second) - require.WithinDuration(t, time.Now().Add(leaderTTL), leader.ExpiresAt, 1*time.Second) - }) - - t.Run("ReelectsSameLeader", func(t *testing.T) { - t.Parallel() + elector, _ := setup(t, nil) + elector.config.ElectInterval = 10 * time.Millisecond + elector.config.ElectIntervalJitter = time.Millisecond - bundle := setup(t) + startElector(ctx, t, elector) - leader := testfactory.Leader(ctx, t, bundle.exec, &testfactory.LeaderOpts{ - LeaderID: ptrutil.Ptr(clientID), - Schema: "", - }) + 
elector.testSignals.GainedLeadership.WaitOrTimeout() + elector.testSignals.MaintainedLeadership.WaitOrTimeout() - // Re-elect the same leader. Use a larger TTL to see if time is updated, - // because we are in a test transaction and the time is frozen at the start of - // the transaction. - elected, err := attemptElectOrReelect(ctx, bundle.exec, true, &riverdriver.LeaderElectParams{ - LeaderID: clientID, - TTL: 30 * time.Second, - Schema: "", - }) - require.NoError(t, err) - require.True(t, elected) // won re-election + signalRequestResign(ctx, t, elector) - // expires_at should be incremented because this is the same leader that won - // previously and we specified that we're already elected: - updatedLeader, err := bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ - Schema: "", - }) - require.NoError(t, err) - require.Greater(t, updatedLeader.ExpiresAt, leader.ExpiresAt) + elector.testSignals.ResignedLeadership.WaitOrTimeout() + elector.testSignals.GainedLeadership.WaitOrTimeout() }) - t.Run("CannotElectDifferentLeader", func(t *testing.T) { + t.Run("StartsGainsLeadershipAndStops", func(t *testing.T) { t.Parallel() - bundle := setup(t) - - leader := testfactory.Leader(ctx, t, bundle.exec, &testfactory.LeaderOpts{ - LeaderID: ptrutil.Ptr(clientID), - Schema: "", - }) + elector, bundle := setup(t, nil) - elected, err := attemptElectOrReelect(ctx, bundle.exec, true, &riverdriver.LeaderElectParams{ - LeaderID: "different-client-id", - TTL: leaderTTL, - Schema: "", - }) - require.NoError(t, err) - require.False(t, elected) // lost election + startElector(ctx, t, elector) + elector.testSignals.GainedLeadership.WaitOrTimeout() - // The time should not have changed because we specified that we were not - // already elected, and the elect query is a no-op if there's already a - // updatedLeader: - updatedLeader, err := bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ - Schema: "", + leader, err := 
bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ + Schema: elector.config.Schema, }) require.NoError(t, err) - require.Equal(t, leader.ExpiresAt, updatedLeader.ExpiresAt) - }) -} - -func TestElectorHandleLeadershipNotification(t *testing.T) { - t.Parallel() - - var ( - ctx = context.Background() - driver = riverpgxv5.New(nil) - ) - - type testBundle struct{} - - setup := func(t *testing.T) (*Elector, *testBundle) { - t.Helper() - - tx := riverdbtest.TestTxPgx(ctx, t) - - elector := NewElector( - riversharedtest.BaseServiceArchetype(t), - driver.UnwrapExecutor(tx), - nil, - &Config{ClientID: "test_client_id"}, - ) - - // These channels are normally only initialized on start, so we need to - // create it manually here. - elector.requestResignChan = make(chan struct{}, 1) - elector.leaderResignedChan = make(chan struct{}, 1) - - return elector, &testBundle{} - } - - mustMarshalJSON := func(t *testing.T, val any) []byte { - t.Helper() - - data, err := json.Marshal(val) - require.NoError(t, err) - return data - } - - validLeadershipChange := func() *DBNotification { - t.Helper() - - return &DBNotification{ - Action: DBNotificationKindResigned, - LeaderID: "other-client-id", - } - } - - t.Run("SignalsLeadershipChange", func(t *testing.T) { - t.Parallel() - - elector, _ := setup(t) - - elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, validLeadershipChange()))) - - riversharedtest.WaitOrTimeout(t, elector.leaderResignedChan) - }) - - t.Run("StopsOnContextDone", func(t *testing.T) { - t.Parallel() - - elector, _ := setup(t) - - ctx, cancel := context.WithCancel(ctx) - cancel() // cancel immediately + require.Equal(t, elector.config.ClientID, leader.LeaderID) - elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, validLeadershipChange()))) + elector.Stop() + elector.testSignals.ResignedLeadership.WaitOrTimeout() - require.Empty(t, 
elector.leaderResignedChan) + _, err = bundle.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{ + Schema: elector.config.Schema, + }) + require.ErrorIs(t, err, rivertype.ErrNotFound) }) - t.Run("IgnoresNonResignedAction", func(t *testing.T) { + t.Run("StartStopStress", func(t *testing.T) { t.Parallel() - elector, _ := setup(t) - - change := validLeadershipChange() - change.Action = "not_resigned" - - elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, change))) + elector, _ := setup(t, &testOpts{stress: true}) + elector.Logger = riversharedtest.LoggerWarn(t) + elector.testSignals = electorTestSignals{} - require.Empty(t, elector.leaderResignedChan) + startstoptest.Stress(ctx, t, elector) }) - t.Run("IgnoresSameClientID", func(t *testing.T) { + t.Run("SustainsLeadership", func(t *testing.T) { t.Parallel() - elector, _ := setup(t) + elector, _ := setup(t, nil) + elector.config.ElectInterval = 10 * time.Millisecond + elector.config.ElectIntervalJitter = time.Millisecond - change := validLeadershipChange() - change.LeaderID = elector.config.ClientID + startElector(ctx, t, elector) + elector.testSignals.GainedLeadership.WaitOrTimeout() - elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, change))) + elector.testSignals.MaintainedLeadership.WaitOrTimeout() + elector.testSignals.MaintainedLeadership.WaitOrTimeout() + elector.testSignals.MaintainedLeadership.WaitOrTimeout() - require.Empty(t, elector.leaderResignedChan) + elector.Stop() + elector.testSignals.ResignedLeadership.WaitOrTimeout() }) } diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 43eacb26..8727a6e3 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -224,8 +224,8 @@ type Executor interface { JobSetStateIfRunningMany(ctx context.Context, params *JobSetStateIfRunningManyParams) 
([]*rivertype.JobRow, error) JobUpdate(ctx context.Context, params *JobUpdateParams) (*rivertype.JobRow, error) JobUpdateFull(ctx context.Context, params *JobUpdateFullParams) (*rivertype.JobRow, error) - LeaderAttemptElect(ctx context.Context, params *LeaderElectParams) (bool, error) - LeaderAttemptReelect(ctx context.Context, params *LeaderElectParams) (bool, error) + LeaderAttemptElect(ctx context.Context, params *LeaderElectParams) (*Leader, error) + LeaderAttemptReelect(ctx context.Context, params *LeaderReelectParams) (*Leader, error) LeaderDeleteExpired(ctx context.Context, params *LeaderDeleteExpiredParams) (int, error) LeaderGetElectedLeader(ctx context.Context, params *LeaderGetElectedLeaderParams) (*Leader, error) LeaderInsert(ctx context.Context, params *LeaderInsertParams) (*Leader, error) @@ -701,7 +701,16 @@ type LeaderElectParams struct { TTL time.Duration } +type LeaderReelectParams struct { + ElectedAt time.Time + LeaderID string + Now *time.Time + Schema string + TTL time.Duration +} + type LeaderResignParams struct { + ElectedAt time.Time LeaderID string LeadershipTopic string Schema string diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go index b3b63848..ea4dc674 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go @@ -11,7 +11,7 @@ import ( "time" ) -const leaderAttemptElect = `-- name: LeaderAttemptElect :execrows +const leaderAttemptElect = `-- name: LeaderAttemptElect :one INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, @@ -24,6 +24,7 @@ INSERT INTO /* TEMPLATE: schema */river_leader ( ) ON CONFLICT (name) DO NOTHING +RETURNING elected_at, expires_at, leader_id, name ` type LeaderAttemptElectParams struct { @@ -32,43 +33,50 @@ type LeaderAttemptElectParams struct { TTL float64 } -func (q *Queries) LeaderAttemptElect(ctx 
context.Context, db DBTX, arg *LeaderAttemptElectParams) (int64, error) { - result, err := db.ExecContext(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) - if err != nil { - return 0, err - } - return result.RowsAffected() +func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (*RiverLeader, error) { + row := db.QueryRowContext(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) + var i RiverLeader + err := row.Scan( + &i.ElectedAt, + &i.ExpiresAt, + &i.LeaderID, + &i.Name, + ) + return &i, err } -const leaderAttemptReelect = `-- name: LeaderAttemptReelect :execrows -INSERT INTO /* TEMPLATE: schema */river_leader ( - leader_id, - elected_at, - expires_at -) VALUES ( - $1, - coalesce($2::timestamptz, now()), - coalesce($2::timestamptz, now()) + make_interval(secs => $3) -) -ON CONFLICT (name) - DO UPDATE SET - expires_at = EXCLUDED.expires_at - WHERE - river_leader.leader_id = $1 +const leaderAttemptReelect = `-- name: LeaderAttemptReelect :one +UPDATE /* TEMPLATE: schema */river_leader +SET expires_at = coalesce($1::timestamptz, now()) + make_interval(secs => $2) +WHERE + elected_at = $3::timestamptz + AND expires_at >= coalesce($1::timestamptz, now()) + AND leader_id = $4 +RETURNING elected_at, expires_at, leader_id, name ` type LeaderAttemptReelectParams struct { - LeaderID string - Now *time.Time - TTL float64 + Now *time.Time + TTL float64 + ElectedAt time.Time + LeaderID string } -func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (int64, error) { - result, err := db.ExecContext(ctx, leaderAttemptReelect, arg.LeaderID, arg.Now, arg.TTL) - if err != nil { - return 0, err - } - return result.RowsAffected() +func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (*RiverLeader, error) { + row := db.QueryRowContext(ctx, leaderAttemptReelect, + arg.Now, + arg.TTL, + arg.ElectedAt, + arg.LeaderID, + ) + var i 
RiverLeader + err := row.Scan( + &i.ElectedAt, + &i.ExpiresAt, + &i.LeaderID, + &i.Name, + ) + return &i, err } const leaderDeleteExpired = `-- name: LeaderDeleteExpired :execrows @@ -143,12 +151,14 @@ const leaderResign = `-- name: LeaderResign :execrows WITH currently_held_leaders AS ( SELECT elected_at, expires_at, leader_id, name FROM /* TEMPLATE: schema */river_leader - WHERE leader_id = $1::text + WHERE + elected_at = $1::timestamptz + AND leader_id = $2::text FOR UPDATE ), notified_resignations AS ( SELECT pg_notify( - concat(coalesce($2::text, current_schema()), '.', $3::text), + concat(coalesce($3::text, current_schema()), '.', $4::text), json_build_object('leader_id', leader_id, 'action', 'resigned')::text ) FROM currently_held_leaders @@ -157,13 +167,19 @@ DELETE FROM /* TEMPLATE: schema */river_leader USING notified_resignations ` type LeaderResignParams struct { + ElectedAt time.Time LeaderID string Schema sql.NullString LeadershipTopic string } func (q *Queries) LeaderResign(ctx context.Context, db DBTX, arg *LeaderResignParams) (int64, error) { - result, err := db.ExecContext(ctx, leaderResign, arg.LeaderID, arg.Schema, arg.LeadershipTopic) + result, err := db.ExecContext(ctx, leaderResign, + arg.ElectedAt, + arg.LeaderID, + arg.Schema, + arg.LeadershipTopic, + ) if err != nil { return 0, err } diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index ef41a525..d8a2d445 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -716,28 +716,29 @@ func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpd return jobRowFromInternal(job) } -func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { - numElectionsWon, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, 
&dbsqlc.LeaderAttemptElectParams{ +func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (*riverdriver.Leader, error) { + leader, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ LeaderID: params.LeaderID, Now: params.Now, TTL: params.TTL.Seconds(), }) if err != nil { - return false, interpretError(err) + return nil, interpretError(err) } - return numElectionsWon > 0, nil + return leaderFromInternal(leader), nil } -func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { - numElectionsWon, err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ - LeaderID: params.LeaderID, - Now: params.Now, - TTL: params.TTL.Seconds(), +func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderReelectParams) (*riverdriver.Leader, error) { + leader, err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ + ElectedAt: params.ElectedAt, + LeaderID: params.LeaderID, + Now: params.Now, + TTL: params.TTL.Seconds(), }) if err != nil { - return false, interpretError(err) + return nil, interpretError(err) } - return numElectionsWon > 0, nil + return leaderFromInternal(leader), nil } func (e *Executor) LeaderDeleteExpired(ctx context.Context, params *riverdriver.LeaderDeleteExpiredParams) (int, error) { @@ -772,6 +773,7 @@ func (e *Executor) LeaderInsert(ctx context.Context, params *riverdriver.LeaderI func (e *Executor) LeaderResign(ctx context.Context, params *riverdriver.LeaderResignParams) (bool, error) { numResigned, err := dbsqlc.New().LeaderResign(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderResignParams{ + ElectedAt: params.ElectedAt, LeaderID: params.LeaderID, LeadershipTopic: params.LeadershipTopic, Schema: 
sql.NullString{String: params.Schema, Valid: params.Schema != ""}, diff --git a/riverdriver/riverdrivertest/leader.go b/riverdriver/riverdrivertest/leader.go index 23d0fbdd..f81904f8 100644 --- a/riverdriver/riverdrivertest/leader.go +++ b/riverdriver/riverdrivertest/leader.go @@ -11,6 +11,7 @@ import ( "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/testfactory" "github.com/riverqueue/river/rivershared/util/ptrutil" + "github.com/riverqueue/river/rivertype" ) func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx func(ctx context.Context, t *testing.T) (riverdriver.Executor, riverdriver.Driver[TTx])) { @@ -48,18 +49,21 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f now := time.Now().UTC() - elected, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ + leader, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ LeaderID: testClientID, Now: &now, TTL: leaderTTL, }) require.NoError(t, err) - require.True(t, elected) // won election - - leader, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) - require.NoError(t, err) require.WithinDuration(t, now, leader.ElectedAt, bundle.driver.TimePrecision()) require.WithinDuration(t, now.Add(leaderTTL), leader.ExpiresAt, bundle.driver.TimePrecision()) + require.Equal(t, testClientID, leader.LeaderID) + + leaderFromDB, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) + require.NoError(t, err) + require.WithinDuration(t, now, leaderFromDB.ElectedAt, bundle.driver.TimePrecision()) + require.WithinDuration(t, now.Add(leaderTTL), leaderFromDB.ExpiresAt, bundle.driver.TimePrecision()) + require.Equal(t, testClientID, leaderFromDB.LeaderID) }) t.Run("CannotElectTwiceInARow", func(t *testing.T) { @@ -71,12 +75,12 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f LeaderID: ptrutil.Ptr(testClientID), }) - 
elected, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ + leaderAttempt, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ LeaderID: "different-client-id", TTL: leaderTTL, }) - require.NoError(t, err) - require.False(t, elected) // lost election + require.ErrorIs(t, err, rivertype.ErrNotFound) + require.Nil(t, leaderAttempt) // The time should not have changed because we specified that we were not // already elected, and the elect query is a no-op if there's already a @@ -91,48 +95,50 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f exec, _ := setup(ctx, t) - elected, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ + leader, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ LeaderID: testClientID, TTL: leaderTTL, }) require.NoError(t, err) - require.True(t, elected) // won election + require.Equal(t, testClientID, leader.LeaderID) - leader, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) + leaderFromDB, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) require.NoError(t, err) - require.WithinDuration(t, time.Now(), leader.ElectedAt, veryGenerousTimeCompareTolerance) - require.WithinDuration(t, time.Now().Add(leaderTTL), leader.ExpiresAt, veryGenerousTimeCompareTolerance) + require.WithinDuration(t, time.Now(), leaderFromDB.ElectedAt, veryGenerousTimeCompareTolerance) + require.WithinDuration(t, time.Now().Add(leaderTTL), leaderFromDB.ExpiresAt, veryGenerousTimeCompareTolerance) }) }) t.Run("LeaderAttemptReelect", func(t *testing.T) { t.Parallel() - t.Run("ElectsLeader", func(t *testing.T) { + t.Run("DoesNotReelectDifferentLeaderID", func(t *testing.T) { t.Parallel() - exec, bundle := setup(ctx, t) + exec, _ := setup(ctx, t) - now := time.Now().UTC() + leader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ + LeaderID: ptrutil.Ptr("other-client-id"), + }) - elected, err := 
exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderElectParams{ - LeaderID: testClientID, - Now: &now, - TTL: leaderTTL, + updatedLeader, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderReelectParams{ + ElectedAt: leader.ElectedAt, + LeaderID: testClientID, + TTL: leaderTTL, }) - require.NoError(t, err) - require.True(t, elected) // won election + require.ErrorIs(t, err, rivertype.ErrNotFound) + require.Nil(t, updatedLeader) - leader, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) + leaderFromDB, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) require.NoError(t, err) - require.WithinDuration(t, now, leader.ElectedAt, bundle.driver.TimePrecision()) - require.WithinDuration(t, now.Add(leaderTTL), leader.ExpiresAt, bundle.driver.TimePrecision()) + require.Equal(t, leader.LeaderID, leaderFromDB.LeaderID) + require.Equal(t, leader.ElectedAt, leaderFromDB.ElectedAt) }) t.Run("ReelectsSameLeader", func(t *testing.T) { t.Parallel() - exec, _ := setup(ctx, t) + exec, bundle := setup(ctx, t) leader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ LeaderID: ptrutil.Ptr(testClientID), @@ -141,59 +147,93 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f // Re-elect the same leader. Use a larger TTL to see if time is updated, // because we are in a test transaction and the time is frozen at the start of // the transaction. 
- elected, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderElectParams{ - LeaderID: testClientID, - TTL: 30 * time.Second, + updatedLeader, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderReelectParams{ + ElectedAt: leader.ElectedAt, + LeaderID: testClientID, + TTL: 30 * time.Second, }) require.NoError(t, err) - require.True(t, elected) // won re-election + require.Equal(t, testClientID, updatedLeader.LeaderID) + require.WithinDuration(t, leader.ElectedAt, updatedLeader.ElectedAt, bundle.driver.TimePrecision()) // expires_at should be incremented because this is the same leader that won // previously and we specified that we're already elected: - updatedLeader, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) + leaderFromDB, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) require.NoError(t, err) - require.Greater(t, updatedLeader.ExpiresAt, leader.ExpiresAt) + require.Greater(t, leaderFromDB.ExpiresAt, leader.ExpiresAt) + require.WithinDuration(t, updatedLeader.ElectedAt, leaderFromDB.ElectedAt, bundle.driver.TimePrecision()) }) - t.Run("DoesNotReelectDifferentLeader", func(t *testing.T) { + t.Run("DoesNotReelectExpiredRowThatIsNotYetDeleted", func(t *testing.T) { t.Parallel() exec, _ := setup(ctx, t) + now := time.Now().UTC() leader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ - LeaderID: ptrutil.Ptr(testClientID), + ElectedAt: ptrutil.Ptr(now.Add(-2 * time.Hour)), + ExpiresAt: ptrutil.Ptr(now.Add(-1 * time.Hour)), + LeaderID: ptrutil.Ptr(testClientID), }) - elected, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderElectParams{ - LeaderID: "different-client", - TTL: 30 * time.Second, + updatedLeader, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderReelectParams{ + ElectedAt: leader.ElectedAt, + LeaderID: testClientID, + TTL: 30 * time.Second, }) + require.ErrorIs(t, err, rivertype.ErrNotFound) + require.Nil(t, updatedLeader) + + leaderFromDB, 
err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) require.NoError(t, err) - require.False(t, elected) // did not win re-election + require.Equal(t, leader.ElectedAt, leaderFromDB.ElectedAt) + require.Equal(t, leader.ExpiresAt, leaderFromDB.ExpiresAt) + }) - // expires_at should be incremented because this is the same leader that won - // previously and we specified that we're already elected: - updatedLeader, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) + t.Run("DoesNotReelectMismatchedElectedAt", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + leader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ + LeaderID: ptrutil.Ptr(testClientID), + }) + + updatedLeader, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderReelectParams{ + ElectedAt: leader.ElectedAt.Add(-time.Second), + LeaderID: testClientID, + TTL: 30 * time.Second, + }) + require.ErrorIs(t, err, rivertype.ErrNotFound) + require.Nil(t, updatedLeader) + + leaderFromDB, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) require.NoError(t, err) - require.Equal(t, leader.LeaderID, updatedLeader.LeaderID) + require.Equal(t, leader.ElectedAt, leaderFromDB.ElectedAt) }) t.Run("WithoutNow", func(t *testing.T) { t.Parallel() - exec, _ := setup(ctx, t) + exec, bundle := setup(ctx, t) - elected, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderElectParams{ - LeaderID: testClientID, - TTL: leaderTTL, + leader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ + LeaderID: ptrutil.Ptr(testClientID), + }) + + updatedLeader, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderReelectParams{ + ElectedAt: leader.ElectedAt, + LeaderID: testClientID, + TTL: leaderTTL, }) require.NoError(t, err) - require.True(t, elected) // won election + require.Equal(t, testClientID, updatedLeader.LeaderID) + require.WithinDuration(t, leader.ElectedAt, 
updatedLeader.ElectedAt, bundle.driver.TimePrecision()) - leader, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) + leaderFromDB, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) require.NoError(t, err) - require.WithinDuration(t, time.Now(), leader.ElectedAt, veryGenerousTimeCompareTolerance) - require.WithinDuration(t, time.Now().Add(leaderTTL), leader.ExpiresAt, veryGenerousTimeCompareTolerance) + require.WithinDuration(t, leader.ElectedAt, leaderFromDB.ElectedAt, bundle.driver.TimePrecision()) + require.WithinDuration(t, time.Now().Add(leaderTTL), leaderFromDB.ExpiresAt, veryGenerousTimeCompareTolerance) }) }) @@ -318,6 +358,7 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f { resigned, err := exec.LeaderResign(ctx, &riverdriver.LeaderResignParams{ + ElectedAt: time.Now().UTC(), LeaderID: testClientID, LeadershipTopic: string(notifier.NotificationTopicLeadership), }) @@ -325,12 +366,13 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f require.False(t, resigned) } - _ = testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ + leader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ LeaderID: ptrutil.Ptr(testClientID), }) { resigned, err := exec.LeaderResign(ctx, &riverdriver.LeaderResignParams{ + ElectedAt: leader.ElectedAt, LeaderID: testClientID, LeadershipTopic: string(notifier.NotificationTopicLeadership), }) @@ -344,16 +386,53 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f exec, _ := setup(ctx, t) - _ = testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ + leader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ LeaderID: ptrutil.Ptr("other-client-id"), }) resigned, err := exec.LeaderResign(ctx, &riverdriver.LeaderResignParams{ + ElectedAt: leader.ElectedAt, LeaderID: testClientID, LeadershipTopic: string(notifier.NotificationTopicLeadership), }) 
require.NoError(t, err) require.False(t, resigned) }) + + t.Run("DoesNotResignNewerTermForSameLeaderID", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + + oldLeader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ + ElectedAt: ptrutil.Ptr(now.Add(-2 * time.Hour)), + ExpiresAt: ptrutil.Ptr(now.Add(-1 * time.Hour)), + LeaderID: ptrutil.Ptr(testClientID), + }) + + numDeleted, err := exec.LeaderDeleteExpired(ctx, &riverdriver.LeaderDeleteExpiredParams{Now: &now}) + require.NoError(t, err) + require.Equal(t, 1, numDeleted) + + newLeader := testfactory.Leader(ctx, t, exec, &testfactory.LeaderOpts{ + ElectedAt: ptrutil.Ptr(now), + LeaderID: ptrutil.Ptr(testClientID), + }) + + resigned, err := exec.LeaderResign(ctx, &riverdriver.LeaderResignParams{ + ElectedAt: oldLeader.ElectedAt, + LeaderID: testClientID, + LeadershipTopic: string(notifier.NotificationTopicLeadership), + }) + require.NoError(t, err) + require.False(t, resigned) + + leaderFromDB, err := exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{}) + require.NoError(t, err) + require.Equal(t, newLeader.LeaderID, leaderFromDB.LeaderID) + require.Equal(t, newLeader.ElectedAt, leaderFromDB.ElectedAt) + }) }) } diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql index e2417d86..cea2195f 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql @@ -7,7 +7,7 @@ CREATE UNLOGGED TABLE river_leader( CONSTRAINT leader_id_length CHECK (char_length(leader_id) > 0 AND char_length(leader_id) < 128) ); --- name: LeaderAttemptElect :execrows +-- name: LeaderAttemptElect :one INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, @@ -19,23 +19,17 @@ INSERT INTO /* TEMPLATE: schema */river_leader ( coalesce(sqlc.narg('now')::timestamptz, now()) + make_interval(secs => @ttl) ) ON CONFLICT 
(name) - DO NOTHING; + DO NOTHING +RETURNING *; --- name: LeaderAttemptReelect :execrows -INSERT INTO /* TEMPLATE: schema */river_leader ( - leader_id, - elected_at, - expires_at -) VALUES ( - @leader_id, - coalesce(sqlc.narg('now')::timestamptz, now()), - coalesce(sqlc.narg('now')::timestamptz, now()) + make_interval(secs => @ttl) -) -ON CONFLICT (name) - DO UPDATE SET - expires_at = EXCLUDED.expires_at - WHERE - river_leader.leader_id = @leader_id; +-- name: LeaderAttemptReelect :one +UPDATE /* TEMPLATE: schema */river_leader +SET expires_at = coalesce(sqlc.narg('now')::timestamptz, now()) + make_interval(secs => @ttl) +WHERE + elected_at = @elected_at::timestamptz + AND expires_at >= coalesce(sqlc.narg('now')::timestamptz, now()) + AND leader_id = @leader_id +RETURNING *; -- name: LeaderDeleteExpired :execrows DELETE FROM /* TEMPLATE: schema */river_leader @@ -60,7 +54,9 @@ INSERT INTO /* TEMPLATE: schema */river_leader( WITH currently_held_leaders AS ( SELECT * FROM /* TEMPLATE: schema */river_leader - WHERE leader_id = @leader_id::text + WHERE + elected_at = @elected_at::timestamptz + AND leader_id = @leader_id::text FOR UPDATE ), notified_resignations AS ( @@ -70,4 +66,4 @@ notified_resignations AS ( ) FROM currently_held_leaders ) -DELETE FROM /* TEMPLATE: schema */river_leader USING notified_resignations; \ No newline at end of file +DELETE FROM /* TEMPLATE: schema */river_leader USING notified_resignations; diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go index 389ef8b1..4fcea1cf 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go @@ -12,7 +12,7 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) -const leaderAttemptElect = `-- name: LeaderAttemptElect :execrows +const leaderAttemptElect = `-- name: LeaderAttemptElect :one INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, 
@@ -25,6 +25,7 @@ INSERT INTO /* TEMPLATE: schema */river_leader ( ) ON CONFLICT (name) DO NOTHING +RETURNING elected_at, expires_at, leader_id, name ` type LeaderAttemptElectParams struct { @@ -33,43 +34,50 @@ type LeaderAttemptElectParams struct { TTL float64 } -func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (int64, error) { - result, err := db.Exec(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) - if err != nil { - return 0, err - } - return result.RowsAffected(), nil +func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (*RiverLeader, error) { + row := db.QueryRow(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) + var i RiverLeader + err := row.Scan( + &i.ElectedAt, + &i.ExpiresAt, + &i.LeaderID, + &i.Name, + ) + return &i, err } -const leaderAttemptReelect = `-- name: LeaderAttemptReelect :execrows -INSERT INTO /* TEMPLATE: schema */river_leader ( - leader_id, - elected_at, - expires_at -) VALUES ( - $1, - coalesce($2::timestamptz, now()), - coalesce($2::timestamptz, now()) + make_interval(secs => $3) -) -ON CONFLICT (name) - DO UPDATE SET - expires_at = EXCLUDED.expires_at - WHERE - river_leader.leader_id = $1 +const leaderAttemptReelect = `-- name: LeaderAttemptReelect :one +UPDATE /* TEMPLATE: schema */river_leader +SET expires_at = coalesce($1::timestamptz, now()) + make_interval(secs => $2) +WHERE + elected_at = $3::timestamptz + AND expires_at >= coalesce($1::timestamptz, now()) + AND leader_id = $4 +RETURNING elected_at, expires_at, leader_id, name ` type LeaderAttemptReelectParams struct { - LeaderID string - Now *time.Time - TTL float64 + Now *time.Time + TTL float64 + ElectedAt time.Time + LeaderID string } -func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (int64, error) { - result, err := db.Exec(ctx, leaderAttemptReelect, arg.LeaderID, arg.Now, arg.TTL) - if err != nil { - return 0, 
err - } - return result.RowsAffected(), nil +func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (*RiverLeader, error) { + row := db.QueryRow(ctx, leaderAttemptReelect, + arg.Now, + arg.TTL, + arg.ElectedAt, + arg.LeaderID, + ) + var i RiverLeader + err := row.Scan( + &i.ElectedAt, + &i.ExpiresAt, + &i.LeaderID, + &i.Name, + ) + return &i, err } const leaderDeleteExpired = `-- name: LeaderDeleteExpired :execrows @@ -144,12 +152,14 @@ const leaderResign = `-- name: LeaderResign :execrows WITH currently_held_leaders AS ( SELECT elected_at, expires_at, leader_id, name FROM /* TEMPLATE: schema */river_leader - WHERE leader_id = $1::text + WHERE + elected_at = $1::timestamptz + AND leader_id = $2::text FOR UPDATE ), notified_resignations AS ( SELECT pg_notify( - concat(coalesce($2::text, current_schema()), '.', $3::text), + concat(coalesce($3::text, current_schema()), '.', $4::text), json_build_object('leader_id', leader_id, 'action', 'resigned')::text ) FROM currently_held_leaders @@ -158,13 +168,19 @@ DELETE FROM /* TEMPLATE: schema */river_leader USING notified_resignations ` type LeaderResignParams struct { + ElectedAt time.Time LeaderID string Schema pgtype.Text LeadershipTopic string } func (q *Queries) LeaderResign(ctx context.Context, db DBTX, arg *LeaderResignParams) (int64, error) { - result, err := db.Exec(ctx, leaderResign, arg.LeaderID, arg.Schema, arg.LeadershipTopic) + result, err := db.Exec(ctx, leaderResign, + arg.ElectedAt, + arg.LeaderID, + arg.Schema, + arg.LeadershipTopic, + ) if err != nil { return 0, err } diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 9d3fed61..e0201a0e 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -701,28 +701,29 @@ func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpd return jobRowFromInternal(job) } -func (e *Executor) 
LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { - numElectionsWon, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ +func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (*riverdriver.Leader, error) { + leader, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ LeaderID: params.LeaderID, Now: params.Now, TTL: params.TTL.Seconds(), }) if err != nil { - return false, interpretError(err) + return nil, interpretError(err) } - return numElectionsWon > 0, nil + return leaderFromInternal(leader), nil } -func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { - numElectionsWon, err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ - LeaderID: params.LeaderID, - Now: params.Now, - TTL: params.TTL.Seconds(), +func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderReelectParams) (*riverdriver.Leader, error) { + leader, err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ + ElectedAt: params.ElectedAt, + LeaderID: params.LeaderID, + Now: params.Now, + TTL: params.TTL.Seconds(), }) if err != nil { - return false, interpretError(err) + return nil, interpretError(err) } - return numElectionsWon > 0, nil + return leaderFromInternal(leader), nil } func (e *Executor) LeaderDeleteExpired(ctx context.Context, params *riverdriver.LeaderDeleteExpiredParams) (int, error) { @@ -757,6 +758,7 @@ func (e *Executor) LeaderInsert(ctx context.Context, params *riverdriver.LeaderI func (e *Executor) LeaderResign(ctx context.Context, params *riverdriver.LeaderResignParams) (bool, error) { numResigned, err := 
dbsqlc.New().LeaderResign(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderResignParams{ + ElectedAt: params.ElectedAt, LeaderID: params.LeaderID, LeadershipTopic: params.LeadershipTopic, Schema: pgtype.Text{String: params.Schema, Valid: params.Schema != ""}, diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql b/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql index c5da22cb..398eadf9 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql @@ -7,7 +7,7 @@ CREATE TABLE river_leader ( CONSTRAINT leader_id_length CHECK (length(leader_id) > 0 AND length(leader_id) < 128) ); --- name: LeaderAttemptElect :execrows +-- name: LeaderAttemptElect :one INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, @@ -18,23 +18,17 @@ INSERT INTO /* TEMPLATE: schema */river_leader ( datetime(coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), 'subsec', cast(@ttl as text)) ) ON CONFLICT (name) - DO NOTHING; + DO NOTHING +RETURNING *; --- name: LeaderAttemptReelect :execrows -INSERT INTO /* TEMPLATE: schema */river_leader ( - leader_id, - elected_at, - expires_at -) VALUES ( - @leader_id, - coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), - datetime(coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), 'subsec', cast(@ttl as text)) -) -ON CONFLICT (name) - DO UPDATE SET - expires_at = EXCLUDED.expires_at - WHERE - leader_id = EXCLUDED.leader_id; +-- name: LeaderAttemptReelect :one +UPDATE /* TEMPLATE: schema */river_leader +SET expires_at = datetime(coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), 'subsec', cast(@ttl as text)) +WHERE + unixepoch(elected_at, 'subsec') = unixepoch(cast(@elected_at AS text), 'subsec') + AND expires_at >= coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')) + AND leader_id = @leader_id +RETURNING *; -- name: LeaderDeleteExpired :execrows 
DELETE FROM /* TEMPLATE: schema */river_leader @@ -58,4 +52,6 @@ INSERT INTO /* TEMPLATE: schema */river_leader( -- name: LeaderResign :execrows DELETE FROM /* TEMPLATE: schema */river_leader -WHERE leader_id = @leader_id; \ No newline at end of file +WHERE + unixepoch(elected_at, 'subsec') = unixepoch(cast(@elected_at AS text), 'subsec') + AND leader_id = @leader_id; diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql.go index 6e452d0e..a43227f0 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql.go +++ b/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql.go @@ -9,7 +9,7 @@ import ( "context" ) -const leaderAttemptElect = `-- name: LeaderAttemptElect :execrows +const leaderAttemptElect = `-- name: LeaderAttemptElect :one INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, @@ -21,6 +21,7 @@ INSERT INTO /* TEMPLATE: schema */river_leader ( ) ON CONFLICT (name) DO NOTHING +RETURNING elected_at, expires_at, leader_id, name ` type LeaderAttemptElectParams struct { @@ -29,43 +30,50 @@ type LeaderAttemptElectParams struct { TTL string } -func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (int64, error) { - result, err := db.ExecContext(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) - if err != nil { - return 0, err - } - return result.RowsAffected() +func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (*RiverLeader, error) { + row := db.QueryRowContext(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) + var i RiverLeader + err := row.Scan( + &i.ElectedAt, + &i.ExpiresAt, + &i.LeaderID, + &i.Name, + ) + return &i, err } -const leaderAttemptReelect = `-- name: LeaderAttemptReelect :execrows -INSERT INTO /* TEMPLATE: schema */river_leader ( - leader_id, - elected_at, - expires_at -) VALUES ( - ?1, - coalesce(cast(?2 AS text), datetime('now', 
'subsec')), - datetime(coalesce(cast(?2 AS text), datetime('now', 'subsec')), 'subsec', cast(?3 as text)) -) -ON CONFLICT (name) - DO UPDATE SET - expires_at = EXCLUDED.expires_at - WHERE - leader_id = EXCLUDED.leader_id +const leaderAttemptReelect = `-- name: LeaderAttemptReelect :one +UPDATE /* TEMPLATE: schema */river_leader +SET expires_at = datetime(coalesce(cast(?1 AS text), datetime('now', 'subsec')), 'subsec', cast(?2 as text)) +WHERE + unixepoch(elected_at, 'subsec') = unixepoch(cast(?3 AS text), 'subsec') + AND expires_at >= coalesce(cast(?1 AS text), datetime('now', 'subsec')) + AND leader_id = ?4 +RETURNING elected_at, expires_at, leader_id, name ` type LeaderAttemptReelectParams struct { - LeaderID string - Now *string - TTL string + Now *string + TTL string + ElectedAt string + LeaderID string } -func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (int64, error) { - result, err := db.ExecContext(ctx, leaderAttemptReelect, arg.LeaderID, arg.Now, arg.TTL) - if err != nil { - return 0, err - } - return result.RowsAffected() +func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (*RiverLeader, error) { + row := db.QueryRowContext(ctx, leaderAttemptReelect, + arg.Now, + arg.TTL, + arg.ElectedAt, + arg.LeaderID, + ) + var i RiverLeader + err := row.Scan( + &i.ElectedAt, + &i.ExpiresAt, + &i.LeaderID, + &i.Name, + ) + return &i, err } const leaderDeleteExpired = `-- name: LeaderDeleteExpired :execrows @@ -139,11 +147,18 @@ func (q *Queries) LeaderInsert(ctx context.Context, db DBTX, arg *LeaderInsertPa const leaderResign = `-- name: LeaderResign :execrows DELETE FROM /* TEMPLATE: schema */river_leader -WHERE leader_id = ?1 +WHERE + unixepoch(elected_at, 'subsec') = unixepoch(cast(?1 AS text), 'subsec') + AND leader_id = ?2 ` -func (q *Queries) LeaderResign(ctx context.Context, db DBTX, leaderID string) (int64, error) { - result, err := db.ExecContext(ctx, 
leaderResign, leaderID) +type LeaderResignParams struct { + ElectedAt string + LeaderID string +} + +func (q *Queries) LeaderResign(ctx context.Context, db DBTX, arg *LeaderResignParams) (int64, error) { + result, err := db.ExecContext(ctx, leaderResign, arg.ElectedAt, arg.LeaderID) if err != nil { return 0, err } diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index 1eda9f14..586fdc53 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -1099,28 +1099,29 @@ func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpd return jobRowFromInternal(job) } -func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { - numElectionsWon, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ +func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (*riverdriver.Leader, error) { + leader, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ LeaderID: params.LeaderID, Now: timeStringNullable(params.Now), TTL: durationAsString(params.TTL), }) if err != nil { - return false, interpretError(err) + return nil, interpretError(err) } - return numElectionsWon > 0, nil + return leaderFromInternal(leader), nil } -func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) { - numElectionsWon, err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ - LeaderID: params.LeaderID, - Now: timeStringNullable(params.Now), - TTL: durationAsString(params.TTL), +func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderReelectParams) (*riverdriver.Leader, error) { + leader, 
err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ + ElectedAt: timeString(params.ElectedAt), + LeaderID: params.LeaderID, + Now: timeStringNullable(params.Now), + TTL: durationAsString(params.TTL), }) if err != nil { - return false, interpretError(err) + return nil, interpretError(err) } - return numElectionsWon > 0, nil + return leaderFromInternal(leader), nil } func (e *Executor) LeaderDeleteExpired(ctx context.Context, params *riverdriver.LeaderDeleteExpiredParams) (int, error) { @@ -1154,7 +1155,10 @@ func (e *Executor) LeaderInsert(ctx context.Context, params *riverdriver.LeaderI } func (e *Executor) LeaderResign(ctx context.Context, params *riverdriver.LeaderResignParams) (bool, error) { - numResigned, err := dbsqlc.New().LeaderResign(schemaTemplateParam(ctx, params.Schema), e.dbtx, params.LeaderID) + numResigned, err := dbsqlc.New().LeaderResign(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderResignParams{ + ElectedAt: timeString(params.ElectedAt), + LeaderID: params.LeaderID, + }) if err != nil { return false, interpretError(err) }