Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 63 additions & 64 deletions src/Ramstack.FileSystem.Amazon/S3UploadStream.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Diagnostics.CodeAnalysis;
using System.Runtime.ExceptionServices;

using Amazon.S3;
using Amazon.S3.Model;
Expand All @@ -23,7 +22,7 @@ internal sealed class S3UploadStream : Stream
private readonly FileStream _stream;
private readonly List<PartETag> _partETags;

private bool _disposed;
private volatile int _disposed;

/// <inheritdoc />
public override bool CanRead => false;
Expand Down Expand Up @@ -163,13 +162,11 @@ protected override void Dispose(bool disposing)
/// <inheritdoc />
public override async ValueTask DisposeAsync()
{
if (_disposed)
if (Interlocked.Exchange(ref _disposed, 1) != 0)
return;

try
{
_disposed = true;

await UploadPartAsync(CancellationToken.None).ConfigureAwait(false);

var request = new CompleteMultipartUploadRequest
Expand All @@ -184,16 +181,24 @@ await _client
.CompleteMultipartUploadAsync(request)
.ConfigureAwait(false);
}
catch (Exception exception)
catch
{
await AbortAsync(CancellationToken.None).ConfigureAwait(false);
ExceptionDispatchInfo.Throw(exception);
throw;
}
finally
{
await _stream
.DisposeAsync()
.ConfigureAwait(false);
try
{
await _stream.DisposeAsync().ConfigureAwait(false);
}
catch
{
// Ignore:
// Errors when disposing the temporary buffer are not significant here, because:
// 1) If the upload succeeded, the job is done; a cleanup error can be ignored.
// 2) If the upload failed, preserving the original exception is more important for us.
}
}
}

Expand All @@ -216,49 +221,35 @@ private void UploadPart()
/// </returns>
private async ValueTask UploadPartAsync(CancellationToken cancellationToken)
{
// Upload an empty part if nothing has been uploaded yet,
// since we must specify at least one part.

if (_stream.Length != 0 || _partETags.Count == 0)
_stream.Position = 0;

// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
// The maximum allowed part size is 5 GiB.
// -----------------------------------------------------------------------------------
// We don't need to worry about S3's 5 GiB part limit because:
// 1. All Write/WriteAsync methods are inherently limited by Array.MaxLength (~2 GiB).
// 2. The upload starts as soon as the buffer reaches MinPartSize (5 MiB).
// Even if a single write matches Array.MaxLength, the data is
// uploaded immediately, staying within AWS limits.

var request = new UploadPartRequest
{
try
{
_stream.Position = 0;

// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
// The maximum allowed part size is 5 GiB.
// -----------------------------------------------------------------------------------
// We don't need to worry about S3's 5 GiB part limit because:
// 1. All Write/WriteAsync methods are inherently limited by Array.MaxLength (~2 GiB).
// 2. The upload starts as soon as the buffer reaches MinPartSize (5 MiB).
// Even if a single write matches Array.MaxLength, the data is
// uploaded immediately, staying within AWS limits.

var request = new UploadPartRequest
{
BucketName = _bucketName,
Key = _key,
UploadId = _uploadId,
PartNumber = _partETags.Count + 1,
InputStream = _stream,
PartSize = _stream.Length
};
BucketName = _bucketName,
Key = _key,
UploadId = _uploadId,
PartNumber = _partETags.Count + 1,
InputStream = _stream,
PartSize = _stream.Length
};

var response = await _client
.UploadPartAsync(request, cancellationToken)
.ConfigureAwait(false);
var response = await _client
.UploadPartAsync(request, cancellationToken)
.ConfigureAwait(false);

_partETags.Add(new PartETag(response));
_partETags.Add(new PartETag(response));

_stream.Position = 0;
_stream.SetLength(0);
}
catch
{
await AbortAsync(cancellationToken).ConfigureAwait(false);
throw;
}
}
_stream.Position = 0;
_stream.SetLength(0);
}

/// <summary>
Expand Down Expand Up @@ -289,23 +280,31 @@ private void Abort()
/// </remarks>
private async ValueTask AbortAsync(CancellationToken cancellationToken)
{
var request = new AbortMultipartUploadRequest
{
BucketName = _bucketName,
Key = _key,
UploadId = _uploadId
};

await _client
.AbortMultipartUploadAsync(request, cancellationToken)
.ConfigureAwait(false);
if (Interlocked.Exchange(ref _disposed, 1) != 0)
return;

_disposed = true;
try
{
await using (_stream.ConfigureAwait(false))
{
var request = new AbortMultipartUploadRequest
{
BucketName = _bucketName,
Key = _key,
UploadId = _uploadId
};

// Prevent subsequent writes to the stream.
await _stream
.DisposeAsync()
.ConfigureAwait(false);
await _client
.AbortMultipartUploadAsync(request, cancellationToken)
.ConfigureAwait(false);
}
}
catch
{
// IGNORE:
// Suppressing the exception during abort to preserve
// the original exception that triggered the abort.
}
}

/// <summary>
Expand Down
Loading