From 401a326497dbc1fd0061e0ad12d3b34b8ec58daf Mon Sep 17 00:00:00 2001 From: rameel Date: Sun, 22 Mar 2026 02:46:48 +0500 Subject: [PATCH 1/4] Make Flash/FlashAsync no-op --- .../S3UploadStream.cs | 50 +++++------ .../WritableAmazonFileSystemTests.cs | 85 ++++++++++++++++++- 2 files changed, 107 insertions(+), 28 deletions(-) diff --git a/src/Ramstack.FileSystem.Amazon/S3UploadStream.cs b/src/Ramstack.FileSystem.Amazon/S3UploadStream.cs index 0c5eaa4..1273948 100644 --- a/src/Ramstack.FileSystem.Amazon/S3UploadStream.cs +++ b/src/Ramstack.FileSystem.Amazon/S3UploadStream.cs @@ -17,6 +17,9 @@ internal sealed class S3UploadStream : Stream { private const long PartSize = 5 * 1024 * 1024; + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html + private const long MaxPartSize = 5L * 1024 * 1024 * 1024; + private readonly IAmazonS3 _client; private readonly string _bucketName; private readonly string _key; @@ -146,16 +149,11 @@ public override void SetLength(long value) => /// public override void Flush() { - _stream.Flush(); - UploadPart(); } /// - public override async Task FlushAsync(CancellationToken cancellationToken) - { - await _stream.FlushAsync(cancellationToken).ConfigureAwait(false); - await UploadPartAsync(cancellationToken).ConfigureAwait(false); - } + public override Task FlushAsync(CancellationToken cancellationToken) => + Task.CompletedTask; /// protected override void Dispose(bool disposing) @@ -232,24 +230,28 @@ private async ValueTask UploadPartAsync(CancellationToken cancellationToken) { _stream.Position = 0; - // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html - // The maximum allowed part size is 5 gigabytes. - - var request = new UploadPartRequest + do { - BucketName = _bucketName, - Key = _key, - UploadId = _uploadId, - PartNumber = _partETags.Count + 1, - InputStream = _stream, - PartSize = _stream.Length - }; - - var response = await _client - .UploadPartAsync(request, cancellationToken) - .ConfigureAwait(false); - - _partETags.Add(new PartETag(response)); + var remaining = _stream.Length - _stream.Position; + var partSize = Math.Min(remaining, MaxPartSize); + + var request = new UploadPartRequest + { + BucketName = _bucketName, + Key = _key, + UploadId = _uploadId, + PartNumber = _partETags.Count + 1, + InputStream = _stream, + PartSize = partSize + }; + + var response = await _client + .UploadPartAsync(request, cancellationToken) + .ConfigureAwait(false); + + _partETags.Add(new PartETag(response)); + } + while (_stream.Position < _stream.Length); _stream.Position = 0; _stream.SetLength(0); diff --git a/tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs b/tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs index fc75b3b..10ca4f0 100644 --- a/tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs +++ b/tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs @@ -60,10 +60,8 @@ public async Task File_OpenWrite_InternalBufferWriteError_DoesNotCreateFile() var underlying = (FileStream)stream.GetType().GetField("_stream", BindingFlags.NonPublic | BindingFlags.Instance)!.GetValue(stream)!; Assert.That(underlying, Is.Not.Null); - await stream.WriteAsync(new ReadOnlyMemory(new byte[1024])); - - // Forces to upload buffer. - await stream.FlushAsync(); + // Write enough data to trigger automatic part upload (>= 5 MiB). + await stream.WriteAsync(new ReadOnlyMemory(new byte[6 * 1024 * 1024])); // Simulates an internal buffer write error. await underlying.DisposeAsync(); @@ -196,6 +194,85 @@ await reader.ReadToEndAsync(), } + [Test] + public async Task File_OpenWrite_FlushDoesNotCauseUndersizedParts() + { + using var fs = GetFileSystem(); + + var content = "Hello, World!"; + + { + await using var stream = await fs.OpenWriteAsync("/flush-test.txt"); + await using var writer = new StreamWriter(stream); + + // Write small data and flush multiple times. + // Flush should be a no-op and not upload undersized parts. + await writer.WriteAsync(content[..5]); + await writer.FlushAsync(); + await writer.WriteAsync(content[5..]); + await writer.FlushAsync(); + } + + { + var file = fs.GetFile("/flush-test.txt"); + Assert.That(await file.ExistsAsync(), Is.True); + + // ReSharper disable once UseAwaitUsing + using var stream = await file.OpenReadAsync(); + using var reader = new StreamReader(stream); + Assert.That(await reader.ReadToEndAsync(), Is.EqualTo(content)); + + await file.DeleteAsync(); + } + } + + [Test] + public async Task File_OpenWrite_FlushWithMultipartUpload() + { + using var fs = GetFileSystem(); + + // Write more than 5 MiB to trigger multipart upload, + // with Flush calls between writes. + var chunk = new byte[2 * 1024 * 1024]; + Random.Shared.NextBytes(chunk); + + { + await using var stream = await fs.OpenWriteAsync("/flush-multipart-test.bin"); + + // Write 4 chunks (8 MiB total) with flushes in between. + // Without the fix, each flush would upload an undersized part + // and CompleteMultipartUpload would fail. + for (var i = 0; i < 4; i++) + { + await stream.WriteAsync(chunk); + await stream.FlushAsync(); + } + } + + var file = fs.GetFile("/flush-multipart-test.bin"); + Assert.That(await file.ExistsAsync(), Is.True); + Assert.That(await file.GetLengthAsync(), Is.EqualTo(chunk.Length * 4)); + + await file.DeleteAsync(); + } + + [Test] + public async Task File_OpenWrite_EmptyFileWithFlush() + { + using var fs = GetFileSystem(); + + await using (var stream = await fs.OpenWriteAsync("/empty-flush-test.txt")) + await stream.FlushAsync(); + + fs.WriteAllBytesAsync() + + var file = fs.GetFile("/empty-flush-test.txt"); + Assert.That(await file.ExistsAsync(), Is.True); + Assert.That(await file.GetLengthAsync(), Is.EqualTo(0)); + + await file.DeleteAsync(); + } + [Test] public async Task Directory_BatchDeleting() { From 7125a0ef26ac6a6ed1ef1cdfa75c003a7a87e74d Mon Sep 17 00:00:00 2001 From: rameel Date: Sun, 22 Mar 2026 02:50:31 +0500 Subject: [PATCH 2/4] Clean up --- .../WritableAmazonFileSystemTests.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs b/tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs index 10ca4f0..706e889 100644 --- a/tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs +++ b/tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs @@ -264,8 +264,6 @@ public async Task File_OpenWrite_EmptyFileWithFlush() await using (var stream = await fs.OpenWriteAsync("/empty-flush-test.txt")) await stream.FlushAsync(); - fs.WriteAllBytesAsync() - var file = fs.GetFile("/empty-flush-test.txt"); Assert.That(await file.ExistsAsync(), Is.True); Assert.That(await file.GetLengthAsync(), Is.EqualTo(0)); From d71862ed2c813045c853b2271da9fa40057238dd Mon Sep 17 00:00:00 2001 From: rameel Date: Wed, 25 Mar 2026 23:47:50 +0500 Subject: [PATCH 3/4] Revert part-splitting loop during S3 upload --- .../S3UploadStream.cs | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/src/Ramstack.FileSystem.Amazon/S3UploadStream.cs b/src/Ramstack.FileSystem.Amazon/S3UploadStream.cs index 1273948..656cd8a 100644 --- a/src/Ramstack.FileSystem.Amazon/S3UploadStream.cs +++ b/src/Ramstack.FileSystem.Amazon/S3UploadStream.cs @@ -15,10 +15,8 @@ namespace Ramstack.FileSystem.Amazon; /// internal sealed class S3UploadStream : Stream { - private const long PartSize = 5 * 1024 * 1024; - // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html - private const long MaxPartSize = 5L * 1024 * 1024 * 1024; + private const long PartSize = 5L * 1024 * 1024; private readonly IAmazonS3 _client; private readonly string _bucketName; @@ -230,28 +228,30 @@ private async ValueTask UploadPartAsync(CancellationToken cancellationToken) { _stream.Position = 0; - do + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html + // The maximum allowed part size is 5 GiB. + // ----------------------------------------------------------------------------------- + // We don't need to worry about S3's 5 GiB part limit because: + // 1. All Write/WriteAsync methods are inherently limited by Array.MaxLength (~2 GiB). + // 2. The upload starts as soon as the buffer reaches MinPartSize (5 MiB). + // Even if a single write matches Array.MaxLength, the data is + // uploaded immediately, staying within AWS limits. + + var request = new UploadPartRequest { - var remaining = _stream.Length - _stream.Position; - var partSize = Math.Min(remaining, MaxPartSize); - - var request = new UploadPartRequest - { - BucketName = _bucketName, - Key = _key, - UploadId = _uploadId, - PartNumber = _partETags.Count + 1, - InputStream = _stream, - PartSize = partSize - }; - - var response = await _client - .UploadPartAsync(request, cancellationToken) - .ConfigureAwait(false); - - _partETags.Add(new PartETag(response)); - } - while (_stream.Position < _stream.Length); + BucketName = _bucketName, + Key = _key, + UploadId = _uploadId, + PartNumber = _partETags.Count + 1, + InputStream = _stream, + PartSize = _stream.Length + }; + + var response = await _client + .UploadPartAsync(request, cancellationToken) + .ConfigureAwait(false); + + _partETags.Add(new PartETag(response)); _stream.Position = 0; _stream.SetLength(0); From fabaa0a45282441c0c1ab037442aea8ea59d049c Mon Sep 17 00:00:00 2001 From: rameel Date: Wed, 25 Mar 2026 23:48:39 +0500 Subject: [PATCH 4/4] Clean up and formatting --- .../S3UploadStream.cs | 10 +-- .../WritableAmazonFileSystemTests.cs | 88 +++++++++++-------- 2 files changed, 53 insertions(+), 45 deletions(-) diff --git a/src/Ramstack.FileSystem.Amazon/S3UploadStream.cs b/src/Ramstack.FileSystem.Amazon/S3UploadStream.cs index 656cd8a..e1cdf42 100644 --- a/src/Ramstack.FileSystem.Amazon/S3UploadStream.cs +++ b/src/Ramstack.FileSystem.Amazon/S3UploadStream.cs @@ -10,13 +10,11 @@ namespace Ramstack.FileSystem.Amazon; /// /// Represents a stream for uploading data to Amazon S3 using multipart upload. -/// This stream accumulates data in a temporary buffer and uploads it to S3 in parts -/// once the buffer reaches a predefined size. /// internal sealed class S3UploadStream : Stream { // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html - private const long PartSize = 5L * 1024 * 1024; + private const long MinPartSize = 5L * 1024 * 1024; private readonly IAmazonS3 _client; private readonly string _bucketName; @@ -82,7 +80,7 @@ public S3UploadStream(IAmazonS3 client, string bucketName, string key, string up FileShare.None, bufferSize: 4096, FileOptions.DeleteOnClose - | FileOptions.Asynchronous); + | FileOptions.Asynchronous); } /// @@ -103,7 +101,7 @@ public override void Write(ReadOnlySpan buffer) { _stream.Write(buffer); - if (_stream.Length >= PartSize) + if (_stream.Length >= MinPartSize) UploadPart(); } catch (Exception exception) @@ -123,7 +121,7 @@ public override async ValueTask WriteAsync(ReadOnlyMemory buffer, Cancella try { await _stream.WriteAsync(buffer, cancellationToken).ConfigureAwait(false); - if (_stream.Length >= PartSize) + if (_stream.Length >= MinPartSize) await UploadPartAsync(cancellationToken).ConfigureAwait(false); } catch (Exception exception) diff --git a/tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs b/tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs index 706e889..7d2e8c4 100644 --- a/tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs +++ b/tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs @@ -193,13 +193,12 @@ await reader.ReadToEndAsync(), await destination.DeleteAsync(); } - [Test] public async Task File_OpenWrite_FlushDoesNotCauseUndersizedParts() { using var fs = GetFileSystem(); - var content = "Hello, World!"; + const string Content = "Hello, World!"; { await using var stream = await fs.OpenWriteAsync("/flush-test.txt"); @@ -207,23 +206,21 @@ public async Task File_OpenWrite_FlushDoesNotCauseUndersizedParts() // Write small data and flush multiple times. // Flush should be a no-op and not upload undersized parts. - await writer.WriteAsync(content[..5]); - await writer.FlushAsync(); - await writer.WriteAsync(content[5..]); - await writer.FlushAsync(); + foreach (var ch in Content) + { + await writer.WriteAsync(ch); + await writer.FlushAsync(); + } } - { - var file = fs.GetFile("/flush-test.txt"); - Assert.That(await file.ExistsAsync(), Is.True); - // ReSharper disable once UseAwaitUsing - using var stream = await file.OpenReadAsync(); + using var stream = await fs.OpenReadAsync("/flush-test.txt"); using var reader = new StreamReader(stream); - Assert.That(await reader.ReadToEndAsync(), Is.EqualTo(content)); - await file.DeleteAsync(); + Assert.That(await reader.ReadToEndAsync(), Is.EqualTo(Content)); } + + await fs.DeleteFileAsync("/flush-test.txt"); } [Test] @@ -231,44 +228,57 @@ public async Task File_OpenWrite_FlushWithMultipartUpload() { using var fs = GetFileSystem(); - // Write more than 5 MiB to trigger multipart upload, - // with Flush calls between writes. - var chunk = new byte[2 * 1024 * 1024]; + const int Count = 5; + const string FileName = "/flush-multipart-test.bin"; + + var chunk = new byte[3 * 1024 * 1024]; Random.Shared.NextBytes(chunk); { - await using var stream = await fs.OpenWriteAsync("/flush-multipart-test.bin"); + await using var stream = await fs.OpenWriteAsync(FileName); + for (var i = 0; i < Count; i++) + await stream.WriteAsync(chunk); + } + + { + var file = fs.GetFile(FileName); + + Assert.That(await file.ExistsAsync(), Is.True); + Assert.That(await file.GetLengthAsync(), Is.EqualTo(chunk.Length * Count)); - // Write 4 chunks (8 MiB total) with flushes in between. - // Without the fix, each flush would upload an undersized part - // and CompleteMultipartUpload would fail. - for (var i = 0; i < 4; i++) + // ReSharper disable once UseAwaitUsing + using var stream = await file.OpenReadAsync(); + + var bytes = new byte[chunk.Length]; + + for (var i = 0; i < Count; i++) { - await stream.WriteAsync(chunk); - await stream.FlushAsync(); + var n = await ReadBlockAsync(stream, bytes); + Assert.That(n, Is.EqualTo(bytes.Length)); + + Assert.That( + bytes.AsSpan().SequenceEqual(chunk), + Is.True); } } - var file = fs.GetFile("/flush-multipart-test.bin"); - Assert.That(await file.ExistsAsync(), Is.True); - Assert.That(await file.GetLengthAsync(), Is.EqualTo(chunk.Length * 4)); - - await file.DeleteAsync(); - } + await fs.DeleteFileAsync(FileName); - [Test] - public async Task File_OpenWrite_EmptyFileWithFlush() - { - using var fs = GetFileSystem(); + static async Task ReadBlockAsync(Stream stream, Memory memory) + { + var count = memory.Length; - await using (var stream = await fs.OpenWriteAsync("/empty-flush-test.txt")) - await stream.FlushAsync(); + while (!memory.IsEmpty) + { + var n = await stream.ReadAsync(memory); + if (n == 0) + return 0; - var file = fs.GetFile("/empty-flush-test.txt"); - Assert.That(await file.ExistsAsync(), Is.True); - Assert.That(await file.GetLengthAsync(), Is.EqualTo(0)); + memory = memory[n..]; + } - await file.DeleteAsync(); + return count; + } } [Test]