Skip to content

Reduce code duplication across sync/async in ZIP and TAR #127376

@rzikm

Description

@rzikm

The current TarReader/TarWriter and ZipArchive implementations contain a lot of logic that is duplicated across the sync and async API implementations. See, for example:

// Moves the underlying archive stream position pointer to the beginning of the next header,
// skipping whatever remains of the previously returned entry.
internal void AdvanceDataStreamIfNeeded()
{
    // No previous entry recorded: there is nothing to skip over.
    if (_previouslyReadEntry is null)
    {
        return;
    }

    if (!_archiveStream.CanSeek)
    {
        // Unseekable archive: when an entry is returned, the pointer is deliberately left at the start of its
        // data section so the user can read the data. If the user did not read it, drain it now and then skip
        // the block-alignment padding so the pointer lands on the next header. This is only possible while the
        // entry still holds a SubReadStream (or a type deriving from it) as its data stream.
        if (_previouslyReadEntry._header._size <= 0 ||
            _previouslyReadEntry._header._dataStream is not SubReadStream dataStream)
        {
            return;
        }

        dataStream.AdvanceToEnd();
        TarHelpers.SkipBlockAlignmentPadding(_archiveStream, _previouslyReadEntry._header._size);
        return;
    }

    // Seekable archive: jump directly to the recorded end of the previous entry's header, data and padding.
    Debug.Assert(_previouslyReadEntry._header._endOfHeaderAndDataAndBlockAlignment > 0);
    _archiveStream.Position = _previouslyReadEntry._header._endOfHeaderAndDataAndBlockAlignment;
}
// Asynchronously moves the underlying archive stream position pointer to the beginning of the next header,
// skipping whatever remains of the previously returned entry.
internal async ValueTask AdvanceDataStreamIfNeededAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    // No previous entry recorded: there is nothing to skip over.
    if (_previouslyReadEntry is null)
    {
        return;
    }

    if (!_archiveStream.CanSeek)
    {
        // Unseekable archive: when an entry is returned, the pointer is deliberately left at the start of its
        // data section so the user can read the data. If the user did not read it, drain it now and then skip
        // the block-alignment padding so the pointer lands on the next header. This is only possible while the
        // entry still holds a SubReadStream (or a type deriving from it) as its data stream.
        if (_previouslyReadEntry._header._size <= 0 ||
            _previouslyReadEntry._header._dataStream is not SubReadStream dataStream)
        {
            return;
        }

        await dataStream.AdvanceToEndAsync(cancellationToken).ConfigureAwait(false);
        await TarHelpers.SkipBlockAlignmentPaddingAsync(_archiveStream, _previouslyReadEntry._header._size, cancellationToken).ConfigureAwait(false);
        return;
    }

    // Seekable archive: jump directly to the recorded end of the previous entry's header, data and padding.
    Debug.Assert(_previouslyReadEntry._header._endOfHeaderAndDataAndBlockAlignment > 0);
    _archiveStream.Position = _previouslyReadEntry._header._endOfHeaderAndDataAndBlockAlignment;
}

Similarly, when introducing new tests, developers often duplicate the same test across sync and async variants; see e.g. TarWriter_WriteEntryAsync_File_Tests and TarWriter_WriteEntry_File_Tests. This increases the risk of regressions: a bug may be fixed in one code path but not the other, and go uncaught because a test was added for only one variant. Deduplicating the logic would reduce developer toil and the risk of regressions.

Since the code generally boils down to calling either the sync or the async variant of a Read or Write method on a Stream, we can deduplicate the implementation using the same technique we used in SslStream and SmtpClient (see e.g. #115366).

IReadWriteAdapter

/// <summary>
/// Abstracts the stream I/O and waiting primitives so that one generic implementation,
/// parameterized over a <c>TIOAdapter : IReadWriteAdapter</c> type argument, can back both
/// the synchronous and the asynchronous public APIs.
/// </summary>
internal interface IReadWriteAdapter
{
    /// <summary>Reads into <paramref name="buffer"/> from <paramref name="stream"/>; returns the number of bytes read.</summary>
    public static abstract ValueTask<int> ReadAsync(Stream stream, Memory<byte> buffer, CancellationToken cancellationToken);

    /// <summary>Reads at least <paramref name="minimumBytes"/> bytes, mirroring <c>Stream.ReadAtLeast</c> semantics.</summary>
    public static abstract ValueTask<int> ReadAtLeastAsync(Stream stream, Memory<byte> buffer, int minimumBytes, bool throwOnEndOfStream, CancellationToken cancellationToken);

    /// <summary>Writes all of <paramref name="buffer"/> to <paramref name="stream"/>.</summary>
    public static abstract ValueTask WriteAsync(Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken);

    /// <summary>Flushes <paramref name="stream"/>.</summary>
    public static abstract Task FlushAsync(Stream stream, CancellationToken cancellationToken);

    /// <summary>Waits for <paramref name="waiter"/>'s task to complete.</summary>
    public static abstract Task WaitAsync(TaskCompletionSource<bool> waiter);

    /// <summary>Waits for <paramref name="task"/> to complete.</summary>
    public static abstract Task WaitAsync(Task task);

    /// <summary>Waits for <paramref name="task"/> to complete and surfaces its result.</summary>
    public static abstract ValueTask<T> WaitAsync<T>(ValueTask<T> task);
}
/// <summary>
/// <see cref="IReadWriteAdapter"/> that forwards every call straight to the stream's (or task's)
/// asynchronous API and hands the resulting task back to the caller without awaiting it.
/// </summary>
internal readonly struct AsyncReadWriteAdapter : IReadWriteAdapter
{
    public static ValueTask<int> ReadAsync(Stream stream, Memory<byte> buffer, CancellationToken cancellationToken)
    {
        return stream.ReadAsync(buffer, cancellationToken);
    }

    public static ValueTask<int> ReadAtLeastAsync(Stream stream, Memory<byte> buffer, int minimumBytes, bool throwOnEndOfStream, CancellationToken cancellationToken)
    {
        return stream.ReadAtLeastAsync(buffer, minimumBytes, throwOnEndOfStream, cancellationToken);
    }

    public static ValueTask WriteAsync(Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
    {
        return stream.WriteAsync(buffer, cancellationToken);
    }

    public static Task FlushAsync(Stream stream, CancellationToken cancellationToken)
    {
        return stream.FlushAsync(cancellationToken);
    }

    // The waiting overloads are pure pass-throughs: the caller awaits the underlying task itself.
    public static Task WaitAsync(TaskCompletionSource<bool> waiter)
    {
        return waiter.Task;
    }

    public static Task WaitAsync(Task task)
    {
        return task;
    }

    public static ValueTask<T> WaitAsync<T>(ValueTask<T> task)
    {
        return task;
    }
}
/// <summary>
/// <see cref="IReadWriteAdapter"/> that performs every operation synchronously and returns an
/// already-completed task, so generic code awaiting the result never actually yields.
/// </summary>
/// <remarks>
/// The <c>cancellationToken</c> parameters are accepted only to satisfy the interface; they are not observed.
/// </remarks>
internal readonly struct SyncReadWriteAdapter : IReadWriteAdapter
{
    public static ValueTask<int> ReadAsync(Stream stream, Memory<byte> buffer, CancellationToken cancellationToken) =>
        new ValueTask<int>(stream.Read(buffer.Span));

    public static ValueTask<int> ReadAtLeastAsync(Stream stream, Memory<byte> buffer, int minimumBytes, bool throwOnEndOfStream, CancellationToken cancellationToken) =>
        new ValueTask<int>(stream.ReadAtLeast(buffer.Span, minimumBytes, throwOnEndOfStream));

    public static ValueTask WriteAsync(Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
    {
        stream.Write(buffer.Span);
        return default;
    }

    public static Task FlushAsync(Stream stream, CancellationToken cancellationToken)
    {
        stream.Flush();
        return Task.CompletedTask;
    }

    /// <summary>Blocks until <paramref name="waiter"/>'s task completes, rethrowing any failure.</summary>
    public static Task WaitAsync(TaskCompletionSource<bool> waiter)
    {
        waiter.Task.GetAwaiter().GetResult();
        return Task.CompletedTask;
    }

    /// <summary>Blocks until <paramref name="task"/> completes, rethrowing any failure.</summary>
    public static Task WaitAsync(Task task)
    {
        task.GetAwaiter().GetResult();
        return Task.CompletedTask;
    }

    /// <summary>Blocks until <paramref name="task"/> completes and returns a completed ValueTask with its result.</summary>
    public static ValueTask<T> WaitAsync<T>(ValueTask<T> task)
    {
        // Fast path: reading the result of an already successfully completed ValueTask is allowed
        // and avoids allocating a Task via AsTask().
        if (task.IsCompletedSuccessfully)
        {
            return new ValueTask<T>(task.Result);
        }

        // A ValueTask backed by an IValueTaskSource must not have GetResult called before it completes,
        // so convert it to a Task in order to block on it safely. Failures surface unwrapped.
        return ValueTask.FromResult(task.AsTask().GetAwaiter().GetResult());
    }
}

Usage example:

internal async Task<LineInfo[]> ReadLinesAsync<TIOAdapter>(SmtpReplyReader caller, bool oneLine = false, CancellationToken cancellationToken = default) where TIOAdapter : IReadWriteAdapter
{
if (caller != _currentReader || _readState == ReadState.Done)
{
return Array.Empty<LineInfo>();
}
_byteBuffer ??= new byte[DefaultBufferSize];
System.Diagnostics.Debug.Assert(_readState == ReadState.Status0);
var builder = new StringBuilder();
var lines = new List<LineInfo>();
int statusRead = 0;
int start = 0;
int read = 0;
while (true)
{
if (start == read)
{
start = 0;
read = await TIOAdapter.ReadAsync(_bufferedStream, _byteBuffer, cancellationToken).ConfigureAwait(false);
if (read == 0)
{
throw new IOException(SR.Format(SR.net_io_readfailure, SR.net_io_connectionclosed));
}
}
int actual = ProcessRead(_byteBuffer!.AsSpan(start, read - start), true);

/// <summary>
/// Synchronous read that reuses the shared <c>ReadAsyncInternal</c> implementation through the sync adapter.
/// </summary>
public override int Read(byte[] buffer, int offset, int count)
{
    ThrowIfExceptionalOrNotAuthenticated();
    ValidateBufferArguments(buffer, offset, count);

    // With the sync adapter the I/O happens inline, so the ValueTask is expected to be complete already
    // (asserted below); unwrapping it therefore does not block.
    var destination = new Memory<byte>(buffer, offset, count);
    ValueTask<int> completed = ReadAsyncInternal<SyncReadWriteAdapter>(destination, CancellationToken.None);
    Debug.Assert(completed.IsCompleted, "Sync operation must have completed synchronously");

    return completed.GetAwaiter().GetResult();
}

/// <summary>
/// Asynchronous read that reuses the shared <c>ReadAsyncInternal</c> implementation through the async adapter.
/// </summary>
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    ThrowIfExceptionalOrNotAuthenticated();
    ValidateBufferArguments(buffer, offset, count);

    // Arguments are already validated, so slicing the array cannot throw here.
    ValueTask<int> pending = ReadAsyncInternal<AsyncReadWriteAdapter>(buffer.AsMemory(offset, count), cancellationToken);
    return pending.AsTask();
}

Metadata

Metadata

Assignees

No one assigned

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions