Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public async IAsyncEnumerable<BlobPartitionLease> ListLeasesAsync([EnumeratorCan
await foreach (Page<Blob> page in this.taskHubContainer.ListBlobsAsync(this.blobDirectoryName, cancellationToken: cancellationToken).AsPages())
{
// Start each of the Tasks in parallel
Task<BlobPartitionLease>[] downloadTasks = page.Values.Select(b => this.DownloadLeaseBlob(b, cancellationToken)).ToArray();
Task<BlobPartitionLease>[] downloadTasks = page.Values.Select(b => DownloadLeaseBlob(b, cancellationToken)).ToArray();

foreach (Task<BlobPartitionLease> downloadTask in downloadTasks)
{
Expand Down Expand Up @@ -134,7 +134,7 @@ public async Task<BlobPartitionLease> GetLeaseAsync(string partitionId, Cancella
Blob leaseBlob = this.taskHubContainer.GetBlobReference(partitionId, this.blobDirectoryName);
if (await leaseBlob.ExistsAsync(cancellationToken))
{
return await this.DownloadLeaseBlob(leaseBlob, cancellationToken);
return await DownloadLeaseBlob(leaseBlob, cancellationToken);
}

return null;
Expand Down Expand Up @@ -315,7 +315,7 @@ async Task<TaskHubInfo> GetTaskHubInfoAsync(CancellationToken cancellationToken)
return null;
}

async Task<BlobPartitionLease> DownloadLeaseBlob(Blob blob, CancellationToken cancellationToken)
static async Task<BlobPartitionLease> DownloadLeaseBlob(Blob blob, CancellationToken cancellationToken)
{
using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken);
BlobPartitionLease deserializedLease = Utils.DeserializeFromJson<BlobPartitionLease>(result.Content);
Expand Down
12 changes: 9 additions & 3 deletions src/DurableTask.AzureStorage/Storage/Table.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public async Task<bool> ExistsAsync(CancellationToken cancellationToken = defaul
{
// TODO: Re-evaluate the use of an "Exists" method as it was intentional omitted from the client API
List<TableItem> tables = await this.tableServiceClient
.QueryAsync(filter: $"TableName eq '{tableClient.Name}'", cancellationToken: cancellationToken)
.QueryAsync(filter: $"TableName eq '{this.tableClient.Name}'", cancellationToken: cancellationToken)
.DecorateFailure()
.ToListAsync(cancellationToken);

Expand Down Expand Up @@ -173,9 +173,15 @@ public async Task<TableTransactionResults> ExecuteBatchAsync(IEnumerable<TableTr
return new TableTransactionResults(batchResults, stopwatch.Elapsed);
}

public TableQueryResponse<T> ExecuteQueryAsync<T>(string? filter = null, int? maxPerPage = null, IEnumerable<string>? select = null, CancellationToken cancellationToken = default) where T : class, ITableEntity, new()
public TableQueryResponse<T> ExecuteQueryAsync<T>(
string? filter = null,
int? maxPerPage = null,
IEnumerable<string>? select = null,
CancellationToken cancellationToken = default) where T : class, ITableEntity, new()
{
return new TableQueryResponse<T>(this.tableClient.QueryAsync<T>(filter, maxPerPage, select, cancellationToken).DecorateFailure());
return new TableQueryResponse<T>(
this.tableClient.QueryAsync<T>(filter, maxPerPage, select, cancellationToken).DecorateFailure(),
this.stats);
}
}
}
20 changes: 16 additions & 4 deletions src/DurableTask.AzureStorage/Storage/TableQueryResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,37 @@ namespace DurableTask.AzureStorage.Storage
using System.Threading;
using System.Threading.Tasks;
using Azure;
using DurableTask.AzureStorage.Monitoring;

class TableQueryResponse<T> : AsyncPageable<T> where T : notnull
{
readonly AsyncPageable<T> query;
readonly AzureStorageOrchestrationServiceStats stats;

public TableQueryResponse(AsyncPageable<T> query) =>
public TableQueryResponse(AsyncPageable<T> query, AzureStorageOrchestrationServiceStats stats)
{
this.query = query ?? throw new ArgumentNullException(nameof(query));
this.stats = stats ?? throw new ArgumentNullException(nameof(stats));
}

public override IAsyncEnumerable<Page<T>> AsPages(string? continuationToken = null, int? pageSizeHint = null) =>
this.query.AsPages(continuationToken, pageSizeHint);
public override IAsyncEnumerable<Page<T>> AsPages(string? continuationToken = null, int? pageSizeHint = null)
{
return this.query.AsPages(continuationToken, pageSizeHint);
}

public async Task<TableQueryResults<T>> GetResultsAsync(string? continuationToken = null, int? pageSizeHint = null, CancellationToken cancellationToken = default)
public async Task<TableQueryResults<T>> GetResultsAsync(
string? continuationToken = null,
int? pageSizeHint = null,
CancellationToken cancellationToken = default)
{
var sw = Stopwatch.StartNew();

int pages = 0;
var entities = new List<T>();
await foreach (Page<T> page in this.query.AsPages(continuationToken, pageSizeHint).WithCancellation(cancellationToken))
{
this.stats.TableEntitiesRead.Increment(page.Values.Count);

pages++;
entities.AddRange(page.Values);
}
Expand Down
Loading