diff --git a/client.go b/client.go index 28c0ab61..b60addcd 100644 --- a/client.go +++ b/client.go @@ -1423,9 +1423,9 @@ func (c *Client[TTx]) JobCancelTx(ctx context.Context, tx TTx, jobID int64) (*ri func (c *Client[TTx]) jobCancel(ctx context.Context, exec riverdriver.Executor, jobID int64) (*rivertype.JobRow, error) { return c.pilot.JobCancel(ctx, exec, &riverdriver.JobCancelParams{ ID: jobID, - CancelAttemptedAt: c.baseService.Time.NowUTC(), + CancelAttemptedAt: c.baseService.Time.Now(), ControlTopic: string(notifier.NotificationTopicControl), - Now: c.baseService.Time.NowUTCOrNil(), + Now: c.baseService.Time.NowOrNil(), Schema: c.config.Schema, }) } @@ -1504,7 +1504,7 @@ func (c *Client[TTx]) JobRetryTx(ctx context.Context, tx TTx, id int64) (*rivert func (c *Client[TTx]) jobRetry(ctx context.Context, exec riverdriver.Executor, id int64) (*rivertype.JobRow, error) { return c.pilot.JobRetry(ctx, exec, &riverdriver.JobRetryParams{ ID: id, - Now: c.baseService.Time.NowUTCOrNil(), + Now: c.baseService.Time.NowOrNil(), Schema: c.config.Schema, }) } @@ -1624,7 +1624,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf // If the time is stubbed (in a test), use that for `created_at`. Otherwise, // leave an empty value which will either use the database's `now()` or be defaulted // by drivers as necessary. 
- createdAt := archetype.Time.NowUTCOrNil() + createdAt := archetype.Time.NowOrNil() maxAttempts := cmp.Or(insertOpts.MaxAttempts, jobInsertOpts.MaxAttempts, config.MaxAttempts) priority := cmp.Or(insertOpts.Priority, jobInsertOpts.Priority, rivercommon.PriorityDefault) @@ -2554,7 +2554,7 @@ func (c *Client[TTx]) QueuePause(ctx context.Context, name string, opts *QueuePa if err := tx.QueuePause(ctx, &riverdriver.QueuePauseParams{ Name: name, - Now: c.baseService.Time.NowUTCOrNil(), + Now: c.baseService.Time.NowOrNil(), Schema: c.config.Schema, }); err != nil { return err @@ -2590,7 +2590,7 @@ func (c *Client[TTx]) QueuePauseTx(ctx context.Context, tx TTx, name string, opt if err := executorTx.QueuePause(ctx, &riverdriver.QueuePauseParams{ Name: name, - Now: c.baseService.Time.NowUTCOrNil(), + Now: c.baseService.Time.NowOrNil(), Schema: c.config.Schema, }); err != nil { return err @@ -2624,7 +2624,7 @@ func (c *Client[TTx]) QueueResume(ctx context.Context, name string, opts *QueueP if err := tx.QueueResume(ctx, &riverdriver.QueueResumeParams{ Name: name, - Now: c.baseService.Time.NowUTCOrNil(), + Now: c.baseService.Time.NowOrNil(), Schema: c.config.Schema, }); err != nil { return err @@ -2661,7 +2661,7 @@ func (c *Client[TTx]) QueueResumeTx(ctx context.Context, tx TTx, name string, op if err := executorTx.QueueResume(ctx, &riverdriver.QueueResumeParams{ Name: name, - Now: c.baseService.Time.NowUTCOrNil(), + Now: c.baseService.Time.NowOrNil(), Schema: c.config.Schema, }); err != nil { return err diff --git a/client_test.go b/client_test.go index 3b53740d..f8d035c4 100644 --- a/client_test.go +++ b/client_test.go @@ -6061,7 +6061,7 @@ func Test_Client_RetryPolicy(t *testing.T) { client := newTestClient(t, dbPool, config) - now := client.baseService.Time.StubNowUTC(time.Now().UTC()) + now := client.baseService.Time.StubNow(time.Now().UTC()) t.Logf("Now: %s", now) subscribeChan, cancel := client.Subscribe(EventKindJobCompleted, EventKindJobFailed) @@ -6736,7 +6736,7 @@ 
func Test_Client_InsertNotificationsAreDeduplicatedAndDebounced(t *testing.T) { riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) // Anchor all limiter checks to a known base time so debounce behavior is // independent from scheduler jitter or machine load. - now := client.baseService.Time.StubNowUTC(time.Now().UTC()) + now := client.baseService.Time.StubNow(time.Now().UTC()) type insertPayload struct { Queue string `json:"queue"` @@ -6771,7 +6771,7 @@ func Test_Client_InsertNotificationsAreDeduplicatedAndDebounced(t *testing.T) { // Keep time fixed inside the cooldown window before issuing repeated queue1 // inserts. This guarantees that all of these inserts are ineligible for a // second notification regardless of wall-clock runtime. - client.baseService.Time.StubNowUTC(now.Add(500 * time.Millisecond)) + client.baseService.Time.StubNow(now.Add(500 * time.Millisecond)) for range 5 { config.Logger.InfoContext(ctx, "inserting queue1 job") @@ -6791,7 +6791,7 @@ func Test_Client_InsertNotificationsAreDeduplicatedAndDebounced(t *testing.T) { expectImmediateNotification(t, "queue3") // Immediate first fire on queue3 // `ShouldTrigger` uses a strict `Before` check; move just past the boundary. - client.baseService.Time.StubNowUTC(now.Add(config.FetchCooldown + time.Nanosecond)) + client.baseService.Time.StubNow(now.Add(config.FetchCooldown + time.Nanosecond)) // Now queue1 should immediately notify again. 
expectImmediateNotification(t, "queue1") @@ -6846,7 +6846,7 @@ func Test_Client_JobCompletion(t *testing.T) { client, bundle := setup(t, config) - now := client.baseService.Time.StubNowUTC(time.Now().UTC()) + now := client.baseService.Time.StubNow(time.Now().UTC()) insertRes, err := client.Insert(ctx, JobArgs{}, nil) require.NoError(t, err) @@ -6919,7 +6919,7 @@ func Test_Client_JobCompletion(t *testing.T) { client, bundle := setup(t, config) - now := client.baseService.Time.StubNowUTC(time.Now().UTC()) + now := client.baseService.Time.StubNow(time.Now().UTC()) insertRes, err := client.Insert(ctx, JobArgs{}, nil) require.NoError(t, err) @@ -6951,7 +6951,7 @@ func Test_Client_JobCompletion(t *testing.T) { client, bundle := setup(t, config) - now := client.baseService.Time.StubNowUTC(time.Now().UTC()) + now := client.baseService.Time.StubNow(time.Now().UTC()) insertRes, err := client.Insert(ctx, JobArgs{}, nil) require.NoError(t, err) @@ -7013,7 +7013,7 @@ func Test_Client_JobCompletion(t *testing.T) { client, bundle := setup(t, newTestConfig(t, "")) - now := client.baseService.Time.StubNowUTC(time.Now().UTC()) + now := client.baseService.Time.StubNow(time.Now().UTC()) type JobArgs struct { testutil.JobArgsReflectKind[JobArgs] @@ -8117,7 +8117,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Parallel() archetype := riversharedtest.BaseServiceArchetype(t) - archetype.Time.StubNowUTC(time.Now().UTC()) + archetype.Time.StubNow(time.Now().UTC()) uniqueOpts := UniqueOpts{ ByArgs: true, @@ -8147,7 +8147,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Parallel() archetype := riversharedtest.BaseServiceArchetype(t) - archetype.Time.StubNowUTC(time.Now().UTC()) + archetype.Time.StubNow(time.Now().UTC()) states := []rivertype.JobState{ rivertype.JobStateAvailable, @@ -8465,7 +8465,7 @@ func TestUniqueOpts(t *testing.T) { // the current time, but make sure it's nicened up a little to be // roughly in the middle of the hour and well clear of 
any period // boundaries. - client.baseService.Time.StubNowUTC( + client.baseService.Time.StubNow( time.Now().Truncate(1 * time.Hour).Add(37*time.Minute + 23*time.Second + 123*time.Millisecond).UTC(), ) diff --git a/internal/dbunique/db_unique.go b/internal/dbunique/db_unique.go index f69e2cea..bbf48661 100644 --- a/internal/dbunique/db_unique.go +++ b/internal/dbunique/db_unique.go @@ -114,7 +114,7 @@ func buildUniqueKeyString(timeGen rivertype.TimeGenerator, uniqueOpts *UniqueOpt } if uniqueOpts.ByPeriod != time.Duration(0) { - lowerPeriodBound := ptrutil.ValOrDefaultFunc(params.ScheduledAt, timeGen.NowUTC).Truncate(uniqueOpts.ByPeriod) + lowerPeriodBound := ptrutil.ValOrDefaultFunc(params.ScheduledAt, timeGen.Now).Truncate(uniqueOpts.ByPeriod) sb.WriteString("&period=" + lowerPeriodBound.Format(time.RFC3339)) } diff --git a/internal/dbunique/db_unique_test.go b/internal/dbunique/db_unique_test.go index 81cf1d60..80f27dc9 100644 --- a/internal/dbunique/db_unique_test.go +++ b/internal/dbunique/db_unique_test.go @@ -29,7 +29,7 @@ func TestUniqueKey(t *testing.T) { // Fixed timestamp for consistency across tests: now := time.Now().UTC() stubSvc := &riversharedtest.TimeStub{} - stubSvc.StubNowUTC(now) + stubSvc.StubNow(now) tests := []struct { name string diff --git a/internal/jobcompleter/job_completer.go b/internal/jobcompleter/job_completer.go index aca85923..37a4d399 100644 --- a/internal/jobcompleter/job_completer.go +++ b/internal/jobcompleter/job_completer.go @@ -78,10 +78,10 @@ func (c *InlineCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobst c.wg.Add(1) defer c.wg.Done() - start := c.Time.NowUTC() + start := c.Time.Now() jobs, err := withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) ([]*rivertype.JobRow, error) { - jobs, err := c.pilot.JobSetStateIfRunningMany(ctx, c.exec, setStateParamsToMany(c.Time.NowUTCOrNil(), c.schema, params)) + jobs, err := c.pilot.JobSetStateIfRunningMany(ctx, c.exec, 
setStateParamsToMany(c.Time.NowOrNil(), c.schema, params)) if err != nil { return nil, err } @@ -92,7 +92,7 @@ func (c *InlineCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobst return err } - stats.CompleteDuration = c.Time.NowUTC().Sub(start) + stats.CompleteDuration = c.Time.Now().Sub(start) c.subscribeCh <- []CompleterJobUpdated{{ Job: jobs[0], JobStats: stats, @@ -185,11 +185,11 @@ func newAsyncCompleterWithConcurrency(archetype *baseservice.Archetype, schema s func (c *AsyncCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error { // Start clock outside of goroutine so that the time spent blocking waiting // for an errgroup slot is accurately measured. - start := c.Time.NowUTC() + start := c.Time.Now() c.errGroup.Go(func() error { jobs, err := withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) ([]*rivertype.JobRow, error) { - rows, err := c.pilot.JobSetStateIfRunningMany(ctx, c.exec, setStateParamsToMany(c.Time.NowUTCOrNil(), c.schema, params)) + rows, err := c.pilot.JobSetStateIfRunningMany(ctx, c.exec, setStateParamsToMany(c.Time.NowOrNil(), c.schema, params)) if err != nil { return nil, err } @@ -200,7 +200,7 @@ func (c *AsyncCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobsta return err } - stats.CompleteDuration = c.Time.NowUTC().Sub(start) + stats.CompleteDuration = c.Time.Now().Sub(start) c.subscribeCh <- []CompleterJobUpdated{{ Job: jobs[0], JobStats: stats, @@ -479,7 +479,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { events := sliceutil.Map(jobRows, func(jobRow *rivertype.JobRow) CompleterJobUpdated { setState := setStateBatch[jobRow.ID] startTime := setStateStartTimes[jobRow.ID] - setState.Stats.CompleteDuration = c.Time.NowUTC().Sub(startTime) + setState.Stats.CompleteDuration = c.Time.Now().Sub(startTime) return CompleterJobUpdated{ Job: jobRow, JobStats: setState.Stats, @@ -504,7 +504,7 @@ 
func (c *BatchCompleter) handleBatch(ctx context.Context) error { } func (c *BatchCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error { - now := c.Time.NowUTC() + now := c.Time.Now() // If we've built up too much of a backlog because the completer's fallen // behind, block completions until the complete loop's had a chance to catch // up. diff --git a/internal/jobexecutor/job_executor.go b/internal/jobexecutor/job_executor.go index 888031eb..dab44f39 100644 --- a/internal/jobexecutor/job_executor.go +++ b/internal/jobexecutor/job_executor.go @@ -138,7 +138,7 @@ func (e *JobExecutor) Execute(ctx context.Context) { // Ensure that the context is cancelled no matter what, or it will leak: defer e.CancelFunc(errExecutorDefaultCancel) - e.start = e.Time.NowUTC() + e.start = e.Time.Now() e.stats = &jobstats.JobStatistics{ QueueWaitDuration: e.start.Sub(e.JobRow.ScheduledAt), } @@ -196,7 +196,7 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) { PanicVal: recovery, } } - e.stats.RunDuration = e.Time.NowUTC().Sub(e.start) + e.stats.RunDuration = e.Time.Now().Sub(e.start) }() if e.WorkUnit == nil { @@ -385,7 +385,7 @@ func (e *JobExecutor) reportResult(ctx context.Context, jobRow *rivertype.JobRow // so we instead make the job immediately `available` if the snooze time is // smaller than the scheduler's run interval. 
var params *riverdriver.JobSetStateIfRunningParams - if nextAttemptScheduledAt.Sub(e.Time.NowUTC()) <= e.SchedulerInterval { + if nextAttemptScheduledAt.Sub(e.Time.Now()) <= e.SchedulerInterval { params = riverdriver.JobSetStateSnoozedAvailable(jobRow.ID, nextAttemptScheduledAt, jobRow.Attempt-1, metadataUpdatesBytes) } else { params = riverdriver.JobSetStateSnoozed(jobRow.ID, nextAttemptScheduledAt, jobRow.Attempt-1, metadataUpdatesBytes) @@ -409,7 +409,7 @@ func (e *JobExecutor) reportResult(ctx context.Context, jobRow *rivertype.JobRow return } - if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCompleted(jobRow.ID, e.Time.NowUTC(), metadataUpdatesBytes)); err != nil { + if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCompleted(jobRow.ID, e.Time.Now(), metadataUpdatesBytes)); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Error completing job", slog.String("err", err.Error()), slog.Int64("job_id", jobRow.ID), @@ -462,7 +462,7 @@ func (e *JobExecutor) reportError(ctx context.Context, jobRow *rivertype.JobRow, return } - now := e.Time.NowUTC() + now := e.Time.Now() if cancelJob { if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCancelled(jobRow.ID, now, errData, metadataUpdates)); err != nil { @@ -502,7 +502,7 @@ func (e *JobExecutor) reportError(ctx context.Context, jobRow *rivertype.JobRow, // respected. Here, we offset that with a branch that makes jobs immediately // `available` if their retry was smaller than the scheduler's run interval. 
var params *riverdriver.JobSetStateIfRunningParams - if nextRetryScheduledAt.Sub(e.Time.NowUTC()) <= e.SchedulerInterval { + if nextRetryScheduledAt.Sub(e.Time.Now()) <= e.SchedulerInterval { params = riverdriver.JobSetStateErrorAvailable(jobRow.ID, nextRetryScheduledAt, errData, metadataUpdates) } else { params = riverdriver.JobSetStateErrorRetryable(jobRow.ID, nextRetryScheduledAt, errData, metadataUpdates) diff --git a/internal/jobexecutor/job_executor_test.go b/internal/jobexecutor/job_executor_test.go index 5be8c855..1674fa7e 100644 --- a/internal/jobexecutor/job_executor_test.go +++ b/internal/jobexecutor/job_executor_test.go @@ -251,7 +251,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor, bundle := setup(t) - now := executor.Time.StubNowUTC(time.Now().UTC()) + now := executor.Time.StubNow(time.Now().UTC()) workerErr := errors.New("job error") executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow) diff --git a/internal/leadership/elector.go b/internal/leadership/elector.go index 734917a8..53b2d5f9 100644 --- a/internal/leadership/elector.go +++ b/internal/leadership/elector.go @@ -213,7 +213,7 @@ func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error { elected, err := attemptElectOrReelect(ctx, e.exec, false, &riverdriver.LeaderElectParams{ LeaderID: e.config.ClientID, - Now: e.Time.NowUTCOrNil(), + Now: e.Time.NowOrNil(), Schema: e.config.Schema, TTL: e.leaderTTL(), }) @@ -391,7 +391,7 @@ func (e *Elector) keepLeadershipLoop(ctx context.Context) error { reelected, err := attemptElectOrReelect(ctx, e.exec, true, &riverdriver.LeaderElectParams{ LeaderID: e.config.ClientID, - Now: e.Time.NowUTCOrNil(), + Now: e.Time.NowOrNil(), Schema: e.config.Schema, TTL: e.leaderTTL(), }) diff --git a/internal/maintenance/job_scheduler.go b/internal/maintenance/job_scheduler.go index 00adcee4..f593ae35 100644 --- a/internal/maintenance/job_scheduler.go +++ 
b/internal/maintenance/job_scheduler.go @@ -174,7 +174,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er } defer dbutil.RollbackWithoutCancel(ctx, execTx) - now := s.Time.NowUTC() + now := s.Time.Now() nowWithLookAhead := now.Add(s.config.Interval) scheduledJobResults, err := execTx.JobSchedule(ctx, &riverdriver.JobScheduleParams{ @@ -194,7 +194,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er // slightly in the future (this loop, the notify, and tx commit will take // a small amount of time). This isn't going to be perfect, but the goal // is to roughly try to guess when the clients will attempt to fetch jobs. - notificationHorizon := s.Time.NowUTC().Add(5 * time.Millisecond) + notificationHorizon := s.Time.Now().Add(5 * time.Millisecond) for _, result := range scheduledJobResults { if result.Job.ScheduledAt.After(notificationHorizon) { diff --git a/internal/maintenance/periodic_job_enqueuer.go b/internal/maintenance/periodic_job_enqueuer.go index 54e8fe53..6369740a 100644 --- a/internal/maintenance/periodic_job_enqueuer.go +++ b/internal/maintenance/periodic_job_enqueuer.go @@ -388,7 +388,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { var ( insertParamsMany []*rivertype.JobInsertParams - now = s.Time.NowUTC() + now = s.Time.Now() periodicJobUpsertParams = &riverpilot.PeriodicJobUpsertManyParams{Schema: s.Config.Schema} ) @@ -417,7 +417,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { periodicJobUpsertParams.Jobs = append(periodicJobUpsertParams.Jobs, &riverpilot.PeriodicJobUpsertParams{ ID: periodicJob.ID, NextRunAt: periodicJob.nextRunAt, - UpdatedAt: s.Time.NowUTC(), + UpdatedAt: s.Time.Now(), }) } @@ -452,7 +452,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { periodicJobUpsertParams = &riverpilot.PeriodicJobUpsertManyParams{Schema: s.Config.Schema} ) - now := s.Time.NowUTC() + now := s.Time.Now() // Add a small margin to 
the current time so we're not only // running jobs that are already ready, but also ones ready at @@ -482,7 +482,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { periodicJobUpsertParams.Jobs = append(periodicJobUpsertParams.Jobs, &riverpilot.PeriodicJobUpsertParams{ ID: periodicJob.ID, NextRunAt: periodicJob.nextRunAt, - UpdatedAt: s.Time.NowUTC(), + UpdatedAt: s.Time.Now(), }) } } @@ -642,7 +642,7 @@ func (s *PeriodicJobEnqueuer) timeUntilNextRun() time.Duration { var ( firstNextRunAt time.Time - now = s.Time.NowUTC() + now = s.Time.Now() ) for _, periodicJob := range s.periodicJobs { diff --git a/internal/maintenance/periodic_job_enqueuer_test.go b/internal/maintenance/periodic_job_enqueuer_test.go index eca662b2..197926d7 100644 --- a/internal/maintenance/periodic_job_enqueuer_test.go +++ b/internal/maintenance/periodic_job_enqueuer_test.go @@ -96,7 +96,7 @@ func TestPeriodicJobEnqueuer(t *testing.T) { } stubSvc := &riversharedtest.TimeStub{} - stubSvc.StubNowUTC(time.Now().UTC()) + stubSvc.StubNow(time.Now().UTC()) jobConstructorWithQueueFunc := func(name string, unique bool, queue string) func() (*rivertype.JobInsertParams, error) { return func() (*rivertype.JobInsertParams, error) { @@ -386,7 +386,7 @@ func TestPeriodicJobEnqueuer(t *testing.T) { svc, _ := setup(t) - now := svc.Time.StubNowUTC(time.Now()) + now := svc.Time.StubNow(time.Now()) svc.periodicJobs = make(map[rivertype.PeriodicJobHandle]*PeriodicJob) periodicJobHandles, err := svc.AddManySafely([]*PeriodicJob{ @@ -781,7 +781,7 @@ func TestPeriodicJobEnqueuer(t *testing.T) { svc, _ := setup(t) - now := svc.Time.StubNowUTC(time.Now()) + now := svc.Time.StubNow(time.Now()) // no jobs require.Equal(t, periodicJobEnqueuerVeryLongDuration, svc.timeUntilNextRun()) diff --git a/internal/maintenance/reindexer_test.go b/internal/maintenance/reindexer_test.go index ca71d93c..2d4817a5 100644 --- a/internal/maintenance/reindexer_test.go +++ b/internal/maintenance/reindexer_test.go @@ -80,7 
+80,7 @@ func TestReindexer(t *testing.T) { } archetype := riversharedtest.BaseServiceArchetype(t) - bundle.now = archetype.Time.StubNowUTC(time.Now()) + bundle.now = archetype.Time.StubNow(time.Now()) fromNow := func(d time.Duration) func(time.Time) time.Time { return func(t time.Time) time.Time { diff --git a/internal/notifylimiter/limiter.go b/internal/notifylimiter/limiter.go index 366787c1..04598912 100644 --- a/internal/notifylimiter/limiter.go +++ b/internal/notifylimiter/limiter.go @@ -29,7 +29,7 @@ func NewLimiter(archetype *baseservice.Archetype, waitDuration time.Duration) *L func (l *Limiter) ShouldTrigger(topic string) bool { // Calculate this beforehand to reduce mutex duration. - now := l.Time.NowUTC() + now := l.Time.Now() lastSentHorizon := now.Add(-l.waitDuration) l.mu.Lock() diff --git a/internal/notifylimiter/limiter_test.go b/internal/notifylimiter/limiter_test.go index 64693d4d..f69ceafe 100644 --- a/internal/notifylimiter/limiter_test.go +++ b/internal/notifylimiter/limiter_test.go @@ -29,20 +29,20 @@ func TestLimiter(t *testing.T) { limiter, _ := setup() now := time.Now() - limiter.Time.StubNowUTC(now) + limiter.Time.StubNow(now) require.True(t, limiter.ShouldTrigger("a")) for range 10 { require.False(t, limiter.ShouldTrigger("a")) } // Move the time forward, by just less than waitDuration: - limiter.Time.StubNowUTC(now.Add(9 * time.Millisecond)) + limiter.Time.StubNow(now.Add(9 * time.Millisecond)) require.False(t, limiter.ShouldTrigger("a")) require.True(t, limiter.ShouldTrigger("b")) // First time being triggered on "b" // Move the time forward to just past the waitDuration: - limiter.Time.StubNowUTC(now.Add(11 * time.Millisecond)) + limiter.Time.StubNow(now.Add(11 * time.Millisecond)) require.True(t, limiter.ShouldTrigger("a")) for range 10 { require.False(t, limiter.ShouldTrigger("a")) @@ -51,7 +51,7 @@ func TestLimiter(t *testing.T) { require.False(t, limiter.ShouldTrigger("b")) // has only been 2ms since last trigger of "b" // Move 
forward by another waitDuration (plus padding): - limiter.Time.StubNowUTC(now.Add(22 * time.Millisecond)) + limiter.Time.StubNow(now.Add(22 * time.Millisecond)) require.True(t, limiter.ShouldTrigger("a")) require.True(t, limiter.ShouldTrigger("b")) require.False(t, limiter.ShouldTrigger("b")) @@ -65,7 +65,7 @@ func TestLimiter(t *testing.T) { limiter, _ := setup() now := time.Now() - limiter.Time.StubNowUTC(now) + limiter.Time.StubNow(now) counters := make(map[string]*atomic.Int64) for _, topic := range []string{"a", "b", "c"} { @@ -97,7 +97,7 @@ func TestLimiter(t *testing.T) { require.Equal(t, int64(1), counters["b"].Load()) require.Equal(t, int64(1), counters["c"].Load()) - limiter.Time.StubNowUTC(now.Add(11 * time.Millisecond)) + limiter.Time.StubNow(now.Add(11 * time.Millisecond)) <-time.After(100 * time.Millisecond) diff --git a/job_complete_tx.go b/job_complete_tx.go index bd408e8b..5c1a75eb 100644 --- a/job_complete_tx.go +++ b/job_complete_tx.go @@ -53,7 +53,7 @@ func JobCompleteTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx } execTx := driver.UnwrapExecutor(tx) - params := riverdriver.JobSetStateCompleted(job.ID, client.baseService.Time.NowUTC(), nil) + params := riverdriver.JobSetStateCompleted(job.ID, client.baseService.Time.Now(), nil) rows, err := pilot.JobSetStateIfRunningMany(ctx, execTx, &riverdriver.JobSetStateIfRunningManyParams{ ID: []int64{params.ID}, Attempt: []*int{params.Attempt}, diff --git a/producer.go b/producer.go index f42f72f1..c753814c 100644 --- a/producer.go +++ b/producer.go @@ -291,7 +291,7 @@ func (p *producer) StartWorkContext(fetchCtx, workCtx context.Context) error { return p.exec.QueueCreateOrSetUpdatedAt(ctx, &riverdriver.QueueCreateOrSetUpdatedAtParams{ Metadata: []byte("{}"), Name: p.config.Queue, - Now: p.Time.NowUTCOrNil(), + Now: p.Time.NowOrNil(), Schema: p.config.Schema, }) }() @@ -759,7 +759,7 @@ func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultC ClientID: 
p.config.ClientID, MaxAttemptedBy: maxAttemptedBy, MaxToLock: count, - Now: p.Time.NowUTCOrNil(), + Now: p.Time.NowOrNil(), Queue: p.config.Queue, ProducerID: p.id.Load(), Schema: p.config.Schema, @@ -966,7 +966,7 @@ func (p *producer) reportProducerStatusOnce(ctx context.Context) { ID: p.id.Load(), QueueName: p.config.Queue, Schema: p.config.Schema, - StaleUpdatedAtHorizon: p.Time.NowUTC().Add(-p.config.StaleProducerRetentionPeriod), + StaleUpdatedAtHorizon: p.Time.Now().Add(-p.config.StaleProducerRetentionPeriod), }) if err != nil && errors.Is(context.Cause(ctx), startstop.ErrStop) { return @@ -1006,7 +1006,7 @@ func (p *producer) reportQueueStatusOnce(ctx context.Context) { _, err := p.exec.QueueCreateOrSetUpdatedAt(ctx, &riverdriver.QueueCreateOrSetUpdatedAtParams{ Metadata: []byte("{}"), Name: p.config.Queue, - Now: p.Time.NowUTCOrNil(), + Now: p.Time.NowOrNil(), Schema: p.config.Schema, }) if err != nil && errors.Is(context.Cause(ctx), startstop.ErrStop) { diff --git a/producer_test.go b/producer_test.go index b524338a..c03bd766 100644 --- a/producer_test.go +++ b/producer_test.go @@ -375,7 +375,7 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin producer, bundle := setup(t) producer.config.QueueReportInterval = 50 * time.Millisecond - now := producer.Time.StubNowUTC(time.Now().UTC()) + now := producer.Time.StubNow(time.Now().UTC()) startProducer(t, ctx, ctx, producer) diff --git a/riverdriver/go.sum b/riverdriver/go.sum index 01785d00..451f0a1f 100644 --- a/riverdriver/go.sum +++ b/riverdriver/go.sum @@ -7,8 +7,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/riverqueue/river/rivertype v0.31.0 
h1:O6vaJ72SffgF1nxzCrDKd4M+eMZFRlJpycnOcUIGLD8= -github.com/riverqueue/river/rivertype v0.31.0/go.mod h1:D1Ad+EaZiaXbQbJcJcfeicXJMBKno0n6UcfKI5Q7DIQ= +github.com/riverqueue/river/rivertype v0.34.0 h1:8NftF6oNlxWHdSpvbv4d6JXY6RlSDi9ZtQE8UC5oF0c= +github.com/riverqueue/river/rivertype v0.34.0/go.mod h1:D1Ad+EaZiaXbQbJcJcfeicXJMBKno0n6UcfKI5Q7DIQ= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= diff --git a/riverdriver/riversqlite/go.sum b/riverdriver/riversqlite/go.sum index a9c9f126..f3018226 100644 --- a/riverdriver/riversqlite/go.sum +++ b/riverdriver/riversqlite/go.sum @@ -4,8 +4,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo= -github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw= +github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc= +github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= @@ -14,16 +14,16 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= 
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/riverqueue/river v0.31.0 h1:BERwce/WS4Guter0/A3GyTDP+1rxl6vFHyBQv+U/5tM= -github.com/riverqueue/river v0.31.0/go.mod h1:Aqbb/jBrFMvh6rbe6SDC6XVZnS0v1W+QQPjejRvyHzk= -github.com/riverqueue/river/riverdriver v0.31.0 h1:XwDa8DqkRxkqMqfdLOYTgSykiTHNSRcWG1LcCg/g0ys= -github.com/riverqueue/river/riverdriver v0.31.0/go.mod h1:Vl6XPbWtjqP+rqEa/HxcEeXeZL/KPCwqjRlqj+wWsq8= -github.com/riverqueue/river/riverdriver/riverpgxv5 v0.31.0 h1:Zii6/VNqasBuPvFIA98xgjz3MRy2EvMm6lMyh1RtWBw= -github.com/riverqueue/river/riverdriver/riverpgxv5 v0.31.0/go.mod h1:z859lpsOraO3IYWjY9w8RZec5I0BAcas9rjZkwxAijU= -github.com/riverqueue/river/rivershared v0.31.0 h1:KVEp+13jnK9YOlMUKnR0eUyJaK+P/APcheoSGMfZArA= -github.com/riverqueue/river/rivershared v0.31.0/go.mod h1:Wvf489bvAiZsJm7mln8YAPZbK7pVfuK7bYfsBt5Nzbw= -github.com/riverqueue/river/rivertype v0.31.0 h1:O6vaJ72SffgF1nxzCrDKd4M+eMZFRlJpycnOcUIGLD8= -github.com/riverqueue/river/rivertype v0.31.0/go.mod h1:D1Ad+EaZiaXbQbJcJcfeicXJMBKno0n6UcfKI5Q7DIQ= +github.com/riverqueue/river v0.34.0 h1:TG4S2V1CfGvB828rrq18oGtGnRFzW7wlkwewLbcD3OI= +github.com/riverqueue/river v0.34.0/go.mod h1:EYAnX+jhreccUJt3nCEYF+7MxQcIJmU5idZahlDB3Po= +github.com/riverqueue/river/riverdriver v0.34.0 h1:Dam8kENDwaAmXMOOhdUKsaXtts9Gjv8Ac4kjB5KVd38= +github.com/riverqueue/river/riverdriver v0.34.0/go.mod h1:oYE5YkM2Awk/sr3ucRyu+71SjXAtp0PBrDuon+jA50A= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.34.0 h1:NMD9TnV+33D6uOc76zpuBRwJyibA+txcAepDw7/Du98= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.34.0/go.mod h1:+rTHXis4+zvgIqI6XJ/0HwAcJ4BgVGQYK+BcuPUimc0= +github.com/riverqueue/river/rivershared v0.34.0 h1:OZwOrYGXWM8C1JZ5AaJ0ztqLpsFnQSljvtROj1JWBiQ= +github.com/riverqueue/river/rivershared v0.34.0/go.mod h1:WeECN4ZC97pwvIP1WGvBKi7ucNomcbhsDUDOkHuEqho= 
+github.com/riverqueue/river/rivertype v0.34.0 h1:8NftF6oNlxWHdSpvbv4d6JXY6RlSDi9ZtQE8UC5oF0c= +github.com/riverqueue/river/rivertype v0.34.0/go.mod h1:D1Ad+EaZiaXbQbJcJcfeicXJMBKno0n6UcfKI5Q7DIQ= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= @@ -41,10 +41,10 @@ github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= -golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= -golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= -golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= +golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= +golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/rivershared/baseservice/base_service.go b/rivershared/baseservice/base_service.go index 1b96b379..95d81f04 100644 --- a/rivershared/baseservice/base_service.go +++ b/rivershared/baseservice/base_service.go @@ -23,12 +23,14 @@ type 
Archetype struct { // Logger is a structured logger. Logger *slog.Logger - // Time returns a time generator to get the current time in UTC. Normally - // it's implemented as [UnStubbableTimeGenerator] which just calls - // through to `time.Now().UTC()`, but is riverinternaltest.timeStub in tests - // to allow the current time to be stubbed. Services should try to use this - // function instead of the vanilla ones from the `time` package for testing - // purposes. + // Time returns a time generator for in-process deadline/duration math and + // optional stubbed wall-clock timestamps in tests. + // + // The production path intentionally uses `time.Now()` rather than + // `time.Now().UTC()`: per the Go time package's monotonic clock semantics, + // normalizing through `UTC()` strips the monotonic reading. Services should + // use this clock for local timing math and normalize to UTC only at + // database or serialization boundaries. Time TimeGeneratorWithStub } @@ -86,9 +88,10 @@ func Init[TService WithBaseService](archetype *Archetype, service TService) TSer type TimeGeneratorWithStub interface { rivertype.TimeGenerator - // StubNowUTC stubs the current time. It will panic if invoked outside of - // tests. Returns the same time passed as parameter for convenience. - StubNowUTC(nowUTC time.Time) time.Time + // StubNow stubs the current wall-clock time. It will panic if invoked + // outside of tests. Returns the same time passed as parameter for + // convenience. + StubNow(now time.Time) time.Time } // TimeGeneratorWithStubWrapper provides a wrapper around TimeGenerator that @@ -99,7 +102,7 @@ type TimeGeneratorWithStubWrapper struct { rivertype.TimeGenerator } -func (g *TimeGeneratorWithStubWrapper) StubNowUTC(nowUTC time.Time) time.Time { +func (g *TimeGeneratorWithStubWrapper) StubNow(now time.Time) time.Time { panic("time not stubbable outside tests") } @@ -107,10 +110,14 @@ func (g *TimeGeneratorWithStubWrapper) StubNowUTC(nowUTC time.Time) time.Time { // stubbed. 
It's always the generator used outside of tests. type UnStubbableTimeGenerator struct{} -func (g *UnStubbableTimeGenerator) NowUTC() time.Time { return time.Now() } -func (g *UnStubbableTimeGenerator) NowUTCOrNil() *time.Time { return nil } +// Now intentionally returns `time.Now()` without calling `.UTC()`. River uses +// this clock for in-process duration and deadline math, and Go strips the +// monotonic clock reading when changing a Time's location with methods like +// `UTC()`. Normalize at database or serialization boundaries instead. +func (g *UnStubbableTimeGenerator) Now() time.Time { return time.Now() } +func (g *UnStubbableTimeGenerator) NowOrNil() *time.Time { return nil } -func (g *UnStubbableTimeGenerator) StubNowUTC(nowUTC time.Time) time.Time { +func (g *UnStubbableTimeGenerator) StubNow(now time.Time) time.Time { panic("time not stubbable outside tests") } diff --git a/rivershared/baseservice/base_service_test.go b/rivershared/baseservice/base_service_test.go index e60a5146..22c4df8c 100644 --- a/rivershared/baseservice/base_service_test.go +++ b/rivershared/baseservice/base_service_test.go @@ -17,7 +17,7 @@ func TestInit(t *testing.T) { myService := Init(archetype, &MyService{}) require.NotNil(t, myService.Logger) require.Equal(t, "baseservice.MyService", myService.Name) - require.WithinDuration(t, time.Now().UTC(), myService.Time.NowUTC(), 2*time.Second) + require.WithinDuration(t, time.Now(), myService.Time.Now(), 2*time.Second) } type MyService struct { diff --git a/rivershared/circuitbreaker/circuit_breaker.go b/rivershared/circuitbreaker/circuit_breaker.go index f0e9503a..9ada2d8d 100644 --- a/rivershared/circuitbreaker/circuit_breaker.go +++ b/rivershared/circuitbreaker/circuit_breaker.go @@ -90,7 +90,7 @@ func (b *CircuitBreaker) Trip() bool { var ( horizonIndex = -1 - now = b.timeGenerator.NowUTC() + now = b.timeGenerator.Now() ) for i := len(b.trips) - 1; i >= 0; i-- { if b.trips[i].Before(now.Add(-b.opts.Window)) { diff --git 
a/rivershared/circuitbreaker/circuit_breaker_test.go b/rivershared/circuitbreaker/circuit_breaker_test.go index 14118959..c141aeb1 100644 --- a/rivershared/circuitbreaker/circuit_breaker_test.go +++ b/rivershared/circuitbreaker/circuit_breaker_test.go @@ -65,15 +65,15 @@ func TestCircuitBreaker(t *testing.T) { now = time.Now() ) - timeStub.StubNowUTC(now) + timeStub.StubNow(now) for range limit - 2 { require.False(t, breaker.Trip()) } - timeStub.StubNowUTC(now.Add(window - 1*time.Second)) + timeStub.StubNow(now.Add(window - 1*time.Second)) require.False(t, breaker.Trip()) - timeStub.StubNowUTC(now.Add(window)) + timeStub.StubNow(now.Add(window)) require.True(t, breaker.Trip()) }) @@ -86,17 +86,17 @@ func TestCircuitBreaker(t *testing.T) { now = time.Now() ) - timeStub.StubNowUTC(now) + timeStub.StubNow(now) require.False(t, breaker.Trip()) - timeStub.StubNowUTC(now.Add(window - 1*time.Second)) + timeStub.StubNow(now.Add(window - 1*time.Second)) for range limit - 2 { require.False(t, breaker.Trip()) } // Does *not* trip because the first trip is reaped as it's fallen out // of the window. - timeStub.StubNowUTC(now.Add(window + 1*time.Second)) + timeStub.StubNow(now.Add(window + 1*time.Second)) require.False(t, breaker.Trip()) }) @@ -109,13 +109,13 @@ func TestCircuitBreaker(t *testing.T) { now = time.Now() ) - timeStub.StubNowUTC(now) + timeStub.StubNow(now) for range limit - 1 { require.False(t, breaker.Trip()) } // Similar to the above, but multiple trips fall out of the window at once. 
- timeStub.StubNowUTC(now.Add(window + 1*time.Second)) + timeStub.StubNow(now.Add(window + 1*time.Second)) require.False(t, breaker.Trip()) }) @@ -128,7 +128,7 @@ func TestCircuitBreaker(t *testing.T) { now = time.Now() ) - timeStub.StubNowUTC(now) + timeStub.StubNow(now) for range limit - 1 { require.False(t, breaker.Trip()) } @@ -137,7 +137,7 @@ func TestCircuitBreaker(t *testing.T) { // We're allowed to go right up the limit again because the call to // ResetIfNotOpen reset everything. - timeStub.StubNowUTC(now) + timeStub.StubNow(now) for range limit - 1 { require.False(t, breaker.Trip()) } diff --git a/rivershared/riversharedtest/riversharedtest.go b/rivershared/riversharedtest/riversharedtest.go index b2bc34c6..d6f2b5ba 100644 --- a/rivershared/riversharedtest/riversharedtest.go +++ b/rivershared/riversharedtest/riversharedtest.go @@ -240,34 +240,34 @@ func TestDatabaseURL() string { // // It exists separately from rivertest.TimeStub to avoid a circular dependency. type TimeStub struct { - mu sync.RWMutex - nowUTC *time.Time + mu sync.RWMutex + now *time.Time } -func (t *TimeStub) NowUTC() time.Time { +func (t *TimeStub) Now() time.Time { t.mu.RLock() defer t.mu.RUnlock() - if t.nowUTC == nil { - return time.Now().UTC() + if t.now == nil { + return time.Now() } - return *t.nowUTC + return *t.now } -func (t *TimeStub) NowUTCOrNil() *time.Time { +func (t *TimeStub) NowOrNil() *time.Time { t.mu.RLock() defer t.mu.RUnlock() - return t.nowUTC + return t.now } -func (t *TimeStub) StubNowUTC(nowUTC time.Time) time.Time { +func (t *TimeStub) StubNow(now time.Time) time.Time { t.mu.Lock() defer t.mu.Unlock() - t.nowUTC = &nowUTC - return nowUTC + t.now = &now + return now } // WaitOrTimeout tries to wait on the given channel for a value to come through, diff --git a/rivershared/riversharedtest/riversharedtest_test.go b/rivershared/riversharedtest/riversharedtest_test.go index db4270d2..6ee2e560 100644 --- a/rivershared/riversharedtest/riversharedtest_test.go +++ 
b/rivershared/riversharedtest/riversharedtest_test.go @@ -58,11 +58,11 @@ func TestTimeStub(t *testing.T) { timeStub := &TimeStub{} - timeStub.StubNowUTC(initialTime) - require.Equal(t, initialTime, timeStub.NowUTC()) + timeStub.StubNow(initialTime) + require.Equal(t, initialTime, timeStub.Now()) - newTime := timeStub.StubNowUTC(initialTime.Add(1 * time.Second)) - require.Equal(t, newTime, timeStub.NowUTC()) + newTime := timeStub.StubNow(initialTime.Add(1 * time.Second)) + require.Equal(t, newTime, timeStub.Now()) }) t.Run("Stress", func(t *testing.T) { @@ -73,8 +73,8 @@ func TestTimeStub(t *testing.T) { for range 10 { go func() { for range 50 { - timeStub.StubNowUTC(time.Now().UTC()) - _ = timeStub.NowUTC() + timeStub.StubNow(time.Now().UTC()) + _ = timeStub.Now() } }() } diff --git a/rivertest/worker.go b/rivertest/worker.go index 93382c44..8ef06837 100644 --- a/rivertest/worker.go +++ b/rivertest/worker.go @@ -161,7 +161,7 @@ func (w *Worker[T, TTx]) workJob(ctx context.Context, tb testing.TB, tx TTx, job ID: job.ID, Attempt: job.Attempt + 1, AttemptDoUpdate: true, - AttemptedAt: ptrutil.Ptr(timeGen.NowUTC()), + AttemptedAt: ptrutil.Ptr(timeGen.Now()), AttemptedAtDoUpdate: true, AttemptedBy: append(job.AttemptedBy, w.config.ID), AttemptedByDoUpdate: true, diff --git a/rivertest/worker_test.go b/rivertest/worker_test.go index 37032229..1e0d5680 100644 --- a/rivertest/worker_test.go +++ b/rivertest/worker_test.go @@ -205,7 +205,7 @@ func TestWorker_Work(t *testing.T) { stubTime := &riversharedtest.TimeStub{} now := time.Now().UTC() - stubTime.StubNowUTC(now) + stubTime.StubNow(now) bundle.config.Test.Time = stubTime worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error { @@ -296,7 +296,7 @@ func TestWorker_Work(t *testing.T) { bundle := setup(t) hourFromNow := time.Now().UTC().Add(1 * time.Hour) timeStub := &riversharedtest.TimeStub{} - timeStub.StubNowUTC(hourFromNow) + timeStub.StubNow(hourFromNow) bundle.config.Test.Time = timeStub 
worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error { diff --git a/rivertype/time_generator.go b/rivertype/time_generator.go index a8678a6f..9144cf8e 100644 --- a/rivertype/time_generator.go +++ b/rivertype/time_generator.go @@ -2,18 +2,27 @@ package rivertype import "time" -// TimeGenerator generates a current time in UTC. In test environments it's +// TimeGenerator generates current time values for in-process timing math and +// optional stubbed wall-clock timestamps in tests. In test environments it's // implemented by riversharedtest.TimeStub which lets the current time be // stubbed. Otherwise, it's implemented as UnStubbableTimeGenerator which // doesn't allow stubbing. type TimeGenerator interface { - // NowUTC returns the current time. This may be a stubbed time if the time - // has been actively stubbed in a test. - NowUTC() time.Time + // Now returns the current time. This may be a stubbed time if the time has + // been actively stubbed in a test. + // + // Production implementations should preserve Go's monotonic clock reading + // from time.Now for in-process duration and deadline math. Do not normalize + // through `t.UTC()` here: per the time package's monotonic clock semantics, + // changing location strips the monotonic reading. Normalize at database or + // serialization boundaries instead. + Now() time.Time - // NowUTCOrNil returns if the currently stubbed time _if_ the current time - // is stubbed, and returns nil otherwise. This is generally useful in cases - // where a component may want to use a stubbed time if the time is stubbed, - // but to fall back to a database time default otherwise. - NowUTCOrNil() *time.Time + // NowOrNil returns the currently stubbed time if the current time is + // stubbed, and nil otherwise. 
+ // This is mainly for database-facing code paths that want to inject a + // deterministic wall-clock timestamp when time is stubbed in tests, but to + // fall back to a database-side time default otherwise. + NowOrNil() *time.Time }