From 0b7f79b9ff7173ea09358f61cf8e96a15eb102c2 Mon Sep 17 00:00:00 2001 From: Ignat Tubylov Date: Tue, 21 Oct 2025 10:00:12 +0200 Subject: [PATCH 1/7] Introduce Task Processor optimization Signed-off-by: Ignat Tubylov --- .../dynamicproperties/constants.go | 7 ++ config/development.yaml | 2 +- service/history/common/type.go | 4 + service/history/config/config.go | 2 + service/history/config/config_test.go | 1 + service/history/execution/context.go | 1 + service/history/execution/context_test.go | 2 + service/history/queuev2/queue_base.go | 8 ++ service/history/queuev2/queue_scheduled.go | 39 ++++++- .../history/queuev2/queue_scheduled_test.go | 1 + service/history/queuev2/virtual_queue.go | 107 ++++++++++++++---- .../history/queuev2/virtual_queue_manager.go | 21 ++++ .../queuev2/virtual_queue_manager_mock.go | 29 +++++ service/history/queuev2/virtual_queue_mock.go | 28 +++++ service/history/queuev2/virtual_slice.go | 5 + service/history/queuev2/virtual_slice_mock.go | 12 ++ service/history/shard/context.go | 28 +++-- service/sharddistributor/store/etcd/go.mod | 1 + 18 files changed, 258 insertions(+), 40 deletions(-) diff --git a/common/dynamicconfig/dynamicproperties/constants.go b/common/dynamicconfig/dynamicproperties/constants.go index 255a4b7d01f..4bd196c2ff9 100644 --- a/common/dynamicconfig/dynamicproperties/constants.go +++ b/common/dynamicconfig/dynamicproperties/constants.go @@ -2113,6 +2113,7 @@ const ( EnableTimerQueueV2 EnableTransferQueueV2PendingTaskCountAlert EnableTimerQueueV2PendingTaskCountAlert + EnableTimerProcessorInMemoryQueue // LastBoolKey must be the last one in this const group LastBoolKey @@ -4680,6 +4681,12 @@ var BoolKeys = map[BoolKey]DynamicBool{ Filters: []Filter{ShardID}, DefaultValue: false, }, + EnableTimerProcessorInMemoryQueue: { + KeyName: "history.enableTimerProcessorInMemoryQueue", + Description: "EnableTimerProcessorInMemoryQueue is the flag to enable in-memory queue for timer processor", + Filters: []Filter{ShardID}, + DefaultValue: false, + }, } var FloatKeys = map[FloatKey]DynamicFloat{ diff --git a/config/development.yaml b/config/development.yaml index cdb3886ce03..75b5ee889ce 100644 --- a/config/development.yaml +++ b/config/development.yaml @@ -172,4 +172,4 @@ shardDistribution: prefix: "store" process: period: 1s - heartbeatTTL: 2s + heartbeatTTL: 2s \ No newline at end of file diff --git a/service/history/common/type.go b/service/history/common/type.go index def10db08d0..42b74d7acd1 100644 --- a/service/history/common/type.go +++ b/service/history/common/type.go @@ -36,5 +36,9 @@ type ( Activities map[int64]*persistence.ActivityInfo History events.PersistedBlobs PersistenceError bool + + // if true, the task will be scheduled in memory for the current execution, otherwise + // it will only be scheduled after the next DB scan + ScheduleInMemory bool } ) diff --git a/service/history/config/config.go b/service/history/config/config.go index d39933d0d01..464636d739b 100644 --- a/service/history/config/config.go +++ b/service/history/config/config.go @@ -154,6 +154,7 @@ type Config struct { TimerProcessorSplitQueueIntervalJitterCoefficient dynamicproperties.FloatPropertyFn TimerProcessorMaxRedispatchQueueSize dynamicproperties.IntPropertyFn TimerProcessorMaxTimeShift dynamicproperties.DurationPropertyFn + EnableTimerProcessorInMemoryQueue dynamicproperties.BoolPropertyFnWithShardIDFilter TimerProcessorHistoryArchivalSizeLimit dynamicproperties.IntPropertyFn TimerProcessorArchivalTimeLimit dynamicproperties.DurationPropertyFn 
DisableTimerFailoverQueue dynamicproperties.BoolPropertyFn @@ -453,6 +454,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, i TimerProcessorSplitQueueIntervalJitterCoefficient: dc.GetFloat64Property(dynamicproperties.TimerProcessorSplitQueueIntervalJitterCoefficient), TimerProcessorMaxRedispatchQueueSize: dc.GetIntProperty(dynamicproperties.TimerProcessorMaxRedispatchQueueSize), TimerProcessorMaxTimeShift: dc.GetDurationProperty(dynamicproperties.TimerProcessorMaxTimeShift), + EnableTimerProcessorInMemoryQueue: dc.GetBoolPropertyFilteredByShardID(dynamicproperties.EnableTimerProcessorInMemoryQueue), TimerProcessorHistoryArchivalSizeLimit: dc.GetIntProperty(dynamicproperties.TimerProcessorHistoryArchivalSizeLimit), TimerProcessorArchivalTimeLimit: dc.GetDurationProperty(dynamicproperties.TimerProcessorArchivalTimeLimit), DisableTimerFailoverQueue: dc.GetBoolProperty(dynamicproperties.DisableTimerFailoverQueue), diff --git a/service/history/config/config_test.go b/service/history/config/config_test.go index 3c498f5c72c..d74d6f58a44 100644 --- a/service/history/config/config_test.go +++ b/service/history/config/config_test.go @@ -278,6 +278,7 @@ func TestNewConfig(t *testing.T) { "QueueMaxVirtualQueueCount": {dynamicproperties.QueueMaxVirtualQueueCount, 101}, "VirtualSliceForceAppendInterval": {dynamicproperties.VirtualSliceForceAppendInterval, time.Second}, "ReplicationTaskProcessorLatencyLogThreshold": {dynamicproperties.ReplicationTaskProcessorLatencyLogThreshold, time.Duration(0)}, + "EnableTimerProcessorInMemoryQueue": {dynamicproperties.EnableTimerProcessorInMemoryQueue, false}, } client := dynamicconfig.NewInMemoryClient() for fieldName, expected := range fields { diff --git a/service/history/execution/context.go b/service/history/execution/context.go index 8b3e00492ab..720a44358c7 100644 --- a/service/history/execution/context.go +++ b/service/history/execution/context.go @@ -992,6 +992,7 @@ func notifyTasks( ExecutionInfo: executionInfo, Tasks: tasksByCategory[persistence.HistoryTaskCategoryTimer], PersistenceError: persistenceError, + ScheduleInMemory: true, } replicationTaskInfo := &hcommon.NotifyTaskInfo{ ExecutionInfo: executionInfo, diff --git a/service/history/execution/context_test.go b/service/history/execution/context_test.go index 3303cc7cb69..31143ccd61c 100644 --- a/service/history/execution/context_test.go +++ b/service/history/execution/context_test.go @@ -289,6 +289,7 @@ func TestNotifyTasksFromWorkflowSnapshot(t *testing.T) { }, }, PersistenceError: true, + ScheduleInMemory: true, }) mockEngine.EXPECT().NotifyNewReplicationTasks(&hcommon.NotifyTaskInfo{ ExecutionInfo: &persistence.WorkflowExecutionInfo{ @@ -419,6 +420,7 @@ func TestNotifyTasksFromWorkflowMutation(t *testing.T) { }, }, PersistenceError: true, + ScheduleInMemory: true, }) mockEngine.EXPECT().NotifyNewReplicationTasks(&hcommon.NotifyTaskInfo{ ExecutionInfo: &persistence.WorkflowExecutionInfo{ diff --git a/service/history/queuev2/queue_base.go b/service/history/queuev2/queue_base.go index c45e00bcc2f..614c633a395 100644 --- a/service/history/queuev2/queue_base.go +++ b/service/history/queuev2/queue_base.go @@ -274,6 +274,14 @@ func (q *queueBase) processNewTasks() bool { return true } +func (q *queueBase) insertSingleTask(task task.Task) bool { + return q.virtualQueueManager.InsertSingleTaskToRootQueue(task) +} + +func (q *queueBase) removeScheduledTasksAfter(t time.Time) { + q.virtualQueueManager.RemoveScheduledTasksAfter(t) +} + func (q *queueBase) updateQueueState(ctx 
context.Context) { q.metricsScope.IncCounter(metrics.AckLevelUpdateCounter) queueState := &QueueState{ diff --git a/service/history/queuev2/queue_scheduled.go b/service/history/queuev2/queue_scheduled.go index fff1d0bef7d..547f2fea2c9 100644 --- a/service/history/queuev2/queue_scheduled.go +++ b/service/history/queuev2/queue_scheduled.go @@ -143,15 +143,42 @@ func (q *scheduledQueue) NotifyNewTask(clusterName string, info *hcommon.NotifyT return } - nextTime := info.Tasks[0].GetVisibilityTimestamp() - for i := 1; i < numTasks; i++ { - ts := info.Tasks[i].GetVisibilityTimestamp() - if ts.Before(nextTime) { - nextTime = ts + q.base.logger.Debug( + "New timer task notification received", + tag.Dynamic("numTasks", numTasks), + tag.Dynamic("scheduleInMemory", info.ScheduleInMemory), + tag.Dynamic("persistenceError", info.PersistenceError), + tag.Dynamic("shardId", q.base.shard.GetShardID()), + ) + + tasksToBeReadFromDB := make([]persistence.Task, 0) + + if info.ScheduleInMemory && !info.PersistenceError { + for _, task := range info.Tasks { + ts := task.GetVisibilityTimestamp() + q.base.logger.Debug("Submitting task to an in-memory queue", tag.Dynamic("scheduledTime", ts), tag.Dynamic("shardId", q.base.shard.GetShardID())) + + if !q.base.insertSingleTask(q.base.taskInitializer(task)) { + tasksToBeReadFromDB = append(tasksToBeReadFromDB, task) + } } + } else { + tasksToBeReadFromDB = info.Tasks + } + + var nextReadTime time.Time + for _, task := range tasksToBeReadFromDB { + ts := task.GetVisibilityTimestamp() + if nextReadTime.IsZero() || ts.Before(nextReadTime) { + nextReadTime = ts + } + } + + if !nextReadTime.IsZero() { + q.base.removeScheduledTasksAfter(nextReadTime) + q.notify(nextReadTime) } - q.notify(nextTime) q.base.metricsScope.AddCounter(metrics.NewHistoryTaskCounter, int64(numTasks)) } diff --git a/service/history/queuev2/queue_scheduled_test.go b/service/history/queuev2/queue_scheduled_test.go index cc96ac7588a..ce3477d99a1 100644 --- a/service/history/queuev2/queue_scheduled_test.go +++ b/service/history/queuev2/queue_scheduled_test.go @@ -37,6 +37,7 @@ func TestScheduledQueue_LifeCycle(t *testing.T) { mockExecutionManager := persistence.NewMockExecutionManager(ctrl) // Setup mock expectations + mockShard.EXPECT().GetShardID().Return(1).AnyTimes() mockShard.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).AnyTimes() mockShard.EXPECT().GetTimeSource().Return(mockTimeSource).AnyTimes() mockShard.EXPECT().GetQueueState(persistence.HistoryTaskCategoryTimer).Return(&types.QueueState{ diff --git a/service/history/queuev2/virtual_queue.go b/service/history/queuev2/virtual_queue.go index 06ce7863138..13dea4a5ce7 100644 --- a/service/history/queuev2/virtual_queue.go +++ b/service/history/queuev2/virtual_queue.go @@ -67,6 +67,10 @@ type ( SplitSlices(func(VirtualSlice) (remaining []VirtualSlice, split bool)) // Pause pauses the virtual queue for a while Pause(time.Duration) + // InsertSingleTask inserts a single task to the virtual queue. Return false if the task's timestamp is out of range of the current queue slice.. 
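A note on the new notification flow in queue_scheduled.go above: incoming timers are split into two groups. Tasks accepted by q.base.insertSingleTask are served entirely from memory; the rest fall back to the DB scan, which is woken at the earliest fallback timestamp. A minimal, self-contained sketch of that split — insertInMemory and the one-minute window are stand-ins for the real slice-range check, not the actual implementation:

```go
package main

import (
	"fmt"
	"time"
)

type timerTask struct{ visibility time.Time }

// insertInMemory stands in for queueBase.insertSingleTask: accept only tasks
// that fall inside the currently loaded slice range (here, an assumed one-minute window).
func insertInMemory(now time.Time, t timerTask) bool {
	return t.visibility.Before(now.Add(time.Minute))
}

// nextDBReadTime mirrors the NotifyNewTask flow: schedule what we can in memory
// and return the earliest timestamp among tasks that must come from the next DB scan.
func nextDBReadTime(now time.Time, tasks []timerTask) (time.Time, bool) {
	var next time.Time
	for _, t := range tasks {
		if insertInMemory(now, t) {
			continue // scheduled in memory; no DB read needed for this task
		}
		if next.IsZero() || t.visibility.Before(next) {
			next = t.visibility
		}
	}
	return next, !next.IsZero()
}

func main() {
	now := time.Now()
	tasks := []timerTask{
		{now.Add(10 * time.Second)}, // fits the in-memory window
		{now.Add(2 * time.Hour)},    // falls back to the DB
		{now.Add(time.Hour)},        // falls back to the DB, earliest of the two
	}
	if next, ok := nextDBReadTime(now, tasks); ok {
		fmt.Println("wake the DB reader in:", next.Sub(now).Round(time.Second)) // 1h0m0s
	}
}
```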
+ InsertSingleTask(task task.Task) bool + // RemoveScheduledTasksAfter removes the scheduled tasks after the given time + RemoveScheduledTasksAfter(time.Time) } VirtualQueueOptions struct { @@ -394,24 +398,7 @@ func (q *virtualQueueImpl) loadAndSubmitTasks() { now := q.timeSource.Now() for _, task := range tasks { - if persistence.IsTaskCorrupted(task) { - q.logger.Error("Virtual queue encountered a corrupted task", tag.Dynamic("task", task)) - q.metricsScope.IncCounter(metrics.CorruptedHistoryTaskCounter) - task.Ack() - continue - } - - scheduledTime := task.GetTaskKey().GetScheduledTime() - // if the scheduled time is in the future, we need to reschedule the task - if now.Before(scheduledTime) { - q.rescheduler.RescheduleTask(task, scheduledTime) - continue - } - // shard level metrics for the duration between a task being written to a queue and being fetched from it - q.metricsScope.RecordHistogramDuration(metrics.TaskEnqueueToFetchLatency, now.Sub(task.GetVisibilityTimestamp())) - task.SetInitialSubmitTime(now) - submitted, err := q.processor.TrySubmit(task) - if err != nil { + if err := q.submitTask(now, task); err != nil { select { case <-q.ctx.Done(): return @@ -419,10 +406,6 @@ func (q *virtualQueueImpl) loadAndSubmitTasks() { q.logger.Error("Virtual queue failed to submit task", tag.Error(err)) } } - if !submitted { - q.metricsScope.IncCounter(metrics.ProcessingQueueThrottledCounter) - q.rescheduler.RescheduleTask(task, q.timeSource.Now().Add(taskSchedulerThrottleBackoffInterval)) - } } if sliceToRead.HasMoreTasks() { @@ -436,6 +419,86 @@ func (q *virtualQueueImpl) loadAndSubmitTasks() { } } +func (q *virtualQueueImpl) InsertSingleTask(task task.Task) bool { + q.Lock() + defer q.Unlock() + + taskKey := task.GetTaskKey() + var slice VirtualSlice + + for e := q.virtualSlices.Front(); e != nil; e = e.Next() { + s := e.Value.(VirtualSlice) + r := s.GetState().Range + if taskKey.Compare(r.InclusiveMinTaskKey) >= 0 && taskKey.Compare(r.ExclusiveMaxTaskKey) < 0 { + slice = s + break + } + } + + if slice == nil { + // the new task is outside of the current range, it will be read from the DB on the next poll + return false + } + + slice.InsertTask(task) + q.monitor.SetSlicePendingTaskCount(slice, slice.GetPendingTaskCount()) + + now := q.timeSource.Now() + if err := q.submitTask(now, task); err != nil { + q.logger.Error("Virtual queue failed to submit task", tag.Error(err)) + return false + } + + return true +} + +func (q *virtualQueueImpl) RemoveScheduledTasksAfter(t time.Time) { + q.Lock() + defer q.Unlock() + + for e := q.virtualSlices.Front(); e != nil; e = e.Next() { + s := e.Value.(VirtualSlice) + r := s.GetState().Range + + if t.Before(r.InclusiveMinTaskKey.GetScheduledTime()) { + continue + } + + // TODO: remove scheduled tasks from virtual slices + q.monitor.SetSlicePendingTaskCount(s, s.GetPendingTaskCount()) + } +} + +func (q *virtualQueueImpl) submitTask(now time.Time, task task.Task) error { + if persistence.IsTaskCorrupted(task) { + q.logger.Error("Virtual queue encountered a corrupted task", tag.Dynamic("task", task)) + q.metricsScope.IncCounter(metrics.CorruptedHistoryTaskCounter) + task.Ack() + return nil + } + + scheduledTime := task.GetTaskKey().GetScheduledTime() + // if the scheduled time is in the future, we need to reschedule the task + if now.Before(scheduledTime) { + q.rescheduler.RescheduleTask(task, scheduledTime) + return nil + } + // shard level metrics for the duration between a task being written to a queue and being fetched from it + 
q.metricsScope.RecordHistogramDuration(metrics.TaskEnqueueToFetchLatency, now.Sub(task.GetVisibilityTimestamp())) + task.SetInitialSubmitTime(now) + submitted, err := q.processor.TrySubmit(task) + if err != nil { + return err + } + + if !submitted { + q.metricsScope.IncCounter(metrics.ProcessingQueueThrottledCounter) + q.rescheduler.RescheduleTask(task, q.timeSource.Now().Add(taskSchedulerThrottleBackoffInterval)) + } + + return nil +} + func (q *virtualQueueImpl) resetNextReadSliceLocked() { q.sliceToRead = nil for element := q.virtualSlices.Front(); element != nil; element = element.Next() { diff --git a/service/history/queuev2/virtual_queue_manager.go b/service/history/queuev2/virtual_queue_manager.go index 5629c12d488..f734cf7332a 100644 --- a/service/history/queuev2/virtual_queue_manager.go +++ b/service/history/queuev2/virtual_queue_manager.go @@ -56,6 +56,9 @@ type ( // Add a new virtual slice to the root queue. This is used when new tasks are generated and max read level is updated. // By default, all new tasks belong to the root queue, so we need to add a new virtual slice to the root queue. AddNewVirtualSliceToRootQueue(VirtualSlice) + // Insert a single task to the current slice. Return false if the task's timestamp is out of range of the current slice. + InsertSingleTaskToRootQueue(task.Task) bool + RemoveScheduledTasksAfter(time.Time) } virtualQueueManagerImpl struct { @@ -218,6 +221,24 @@ func (m *virtualQueueManagerImpl) AddNewVirtualSliceToRootQueue(s VirtualSlice) m.virtualQueues[rootQueueID].Start() } +func (m *virtualQueueManagerImpl) InsertSingleTaskToRootQueue(t task.Task) bool { + m.Lock() + defer m.Unlock() + if vq, ok := m.virtualQueues[rootQueueID]; ok { + return vq.InsertSingleTask(t) + } + + // if a root queue is not created yet, no need to schedule an incoming task, it will be read from the slice + return false +} + +func (m *virtualQueueManagerImpl) RemoveScheduledTasksAfter(t time.Time) { + m.Lock() + defer m.Unlock() + for _, vq := range m.virtualQueues { + vq.RemoveScheduledTasksAfter(t) + } +} func (m *virtualQueueManagerImpl) appendOrMergeSlice(vq VirtualQueue, s VirtualSlice) { now := m.timeSource.Now() newVirtualSliceState := s.GetState() diff --git a/service/history/queuev2/virtual_queue_manager_mock.go b/service/history/queuev2/virtual_queue_manager_mock.go index 62771415933..21a595afa87 100644 --- a/service/history/queuev2/virtual_queue_manager_mock.go +++ b/service/history/queuev2/virtual_queue_manager_mock.go @@ -11,8 +11,11 @@ package queuev2 import ( reflect "reflect" + time "time" gomock "go.uber.org/mock/gomock" + + task "github.com/uber/cadence/service/history/task" ) // MockVirtualQueueManager is a mock of VirtualQueueManager interface. @@ -65,6 +68,32 @@ func (mr *MockVirtualQueueManagerMockRecorder) GetOrCreateVirtualQueue(arg0 any) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrCreateVirtualQueue", reflect.TypeOf((*MockVirtualQueueManager)(nil).GetOrCreateVirtualQueue), arg0) } +// InsertSingleTaskToRootQueue mocks base method. +func (m *MockVirtualQueueManager) InsertSingleTaskToRootQueue(arg0 task.Task) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InsertSingleTaskToRootQueue", arg0) + ret0, _ := ret[0].(bool) + return ret0 +} + +// InsertSingleTaskToRootQueue indicates an expected call of InsertSingleTaskToRootQueue. 
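The slice lookup in InsertSingleTask above relies on each slice owning a half-open key range: the min key is inclusive, the max key is exclusive, so adjacent slices tile the key space without overlap. A toy illustration with integer keys standing in for persistence.HistoryTaskKey:

```go
package main

import "fmt"

// keyRange mimics a virtual slice's range: [minInclusive, maxExclusive).
type keyRange struct{ minInclusive, maxExclusive int }

func (r keyRange) contains(key int) bool {
	return key >= r.minInclusive && key < r.maxExclusive
}

func main() {
	r := keyRange{minInclusive: 100, maxExclusive: 200}
	fmt.Println(r.contains(100)) // true: the min boundary belongs to this slice
	fmt.Println(r.contains(199)) // true
	fmt.Println(r.contains(200)) // false: the max boundary belongs to the next slice
}
```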
+func (mr *MockVirtualQueueManagerMockRecorder) InsertSingleTaskToRootQueue(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertSingleTaskToRootQueue", reflect.TypeOf((*MockVirtualQueueManager)(nil).InsertSingleTaskToRootQueue), arg0) +} + +// RemoveScheduledTasksAfter mocks base method. +func (m *MockVirtualQueueManager) RemoveScheduledTasksAfter(arg0 time.Time) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RemoveScheduledTasksAfter", arg0) +} + +// RemoveScheduledTasksAfter indicates an expected call of RemoveScheduledTasksAfter. +func (mr *MockVirtualQueueManagerMockRecorder) RemoveScheduledTasksAfter(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveScheduledTasksAfter", reflect.TypeOf((*MockVirtualQueueManager)(nil).RemoveScheduledTasksAfter), arg0) +} + // Start mocks base method. func (m *MockVirtualQueueManager) Start() { m.ctrl.T.Helper() diff --git a/service/history/queuev2/virtual_queue_mock.go b/service/history/queuev2/virtual_queue_mock.go index 517a8411ce8..6873d2d6fb3 100644 --- a/service/history/queuev2/virtual_queue_mock.go +++ b/service/history/queuev2/virtual_queue_mock.go @@ -14,6 +14,8 @@ import ( time "time" gomock "go.uber.org/mock/gomock" + + task "github.com/uber/cadence/service/history/task" ) // MockVirtualQueue is a mock of VirtualQueue interface. @@ -82,6 +84,20 @@ func (mr *MockVirtualQueueMockRecorder) GetState() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetState", reflect.TypeOf((*MockVirtualQueue)(nil).GetState)) } +// InsertSingleTask mocks base method. +func (m *MockVirtualQueue) InsertSingleTask(task task.Task) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InsertSingleTask", task) + ret0, _ := ret[0].(bool) + return ret0 +} + +// InsertSingleTask indicates an expected call of InsertSingleTask. +func (mr *MockVirtualQueueMockRecorder) InsertSingleTask(task any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertSingleTask", reflect.TypeOf((*MockVirtualQueue)(nil).InsertSingleTask), task) +} + // IterateSlices mocks base method. func (m *MockVirtualQueue) IterateSlices(arg0 func(VirtualSlice)) { m.ctrl.T.Helper() @@ -134,6 +150,18 @@ func (mr *MockVirtualQueueMockRecorder) Pause(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Pause", reflect.TypeOf((*MockVirtualQueue)(nil).Pause), arg0) } +// RemoveScheduledTasksAfter mocks base method. +func (m *MockVirtualQueue) RemoveScheduledTasksAfter(arg0 time.Time) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RemoveScheduledTasksAfter", arg0) +} + +// RemoveScheduledTasksAfter indicates an expected call of RemoveScheduledTasksAfter. +func (mr *MockVirtualQueueMockRecorder) RemoveScheduledTasksAfter(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveScheduledTasksAfter", reflect.TypeOf((*MockVirtualQueue)(nil).RemoveScheduledTasksAfter), arg0) +} + // SplitSlices mocks base method. 
func (m *MockVirtualQueue) SplitSlices(arg0 func(VirtualSlice) ([]VirtualSlice, bool)) { m.ctrl.T.Helper() diff --git a/service/history/queuev2/virtual_slice.go b/service/history/queuev2/virtual_slice.go index 4638a017560..70530df02af 100644 --- a/service/history/queuev2/virtual_slice.go +++ b/service/history/queuev2/virtual_slice.go @@ -37,6 +37,7 @@ type ( GetState() VirtualSliceState IsEmpty() bool GetTasks(context.Context, int) ([]task.Task, error) + InsertTask(task.Task) HasMoreTasks() bool UpdateAndGetState() VirtualSliceState GetPendingTaskCount() int @@ -115,6 +116,10 @@ func (s *virtualSliceImpl) Clear() { } } +func (s *virtualSliceImpl) InsertTask(task task.Task) { + s.pendingTaskTracker.AddTask(task) +} + func (s *virtualSliceImpl) GetTasks(ctx context.Context, pageSize int) ([]task.Task, error) { if len(s.progress) == 0 { return nil, nil diff --git a/service/history/queuev2/virtual_slice_mock.go b/service/history/queuev2/virtual_slice_mock.go index ea6e7085cf5..dc623b12d3f 100644 --- a/service/history/queuev2/virtual_slice_mock.go +++ b/service/history/queuev2/virtual_slice_mock.go @@ -112,6 +112,18 @@ func (mr *MockVirtualSliceMockRecorder) HasMoreTasks() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasMoreTasks", reflect.TypeOf((*MockVirtualSlice)(nil).HasMoreTasks)) } +// InsertTask mocks base method. +func (m *MockVirtualSlice) InsertTask(arg0 task.Task) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "InsertTask", arg0) +} + +// InsertTask indicates an expected call of InsertTask. +func (mr *MockVirtualSliceMockRecorder) InsertTask(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertTask", reflect.TypeOf((*MockVirtualSlice)(nil).InsertTask), arg0) +} + // IsEmpty mocks base method. func (m *MockVirtualSlice) IsEmpty() bool { m.ctrl.T.Helper() diff --git a/service/history/shard/context.go b/service/history/shard/context.go index e16f0b274fd..9559b74d2e0 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -1327,23 +1327,29 @@ func (s *contextImpl) allocateTimerIDsLocked( } readCursorTS := s.scheduledTaskMaxReadLevelMap[cluster] + // make sure scheduled task timestamp is higher than // 1. max read level, so that queue processor can read the task back. // 2. current time. Otherwise the task timestamp is in the past and causes aritical load latency in queue processor metrics. // Above cases can happen if shard move and new host have a time SKU, // or there is db write delay, or we are simply (re-)generating tasks for an old workflow. - if ts.Before(readCursorTS) { - // This can happen if shard move and new host have a time SKU, or there is db write delay. - // We generate a new timer ID using timerMaxReadLevel. - s.logger.Warn("New timer generated is less than read level", - tag.WorkflowDomainID(domainEntry.GetInfo().ID), - tag.WorkflowID(workflowID), - tag.Timestamp(ts), - tag.CursorTimestamp(readCursorTS), - tag.ClusterName(cluster), - tag.ValueShardAllocateTimerBeforeRead) - ts = readCursorTS.Add(persistence.DBTimestampMinPrecision) + + // we need to skip this check for in-memory queue because it is going to be populated before the db is read + if !s.config.EnableTimerProcessorInMemoryQueue(s.shardID) { + if ts.Before(readCursorTS) { + // This can happen if shard move and new host have a time SKU, or there is db write delay. + // We generate a new timer ID using timerMaxReadLevel. 
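For context on the branch being wrapped here: a timer allocated below the queue's max read level would never be seen by the DB scan, so it is pushed just past the cursor. A standalone sketch of that clamp; the one-millisecond precision is an assumption standing in for persistence.DBTimestampMinPrecision:

```go
package main

import (
	"fmt"
	"time"
)

// clampToReadCursor pushes a timer that already fell behind the read cursor to
// just past it, so the queue processor can still read the task back.
func clampToReadCursor(ts, readCursor time.Time, minPrecision time.Duration) time.Time {
	if ts.Before(readCursor) {
		return readCursor.Add(minPrecision)
	}
	return ts
}

func main() {
	cursor := time.Now()
	late := cursor.Add(-5 * time.Second) // generated behind the cursor (clock skew, slow DB write)
	fixed := clampToReadCursor(late, cursor, time.Millisecond)
	fmt.Println(fixed.After(cursor)) // true: the task is visible to the next scan
}
```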
+ s.logger.Warn("New timer generated is less than read level", + tag.WorkflowDomainID(domainEntry.GetInfo().ID), + tag.WorkflowID(workflowID), + tag.Timestamp(ts), + tag.CursorTimestamp(readCursorTS), + tag.ClusterName(cluster), + tag.ValueShardAllocateTimerBeforeRead) + ts = readCursorTS.Add(persistence.DBTimestampMinPrecision) + } } + if ts.Before(now) { s.logger.Warn("New timer generated is in the past", tag.WorkflowDomainID(domainEntry.GetInfo().ID), diff --git a/service/sharddistributor/store/etcd/go.mod b/service/sharddistributor/store/etcd/go.mod index fd0fb24493d..fadf6f773ea 100644 --- a/service/sharddistributor/store/etcd/go.mod +++ b/service/sharddistributor/store/etcd/go.mod @@ -34,6 +34,7 @@ require ( github.com/golang-jwt/jwt/v5 v5.2.0 // indirect github.com/golang/mock v1.6.0 // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/google/gofuzz v1.0.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/jonboulle/clockwork v0.5.0 // indirect From 16112ef0b498783e98f891b6ec68a301fe00eb8e Mon Sep 17 00:00:00 2001 From: Ignat Tubylov Date: Tue, 21 Oct 2025 11:35:19 +0200 Subject: [PATCH 2/7] Clean up Signed-off-by: Ignat Tubylov --- .../dynamicproperties/constants.go | 19 ++++++++++++------- service/history/config/config.go | 4 ++-- service/history/config/config_test.go | 2 +- service/history/queuev2/virtual_queue.go | 6 +++++- service/history/queuev2/virtual_slice.go | 10 ++++++++++ service/history/queuev2/virtual_slice_mock.go | 12 ++++++++++++ service/history/shard/context.go | 11 ++++++++--- 7 files changed, 50 insertions(+), 14 deletions(-) diff --git a/common/dynamicconfig/dynamicproperties/constants.go b/common/dynamicconfig/dynamicproperties/constants.go index 4bd196c2ff9..b6ce3b67c82 100644 --- a/common/dynamicconfig/dynamicproperties/constants.go +++ b/common/dynamicconfig/dynamicproperties/constants.go @@ -2113,7 +2113,6 @@ const ( EnableTimerQueueV2 EnableTransferQueueV2PendingTaskCountAlert EnableTimerQueueV2PendingTaskCountAlert - EnableTimerProcessorInMemoryQueue // LastBoolKey must be the last one in this const group LastBoolKey @@ -2673,6 +2672,12 @@ const ( // Default value: 1s (1*time.Second) // Allowed filters: N/A TimerProcessorMaxTimeShift + // TimerProcessorInMemoryQueueMaxTimeShift is the max shift timer processor in memory queue can have. If set to 0, in memory queue is disabled. + // KeyName: history.timerProcessorInMemoryQueueMaxTimeShift + // Value type: Duration + // Default value: 0 + // Allowed filters: ShardID + TimerProcessorInMemoryQueueMaxTimeShift // TransferProcessorFailoverMaxStartJitterInterval is the max jitter interval for starting transfer // failover queue processing. 
The actual jitter interval used will be a random duration between // 0 and the max interval so that timer failover queue across different shards won't start at @@ -4681,12 +4686,6 @@ var BoolKeys = map[BoolKey]DynamicBool{ Filters: []Filter{ShardID}, DefaultValue: false, }, - EnableTimerProcessorInMemoryQueue: { - KeyName: "history.enableTimerProcessorInMemoryQueue", - Description: "EnableTimerProcessorInMemoryQueue is the flag to enable in-memory queue for timer processor", - Filters: []Filter{ShardID}, - DefaultValue: false, - }, } var FloatKeys = map[FloatKey]DynamicFloat{ @@ -5205,6 +5204,12 @@ var DurationKeys = map[DurationKey]DynamicDuration{ Description: "TimerProcessorMaxTimeShift is the max shift timer processor can have", DefaultValue: time.Second, }, + TimerProcessorInMemoryQueueMaxTimeShift: { + KeyName: "history.timerProcessorInMemoryQueueMaxTimeShift", + Filters: []Filter{ShardID}, + Description: "TimerProcessorInMemoryQueueMaxTimeShift is the max shift timer processor in memory queue can have. If set to 0, in memory queue is disabled.", + DefaultValue: 0, + }, TransferProcessorFailoverMaxStartJitterInterval: { KeyName: "history.transferProcessorFailoverMaxStartJitterInterval", Description: "TransferProcessorFailoverMaxStartJitterInterval is the max jitter interval for starting transfer failover queue processing. The actual jitter interval used will be a random duration between 0 and the max interval so that timer failover queue across different shards won't start at the same time", diff --git a/service/history/config/config.go b/service/history/config/config.go index 464636d739b..0d77af885fe 100644 --- a/service/history/config/config.go +++ b/service/history/config/config.go @@ -154,7 +154,7 @@ type Config struct { TimerProcessorSplitQueueIntervalJitterCoefficient dynamicproperties.FloatPropertyFn TimerProcessorMaxRedispatchQueueSize dynamicproperties.IntPropertyFn TimerProcessorMaxTimeShift dynamicproperties.DurationPropertyFn - EnableTimerProcessorInMemoryQueue dynamicproperties.BoolPropertyFnWithShardIDFilter + TimerProcessorInMemoryQueueMaxTimeShift dynamicproperties.DurationPropertyFnWithShardIDFilter TimerProcessorHistoryArchivalSizeLimit dynamicproperties.IntPropertyFn TimerProcessorArchivalTimeLimit dynamicproperties.DurationPropertyFn DisableTimerFailoverQueue dynamicproperties.BoolPropertyFn @@ -454,7 +454,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, i TimerProcessorSplitQueueIntervalJitterCoefficient: dc.GetFloat64Property(dynamicproperties.TimerProcessorSplitQueueIntervalJitterCoefficient), TimerProcessorMaxRedispatchQueueSize: dc.GetIntProperty(dynamicproperties.TimerProcessorMaxRedispatchQueueSize), TimerProcessorMaxTimeShift: dc.GetDurationProperty(dynamicproperties.TimerProcessorMaxTimeShift), - EnableTimerProcessorInMemoryQueue: dc.GetBoolPropertyFilteredByShardID(dynamicproperties.EnableTimerProcessorInMemoryQueue), + TimerProcessorInMemoryQueueMaxTimeShift: dc.GetDurationPropertyFilteredByShardID(dynamicproperties.TimerProcessorInMemoryQueueMaxTimeShift), TimerProcessorHistoryArchivalSizeLimit: dc.GetIntProperty(dynamicproperties.TimerProcessorHistoryArchivalSizeLimit), TimerProcessorArchivalTimeLimit: dc.GetDurationProperty(dynamicproperties.TimerProcessorArchivalTimeLimit), DisableTimerFailoverQueue: dc.GetBoolProperty(dynamicproperties.DisableTimerFailoverQueue), diff --git a/service/history/config/config_test.go b/service/history/config/config_test.go index d74d6f58a44..3e7baf57962 100644 --- 
a/service/history/config/config_test.go +++ b/service/history/config/config_test.go @@ -139,6 +139,7 @@ func TestNewConfig(t *testing.T) { "TimerProcessorSplitQueueIntervalJitterCoefficient": {dynamicproperties.TimerProcessorSplitQueueIntervalJitterCoefficient, 4.0}, "TimerProcessorMaxRedispatchQueueSize": {dynamicproperties.TimerProcessorMaxRedispatchQueueSize, 45}, "TimerProcessorMaxTimeShift": {dynamicproperties.TimerProcessorMaxTimeShift, time.Second}, + "TimerProcessorInMemoryQueueMaxTimeShift": {dynamicproperties.TimerProcessorInMemoryQueueMaxTimeShift, time.Duration(0)}, "TimerProcessorHistoryArchivalSizeLimit": {dynamicproperties.TimerProcessorHistoryArchivalSizeLimit, 46}, "TimerProcessorArchivalTimeLimit": {dynamicproperties.TimerProcessorArchivalTimeLimit, time.Second}, "TransferTaskBatchSize": {dynamicproperties.TransferTaskBatchSize, 47}, @@ -278,7 +279,6 @@ func TestNewConfig(t *testing.T) { "QueueMaxVirtualQueueCount": {dynamicproperties.QueueMaxVirtualQueueCount, 101}, "VirtualSliceForceAppendInterval": {dynamicproperties.VirtualSliceForceAppendInterval, time.Second}, "ReplicationTaskProcessorLatencyLogThreshold": {dynamicproperties.ReplicationTaskProcessorLatencyLogThreshold, time.Duration(0)}, - "EnableTimerProcessorInMemoryQueue": {dynamicproperties.EnableTimerProcessorInMemoryQueue, false}, } client := dynamicconfig.NewInMemoryClient() for fieldName, expected := range fields { diff --git a/service/history/queuev2/virtual_queue.go b/service/history/queuev2/virtual_queue.go index 13dea4a5ce7..b07b7186bae 100644 --- a/service/history/queuev2/virtual_queue.go +++ b/service/history/queuev2/virtual_queue.go @@ -464,7 +464,11 @@ func (q *virtualQueueImpl) RemoveScheduledTasksAfter(t time.Time) { continue } - // TODO: remove scheduled tasks from virtual slices + s.CancelTasks(func(task task.Task) bool { + taskTime := task.GetTaskKey().GetScheduledTime() + return taskTime.After(t) || taskTime.Equal(t) + }) + q.monitor.SetSlicePendingTaskCount(s, s.GetPendingTaskCount()) } } diff --git a/service/history/queuev2/virtual_slice.go b/service/history/queuev2/virtual_slice.go index 70530df02af..9cac693e9dc 100644 --- a/service/history/queuev2/virtual_slice.go +++ b/service/history/queuev2/virtual_slice.go @@ -43,6 +43,7 @@ type ( GetPendingTaskCount() int Clear() PendingTaskStats() PendingTaskStats + CancelTasks(predicate func(task.Task) bool) TrySplitByTaskKey(persistence.HistoryTaskKey) (VirtualSlice, VirtualSlice, bool) TrySplitByPredicate(Predicate) (VirtualSlice, VirtualSlice, bool) @@ -196,6 +197,15 @@ func (s *virtualSliceImpl) UpdateAndGetState() VirtualSliceState { return s.state } +func (s *virtualSliceImpl) CancelTasks(predicate func(task.Task) bool) { + taskMap := s.pendingTaskTracker.GetTasks() + for _, task := range taskMap { + if predicate(task) { + task.Cancel() + } + } +} + func (s *virtualSliceImpl) TrySplitByTaskKey(taskKey persistence.HistoryTaskKey) (VirtualSlice, VirtualSlice, bool) { leftState, rightState, ok := s.state.TrySplitByTaskKey(taskKey) if !ok { diff --git a/service/history/queuev2/virtual_slice_mock.go b/service/history/queuev2/virtual_slice_mock.go index dc623b12d3f..26de2b7cc9f 100644 --- a/service/history/queuev2/virtual_slice_mock.go +++ b/service/history/queuev2/virtual_slice_mock.go @@ -43,6 +43,18 @@ func (m *MockVirtualSlice) EXPECT() *MockVirtualSliceMockRecorder { return m.recorder } +// CancelTasks mocks base method. 
+func (m *MockVirtualSlice) CancelTasks(predicate func(task.Task) bool) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "CancelTasks", predicate) +} + +// CancelTasks indicates an expected call of CancelTasks. +func (mr *MockVirtualSliceMockRecorder) CancelTasks(predicate any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CancelTasks", reflect.TypeOf((*MockVirtualSlice)(nil).CancelTasks), predicate) +} + // Clear mocks base method. func (m *MockVirtualSlice) Clear() { m.ctrl.T.Helper() diff --git a/service/history/shard/context.go b/service/history/shard/context.go index 9559b74d2e0..a871c9c3917 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -246,7 +246,12 @@ func (s *contextImpl) updateScheduledTaskMaxReadLevel(cluster string) persistenc currentTime = s.remoteClusterCurrentTime[cluster] } - newMaxReadLevel := currentTime.Add(s.config.TimerProcessorMaxTimeShift()).Truncate(persistence.DBTimestampMinPrecision) + maxTimeShift := s.config.TimerProcessorMaxTimeShift() + if s.config.TimerProcessorInMemoryQueueMaxTimeShift(s.shardID) > 0 { + maxTimeShift = s.config.TimerProcessorInMemoryQueueMaxTimeShift(s.shardID) + } + + newMaxReadLevel := currentTime.Add(maxTimeShift).Truncate(persistence.DBTimestampMinPrecision) if newMaxReadLevel.After(s.scheduledTaskMaxReadLevelMap[cluster]) { s.scheduledTaskMaxReadLevelMap[cluster] = newMaxReadLevel } @@ -1334,8 +1339,8 @@ func (s *contextImpl) allocateTimerIDsLocked( // Above cases can happen if shard move and new host have a time SKU, // or there is db write delay, or we are simply (re-)generating tasks for an old workflow. - // we need to skip this check for in-memory queue because it is going to be populated before the db is read - if !s.config.EnableTimerProcessorInMemoryQueue(s.shardID) { + if s.config.TimerProcessorInMemoryQueueMaxTimeShift(s.shardID) == 0 { + // this check is only required when the in-memory queue is disabled. If it's enabled, we expect timers below the read level to be enqueued in memory if ts.Before(readCursorTS) { // This can happen if shard move and new host have a time SKU, or there is db write delay. // We generate a new timer ID using timerMaxReadLevel.
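Patch 2 collapses the boolean flag into a single duration knob: zero keeps the old behavior, while a positive value both enables the in-memory queue and replaces the shard's read-level shift. A minimal model of that selection (the values in main are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

// maxReadLevel models updateScheduledTaskMaxReadLevel: a positive in-memory
// shift implicitly enables the in-memory queue and widens the read horizon.
func maxReadLevel(now time.Time, defaultShift, inMemoryShift time.Duration) time.Time {
	shift := defaultShift
	if inMemoryShift > 0 {
		shift = inMemoryShift
	}
	return now.Add(shift).Truncate(time.Millisecond)
}

func main() {
	now := time.Now()
	off := maxReadLevel(now, time.Second, 0)            // in-memory queue disabled
	on := maxReadLevel(now, time.Second, 5*time.Second) // enabled, wider horizon
	fmt.Println(on.Sub(off)) // 4s: how much further the read level advances
}
```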
From 06e955300282681a2461083f5623420c688ba69a Mon Sep 17 00:00:00 2001 From: Ignat Tubylov Date: Mon, 27 Oct 2025 12:36:46 +0100 Subject: [PATCH 3/7] Reset slice progress after a potentially successful write Signed-off-by: Ignat Tubylov --- service/history/queuev2/queue_base.go | 4 +-- service/history/queuev2/queue_scheduled.go | 2 +- service/history/queuev2/virtual_queue.go | 21 ++++-------- .../history/queuev2/virtual_queue_manager.go | 8 +++-- .../queuev2/virtual_queue_manager_mock.go | 4 +-- service/history/queuev2/virtual_queue_mock.go | 3 +- service/history/queuev2/virtual_slice.go | 33 +++++++++++++++++-- service/history/queuev2/virtual_slice_mock.go | 12 +++---- 8 files changed, 54 insertions(+), 33 deletions(-) diff --git a/service/history/queuev2/queue_base.go b/service/history/queuev2/queue_base.go index 614c633a395..a573d5a58ad 100644 --- a/service/history/queuev2/queue_base.go +++ b/service/history/queuev2/queue_base.go @@ -278,8 +278,8 @@ func (q *queueBase) insertSingleTask(task task.Task) bool { return q.virtualQueueManager.InsertSingleTaskToRootQueue(task) } -func (q *queueBase) removeScheduledTasksAfter(t time.Time) { - q.virtualQueueManager.RemoveScheduledTasksAfter(t) +func (q *queueBase) removeScheduledTasksAfter(key persistence.HistoryTaskKey) { + q.virtualQueueManager.RemoveScheduledTasksAfter(key) } func (q *queueBase) updateQueueState(ctx context.Context) { diff --git a/service/history/queuev2/queue_scheduled.go b/service/history/queuev2/queue_scheduled.go index 547f2fea2c9..561284a72cf 100644 --- a/service/history/queuev2/queue_scheduled.go +++ b/service/history/queuev2/queue_scheduled.go @@ -175,7 +175,7 @@ func (q *scheduledQueue) NotifyNewTask(clusterName string, info *hcommon.NotifyT } if !nextReadTime.IsZero() { - q.base.removeScheduledTasksAfter(nextReadTime) + q.base.removeScheduledTasksAfter(persistence.NewHistoryTaskKey(nextReadTime, 0)) q.notify(nextReadTime) } diff --git a/service/history/queuev2/virtual_queue.go b/service/history/queuev2/virtual_queue.go index b07b7186bae..84b0d062f01 100644 --- a/service/history/queuev2/virtual_queue.go +++ b/service/history/queuev2/virtual_queue.go @@ -70,7 +70,7 @@ type ( // InsertSingleTask inserts a single task to the virtual queue. Return false if the task's timestamp is out of range of the current queue slice.. 
InsertSingleTask(task task.Task) bool // RemoveScheduledTasksAfter removes the scheduled tasks after the given time - RemoveScheduledTasksAfter(time.Time) + RemoveScheduledTasksAfter(persistence.HistoryTaskKey) } VirtualQueueOptions struct { @@ -452,24 +452,14 @@ func (q *virtualQueueImpl) InsertSingleTask(task task.Task) bool { return true } -func (q *virtualQueueImpl) RemoveScheduledTasksAfter(t time.Time) { +func (q *virtualQueueImpl) RemoveScheduledTasksAfter(key persistence.HistoryTaskKey) { q.Lock() defer q.Unlock() for e := q.virtualSlices.Front(); e != nil; e = e.Next() { - s := e.Value.(VirtualSlice) - r := s.GetState().Range - - if t.Before(r.InclusiveMinTaskKey.GetScheduledTime()) { - continue - } - - s.CancelTasks(func(task task.Task) bool { - taskTime := task.GetTaskKey().GetScheduledTime() - return taskTime.After(t) || taskTime.Equal(t) - }) - - q.monitor.SetSlicePendingTaskCount(s, s.GetPendingTaskCount()) + slice := e.Value.(VirtualSlice) + slice.CancelTasksAfter(key) + q.monitor.SetSlicePendingTaskCount(slice, slice.GetPendingTaskCount()) } } @@ -477,6 +467,7 @@ func (q *virtualQueueImpl) submitTask(now time.Time, task task.Task) error { if persistence.IsTaskCorrupted(task) { q.logger.Error("Virtual queue encountered a corrupted task", tag.Dynamic("task", task)) q.metricsScope.IncCounter(metrics.CorruptedHistoryTaskCounter) + task.Ack() return nil } diff --git a/service/history/queuev2/virtual_queue_manager.go b/service/history/queuev2/virtual_queue_manager.go index f734cf7332a..c999090c207 100644 --- a/service/history/queuev2/virtual_queue_manager.go +++ b/service/history/queuev2/virtual_queue_manager.go @@ -34,6 +34,7 @@ import ( "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/quotas" "github.com/uber/cadence/service/history/task" ) @@ -58,7 +59,7 @@ type ( AddNewVirtualSliceToRootQueue(VirtualSlice) // Insert a single task to the current slice. Return false if the task's timestamp is out of range of the current slice. InsertSingleTaskToRootQueue(task.Task) bool - RemoveScheduledTasksAfter(time.Time) + RemoveScheduledTasksAfter(persistence.HistoryTaskKey) } virtualQueueManagerImpl struct { @@ -232,13 +233,14 @@ func (m *virtualQueueManagerImpl) InsertSingleTaskToRootQueue(t task.Task) bool return false } -func (m *virtualQueueManagerImpl) RemoveScheduledTasksAfter(t time.Time) { +func (m *virtualQueueManagerImpl) RemoveScheduledTasksAfter(key persistence.HistoryTaskKey) { m.Lock() defer m.Unlock() for _, vq := range m.virtualQueues { - vq.RemoveScheduledTasksAfter(t) + vq.RemoveScheduledTasksAfter(key) } } + func (m *virtualQueueManagerImpl) appendOrMergeSlice(vq VirtualQueue, s VirtualSlice) { now := m.timeSource.Now() newVirtualSliceState := s.GetState() diff --git a/service/history/queuev2/virtual_queue_manager_mock.go b/service/history/queuev2/virtual_queue_manager_mock.go index 21a595afa87..85d41a430cd 100644 --- a/service/history/queuev2/virtual_queue_manager_mock.go +++ b/service/history/queuev2/virtual_queue_manager_mock.go @@ -11,10 +11,10 @@ package queuev2 import ( reflect "reflect" - time "time" gomock "go.uber.org/mock/gomock" + persistence "github.com/uber/cadence/common/persistence" task "github.com/uber/cadence/service/history/task" ) @@ -83,7 +83,7 @@ func (mr *MockVirtualQueueManagerMockRecorder) InsertSingleTaskToRootQueue(arg0 } // RemoveScheduledTasksAfter mocks base method. 
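The reset key built earlier as persistence.NewHistoryTaskKey(nextReadTime, 0) leans on the composite ordering of scheduled-queue keys: with taskID 0, the key is assumed to sort at or below every task stamped with that time, so a single >= comparison catches all of them. A toy version of that ordering:

```go
package main

import (
	"fmt"
	"time"
)

// taskKey orders by scheduled time first, then by task ID as a tie-breaker.
type taskKey struct {
	scheduled time.Time
	taskID    int64
}

func (k taskKey) Compare(o taskKey) int {
	switch {
	case k.scheduled.Before(o.scheduled):
		return -1
	case k.scheduled.After(o.scheduled):
		return 1
	case k.taskID < o.taskID:
		return -1
	case k.taskID > o.taskID:
		return 1
	default:
		return 0
	}
}

func main() {
	t := time.Now()
	reset := taskKey{scheduled: t, taskID: 0}
	sameInstant := taskKey{scheduled: t, taskID: 42}
	fmt.Println(sameInstant.Compare(reset) >= 0) // true: the reset key catches every task at t
}
```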
-func (m *MockVirtualQueueManager) RemoveScheduledTasksAfter(arg0 time.Time) { +func (m *MockVirtualQueueManager) RemoveScheduledTasksAfter(arg0 persistence.HistoryTaskKey) { m.ctrl.T.Helper() m.ctrl.Call(m, "RemoveScheduledTasksAfter", arg0) } diff --git a/service/history/queuev2/virtual_queue_mock.go b/service/history/queuev2/virtual_queue_mock.go index 6873d2d6fb3..c96d5ff4f68 100644 --- a/service/history/queuev2/virtual_queue_mock.go +++ b/service/history/queuev2/virtual_queue_mock.go @@ -15,6 +15,7 @@ import ( gomock "go.uber.org/mock/gomock" + persistence "github.com/uber/cadence/common/persistence" task "github.com/uber/cadence/service/history/task" ) @@ -151,7 +152,7 @@ func (mr *MockVirtualQueueMockRecorder) Pause(arg0 any) *gomock.Call { } // RemoveScheduledTasksAfter mocks base method. -func (m *MockVirtualQueue) RemoveScheduledTasksAfter(arg0 time.Time) { +func (m *MockVirtualQueue) RemoveScheduledTasksAfter(arg0 persistence.HistoryTaskKey) { m.ctrl.T.Helper() m.ctrl.Call(m, "RemoveScheduledTasksAfter", arg0) } diff --git a/service/history/queuev2/virtual_slice.go b/service/history/queuev2/virtual_slice.go index 9cac693e9dc..8291b5420ca 100644 --- a/service/history/queuev2/virtual_slice.go +++ b/service/history/queuev2/virtual_slice.go @@ -43,7 +43,7 @@ type ( GetPendingTaskCount() int Clear() PendingTaskStats() PendingTaskStats - CancelTasks(predicate func(task.Task) bool) + CancelTasksAfter(key persistence.HistoryTaskKey) TrySplitByTaskKey(persistence.HistoryTaskKey) (VirtualSlice, VirtualSlice, bool) TrySplitByPredicate(Predicate) (VirtualSlice, VirtualSlice, bool) @@ -197,13 +197,40 @@ func (s *virtualSliceImpl) UpdateAndGetState() VirtualSliceState { return s.state } -func (s *virtualSliceImpl) CancelTasks(predicate func(task.Task) bool) { +func (s *virtualSliceImpl) CancelTasksAfter(key persistence.HistoryTaskKey) { taskMap := s.pendingTaskTracker.GetTasks() for _, task := range taskMap { - if predicate(task) { + if task.GetTaskKey().Compare(key) >= 0 { task.Cancel() } } + + if len(s.progress) == 0 { + s.progress = []*GetTaskProgress{ + { + Range: Range{ + InclusiveMinTaskKey: key, + ExclusiveMaxTaskKey: s.state.Range.ExclusiveMaxTaskKey, + }, + NextPageToken: nil, + NextTaskKey: key, + }, + } + return + } + + for i, progress := range s.progress { + if progress.NextTaskKey.Compare(key) > 0 { + s.progress[i] = &GetTaskProgress{ + Range: Range{ + InclusiveMinTaskKey: key, + ExclusiveMaxTaskKey: progress.Range.ExclusiveMaxTaskKey, + }, + NextPageToken: nil, + NextTaskKey: key, + } + } + } } func (s *virtualSliceImpl) TrySplitByTaskKey(taskKey persistence.HistoryTaskKey) (VirtualSlice, VirtualSlice, bool) { diff --git a/service/history/queuev2/virtual_slice_mock.go b/service/history/queuev2/virtual_slice_mock.go index 26de2b7cc9f..2b5323b89ba 100644 --- a/service/history/queuev2/virtual_slice_mock.go +++ b/service/history/queuev2/virtual_slice_mock.go @@ -43,16 +43,16 @@ func (m *MockVirtualSlice) EXPECT() *MockVirtualSliceMockRecorder { return m.recorder } -// CancelTasks mocks base method. -func (m *MockVirtualSlice) CancelTasks(predicate func(task.Task) bool) { +// CancelTasksAfter mocks base method. +func (m *MockVirtualSlice) CancelTasksAfter(key persistence.HistoryTaskKey) { m.ctrl.T.Helper() - m.ctrl.Call(m, "CancelTasks", predicate) + m.ctrl.Call(m, "CancelTasksAfter", key) } -// CancelTasks indicates an expected call of CancelTasks. 
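The progress rewrite above is the subtle part of this patch: any in-flight read range that has advanced past the reset key is rewound to that key, and its page token is dropped because it points into pages that may no longer match the store. A simplified model with integer keys; readProgress is a stand-in for GetTaskProgress:

```go
package main

import "fmt"

// readProgress models a slice's read state: where the next page starts, plus
// an opaque datastore paging token.
type readProgress struct {
	nextKey   int
	pageToken []byte
}

// rewindTo resets every range that already read past key, forcing the next
// poll to re-read from key with a fresh (nil) page token.
func rewindTo(progress []readProgress, key int) {
	for i := range progress {
		if progress[i].nextKey > key {
			progress[i] = readProgress{nextKey: key, pageToken: nil}
		}
	}
}

func main() {
	progress := []readProgress{
		{nextKey: 50, pageToken: []byte("p1")},  // untouched: has not reached the key yet
		{nextKey: 300, pageToken: []byte("p2")}, // rewound: read past the key
	}
	rewindTo(progress, 100)
	fmt.Println(progress[0].nextKey, progress[1].nextKey, progress[1].pageToken == nil) // 50 100 true
}
```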
-func (mr *MockVirtualSliceMockRecorder) CancelTasks(predicate any) *gomock.Call { +func (mr *MockVirtualSliceMockRecorder) CancelTasksAfter(key any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CancelTasks", reflect.TypeOf((*MockVirtualSlice)(nil).CancelTasks), predicate) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CancelTasksAfter", reflect.TypeOf((*MockVirtualSlice)(nil).CancelTasksAfter), key) } // Clear mocks base method. From d26d21d2fc9f5070b73cca339dd6da542fc271e3 Mon Sep 17 00:00:00 2001 From: Ignat Tubylov Date: Tue, 28 Oct 2025 18:16:28 +0100 Subject: [PATCH 4/7] After review Signed-off-by: Ignat Tubylov --- config/development.yaml | 2 +- service/history/common/type.go | 4 +++- service/history/queuev2/queue_base.go | 4 ++-- service/history/queuev2/queue_scheduled.go | 2 +- service/history/queuev2/virtual_queue.go | 20 ++++++++-------- .../history/queuev2/virtual_queue_manager.go | 6 ++--- .../queuev2/virtual_queue_manager_mock.go | 12 +++++----- service/history/queuev2/virtual_queue_mock.go | 12 +++++----- service/history/queuev2/virtual_slice.go | 7 ++++-- service/history/queuev2/virtual_slice_mock.go | 24 +++++++++---------- 10 files changed, 49 insertions(+), 44 deletions(-) diff --git a/config/development.yaml b/config/development.yaml index 75b5ee889ce..cdb3886ce03 100644 --- a/config/development.yaml +++ b/config/development.yaml @@ -172,4 +172,4 @@ shardDistribution: prefix: "store" process: period: 1s - heartbeatTTL: 2s \ No newline at end of file + heartbeatTTL: 2s diff --git a/service/history/common/type.go b/service/history/common/type.go index 42b74d7acd1..59ce485e49b 100644 --- a/service/history/common/type.go +++ b/service/history/common/type.go @@ -38,7 +38,9 @@ type ( PersistenceError bool // if true, the task will be scheduled in memory for the current execution, otherwise - // it will only be scheduled after the next DB scan + // it will only be scheduled after the next DB scan. This notification is sometimes passed with a fake + timer with the sole purpose of resetting the next scheduled DB read; that is why we sometimes want to + avoid scheduling the task in memory.
ScheduleInMemory bool } ) diff --git a/service/history/queuev2/queue_base.go b/service/history/queuev2/queue_base.go index a573d5a58ad..e8cbb965bdf 100644 --- a/service/history/queuev2/queue_base.go +++ b/service/history/queuev2/queue_base.go @@ -278,8 +278,8 @@ func (q *queueBase) insertSingleTask(task task.Task) bool { return q.virtualQueueManager.InsertSingleTaskToRootQueue(task) } -func (q *queueBase) removeScheduledTasksAfter(key persistence.HistoryTaskKey) { - q.virtualQueueManager.RemoveScheduledTasksAfter(key) +func (q *queueBase) resetProgress(key persistence.HistoryTaskKey) { + q.virtualQueueManager.ResetProgress(key) } func (q *queueBase) updateQueueState(ctx context.Context) { diff --git a/service/history/queuev2/queue_scheduled.go b/service/history/queuev2/queue_scheduled.go index 561284a72cf..1b439c488cf 100644 --- a/service/history/queuev2/queue_scheduled.go +++ b/service/history/queuev2/queue_scheduled.go @@ -175,7 +175,7 @@ func (q *scheduledQueue) NotifyNewTask(clusterName string, info *hcommon.NotifyT } if !nextReadTime.IsZero() { - q.base.removeScheduledTasksAfter(persistence.NewHistoryTaskKey(nextReadTime, 0)) + q.base.resetProgress(persistence.NewHistoryTaskKey(nextReadTime, 0)) q.notify(nextReadTime) } diff --git a/service/history/queuev2/virtual_queue.go b/service/history/queuev2/virtual_queue.go index 84b0d062f01..3ffa506f098 100644 --- a/service/history/queuev2/virtual_queue.go +++ b/service/history/queuev2/virtual_queue.go @@ -69,8 +69,8 @@ type ( Pause(time.Duration) // InsertSingleTask inserts a single task to the virtual queue. Return false if the task's timestamp is out of range of the current queue slice.. InsertSingleTask(task task.Task) bool - // RemoveScheduledTasksAfter removes the scheduled tasks after the given time - RemoveScheduledTasksAfter(persistence.HistoryTaskKey) + // ResetProgress removes the scheduled tasks after the given time + ResetProgress(persistence.HistoryTaskKey) } VirtualQueueOptions struct { @@ -445,21 +445,24 @@ func (q *virtualQueueImpl) InsertSingleTask(task task.Task) bool { now := q.timeSource.Now() if err := q.submitTask(now, task); err != nil { - q.logger.Error("Virtual queue failed to submit task", tag.Error(err)) - return false + q.logger.Error("Error submitting task to virtual queue", tag.Error(err)) } return true } -func (q *virtualQueueImpl) RemoveScheduledTasksAfter(key persistence.HistoryTaskKey) { +func (q *virtualQueueImpl) ResetProgress(key persistence.HistoryTaskKey) { q.Lock() defer q.Unlock() for e := q.virtualSlices.Front(); e != nil; e = e.Next() { slice := e.Value.(VirtualSlice) - slice.CancelTasksAfter(key) + slice.ResetProgress(key) q.monitor.SetSlicePendingTaskCount(slice, slice.GetPendingTaskCount()) + + if e == q.sliceToRead { + break + } } } @@ -482,16 +485,13 @@ func (q *virtualQueueImpl) submitTask(now time.Time, task task.Task) error { q.metricsScope.RecordHistogramDuration(metrics.TaskEnqueueToFetchLatency, now.Sub(task.GetVisibilityTimestamp())) task.SetInitialSubmitTime(now) submitted, err := q.processor.TrySubmit(task) - if err != nil { - return err - } if !submitted { q.metricsScope.IncCounter(metrics.ProcessingQueueThrottledCounter) q.rescheduler.RescheduleTask(task, q.timeSource.Now().Add(taskSchedulerThrottleBackoffInterval)) } - return nil + return err } func (q *virtualQueueImpl) resetNextReadSliceLocked() { diff --git a/service/history/queuev2/virtual_queue_manager.go b/service/history/queuev2/virtual_queue_manager.go index c999090c207..6b1fec728a7 100644 --- 
a/service/history/queuev2/virtual_queue_manager.go +++ b/service/history/queuev2/virtual_queue_manager.go @@ -59,7 +59,7 @@ type ( AddNewVirtualSliceToRootQueue(VirtualSlice) // Insert a single task to the current slice. Return false if the task's timestamp is out of range of the current slice. InsertSingleTaskToRootQueue(task.Task) bool - RemoveScheduledTasksAfter(persistence.HistoryTaskKey) + ResetProgress(persistence.HistoryTaskKey) } virtualQueueManagerImpl struct { @@ -233,11 +233,11 @@ func (m *virtualQueueManagerImpl) InsertSingleTaskToRootQueue(t task.Task) bool return false } -func (m *virtualQueueManagerImpl) RemoveScheduledTasksAfter(key persistence.HistoryTaskKey) { +func (m *virtualQueueManagerImpl) ResetProgress(key persistence.HistoryTaskKey) { m.Lock() defer m.Unlock() for _, vq := range m.virtualQueues { - vq.RemoveScheduledTasksAfter(key) + vq.ResetProgress(key) } } diff --git a/service/history/queuev2/virtual_queue_manager_mock.go b/service/history/queuev2/virtual_queue_manager_mock.go index 85d41a430cd..ab5fafca605 100644 --- a/service/history/queuev2/virtual_queue_manager_mock.go +++ b/service/history/queuev2/virtual_queue_manager_mock.go @@ -82,16 +82,16 @@ func (mr *MockVirtualQueueManagerMockRecorder) InsertSingleTaskToRootQueue(arg0 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertSingleTaskToRootQueue", reflect.TypeOf((*MockVirtualQueueManager)(nil).InsertSingleTaskToRootQueue), arg0) } -// RemoveScheduledTasksAfter mocks base method. -func (m *MockVirtualQueueManager) RemoveScheduledTasksAfter(arg0 persistence.HistoryTaskKey) { +// ResetProgress mocks base method. +func (m *MockVirtualQueueManager) ResetProgress(arg0 persistence.HistoryTaskKey) { m.ctrl.T.Helper() - m.ctrl.Call(m, "RemoveScheduledTasksAfter", arg0) + m.ctrl.Call(m, "ResetProgress", arg0) } -// RemoveScheduledTasksAfter indicates an expected call of RemoveScheduledTasksAfter. -func (mr *MockVirtualQueueManagerMockRecorder) RemoveScheduledTasksAfter(arg0 any) *gomock.Call { +// ResetProgress indicates an expected call of ResetProgress. +func (mr *MockVirtualQueueManagerMockRecorder) ResetProgress(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveScheduledTasksAfter", reflect.TypeOf((*MockVirtualQueueManager)(nil).RemoveScheduledTasksAfter), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetProgress", reflect.TypeOf((*MockVirtualQueueManager)(nil).ResetProgress), arg0) } // Start mocks base method. diff --git a/service/history/queuev2/virtual_queue_mock.go b/service/history/queuev2/virtual_queue_mock.go index c96d5ff4f68..70b45c83429 100644 --- a/service/history/queuev2/virtual_queue_mock.go +++ b/service/history/queuev2/virtual_queue_mock.go @@ -151,16 +151,16 @@ func (mr *MockVirtualQueueMockRecorder) Pause(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Pause", reflect.TypeOf((*MockVirtualQueue)(nil).Pause), arg0) } -// RemoveScheduledTasksAfter mocks base method. -func (m *MockVirtualQueue) RemoveScheduledTasksAfter(arg0 persistence.HistoryTaskKey) { +// ResetProgress mocks base method. +func (m *MockVirtualQueue) ResetProgress(arg0 persistence.HistoryTaskKey) { m.ctrl.T.Helper() - m.ctrl.Call(m, "RemoveScheduledTasksAfter", arg0) + m.ctrl.Call(m, "ResetProgress", arg0) } -// RemoveScheduledTasksAfter indicates an expected call of RemoveScheduledTasksAfter. 
-func (mr *MockVirtualQueueMockRecorder) RemoveScheduledTasksAfter(arg0 any) *gomock.Call { +func (mr *MockVirtualQueueMockRecorder) ResetProgress(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveScheduledTasksAfter", reflect.TypeOf((*MockVirtualQueue)(nil).RemoveScheduledTasksAfter), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetProgress", reflect.TypeOf((*MockVirtualQueue)(nil).ResetProgress), arg0) } // SplitSlices mocks base method. diff --git a/service/history/queuev2/virtual_slice.go b/service/history/queuev2/virtual_slice.go index 8291b5420ca..4f1b31b853e 100644 --- a/service/history/queuev2/virtual_slice.go +++ b/service/history/queuev2/virtual_slice.go @@ -43,7 +43,7 @@ type ( GetPendingTaskCount() int Clear() PendingTaskStats() PendingTaskStats - CancelTasksAfter(key persistence.HistoryTaskKey) + ResetProgress(key persistence.HistoryTaskKey) TrySplitByTaskKey(persistence.HistoryTaskKey) (VirtualSlice, VirtualSlice, bool) TrySplitByPredicate(Predicate) (VirtualSlice, VirtualSlice, bool) @@ -197,7 +197,10 @@ func (s *virtualSliceImpl) UpdateAndGetState() VirtualSliceState { return s.state } -func (s *virtualSliceImpl) CancelTasksAfter(key persistence.HistoryTaskKey) { +// ResetProgress is used when we are not sure that our in-memory state after the given key is correct: +// it cancels all the tasks at or after the given key and resets the read progress to that key, +// so that on the next poll the tasks are read from the DB starting from the given key. +func (s *virtualSliceImpl) ResetProgress(key persistence.HistoryTaskKey) { taskMap := s.pendingTaskTracker.GetTasks() for _, task := range taskMap { if task.GetTaskKey().Compare(key) >= 0 { diff --git a/service/history/queuev2/virtual_slice_mock.go b/service/history/queuev2/virtual_slice_mock.go index 2b5323b89ba..9d04262836d 100644 --- a/service/history/queuev2/virtual_slice_mock.go +++ b/service/history/queuev2/virtual_slice_mock.go @@ -43,18 +43,6 @@ func (m *MockVirtualSlice) EXPECT() *MockVirtualSliceMockRecorder { return m.recorder } -// CancelTasksAfter mocks base method. -func (m *MockVirtualSlice) CancelTasksAfter(key persistence.HistoryTaskKey) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "CancelTasksAfter", key) -} - -// CancelTasksAfter indicates an expected call of CancelTasksAfter. -func (mr *MockVirtualSliceMockRecorder) CancelTasksAfter(key any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CancelTasksAfter", reflect.TypeOf((*MockVirtualSlice)(nil).CancelTasksAfter), key) -} - // Clear mocks base method. func (m *MockVirtualSlice) Clear() { m.ctrl.T.Helper() @@ -164,6 +152,18 @@ func (mr *MockVirtualSliceMockRecorder) PendingTaskStats() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PendingTaskStats", reflect.TypeOf((*MockVirtualSlice)(nil).PendingTaskStats)) } +// ResetProgress mocks base method. +func (m *MockVirtualSlice) ResetProgress(key persistence.HistoryTaskKey) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ResetProgress", key) +} + +// ResetProgress indicates an expected call of ResetProgress. +func (mr *MockVirtualSliceMockRecorder) ResetProgress(key any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetProgress", reflect.TypeOf((*MockVirtualSlice)(nil).ResetProgress), key) +} + // TryMergeWithVirtualSlice mocks base method.
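The cancellation half of ResetProgress walks the pending-task map and cancels everything whose key compares >= the reset key; those tasks will be re-read from the DB once the progress has been rewound. A dependency-free sketch of that sweep:

```go
package main

import "fmt"

type pendingTask struct {
	key       int
	cancelled bool
}

func (t *pendingTask) Cancel() { t.cancelled = true }

// cancelAtOrAfter mirrors the pendingTaskTracker sweep: every task at or after
// the reset key is cancelled, since the rewound progress will re-read it.
func cancelAtOrAfter(pending map[int64]*pendingTask, key int) {
	for _, t := range pending {
		if t.key >= key {
			t.Cancel()
		}
	}
}

func main() {
	pending := map[int64]*pendingTask{1: {key: 90}, 2: {key: 100}, 3: {key: 150}}
	cancelAtOrAfter(pending, 100)
	fmt.Println(pending[1].cancelled, pending[2].cancelled, pending[3].cancelled) // false true true
}
```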
func (m *MockVirtualSlice) TryMergeWithVirtualSlice(arg0 VirtualSlice) ([]VirtualSlice, bool) { m.ctrl.T.Helper() From 5ea633f65dd6f5c3aafdc75c50ae5a40b7804f3d Mon Sep 17 00:00:00 2001 From: Ignat Tubylov Date: Thu, 30 Oct 2025 15:00:20 +0100 Subject: [PATCH 5/7] After review #2 Signed-off-by: Ignat Tubylov --- service/history/queuev2/queue_base.go | 2 +- service/history/queuev2/virtual_queue.go | 41 ++++++++----------- .../history/queuev2/virtual_queue_manager.go | 16 +++++--- .../queuev2/virtual_queue_manager_mock.go | 6 +-- service/history/queuev2/virtual_slice.go | 24 +++++++++-- service/history/queuev2/virtual_slice_mock.go | 6 ++- service/history/shard/context.go | 9 ++++ 7 files changed, 64 insertions(+), 40 deletions(-) diff --git a/service/history/queuev2/queue_base.go b/service/history/queuev2/queue_base.go index e8cbb965bdf..8db0ef8d1a5 100644 --- a/service/history/queuev2/queue_base.go +++ b/service/history/queuev2/queue_base.go @@ -275,7 +275,7 @@ func (q *queueBase) processNewTasks() bool { } func (q *queueBase) insertSingleTask(task task.Task) bool { - return q.virtualQueueManager.InsertSingleTaskToRootQueue(task) + return q.virtualQueueManager.InsertSingleTask(task) } func (q *queueBase) resetProgress(key persistence.HistoryTaskKey) { diff --git a/service/history/queuev2/virtual_queue.go b/service/history/queuev2/virtual_queue.go index 3ffa506f098..88d33a71d96 100644 --- a/service/history/queuev2/virtual_queue.go +++ b/service/history/queuev2/virtual_queue.go @@ -67,7 +67,7 @@ type ( SplitSlices(func(VirtualSlice) (remaining []VirtualSlice, split bool)) // Pause pauses the virtual queue for a while Pause(time.Duration) - // InsertSingleTask inserts a single task to the virtual queue. Return false if the task's timestamp is out of range of the current queue slice.. + // InsertSingleTask inserts a single task to the virtual queue. 
Return false if the task's timestamp is out of range of the current queue slice or the task does not satisfy the predicate.
 	InsertSingleTask(task task.Task) bool
 	// ResetProgress cancels the scheduled tasks at or after the given key and resets the read progress to that key
 	ResetProgress(persistence.HistoryTaskKey)
@@ -398,7 +398,7 @@ func (q *virtualQueueImpl) loadAndSubmitTasks() {
 
 	now := q.timeSource.Now()
 	for _, task := range tasks {
-		if err := q.submitTask(now, task); err != nil {
+		if err := q.submitOrRescheduleTask(now, task); err != nil {
 			select {
 			case <-q.ctx.Done():
 				return
@@ -423,32 +423,21 @@ func (q *virtualQueueImpl) InsertSingleTask(task task.Task) bool {
 	q.Lock()
 	defer q.Unlock()
 
-	taskKey := task.GetTaskKey()
-	var slice VirtualSlice
 	for e := q.virtualSlices.Front(); e != nil; e = e.Next() {
-		s := e.Value.(VirtualSlice)
-		r := s.GetState().Range
-		if taskKey.Compare(r.InclusiveMinTaskKey) >= 0 && taskKey.Compare(r.ExclusiveMaxTaskKey) < 0 {
-			slice = s
-			break
-		}
-	}
-
-	if slice == nil {
-		// the new task is outside of the current range, it will be read from the DB on the next poll
-		return false
-	}
-
-	slice.InsertTask(task)
-	q.monitor.SetSlicePendingTaskCount(slice, slice.GetPendingTaskCount())
-
-	now := q.timeSource.Now()
-	if err := q.submitTask(now, task); err != nil {
-		q.logger.Error("Error submitting task to virtual queue", tag.Error(err))
-	}
-
-	return true
+		slice := e.Value.(VirtualSlice)
+		inserted := slice.InsertTask(task)
+		if inserted {
+			q.monitor.SetSlicePendingTaskCount(slice, slice.GetPendingTaskCount())
+			now := q.timeSource.Now()
+			if err := q.submitOrRescheduleTask(now, task); err != nil {
+				q.logger.Error("Error submitting task to virtual queue", tag.Error(err))
+			}
+
+			return true
+		}
+	}
+
+	return false
 }
 
 func (q *virtualQueueImpl) ResetProgress(key persistence.HistoryTaskKey) {
@@ -464,9 +453,11 @@ func (q *virtualQueueImpl) ResetProgress(key persistence.HistoryTaskKey) {
 			break
 		}
 	}
+
+	q.resetNextReadSliceLocked()
 }
 
-func (q *virtualQueueImpl) submitTask(now time.Time, task task.Task) error {
+func (q *virtualQueueImpl) submitOrRescheduleTask(now time.Time, task task.Task) error {
 	if persistence.IsTaskCorrupted(task) {
 		q.logger.Error("Virtual queue encountered a corrupted task", tag.Dynamic("task", task))
 		q.metricsScope.IncCounter(metrics.CorruptedHistoryTaskCounter)
diff --git a/service/history/queuev2/virtual_queue_manager.go b/service/history/queuev2/virtual_queue_manager.go
index 6b1fec728a7..f7dcedf6893 100644
--- a/service/history/queuev2/virtual_queue_manager.go
+++ b/service/history/queuev2/virtual_queue_manager.go
@@ -58,7 +58,8 @@ type (
 	AddNewVirtualSliceToRootQueue(VirtualSlice)
 	// Insert a single task to the current slice. Return false if the task's timestamp is out of range of the current slice.
-	InsertSingleTaskToRootQueue(task.Task) bool
+	InsertSingleTask(task.Task) bool
+	// ResetProgress resets the progress of the virtual queue to the given key. Pending tasks after the given key are cancelled.
ResetProgress(persistence.HistoryTaskKey) } @@ -222,15 +223,18 @@ func (m *virtualQueueManagerImpl) AddNewVirtualSliceToRootQueue(s VirtualSlice) m.virtualQueues[rootQueueID].Start() } -func (m *virtualQueueManagerImpl) InsertSingleTaskToRootQueue(t task.Task) bool { +func (m *virtualQueueManagerImpl) InsertSingleTask(t task.Task) bool { m.Lock() defer m.Unlock() - if vq, ok := m.virtualQueues[rootQueueID]; ok { - return vq.InsertSingleTask(t) + + inserted := false + for _, vq := range m.virtualQueues { + if vq.InsertSingleTask(t) { + inserted = true + } } - // if a root queue is not created yet, no need to schedule an incoming task, it will be read from the slice - return false + return inserted } func (m *virtualQueueManagerImpl) ResetProgress(key persistence.HistoryTaskKey) { diff --git a/service/history/queuev2/virtual_queue_manager_mock.go b/service/history/queuev2/virtual_queue_manager_mock.go index ab5fafca605..4fdb0187d64 100644 --- a/service/history/queuev2/virtual_queue_manager_mock.go +++ b/service/history/queuev2/virtual_queue_manager_mock.go @@ -68,8 +68,8 @@ func (mr *MockVirtualQueueManagerMockRecorder) GetOrCreateVirtualQueue(arg0 any) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrCreateVirtualQueue", reflect.TypeOf((*MockVirtualQueueManager)(nil).GetOrCreateVirtualQueue), arg0) } -// InsertSingleTaskToRootQueue mocks base method. -func (m *MockVirtualQueueManager) InsertSingleTaskToRootQueue(arg0 task.Task) bool { +// InsertSingleTask mocks base method. +func (m *MockVirtualQueueManager) InsertSingleTask(arg0 task.Task) bool { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "InsertSingleTaskToRootQueue", arg0) ret0, _ := ret[0].(bool) @@ -79,7 +79,7 @@ func (m *MockVirtualQueueManager) InsertSingleTaskToRootQueue(arg0 task.Task) bo // InsertSingleTaskToRootQueue indicates an expected call of InsertSingleTaskToRootQueue. func (mr *MockVirtualQueueManagerMockRecorder) InsertSingleTaskToRootQueue(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertSingleTaskToRootQueue", reflect.TypeOf((*MockVirtualQueueManager)(nil).InsertSingleTaskToRootQueue), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertSingleTaskToRootQueue", reflect.TypeOf((*MockVirtualQueueManager)(nil).InsertSingleTask), arg0) } // ResetProgress mocks base method. 
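
Before the virtual_slice.go changes below, it is worth spelling out how these pieces compose. The manager now offers a freshly generated timer task to every virtual queue, and a queue accepts it only when one of its slices covers the task; the caller is expected to fall back to a DB re-read for any task that nothing accepts. The notification path itself lives in queue_scheduled.go, which this excerpt does not include, so the following is only a hedged sketch of that consumer side; notifyNewTasksSketch and toExecutableTask are illustrative names, not code from this series:

// Sketch: offer each new timer task to the in-memory queues; for every task
// that is rejected, remember the earliest rejected key and rewind read
// progress to it so the next DB poll picks the task up. This mirrors the
// behavior asserted by the tests added in patch 6 below.
func notifyNewTasksSketch(mgr VirtualQueueManager, info *hcommon.NotifyTaskInfo) {
	inMemoryAllowed := info.ScheduleInMemory && !info.PersistenceError
	var earliest persistence.HistoryTaskKey
	rejected := false
	for _, t := range info.Tasks {
		// toExecutableTask is a hypothetical adapter from persistence.Task to task.Task.
		if inMemoryAllowed && mgr.InsertSingleTask(toExecutableTask(t)) {
			continue // tracked and submitted in memory; nothing else to do
		}
		key := persistence.NewHistoryTaskKey(t.GetVisibilityTimestamp(), 0)
		if !rejected || key.Compare(earliest) < 0 {
			earliest = key
		}
		rejected = true
	}
	if rejected {
		// Cancel any in-memory state at or after the earliest rejected key
		// and re-read from the DB starting there.
		mgr.ResetProgress(earliest)
	}
}

The earliest-key fallback is what keeps correctness independent of the in-memory fast path: a task that cannot be placed in any slice is simply re-discovered by the regular DB scan.
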
diff --git a/service/history/queuev2/virtual_slice.go b/service/history/queuev2/virtual_slice.go index 4f1b31b853e..c01cdba999b 100644 --- a/service/history/queuev2/virtual_slice.go +++ b/service/history/queuev2/virtual_slice.go @@ -37,7 +37,7 @@ type ( GetState() VirtualSliceState IsEmpty() bool GetTasks(context.Context, int) ([]task.Task, error) - InsertTask(task.Task) + InsertTask(task.Task) bool HasMoreTasks() bool UpdateAndGetState() VirtualSliceState GetPendingTaskCount() int @@ -117,8 +117,20 @@ func (s *virtualSliceImpl) Clear() { } } -func (s *virtualSliceImpl) InsertTask(task task.Task) { +func (s *virtualSliceImpl) InsertTask(task task.Task) bool { + r := s.state.Range + key := task.GetTaskKey() + + if key.Compare(r.InclusiveMinTaskKey) < 0 || key.Compare(r.ExclusiveMaxTaskKey) >= 0 { + return false + } + + if !s.state.Predicate.Check(task) { + return false + } + s.pendingTaskTracker.AddTask(task) + return true } func (s *virtualSliceImpl) GetTasks(ctx context.Context, pageSize int) ([]task.Task, error) { @@ -223,15 +235,21 @@ func (s *virtualSliceImpl) ResetProgress(key persistence.HistoryTaskKey) { } for i, progress := range s.progress { + // progress contains sorted non-overlapping ranges. If we found a range that contains the given key, we can reset + // this range's progress to the given key and merge the remaining ranges into it. if progress.NextTaskKey.Compare(key) > 0 { + maxTaskKey := s.progress[len(s.progress)-1].Range.ExclusiveMaxTaskKey s.progress[i] = &GetTaskProgress{ Range: Range{ InclusiveMinTaskKey: key, - ExclusiveMaxTaskKey: progress.Range.ExclusiveMaxTaskKey, + ExclusiveMaxTaskKey: maxTaskKey, }, NextPageToken: nil, NextTaskKey: key, } + + s.progress = s.progress[:i+1] + break } } } diff --git a/service/history/queuev2/virtual_slice_mock.go b/service/history/queuev2/virtual_slice_mock.go index 9d04262836d..0fdd296f54a 100644 --- a/service/history/queuev2/virtual_slice_mock.go +++ b/service/history/queuev2/virtual_slice_mock.go @@ -113,9 +113,11 @@ func (mr *MockVirtualSliceMockRecorder) HasMoreTasks() *gomock.Call { } // InsertTask mocks base method. -func (m *MockVirtualSlice) InsertTask(arg0 task.Task) { +func (m *MockVirtualSlice) InsertTask(arg0 task.Task) bool { m.ctrl.T.Helper() - m.ctrl.Call(m, "InsertTask", arg0) + ret := m.ctrl.Call(m, "InsertTask", arg0) + ret0, _ := ret[0].(bool) + return ret0 } // InsertTask indicates an expected call of InsertTask. 
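
Since the shard-level change below builds on the slice-level contract just introduced, a compact restatement may help: InsertTask now accepts a task only if its key lies in the slice's half-open range and the slice predicate matches. A minimal sketch of that containment rule, assuming the Range and Predicate types from this file; containsSketch is an illustrative name, and patch 6 below folds the same checks into a state.Contains helper:

// Sketch: a task belongs to a slice iff its key lies in
// [InclusiveMinTaskKey, ExclusiveMaxTaskKey) and the predicate accepts it.
func containsSketch(state VirtualSliceState, t task.Task) bool {
	key := t.GetTaskKey()
	if key.Compare(state.Range.InclusiveMinTaskKey) < 0 {
		return false // before the slice's range starts
	}
	if key.Compare(state.Range.ExclusiveMaxTaskKey) >= 0 {
		return false // at or beyond the exclusive upper bound
	}
	return state.Predicate.Check(t) // e.g. domain-based filtering
}
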
diff --git a/service/history/shard/context.go b/service/history/shard/context.go index a871c9c3917..3f20b6ecefb 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -1363,6 +1363,15 @@ func (s *contextImpl) allocateTimerIDsLocked( tag.ValueShardAllocateTimerBeforeRead) ts = now.Add(persistence.DBTimestampMinPrecision) } + + if ts.Before(s.shardInfo.TimerAckLevel) { + s.logger.Warn("New timer generated is less than ack level", + tag.WorkflowDomainID(domainEntry.GetInfo().ID), + tag.WorkflowID(workflowID), + tag.Timestamp(ts)) + ts = s.shardInfo.TimerAckLevel.Add(persistence.DBTimestampMinPrecision) + } + task.SetVisibilityTimestamp(ts) seqNum, err := s.generateTaskIDLocked() From c622eba7da2c3e5a94e7ce69532793edb94b5997 Mon Sep 17 00:00:00 2001 From: Ignat Tubylov Date: Wed, 12 Nov 2025 13:46:35 +0100 Subject: [PATCH 6/7] Code cleanup; tests Signed-off-by: Ignat Tubylov --- .../history/queuev2/queue_scheduled_test.go | 513 ++++++++++++++++++ .../history/queuev2/virtual_queue_manager.go | 5 +- service/history/queuev2/virtual_queue_test.go | 493 +++++++++++++++++ service/history/queuev2/virtual_slice.go | 28 +- service/history/queuev2/virtual_slice_test.go | 364 +++++++++++++ .../dynamicconfig/queuev2_in_memory.yaml | 39 ++ .../history_simulation_queuev2_in_memory.yaml | 17 + 7 files changed, 1442 insertions(+), 17 deletions(-) create mode 100644 simulation/history/dynamicconfig/queuev2_in_memory.yaml create mode 100644 simulation/history/testdata/history_simulation_queuev2_in_memory.yaml diff --git a/service/history/queuev2/queue_scheduled_test.go b/service/history/queuev2/queue_scheduled_test.go index ce3477d99a1..c3deddd7aae 100644 --- a/service/history/queuev2/queue_scheduled_test.go +++ b/service/history/queuev2/queue_scheduled_test.go @@ -16,13 +16,50 @@ import ( "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/mocks" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" hcommon "github.com/uber/cadence/service/history/common" + historyconfig "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/shard" "github.com/uber/cadence/service/history/task" ) +func setupBasicShardMocks(mockShard *shard.MockContext, mockTimeSource clock.TimeSource, mockExecutionManager *persistence.MockExecutionManager) { + mockShard.EXPECT().GetShardID().Return(1).AnyTimes() + mockShard.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata).AnyTimes() + mockShard.EXPECT().GetTimeSource().Return(mockTimeSource).AnyTimes() + mockShard.EXPECT().GetConfig().Return(&historyconfig.Config{ + TaskCriticalRetryCount: dynamicproperties.GetIntPropertyFn(3), + }).AnyTimes() + mockShard.EXPECT().GetQueueState(persistence.HistoryTaskCategoryTimer).Return(&types.QueueState{ + VirtualQueueStates: map[int64]*types.VirtualQueueState{ + 0: { + VirtualSliceStates: []*types.VirtualSliceState{ + { + TaskRange: &types.TaskRange{ + InclusiveMin: &types.TaskKey{ + ScheduledTimeNano: mockTimeSource.Now().Add(-1 * time.Hour).UnixNano(), + }, + ExclusiveMax: &types.TaskKey{ + ScheduledTimeNano: mockTimeSource.Now().UnixNano(), + }, + }, + }, + }, + }, + }, + ExclusiveMaxReadLevel: &types.TaskKey{ + ScheduledTimeNano: mockTimeSource.Now().UnixNano(), + }, + }, nil).AnyTimes() + mockShard.EXPECT().GetExecutionManager().Return(mockExecutionManager).AnyTimes() + 
mockExecutionManager.EXPECT().GetHistoryTasks(gomock.Any(), gomock.Any()).Return(&persistence.GetHistoryTasksResponse{}, nil).AnyTimes() + mockExecutionManager.EXPECT().RangeCompleteHistoryTask(gomock.Any(), gomock.Any()).Return(&persistence.RangeCompleteHistoryTaskResponse{}, nil).AnyTimes() + mockShard.EXPECT().UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryTimer, cluster.TestCurrentClusterName).Return(persistence.NewHistoryTaskKey(time.Now(), 10)).AnyTimes() + mockShard.EXPECT().UpdateQueueState(persistence.HistoryTaskCategoryTimer, gomock.Any()).Return(nil).AnyTimes() +} + func TestScheduledQueue_LifeCycle(t *testing.T) { defer goleak.VerifyNone(t) ctrl := gomock.NewController(t) @@ -121,6 +158,482 @@ func TestScheduledQueue_LifeCycle(t *testing.T) { assert.Equal(t, common.DaemonStatusStopped, atomic.LoadInt32(&queue.status)) } +func TestScheduledQueue_NotifyNewTask_EmptyTasks(t *testing.T) { + defer goleak.VerifyNone(t) + ctrl := gomock.NewController(t) + + mockShard := shard.NewMockContext(ctrl) + mockTaskProcessor := task.NewMockProcessor(ctrl) + mockTaskExecutor := task.NewMockExecutor(ctrl) + mockLogger := testlogger.New(t) + mockMetricsClient := metrics.NoopClient + mockMetricsScope := &mocks.Scope{} + mockTimeSource := clock.NewMockedTimeSource() + mockExecutionManager := persistence.NewMockExecutionManager(ctrl) + + setupBasicShardMocks(mockShard, mockTimeSource, mockExecutionManager) + + options := &Options{ + DeleteBatchSize: dynamicproperties.GetIntPropertyFn(100), + RedispatchInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + PageSize: dynamicproperties.GetIntPropertyFn(100), + PollBackoffInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + MaxPollInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + MaxPollIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1), + UpdateAckInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + UpdateAckIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1), + MaxPollRPS: dynamicproperties.GetIntPropertyFn(100), + MaxPendingTasksCount: dynamicproperties.GetIntPropertyFn(100), + PollBackoffIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.0), + VirtualSliceForceAppendInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + CriticalPendingTaskCount: dynamicproperties.GetIntPropertyFn(90), + EnablePendingTaskCountAlert: func() bool { return true }, + MaxVirtualQueueCount: dynamicproperties.GetIntPropertyFn(2), + } + + queue := NewScheduledQueue( + mockShard, + persistence.HistoryTaskCategoryTimer, + mockTaskProcessor, + mockTaskExecutor, + mockLogger, + mockMetricsClient, + mockMetricsScope, + options, + ).(*scheduledQueue) + + assert.NotPanics(t, func() { + queue.NotifyNewTask("test-cluster", &hcommon.NotifyTaskInfo{ + Tasks: []persistence.Task{}, + }) + }) +} + +func TestScheduledQueue_NotifyNewTask_FieldCombinations(t *testing.T) { + defer goleak.VerifyNone(t) + + tests := []struct { + name string + scheduleInMemory bool + persistenceError bool + expectInMemory bool + expectDBRead bool + }{ + { + name: "ScheduleInMemory=true, PersistenceError=false - should try in-memory", + scheduleInMemory: true, + persistenceError: false, + expectInMemory: true, + expectDBRead: false, + }, + { + name: "ScheduleInMemory=true, PersistenceError=true - should skip in-memory", + scheduleInMemory: true, + persistenceError: true, + expectInMemory: false, + expectDBRead: true, + }, + { + name: 
"ScheduleInMemory=false, PersistenceError=false - should skip in-memory", + scheduleInMemory: false, + persistenceError: false, + expectInMemory: false, + expectDBRead: true, + }, + { + name: "ScheduleInMemory=false, PersistenceError=true - should skip in-memory", + scheduleInMemory: false, + persistenceError: true, + expectInMemory: false, + expectDBRead: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + + mockShard := shard.NewMockContext(ctrl) + mockTaskProcessor := task.NewMockProcessor(ctrl) + mockTaskExecutor := task.NewMockExecutor(ctrl) + mockLogger := testlogger.New(t) + mockMetricsClient := metrics.NoopClient + mockMetricsScope := &mocks.Scope{} + mockTimeSource := clock.NewMockedTimeSource() + mockExecutionManager := persistence.NewMockExecutionManager(ctrl) + mockVirtualQueueManager := NewMockVirtualQueueManager(ctrl) + + setupBasicShardMocks(mockShard, mockTimeSource, mockExecutionManager) + + mockMetricsScope.On("AddCounter", metrics.NewHistoryTaskCounter, int64(1)) + + options := &Options{ + DeleteBatchSize: dynamicproperties.GetIntPropertyFn(100), + RedispatchInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + PageSize: dynamicproperties.GetIntPropertyFn(100), + PollBackoffInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + MaxPollInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + MaxPollIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1), + UpdateAckInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + UpdateAckIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1), + MaxPollRPS: dynamicproperties.GetIntPropertyFn(100), + MaxPendingTasksCount: dynamicproperties.GetIntPropertyFn(100), + PollBackoffIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.0), + VirtualSliceForceAppendInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + CriticalPendingTaskCount: dynamicproperties.GetIntPropertyFn(90), + EnablePendingTaskCountAlert: func() bool { return true }, + MaxVirtualQueueCount: dynamicproperties.GetIntPropertyFn(2), + } + + queue := NewScheduledQueue( + mockShard, + persistence.HistoryTaskCategoryTimer, + mockTaskProcessor, + mockTaskExecutor, + mockLogger, + mockMetricsClient, + mockMetricsScope, + options, + ).(*scheduledQueue) + + queue.base.virtualQueueManager = mockVirtualQueueManager + + testTask := &persistence.DecisionTimeoutTask{ + TaskData: persistence.TaskData{ + VisibilityTimestamp: mockTimeSource.Now().Add(time.Hour), + }, + } + + if tt.expectInMemory { + mockVirtualQueueManager.EXPECT().InsertSingleTaskToRootQueue(gomock.Any()).Return(true) + } + if tt.expectDBRead { + mockVirtualQueueManager.EXPECT().ResetProgress(gomock.Any()) + } + + queue.NotifyNewTask("test-cluster", &hcommon.NotifyTaskInfo{ + Tasks: []persistence.Task{testTask}, + ScheduleInMemory: tt.scheduleInMemory, + PersistenceError: tt.persistenceError, + }) + }) + } +} + +func TestScheduledQueue_NotifyNewTask_InsertionScenarios(t *testing.T) { + defer goleak.VerifyNone(t) + + tests := []struct { + name string + numTasks int + insertionResults []bool + expectedTasksToReadFromDB int + }{ + { + name: "All tasks inserted successfully", + numTasks: 3, + insertionResults: []bool{true, true, true}, + expectedTasksToReadFromDB: 0, + }, + { + name: "Some tasks fail insertion", + numTasks: 3, + insertionResults: []bool{true, false, true}, + expectedTasksToReadFromDB: 1, + }, + { + name: "All tasks fail 
insertion", + numTasks: 3, + insertionResults: []bool{false, false, false}, + expectedTasksToReadFromDB: 3, + }, + { + name: "Single task success", + numTasks: 1, + insertionResults: []bool{true}, + expectedTasksToReadFromDB: 0, + }, + { + name: "Single task failure", + numTasks: 1, + insertionResults: []bool{false}, + expectedTasksToReadFromDB: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + + mockShard := shard.NewMockContext(ctrl) + mockTaskProcessor := task.NewMockProcessor(ctrl) + mockTaskExecutor := task.NewMockExecutor(ctrl) + mockLogger := testlogger.New(t) + mockMetricsClient := metrics.NoopClient + mockMetricsScope := &mocks.Scope{} + mockTimeSource := clock.NewMockedTimeSource() + mockExecutionManager := persistence.NewMockExecutionManager(ctrl) + mockVirtualQueueManager := NewMockVirtualQueueManager(ctrl) + + setupBasicShardMocks(mockShard, mockTimeSource, mockExecutionManager) + + mockMetricsScope.On("AddCounter", metrics.NewHistoryTaskCounter, int64(tt.numTasks)) + + options := &Options{ + DeleteBatchSize: dynamicproperties.GetIntPropertyFn(100), + RedispatchInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + PageSize: dynamicproperties.GetIntPropertyFn(100), + PollBackoffInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + MaxPollInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + MaxPollIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1), + UpdateAckInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + UpdateAckIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1), + MaxPollRPS: dynamicproperties.GetIntPropertyFn(100), + MaxPendingTasksCount: dynamicproperties.GetIntPropertyFn(100), + PollBackoffIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.0), + VirtualSliceForceAppendInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + CriticalPendingTaskCount: dynamicproperties.GetIntPropertyFn(90), + EnablePendingTaskCountAlert: func() bool { return true }, + MaxVirtualQueueCount: dynamicproperties.GetIntPropertyFn(2), + } + + queue := NewScheduledQueue( + mockShard, + persistence.HistoryTaskCategoryTimer, + mockTaskProcessor, + mockTaskExecutor, + mockLogger, + mockMetricsClient, + mockMetricsScope, + options, + ).(*scheduledQueue) + + queue.base.virtualQueueManager = mockVirtualQueueManager + + tasks := make([]persistence.Task, tt.numTasks) + for i := 0; i < tt.numTasks; i++ { + tasks[i] = &persistence.DecisionTimeoutTask{ + TaskData: persistence.TaskData{ + VisibilityTimestamp: mockTimeSource.Now().Add(time.Duration(i+1) * time.Hour), + }, + } + } + + for _, result := range tt.insertionResults { + mockVirtualQueueManager.EXPECT().InsertSingleTaskToRootQueue(gomock.Any()).Return(result).Times(1) + } + + if tt.expectedTasksToReadFromDB > 0 { + mockVirtualQueueManager.EXPECT().ResetProgress(gomock.Any()).Times(1) + } + + queue.NotifyNewTask("test-cluster", &hcommon.NotifyTaskInfo{ + Tasks: tasks, + ScheduleInMemory: true, + PersistenceError: false, + }) + }) + } +} + +func TestScheduledQueue_NotifyNewTask_TimestampCalculation(t *testing.T) { + defer goleak.VerifyNone(t) + + tests := []struct { + name string + taskTimestamps []time.Time + expectedEarliest time.Time + allInsertionsFail bool + }{ + { + name: "Multiple tasks with different timestamps", + taskTimestamps: []time.Time{ + time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC), + time.Date(2023, 1, 1, 10, 0, 0, 0, time.UTC), + 
time.Date(2023, 1, 1, 14, 0, 0, 0, time.UTC), + }, + expectedEarliest: time.Date(2023, 1, 1, 10, 0, 0, 0, time.UTC), + allInsertionsFail: true, + }, + { + name: "Tasks with same timestamps", + taskTimestamps: []time.Time{ + time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC), + time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC), + time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC), + }, + expectedEarliest: time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC), + allInsertionsFail: true, + }, + { + name: "Single task", + taskTimestamps: []time.Time{ + time.Date(2023, 1, 1, 15, 30, 0, 0, time.UTC), + }, + expectedEarliest: time.Date(2023, 1, 1, 15, 30, 0, 0, time.UTC), + allInsertionsFail: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + + mockShard := shard.NewMockContext(ctrl) + mockTaskProcessor := task.NewMockProcessor(ctrl) + mockTaskExecutor := task.NewMockExecutor(ctrl) + mockLogger := testlogger.New(t) + mockMetricsClient := metrics.NoopClient + mockMetricsScope := &mocks.Scope{} + mockTimeSource := clock.NewMockedTimeSource() + mockExecutionManager := persistence.NewMockExecutionManager(ctrl) + mockVirtualQueueManager := NewMockVirtualQueueManager(ctrl) + + setupBasicShardMocks(mockShard, mockTimeSource, mockExecutionManager) + + mockMetricsScope.On("AddCounter", metrics.NewHistoryTaskCounter, int64(len(tt.taskTimestamps))) + + options := &Options{ + DeleteBatchSize: dynamicproperties.GetIntPropertyFn(100), + RedispatchInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + PageSize: dynamicproperties.GetIntPropertyFn(100), + PollBackoffInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + MaxPollInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + MaxPollIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1), + UpdateAckInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + UpdateAckIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1), + MaxPollRPS: dynamicproperties.GetIntPropertyFn(100), + MaxPendingTasksCount: dynamicproperties.GetIntPropertyFn(100), + PollBackoffIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.0), + VirtualSliceForceAppendInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + CriticalPendingTaskCount: dynamicproperties.GetIntPropertyFn(90), + EnablePendingTaskCountAlert: func() bool { return true }, + MaxVirtualQueueCount: dynamicproperties.GetIntPropertyFn(2), + } + + queue := NewScheduledQueue( + mockShard, + persistence.HistoryTaskCategoryTimer, + mockTaskProcessor, + mockTaskExecutor, + mockLogger, + mockMetricsClient, + mockMetricsScope, + options, + ).(*scheduledQueue) + + queue.base.virtualQueueManager = mockVirtualQueueManager + + tasks := make([]persistence.Task, len(tt.taskTimestamps)) + for i, ts := range tt.taskTimestamps { + tasks[i] = &persistence.DecisionTimeoutTask{ + TaskData: persistence.TaskData{ + VisibilityTimestamp: ts, + }, + } + } + + if tt.allInsertionsFail { + mockVirtualQueueManager.EXPECT().InsertSingleTaskToRootQueue(gomock.Any()).Return(false).Times(len(tasks)) + expectedKey := persistence.NewHistoryTaskKey(tt.expectedEarliest, 0) + mockVirtualQueueManager.EXPECT().ResetProgress(expectedKey).Times(1) + } + + queue.NotifyNewTask("test-cluster", &hcommon.NotifyTaskInfo{ + Tasks: tasks, + ScheduleInMemory: true, + PersistenceError: false, + }) + }) + } +} + +func TestScheduledQueue_NotifyNewTask_MultipleTaskTypes(t *testing.T) { + defer goleak.VerifyNone(t) + ctrl 
:= gomock.NewController(t) + + mockShard := shard.NewMockContext(ctrl) + mockTaskProcessor := task.NewMockProcessor(ctrl) + mockTaskExecutor := task.NewMockExecutor(ctrl) + mockLogger := testlogger.New(t) + mockMetricsClient := metrics.NoopClient + mockMetricsScope := &mocks.Scope{} + mockTimeSource := clock.NewMockedTimeSource() + mockExecutionManager := persistence.NewMockExecutionManager(ctrl) + mockVirtualQueueManager := NewMockVirtualQueueManager(ctrl) + + setupBasicShardMocks(mockShard, mockTimeSource, mockExecutionManager) + + mockMetricsScope.On("AddCounter", metrics.NewHistoryTaskCounter, int64(4)) + + options := &Options{ + DeleteBatchSize: dynamicproperties.GetIntPropertyFn(100), + RedispatchInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + PageSize: dynamicproperties.GetIntPropertyFn(100), + PollBackoffInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + MaxPollInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + MaxPollIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1), + UpdateAckInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + UpdateAckIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.1), + MaxPollRPS: dynamicproperties.GetIntPropertyFn(100), + MaxPendingTasksCount: dynamicproperties.GetIntPropertyFn(100), + PollBackoffIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.0), + VirtualSliceForceAppendInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + CriticalPendingTaskCount: dynamicproperties.GetIntPropertyFn(90), + EnablePendingTaskCountAlert: func() bool { return true }, + MaxVirtualQueueCount: dynamicproperties.GetIntPropertyFn(2), + } + + queue := NewScheduledQueue( + mockShard, + persistence.HistoryTaskCategoryTimer, + mockTaskProcessor, + mockTaskExecutor, + mockLogger, + mockMetricsClient, + mockMetricsScope, + options, + ).(*scheduledQueue) + + queue.base.virtualQueueManager = mockVirtualQueueManager + + baseTime := mockTimeSource.Now() + + tasks := []persistence.Task{ + &persistence.DecisionTimeoutTask{ + TaskData: persistence.TaskData{ + VisibilityTimestamp: baseTime.Add(1 * time.Hour), + }, + }, + &persistence.ActivityTimeoutTask{ + TaskData: persistence.TaskData{ + VisibilityTimestamp: baseTime.Add(30 * time.Minute), // Earliest + }, + }, + &persistence.WorkflowTimeoutTask{ + TaskData: persistence.TaskData{ + VisibilityTimestamp: baseTime.Add(2 * time.Hour), + }, + }, + &persistence.UserTimerTask{ + TaskData: persistence.TaskData{ + VisibilityTimestamp: baseTime.Add(45 * time.Minute), + }, + }, + } + + mockVirtualQueueManager.EXPECT().InsertSingleTaskToRootQueue(gomock.Any()).Return(false).Times(4) + + expectedKey := persistence.NewHistoryTaskKey(baseTime.Add(30*time.Minute), 0) + mockVirtualQueueManager.EXPECT().ResetProgress(expectedKey).Times(1) + + queue.NotifyNewTask("test-cluster", &hcommon.NotifyTaskInfo{ + Tasks: tasks, + ScheduleInMemory: true, + PersistenceError: false, + }) +} + func TestScheduledQueue_LookAheadTask(t *testing.T) { defer goleak.VerifyNone(t) diff --git a/service/history/queuev2/virtual_queue_manager.go b/service/history/queuev2/virtual_queue_manager.go index f7dcedf6893..68afba4867f 100644 --- a/service/history/queuev2/virtual_queue_manager.go +++ b/service/history/queuev2/virtual_queue_manager.go @@ -227,14 +227,13 @@ func (m *virtualQueueManagerImpl) InsertSingleTask(t task.Task) bool { m.Lock() defer m.Unlock() - inserted := false for _, vq := range m.virtualQueues { if 
vq.InsertSingleTask(t) { - inserted = true + return true } } - return inserted + return false } func (m *virtualQueueManagerImpl) ResetProgress(key persistence.HistoryTaskKey) { diff --git a/service/history/queuev2/virtual_queue_test.go b/service/history/queuev2/virtual_queue_test.go index ccb5ccb2e7e..5aa46a4a13e 100644 --- a/service/history/queuev2/virtual_queue_test.go +++ b/service/history/queuev2/virtual_queue_test.go @@ -1791,3 +1791,496 @@ func TestVirtualQueue_MergeWithLastSlice(t *testing.T) { }) } } + +func TestVirtualQueue_InsertSingleTask(t *testing.T) { + tests := []struct { + name string + setupMocks func(ctrl *gomock.Controller) ([]VirtualSlice, task.Task, task.Processor, task.Rescheduler, Monitor, clock.TimeSource) + expectedResult bool + }{ + { + name: "Successfully insert task into first slice", + setupMocks: func(ctrl *gomock.Controller) ([]VirtualSlice, task.Task, task.Processor, task.Rescheduler, Monitor, clock.TimeSource) { + mockSlice1 := NewMockVirtualSlice(ctrl) + mockSlice2 := NewMockVirtualSlice(ctrl) + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMonitor := NewMockMonitor(ctrl) + mockTimeSource := clock.NewMockedTimeSource() + + mockSlice1.EXPECT().InsertTask(mockTask).Return(true) + mockSlice1.EXPECT().GetPendingTaskCount().Return(5) + mockMonitor.EXPECT().SetSlicePendingTaskCount(mockSlice1, 5) + + mockTask.EXPECT().GetDomainID().Return("test-domain").AnyTimes() + mockTask.EXPECT().GetWorkflowID().Return("test-workflow").AnyTimes() + mockTask.EXPECT().GetRunID().Return("test-run").AnyTimes() + mockTask.EXPECT().GetTaskKey().Return(persistence.NewHistoryTaskKey(mockTimeSource.Now().Add(-time.Second), 1)) + mockTask.EXPECT().GetVisibilityTimestamp().Return(mockTimeSource.Now().Add(-time.Second)) + mockTask.EXPECT().SetInitialSubmitTime(gomock.Any()) + mockProcessor.EXPECT().TrySubmit(mockTask).Return(true, nil) + + return []VirtualSlice{mockSlice1, mockSlice2}, mockTask, mockProcessor, mockRescheduler, mockMonitor, mockTimeSource + }, + expectedResult: true, + }, + { + name: "Successfully insert task into second slice", + setupMocks: func(ctrl *gomock.Controller) ([]VirtualSlice, task.Task, task.Processor, task.Rescheduler, Monitor, clock.TimeSource) { + mockSlice1 := NewMockVirtualSlice(ctrl) + mockSlice2 := NewMockVirtualSlice(ctrl) + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMonitor := NewMockMonitor(ctrl) + mockTimeSource := clock.NewMockedTimeSource() + + mockSlice1.EXPECT().InsertTask(mockTask).Return(false) + mockSlice2.EXPECT().InsertTask(mockTask).Return(true) + mockSlice2.EXPECT().GetPendingTaskCount().Return(3) + mockMonitor.EXPECT().SetSlicePendingTaskCount(mockSlice2, 3) + + mockTask.EXPECT().GetDomainID().Return("test-domain").AnyTimes() + mockTask.EXPECT().GetWorkflowID().Return("test-workflow").AnyTimes() + mockTask.EXPECT().GetRunID().Return("test-run").AnyTimes() + mockTask.EXPECT().GetTaskKey().Return(persistence.NewHistoryTaskKey(mockTimeSource.Now().Add(-time.Second), 1)) + mockTask.EXPECT().GetVisibilityTimestamp().Return(mockTimeSource.Now().Add(-time.Second)) + mockTask.EXPECT().SetInitialSubmitTime(gomock.Any()) + mockProcessor.EXPECT().TrySubmit(mockTask).Return(true, nil) + + return []VirtualSlice{mockSlice1, mockSlice2}, mockTask, mockProcessor, mockRescheduler, mockMonitor, mockTimeSource + }, + expectedResult: true, + }, + { + name: "No slice 
accepts the task", + setupMocks: func(ctrl *gomock.Controller) ([]VirtualSlice, task.Task, task.Processor, task.Rescheduler, Monitor, clock.TimeSource) { + mockSlice1 := NewMockVirtualSlice(ctrl) + mockSlice2 := NewMockVirtualSlice(ctrl) + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMonitor := NewMockMonitor(ctrl) + mockTimeSource := clock.NewMockedTimeSource() + + mockSlice1.EXPECT().InsertTask(mockTask).Return(false) + mockSlice2.EXPECT().InsertTask(mockTask).Return(false) + + return []VirtualSlice{mockSlice1, mockSlice2}, mockTask, mockProcessor, mockRescheduler, mockMonitor, mockTimeSource + }, + expectedResult: false, + }, + { + name: "Insert task with submission error", + setupMocks: func(ctrl *gomock.Controller) ([]VirtualSlice, task.Task, task.Processor, task.Rescheduler, Monitor, clock.TimeSource) { + mockSlice1 := NewMockVirtualSlice(ctrl) + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMonitor := NewMockMonitor(ctrl) + mockTimeSource := clock.NewMockedTimeSource() + + mockSlice1.EXPECT().InsertTask(mockTask).Return(true) + mockSlice1.EXPECT().GetPendingTaskCount().Return(1) + mockMonitor.EXPECT().SetSlicePendingTaskCount(mockSlice1, 1) + + mockTask.EXPECT().GetDomainID().Return("test-domain").AnyTimes() + mockTask.EXPECT().GetWorkflowID().Return("test-workflow").AnyTimes() + mockTask.EXPECT().GetRunID().Return("test-run").AnyTimes() + mockTask.EXPECT().GetTaskKey().Return(persistence.NewHistoryTaskKey(mockTimeSource.Now().Add(-time.Second), 1)) + mockTask.EXPECT().GetVisibilityTimestamp().Return(mockTimeSource.Now().Add(-time.Second)) + mockTask.EXPECT().SetInitialSubmitTime(gomock.Any()) + mockProcessor.EXPECT().TrySubmit(mockTask).Return(true, assert.AnError) + + return []VirtualSlice{mockSlice1}, mockTask, mockProcessor, mockRescheduler, mockMonitor, mockTimeSource + }, + expectedResult: true, + }, + { + name: "Insert task with empty slice list", + setupMocks: func(ctrl *gomock.Controller) ([]VirtualSlice, task.Task, task.Processor, task.Rescheduler, Monitor, clock.TimeSource) { + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMonitor := NewMockMonitor(ctrl) + mockTimeSource := clock.NewMockedTimeSource() + + return []VirtualSlice{}, mockTask, mockProcessor, mockRescheduler, mockMonitor, mockTimeSource + }, + expectedResult: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + slices, mockTask, mockProcessor, mockRescheduler, mockMonitor, mockTimeSource := tt.setupMocks(ctrl) + mockLogger := testlogger.New(t) + mockMetricsScope := metrics.NoopScope + mockRateLimiter := quotas.NewMockLimiter(ctrl) + + queue := NewVirtualQueue( + mockProcessor, + mockRescheduler, + mockLogger, + mockMetricsScope, + mockTimeSource, + mockRateLimiter, + mockMonitor, + slices, + &VirtualQueueOptions{ + PageSize: dynamicproperties.GetIntPropertyFn(10), + MaxPendingTasksCount: dynamicproperties.GetIntPropertyFn(100), + PollBackoffInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + PollBackoffIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.0), + }, + ) + + result := queue.InsertSingleTask(mockTask) + assert.Equal(t, tt.expectedResult, result) + }) + } +} + +func TestVirtualQueue_ResetProgress(t 
*testing.T) { + tests := []struct { + name string + setupMocks func(ctrl *gomock.Controller) ([]VirtualSlice, Monitor, persistence.HistoryTaskKey) + expectedSliceToReadIdx *int + }{ + { + name: "Reset progress on all slices before sliceToRead", + setupMocks: func(ctrl *gomock.Controller) ([]VirtualSlice, Monitor, persistence.HistoryTaskKey) { + mockSlice1 := NewMockVirtualSlice(ctrl) + mockSlice2 := NewMockVirtualSlice(ctrl) + mockSlice3 := NewMockVirtualSlice(ctrl) + mockMonitor := NewMockMonitor(ctrl) + resetKey := persistence.NewHistoryTaskKey(time.Now(), 5) + + mockSlice1.EXPECT().ResetProgress(resetKey) + mockSlice1.EXPECT().GetPendingTaskCount().Return(2) + mockMonitor.EXPECT().SetSlicePendingTaskCount(mockSlice1, 2) + mockSlice1.EXPECT().HasMoreTasks().Return(false) + + mockSlice2.EXPECT().ResetProgress(resetKey) + mockSlice2.EXPECT().GetPendingTaskCount().Return(3) + mockMonitor.EXPECT().SetSlicePendingTaskCount(mockSlice2, 3) + mockSlice2.EXPECT().HasMoreTasks().Return(true) + + return []VirtualSlice{mockSlice1, mockSlice2, mockSlice3}, mockMonitor, resetKey + }, + expectedSliceToReadIdx: common.Ptr(1), + }, + { + name: "Reset progress with no sliceToRead set", + setupMocks: func(ctrl *gomock.Controller) ([]VirtualSlice, Monitor, persistence.HistoryTaskKey) { + mockSlice1 := NewMockVirtualSlice(ctrl) + mockSlice2 := NewMockVirtualSlice(ctrl) + mockMonitor := NewMockMonitor(ctrl) + resetKey := persistence.NewHistoryTaskKey(time.Now(), 10) + + mockSlice1.EXPECT().ResetProgress(resetKey) + mockSlice1.EXPECT().GetPendingTaskCount().Return(0) + mockMonitor.EXPECT().SetSlicePendingTaskCount(mockSlice1, 0) + mockSlice1.EXPECT().HasMoreTasks().Return(false) + + mockSlice2.EXPECT().ResetProgress(resetKey) + mockSlice2.EXPECT().GetPendingTaskCount().Return(1) + mockMonitor.EXPECT().SetSlicePendingTaskCount(mockSlice2, 1) + mockSlice2.EXPECT().HasMoreTasks().Return(true) + + return []VirtualSlice{mockSlice1, mockSlice2}, mockMonitor, resetKey + }, + expectedSliceToReadIdx: common.Ptr(1), + }, + { + name: "Reset progress with no slices having more tasks", + setupMocks: func(ctrl *gomock.Controller) ([]VirtualSlice, Monitor, persistence.HistoryTaskKey) { + mockSlice1 := NewMockVirtualSlice(ctrl) + mockSlice2 := NewMockVirtualSlice(ctrl) + mockMonitor := NewMockMonitor(ctrl) + resetKey := persistence.NewHistoryTaskKey(time.Now(), 15) + + mockSlice1.EXPECT().ResetProgress(resetKey) + mockSlice1.EXPECT().GetPendingTaskCount().Return(0) + mockMonitor.EXPECT().SetSlicePendingTaskCount(mockSlice1, 0) + mockSlice1.EXPECT().HasMoreTasks().Return(false) + + mockSlice2.EXPECT().ResetProgress(resetKey) + mockSlice2.EXPECT().GetPendingTaskCount().Return(0) + mockMonitor.EXPECT().SetSlicePendingTaskCount(mockSlice2, 0) + mockSlice2.EXPECT().HasMoreTasks().Return(false) + + return []VirtualSlice{mockSlice1, mockSlice2}, mockMonitor, resetKey + }, + expectedSliceToReadIdx: nil, + }, + { + name: "Reset progress with empty slice list", + setupMocks: func(ctrl *gomock.Controller) ([]VirtualSlice, Monitor, persistence.HistoryTaskKey) { + mockMonitor := NewMockMonitor(ctrl) + resetKey := persistence.NewHistoryTaskKey(time.Now(), 20) + + return []VirtualSlice{}, mockMonitor, resetKey + }, + expectedSliceToReadIdx: nil, + }, + { + name: "Reset progress stops at first slice when it's sliceToRead", + setupMocks: func(ctrl *gomock.Controller) ([]VirtualSlice, Monitor, persistence.HistoryTaskKey) { + mockSlice1 := NewMockVirtualSlice(ctrl) + mockSlice2 := NewMockVirtualSlice(ctrl) + mockMonitor := NewMockMonitor(ctrl) 
+ resetKey := persistence.NewHistoryTaskKey(time.Now(), 25) + + mockSlice1.EXPECT().ResetProgress(resetKey) + mockSlice1.EXPECT().GetPendingTaskCount().Return(4) + mockMonitor.EXPECT().SetSlicePendingTaskCount(mockSlice1, 4) + mockSlice1.EXPECT().HasMoreTasks().Return(true) + + return []VirtualSlice{mockSlice1, mockSlice2}, mockMonitor, resetKey + }, + expectedSliceToReadIdx: common.Ptr(0), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + slices, mockMonitor, resetKey := tt.setupMocks(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockLogger := testlogger.New(t) + mockMetricsScope := metrics.NoopScope + mockTimeSource := clock.NewMockedTimeSource() + mockRateLimiter := quotas.NewMockLimiter(ctrl) + + queue := NewVirtualQueue( + mockProcessor, + mockRescheduler, + mockLogger, + mockMetricsScope, + mockTimeSource, + mockRateLimiter, + mockMonitor, + slices, + &VirtualQueueOptions{ + PageSize: dynamicproperties.GetIntPropertyFn(10), + MaxPendingTasksCount: dynamicproperties.GetIntPropertyFn(100), + PollBackoffInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + PollBackoffIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.0), + }, + ) + + queueImpl := queue.(*virtualQueueImpl) + + if len(slices) > 0 && tt.name != "Reset progress with no sliceToRead set" && tt.name != "Reset progress with empty slice list" { + if tt.name == "Reset progress stops at first slice when it's sliceToRead" { + queueImpl.sliceToRead = queueImpl.virtualSlices.Front() + } else { + queueImpl.sliceToRead = queueImpl.virtualSlices.Front().Next() + } + } else if tt.name == "Reset progress with no sliceToRead set" { + queueImpl.sliceToRead = nil + } + + queue.ResetProgress(resetKey) + + if tt.expectedSliceToReadIdx == nil { + assert.Nil(t, queueImpl.sliceToRead, "sliceToRead should be nil") + } else { + assert.NotNil(t, queueImpl.sliceToRead, "sliceToRead should not be nil") + if queueImpl.sliceToRead != nil { + expectedSlice := slices[*tt.expectedSliceToReadIdx] + assert.Equal(t, expectedSlice, queueImpl.sliceToRead.Value.(VirtualSlice)) + } + } + }) + } +} + +func TestVirtualQueue_SubmitOrRescheduleTask(t *testing.T) { + tests := []struct { + name string + setupMocks func(ctrl *gomock.Controller) (task.Task, task.Processor, task.Rescheduler, metrics.Scope, clock.TimeSource, time.Time) + expectErr bool + }{ + { + name: "Submit corrupted task - should ack and return nil", + setupMocks: func(ctrl *gomock.Controller) (task.Task, task.Processor, task.Rescheduler, metrics.Scope, clock.TimeSource, time.Time) { + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMetricsScope := metrics.NoopScope + mockTimeSource := clock.NewMockedTimeSource() + now := mockTimeSource.Now() + + mockTask.EXPECT().GetDomainID().Return("").AnyTimes() + mockTask.EXPECT().GetWorkflowID().Return("").AnyTimes() + mockTask.EXPECT().GetRunID().Return("").AnyTimes() + mockTask.EXPECT().Ack() + + return mockTask, mockProcessor, mockRescheduler, mockMetricsScope, mockTimeSource, now + }, + expectErr: false, + }, + { + name: "Reschedule future task", + setupMocks: func(ctrl *gomock.Controller) (task.Task, task.Processor, task.Rescheduler, metrics.Scope, clock.TimeSource, time.Time) { + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := 
task.NewMockRescheduler(ctrl) + mockMetricsScope := metrics.NoopScope + mockTimeSource := clock.NewMockedTimeSource() + now := mockTimeSource.Now() + futureTime := now.Add(time.Hour) + + mockTask.EXPECT().GetDomainID().Return("test-domain").AnyTimes() + mockTask.EXPECT().GetWorkflowID().Return("test-workflow").AnyTimes() + mockTask.EXPECT().GetRunID().Return("test-run").AnyTimes() + mockTask.EXPECT().GetTaskKey().Return(persistence.NewHistoryTaskKey(futureTime, 1)) + mockRescheduler.EXPECT().RescheduleTask(mockTask, futureTime) + + return mockTask, mockProcessor, mockRescheduler, mockMetricsScope, mockTimeSource, now + }, + expectErr: false, + }, + { + name: "Successfully submit task", + setupMocks: func(ctrl *gomock.Controller) (task.Task, task.Processor, task.Rescheduler, metrics.Scope, clock.TimeSource, time.Time) { + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMetricsScope := metrics.NoopScope + mockTimeSource := clock.NewMockedTimeSource() + now := mockTimeSource.Now() + pastTime := now.Add(-time.Hour) + + mockTask.EXPECT().GetDomainID().Return("test-domain").AnyTimes() + mockTask.EXPECT().GetWorkflowID().Return("test-workflow").AnyTimes() + mockTask.EXPECT().GetRunID().Return("test-run").AnyTimes() + mockTask.EXPECT().GetTaskKey().Return(persistence.NewHistoryTaskKey(pastTime, 1)) + mockTask.EXPECT().GetVisibilityTimestamp().Return(pastTime) + mockTask.EXPECT().SetInitialSubmitTime(now) + mockProcessor.EXPECT().TrySubmit(mockTask).Return(true, nil) + + return mockTask, mockProcessor, mockRescheduler, mockMetricsScope, mockTimeSource, now + }, + expectErr: false, + }, + { + name: "Submit task with processor error", + setupMocks: func(ctrl *gomock.Controller) (task.Task, task.Processor, task.Rescheduler, metrics.Scope, clock.TimeSource, time.Time) { + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMetricsScope := metrics.NoopScope + mockTimeSource := clock.NewMockedTimeSource() + now := mockTimeSource.Now() + pastTime := now.Add(-time.Hour) + + mockTask.EXPECT().GetDomainID().Return("test-domain").AnyTimes() + mockTask.EXPECT().GetWorkflowID().Return("test-workflow").AnyTimes() + mockTask.EXPECT().GetRunID().Return("test-run").AnyTimes() + mockTask.EXPECT().GetTaskKey().Return(persistence.NewHistoryTaskKey(pastTime, 1)) + mockTask.EXPECT().GetVisibilityTimestamp().Return(pastTime) + mockTask.EXPECT().SetInitialSubmitTime(now) + mockProcessor.EXPECT().TrySubmit(mockTask).Return(true, assert.AnError) + + return mockTask, mockProcessor, mockRescheduler, mockMetricsScope, mockTimeSource, now + }, + expectErr: true, + }, + { + name: "Task submission throttled - should reschedule", + setupMocks: func(ctrl *gomock.Controller) (task.Task, task.Processor, task.Rescheduler, metrics.Scope, clock.TimeSource, time.Time) { + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMetricsScope := metrics.NoopScope + mockTimeSource := clock.NewMockedTimeSource() + now := mockTimeSource.Now() + pastTime := now.Add(-time.Hour) + + mockTask.EXPECT().GetDomainID().Return("test-domain").AnyTimes() + mockTask.EXPECT().GetWorkflowID().Return("test-workflow").AnyTimes() + mockTask.EXPECT().GetRunID().Return("test-run").AnyTimes() + mockTask.EXPECT().GetTaskKey().Return(persistence.NewHistoryTaskKey(pastTime, 1)) + 
mockTask.EXPECT().GetVisibilityTimestamp().Return(pastTime) + mockTask.EXPECT().SetInitialSubmitTime(now) + mockProcessor.EXPECT().TrySubmit(mockTask).Return(false, nil) + mockRescheduler.EXPECT().RescheduleTask(mockTask, now.Add(taskSchedulerThrottleBackoffInterval)) + + return mockTask, mockProcessor, mockRescheduler, mockMetricsScope, mockTimeSource, now + }, + expectErr: false, + }, + { + name: "Task submission throttled with error - should reschedule and return error", + setupMocks: func(ctrl *gomock.Controller) (task.Task, task.Processor, task.Rescheduler, metrics.Scope, clock.TimeSource, time.Time) { + mockTask := task.NewMockTask(ctrl) + mockProcessor := task.NewMockProcessor(ctrl) + mockRescheduler := task.NewMockRescheduler(ctrl) + mockMetricsScope := metrics.NoopScope + mockTimeSource := clock.NewMockedTimeSource() + now := mockTimeSource.Now() + pastTime := now.Add(-time.Hour) + + mockTask.EXPECT().GetDomainID().Return("test-domain").AnyTimes() + mockTask.EXPECT().GetWorkflowID().Return("test-workflow").AnyTimes() + mockTask.EXPECT().GetRunID().Return("test-run").AnyTimes() + mockTask.EXPECT().GetTaskKey().Return(persistence.NewHistoryTaskKey(pastTime, 1)) + mockTask.EXPECT().GetVisibilityTimestamp().Return(pastTime) + mockTask.EXPECT().SetInitialSubmitTime(now) + mockProcessor.EXPECT().TrySubmit(mockTask).Return(false, assert.AnError) + mockRescheduler.EXPECT().RescheduleTask(mockTask, now.Add(taskSchedulerThrottleBackoffInterval)) + + return mockTask, mockProcessor, mockRescheduler, mockMetricsScope, mockTimeSource, now + }, + expectErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockTask, mockProcessor, mockRescheduler, mockMetricsScope, mockTimeSource, now := tt.setupMocks(ctrl) + mockLogger := testlogger.New(t) + mockRateLimiter := quotas.NewMockLimiter(ctrl) + mockMonitor := NewMockMonitor(ctrl) + + queue := NewVirtualQueue( + mockProcessor, + mockRescheduler, + mockLogger, + mockMetricsScope, + mockTimeSource, + mockRateLimiter, + mockMonitor, + []VirtualSlice{}, + &VirtualQueueOptions{ + PageSize: dynamicproperties.GetIntPropertyFn(10), + MaxPendingTasksCount: dynamicproperties.GetIntPropertyFn(100), + PollBackoffInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10), + PollBackoffIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.0), + }, + ) + + queueImpl := queue.(*virtualQueueImpl) + err := queueImpl.submitOrRescheduleTask(now, mockTask) + + if tt.expectErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/service/history/queuev2/virtual_slice.go b/service/history/queuev2/virtual_slice.go index c01cdba999b..405c91c036e 100644 --- a/service/history/queuev2/virtual_slice.go +++ b/service/history/queuev2/virtual_slice.go @@ -118,19 +118,12 @@ func (s *virtualSliceImpl) Clear() { } func (s *virtualSliceImpl) InsertTask(task task.Task) bool { - r := s.state.Range - key := task.GetTaskKey() - - if key.Compare(r.InclusiveMinTaskKey) < 0 || key.Compare(r.ExclusiveMaxTaskKey) >= 0 { - return false - } - - if !s.state.Predicate.Check(task) { - return false + if s.state.Contains(task) { + s.pendingTaskTracker.AddTask(task) + return true } - s.pendingTaskTracker.AddTask(task) - return true + return false } func (s *virtualSliceImpl) GetTasks(ctx context.Context, pageSize int) ([]task.Task, error) { @@ -213,6 +206,12 @@ func (s *virtualSliceImpl) UpdateAndGetState() VirtualSliceState { // we want to cancel all 
the tasks after the given key and reset the progress to the given key,
 // so that in the next poll, we will read the tasks from the DB starting from the given key.
 func (s *virtualSliceImpl) ResetProgress(key persistence.HistoryTaskKey) {
+
+	// the given key is at or after the slice's exclusive max key, so there is nothing to reset
+	if key.Compare(s.state.Range.ExclusiveMaxTaskKey) >= 0 {
+		return
+	}
+
 	taskMap := s.pendingTaskTracker.GetTasks()
 	for _, task := range taskMap {
 		if task.GetTaskKey().Compare(key) >= 0 {
@@ -237,17 +236,18 @@ func (s *virtualSliceImpl) ResetProgress(key persistence.HistoryTaskKey) {
 	for i, progress := range s.progress {
 		// progress contains sorted non-overlapping ranges. If we found a range that contains the given key, we can reset
 		// this range's progress to the given key and merge the remaining ranges into it.
-		if progress.NextTaskKey.Compare(key) > 0 {
+		if progress.ExclusiveMaxTaskKey.Compare(key) > 0 {
 			maxTaskKey := s.progress[len(s.progress)-1].Range.ExclusiveMaxTaskKey
 			s.progress[i] = &GetTaskProgress{
 				Range: Range{
-					InclusiveMinTaskKey: key,
+					InclusiveMinTaskKey: persistence.MinHistoryTaskKey(key, progress.InclusiveMinTaskKey),
 					ExclusiveMaxTaskKey: maxTaskKey,
 				},
 				NextPageToken: nil,
-				NextTaskKey:   key,
+				NextTaskKey:   persistence.MinHistoryTaskKey(key, progress.NextTaskKey),
 			}
+			s.state.Range.InclusiveMinTaskKey = persistence.MinHistoryTaskKey(key, s.state.Range.InclusiveMinTaskKey)
 			s.progress = s.progress[:i+1]
 			break
 		}
diff --git a/service/history/queuev2/virtual_slice_test.go b/service/history/queuev2/virtual_slice_test.go
index 6f0b7953639..8b21242396b 100644
--- a/service/history/queuev2/virtual_slice_test.go
+++ b/service/history/queuev2/virtual_slice_test.go
@@ -3472,3 +3472,367 @@ func TestMergeVirtualSlicesWithDifferentPredicate(t *testing.T) {
 		})
 	}
 }
+
+func TestVirtualSliceImpl_InsertTask(t *testing.T) {
+	tests := []struct {
+		name           string
+		setupSlice     func(ctrl *gomock.Controller) *virtualSliceImpl
+		setupTask      func(ctrl *gomock.Controller) task.Task
+		expectedResult bool
+		expectAddTask  bool
+	}{
+		{
+			name: "Task within range and predicate matches - should insert successfully",
+			setupSlice: func(ctrl *gomock.Controller) *virtualSliceImpl {
+				mockPendingTaskTracker := NewMockPendingTaskTracker(ctrl)
+				mockPredicate := NewMockPredicate(ctrl)
+				mockPredicate.EXPECT().Check(gomock.Any()).Return(true)
+
+				return &virtualSliceImpl{
+					state: VirtualSliceState{
+						Range: Range{
+							InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1),
+							ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10),
+						},
+						Predicate: mockPredicate,
+					},
+					pendingTaskTracker: mockPendingTaskTracker,
+				}
+			},
+			setupTask: func(ctrl *gomock.Controller) task.Task {
+				mockTask := task.NewMockTask(ctrl)
+				mockTask.EXPECT().GetTaskKey().Return(persistence.NewImmediateTaskKey(5)).AnyTimes()
+				return mockTask
+			},
+			expectedResult: true,
+			expectAddTask:  true,
+		},
+		{
+			name: "Task outside range - should not insert",
+			setupSlice: func(ctrl *gomock.Controller) *virtualSliceImpl {
+				mockPendingTaskTracker := NewMockPendingTaskTracker(ctrl)
+				mockPredicate := NewMockPredicate(ctrl)
+
+				return &virtualSliceImpl{
+					state: VirtualSliceState{
+						Range: Range{
+							InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1),
+							ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10),
+						},
+						Predicate: mockPredicate,
+					},
+					pendingTaskTracker: mockPendingTaskTracker,
+				}
+			},
+			setupTask: func(ctrl *gomock.Controller) task.Task {
+				mockTask := task.NewMockTask(ctrl)
+				mockTask.EXPECT().GetTaskKey().Return(persistence.NewImmediateTaskKey(15)).AnyTimes()
+				return mockTask
+			},
+			expectedResult: false,
+			expectAddTask:  false,
+		},
+		{
+			name: "Task within range but predicate doesn't match - should not insert",
+			setupSlice: func(ctrl *gomock.Controller) *virtualSliceImpl {
+				mockPendingTaskTracker := NewMockPendingTaskTracker(ctrl)
+				mockPredicate := NewMockPredicate(ctrl)
+				mockPredicate.EXPECT().Check(gomock.Any()).Return(false)
+
+				return &virtualSliceImpl{
+					state: VirtualSliceState{
+						Range: Range{
+							InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1),
+							ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10),
+						},
+						Predicate: mockPredicate,
+					},
+					pendingTaskTracker: mockPendingTaskTracker,
+				}
+			},
+			setupTask: func(ctrl *gomock.Controller) task.Task {
+				mockTask := task.NewMockTask(ctrl)
+				mockTask.EXPECT().GetTaskKey().Return(persistence.NewImmediateTaskKey(5)).AnyTimes()
+				return mockTask
+			},
+			expectedResult: false,
+			expectAddTask:  false,
+		},
+		{
+			name: "Task at inclusive min boundary - should insert successfully",
+			setupSlice: func(ctrl *gomock.Controller) *virtualSliceImpl {
+				mockPendingTaskTracker := NewMockPendingTaskTracker(ctrl)
+				mockPredicate := NewMockPredicate(ctrl)
+				mockPredicate.EXPECT().Check(gomock.Any()).Return(true)
+
+				return &virtualSliceImpl{
+					state: VirtualSliceState{
+						Range: Range{
+							InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1),
+							ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10),
+						},
+						Predicate: mockPredicate,
+					},
+					pendingTaskTracker: mockPendingTaskTracker,
+				}
+			},
+			setupTask: func(ctrl *gomock.Controller) task.Task {
+				mockTask := task.NewMockTask(ctrl)
+				mockTask.EXPECT().GetTaskKey().Return(persistence.NewImmediateTaskKey(1)).AnyTimes()
+				return mockTask
+			},
+			expectedResult: true,
+			expectAddTask:  true,
+		},
+		{
+			name: "Task at exclusive max boundary - should not insert",
+			setupSlice: func(ctrl *gomock.Controller) *virtualSliceImpl {
+				mockPendingTaskTracker := NewMockPendingTaskTracker(ctrl)
+				mockPredicate := NewMockPredicate(ctrl)
+
+				return &virtualSliceImpl{
+					state: VirtualSliceState{
+						Range: Range{
+							InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1),
+							ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10),
+						},
+						Predicate: mockPredicate,
+					},
+					pendingTaskTracker: mockPendingTaskTracker,
+				}
+			},
+			setupTask: func(ctrl *gomock.Controller) task.Task {
+				mockTask := task.NewMockTask(ctrl)
+				mockTask.EXPECT().GetTaskKey().Return(persistence.NewImmediateTaskKey(10)).AnyTimes()
+				return mockTask
+			},
+			expectedResult: false,
+			expectAddTask:  false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ctrl := gomock.NewController(t)
+			slice := tt.setupSlice(ctrl)
+			task := tt.setupTask(ctrl)
+
+			if tt.expectAddTask {
+				slice.pendingTaskTracker.(*MockPendingTaskTracker).EXPECT().AddTask(task).Times(1)
+			}
+
+			result := slice.InsertTask(task)
+
+			assert.Equal(t, tt.expectedResult, result)
+		})
+	}
+}
+
+func TestVirtualSliceImpl_ResetProgress(t *testing.T) {
+	tests := []struct {
+		name                string
+		setupSlice          func(ctrl *gomock.Controller) *virtualSliceImpl
+		resetKey            persistence.HistoryTaskKey
+		expectedProgressLen int
+		expectedCancelCalls int
+		validateProgress    func(t *testing.T, progress []*GetTaskProgress)
+	}{
+		{
+			name: "Reset key after slice range - should not reset",
+			setupSlice: func(ctrl *gomock.Controller) *virtualSliceImpl {
+				mockPendingTaskTracker := NewMockPendingTaskTracker(ctrl)
+				mockPendingTaskTracker.EXPECT().GetTasks().Return(map[persistence.HistoryTaskKey]task.Task{}).AnyTimes()
+
+				return &virtualSliceImpl{
+					state: VirtualSliceState{
+						Range: Range{
+							InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1),
+							ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10),
+						},
+					},
+					pendingTaskTracker: mockPendingTaskTracker,
+					progress: []*GetTaskProgress{
+						{
+							Range: Range{
+								InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1),
+								ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10),
+							},
+							NextTaskKey: persistence.NewImmediateTaskKey(5),
+						},
+					},
+				}
+			},
+			resetKey:            persistence.NewImmediateTaskKey(15),
+			expectedProgressLen: 1,
+			expectedCancelCalls: 0,
+			validateProgress: func(t *testing.T, progress []*GetTaskProgress) {
+				assert.Equal(t, persistence.NewImmediateTaskKey(5), progress[0].NextTaskKey)
+			},
+		},
+		{
+			name: "Reset key within range with no progress - should create new progress",
+			setupSlice: func(ctrl *gomock.Controller) *virtualSliceImpl {
+				mockPendingTaskTracker := NewMockPendingTaskTracker(ctrl)
+				mockPendingTaskTracker.EXPECT().GetTasks().Return(map[persistence.HistoryTaskKey]task.Task{}).AnyTimes()
+
+				return &virtualSliceImpl{
+					state: VirtualSliceState{
+						Range: Range{
+							InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1),
+							ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10),
+						},
+					},
+					pendingTaskTracker: mockPendingTaskTracker,
+					progress:           []*GetTaskProgress{},
+				}
+			},
+			resetKey:            persistence.NewImmediateTaskKey(5),
+			expectedProgressLen: 1,
+			expectedCancelCalls: 0,
+			validateProgress: func(t *testing.T, progress []*GetTaskProgress) {
+				assert.Equal(t, persistence.NewImmediateTaskKey(5), progress[0].NextTaskKey)
+				assert.Equal(t, persistence.NewImmediateTaskKey(5), progress[0].InclusiveMinTaskKey)
+				assert.Equal(t, persistence.NewImmediateTaskKey(10), progress[0].ExclusiveMaxTaskKey)
+			},
+		},
+		{
+			name: "Reset key within range with existing progress - should reset and cancel tasks",
+			setupSlice: func(ctrl *gomock.Controller) *virtualSliceImpl {
+				mockPendingTaskTracker := NewMockPendingTaskTracker(ctrl)
+				mockTask1 := task.NewMockTask(ctrl)
+				mockTask2 := task.NewMockTask(ctrl)
+				mockTask3 := task.NewMockTask(ctrl)
+
+				mockTask1.EXPECT().GetTaskKey().Return(persistence.NewImmediateTaskKey(3)).AnyTimes()
+
+				mockTask2.EXPECT().GetTaskKey().Return(persistence.NewImmediateTaskKey(5)).AnyTimes()
+				mockTask2.EXPECT().Cancel().Times(1)
+
+				mockTask3.EXPECT().GetTaskKey().Return(persistence.NewImmediateTaskKey(7)).AnyTimes()
+				mockTask3.EXPECT().Cancel().Times(1)
+
+				mockPendingTaskTracker.EXPECT().GetTasks().Return(map[persistence.HistoryTaskKey]task.Task{
+					persistence.NewImmediateTaskKey(3): mockTask1,
+					persistence.NewImmediateTaskKey(5): mockTask2,
+					persistence.NewImmediateTaskKey(7): mockTask3,
+				})
+
+				return &virtualSliceImpl{
+					state: VirtualSliceState{
+						Range: Range{
+							InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1),
+							ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10),
+						},
+					},
+					pendingTaskTracker: mockPendingTaskTracker,
+					progress: []*GetTaskProgress{
+						{
+							Range: Range{
+								InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1),
+								ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10),
+							},
+							NextTaskKey: persistence.NewImmediateTaskKey(8),
+						},
+					},
+				}
+			},
+			resetKey:            persistence.NewImmediateTaskKey(5),
+			expectedProgressLen: 1,
+			expectedCancelCalls: 2,
+			validateProgress: func(t *testing.T, progress []*GetTaskProgress) {
+				assert.Equal(t, persistence.NewImmediateTaskKey(5), progress[0].NextTaskKey)
+				assert.Equal(t, persistence.NewImmediateTaskKey(1), progress[0].InclusiveMinTaskKey)
+				assert.Equal(t, persistence.NewImmediateTaskKey(10), progress[0].ExclusiveMaxTaskKey)
+			},
+		},
+		{
+			name: "Reset key with multiple progress ranges - should merge remaining ranges",
+			setupSlice: func(ctrl *gomock.Controller) *virtualSliceImpl {
+				mockPendingTaskTracker := NewMockPendingTaskTracker(ctrl)
+				mockPendingTaskTracker.EXPECT().GetTasks().Return(map[persistence.HistoryTaskKey]task.Task{}).AnyTimes()
+
+				return &virtualSliceImpl{
+					state: VirtualSliceState{
+						Range: Range{
+							InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1),
+							ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(20),
+						},
+					},
+					pendingTaskTracker: mockPendingTaskTracker,
+					progress: []*GetTaskProgress{
+						{
+							Range: Range{
+								InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1),
+								ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10),
+							},
+							NextTaskKey: persistence.NewImmediateTaskKey(5),
+						},
+						{
+							Range: Range{
+								InclusiveMinTaskKey: persistence.NewImmediateTaskKey(10),
+								ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(20),
+							},
+							NextTaskKey: persistence.NewImmediateTaskKey(15),
+						},
+					},
+				}
+			},
+			resetKey:            persistence.NewImmediateTaskKey(7),
+			expectedProgressLen: 1,
+			expectedCancelCalls: 0,
+			validateProgress: func(t *testing.T, progress []*GetTaskProgress) {
+				assert.Equal(t, persistence.NewImmediateTaskKey(5), progress[0].NextTaskKey)
+				assert.Equal(t, persistence.NewImmediateTaskKey(1), progress[0].InclusiveMinTaskKey)
+				assert.Equal(t, persistence.NewImmediateTaskKey(20), progress[0].ExclusiveMaxTaskKey)
+			},
+		},
+		{
+			name: "Reset key before current progress next task key - should use reset key as next task key",
+			setupSlice: func(ctrl *gomock.Controller) *virtualSliceImpl {
+				mockPendingTaskTracker := NewMockPendingTaskTracker(ctrl)
+				mockPendingTaskTracker.EXPECT().GetTasks().Return(map[persistence.HistoryTaskKey]task.Task{}).AnyTimes()
+
+				return &virtualSliceImpl{
+					state: VirtualSliceState{
+						Range: Range{
+							InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1),
+							ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10),
+						},
+					},
+					pendingTaskTracker: mockPendingTaskTracker,
+					progress: []*GetTaskProgress{
+						{
+							Range: Range{
+								InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1),
+								ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10),
+							},
+							NextTaskKey: persistence.NewImmediateTaskKey(8),
+						},
+					},
+				}
+			},
+			resetKey:            persistence.NewImmediateTaskKey(3),
+			expectedProgressLen: 1,
+			expectedCancelCalls: 0,
+			validateProgress: func(t *testing.T, progress []*GetTaskProgress) {
+				assert.Equal(t, persistence.NewImmediateTaskKey(3), progress[0].NextTaskKey)
+				assert.Equal(t, persistence.NewImmediateTaskKey(1), progress[0].InclusiveMinTaskKey)
+				assert.Equal(t, persistence.NewImmediateTaskKey(10), progress[0].ExclusiveMaxTaskKey)
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ctrl := gomock.NewController(t)
+			slice := tt.setupSlice(ctrl)
+
+			slice.ResetProgress(tt.resetKey)
+
+			assert.Equal(t, tt.expectedProgressLen, len(slice.progress))
+			if tt.validateProgress != nil {
+				tt.validateProgress(t, slice.progress)
+			}
+		})
+	}
+}
diff --git a/simulation/history/dynamicconfig/queuev2_in_memory.yaml b/simulation/history/dynamicconfig/queuev2_in_memory.yaml
new file mode 100644
index 00000000000..261b97a780c
--- /dev/null
+++ b/simulation/history/dynamicconfig/queuev2_in_memory.yaml
@@ -0,0 +1,39 @@
+system.workflowDeletionJitterRange:
+- value: 0
+  constraints: {}
+history.enableTimerQueueV2:
+- value: true
+  constraints: {}
+history.enableTransferQueueV2:
+- value: true
+  constraints: {}
+history.queueMaxPendingTaskCount:
+- value: 10000
+  constraints: {}
+history.timerTaskBatchSize:
+- value: 100
+  constraints: {}
+history.timerProcessorUpdateAckInterval:
+- value: 30s
+  constraints: {}
+history.timerProcessorUpdateAckIntervalJitterCoefficient:
+- value: 0
+  constraints: {}
+history.timerProcessorMaxPollRPS:
+- value: 20
+  constraints: {}
+history.transferTaskBatchSize:
+- value: 100
+  constraints: {}
+history.transferProcessorUpdateAckInterval:
+- value: 30s
+  constraints: {}
+history.transferProcessorUpdateAckIntervalJitterCoefficient:
+- value: 0
+  constraints: {}
+history.transferProcessorMaxPollRPS:
+- value: 20
+  constraints: {}
+history.timerProcessorInMemoryQueueMaxTimeShift:
+- value: 30s
+  constraints: {}
\ No newline at end of file
diff --git a/simulation/history/testdata/history_simulation_queuev2_in_memory.yaml b/simulation/history/testdata/history_simulation_queuev2_in_memory.yaml
new file mode 100644
index 00000000000..02060f60f55
--- /dev/null
+++ b/simulation/history/testdata/history_simulation_queuev2_in_memory.yaml
@@ -0,0 +1,17 @@
+enablearchival: false
+clusterno: 0
+messagingclientconfig:
+  usemock: true
+historyconfig:
+  numhistoryshards: 4
+  numhistoryhosts: 1
+matchingconfig:
+  nummatchinghosts: 1
+workerconfig:
+  enableasyncwfconsumer: false
+  enablearchiver: false
+  enablereplicator: false
+  enableindexer: false
+dynamicclientconfig:
+  filepath: "dynamicconfig/queuev2_in_memory.yaml"
+  pollInterval: "10s"

From 5c8e317d7476acf211baf53249338ffe4b3ff4c3 Mon Sep 17 00:00:00 2001
From: Ignat Tubylov
Date: Wed, 12 Nov 2025 14:22:57 +0100
Subject: [PATCH 7/7] Fix tests

Signed-off-by: Ignat Tubylov
---
 service/history/queuev2/queue_scheduled_test.go       | 8 ++++----
 service/history/queuev2/virtual_queue_manager_mock.go | 8 ++++----
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/service/history/queuev2/queue_scheduled_test.go b/service/history/queuev2/queue_scheduled_test.go
index c3deddd7aae..2cdb9b5c79f 100644
--- a/service/history/queuev2/queue_scheduled_test.go
+++ b/service/history/queuev2/queue_scheduled_test.go
@@ -305,7 +305,7 @@ func TestScheduledQueue_NotifyNewTask_FieldCombinations(t *testing.T) {
 			}
 
 			if tt.expectInMemory {
-				mockVirtualQueueManager.EXPECT().InsertSingleTaskToRootQueue(gomock.Any()).Return(true)
+				mockVirtualQueueManager.EXPECT().InsertSingleTask(gomock.Any()).Return(true)
 			}
 			if tt.expectDBRead {
 				mockVirtualQueueManager.EXPECT().ResetProgress(gomock.Any())
@@ -420,7 +420,7 @@ func TestScheduledQueue_NotifyNewTask_InsertionScenarios(t *testing.T) {
 			}
 
 			for _, result := range tt.insertionResults {
-				mockVirtualQueueManager.EXPECT().InsertSingleTaskToRootQueue(gomock.Any()).Return(result).Times(1)
+				mockVirtualQueueManager.EXPECT().InsertSingleTask(gomock.Any()).Return(result).Times(1)
 			}
 
 			if tt.expectedTasksToReadFromDB > 0 {
@@ -534,7 +534,7 @@ func TestScheduledQueue_NotifyNewTask_TimestampCalculation(t *testing.T) {
 			}
 
 			if tt.allInsertionsFail {
-				mockVirtualQueueManager.EXPECT().InsertSingleTaskToRootQueue(gomock.Any()).Return(false).Times(len(tasks))
+				mockVirtualQueueManager.EXPECT().InsertSingleTask(gomock.Any()).Return(false).Times(len(tasks))
 				expectedKey := persistence.NewHistoryTaskKey(tt.expectedEarliest, 0)
 				mockVirtualQueueManager.EXPECT().ResetProgress(expectedKey).Times(1)
 			}
@@ -622,7 +622,7 @@ func TestScheduledQueue_NotifyNewTask_MultipleTaskTypes(t *testing.T) {
 		},
 	}
 
-	mockVirtualQueueManager.EXPECT().InsertSingleTaskToRootQueue(gomock.Any()).Return(false).Times(4)
+	mockVirtualQueueManager.EXPECT().InsertSingleTask(gomock.Any()).Return(false).Times(4)
 	expectedKey := persistence.NewHistoryTaskKey(baseTime.Add(30*time.Minute), 0)
 	mockVirtualQueueManager.EXPECT().ResetProgress(expectedKey).Times(1)
 
diff --git a/service/history/queuev2/virtual_queue_manager_mock.go b/service/history/queuev2/virtual_queue_manager_mock.go
index 4fdb0187d64..29136036ccd 100644
--- a/service/history/queuev2/virtual_queue_manager_mock.go
+++ b/service/history/queuev2/virtual_queue_manager_mock.go
@@ -71,15 +71,15 @@ func (mr *MockVirtualQueueManagerMockRecorder) GetOrCreateVirtualQueue(arg0 any)
 // InsertSingleTask mocks base method.
 func (m *MockVirtualQueueManager) InsertSingleTask(arg0 task.Task) bool {
 	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "InsertSingleTaskToRootQueue", arg0)
+	ret := m.ctrl.Call(m, "InsertSingleTask", arg0)
 	ret0, _ := ret[0].(bool)
 	return ret0
 }
 
-// InsertSingleTaskToRootQueue indicates an expected call of InsertSingleTaskToRootQueue.
-func (mr *MockVirtualQueueManagerMockRecorder) InsertSingleTaskToRootQueue(arg0 any) *gomock.Call {
+// InsertSingleTask indicates an expected call of InsertSingleTask.
+func (mr *MockVirtualQueueManagerMockRecorder) InsertSingleTask(arg0 any) *gomock.Call {
 	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertSingleTaskToRootQueue", reflect.TypeOf((*MockVirtualQueueManager)(nil).InsertSingleTask), arg0)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertSingleTask", reflect.TypeOf((*MockVirtualQueueManager)(nil).InsertSingleTask), arg0)
 }
 
 // ResetProgress mocks base method.
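
The tests patched above pin down the notify path's contract: every freshly generated timer task is first offered to the virtual queue manager via InsertSingleTask, and only when inserts are rejected does the queue fall back to ResetProgress with the earliest rejected timestamp, so the next DB poll re-reads exactly the tasks that could not be scheduled in memory. The following is a minimal, self-contained Go sketch of that fallback logic; timerTask, virtualQueueManager, and notifyNewTasks are simplified stand-ins for illustration, not the real queuev2 types or signatures.

package main

import (
	"fmt"
	"time"
)

// timerTask is a stand-in for a history timer task; only the visibility
// timestamp matters to the fallback logic sketched here.
type timerTask struct {
	visibilityTime time.Time
}

// virtualQueueManager is a simplified stand-in for the queuev2 manager:
// InsertSingleTask reports whether an in-memory slice accepted the task,
// and ResetProgress rewinds read progress so the next DB poll re-reads
// everything at or after the given time.
type virtualQueueManager interface {
	InsertSingleTask(t timerTask) bool
	ResetProgress(earliest time.Time)
}

// notifyNewTasks offers every task to the in-memory queue first; if any
// insert is rejected, it resets progress to the earliest rejected
// visibility time so a DB scan will pick those tasks up.
func notifyNewTasks(m virtualQueueManager, tasks []timerTask) {
	var earliest time.Time
	rejected := false
	for _, t := range tasks {
		if m.InsertSingleTask(t) {
			continue
		}
		if !rejected || t.visibilityTime.Before(earliest) {
			earliest = t.visibilityTime
			rejected = true
		}
	}
	if rejected {
		m.ResetProgress(earliest)
	}
}

// rejectAll models a manager whose slices accept nothing: the
// allInsertionsFail scenario exercised by the tests.
type rejectAll struct{ resetTo time.Time }

func (r *rejectAll) InsertSingleTask(timerTask) bool  { return false }
func (r *rejectAll) ResetProgress(earliest time.Time) { r.resetTo = earliest }

func main() {
	base := time.Date(2025, 11, 12, 0, 0, 0, 0, time.UTC)
	m := &rejectAll{}
	notifyNewTasks(m, []timerTask{
		{visibilityTime: base.Add(45 * time.Minute)},
		{visibilityTime: base.Add(30 * time.Minute)},
		{visibilityTime: base.Add(60 * time.Minute)},
	})
	fmt.Println(m.resetTo) // prints base+30m, the earliest rejected task
}

Run against a manager that rejects every insert, the sketch resets progress to the 30-minute task, mirroring the expectedKey assertions in TestScheduledQueue_NotifyNewTask_TimestampCalculation and TestScheduledQueue_NotifyNewTask_MultipleTaskTypes.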