Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@
from opentelemetry.resourcedetector.gcp_resource_detector import GoogleCloudResourceDetector
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import SERVICE_INSTANCE_ID, SERVICE_NAME, Resource
from opentelemetry.sdk.resources import SERVICE_INSTANCE_ID, SERVICE_NAME, SERVICE_VERSION, Resource
from opentelemetry.sdk.trace.sampling import Sampler
from opentelemetry.trace import get_current_span, span as trace_span

from genkit.core import GENKIT_VERSION
from genkit.core.environment import is_dev_environment
from genkit.core.logging import get_logger
from genkit.core.tracing import add_custom_exporter
Expand Down Expand Up @@ -237,6 +238,9 @@ def _configure_tracing(self) -> None:
return

try:
# Create Firebase-compatible resource for consistent labeling
resource = self._create_firebase_resource()

exporter_kwargs = self._build_exporter_kwargs()
base_exporter = GenkitGCPExporter(**exporter_kwargs) if exporter_kwargs else GenkitGCPExporter()

Expand All @@ -245,35 +249,57 @@ def _configure_tracing(self) -> None:
log_input_and_output=self.log_input_and_output,
project_id=self.project_id,
error_handler=handle_tracing_error,
resource=resource, # Pass resource for consistent attribution
)

add_custom_exporter(trace_exporter, 'gcp_telemetry_server')
except Exception as e:
handle_tracing_error(e)

def _create_firebase_resource(self) -> Resource:
    """Return the Firebase-compatible resource, building it on first use.

    The resource carries the attributes the Firebase Genkit monitoring
    dashboard filters on, merged with whatever the GCP resource detector
    reports. It mirrors the resource configuration used by the JS and Go
    implementations.

    The result is memoized on the instance. Both the tracing and the metrics
    setup call this method; without memoization each call would mint a new
    random ``SERVICE_INSTANCE_ID`` and re-run GCP detection, so traces and
    metrics from one process would be attributed to two different service
    instances.

    Returns:
        The base resource merged with the detected GCP resource, or the base
        resource alone when detection (or the merge) fails.
    """
    cached = getattr(self, '_firebase_resource_cache', None)
    if cached is not None:
        return cached

    # Base resource with Firebase-required attributes
    base_resource = Resource.create({
        SERVICE_NAME: 'genkit',
        SERVICE_VERSION: GENKIT_VERSION,
        SERVICE_INSTANCE_ID: str(uuid.uuid4()),
        'type': 'global',
        'source': 'py',
        'sourceVersion': GENKIT_VERSION,
    })

    # Merge with GCP resource detection (matches JS/Go pattern).
    # The detector logger is temporarily raised to ERROR so its warnings do
    # not leak out; the previous level is always restored in ``finally``.
    detector_logger = logging.getLogger('opentelemetry.resourcedetector.gcp_resource_detector')
    original_level = detector_logger.level
    detector_logger.setLevel(logging.ERROR)

    try:
        gcp_resource = GoogleCloudResourceDetector(raise_on_error=True).detect()
        resource = base_resource.merge(gcp_resource)
        logger.debug('Successfully merged Firebase base resource with GCP resource detection')
    except Exception as e:
        # Best effort: on detection failure, log and fall back to the base resource
        logger.warning('Google Cloud resource detection failed, using base resource', error=str(e))
        resource = base_resource
    finally:
        detector_logger.setLevel(original_level)

    # Remember the outcome (including the fallback) so every signal shares it.
    self._firebase_resource_cache = resource
    return resource

def _configure_metrics(self) -> None:
if self.disable_metrics:
return

try:
resource = Resource.create({
SERVICE_NAME: 'genkit',
SERVICE_INSTANCE_ID: str(uuid.uuid4()),
})

# Suppress detector warnings during GCP resource detection
detector_logger = logging.getLogger('opentelemetry.resourcedetector.gcp_resource_detector')
original_level = detector_logger.level
detector_logger.setLevel(logging.ERROR)

try:
gcp_resource = GoogleCloudResourceDetector(raise_on_error=True).detect()
resource = resource.merge(gcp_resource)
except Exception as e:
# For detection failure log the exception and use the default resource
detector_logger.warning(f'Google Cloud resource detection failed: {e}')
finally:
detector_logger.setLevel(original_level)
# Use Firebase-compatible resource configuration
resource = self._create_firebase_resource()

exporter_kwargs = self._build_exporter_kwargs()
cloud_monitoring_exporter = CloudMonitoringMetricsExporter(**exporter_kwargs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,24 @@ def _write(self, message: str, payload: dict[str, Any], severity: str) -> None:

Args:
message: Log message for fallback logging.
payload: Structured payload with all metadata.
payload: Structured payload with nested metadata.
severity: Cloud Logging severity (INFO, ERROR).
"""
if self._export and self._cloud_logger:
try:
self._cloud_logger.log_struct(payload, severity=severity, labels={'module': 'genkit'})
from google.cloud.logging_v2.resource import Resource as MonitoredResource

# Attach resource for proper dashboard filtering
resource = MonitoredResource(
type='global', labels={'project_id': self._project_id} if self._project_id else {}
)

self._cloud_logger.log_struct(
payload,
severity=severity,
labels={'module': 'genkit'},
resource=resource,
)
except Exception as e:
logger.error('Failed to write to Cloud Logging', error=str(e), message=message)
# Fallback to console
Expand Down Expand Up @@ -224,8 +236,9 @@ def log_structured(self, message: str, metadata: dict[str, Any] | None = None) -
logger.warning(f'[FALLBACK] {message}', **(metadata or {}))
return

payload = metadata.copy() if metadata else {}
payload['message'] = message
# Create nested payload: {message, metadata: {...}, trace fields}
metadata_dict = metadata.copy() if metadata else {}
payload = {'message': message, 'metadata': metadata_dict}
payload.update(self._get_trace_context())
self._write(message, payload, 'INFO')

Expand All @@ -236,8 +249,8 @@ def log_structured_error(self, message: str, metadata: dict[str, Any] | None = N
message: Log message.
metadata: Additional structured metadata.
"""
payload = metadata.copy() if metadata else {}
payload['message'] = message
metadata_dict = metadata.copy() if metadata else {}
payload = {'message': message, 'metadata': metadata_dict}
payload.update(self._get_trace_context())
self._write(message, payload, 'ERROR')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from google.api_core import exceptions as core_exceptions, retry as retries
from google.cloud.trace_v2 import BatchWriteSpansRequest
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult

Expand Down Expand Up @@ -154,6 +155,7 @@ def __init__(
log_input_and_output: bool = False,
project_id: str | None = None,
error_handler: Callable[[Exception], None] | None = None,
resource: 'Resource | None' = None,
) -> None:
"""Initialize the GCP adjusting trace exporter.

Expand All @@ -163,7 +165,9 @@ def __init__(
Defaults to False (redact for privacy).
project_id: Optional GCP project ID for log correlation.
error_handler: Optional callback invoked when export errors occur.
resource: Optional OpenTelemetry resource for consistent attribution.
"""
self.resource = resource
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The resource parameter is stored in self.resource but it's never used. This means the Firebase-compatible resource attributes will not be applied to traces, and they won't appear correctly in the Firebase dashboard. The resource needs to be attached to the spans before they are exported.

The standard OpenTelemetry approach is to configure the TracerProvider with a Resource. However, Genkit's tracing initialization seems to make this difficult to do from a plugin.

A possible fix within this exporter would be to wrap the span and override its resource property before exporting. This would involve creating a custom ReadableSpan wrapper that merges self.resource with the span's existing resource.

This is a significant issue that prevents the PR from working as intended for traces.

super().__init__(
exporter=exporter,
log_input_and_output=log_input_and_output,
Expand Down
13 changes: 10 additions & 3 deletions py/plugins/google-cloud/tests/tracing_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def test_add_gcp_telemetry_disable_traces() -> None:


def test_add_gcp_telemetry_disable_metrics() -> None:
"""Test that disable_metrics=True skips metrics export (JS/Go parity)."""
"""Test that disable_metrics=True skips metrics export but still enables traces with resource detection."""
with (
mock.patch.dict(os.environ, {EnvVar.GENKIT_ENV: GenkitEnvironment.PROD}),
patch('genkit.plugins.google_cloud.telemetry.config.GenkitGCPExporter'),
Expand All @@ -199,17 +199,24 @@ def test_add_gcp_telemetry_disable_metrics() -> None:
patch('genkit.plugins.google_cloud.telemetry.config.PeriodicExportingMetricReader') as mock_reader,
patch('genkit.plugins.google_cloud.telemetry.config.metrics'),
):
# Configure mock detector to return a mock resource
mock_resource = mock.MagicMock()
mock_detector.return_value.detect.return_value = mock_resource

from genkit.plugins.google_cloud.telemetry.tracing import add_gcp_telemetry

# Call with disable_metrics=True (JS/Go: disableMetrics)
add_gcp_telemetry(disable_metrics=True)

# Verify metrics exporter was NOT created
mock_detector.assert_not_called()
# Verify metrics exporters were NOT created (metrics disabled)
mock_metric_exp.assert_not_called()
mock_genkit_metric.assert_not_called()
mock_reader.assert_not_called()

# Verify resource detection was called for traces (Firebase dashboard needs this)
mock_detector.assert_called_once_with(raise_on_error=True)
mock_detector.return_value.detect.assert_called_once()
Comment on lines +216 to +218
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This test correctly verifies that resource detection is performed. However, it doesn't verify that the detected and created resource is actually applied to the exported traces. As noted in another comment, the resource object is currently not being used by the trace exporter, which is a critical bug.

The test should be extended to assert that the exported spans contain the merged resource attributes. This could be done by inspecting the spans passed to the mocked GenkitGCPExporter.export method.



def test_add_gcp_telemetry_custom_metric_interval() -> None:
"""Test that metric_export_interval_ms is passed correctly (JS/Go parity)."""
Expand Down
Loading