Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1423,9 +1423,9 @@ func (c *Client[TTx]) JobCancelTx(ctx context.Context, tx TTx, jobID int64) (*ri
func (c *Client[TTx]) jobCancel(ctx context.Context, exec riverdriver.Executor, jobID int64) (*rivertype.JobRow, error) {
return c.pilot.JobCancel(ctx, exec, &riverdriver.JobCancelParams{
ID: jobID,
CancelAttemptedAt: c.baseService.Time.NowUTC(),
CancelAttemptedAt: c.baseService.Time.Now(),
ControlTopic: string(notifier.NotificationTopicControl),
Now: c.baseService.Time.NowUTCOrNil(),
Now: c.baseService.Time.NowOrNil(),
Schema: c.config.Schema,
})
}
Expand Down Expand Up @@ -1504,7 +1504,7 @@ func (c *Client[TTx]) JobRetryTx(ctx context.Context, tx TTx, id int64) (*rivert
func (c *Client[TTx]) jobRetry(ctx context.Context, exec riverdriver.Executor, id int64) (*rivertype.JobRow, error) {
return c.pilot.JobRetry(ctx, exec, &riverdriver.JobRetryParams{
ID: id,
Now: c.baseService.Time.NowUTCOrNil(),
Now: c.baseService.Time.NowOrNil(),
Schema: c.config.Schema,
})
}
Expand Down Expand Up @@ -1624,7 +1624,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
// If the time is stubbed (in a test), use that for `created_at`. Otherwise,
// leave an empty value which will either use the database's `now()` or be defaulted
// by drivers as necessary.
createdAt := archetype.Time.NowUTCOrNil()
createdAt := archetype.Time.NowOrNil()

maxAttempts := cmp.Or(insertOpts.MaxAttempts, jobInsertOpts.MaxAttempts, config.MaxAttempts)
priority := cmp.Or(insertOpts.Priority, jobInsertOpts.Priority, rivercommon.PriorityDefault)
Expand Down Expand Up @@ -2554,7 +2554,7 @@ func (c *Client[TTx]) QueuePause(ctx context.Context, name string, opts *QueuePa

if err := tx.QueuePause(ctx, &riverdriver.QueuePauseParams{
Name: name,
Now: c.baseService.Time.NowUTCOrNil(),
Now: c.baseService.Time.NowOrNil(),
Schema: c.config.Schema,
}); err != nil {
return err
Expand Down Expand Up @@ -2590,7 +2590,7 @@ func (c *Client[TTx]) QueuePauseTx(ctx context.Context, tx TTx, name string, opt

if err := executorTx.QueuePause(ctx, &riverdriver.QueuePauseParams{
Name: name,
Now: c.baseService.Time.NowUTCOrNil(),
Now: c.baseService.Time.NowOrNil(),
Schema: c.config.Schema,
}); err != nil {
return err
Expand Down Expand Up @@ -2624,7 +2624,7 @@ func (c *Client[TTx]) QueueResume(ctx context.Context, name string, opts *QueueP

if err := tx.QueueResume(ctx, &riverdriver.QueueResumeParams{
Name: name,
Now: c.baseService.Time.NowUTCOrNil(),
Now: c.baseService.Time.NowOrNil(),
Schema: c.config.Schema,
}); err != nil {
return err
Expand Down Expand Up @@ -2661,7 +2661,7 @@ func (c *Client[TTx]) QueueResumeTx(ctx context.Context, tx TTx, name string, op

if err := executorTx.QueueResume(ctx, &riverdriver.QueueResumeParams{
Name: name,
Now: c.baseService.Time.NowUTCOrNil(),
Now: c.baseService.Time.NowOrNil(),
Schema: c.config.Schema,
}); err != nil {
return err
Expand Down
22 changes: 11 additions & 11 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6061,7 +6061,7 @@ func Test_Client_RetryPolicy(t *testing.T) {

client := newTestClient(t, dbPool, config)

now := client.baseService.Time.StubNowUTC(time.Now().UTC())
now := client.baseService.Time.StubNow(time.Now().UTC())
t.Logf("Now: %s", now)

subscribeChan, cancel := client.Subscribe(EventKindJobCompleted, EventKindJobFailed)
Expand Down Expand Up @@ -6736,7 +6736,7 @@ func Test_Client_InsertNotificationsAreDeduplicatedAndDebounced(t *testing.T) {
riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started())
// Anchor all limiter checks to a known base time so debounce behavior is
// independent from scheduler jitter or machine load.
now := client.baseService.Time.StubNowUTC(time.Now().UTC())
now := client.baseService.Time.StubNow(time.Now().UTC())

type insertPayload struct {
Queue string `json:"queue"`
Expand Down Expand Up @@ -6771,7 +6771,7 @@ func Test_Client_InsertNotificationsAreDeduplicatedAndDebounced(t *testing.T) {
// Keep time fixed inside the cooldown window before issuing repeated queue1
// inserts. This guarantees that all of these inserts are ineligible for a
// second notification regardless of wall-clock runtime.
client.baseService.Time.StubNowUTC(now.Add(500 * time.Millisecond))
client.baseService.Time.StubNow(now.Add(500 * time.Millisecond))

for range 5 {
config.Logger.InfoContext(ctx, "inserting queue1 job")
Expand All @@ -6791,7 +6791,7 @@ func Test_Client_InsertNotificationsAreDeduplicatedAndDebounced(t *testing.T) {
expectImmediateNotification(t, "queue3") // Immediate first fire on queue3

// `ShouldTrigger` uses a strict `Before` check; move just past the boundary.
client.baseService.Time.StubNowUTC(now.Add(config.FetchCooldown + time.Nanosecond))
client.baseService.Time.StubNow(now.Add(config.FetchCooldown + time.Nanosecond))

// Now queue1 should immediately notify again.
expectImmediateNotification(t, "queue1")
Expand Down Expand Up @@ -6846,7 +6846,7 @@ func Test_Client_JobCompletion(t *testing.T) {

client, bundle := setup(t, config)

now := client.baseService.Time.StubNowUTC(time.Now().UTC())
now := client.baseService.Time.StubNow(time.Now().UTC())

insertRes, err := client.Insert(ctx, JobArgs{}, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -6919,7 +6919,7 @@ func Test_Client_JobCompletion(t *testing.T) {

client, bundle := setup(t, config)

now := client.baseService.Time.StubNowUTC(time.Now().UTC())
now := client.baseService.Time.StubNow(time.Now().UTC())

insertRes, err := client.Insert(ctx, JobArgs{}, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -6951,7 +6951,7 @@ func Test_Client_JobCompletion(t *testing.T) {

client, bundle := setup(t, config)

now := client.baseService.Time.StubNowUTC(time.Now().UTC())
now := client.baseService.Time.StubNow(time.Now().UTC())

insertRes, err := client.Insert(ctx, JobArgs{}, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -7013,7 +7013,7 @@ func Test_Client_JobCompletion(t *testing.T) {

client, bundle := setup(t, newTestConfig(t, ""))

now := client.baseService.Time.StubNowUTC(time.Now().UTC())
now := client.baseService.Time.StubNow(time.Now().UTC())

type JobArgs struct {
testutil.JobArgsReflectKind[JobArgs]
Expand Down Expand Up @@ -8117,7 +8117,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Parallel()

archetype := riversharedtest.BaseServiceArchetype(t)
archetype.Time.StubNowUTC(time.Now().UTC())
archetype.Time.StubNow(time.Now().UTC())

uniqueOpts := UniqueOpts{
ByArgs: true,
Expand Down Expand Up @@ -8147,7 +8147,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Parallel()

archetype := riversharedtest.BaseServiceArchetype(t)
archetype.Time.StubNowUTC(time.Now().UTC())
archetype.Time.StubNow(time.Now().UTC())

states := []rivertype.JobState{
rivertype.JobStateAvailable,
Expand Down Expand Up @@ -8465,7 +8465,7 @@ func TestUniqueOpts(t *testing.T) {
// the current time, but make sure it's nicened up a little to be
// roughly in the middle of the hour and well clear of any period
// boundaries.
client.baseService.Time.StubNowUTC(
client.baseService.Time.StubNow(
time.Now().Truncate(1 * time.Hour).Add(37*time.Minute + 23*time.Second + 123*time.Millisecond).UTC(),
)

Expand Down
2 changes: 1 addition & 1 deletion internal/dbunique/db_unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func buildUniqueKeyString(timeGen rivertype.TimeGenerator, uniqueOpts *UniqueOpt
}

if uniqueOpts.ByPeriod != time.Duration(0) {
lowerPeriodBound := ptrutil.ValOrDefaultFunc(params.ScheduledAt, timeGen.NowUTC).Truncate(uniqueOpts.ByPeriod)
lowerPeriodBound := ptrutil.ValOrDefaultFunc(params.ScheduledAt, timeGen.Now).Truncate(uniqueOpts.ByPeriod)
sb.WriteString("&period=" + lowerPeriodBound.Format(time.RFC3339))
}

Expand Down
2 changes: 1 addition & 1 deletion internal/dbunique/db_unique_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestUniqueKey(t *testing.T) {
// Fixed timestamp for consistency across tests:
now := time.Now().UTC()
stubSvc := &riversharedtest.TimeStub{}
stubSvc.StubNowUTC(now)
stubSvc.StubNow(now)

tests := []struct {
name string
Expand Down
16 changes: 8 additions & 8 deletions internal/jobcompleter/job_completer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ func (c *InlineCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobst
c.wg.Add(1)
defer c.wg.Done()

start := c.Time.NowUTC()
start := c.Time.Now()

jobs, err := withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) ([]*rivertype.JobRow, error) {
jobs, err := c.pilot.JobSetStateIfRunningMany(ctx, c.exec, setStateParamsToMany(c.Time.NowUTCOrNil(), c.schema, params))
jobs, err := c.pilot.JobSetStateIfRunningMany(ctx, c.exec, setStateParamsToMany(c.Time.NowOrNil(), c.schema, params))
if err != nil {
return nil, err
}
Expand All @@ -92,7 +92,7 @@ func (c *InlineCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobst
return err
}

stats.CompleteDuration = c.Time.NowUTC().Sub(start)
stats.CompleteDuration = c.Time.Now().Sub(start)
c.subscribeCh <- []CompleterJobUpdated{{
Job: jobs[0],
JobStats: stats,
Expand Down Expand Up @@ -185,11 +185,11 @@ func newAsyncCompleterWithConcurrency(archetype *baseservice.Archetype, schema s
func (c *AsyncCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error {
// Start clock outside of goroutine so that the time spent blocking waiting
// for an errgroup slot is accurately measured.
start := c.Time.NowUTC()
start := c.Time.Now()

c.errGroup.Go(func() error {
jobs, err := withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) ([]*rivertype.JobRow, error) {
rows, err := c.pilot.JobSetStateIfRunningMany(ctx, c.exec, setStateParamsToMany(c.Time.NowUTCOrNil(), c.schema, params))
rows, err := c.pilot.JobSetStateIfRunningMany(ctx, c.exec, setStateParamsToMany(c.Time.NowOrNil(), c.schema, params))
if err != nil {
return nil, err
}
Expand All @@ -200,7 +200,7 @@ func (c *AsyncCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobsta
return err
}

stats.CompleteDuration = c.Time.NowUTC().Sub(start)
stats.CompleteDuration = c.Time.Now().Sub(start)
c.subscribeCh <- []CompleterJobUpdated{{
Job: jobs[0],
JobStats: stats,
Expand Down Expand Up @@ -479,7 +479,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
events := sliceutil.Map(jobRows, func(jobRow *rivertype.JobRow) CompleterJobUpdated {
setState := setStateBatch[jobRow.ID]
startTime := setStateStartTimes[jobRow.ID]
setState.Stats.CompleteDuration = c.Time.NowUTC().Sub(startTime)
setState.Stats.CompleteDuration = c.Time.Now().Sub(startTime)
return CompleterJobUpdated{
Job: jobRow,
JobStats: setState.Stats,
Expand All @@ -504,7 +504,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
}

func (c *BatchCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error {
now := c.Time.NowUTC()
now := c.Time.Now()
// If we've built up too much of a backlog because the completer's fallen
// behind, block completions until the complete loop's had a chance to catch
// up.
Expand Down
12 changes: 6 additions & 6 deletions internal/jobexecutor/job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (e *JobExecutor) Execute(ctx context.Context) {
// Ensure that the context is cancelled no matter what, or it will leak:
defer e.CancelFunc(errExecutorDefaultCancel)

e.start = e.Time.NowUTC()
e.start = e.Time.Now()
e.stats = &jobstats.JobStatistics{
QueueWaitDuration: e.start.Sub(e.JobRow.ScheduledAt),
}
Expand Down Expand Up @@ -196,7 +196,7 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
PanicVal: recovery,
}
}
e.stats.RunDuration = e.Time.NowUTC().Sub(e.start)
e.stats.RunDuration = e.Time.Now().Sub(e.start)
}()

if e.WorkUnit == nil {
Expand Down Expand Up @@ -385,7 +385,7 @@ func (e *JobExecutor) reportResult(ctx context.Context, jobRow *rivertype.JobRow
// so we instead make the job immediately `available` if the snooze time is
// smaller than the scheduler's run interval.
var params *riverdriver.JobSetStateIfRunningParams
if nextAttemptScheduledAt.Sub(e.Time.NowUTC()) <= e.SchedulerInterval {
if nextAttemptScheduledAt.Sub(e.Time.Now()) <= e.SchedulerInterval {
params = riverdriver.JobSetStateSnoozedAvailable(jobRow.ID, nextAttemptScheduledAt, jobRow.Attempt-1, metadataUpdatesBytes)
} else {
params = riverdriver.JobSetStateSnoozed(jobRow.ID, nextAttemptScheduledAt, jobRow.Attempt-1, metadataUpdatesBytes)
Expand All @@ -409,7 +409,7 @@ func (e *JobExecutor) reportResult(ctx context.Context, jobRow *rivertype.JobRow
return
}

if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCompleted(jobRow.ID, e.Time.NowUTC(), metadataUpdatesBytes)); err != nil {
if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCompleted(jobRow.ID, e.Time.Now(), metadataUpdatesBytes)); err != nil {
e.Logger.ErrorContext(ctx, e.Name+": Error completing job",
slog.String("err", err.Error()),
slog.Int64("job_id", jobRow.ID),
Expand Down Expand Up @@ -462,7 +462,7 @@ func (e *JobExecutor) reportError(ctx context.Context, jobRow *rivertype.JobRow,
return
}

now := e.Time.NowUTC()
now := e.Time.Now()

if cancelJob {
if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCancelled(jobRow.ID, now, errData, metadataUpdates)); err != nil {
Expand Down Expand Up @@ -502,7 +502,7 @@ func (e *JobExecutor) reportError(ctx context.Context, jobRow *rivertype.JobRow,
// respected. Here, we offset that with a branch that makes jobs immediately
// `available` if their retry was smaller than the scheduler's run interval.
var params *riverdriver.JobSetStateIfRunningParams
if nextRetryScheduledAt.Sub(e.Time.NowUTC()) <= e.SchedulerInterval {
if nextRetryScheduledAt.Sub(e.Time.Now()) <= e.SchedulerInterval {
params = riverdriver.JobSetStateErrorAvailable(jobRow.ID, nextRetryScheduledAt, errData, metadataUpdates)
} else {
params = riverdriver.JobSetStateErrorRetryable(jobRow.ID, nextRetryScheduledAt, errData, metadataUpdates)
Expand Down
2 changes: 1 addition & 1 deletion internal/jobexecutor/job_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func TestJobExecutor_Execute(t *testing.T) {

executor, bundle := setup(t)

now := executor.Time.StubNowUTC(time.Now().UTC())
now := executor.Time.StubNow(time.Now().UTC())

workerErr := errors.New("job error")
executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow)
Expand Down
4 changes: 2 additions & 2 deletions internal/leadership/elector.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error {

elected, err := attemptElectOrReelect(ctx, e.exec, false, &riverdriver.LeaderElectParams{
LeaderID: e.config.ClientID,
Now: e.Time.NowUTCOrNil(),
Now: e.Time.NowOrNil(),
Schema: e.config.Schema,
TTL: e.leaderTTL(),
})
Expand Down Expand Up @@ -391,7 +391,7 @@ func (e *Elector) keepLeadershipLoop(ctx context.Context) error {

reelected, err := attemptElectOrReelect(ctx, e.exec, true, &riverdriver.LeaderElectParams{
LeaderID: e.config.ClientID,
Now: e.Time.NowUTCOrNil(),
Now: e.Time.NowOrNil(),
Schema: e.config.Schema,
TTL: e.leaderTTL(),
})
Expand Down
4 changes: 2 additions & 2 deletions internal/maintenance/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er
}
defer dbutil.RollbackWithoutCancel(ctx, execTx)

now := s.Time.NowUTC()
now := s.Time.Now()
nowWithLookAhead := now.Add(s.config.Interval)

scheduledJobResults, err := execTx.JobSchedule(ctx, &riverdriver.JobScheduleParams{
Expand All @@ -194,7 +194,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er
// slightly in the future (this loop, the notify, and tx commit will take
// a small amount of time). This isn't going to be perfect, but the goal
// is to roughly try to guess when the clients will attempt to fetch jobs.
notificationHorizon := s.Time.NowUTC().Add(5 * time.Millisecond)
notificationHorizon := s.Time.Now().Add(5 * time.Millisecond)

for _, result := range scheduledJobResults {
if result.Job.ScheduledAt.After(notificationHorizon) {
Expand Down
10 changes: 5 additions & 5 deletions internal/maintenance/periodic_job_enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {

var (
insertParamsMany []*rivertype.JobInsertParams
now = s.Time.NowUTC()
now = s.Time.Now()
periodicJobUpsertParams = &riverpilot.PeriodicJobUpsertManyParams{Schema: s.Config.Schema}
)

Expand Down Expand Up @@ -417,7 +417,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
periodicJobUpsertParams.Jobs = append(periodicJobUpsertParams.Jobs, &riverpilot.PeriodicJobUpsertParams{
ID: periodicJob.ID,
NextRunAt: periodicJob.nextRunAt,
UpdatedAt: s.Time.NowUTC(),
UpdatedAt: s.Time.Now(),
})
}

Expand Down Expand Up @@ -452,7 +452,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
periodicJobUpsertParams = &riverpilot.PeriodicJobUpsertManyParams{Schema: s.Config.Schema}
)

now := s.Time.NowUTC()
now := s.Time.Now()

// Add a small margin to the current time so we're not only
// running jobs that are already ready, but also ones ready at
Expand Down Expand Up @@ -482,7 +482,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
periodicJobUpsertParams.Jobs = append(periodicJobUpsertParams.Jobs, &riverpilot.PeriodicJobUpsertParams{
ID: periodicJob.ID,
NextRunAt: periodicJob.nextRunAt,
UpdatedAt: s.Time.NowUTC(),
UpdatedAt: s.Time.Now(),
})
}
}
Expand Down Expand Up @@ -642,7 +642,7 @@ func (s *PeriodicJobEnqueuer) timeUntilNextRun() time.Duration {

var (
firstNextRunAt time.Time
now = s.Time.NowUTC()
now = s.Time.Now()
)

for _, periodicJob := range s.periodicJobs {
Expand Down
Loading
Loading