@@ -48,7 +48,7 @@ class LeapArray {
4848 const int32_t bucket_length_ms_; // time length of each bucket
4949 private:
5050 const std::unique_ptr<WindowWrapSharedPtr<T>[]> array_;
51- std::mutex mtx_ ;
51+ mutable std::mutex window_wrap_array_mtx_ ;
5252
5353 int32_t CalculateTimeIdx (/* @Valid*/ int64_t time_millis) const ;
5454 int64_t CalculateWindowStart (/* @Valid*/ int64_t time_millis) const ;
@@ -78,24 +78,21 @@ WindowWrapSharedPtr<T> LeapArray<T>::CurrentWindow(int64_t time_millis) {
7878 int64_t bucket_start = CalculateWindowStart (time_millis);
7979
8080 while (true ) {
81- WindowWrapSharedPtr<T> old = array_[idx];
82- if (old == nullptr ) {
83- std::unique_lock<std::mutex> lck (mtx_, std::defer_lock);
84- if (lck.try_lock () && array_[idx] == nullptr ) {
85- WindowWrapSharedPtr<T> bucket = std::make_shared<WindowWrap<T>>(
86- bucket_length_ms_, bucket_start, NewEmptyBucket (time_millis));
87- array_[idx] = bucket;
88- return bucket;
89- }
90- } else if (bucket_start == old->BucketStart ()) {
91- return old;
92- } else if (bucket_start > old->BucketStart ()) {
93- std::unique_lock<std::mutex> lck (mtx_, std::defer_lock);
81+ std::unique_lock<std::mutex> lck (window_wrap_array_mtx_, std::defer_lock);
82+ if (lck.try_lock () && array_[idx] == nullptr ) {
83+ WindowWrapSharedPtr<T> bucket = std::make_shared<WindowWrap<T>>(
84+ bucket_length_ms_, bucket_start, NewEmptyBucket (time_millis));
85+ array_[idx] = bucket;
86+ return bucket;
87+ } else if (bucket_start == array_[idx]->BucketStart ()) {
88+ return array_[idx];
89+ } else if (bucket_start > array_[idx]->BucketStart ()) {
90+ std::unique_lock<std::mutex> lck (window_wrap_array_mtx_, std::defer_lock);
9491 if (lck.try_lock ()) {
95- ResetWindowTo (old , bucket_start);
96- return old ;
92+ ResetWindowTo (array_[idx] , bucket_start);
93+ return array_[idx] ;
9794 }
98- } else if (bucket_start < old ->BucketStart ()) {
95+ } else if (bucket_start < array_[idx] ->BucketStart ()) {
9996 // Should not go through here, as the provided time is already behind.
10097 return std::make_shared<WindowWrap<T>>(bucket_length_ms_, bucket_start,
10198 NewEmptyBucket (time_millis));
@@ -148,7 +145,9 @@ std::vector<WindowWrapSharedPtr<T>> LeapArray<T>::Buckets(
148145 }
149146 int size = sample_count_; // array_.size()
150147 for (int i = 0 ; i < size; i++) {
148+ window_wrap_array_mtx_.lock ();
151149 auto w = array_[i];
150+ window_wrap_array_mtx_.unlock ();
152151 if (w == nullptr || IsBucketDeprecated (time_millis, w)) {
153152 continue ;
154153 }
@@ -166,7 +165,9 @@ std::vector<std::shared_ptr<T>> LeapArray<T>::Values(
166165 }
167166 int size = sample_count_; // array_.size()
168167 for (int i = 0 ; i < size; i++) {
168+ window_wrap_array_mtx_.lock ();
169169 WindowWrapSharedPtr<T> w = array_[i];
170+ window_wrap_array_mtx_.unlock ();
170171 if (w == nullptr || IsBucketDeprecated (time_millis, w)) {
171172 continue ;
172173 }
0 commit comments