diff --git a/libs/server/Storage/SizeTracker/CacheSizeTracker.cs b/libs/server/Storage/SizeTracker/CacheSizeTracker.cs index 96c12069f6a..2efed592666 100644 --- a/libs/server/Storage/SizeTracker/CacheSizeTracker.cs +++ b/libs/server/Storage/SizeTracker/CacheSizeTracker.cs @@ -62,14 +62,14 @@ public CacheSizeTracker(TsavoriteKV store, long { mainLogTracker = new LogSizeTracker(store.Log, targetSize, targetSize / HighTargetSizeDeltaFraction, targetSize / LowTargetSizeDeltaFraction, loggerFactory?.CreateLogger("MainLogSizeTracker")); - store.Log.SetLogSizeTracker(mainLogTracker); + store.Log.SubscribeEvictions(mainLogTracker); } if (store.ReadCache != null && readCacheTargetSize > 0) { readCacheTracker = new LogSizeTracker(store.ReadCache, readCacheTargetSize, readCacheTargetSize / HighTargetSizeDeltaFraction, readCacheTargetSize / LowTargetSizeDeltaFraction, loggerFactory?.CreateLogger("ReadCacheSizeTracker")); - store.ReadCache.SetLogSizeTracker(readCacheTracker); + store.ReadCache.SubscribeEvictions(readCacheTracker); } } diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs index 2a47f547a68..337824cb652 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs @@ -990,7 +990,7 @@ internal void ShiftReadOnlyAddressWithWait(long newReadOnlyAddress, bool wait) /// /// New ReadOnlyAddress /// New HeadAddress - /// Wait for operation to complete (may involve page flushing and closing) + /// Wait for eviction to complete, i.e., until ClosedUntilAddress catches up (may involve page flushing, closing, and eviction callbacks) public void ShiftAddressesWithWait(long newReadOnlyAddress, long newHeadAddress, bool waitForEviction) { Debug.Assert(newHeadAddress <= newReadOnlyAddress, $"new HeadAddress {newHeadAddress} must not be ahead of newReadOnlyAddress {newReadOnlyAddress}"); @@ -1010,14 +1010,14 @@ public void ShiftAddressesWithWait(long newReadOnlyAddress, long newHeadAddress, epoch.Suspend(); } - while (waitForEviction && SafeHeadAddress < newHeadAddress) + while (waitForEviction && ClosedUntilAddress < newHeadAddress) _ = Thread.Yield(); return; } // Epoch already protected, so launch the shift and wait for eviction to complete _ = ShiftHeadAddress(newHeadAddress); - while (waitForEviction && SafeHeadAddress < newHeadAddress) + while (waitForEviction && ClosedUntilAddress < newHeadAddress) epoch.ProtectAndDrain(); } diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Common/LogSizeTracker.cs b/libs/storage/Tsavorite/cs/src/core/Index/Common/LogSizeTracker.cs index a56c86183f5..d1fd55c8c44 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Common/LogSizeTracker.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Common/LogSizeTracker.cs @@ -394,8 +394,8 @@ private void ResizeIfNeeded(CancellationToken cancellationToken) // ShiftHeadAddress caps the new HeadAddress at FlushedUntilAddress. Wait until the SHA eviction is complete to avoid going further over budget. logAccessor.ShiftAddresses(readOnlyAddress, headAddress, waitForEviction: true); - // Now subtract what we were able to trim from heapSize. Inline page total size is tracked separately in logAccessor.MemorySizeBytes. - heapSize.Increment(-heapTrimmedSize); + // Heap size subtraction is handled by the OnNext eviction callback (called during ShiftAddresses), + // which subtracts each record's CURRENT HeapMemorySize at eviction time. Debug.Assert(heapSize.Total >= 0, $"HeapSize.Total should be >= 0 but is {heapSize.Total} in Resize"); // Calculate the number of trimmed pages and report the new expected AllocatedPageCount here, since our last iteration (which may have been the only one) diff --git a/test/Garnet.test/RespListTests.cs b/test/Garnet.test/RespListTests.cs index 91eb4682645..7f1826aa629 100644 --- a/test/Garnet.test/RespListTests.cs +++ b/test/Garnet.test/RespListTests.cs @@ -1076,7 +1076,7 @@ public void CanHandleNoPrexistentKey() [Test] [Repeat(10)] - public void ListPushPopStressTest() + public async Task ListPushPopStressTest() { using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); var db = redis.GetDatabase(0); @@ -1102,22 +1102,22 @@ public void ListPushPopStressTest() await db.ListLeftPushAsync(key, j).ConfigureAwait(false); }); - tasks[i + 1] = Task.Run(() => + tasks[i + 1] = Task.Run(async () => { var key = keyArray[idx >> 1]; for (int j = 0; j < ppCount; j++) { - var value = db.ListRightPop(key); + var value = await db.ListRightPopAsync(key).ConfigureAwait(false); while (value.IsNull) { - Thread.Yield(); - value = db.ListRightPop(key); + await Task.Delay(1).ConfigureAwait(false); + value = await db.ListRightPopAsync(key).ConfigureAwait(false); } ClassicAssert.IsTrue((int)value >= 0 && (int)value < ppCount, "Pop value inconsistency"); } }); } - Task.WaitAll(tasks); + await Task.WhenAll(tasks); foreach (var key in keyArray) {