Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 65 additions & 28 deletions google/cloud/spanner_v1/metrics/metrics_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,23 @@

"""Interceptor for collecting Cloud Spanner metrics."""

import logging

from grpc_interceptor import ClientInterceptor
from .constants import (
GOOGLE_CLOUD_RESOURCE_KEY,
METRIC_LABEL_KEY_DATABASE,
MONITORED_RES_LABEL_KEY_INSTANCE,
MONITORED_RES_LABEL_KEY_PROJECT,
SPANNER_METHOD_PREFIX,
)

from typing import Dict
from .spanner_metrics_tracer_factory import SpannerMetricsTracerFactory
import re

_logger = logging.getLogger(__name__)


class MetricsInterceptor(ClientInterceptor):
"""Interceptor that collects metrics for Cloud Spanner operations."""
Expand Down Expand Up @@ -92,27 +99,48 @@ def _set_metrics_tracer_attributes(self, resources: Dict[str, str]) -> None:
"""
Sets the metric tracer attributes based on the provided resources.

This method updates the current metric tracer's attributes with the project, instance, and database information extracted from the resources dictionary. If the current metric tracer is not set, the method does nothing.
This method updates the current metric tracer's attributes with the
project, instance, and database information extracted from the resources
dictionary. If the current metric tracer is not set, the method does
nothing.

Before setting each attribute, any existing value for that key is removed
from the current tracer's client_attributes to ensure the set_* methods
(which use set-once semantics) can overwrite stale values from previous
operations.

Additionally, this method updates the factory's client attributes for
project and instance to ensure these values are available for subsequent
operations. Database is not propagated to the factory because each
Spanner RPC may target a different database within the same instance.

Args:
resources (Dict[str, str]): A dictionary containing project, instance, and database information.
resources (Dict[str, str]): A dictionary containing project,
instance, and database information.
"""
if SpannerMetricsTracerFactory.current_metrics_tracer is None:
return

if resources:
tracer = SpannerMetricsTracerFactory.current_metrics_tracer
factory = SpannerMetricsTracerFactory()

# For each resource key, remove the existing value from
# client_attributes so the tracer's set_* method (which only
# writes if the key is absent) will accept the fresh value.
if "project" in resources:
SpannerMetricsTracerFactory.current_metrics_tracer.set_project(
resources["project"]
)
tracer.client_attributes.pop(MONITORED_RES_LABEL_KEY_PROJECT, None)
tracer.set_project(resources["project"])
factory.set_project(resources["project"])

if "instance" in resources:
SpannerMetricsTracerFactory.current_metrics_tracer.set_instance(
resources["instance"]
)
tracer.client_attributes.pop(MONITORED_RES_LABEL_KEY_INSTANCE, None)
tracer.set_instance(resources["instance"])
factory.set_instance(resources["instance"])

if "database" in resources:
SpannerMetricsTracerFactory.current_metrics_tracer.set_database(
resources["database"]
)
tracer.client_attributes.pop(METRIC_LABEL_KEY_DATABASE, None)
tracer.set_database(resources["database"])

def intercept(self, invoked_method, request_or_iterator, call_details):
"""Intercept gRPC calls to collect metrics.
Expand All @@ -133,24 +161,33 @@ def intercept(self, invoked_method, request_or_iterator, call_details):
return invoked_method(request_or_iterator, call_details)

# Setup Metric Tracer attributes from call details
## Extract Project / Instance / Databse from header information
resources = self._extract_resource_from_path(call_details.metadata)
self._set_metrics_tracer_attributes(resources)
try:
## Extract Project / Instance / Database from header information
resources = self._extract_resource_from_path(call_details.metadata)
self._set_metrics_tracer_attributes(resources)

## Format method to be be spanner.<method name>
method_name = self._remove_prefix(
call_details.method, SPANNER_METHOD_PREFIX
).replace("/", ".")
## Format method to be spanner.<method name>
method_name = self._remove_prefix(
call_details.method, SPANNER_METHOD_PREFIX
).replace("/", ".")

SpannerMetricsTracerFactory.current_metrics_tracer.set_method(method_name)
SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_start()
except Exception:
_logger.warning("Failed to set up metrics tracer attributes", exc_info=True)

SpannerMetricsTracerFactory.current_metrics_tracer.set_method(method_name)
SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_start()
response = invoked_method(request_or_iterator, call_details)
SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_completion()

# Process and send GFE metrics if enabled
if SpannerMetricsTracerFactory.current_metrics_tracer.gfe_enabled:
metadata = response.initial_metadata()
SpannerMetricsTracerFactory.current_metrics_trace.record_gfe_metrics(
metadata
)

try:
SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_completion()

# Process and send GFE metrics if enabled
if SpannerMetricsTracerFactory.current_metrics_tracer.gfe_enabled:
metadata = response.initial_metadata()
SpannerMetricsTracerFactory.current_metrics_tracer.record_gfe_metrics(
metadata
)
except Exception:
_logger.warning("Failed to record metrics", exc_info=True)

return response
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def __new__(
cls._metrics_tracer_factory.gfe_enabled = gfe_enabled

if cls._metrics_tracer_factory.enabled != enabled:
cls._metrics_tracer_factory.enabeld = enabled
cls._metrics_tracer_factory.enabled = enabled

return cls._metrics_tracer_factory

Expand Down
Loading
Loading