From d00ae37d9f5861e1edf35374a9178007a53f4673 Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 20 Mar 2026 18:35:06 +0000 Subject: [PATCH 01/10] build: add OpenTelemetry shaded and unshaded dependencies to pom.xml --- .../google-cloud-bigquery-jdbc/pom.xml | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/java-bigquery/google-cloud-bigquery-jdbc/pom.xml b/java-bigquery/google-cloud-bigquery-jdbc/pom.xml index b22f569e174e..cb687e934879 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/pom.xml +++ b/java-bigquery/google-cloud-bigquery-jdbc/pom.xml @@ -131,6 +131,10 @@ io com.google.bqjdbc.shaded.io + + io.opentelemetry.api.* + io.opentelemetry.context.* + @@ -277,6 +281,42 @@ httpcore5 + + + io.opentelemetry + opentelemetry-api + 1.37.0 + + + io.opentelemetry + opentelemetry-context + 1.37.0 + + + + + io.opentelemetry + opentelemetry-sdk + 1.37.0 + + + com.google.cloud.opentelemetry + exporter-trace + 0.33.0 + + + + + org.slf4j + slf4j-api + 2.0.16 + + + com.google.re2j + re2j + 1.8 + + com.google.truth From 01734d17ba166bf8c96ff0290928cef649eda823 Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 20 Mar 2026 18:36:01 +0000 Subject: [PATCH 02/10] feat: add openTelemetry connection properties --- .../bigquery/jdbc/BigQueryConnection.java | 19 +++ .../jdbc/BigQueryJdbcOpenTelemetry.java | 119 ++++++++++++++++++ .../bigquery/jdbc/BigQueryJdbcUrlUtility.java | 16 +++ .../cloud/bigquery/jdbc/DataSource.java | 38 ++++++ 4 files changed, 192 insertions(+) create mode 100644 java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java index 17471e252205..808506434469 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java @@ -41,6 +41,8 @@ import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; import com.google.cloud.http.HttpTransportOptions; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Tracer; import java.io.IOException; import java.io.InputStream; import java.sql.CallableStatement; @@ -138,6 +140,9 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { Long connectionPoolSize; Long listenerPoolSize; String partnerToken; + boolean enableOpenTelemetry; + String openTelemetryExporter; + Tracer tracer = OpenTelemetry.noop().getTracer(""); BigQueryConnection(String url) throws IOException { this(url, DataSource.fromUrl(url)); @@ -242,6 +247,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { this.connectionPoolSize = ds.getConnectionPoolSize(); this.listenerPoolSize = ds.getListenerPoolSize(); this.partnerToken = ds.getPartnerToken(); + this.enableOpenTelemetry = ds.getEnableOpenTelemetry(); + this.openTelemetryExporter = ds.getOpenTelemetryExporter(); this.headerProvider = createHeaderProvider(); this.bigQuery = getBigQueryConnection(); @@ -935,6 +942,14 @@ private BigQuery getBigQueryConnection() { bigQueryOptions.setTransportOptions(this.httpTransportOptions); } + OpenTelemetry openTelemetry = + BigQueryJdbcOpenTelemetry.getOpenTelemetry(this.enableOpenTelemetry, this.openTelemetryExporter); + if (this.enableOpenTelemetry) { + this.tracer = BigQueryJdbcOpenTelemetry.getTracer(openTelemetry); + bigQueryOptions.setOpenTelemetryTracer(this.tracer); + BigQueryJdbcOpenTelemetry.attachLoggingBridge(); + } + BigQueryOptions options = bigQueryOptions.setHeaderProvider(this.headerProvider).build(); options.setDefaultJobCreationMode( this.useStatelessQueryMode @@ -1083,4 +1098,8 @@ public CallableStatement prepareCall( } return prepareCall(sql); } + + public Tracer getTracer() { + return this.tracer; + } } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java new file mode 100644 index 000000000000..7b15f0ca5377 --- /dev/null +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java @@ -0,0 +1,119 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.jdbc; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import com.google.cloud.opentelemetry.trace.TraceConfiguration; +import com.google.cloud.opentelemetry.trace.TraceExporter; +import java.io.IOException; + +public class BigQueryJdbcOpenTelemetry { + + private static final BigQueryJdbcCustomLogger LOG = new BigQueryJdbcCustomLogger(BigQueryJdbcOpenTelemetry.class.getName()); + private static final Object lock = new Object(); + private static volatile OpenTelemetrySdk autoConfiguredOpenTelemetry; + private static volatile boolean initialized = false; + private static final String INSTRUMENTATION_SCOPE_NAME = "com.google.cloud.bigquery.jdbc"; + + /** + * Initializes or returns the OpenTelemetry instance based on hybrid logic. + * Prefer GlobalOpenTelemetry; fallback to an auto-configured GCP exporter if requested. + */ + public static OpenTelemetry getOpenTelemetry(boolean enableOpenTelemetry, String exporterType) { + if (!enableOpenTelemetry) { + return OpenTelemetry.noop(); + } + + OpenTelemetry globalOtel = GlobalOpenTelemetry.get(); + if ("gcp".equalsIgnoreCase(exporterType)) { + return getAutoConfiguredOpenTelemetry(); + } + + return globalOtel; + } + + /** + * Gets a Tracer for the JDBC driver instrumentation scope. + */ + public static Tracer getTracer(OpenTelemetry openTelemetry) { + return openTelemetry.getTracer(INSTRUMENTATION_SCOPE_NAME); + } + + /** + * TODO(b/491245568): Attaches the OpenTelemetry logging bridge to the root java.util.logging.Logger. + * This is currently a no-op due to shading issues with `opentelemetry-appender-jul`. + */ + public static void attachLoggingBridge() { + // No-op for now. + } + + private static OpenTelemetry getAutoConfiguredOpenTelemetry() { + if (!initialized) { + synchronized (lock) { + if (!initialized) { + try { + autoConfiguredOpenTelemetry = initGcpOpenTelemetry(); + } catch (Exception e) { + LOG.warning("Failed to initialize OpenTelemetry with GCP exporter: " + e.getMessage()); + autoConfiguredOpenTelemetry = null; + } finally { + initialized = true; + } + } + } + } + return autoConfiguredOpenTelemetry != null ? autoConfiguredOpenTelemetry : OpenTelemetry.noop(); + } + + private static OpenTelemetrySdk initGcpOpenTelemetry() throws IOException { + LOG.info("Initializing BigQuery JDBC standalone OpenTelemetry SDK with GCP exporter."); + + SpanExporter traceExporter; + try { + TraceConfiguration configuration = TraceConfiguration.builder().build(); + traceExporter = TraceExporter.createWithConfiguration(configuration); + } catch (Exception e) { + throw new RuntimeException("Could not create TraceExporter. Ensure exporter-trace is on classpath.", e); + } + + SdkTracerProvider tracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor(BatchSpanProcessor.builder(traceExporter).build()) + .build(); + + OpenTelemetrySdk sdk = + OpenTelemetrySdk.builder() + .setTracerProvider(tracerProvider) + .build(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + tracerProvider.close(); + } catch (Exception e) { + LOG.warning("Error closing tracer provider: " + e.getMessage()); + } + })); + + return sdk; + } +} diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java index 5b89cf27eecf..daab178f69dc 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java @@ -162,6 +162,10 @@ protected boolean removeEldestEntry(Map.Entry> eldes static final int DEFAULT_SWA_APPEND_ROW_COUNT_VALUE = 1000; static final String SWA_ACTIVATION_ROW_COUNT_PROPERTY_NAME = "SWA_ActivationRowCount"; static final int DEFAULT_SWA_ACTIVATION_ROW_COUNT_VALUE = 3; + static final String ENABLE_OPENTELEMETRY_PROPERTY_NAME = "EnableOpenTelemetry"; + static final boolean DEFAULT_ENABLE_OPENTELEMETRY_VALUE = false; + static final String OPENTELEMETRY_EXPORTER_PROPERTY_NAME = "OpenTelemetryExporter"; + static final String DEFAULT_OPENTELEMETRY_EXPORTER_VALUE = "none"; private static final BigQueryJdbcCustomLogger LOG = new BigQueryJdbcCustomLogger(BigQueryJdbcUrlUtility.class.getName()); static final String FILTER_TABLES_ON_DEFAULT_DATASET_PROPERTY_NAME = @@ -607,6 +611,18 @@ protected boolean removeEldestEntry(Map.Entry> eldes .setDescription( "Reason for the request, which is passed as the x-goog-request-reason" + " header.") + .build(), + BigQueryConnectionProperty.newBuilder() + .setName(ENABLE_OPENTELEMETRY_PROPERTY_NAME) + .setDescription( + "Enables or disables OpenTelemetry features in the Driver. Disabled by default.") + .setDefaultValue(String.valueOf(DEFAULT_ENABLE_OPENTELEMETRY_VALUE)) + .build(), + BigQueryConnectionProperty.newBuilder() + .setName(OPENTELEMETRY_EXPORTER_PROPERTY_NAME) + .setDescription( + "Specifies the auto-configured OpenTelemetry exporter (e.g., gcp).") + .setDefaultValue(DEFAULT_OPENTELEMETRY_EXPORTER_VALUE) .build()))); private static final List NETWORK_PROPERTIES = diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java index 681595f8b05c..b268eb5b0d87 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java @@ -113,6 +113,8 @@ public class DataSource implements javax.sql.DataSource { private String privateServiceConnect; private Long connectionPoolSize; private Long listenerPoolSize; + private Boolean enableOpenTelemetry; + private String openTelemetryExporter; // Make sure the JDBC driver class is loaded. static { @@ -324,6 +326,13 @@ public class DataSource implements javax.sql.DataSource { .put( BigQueryJdbcUrlUtility.LISTENER_POOL_SIZE_PROPERTY_NAME, (ds, val) -> ds.setListenerPoolSize(Long.parseLong(val))) + .put( + BigQueryJdbcUrlUtility.ENABLE_OPENTELEMETRY_PROPERTY_NAME, + (ds, val) -> + ds.setEnableOpenTelemetry( + BigQueryJdbcUrlUtility.convertIntToBoolean( + val, BigQueryJdbcUrlUtility.ENABLE_OPENTELEMETRY_PROPERTY_NAME))) + .put(BigQueryJdbcUrlUtility.OPENTELEMETRY_EXPORTER_PROPERTY_NAME, DataSource::setOpenTelemetryExporter) .build(); public static DataSource fromUrl(String url) { @@ -616,6 +625,15 @@ private Properties createProperties() { BigQueryJdbcUrlUtility.LISTENER_POOL_SIZE_PROPERTY_NAME, String.valueOf(this.listenerPoolSize)); } + if (this.enableOpenTelemetry != null) { + connectionProperties.setProperty( + BigQueryJdbcUrlUtility.ENABLE_OPENTELEMETRY_PROPERTY_NAME, + String.valueOf(this.enableOpenTelemetry)); + } + if (this.openTelemetryExporter != null) { + connectionProperties.setProperty( + BigQueryJdbcUrlUtility.OPENTELEMETRY_EXPORTER_PROPERTY_NAME, this.openTelemetryExporter); + } return connectionProperties; } @@ -737,6 +755,26 @@ public void setListenerPoolSize(Long listenerPoolSize) { this.listenerPoolSize = listenerPoolSize; } + public Boolean getEnableOpenTelemetry() { + return enableOpenTelemetry != null + ? enableOpenTelemetry + : BigQueryJdbcUrlUtility.DEFAULT_ENABLE_OPENTELEMETRY_VALUE; + } + + public void setEnableOpenTelemetry(Boolean enableOpenTelemetry) { + this.enableOpenTelemetry = enableOpenTelemetry; + } + + public String getOpenTelemetryExporter() { + return openTelemetryExporter != null + ? openTelemetryExporter + : BigQueryJdbcUrlUtility.DEFAULT_OPENTELEMETRY_EXPORTER_VALUE; + } + + public void setOpenTelemetryExporter(String openTelemetryExporter) { + this.openTelemetryExporter = openTelemetryExporter; + } + public void setHighThroughputMinTableSize(Integer highThroughputMinTableSize) { this.highThroughputMinTableSize = highThroughputMinTableSize; } From b743313c2b2c263e3e0f54154113b09aa42c89d1 Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 20 Mar 2026 18:36:12 +0000 Subject: [PATCH 03/10] feat: add basic OpenTelemetry tracing hooks to BigQuery JDBC statements --- .../bigquery/jdbc/BigQueryStatement.java | 315 ++++++++++++------ 1 file changed, 210 insertions(+), 105 deletions(-) diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index ca579d1d0c1b..6852e10cdc08 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -57,11 +57,18 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.util.concurrent.Uninterruptibles; +import io.opentelemetry.context.Context; import java.lang.ref.ReferenceQueue; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLWarning; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; @@ -117,6 +124,7 @@ public class BigQueryStatement extends BigQueryNoOpsStatement { private int fetchSize; private String scriptQuery; private Map extraLabels = new HashMap<>(); + protected Context otelContext = null; private BigQueryReadClient bigQueryReadClient = null; private final BigQuery bigQuery; @@ -233,40 +241,73 @@ private BigQuerySettings generateBigQuerySettings() { */ @Override public ResultSet executeQuery(String sql) throws SQLException { - // TODO: write method to return state variables to original state. LOG.finest("++enter++"); - logQueryExecutionStart(sql); - try { - QueryJobConfiguration jobConfiguration = - setDestinationDatasetAndTableInJobConfig(getJobConfig(sql).build()); - runQuery(sql, jobConfiguration); - } catch (InterruptedException ex) { - throw new BigQueryJdbcException(ex); - } + Tracer tracer = getSafeTracer(); + Span span = tracer.spanBuilder("BigQueryStatement.executeQuery").startSpan(); + try (Scope scope = span.makeCurrent()) { + span.setAttribute("db.statement", sql); + this.otelContext = Context.current(); + logQueryExecutionStart(sql); + try { + QueryJobConfiguration jobConfiguration = + setDestinationDatasetAndTableInJobConfig(getJobConfig(sql).build()); + runQuery(sql, jobConfiguration); + } catch (InterruptedException ex) { + span.recordException(ex); + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw new BigQueryJdbcException(ex); + } catch (Exception ex) { + span.recordException(ex); + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } - if (!isSingularResultSet()) { - throw new BigQueryJdbcException( - "Query returned more than one or didn't return any ResultSet."); + if (!isSingularResultSet()) { + BigQueryJdbcException ex = new BigQueryJdbcException( + "Query returned more than one or didn't return any ResultSet."); + span.recordException(ex); + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } + // This contains all the other assertions spec required on this method + return getCurrentResultSet(); + } finally { + span.end(); } - // This contains all the other assertions spec required on this method - return getCurrentResultSet(); } @Override public long executeLargeUpdate(String sql) throws SQLException { LOG.finest("++enter++"); - logQueryExecutionStart(sql); - try { - QueryJobConfiguration.Builder jobConfiguration = getJobConfig(sql); - runQuery(sql, jobConfiguration.build()); - } catch (InterruptedException ex) { - throw new BigQueryJdbcRuntimeException(ex); - } - if (this.currentUpdateCount == -1) { - throw new BigQueryJdbcException( - "Update query expected to return affected row count. Double check query type."); + Tracer tracer = getSafeTracer(); + Span span = tracer.spanBuilder("BigQueryStatement.executeLargeUpdate").startSpan(); + try (Scope scope = span.makeCurrent()) { + span.setAttribute("db.statement", sql); + this.otelContext = Context.current(); + logQueryExecutionStart(sql); + try { + QueryJobConfiguration.Builder jobConfiguration = getJobConfig(sql); + runQuery(sql, jobConfiguration.build()); + } catch (InterruptedException ex) { + span.recordException(ex); + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw new BigQueryJdbcRuntimeException(ex); + } catch (Exception ex) { + span.recordException(ex); + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } + if (this.currentUpdateCount == -1) { + BigQueryJdbcException ex = new BigQueryJdbcException( + "Update query expected to return affected row count. Double check query type."); + span.recordException(ex); + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } + return this.currentUpdateCount; + } finally { + span.end(); } - return this.currentUpdateCount; } @Override @@ -288,18 +329,32 @@ int checkUpdateCount(long updateCount) { @Override public boolean execute(String sql) throws SQLException { LOG.finest("++enter++"); - logQueryExecutionStart(sql); - try { - QueryJobConfiguration jobConfiguration = getJobConfig(sql).build(); - // If Large Results are enabled, ensure query type is SELECT - if (isLargeResultsEnabled() && getQueryType(jobConfiguration, null) == SqlType.SELECT) { - jobConfiguration = setDestinationDatasetAndTableInJobConfig(jobConfiguration); + Tracer tracer = getSafeTracer(); + Span span = tracer.spanBuilder("BigQueryStatement.execute").startSpan(); + try (Scope scope = span.makeCurrent()) { + span.setAttribute("db.statement", sql); + this.otelContext = Context.current(); + logQueryExecutionStart(sql); + try { + QueryJobConfiguration jobConfiguration = getJobConfig(sql).build(); + // If Large Results are enabled, ensure query type is SELECT + if (isLargeResultsEnabled() && getQueryType(jobConfiguration, null) == SqlType.SELECT) { + jobConfiguration = setDestinationDatasetAndTableInJobConfig(jobConfiguration); + } + runQuery(sql, jobConfiguration); + } catch (InterruptedException ex) { + span.recordException(ex); + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw new BigQueryJdbcRuntimeException(ex); + } catch (Exception ex) { + span.recordException(ex); + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; } - runQuery(sql, jobConfiguration); - } catch (InterruptedException ex) { - throw new BigQueryJdbcRuntimeException(ex); + return getCurrentResultSet() != null; + } finally { + span.end(); } - return getCurrentResultSet() != null; } StatementType getStatementType(QueryJobConfiguration queryJobConfiguration) throws SQLException { @@ -810,9 +865,10 @@ Thread populateArrowBufferedQueue( LOG.finest("++enter++"); Runnable arrowStreamProcessor = - () -> { - long rowsRead = 0; - int retryCount = 0; + Context.current().wrap( + () -> { + long rowsRead = 0; + int retryCount = 0; try { // Use the first stream to perform reading. String streamName = readSession.getStreams(0).getName(); @@ -880,7 +936,7 @@ Thread populateArrowBufferedQueue( } finally { // logic needed for graceful shutdown enqueueEndOfStream(arrowBatchWrapperBlockingQueue); } - }; + }); Thread populateBufferWorker = JDBC_THREAD_FACTORY.newThread(arrowStreamProcessor); populateBufferWorker.start(); @@ -1029,53 +1085,70 @@ Thread runNextPageTaskAsync( // calls populateFirstPage(result, rpcResponseQueue); + Context asyncContext = (this.otelContext != null) ? this.otelContext : Context.current(); + // This thread makes the RPC calls and paginates Runnable nextPageTask = - () -> { - String currentPageToken = firstPageToken; - TableResult currentResults = result; - TableId destinationTable = null; - if (firstPageToken != null) { - destinationTable = getDestinationTable(jobId); - } - - try { - while (currentPageToken != null) { - // do not process further pages and shutdown - if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { - LOG.warning( - "%s Interrupted @ runNextPageTaskAsync", Thread.currentThread().getName()); - break; + asyncContext.wrap( + () -> { + Tracer tracer = getSafeTracer(); + String currentPageToken = firstPageToken; + TableResult currentResults = result; + TableId destinationTable = null; + if (firstPageToken != null) { + destinationTable = getDestinationTable(jobId); } - long startTime = System.nanoTime(); - currentResults = - this.bigQuery.listTableData( - destinationTable, - TableDataListOption.pageSize(querySettings.getMaxResultPerPage()), - TableDataListOption.pageToken(currentPageToken)); - - currentPageToken = currentResults.getNextPageToken(); - // this will be parsed asynchronously without blocking the current - // thread - Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(currentResults, true)); - LOG.fine( - "Fetched %d results from the server in %d ms.", - querySettings.getMaxResultPerPage(), - (int) ((System.nanoTime() - startTime) / 1000000)); - } - } catch (Exception ex) { - Uninterruptibles.putUninterruptibly( - bigQueryFieldValueListWrapperBlockingQueue, - BigQueryFieldValueListWrapper.ofError(new BigQueryJdbcRuntimeException(ex))); - } finally { - // this will stop the parseDataTask as well when the pagination - // completes - Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(null, false)); - } - // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not - // have finished processing the records and even that will be interrupted - }; + try { + while (currentPageToken != null) { + // do not process further pages and shutdown + if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { + LOG.warning( + "%s Interrupted @ runNextPageTaskAsync", Thread.currentThread().getName()); + break; + } + + Span paginationSpan = tracer.spanBuilder("BigQueryStatement.pagination").startSpan(); + try (Scope scope = paginationSpan.makeCurrent()) { + paginationSpan.setAttribute("db.pagination.page_token", currentPageToken); + + long startTime = System.nanoTime(); + currentResults = + this.bigQuery.listTableData( + destinationTable, + TableDataListOption.pageSize(querySettings.getMaxResultPerPage()), + TableDataListOption.pageToken(currentPageToken)); + + long duration = (System.nanoTime() - startTime) / 1000000; + paginationSpan.setAttribute("db.pagination.duration_ms", duration); + paginationSpan.setAttribute("db.pagination.rows_fetched", querySettings.getMaxResultPerPage()); + + currentPageToken = currentResults.getNextPageToken(); + // this will be parsed asynchronously without blocking the current thread + Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(currentResults, true)); + LOG.fine( + "Fetched %d results from the server in %d ms.", + querySettings.getMaxResultPerPage(), + (int) duration); + } catch (Exception e) { + paginationSpan.recordException(e); + paginationSpan.setStatus(StatusCode.ERROR, e.getMessage()); + throw e; + } finally { + paginationSpan.end(); + } + } + } catch (Exception ex) { + Uninterruptibles.putUninterruptibly( + bigQueryFieldValueListWrapperBlockingQueue, + BigQueryFieldValueListWrapper.ofError(new BigQueryJdbcRuntimeException(ex))); + } finally { + // this will stop the parseDataTask as well when the pagination completes + Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(null, false)); + } + // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not + // have finished processing the records and even that will be interrupted + }); Thread nextPageWorker = JDBC_THREAD_FACTORY.newThread(nextPageTask); nextPageWorker.start(); @@ -1094,8 +1167,9 @@ Thread parseAndPopulateRpcDataAsync( LOG.finest("++enter++"); Runnable populateBufferRunnable = - () -> { // producer thread populating the buffer - try { + Context.current().wrap( + () -> { // producer thread populating the buffer + try { Iterable fieldValueLists; // as we have to process the first page boolean hasRows = true; @@ -1150,7 +1224,7 @@ Thread parseAndPopulateRpcDataAsync( } finally { enqueueBufferEndOfStream(bigQueryFieldValueListWrapperBlockingQueue); } - }; + }); Thread populateBufferWorker = JDBC_THREAD_FACTORY.newThread(populateBufferRunnable); populateBufferWorker.start(); @@ -1402,29 +1476,42 @@ public void clearBatch() { @Override public int[] executeBatch() throws SQLException { LOG.finest("++enter++"); - int[] result = new int[this.batchQueries.size()]; - if (this.batchQueries.isEmpty()) { - return result; - } + Tracer tracer = getSafeTracer(); + Span span = tracer.spanBuilder("BigQueryStatement.executeBatch").startSpan(); + try (Scope scope = span.makeCurrent()) { + span.setAttribute("db.statement.count", this.batchQueries.size()); + this.otelContext = Context.current(); + + int[] result = new int[this.batchQueries.size()]; + if (this.batchQueries.isEmpty()) { + return result; + } - try { - String combinedQueries = String.join("", this.batchQueries); - QueryJobConfiguration.Builder jobConfiguration = getJobConfig(combinedQueries); - jobConfiguration.setPriority(QueryJobConfiguration.Priority.BATCH); - runQuery(combinedQueries, jobConfiguration.build()); - } catch (InterruptedException ex) { - throw new BigQueryJdbcRuntimeException(ex); - } + try { + String combinedQueries = String.join("", this.batchQueries); + QueryJobConfiguration.Builder jobConfiguration = getJobConfig(combinedQueries); + jobConfiguration.setPriority(QueryJobConfiguration.Priority.BATCH); + runQuery(combinedQueries, jobConfiguration.build()); + } catch (InterruptedException ex) { + throw new BigQueryJdbcRuntimeException(ex); + } - int i = 0; - while (getUpdateCount() != -1 && i < this.batchQueries.size()) { - result[i] = getUpdateCount(); - getMoreResults(); - i++; - } + int i = 0; + while (getUpdateCount() != -1 && i < this.batchQueries.size()) { + result[i] = getUpdateCount(); + getMoreResults(); + i++; + } - clearBatch(); - return result; + clearBatch(); + return result; + } catch (Exception e) { + span.recordException(e); + span.setStatus(StatusCode.ERROR, e.getMessage()); + throw e; + } finally { + span.end(); + } } @Override @@ -1573,4 +1660,22 @@ private void enqueueBufferError(BlockingQueue que private void enqueueBufferEndOfStream(BlockingQueue queue) { Uninterruptibles.putUninterruptibly(queue, BigQueryFieldValueListWrapper.of(null, null, true)); } + + /** + * Gets the OpenTelemetry Context from the statement execution. Used by ResultSet for + * pagination span context. + */ + private Tracer getSafeTracer() { + if (connection != null) { + Tracer tracer = connection.getTracer(); + if (tracer != null) { + return tracer; + } + } + return GlobalOpenTelemetry.getTracer("google-cloud-bigquery-jdbc-noop"); + } + + public Context getOtelContext() { + return this.otelContext; + } } From 122c14eaf887fd979cfc10bf2c422404aec68474 Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 20 Mar 2026 18:36:43 +0000 Subject: [PATCH 04/10] lint: fix lint --- .../bigquery/jdbc/BigQueryConnection.java | 3 +- .../jdbc/BigQueryJdbcOpenTelemetry.java | 47 +-- .../bigquery/jdbc/BigQueryStatement.java | 306 +++++++++--------- .../cloud/bigquery/jdbc/DataSource.java | 4 +- 4 files changed, 189 insertions(+), 171 deletions(-) diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java index 808506434469..dfbc69507ee6 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java @@ -943,7 +943,8 @@ private BigQuery getBigQueryConnection() { } OpenTelemetry openTelemetry = - BigQueryJdbcOpenTelemetry.getOpenTelemetry(this.enableOpenTelemetry, this.openTelemetryExporter); + BigQueryJdbcOpenTelemetry.getOpenTelemetry( + this.enableOpenTelemetry, this.openTelemetryExporter); if (this.enableOpenTelemetry) { this.tracer = BigQueryJdbcOpenTelemetry.getTracer(openTelemetry); bigQueryOptions.setOpenTelemetryTracer(this.tracer); diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java index 7b15f0ca5377..830034d7c578 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java @@ -16,6 +16,8 @@ package com.google.cloud.bigquery.jdbc; +import com.google.cloud.opentelemetry.trace.TraceConfiguration; +import com.google.cloud.opentelemetry.trace.TraceExporter; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Tracer; @@ -23,21 +25,20 @@ import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; import io.opentelemetry.sdk.trace.export.SpanExporter; -import com.google.cloud.opentelemetry.trace.TraceConfiguration; -import com.google.cloud.opentelemetry.trace.TraceExporter; import java.io.IOException; public class BigQueryJdbcOpenTelemetry { - private static final BigQueryJdbcCustomLogger LOG = new BigQueryJdbcCustomLogger(BigQueryJdbcOpenTelemetry.class.getName()); + private static final BigQueryJdbcCustomLogger LOG = + new BigQueryJdbcCustomLogger(BigQueryJdbcOpenTelemetry.class.getName()); private static final Object lock = new Object(); private static volatile OpenTelemetrySdk autoConfiguredOpenTelemetry; private static volatile boolean initialized = false; private static final String INSTRUMENTATION_SCOPE_NAME = "com.google.cloud.bigquery.jdbc"; /** - * Initializes or returns the OpenTelemetry instance based on hybrid logic. - * Prefer GlobalOpenTelemetry; fallback to an auto-configured GCP exporter if requested. + * Initializes or returns the OpenTelemetry instance based on hybrid logic. Prefer + * GlobalOpenTelemetry; fallback to an auto-configured GCP exporter if requested. */ public static OpenTelemetry getOpenTelemetry(boolean enableOpenTelemetry, String exporterType) { if (!enableOpenTelemetry) { @@ -52,16 +53,15 @@ public static OpenTelemetry getOpenTelemetry(boolean enableOpenTelemetry, String return globalOtel; } - /** - * Gets a Tracer for the JDBC driver instrumentation scope. - */ + /** Gets a Tracer for the JDBC driver instrumentation scope. */ public static Tracer getTracer(OpenTelemetry openTelemetry) { return openTelemetry.getTracer(INSTRUMENTATION_SCOPE_NAME); } /** - * TODO(b/491245568): Attaches the OpenTelemetry logging bridge to the root java.util.logging.Logger. - * This is currently a no-op due to shading issues with `opentelemetry-appender-jul`. + * TODO(b/491245568): Attaches the OpenTelemetry logging bridge to the root + * java.util.logging.Logger. This is currently a no-op due to shading issues with + * `opentelemetry-appender-jul`. */ public static void attachLoggingBridge() { // No-op for now. @@ -93,7 +93,8 @@ private static OpenTelemetrySdk initGcpOpenTelemetry() throws IOException { TraceConfiguration configuration = TraceConfiguration.builder().build(); traceExporter = TraceExporter.createWithConfiguration(configuration); } catch (Exception e) { - throw new RuntimeException("Could not create TraceExporter. Ensure exporter-trace is on classpath.", e); + throw new RuntimeException( + "Could not create TraceExporter. Ensure exporter-trace is on classpath.", e); } SdkTracerProvider tracerProvider = @@ -101,18 +102,18 @@ private static OpenTelemetrySdk initGcpOpenTelemetry() throws IOException { .addSpanProcessor(BatchSpanProcessor.builder(traceExporter).build()) .build(); - OpenTelemetrySdk sdk = - OpenTelemetrySdk.builder() - .setTracerProvider(tracerProvider) - .build(); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - tracerProvider.close(); - } catch (Exception e) { - LOG.warning("Error closing tracer provider: " + e.getMessage()); - } - })); + OpenTelemetrySdk sdk = OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build(); + + Runtime.getRuntime() + .addShutdownHook( + new Thread( + () -> { + try { + tracerProvider.close(); + } catch (Exception e) { + LOG.warning("Error closing tracer provider: " + e.getMessage()); + } + })); return sdk; } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index 6852e10cdc08..6bde973d2495 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -57,18 +57,17 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.util.concurrent.Uninterruptibles; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import java.lang.ref.ReferenceQueue; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLWarning; - -import io.opentelemetry.api.GlobalOpenTelemetry; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.StatusCode; -import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.context.Scope; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; @@ -263,8 +262,9 @@ public ResultSet executeQuery(String sql) throws SQLException { } if (!isSingularResultSet()) { - BigQueryJdbcException ex = new BigQueryJdbcException( - "Query returned more than one or didn't return any ResultSet."); + BigQueryJdbcException ex = + new BigQueryJdbcException( + "Query returned more than one or didn't return any ResultSet."); span.recordException(ex); span.setStatus(StatusCode.ERROR, ex.getMessage()); throw ex; @@ -298,8 +298,9 @@ public long executeLargeUpdate(String sql) throws SQLException { throw ex; } if (this.currentUpdateCount == -1) { - BigQueryJdbcException ex = new BigQueryJdbcException( - "Update query expected to return affected row count. Double check query type."); + BigQueryJdbcException ex = + new BigQueryJdbcException( + "Update query expected to return affected row count. Double check query type."); span.recordException(ex); span.setStatus(StatusCode.ERROR, ex.getMessage()); throw ex; @@ -865,78 +866,84 @@ Thread populateArrowBufferedQueue( LOG.finest("++enter++"); Runnable arrowStreamProcessor = - Context.current().wrap( - () -> { - long rowsRead = 0; - int retryCount = 0; - try { - // Use the first stream to perform reading. - String streamName = readSession.getStreams(0).getName(); - - while (true) { - try { - ReadRowsRequest readRowsRequest = - ReadRowsRequest.newBuilder() - .setReadStream(streamName) - .setOffset(rowsRead) - .build(); - - // Process each block of rows as they arrive and decode using our simple row reader. - com.google.api.gax.rpc.ServerStream stream = - bqReadClient.readRowsCallable().call(readRowsRequest); - for (ReadRowsResponse response : stream) { - if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { - break; + Context.current() + .wrap( + () -> { + long rowsRead = 0; + int retryCount = 0; + try { + // Use the first stream to perform reading. + String streamName = readSession.getStreams(0).getName(); + + while (true) { + try { + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder() + .setReadStream(streamName) + .setOffset(rowsRead) + .build(); + + // Process each block of rows as they arrive and decode using our simple row + // reader. + com.google.api.gax.rpc.ServerStream stream = + bqReadClient.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : stream) { + if (Thread.currentThread().isInterrupted() + || queryTaskExecutor.isShutdown()) { + break; + } + + ArrowRecordBatch currentBatch = response.getArrowRecordBatch(); + Uninterruptibles.putUninterruptibly( + arrowBatchWrapperBlockingQueue, + BigQueryArrowBatchWrapper.of(currentBatch)); + rowsRead += response.getRowCount(); + } + break; + } catch (com.google.api.gax.rpc.ApiException e) { + if (e.getStatusCode().getCode() + == com.google.api.gax.rpc.StatusCode.Code.NOT_FOUND) { + LOG.warning("Read session expired or not found: %s", e.getMessage()); + enqueueError(arrowBatchWrapperBlockingQueue, e); + break; + } + if (retryCount >= MAX_RETRY_COUNT) { + LOG.log( + Level.SEVERE, + "\n" + + Thread.currentThread().getName() + + " Interrupted @ arrowStreamProcessor, max retries exceeded", + e); + enqueueError(arrowBatchWrapperBlockingQueue, e); + break; + } + retryCount++; + LOG.warning( + "Connection interrupted during arrow stream read, retrying. attempt: %d", + retryCount); + Thread.sleep(RETRY_DELAY_MS); + } + } + + } catch (InterruptedException e) { + LOG.log( + Level.WARNING, + "\n" + + Thread.currentThread().getName() + + " Interrupted @ arrowStreamProcessor", + e); + enqueueError(arrowBatchWrapperBlockingQueue, e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOG.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Error @ arrowStreamProcessor", + e); + enqueueError(arrowBatchWrapperBlockingQueue, e); + } finally { // logic needed for graceful shutdown + enqueueEndOfStream(arrowBatchWrapperBlockingQueue); } - - ArrowRecordBatch currentBatch = response.getArrowRecordBatch(); - Uninterruptibles.putUninterruptibly( - arrowBatchWrapperBlockingQueue, BigQueryArrowBatchWrapper.of(currentBatch)); - rowsRead += response.getRowCount(); - } - break; - } catch (com.google.api.gax.rpc.ApiException e) { - if (e.getStatusCode().getCode() - == com.google.api.gax.rpc.StatusCode.Code.NOT_FOUND) { - LOG.warning("Read session expired or not found: %s", e.getMessage()); - enqueueError(arrowBatchWrapperBlockingQueue, e); - break; - } - if (retryCount >= MAX_RETRY_COUNT) { - LOG.log( - Level.SEVERE, - "\n" - + Thread.currentThread().getName() - + " Interrupted @ arrowStreamProcessor, max retries exceeded", - e); - enqueueError(arrowBatchWrapperBlockingQueue, e); - break; - } - retryCount++; - LOG.warning( - "Connection interrupted during arrow stream read, retrying. attempt: %d", - retryCount); - Thread.sleep(RETRY_DELAY_MS); - } - } - - } catch (InterruptedException e) { - LOG.log( - Level.WARNING, - "\n" + Thread.currentThread().getName() + " Interrupted @ arrowStreamProcessor", - e); - enqueueError(arrowBatchWrapperBlockingQueue, e); - Thread.currentThread().interrupt(); - } catch (Exception e) { - LOG.log( - Level.WARNING, - "\n" + Thread.currentThread().getName() + " Error @ arrowStreamProcessor", - e); - enqueueError(arrowBatchWrapperBlockingQueue, e); - } finally { // logic needed for graceful shutdown - enqueueEndOfStream(arrowBatchWrapperBlockingQueue); - } - }); + }); Thread populateBufferWorker = JDBC_THREAD_FACTORY.newThread(arrowStreamProcessor); populateBufferWorker.start(); @@ -1108,7 +1115,8 @@ Thread runNextPageTaskAsync( break; } - Span paginationSpan = tracer.spanBuilder("BigQueryStatement.pagination").startSpan(); + Span paginationSpan = + tracer.spanBuilder("BigQueryStatement.pagination").startSpan(); try (Scope scope = paginationSpan.makeCurrent()) { paginationSpan.setAttribute("db.pagination.page_token", currentPageToken); @@ -1121,15 +1129,16 @@ Thread runNextPageTaskAsync( long duration = (System.nanoTime() - startTime) / 1000000; paginationSpan.setAttribute("db.pagination.duration_ms", duration); - paginationSpan.setAttribute("db.pagination.rows_fetched", querySettings.getMaxResultPerPage()); + paginationSpan.setAttribute( + "db.pagination.rows_fetched", querySettings.getMaxResultPerPage()); currentPageToken = currentResults.getNextPageToken(); // this will be parsed asynchronously without blocking the current thread - Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(currentResults, true)); + Uninterruptibles.putUninterruptibly( + rpcResponseQueue, Tuple.of(currentResults, true)); LOG.fine( "Fetched %d results from the server in %d ms.", - querySettings.getMaxResultPerPage(), - (int) duration); + querySettings.getMaxResultPerPage(), (int) duration); } catch (Exception e) { paginationSpan.recordException(e); paginationSpan.setStatus(StatusCode.ERROR, e.getMessage()); @@ -1167,64 +1176,69 @@ Thread parseAndPopulateRpcDataAsync( LOG.finest("++enter++"); Runnable populateBufferRunnable = - Context.current().wrap( - () -> { // producer thread populating the buffer - try { - Iterable fieldValueLists; - // as we have to process the first page - boolean hasRows = true; - while (hasRows) { - try { - Tuple nextPageTuple = rpcResponseQueue.take(); - if (nextPageTuple.x() != null) { - fieldValueLists = nextPageTuple.x().getValues(); - } else { - fieldValueLists = null; - } - hasRows = nextPageTuple.y(); - - } catch (InterruptedException e) { - LOG.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e); - // Thread might get interrupted while calling the Cancel method, which is - // expected, so logging this instead of throwing the exception back - break; - } - - if (Thread.currentThread().isInterrupted() - || queryTaskExecutor.isShutdown() - || fieldValueLists == null) { - // do not process further pages and shutdown (outerloop) - break; - } - - long startTime = System.nanoTime(); - long results = 0; - for (FieldValueList fieldValueList : fieldValueLists) { - - if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { - // do not process further pages and shutdown (inner loop) - break; - } - Uninterruptibles.putUninterruptibly( - bigQueryFieldValueListWrapperBlockingQueue, - BigQueryFieldValueListWrapper.of(schema.getFields(), fieldValueList)); - results += 1; - } - LOG.fine( - "Processed %d results in %d ms.", - results, (int) ((System.nanoTime() - startTime) / 1000000)); - } - - } catch (Exception ex) { - LOG.log( - Level.WARNING, - "\n" + Thread.currentThread().getName() + " Error @ populateBufferAsync", - ex); - enqueueBufferError(bigQueryFieldValueListWrapperBlockingQueue, ex); - } finally { - enqueueBufferEndOfStream(bigQueryFieldValueListWrapperBlockingQueue); - } - }); + Context.current() + .wrap( + () -> { // producer thread populating the buffer + try { + Iterable fieldValueLists; + // as we have to process the first page + boolean hasRows = true; + while (hasRows) { + try { + Tuple nextPageTuple = rpcResponseQueue.take(); + if (nextPageTuple.x() != null) { + fieldValueLists = nextPageTuple.x().getValues(); + } else { + fieldValueLists = null; + } + hasRows = nextPageTuple.y(); + + } catch (InterruptedException e) { + LOG.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Interrupted", + e); + // Thread might get interrupted while calling the Cancel method, which is + // expected, so logging this instead of throwing the exception back + break; + } + + if (Thread.currentThread().isInterrupted() + || queryTaskExecutor.isShutdown() + || fieldValueLists == null) { + // do not process further pages and shutdown (outerloop) + break; + } + + long startTime = System.nanoTime(); + long results = 0; + for (FieldValueList fieldValueList : fieldValueLists) { + + if (Thread.currentThread().isInterrupted() + || queryTaskExecutor.isShutdown()) { + // do not process further pages and shutdown (inner loop) + break; + } + Uninterruptibles.putUninterruptibly( + bigQueryFieldValueListWrapperBlockingQueue, + BigQueryFieldValueListWrapper.of(schema.getFields(), fieldValueList)); + results += 1; + } + LOG.fine( + "Processed %d results in %d ms.", + results, (int) ((System.nanoTime() - startTime) / 1000000)); + } + + } catch (Exception ex) { + LOG.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Error @ populateBufferAsync", + ex); + enqueueBufferError(bigQueryFieldValueListWrapperBlockingQueue, ex); + } finally { + enqueueBufferEndOfStream(bigQueryFieldValueListWrapperBlockingQueue); + } + }); Thread populateBufferWorker = JDBC_THREAD_FACTORY.newThread(populateBufferRunnable); populateBufferWorker.start(); @@ -1662,8 +1676,8 @@ private void enqueueBufferEndOfStream(BlockingQueue Date: Mon, 23 Mar 2026 21:08:34 +0000 Subject: [PATCH 05/10] chore: refactor spans --- .../bigquery/jdbc/BigQueryStatement.java | 46 +++++++------------ 1 file changed, 16 insertions(+), 30 deletions(-) diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index 6bde973d2495..3ce6c94a101b 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -252,25 +252,19 @@ public ResultSet executeQuery(String sql) throws SQLException { setDestinationDatasetAndTableInJobConfig(getJobConfig(sql).build()); runQuery(sql, jobConfiguration); } catch (InterruptedException ex) { - span.recordException(ex); - span.setStatus(StatusCode.ERROR, ex.getMessage()); throw new BigQueryJdbcException(ex); - } catch (Exception ex) { - span.recordException(ex); - span.setStatus(StatusCode.ERROR, ex.getMessage()); - throw ex; } if (!isSingularResultSet()) { - BigQueryJdbcException ex = - new BigQueryJdbcException( - "Query returned more than one or didn't return any ResultSet."); - span.recordException(ex); - span.setStatus(StatusCode.ERROR, ex.getMessage()); - throw ex; + throw new BigQueryJdbcException( + "Query returned more than one or didn't return any ResultSet."); } // This contains all the other assertions spec required on this method return getCurrentResultSet(); + } catch (Exception ex) { + span.recordException(ex); + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; } finally { span.end(); } @@ -289,23 +283,17 @@ public long executeLargeUpdate(String sql) throws SQLException { QueryJobConfiguration.Builder jobConfiguration = getJobConfig(sql); runQuery(sql, jobConfiguration.build()); } catch (InterruptedException ex) { - span.recordException(ex); - span.setStatus(StatusCode.ERROR, ex.getMessage()); throw new BigQueryJdbcRuntimeException(ex); - } catch (Exception ex) { - span.recordException(ex); - span.setStatus(StatusCode.ERROR, ex.getMessage()); - throw ex; } if (this.currentUpdateCount == -1) { - BigQueryJdbcException ex = - new BigQueryJdbcException( - "Update query expected to return affected row count. Double check query type."); - span.recordException(ex); - span.setStatus(StatusCode.ERROR, ex.getMessage()); - throw ex; + throw new BigQueryJdbcException( + "Update query expected to return affected row count. Double check query type."); } return this.currentUpdateCount; + } catch (Exception ex) { + span.recordException(ex); + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; } finally { span.end(); } @@ -344,15 +332,13 @@ public boolean execute(String sql) throws SQLException { } runQuery(sql, jobConfiguration); } catch (InterruptedException ex) { - span.recordException(ex); - span.setStatus(StatusCode.ERROR, ex.getMessage()); throw new BigQueryJdbcRuntimeException(ex); - } catch (Exception ex) { - span.recordException(ex); - span.setStatus(StatusCode.ERROR, ex.getMessage()); - throw ex; } return getCurrentResultSet() != null; + } catch (Exception ex) { + span.recordException(ex); + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; } finally { span.end(); } From 927599a3373e719bcc878901da40860740c26b5e Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Mon, 23 Mar 2026 21:26:33 +0000 Subject: [PATCH 06/10] chore: fix dependencies --- .../google-cloud-bigquery-jdbc/pom.xml | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/java-bigquery/google-cloud-bigquery-jdbc/pom.xml b/java-bigquery/google-cloud-bigquery-jdbc/pom.xml index cb687e934879..3bcb4d5f0f28 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/pom.xml +++ b/java-bigquery/google-cloud-bigquery-jdbc/pom.xml @@ -166,6 +166,22 @@ google-cloud-bigquery-parent 2.62.0 + + + + + org.slf4j + slf4j-api + 2.0.16 + + + com.google.re2j + re2j + 1.8 + + + + com.google.cloud @@ -297,24 +313,15 @@ io.opentelemetry opentelemetry-sdk - 1.37.0 com.google.cloud.opentelemetry exporter-trace 0.33.0 - - - org.slf4j - slf4j-api - 2.0.16 - - - com.google.re2j - re2j - 1.8 + io.opentelemetry + opentelemetry-sdk-trace From cb578ac5bd819498f1c7937007ed126c88701043 Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Mon, 23 Mar 2026 21:31:36 +0000 Subject: [PATCH 07/10] fix: remove versions from dependencies --- java-bigquery/google-cloud-bigquery-jdbc/pom.xml | 2 -- 1 file changed, 2 deletions(-) diff --git a/java-bigquery/google-cloud-bigquery-jdbc/pom.xml b/java-bigquery/google-cloud-bigquery-jdbc/pom.xml index 3bcb4d5f0f28..29570077b102 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/pom.xml +++ b/java-bigquery/google-cloud-bigquery-jdbc/pom.xml @@ -301,12 +301,10 @@ io.opentelemetry opentelemetry-api - 1.37.0 io.opentelemetry opentelemetry-context - 1.37.0 From d9418fc3775f2d325f7de0b66b6d797422c24c9a Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Tue, 24 Mar 2026 00:14:02 +0000 Subject: [PATCH 08/10] refactor: create functional interface `withTracing` --- .../bigquery/jdbc/BigQueryStatement.java | 206 +++++++++--------- 1 file changed, 99 insertions(+), 107 deletions(-) diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index 3ce6c94a101b..9dbb79ea2151 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -241,62 +241,48 @@ private BigQuerySettings generateBigQuerySettings() { @Override public ResultSet executeQuery(String sql) throws SQLException { LOG.finest("++enter++"); - Tracer tracer = getSafeTracer(); - Span span = tracer.spanBuilder("BigQueryStatement.executeQuery").startSpan(); - try (Scope scope = span.makeCurrent()) { - span.setAttribute("db.statement", sql); - this.otelContext = Context.current(); - logQueryExecutionStart(sql); - try { - QueryJobConfiguration jobConfiguration = - setDestinationDatasetAndTableInJobConfig(getJobConfig(sql).build()); - runQuery(sql, jobConfiguration); - } catch (InterruptedException ex) { - throw new BigQueryJdbcException(ex); - } + return withTracing( + "BigQueryStatement.executeQuery", + (span) -> { + span.setAttribute("db.statement", sql); + logQueryExecutionStart(sql); + try { + QueryJobConfiguration jobConfiguration = + setDestinationDatasetAndTableInJobConfig(getJobConfig(sql).build()); + runQuery(sql, jobConfiguration); + } catch (InterruptedException ex) { + throw new BigQueryJdbcException(ex); + } - if (!isSingularResultSet()) { - throw new BigQueryJdbcException( - "Query returned more than one or didn't return any ResultSet."); - } - // This contains all the other assertions spec required on this method - return getCurrentResultSet(); - } catch (Exception ex) { - span.recordException(ex); - span.setStatus(StatusCode.ERROR, ex.getMessage()); - throw ex; - } finally { - span.end(); - } + if (!isSingularResultSet()) { + throw new BigQueryJdbcException( + "Query returned more than one or didn't return any ResultSet."); + } + // This contains all the other assertions spec required on this method + return getCurrentResultSet(); + }); } @Override public long executeLargeUpdate(String sql) throws SQLException { LOG.finest("++enter++"); - Tracer tracer = getSafeTracer(); - Span span = tracer.spanBuilder("BigQueryStatement.executeLargeUpdate").startSpan(); - try (Scope scope = span.makeCurrent()) { - span.setAttribute("db.statement", sql); - this.otelContext = Context.current(); - logQueryExecutionStart(sql); - try { - QueryJobConfiguration.Builder jobConfiguration = getJobConfig(sql); - runQuery(sql, jobConfiguration.build()); - } catch (InterruptedException ex) { - throw new BigQueryJdbcRuntimeException(ex); - } - if (this.currentUpdateCount == -1) { - throw new BigQueryJdbcException( - "Update query expected to return affected row count. Double check query type."); - } - return this.currentUpdateCount; - } catch (Exception ex) { - span.recordException(ex); - span.setStatus(StatusCode.ERROR, ex.getMessage()); - throw ex; - } finally { - span.end(); - } + return withTracing( + "BigQueryStatement.executeLargeUpdate", + (span) -> { + span.setAttribute("db.statement", sql); + logQueryExecutionStart(sql); + try { + QueryJobConfiguration.Builder jobConfiguration = getJobConfig(sql); + runQuery(sql, jobConfiguration.build()); + } catch (InterruptedException ex) { + throw new BigQueryJdbcRuntimeException(ex); + } + if (this.currentUpdateCount == -1) { + throw new BigQueryJdbcException( + "Update query expected to return affected row count. Double check query type."); + } + return this.currentUpdateCount; + }); } @Override @@ -318,30 +304,23 @@ int checkUpdateCount(long updateCount) { @Override public boolean execute(String sql) throws SQLException { LOG.finest("++enter++"); - Tracer tracer = getSafeTracer(); - Span span = tracer.spanBuilder("BigQueryStatement.execute").startSpan(); - try (Scope scope = span.makeCurrent()) { - span.setAttribute("db.statement", sql); - this.otelContext = Context.current(); - logQueryExecutionStart(sql); - try { - QueryJobConfiguration jobConfiguration = getJobConfig(sql).build(); - // If Large Results are enabled, ensure query type is SELECT - if (isLargeResultsEnabled() && getQueryType(jobConfiguration, null) == SqlType.SELECT) { - jobConfiguration = setDestinationDatasetAndTableInJobConfig(jobConfiguration); - } - runQuery(sql, jobConfiguration); - } catch (InterruptedException ex) { - throw new BigQueryJdbcRuntimeException(ex); - } - return getCurrentResultSet() != null; - } catch (Exception ex) { - span.recordException(ex); - span.setStatus(StatusCode.ERROR, ex.getMessage()); - throw ex; - } finally { - span.end(); - } + return withTracing( + "BigQueryStatement.execute", + (span) -> { + span.setAttribute("db.statement", sql); + logQueryExecutionStart(sql); + try { + QueryJobConfiguration jobConfiguration = getJobConfig(sql).build(); + // If Large Results are enabled, ensure query type is SELECT + if (isLargeResultsEnabled() && getQueryType(jobConfiguration, null) == SqlType.SELECT) { + jobConfiguration = setDestinationDatasetAndTableInJobConfig(jobConfiguration); + } + runQuery(sql, jobConfiguration); + } catch (InterruptedException ex) { + throw new BigQueryJdbcRuntimeException(ex); + } + return getCurrentResultSet() != null; + }); } StatementType getStatementType(QueryJobConfiguration queryJobConfiguration) throws SQLException { @@ -1476,42 +1455,35 @@ public void clearBatch() { @Override public int[] executeBatch() throws SQLException { LOG.finest("++enter++"); - Tracer tracer = getSafeTracer(); - Span span = tracer.spanBuilder("BigQueryStatement.executeBatch").startSpan(); - try (Scope scope = span.makeCurrent()) { - span.setAttribute("db.statement.count", this.batchQueries.size()); - this.otelContext = Context.current(); - - int[] result = new int[this.batchQueries.size()]; - if (this.batchQueries.isEmpty()) { - return result; - } + return withTracing( + "BigQueryStatement.executeBatch", + (span) -> { + span.setAttribute("db.statement.count", this.batchQueries.size()); + + int[] result = new int[this.batchQueries.size()]; + if (this.batchQueries.isEmpty()) { + return result; + } - try { - String combinedQueries = String.join("", this.batchQueries); - QueryJobConfiguration.Builder jobConfiguration = getJobConfig(combinedQueries); - jobConfiguration.setPriority(QueryJobConfiguration.Priority.BATCH); - runQuery(combinedQueries, jobConfiguration.build()); - } catch (InterruptedException ex) { - throw new BigQueryJdbcRuntimeException(ex); - } + try { + String combinedQueries = String.join("", this.batchQueries); + QueryJobConfiguration.Builder jobConfiguration = getJobConfig(combinedQueries); + jobConfiguration.setPriority(QueryJobConfiguration.Priority.BATCH); + runQuery(combinedQueries, jobConfiguration.build()); + } catch (InterruptedException ex) { + throw new BigQueryJdbcRuntimeException(ex); + } - int i = 0; - while (getUpdateCount() != -1 && i < this.batchQueries.size()) { - result[i] = getUpdateCount(); - getMoreResults(); - i++; - } + int i = 0; + while (getUpdateCount() != -1 && i < this.batchQueries.size()) { + result[i] = getUpdateCount(); + getMoreResults(); + i++; + } - clearBatch(); - return result; - } catch (Exception e) { - span.recordException(e); - span.setStatus(StatusCode.ERROR, e.getMessage()); - throw e; - } finally { - span.end(); - } + clearBatch(); + return result; + }); } @Override @@ -1661,6 +1633,26 @@ private void enqueueBufferEndOfStream(BlockingQueue { + T run(Span span) throws SQLException; + } + + private T withTracing(String spanName, TracedOperation operation) throws SQLException { + Tracer tracer = getSafeTracer(); + Span span = tracer.spanBuilder(spanName).startSpan(); + try (Scope scope = span.makeCurrent()) { + this.otelContext = Context.current(); + return operation.run(span); + } catch (Exception ex) { + span.recordException(ex); + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } + /** * Gets the OpenTelemetry Context from the statement execution. Used by ResultSet for pagination * span context. From 7b5660662c418fe2e008259819b8caf2883dc39f Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Tue, 24 Mar 2026 22:45:21 +0000 Subject: [PATCH 09/10] refactor: remove exporter dep and logic --- .../google-cloud-bigquery-jdbc/pom.xml | 28 ------- .../bigquery/jdbc/BigQueryConnection.java | 7 +- .../cloud/bigquery/jdbc/BigQueryDriver.java | 11 ++- .../jdbc/BigQueryJdbcOpenTelemetry.java | 82 ++----------------- .../bigquery/jdbc/BigQueryJdbcUrlUtility.java | 8 -- .../cloud/bigquery/jdbc/DataSource.java | 26 +++--- 6 files changed, 28 insertions(+), 134 deletions(-) diff --git a/java-bigquery/google-cloud-bigquery-jdbc/pom.xml b/java-bigquery/google-cloud-bigquery-jdbc/pom.xml index 40da45a85b72..80376924cab1 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/pom.xml +++ b/java-bigquery/google-cloud-bigquery-jdbc/pom.xml @@ -167,20 +167,6 @@ 2.63.0-SNAPSHOT - - - - org.slf4j - slf4j-api - 2.0.16 - - - com.google.re2j - re2j - 1.8 - - - @@ -307,20 +293,6 @@ opentelemetry-context - - - io.opentelemetry - opentelemetry-sdk - - - com.google.cloud.opentelemetry - exporter-trace - 0.33.0 - - - io.opentelemetry - opentelemetry-sdk-trace - diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java index dfbc69507ee6..7bb9bfa409c1 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java @@ -141,7 +141,7 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { Long listenerPoolSize; String partnerToken; boolean enableOpenTelemetry; - String openTelemetryExporter; + OpenTelemetry customOpenTelemetry; Tracer tracer = OpenTelemetry.noop().getTracer(""); BigQueryConnection(String url) throws IOException { @@ -248,7 +248,7 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { this.listenerPoolSize = ds.getListenerPoolSize(); this.partnerToken = ds.getPartnerToken(); this.enableOpenTelemetry = ds.getEnableOpenTelemetry(); - this.openTelemetryExporter = ds.getOpenTelemetryExporter(); + this.customOpenTelemetry = ds.getCustomOpenTelemetry(); this.headerProvider = createHeaderProvider(); this.bigQuery = getBigQueryConnection(); @@ -944,11 +944,10 @@ private BigQuery getBigQueryConnection() { OpenTelemetry openTelemetry = BigQueryJdbcOpenTelemetry.getOpenTelemetry( - this.enableOpenTelemetry, this.openTelemetryExporter); + this.enableOpenTelemetry, this.customOpenTelemetry); if (this.enableOpenTelemetry) { this.tracer = BigQueryJdbcOpenTelemetry.getTracer(openTelemetry); bigQueryOptions.setOpenTelemetryTracer(this.tracer); - BigQueryJdbcOpenTelemetry.attachLoggingBridge(); } BigQueryOptions options = bigQueryOptions.setHeaderProvider(this.headerProvider).build(); diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDriver.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDriver.java index 930fc42af2bc..02bfd3cc164c 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDriver.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDriver.java @@ -20,6 +20,7 @@ import com.google.cloud.bigquery.exception.BigQueryJdbcRuntimeException; import io.grpc.LoadBalancerRegistry; import io.grpc.internal.PickFirstLoadBalancerProvider; +import io.opentelemetry.api.OpenTelemetry; import java.io.IOException; import java.sql.Connection; import java.sql.Driver; @@ -121,9 +122,14 @@ public Connection connect(String url, Properties info) throws SQLException { LOG.finest("++enter++"); try { if (acceptsURL(url)) { - // strip 'jdbc:' from the URL, add any extra properties + Properties connectInfo = info == null ? new Properties() : (Properties) info.clone(); + OpenTelemetry customOpenTelemetry = null; + if (connectInfo.containsKey("customOpenTelemetry")) { + customOpenTelemetry = (OpenTelemetry) connectInfo.remove("customOpenTelemetry"); + } String connectionUri = - BigQueryJdbcUrlUtility.appendPropertiesToURL(url.substring(5), this.toString(), info); + BigQueryJdbcUrlUtility.appendPropertiesToURL( + url.substring(5), this.toString(), connectInfo); try { BigQueryJdbcUrlUtility.parseUrl(connectionUri); } catch (BigQueryJdbcRuntimeException e) { @@ -131,6 +137,7 @@ public Connection connect(String url, Properties info) throws SQLException { } DataSource ds = DataSource.fromUrl(connectionUri); + ds.setCustomOpenTelemetry(customOpenTelemetry); // LogLevel String logLevelStr = ds.getLogLevel(); diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java index 830034d7c578..811158bb7dad 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java @@ -16,105 +16,33 @@ package com.google.cloud.bigquery.jdbc; -import com.google.cloud.opentelemetry.trace.TraceConfiguration; -import com.google.cloud.opentelemetry.trace.TraceExporter; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.sdk.OpenTelemetrySdk; -import io.opentelemetry.sdk.trace.SdkTracerProvider; -import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; -import io.opentelemetry.sdk.trace.export.SpanExporter; -import java.io.IOException; public class BigQueryJdbcOpenTelemetry { - private static final BigQueryJdbcCustomLogger LOG = - new BigQueryJdbcCustomLogger(BigQueryJdbcOpenTelemetry.class.getName()); - private static final Object lock = new Object(); - private static volatile OpenTelemetrySdk autoConfiguredOpenTelemetry; - private static volatile boolean initialized = false; private static final String INSTRUMENTATION_SCOPE_NAME = "com.google.cloud.bigquery.jdbc"; /** * Initializes or returns the OpenTelemetry instance based on hybrid logic. Prefer * GlobalOpenTelemetry; fallback to an auto-configured GCP exporter if requested. */ - public static OpenTelemetry getOpenTelemetry(boolean enableOpenTelemetry, String exporterType) { + public static OpenTelemetry getOpenTelemetry( + boolean enableOpenTelemetry, OpenTelemetry customOpenTelemetry) { if (!enableOpenTelemetry) { return OpenTelemetry.noop(); } - OpenTelemetry globalOtel = GlobalOpenTelemetry.get(); - if ("gcp".equalsIgnoreCase(exporterType)) { - return getAutoConfiguredOpenTelemetry(); + if (customOpenTelemetry != null) { + return customOpenTelemetry; } - return globalOtel; + return GlobalOpenTelemetry.get(); } /** Gets a Tracer for the JDBC driver instrumentation scope. */ public static Tracer getTracer(OpenTelemetry openTelemetry) { return openTelemetry.getTracer(INSTRUMENTATION_SCOPE_NAME); } - - /** - * TODO(b/491245568): Attaches the OpenTelemetry logging bridge to the root - * java.util.logging.Logger. This is currently a no-op due to shading issues with - * `opentelemetry-appender-jul`. - */ - public static void attachLoggingBridge() { - // No-op for now. - } - - private static OpenTelemetry getAutoConfiguredOpenTelemetry() { - if (!initialized) { - synchronized (lock) { - if (!initialized) { - try { - autoConfiguredOpenTelemetry = initGcpOpenTelemetry(); - } catch (Exception e) { - LOG.warning("Failed to initialize OpenTelemetry with GCP exporter: " + e.getMessage()); - autoConfiguredOpenTelemetry = null; - } finally { - initialized = true; - } - } - } - } - return autoConfiguredOpenTelemetry != null ? autoConfiguredOpenTelemetry : OpenTelemetry.noop(); - } - - private static OpenTelemetrySdk initGcpOpenTelemetry() throws IOException { - LOG.info("Initializing BigQuery JDBC standalone OpenTelemetry SDK with GCP exporter."); - - SpanExporter traceExporter; - try { - TraceConfiguration configuration = TraceConfiguration.builder().build(); - traceExporter = TraceExporter.createWithConfiguration(configuration); - } catch (Exception e) { - throw new RuntimeException( - "Could not create TraceExporter. Ensure exporter-trace is on classpath.", e); - } - - SdkTracerProvider tracerProvider = - SdkTracerProvider.builder() - .addSpanProcessor(BatchSpanProcessor.builder(traceExporter).build()) - .build(); - - OpenTelemetrySdk sdk = OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build(); - - Runtime.getRuntime() - .addShutdownHook( - new Thread( - () -> { - try { - tracerProvider.close(); - } catch (Exception e) { - LOG.warning("Error closing tracer provider: " + e.getMessage()); - } - })); - - return sdk; - } } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java index daab178f69dc..23040f345dda 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java @@ -164,8 +164,6 @@ protected boolean removeEldestEntry(Map.Entry> eldes static final int DEFAULT_SWA_ACTIVATION_ROW_COUNT_VALUE = 3; static final String ENABLE_OPENTELEMETRY_PROPERTY_NAME = "EnableOpenTelemetry"; static final boolean DEFAULT_ENABLE_OPENTELEMETRY_VALUE = false; - static final String OPENTELEMETRY_EXPORTER_PROPERTY_NAME = "OpenTelemetryExporter"; - static final String DEFAULT_OPENTELEMETRY_EXPORTER_VALUE = "none"; private static final BigQueryJdbcCustomLogger LOG = new BigQueryJdbcCustomLogger(BigQueryJdbcUrlUtility.class.getName()); static final String FILTER_TABLES_ON_DEFAULT_DATASET_PROPERTY_NAME = @@ -617,12 +615,6 @@ protected boolean removeEldestEntry(Map.Entry> eldes .setDescription( "Enables or disables OpenTelemetry features in the Driver. Disabled by default.") .setDefaultValue(String.valueOf(DEFAULT_ENABLE_OPENTELEMETRY_VALUE)) - .build(), - BigQueryConnectionProperty.newBuilder() - .setName(OPENTELEMETRY_EXPORTER_PROPERTY_NAME) - .setDescription( - "Specifies the auto-configured OpenTelemetry exporter (e.g., gcp).") - .setDefaultValue(DEFAULT_OPENTELEMETRY_EXPORTER_VALUE) .build()))); private static final List NETWORK_PROPERTIES = diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java index 3c2aa5e70a04..e1bb06625a0e 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java @@ -20,6 +20,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.opentelemetry.api.OpenTelemetry; import java.io.PrintWriter; import java.sql.Connection; import java.sql.DriverManager; @@ -114,7 +115,7 @@ public class DataSource implements javax.sql.DataSource { private Long connectionPoolSize; private Long listenerPoolSize; private Boolean enableOpenTelemetry; - private String openTelemetryExporter; + private OpenTelemetry customOpenTelemetry; // Make sure the JDBC driver class is loaded. static { @@ -332,9 +333,6 @@ public class DataSource implements javax.sql.DataSource { ds.setEnableOpenTelemetry( BigQueryJdbcUrlUtility.convertIntToBoolean( val, BigQueryJdbcUrlUtility.ENABLE_OPENTELEMETRY_PROPERTY_NAME))) - .put( - BigQueryJdbcUrlUtility.OPENTELEMETRY_EXPORTER_PROPERTY_NAME, - DataSource::setOpenTelemetryExporter) .build(); public static DataSource fromUrl(String url) { @@ -386,7 +384,11 @@ public Connection getConnection() throws SQLException { throw new BigQueryJdbcException( "The URL " + getURL() + " is invalid. Please specify a valid Connection URL. "); } - return DriverManager.getConnection(getURL(), createProperties()); + Properties props = createProperties(); + if (this.customOpenTelemetry != null) { + props.put("customOpenTelemetry", this.customOpenTelemetry); + } + return DriverManager.getConnection(getURL(), props); } private Properties createProperties() { @@ -632,10 +634,6 @@ private Properties createProperties() { BigQueryJdbcUrlUtility.ENABLE_OPENTELEMETRY_PROPERTY_NAME, String.valueOf(this.enableOpenTelemetry)); } - if (this.openTelemetryExporter != null) { - connectionProperties.setProperty( - BigQueryJdbcUrlUtility.OPENTELEMETRY_EXPORTER_PROPERTY_NAME, this.openTelemetryExporter); - } return connectionProperties; } @@ -767,14 +765,12 @@ public void setEnableOpenTelemetry(Boolean enableOpenTelemetry) { this.enableOpenTelemetry = enableOpenTelemetry; } - public String getOpenTelemetryExporter() { - return openTelemetryExporter != null - ? openTelemetryExporter - : BigQueryJdbcUrlUtility.DEFAULT_OPENTELEMETRY_EXPORTER_VALUE; + public OpenTelemetry getCustomOpenTelemetry() { + return customOpenTelemetry; } - public void setOpenTelemetryExporter(String openTelemetryExporter) { - this.openTelemetryExporter = openTelemetryExporter; + public void setCustomOpenTelemetry(OpenTelemetry customOpenTelemetry) { + this.customOpenTelemetry = customOpenTelemetry; } public void setHighThroughputMinTableSize(Integer highThroughputMinTableSize) { From 27016c0d52b55800f4ef05f8f501df942ae4b5f3 Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Tue, 24 Mar 2026 23:51:44 +0000 Subject: [PATCH 10/10] fix: lint --- java-bigquery/google-cloud-bigquery-jdbc/pom.xml | 3 --- 1 file changed, 3 deletions(-) diff --git a/java-bigquery/google-cloud-bigquery-jdbc/pom.xml b/java-bigquery/google-cloud-bigquery-jdbc/pom.xml index 80376924cab1..a4710371d965 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/pom.xml +++ b/java-bigquery/google-cloud-bigquery-jdbc/pom.xml @@ -166,8 +166,6 @@ google-cloud-bigquery-parent 2.63.0-SNAPSHOT - - com.google.cloud @@ -293,7 +291,6 @@ opentelemetry-context - com.google.truth