From a43b0d95604fb75e5e1da6284d8d60919e7cf87e Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Wed, 3 Jun 2026 18:45:59 +0300 Subject: [PATCH 1/2] Fix Dataflow cost benchmark after Dataflow client migration --- .../load_tests/dataflow_cost_benchmark.py | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py b/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py index 3300831b8ea1..c24954bc911e 100644 --- a/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py +++ b/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py @@ -141,7 +141,7 @@ def _process_metrics_list(self, return system_metrics def _get_worker_time_interval( - self, job_id: str) -> tuple[Optional[str], Optional[str]]: + self, job_id: str) -> tuple[Optional[datetime], Optional[datetime]]: """Extracts worker start and stop times from job messages.""" start_time, end_time = None, None page_token = None @@ -155,7 +155,7 @@ def _get_worker_time_interval( page_token=page_token, minimum_importance='JOB_MESSAGE_DEBUG') for message in messages: - text = message.messageText + text = message.message_text if getattr(message, 'time', None): last_message_time = message.time if text: @@ -186,8 +186,8 @@ def _get_throughput_metrics( self, project: str, job_id: str, - start_time: str, - end_time: str, + start_time: datetime, + end_time: datetime, pcollection_name: Optional[str] = None, ) -> dict[str, float]: """Query Cloud Monitoring for per-PCollection throughput.""" @@ -256,7 +256,10 @@ def _point_numeric_value(point) -> float: return metrics def _get_streaming_throughput_metrics( - self, project: str, start_time: str, end_time: str) -> dict[str, float]: + self, + project: str, + start_time: datetime, + end_time: datetime) -> dict[str, float]: if not self.subscription: return {'AvgThroughputBytes': 0.0, 'AvgThroughputElements': 0.0} @@ -297,17 +300,14 @@ def _get_streaming_throughput_metrics( metrics[f"AvgThroughput{key}"] = avg_rate return metrics - def _get_job_runtime(self, start_time: str, end_time: str) -> float: + def _get_job_runtime(self, start_time: datetime, end_time: datetime) -> float: """Calculates the job runtime duration in seconds.""" - start_dt = datetime.fromisoformat(start_time[:-1]) - end_dt = datetime.fromisoformat(end_time[:-1]) - return (end_dt - start_dt).total_seconds() + return (end_time - start_time).total_seconds() def _get_additional_metrics(self, result: DataflowPipelineResult) -> dict[str, Any]: job_id = result.job_id() - job = self.dataflow_client.get_job(job_id) - project = job.projectId + project = self.project_id start_time, end_time = self._get_worker_time_interval(job_id) if not start_time or not end_time: logging.warning('Could not find valid worker start/end times.') From b1774ce7bc6452c0f0fe9488aa89f9540efa374b Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Wed, 3 Jun 2026 22:29:34 +0300 Subject: [PATCH 2/2] Fix formatter --- .../apache_beam/testing/load_tests/dataflow_cost_benchmark.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py b/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py index c24954bc911e..4d412eb228ec 100644 --- a/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py +++ b/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py @@ -256,9 +256,7 @@ def _point_numeric_value(point) -> float: return metrics def _get_streaming_throughput_metrics( - self, - project: str, - start_time: datetime, + self, project: str, start_time: datetime, end_time: datetime) -> dict[str, float]: if not self.subscription: return {'AvgThroughputBytes': 0.0, 'AvgThroughputElements': 0.0}