From 6a9c5ae906c80a9f19560092f6eae28949c2e617 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Sun, 20 Apr 2025 18:36:34 -0700 Subject: [PATCH] Add missing table storage read metrics --- .../Partitioning/BlobPartitionLeaseManager.cs | 6 +++--- src/DurableTask.AzureStorage/Storage/Table.cs | 12 ++++++++--- .../Storage/TableQueryResponse.cs | 20 +++++++++++++++---- 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/src/DurableTask.AzureStorage/Partitioning/BlobPartitionLeaseManager.cs b/src/DurableTask.AzureStorage/Partitioning/BlobPartitionLeaseManager.cs index 932f5bd7f..cb7bb3df2 100644 --- a/src/DurableTask.AzureStorage/Partitioning/BlobPartitionLeaseManager.cs +++ b/src/DurableTask.AzureStorage/Partitioning/BlobPartitionLeaseManager.cs @@ -77,7 +77,7 @@ public async IAsyncEnumerable ListLeasesAsync([EnumeratorCan await foreach (Page page in this.taskHubContainer.ListBlobsAsync(this.blobDirectoryName, cancellationToken: cancellationToken).AsPages()) { // Start each of the Tasks in parallel - Task[] downloadTasks = page.Values.Select(b => this.DownloadLeaseBlob(b, cancellationToken)).ToArray(); + Task[] downloadTasks = page.Values.Select(b => DownloadLeaseBlob(b, cancellationToken)).ToArray(); foreach (Task downloadTask in downloadTasks) { @@ -134,7 +134,7 @@ public async Task GetLeaseAsync(string partitionId, Cancella Blob leaseBlob = this.taskHubContainer.GetBlobReference(partitionId, this.blobDirectoryName); if (await leaseBlob.ExistsAsync(cancellationToken)) { - return await this.DownloadLeaseBlob(leaseBlob, cancellationToken); + return await DownloadLeaseBlob(leaseBlob, cancellationToken); } return null; @@ -315,7 +315,7 @@ async Task GetTaskHubInfoAsync(CancellationToken cancellationToken) return null; } - async Task DownloadLeaseBlob(Blob blob, CancellationToken cancellationToken) + static async Task DownloadLeaseBlob(Blob blob, CancellationToken cancellationToken) { using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken); BlobPartitionLease deserializedLease = Utils.DeserializeFromJson(result.Content); diff --git a/src/DurableTask.AzureStorage/Storage/Table.cs b/src/DurableTask.AzureStorage/Storage/Table.cs index 11e6170f7..e47b49529 100644 --- a/src/DurableTask.AzureStorage/Storage/Table.cs +++ b/src/DurableTask.AzureStorage/Storage/Table.cs @@ -62,7 +62,7 @@ public async Task ExistsAsync(CancellationToken cancellationToken = defaul { // TODO: Re-evaluate the use of an "Exists" method as it was intentional omitted from the client API List tables = await this.tableServiceClient - .QueryAsync(filter: $"TableName eq '{tableClient.Name}'", cancellationToken: cancellationToken) + .QueryAsync(filter: $"TableName eq '{this.tableClient.Name}'", cancellationToken: cancellationToken) .DecorateFailure() .ToListAsync(cancellationToken); @@ -173,9 +173,15 @@ public async Task ExecuteBatchAsync(IEnumerable ExecuteQueryAsync(string? filter = null, int? maxPerPage = null, IEnumerable? select = null, CancellationToken cancellationToken = default) where T : class, ITableEntity, new() + public TableQueryResponse ExecuteQueryAsync( + string? filter = null, + int? maxPerPage = null, + IEnumerable? select = null, + CancellationToken cancellationToken = default) where T : class, ITableEntity, new() { - return new TableQueryResponse(this.tableClient.QueryAsync(filter, maxPerPage, select, cancellationToken).DecorateFailure()); + return new TableQueryResponse( + this.tableClient.QueryAsync(filter, maxPerPage, select, cancellationToken).DecorateFailure(), + this.stats); } } } diff --git a/src/DurableTask.AzureStorage/Storage/TableQueryResponse.cs b/src/DurableTask.AzureStorage/Storage/TableQueryResponse.cs index 901a24d30..c1b1c79e5 100644 --- a/src/DurableTask.AzureStorage/Storage/TableQueryResponse.cs +++ b/src/DurableTask.AzureStorage/Storage/TableQueryResponse.cs @@ -20,18 +20,28 @@ namespace DurableTask.AzureStorage.Storage using System.Threading; using System.Threading.Tasks; using Azure; + using DurableTask.AzureStorage.Monitoring; class TableQueryResponse : AsyncPageable where T : notnull { readonly AsyncPageable query; + readonly AzureStorageOrchestrationServiceStats stats; - public TableQueryResponse(AsyncPageable query) => + public TableQueryResponse(AsyncPageable query, AzureStorageOrchestrationServiceStats stats) + { this.query = query ?? throw new ArgumentNullException(nameof(query)); + this.stats = stats ?? throw new ArgumentNullException(nameof(stats)); + } - public override IAsyncEnumerable> AsPages(string? continuationToken = null, int? pageSizeHint = null) => - this.query.AsPages(continuationToken, pageSizeHint); + public override IAsyncEnumerable> AsPages(string? continuationToken = null, int? pageSizeHint = null) + { + return this.query.AsPages(continuationToken, pageSizeHint); + } - public async Task> GetResultsAsync(string? continuationToken = null, int? pageSizeHint = null, CancellationToken cancellationToken = default) + public async Task> GetResultsAsync( + string? continuationToken = null, + int? pageSizeHint = null, + CancellationToken cancellationToken = default) { var sw = Stopwatch.StartNew(); @@ -39,6 +49,8 @@ public async Task> GetResultsAsync(string? continuationToke var entities = new List(); await foreach (Page page in this.query.AsPages(continuationToken, pageSizeHint).WithCancellation(cancellationToken)) { + this.stats.TableEntitiesRead.Increment(page.Values.Count); + pages++; entities.AddRange(page.Values); }