From 756862d1e4793334810f681257eb7593ebcc82eb Mon Sep 17 00:00:00 2001 From: j2rong4cn Date: Sat, 20 Jun 2026 08:43:38 +0800 Subject: [PATCH 1/2] refactor(generic_sync): simplify MapOf implementation using sync.Map --- pkg/generic_sync/map.go | 417 ++++------------------------------------ 1 file changed, 37 insertions(+), 380 deletions(-) diff --git a/pkg/generic_sync/map.go b/pkg/generic_sync/map.go index 96612f0cc..b3eaddf4a 100644 --- a/pkg/generic_sync/map.go +++ b/pkg/generic_sync/map.go @@ -1,412 +1,69 @@ -// Copyright 2016 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - package generic_sync import ( "sync" - "sync/atomic" - "unsafe" ) -// MapOf is like a Go map[interface{}]interface{} but is safe for concurrent use -// by multiple goroutines without additional locking or coordination. -// Loads, stores, and deletes run in amortized constant time. -// -// The MapOf type is specialized. Most code should use a plain Go map instead, -// with separate locking or coordination, for better type safety and to make it -// easier to maintain other invariants along with the map content. -// -// The MapOf type is optimized for two common use cases: (1) when the entry for a given -// key is only ever written once but read many times, as in caches that only grow, -// or (2) when multiple goroutines read, write, and overwrite entries for disjoint -// sets of keys. In these two cases, use of a MapOf may significantly reduce lock -// contention compared to a Go map paired with a separate Mutex or RWMutex. -// -// The zero MapOf is empty and ready for use. A MapOf must not be copied after first use. type MapOf[K comparable, V any] struct { - mu sync.Mutex - - // read contains the portion of the map's contents that are safe for - // concurrent access (with or without mu held). - // - // The read field itself is always safe to load, but must only be stored with - // mu held. - // - // Entries stored in read may be updated concurrently without mu, but updating - // a previously-expunged entry requires that the entry be copied to the dirty - // map and unexpunged with mu held. - read atomic.Value // readOnly - - // dirty contains the portion of the map's contents that require mu to be - // held. To ensure that the dirty map can be promoted to the read map quickly, - // it also includes all of the non-expunged entries in the read map. - // - // Expunged entries are not stored in the dirty map. An expunged entry in the - // clean map must be unexpunged and added to the dirty map before a new value - // can be stored to it. - // - // If the dirty map is nil, the next write to the map will initialize it by - // making a shallow copy of the clean map, omitting stale entries. - dirty map[K]*entry[V] - - // misses counts the number of loads since the read map was last updated that - // needed to lock mu to determine whether the key was present. - // - // Once enough misses have occurred to cover the cost of copying the dirty - // map, the dirty map will be promoted to the read map (in the unamended - // state) and the next store to the map will make a new dirty copy. - misses int + sync.Map } -// readOnly is an immutable struct stored atomically in the MapOf.read field. -type readOnly[K comparable, V any] struct { - m map[K]*entry[V] - amended bool // true if the dirty map contains some key not in m. +func (m *MapOf[K, V]) CompareAndDelete(key K, old V) (deleted bool) { + return m.Map.CompareAndDelete(key, old) } - -// expunged is an arbitrary pointer that marks entries which have been deleted -// from the dirty map. -var expunged = unsafe.Pointer(new(interface{})) - -// An entry is a slot in the map corresponding to a particular key. -type entry[V any] struct { - // p points to the interface{} value stored for the entry. - // - // If p == nil, the entry has been deleted and m.dirty == nil. - // - // If p == expunged, the entry has been deleted, m.dirty != nil, and the entry - // is missing from m.dirty. - // - // Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty - // != nil, in m.dirty[key]. - // - // An entry can be deleted by atomic replacement with nil: when m.dirty is - // next created, it will atomically replace nil with expunged and leave - // m.dirty[key] unset. - // - // An entry's associated value can be updated by atomic replacement, provided - // p != expunged. If p == expunged, an entry's associated value can be updated - // only after first setting m.dirty[key] = e so that lookups using the dirty - // map find the entry. - p unsafe.Pointer // *interface{} +func (m *MapOf[K, V]) CompareAndSwap(key K, old V, new V) (swapped bool) { + return m.Map.CompareAndSwap(key, old, new) } - -func newEntry[V any](i V) *entry[V] { - return &entry[V]{p: unsafe.Pointer(&i)} +func (m *MapOf[K, V]) Delete(key K) { + m.Map.Delete(key) } - -// Load returns the value stored in the map for a key, or nil if no -// value is present. -// The ok result indicates whether value was found in the map. func (m *MapOf[K, V]) Load(key K) (value V, ok bool) { - read, _ := m.read.Load().(readOnly[K, V]) - e, ok := read.m[key] - if !ok && read.amended { - m.mu.Lock() - // Avoid reporting a spurious miss if m.dirty got promoted while we were - // blocked on m.mu. (If further loads of the same key will not miss, it's - // not worth copying the dirty map for this key.) - read, _ = m.read.Load().(readOnly[K, V]) - e, ok = read.m[key] - if !ok && read.amended { - e, ok = m.dirty[key] - // Regardless of whether the entry was present, record a miss: this key - // will take the slow path until the dirty map is promoted to the read - // map. - m.missLocked() - } - m.mu.Unlock() - } - if !ok { - return value, false - } - return e.load() -} - -func (m *MapOf[K, V]) Has(key K) bool { - _, ok := m.Load(key) - return ok -} - -func (e *entry[V]) load() (value V, ok bool) { - p := atomic.LoadPointer(&e.p) - if p == nil || p == expunged { - return value, false - } - return *(*V)(p), true -} - -// Store sets the value for a key. -func (m *MapOf[K, V]) Store(key K, value V) { - read, _ := m.read.Load().(readOnly[K, V]) - if e, ok := read.m[key]; ok && e.tryStore(&value) { - return - } - - m.mu.Lock() - read, _ = m.read.Load().(readOnly[K, V]) - if e, ok := read.m[key]; ok { - if e.unexpungeLocked() { - // The entry was previously expunged, which implies that there is a - // non-nil dirty map and this entry is not in it. - m.dirty[key] = e - } - e.storeLocked(&value) - } else if e, ok := m.dirty[key]; ok { - e.storeLocked(&value) - } else { - if !read.amended { - // We're adding the first new key to the dirty map. - // Make sure it is allocated and mark the read-only map as incomplete. - m.dirtyLocked() - m.read.Store(readOnly[K, V]{m: read.m, amended: true}) - } - m.dirty[key] = newEntry(value) + v, ok := m.Map.Load(key) + if ok { + value = v.(V) } - m.mu.Unlock() + return value, ok } - -// tryStore stores a value if the entry has not been expunged. -// -// If the entry is expunged, tryStore returns false and leaves the entry -// unchanged. -func (e *entry[V]) tryStore(i *V) bool { - for { - p := atomic.LoadPointer(&e.p) - if p == expunged { - return false - } - if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { - return true - } +func (m *MapOf[K, V]) LoadAndDelete(key K) (value V, loaded bool) { + v, loaded := m.Map.LoadAndDelete(key) + if loaded { + value = v.(V) } + return value, loaded } - -// unexpungeLocked ensures that the entry is not marked as expunged. -// -// If the entry was previously expunged, it must be added to the dirty map -// before m.mu is unlocked. -func (e *entry[V]) unexpungeLocked() (wasExpunged bool) { - return atomic.CompareAndSwapPointer(&e.p, expunged, nil) -} - -// storeLocked unconditionally stores a value to the entry. -// -// The entry must be known not to be expunged. -func (e *entry[V]) storeLocked(i *V) { - atomic.StorePointer(&e.p, unsafe.Pointer(i)) -} - -// LoadOrStore returns the existing value for the key if present. -// Otherwise, it stores and returns the given value. -// The loaded result is true if the value was loaded, false if stored. func (m *MapOf[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) { - // Avoid locking if it's a clean hit. - read, _ := m.read.Load().(readOnly[K, V]) - if e, ok := read.m[key]; ok { - actual, loaded, ok := e.tryLoadOrStore(value) - if ok { - return actual, loaded - } - } - - m.mu.Lock() - read, _ = m.read.Load().(readOnly[K, V]) - if e, ok := read.m[key]; ok { - if e.unexpungeLocked() { - m.dirty[key] = e - } - actual, loaded, _ = e.tryLoadOrStore(value) - } else if e, ok := m.dirty[key]; ok { - actual, loaded, _ = e.tryLoadOrStore(value) - m.missLocked() - } else { - if !read.amended { - // We're adding the first new key to the dirty map. - // Make sure it is allocated and mark the read-only map as incomplete. - m.dirtyLocked() - m.read.Store(readOnly[K, V]{m: read.m, amended: true}) - } - m.dirty[key] = newEntry(value) - actual, loaded = value, false + a, loaded := m.Map.LoadOrStore(key, value) + if loaded { + actual = a.(V) } - m.mu.Unlock() - return actual, loaded } - -// tryLoadOrStore atomically loads or stores a value if the entry is not -// expunged. -// -// If the entry is expunged, tryLoadOrStore leaves the entry unchanged and -// returns with ok==false. -func (e *entry[V]) tryLoadOrStore(i V) (actual V, loaded, ok bool) { - p := atomic.LoadPointer(&e.p) - if p == expunged { - return actual, false, false - } - if p != nil { - return *(*V)(p), true, true - } - - // Copy the interface after the first load to make this method more amenable - // to escape analysis: if we hit the "load" path or the entry is expunged, we - // shouldn'V bother heap-allocating. - ic := i - for { - if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) { - return i, false, true - } - p = atomic.LoadPointer(&e.p) - if p == expunged { - return actual, false, false - } - if p != nil { - return *(*V)(p), true, true - } - } -} - -// Delete deletes the value for a key. -func (m *MapOf[K, V]) Delete(key K) { - read, _ := m.read.Load().(readOnly[K, V]) - e, ok := read.m[key] - if !ok && read.amended { - m.mu.Lock() - read, _ = m.read.Load().(readOnly[K, V]) - e, ok = read.m[key] - if !ok && read.amended { - delete(m.dirty, key) - } - m.mu.Unlock() - } - if ok { - e.delete() - } -} - -func (e *entry[V]) delete() (hadValue bool) { - for { - p := atomic.LoadPointer(&e.p) - if p == nil || p == expunged { - return false - } - if atomic.CompareAndSwapPointer(&e.p, p, nil) { - return true - } - } -} - -// Range calls f sequentially for each key and value present in the map. -// If f returns false, range stops the iteration. -// -// Range does not necessarily correspond to any consistent snapshot of the MapOf's -// contents: no key will be visited more than once, but if the value for any key -// is stored or deleted concurrently, Range may reflect any mapping for that key -// from any point during the Range call. -// -// Range may be O(N) with the number of elements in the map even if f returns -// false after a constant number of calls. func (m *MapOf[K, V]) Range(f func(key K, value V) bool) { - // We need to be able to iterate over all of the keys that were already - // present at the start of the call to Range. - // If read.amended is false, then read.m satisfies that property without - // requiring us to hold m.mu for a long time. - read, _ := m.read.Load().(readOnly[K, V]) - if read.amended { - // m.dirty contains keys not in read.m. Fortunately, Range is already O(N) - // (assuming the caller does not break out early), so a call to Range - // amortizes an entire copy of the map: we can promote the dirty copy - // immediately! - m.mu.Lock() - read, _ = m.read.Load().(readOnly[K, V]) - if read.amended { - read = readOnly[K, V]{m: m.dirty} - m.read.Store(read) - m.dirty = nil - m.misses = 0 - } - m.mu.Unlock() - } - - for k, e := range read.m { - v, ok := e.load() - if !ok { - continue - } - if !f(k, v) { - break - } - } -} - -// Values returns a slice of the values in the map. -func (m *MapOf[K, V]) Values() []V { - var values []V - m.Range(func(key K, value V) bool { - values = append(values, value) - return true + m.Map.Range(func(key, value any) bool { + return f(key.(K), value.(V)) }) - return values } - -func (m *MapOf[K, V]) Count() int { - return len(m.dirty) +func (m *MapOf[K, V]) Store(key K, value V) { + m.Map.Store(key, value) } - -func (m *MapOf[K, V]) Empty() bool { - return m.Count() == 0 +func (m *MapOf[K, V]) Swap(key K, value V) (previous V, loaded bool) { + p, loaded := m.Map.Swap(key, value) + if loaded { + previous = p.(V) + } + return previous, loaded } -func (m *MapOf[K, V]) ToMap() map[K]V { - ans := make(map[K]V) - m.Range(func(key K, value V) bool { - ans[key] = value - return true - }) - return ans +func (m *MapOf[K, V]) Has(key K) bool { + _, ok := m.Map.Load(key) + return ok } -func (m *MapOf[K, V]) Clear() { - m.Range(func(key K, value V) bool { - m.Delete(key) +func (m *MapOf[K, V]) Values() []V { + var res []V + m.Map.Range(func(_, value any) bool { + res = append(res, value.(V)) return true }) -} - -func (m *MapOf[K, V]) missLocked() { - m.misses++ - if m.misses < len(m.dirty) { - return - } - m.read.Store(readOnly[K, V]{m: m.dirty}) - m.dirty = nil - m.misses = 0 -} - -func (m *MapOf[K, V]) dirtyLocked() { - if m.dirty != nil { - return - } - - read, _ := m.read.Load().(readOnly[K, V]) - m.dirty = make(map[K]*entry[V], len(read.m)) - for k, e := range read.m { - if !e.tryExpungeLocked() { - m.dirty[k] = e - } - } -} - -func (e *entry[V]) tryExpungeLocked() (isExpunged bool) { - p := atomic.LoadPointer(&e.p) - for p == nil { - if atomic.CompareAndSwapPointer(&e.p, nil, expunged) { - return true - } - p = atomic.LoadPointer(&e.p) - } - return p == expunged + return res } From 9ac2f3b7f00713d8a181c3a769eef843368432fa Mon Sep 17 00:00:00 2001 From: j2rong4cn Date: Sat, 20 Jun 2026 08:45:40 +0800 Subject: [PATCH 2/2] refactor(task): remove task package and related files --- pkg/generic_sync/map_test.go | 74 ------------------ pkg/task/errors.go | 8 -- pkg/task/manager.go | 145 ----------------------------------- pkg/task/task.go | 124 ------------------------------ pkg/task/task_test.go | 96 ----------------------- 5 files changed, 447 deletions(-) delete mode 100644 pkg/generic_sync/map_test.go delete mode 100644 pkg/task/errors.go delete mode 100644 pkg/task/manager.go delete mode 100644 pkg/task/task.go delete mode 100644 pkg/task/task_test.go diff --git a/pkg/generic_sync/map_test.go b/pkg/generic_sync/map_test.go deleted file mode 100644 index 9a3a03ef6..000000000 --- a/pkg/generic_sync/map_test.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright 2016 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package generic_sync_test - -import ( - "math/rand" - "runtime" - "sync" - "testing" - - "github.com/OpenListTeam/OpenList/v4/pkg/generic_sync" -) - -func TestConcurrentRange(t *testing.T) { - const mapSize = 1 << 10 - - m := new(generic_sync.MapOf[int64, int64]) - for n := int64(1); n <= mapSize; n++ { - m.Store(n, int64(n)) - } - - done := make(chan struct{}) - var wg sync.WaitGroup - defer func() { - close(done) - wg.Wait() - }() - for g := int64(runtime.GOMAXPROCS(0)); g > 0; g-- { - r := rand.New(rand.NewSource(g)) - wg.Add(1) - go func(g int64) { - defer wg.Done() - for i := int64(0); ; i++ { - select { - case <-done: - return - default: - } - for n := int64(1); n < mapSize; n++ { - if r.Int63n(mapSize) == 0 { - m.Store(n, n*i*g) - } else { - m.Load(n) - } - } - } - }(g) - } - - iters := 1 << 10 - if testing.Short() { - iters = 16 - } - for n := iters; n > 0; n-- { - seen := make(map[int64]bool, mapSize) - - m.Range(func(k, v int64) bool { - if v%k != 0 { - t.Fatalf("while Storing multiples of %v, Range saw value %v", k, v) - } - if seen[k] { - t.Fatalf("Range visited key %v twice", k) - } - seen[k] = true - return true - }) - - if len(seen) != mapSize { - t.Fatalf("Range visited %v elements of %v-element MapOf", len(seen), mapSize) - } - } -} diff --git a/pkg/task/errors.go b/pkg/task/errors.go deleted file mode 100644 index 3f2c4302e..000000000 --- a/pkg/task/errors.go +++ /dev/null @@ -1,8 +0,0 @@ -package task - -import "errors" - -var ( - ErrTaskNotFound = errors.New("task not found") - ErrTaskRunning = errors.New("task is running") -) diff --git a/pkg/task/manager.go b/pkg/task/manager.go deleted file mode 100644 index 69a52c3cf..000000000 --- a/pkg/task/manager.go +++ /dev/null @@ -1,145 +0,0 @@ -package task - -import ( - "github.com/OpenListTeam/OpenList/v4/pkg/generic_sync" - "github.com/OpenListTeam/OpenList/v4/pkg/utils" - "github.com/pkg/errors" - log "github.com/sirupsen/logrus" -) - -type Manager[K comparable] struct { - curID K - workerC chan struct{} - updateID func(*K) - tasks generic_sync.MapOf[K, *Task[K]] -} - -func (tm *Manager[K]) Submit(task *Task[K]) K { - if tm.updateID != nil { - tm.updateID(&tm.curID) - task.ID = tm.curID - } - tm.tasks.Store(task.ID, task) - tm.do(task) - return task.ID -} - -func (tm *Manager[K]) do(task *Task[K]) { - go func() { - log.Debugf("task [%s] waiting for worker", task.Name) - select { - case <-tm.workerC: - log.Debugf("task [%s] starting", task.Name) - task.run() - log.Debugf("task [%s] ended", task.Name) - case <-task.Ctx.Done(): - log.Debugf("task [%s] canceled", task.Name) - return - } - // return worker - tm.workerC <- struct{}{} - }() -} - -func (tm *Manager[K]) GetAll() []*Task[K] { - return tm.tasks.Values() -} - -func (tm *Manager[K]) Get(tid K) (*Task[K], bool) { - return tm.tasks.Load(tid) -} - -func (tm *Manager[K]) MustGet(tid K) *Task[K] { - task, _ := tm.Get(tid) - return task -} - -func (tm *Manager[K]) Retry(tid K) error { - t, ok := tm.Get(tid) - if !ok { - return errors.WithStack(ErrTaskNotFound) - } - tm.do(t) - return nil -} - -func (tm *Manager[K]) Cancel(tid K) error { - t, ok := tm.Get(tid) - if !ok { - return errors.WithStack(ErrTaskNotFound) - } - t.Cancel() - return nil -} - -func (tm *Manager[K]) Remove(tid K) error { - t, ok := tm.Get(tid) - if !ok { - return errors.WithStack(ErrTaskNotFound) - } - if !t.Done() { - return errors.WithStack(ErrTaskRunning) - } - tm.tasks.Delete(tid) - return nil -} - -// RemoveAll removes all tasks from the manager, this maybe shouldn't be used -// because the task maybe still running. -func (tm *Manager[K]) RemoveAll() { - tm.tasks.Clear() -} - -func (tm *Manager[K]) RemoveByStates(states ...string) { - tasks := tm.GetAll() - for _, task := range tasks { - if utils.SliceContains(states, task.GetState()) { - _ = tm.Remove(task.ID) - } - } -} - -func (tm *Manager[K]) GetByStates(states ...string) []*Task[K] { - var tasks []*Task[K] - tm.tasks.Range(func(key K, value *Task[K]) bool { - if utils.SliceContains(states, value.GetState()) { - tasks = append(tasks, value) - } - return true - }) - return tasks -} - -func (tm *Manager[K]) ListUndone() []*Task[K] { - return tm.GetByStates(PENDING, RUNNING, CANCELING) -} - -func (tm *Manager[K]) ListDone() []*Task[K] { - return tm.GetByStates(SUCCEEDED, CANCELED, ERRORED) -} - -func (tm *Manager[K]) ClearDone() { - tm.RemoveByStates(SUCCEEDED, CANCELED, ERRORED) -} - -func (tm *Manager[K]) ClearSucceeded() { - tm.RemoveByStates(SUCCEEDED) -} - -func (tm *Manager[K]) RawTasks() *generic_sync.MapOf[K, *Task[K]] { - return &tm.tasks -} - -func NewTaskManager[K comparable](maxWorker int, updateID ...func(*K)) *Manager[K] { - tm := &Manager[K]{ - tasks: generic_sync.MapOf[K, *Task[K]]{}, - workerC: make(chan struct{}, maxWorker), - } - for i := 0; i < maxWorker; i++ { - tm.workerC <- struct{}{} - } - if len(updateID) > 0 { - tm.updateID = updateID[0] - } - return tm -} diff --git a/pkg/task/task.go b/pkg/task/task.go deleted file mode 100644 index 5b634f10c..000000000 --- a/pkg/task/task.go +++ /dev/null @@ -1,124 +0,0 @@ -// Package task manage task, such as file upload, file copy between storages, offline download, etc. -package task - -import ( - "context" - "runtime" - - "github.com/pkg/errors" - log "github.com/sirupsen/logrus" -) - -var ( - PENDING = "pending" - RUNNING = "running" - SUCCEEDED = "succeeded" - CANCELING = "canceling" - CANCELED = "canceled" - ERRORED = "errored" -) - -type Func[K comparable] func(task *Task[K]) error -type Callback[K comparable] func(task *Task[K]) - -type Task[K comparable] struct { - ID K - Name string - state string // pending, running, finished, canceling, canceled, errored - status string - progress float64 - - Error error - - Func Func[K] - callback Callback[K] - - Ctx context.Context - cancel context.CancelFunc -} - -func (t *Task[K]) SetStatus(status string) { - t.status = status -} - -func (t *Task[K]) SetProgress(percentage float64) { - t.progress = percentage -} - -func (t Task[K]) GetProgress() float64 { - return t.progress -} - -func (t Task[K]) GetState() string { - return t.state -} - -func (t Task[K]) GetStatus() string { - return t.status -} - -func (t Task[K]) GetErrMsg() string { - if t.Error == nil { - return "" - } - return t.Error.Error() -} - -func getCurrentGoroutineStack() string { - buf := make([]byte, 1<<16) - n := runtime.Stack(buf, false) - return string(buf[:n]) -} - -func (t *Task[K]) run() { - t.state = RUNNING - defer func() { - if err := recover(); err != nil { - log.Errorf("error [%s] while run task [%s],stack trace:\n%s", err, t.Name, getCurrentGoroutineStack()) - t.Error = errors.Errorf("panic: %+v", err) - t.state = ERRORED - } - }() - t.Error = t.Func(t) - if t.Error != nil { - log.Errorf("error [%+v] while run task [%s]", t.Error, t.Name) - } - if errors.Is(t.Ctx.Err(), context.Canceled) { - t.state = CANCELED - } else if t.Error != nil { - t.state = ERRORED - } else { - t.state = SUCCEEDED - t.SetProgress(100) - if t.callback != nil { - t.callback(t) - } - } -} - -func (t *Task[K]) retry() { - t.run() -} - -func (t *Task[K]) Done() bool { - return t.state == SUCCEEDED || t.state == CANCELED || t.state == ERRORED -} - -func (t *Task[K]) Cancel() { - if t.state == SUCCEEDED || t.state == CANCELED { - return - } - if t.cancel != nil { - t.cancel() - } - // maybe can't cancel - t.state = CANCELING -} - -func WithCancelCtx[K comparable](task *Task[K]) *Task[K] { - ctx, cancel := context.WithCancel(context.Background()) - task.Ctx = ctx - task.cancel = cancel - task.state = PENDING - return task -} diff --git a/pkg/task/task_test.go b/pkg/task/task_test.go deleted file mode 100644 index 94f283094..000000000 --- a/pkg/task/task_test.go +++ /dev/null @@ -1,96 +0,0 @@ -package task - -import ( - "sync/atomic" - "testing" - "time" - - "github.com/OpenListTeam/OpenList/v4/pkg/utils" - "github.com/pkg/errors" -) - -func TestTask_Manager(t *testing.T) { - tm := NewTaskManager(3, func(id *uint64) { - atomic.AddUint64(id, 1) - }) - id := tm.Submit(WithCancelCtx(&Task[uint64]{ - Name: "test", - Func: func(task *Task[uint64]) error { - time.Sleep(time.Millisecond * 500) - return nil - }, - })) - task, ok := tm.Get(id) - if !ok { - t.Fatal("task not found") - } - time.Sleep(time.Millisecond * 100) - if task.state != RUNNING { - t.Errorf("task status not running: %s", task.state) - } - time.Sleep(time.Second) - if task.state != SUCCEEDED { - t.Errorf("task status not finished: %s", task.state) - } -} - -func TestTask_Cancel(t *testing.T) { - tm := NewTaskManager(3, func(id *uint64) { - atomic.AddUint64(id, 1) - }) - id := tm.Submit(WithCancelCtx(&Task[uint64]{ - Name: "test", - Func: func(task *Task[uint64]) error { - for { - if utils.IsCanceled(task.Ctx) { - return nil - } else { - t.Logf("task is running") - } - } - }, - })) - task, ok := tm.Get(id) - if !ok { - t.Fatal("task not found") - } - time.Sleep(time.Microsecond * 50) - task.Cancel() - time.Sleep(time.Millisecond) - if task.state != CANCELED { - t.Errorf("task status not canceled: %s", task.state) - } -} - -func TestTask_Retry(t *testing.T) { - tm := NewTaskManager(3, func(id *uint64) { - atomic.AddUint64(id, 1) - }) - num := 0 - id := tm.Submit(WithCancelCtx(&Task[uint64]{ - Name: "test", - Func: func(task *Task[uint64]) error { - num++ - if num&1 == 1 { - return errors.New("test error") - } - return nil - }, - })) - task, ok := tm.Get(id) - if !ok { - t.Fatal("task not found") - } - time.Sleep(time.Millisecond) - if task.Error == nil { - t.Error(task.state) - t.Fatal("task error is nil, but expected error") - } else { - t.Logf("task error: %s", task.Error) - } - task.retry() - time.Sleep(time.Millisecond) - if task.Error != nil { - t.Errorf("task error: %+v, but expected nil", task.Error) - } -}