From a5a1f808e07ce82b2555a743e30ebc93a3113295 Mon Sep 17 00:00:00 2001
From: Google Team Member
Date: Fri, 27 Feb 2026 02:25:12 -0800
Subject: [PATCH] feat: BigQueryAgentAnalyticsPlugin

Initial version of a new BigQueryAgentAnalyticsPlugin which logs agent
events to BigQuery using the Storage Write API.

PiperOrigin-RevId: 876143930
---
 .../adk/plugins/bigquery/AnalyticsWriter.java |  16 +
 .../adk/plugins/bigquery/AnsiColors.java      |  17 +
 .../adk/plugins/bigquery/BatchProcessor.java  |  96 +++++
 .../BigQueryAgentAnalyticsPlugin.java         | 367 ++++++++++++++++++
 .../plugins/bigquery/BatchProcessorTest.java  | 113 ++++++
 .../BigQueryAgentAnalyticsPluginTest.java     | 190 +++++++++
 .../plugins/bigquery/FakeAnalyticsWriter.java |  32 ++
 7 files changed, 831 insertions(+)
 create mode 100644 core/src/main/java/com/google/adk/plugins/bigquery/AnalyticsWriter.java
 create mode 100644 core/src/main/java/com/google/adk/plugins/bigquery/AnsiColors.java
 create mode 100644 core/src/main/java/com/google/adk/plugins/bigquery/BatchProcessor.java
 create mode 100644 core/src/main/java/com/google/adk/plugins/bigquery/BigQueryAgentAnalyticsPlugin.java
 create mode 100644 core/src/test/java/com/google/adk/plugins/bigquery/BatchProcessorTest.java
 create mode 100644 core/src/test/java/com/google/adk/plugins/bigquery/BigQueryAgentAnalyticsPluginTest.java
 create mode 100644 core/src/test/java/com/google/adk/plugins/bigquery/FakeAnalyticsWriter.java

diff --git a/core/src/main/java/com/google/adk/plugins/bigquery/AnalyticsWriter.java b/core/src/main/java/com/google/adk/plugins/bigquery/AnalyticsWriter.java
new file mode 100644
index 000000000..273ff4b9d
--- /dev/null
+++ b/core/src/main/java/com/google/adk/plugins/bigquery/AnalyticsWriter.java
@@ -0,0 +1,16 @@
+package com.google.adk.plugins.bigquery;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.List;
+
+/** Agent analytics writer interface; rows are delivered in batches by {@code BatchProcessor}. */
+public interface AnalyticsWriter {
+  void start() throws IOException;
+
+  boolean isReady();
+
+  void writeBatch(List<ImmutableMap<String, Object>> batch);
+
+  void close();
+}
diff --git a/core/src/main/java/com/google/adk/plugins/bigquery/AnsiColors.java b/core/src/main/java/com/google/adk/plugins/bigquery/AnsiColors.java
new file mode 100644
index 000000000..469fb25e8
--- /dev/null
+++ b/core/src/main/java/com/google/adk/plugins/bigquery/AnsiColors.java
@@ -0,0 +1,17 @@
+package com.google.adk.plugins.bigquery;
+
+/** ANSI colors for printing to the console. */
+public final class AnsiColors {
+  private AnsiColors() {}
+
+  public static final String RESET = "\u001B[0m";
+  public static final String RED = "\u001B[31m";
+  public static final String GREEN = "\u001B[32m";
+  public static final String BLUE = "\u001B[34m";
+  public static final String MAGENTA = "\u001B[35m";
+  public static final String PURPLE = "\u001B[35m"; // NOTE(review): same code as MAGENTA — intentional alias?
+  public static final String YELLOW = "\u001B[33m";
+  public static final String CYAN = "\u001B[36m";
+  public static final String WHITE = "\u001B[37m";
+  public static final String BOLD = "\u001B[1m";
+}
diff --git a/core/src/main/java/com/google/adk/plugins/bigquery/BatchProcessor.java b/core/src/main/java/com/google/adk/plugins/bigquery/BatchProcessor.java
new file mode 100644
index 000000000..59fe58c6b
--- /dev/null
+++ b/core/src/main/java/com/google/adk/plugins/bigquery/BatchProcessor.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.adk.plugins.bigquery;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Processes analytics events in batches and writes them using the configured AnalyticsWriter. */
+public class BatchProcessor {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BatchProcessor.class);
+
+  private final AnalyticsWriter analyticsWriter;
+  private final int batchSize;
+  private final long flushIntervalMs;
+  private final long shutdownTimeoutMs;
+  // Bounded queue: enqueue() drops events rather than blocking callers when it is full.
+  private final BlockingQueue<ImmutableMap<String, Object>> queue;
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+  public BatchProcessor(
+      AnalyticsWriter analyticsWriter,
+      int batchSize,
+      long flushIntervalMs,
+      int queueMaxSize,
+      long shutdownTimeoutMs) {
+    this.analyticsWriter = analyticsWriter;
+    this.batchSize = batchSize;
+    this.flushIntervalMs = flushIntervalMs;
+    this.shutdownTimeoutMs = shutdownTimeoutMs;
+    this.queue = new LinkedBlockingQueue<>(queueMaxSize);
+  }
+
+  /** Starts the writer and schedules periodic flushes every {@code flushIntervalMs} ms. */
+  public void start() throws IOException {
+    synchronized (analyticsWriter) {
+      analyticsWriter.start();
+    }
+    var unused =
+        scheduler.scheduleAtFixedRate(
+            this::flush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
+  }
+
+  /** Stops the flush scheduler (waiting up to {@code shutdownTimeoutMs}) and closes the writer. */
+  public void shutdown() {
+    scheduler.shutdown();
+    try {
+      scheduler.awaitTermination(shutdownTimeoutMs, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      // Re-assert the interrupt status so callers up the stack can observe it.
+      Thread.currentThread().interrupt();
+      LOGGER.warn("Interrupted while waiting for scheduler to terminate", e);
+    }
+    synchronized (analyticsWriter) {
+      analyticsWriter.close();
+    }
+  }
+
+  /** Enqueues a row for the next flush; drops it (with a warning) when the queue is full. */
+  public void enqueue(ImmutableMap<String, Object> row) {
+    if (!queue.offer(row)) {
+      LOGGER.warn("Queue full, dropping event");
+    }
+  }
+
+  // Drains up to batchSize rows and hands them to the writer; all writer access is
+  // serialized on the writer's monitor (shared with start()/shutdown()).
+  private void flush() {
+    synchronized (analyticsWriter) {
+      if (!analyticsWriter.isReady()) {
+        LOGGER.warn("Analytics writer is not ready, skipping flush");
+        return;
+      }
+      List<ImmutableMap<String, Object>> batch = new ArrayList<>();
+      queue.drainTo(batch, batchSize);
+      if (batch.isEmpty()) {
+        return;
+      }
+
+      analyticsWriter.writeBatch(batch);
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/adk/plugins/bigquery/BigQueryAgentAnalyticsPlugin.java b/core/src/main/java/com/google/adk/plugins/bigquery/BigQueryAgentAnalyticsPlugin.java
new file mode 100644
index 000000000..68bf95b9c
--- /dev/null
+++ b/core/src/main/java/com/google/adk/plugins/bigquery/BigQueryAgentAnalyticsPlugin.java
@@ -0,0 +1,367 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.adk.plugins.bigquery;
+
+import com.google.adk.Version;
+import com.google.adk.agents.BaseAgent;
+import com.google.adk.agents.CallbackContext;
+import com.google.adk.agents.InvocationContext;
+import com.google.adk.models.LlmRequest;
+import com.google.adk.models.LlmResponse;
+import com.google.adk.plugins.BasePlugin;
+import com.google.adk.tools.BaseTool;
+import com.google.adk.tools.ToolContext;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import com.google.genai.types.Content;
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Scope;
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.core.Maybe;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.jspecify.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BigQuery Agent Analytics Plugin.
+ *
+ * <p>Logs agent events to BigQuery using the provided {@link AnalyticsWriter} implementation.
+ */
+public class BigQueryAgentAnalyticsPlugin extends BasePlugin {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryAgentAnalyticsPlugin.class);
+  private static final String TRACER_NAME = "google.adk.plugins.bigquery_agent_analytics";
+
+  private final Tracer tracer =
+      GlobalOpenTelemetry.getTracer(TRACER_NAME, Version.JAVA_ADK_VERSION);
+  private final BigQueryLoggerConfig config;
+  private final BatchProcessor batchProcessor;
+
+  // Trace management.
+  // NOTE(review): these stacks are per-thread; if a before-callback and its matching
+  // after-callback are subscribed on different Rx threads, push/pop pairs will not line
+  // up — confirm the scheduler used by the runner before relying on span nesting.
+  private final ThreadLocal<List<Span>> spanStack = ThreadLocal.withInitial(ArrayList::new);
+  private final ThreadLocal<List<Scope>> scopeStack = ThreadLocal.withInitial(ArrayList::new);
+
+  /** Creates the plugin with default configuration. */
+  public BigQueryAgentAnalyticsPlugin(AnalyticsWriter analyticsWriter) throws IOException {
+    this(analyticsWriter, BigQueryLoggerConfig.builder().build());
+  }
+
+  /** Creates the plugin and immediately starts the batch processor (opens the writer). */
+  public BigQueryAgentAnalyticsPlugin(AnalyticsWriter analyticsWriter, BigQueryLoggerConfig config)
+      throws IOException {
+    super("bigquery_agent_analytics");
+    this.config = config;
+    this.batchProcessor =
+        new BatchProcessor(
+            analyticsWriter,
+            config.batchSize,
+            config.batchFlushIntervalMs,
+            config.queueMaxSize,
+            config.shutdownTimeoutMs);
+    this.batchProcessor.start();
+  }
+
+  /** Configuration for BigQuery logging. */
+  public static class BigQueryLoggerConfig {
+    public final boolean enabled;
+    public final ImmutableList<String> eventAllowlist;
+    public final ImmutableList<String> eventDenylist;
+    public final int batchSize;
+    public final long batchFlushIntervalMs;
+    public final int maxContentLength; // 0 means no truncation
+    public final int queueMaxSize;
+    public final long shutdownTimeoutMs;
+
+    private BigQueryLoggerConfig(Builder builder) {
+      this.enabled = builder.enabled;
+      this.eventAllowlist = ImmutableList.copyOf(builder.eventAllowlist);
+      this.eventDenylist = ImmutableList.copyOf(builder.eventDenylist);
+      this.batchSize = builder.batchSize;
+      this.batchFlushIntervalMs = builder.batchFlushIntervalMs;
+      this.maxContentLength = builder.maxContentLength;
+      this.queueMaxSize = builder.queueMaxSize;
+      this.shutdownTimeoutMs = builder.shutdownTimeoutMs;
+    }
+
+    public static Builder builder() {
+      return new Builder();
+    }
+
+    /** Builder for {@link BigQueryLoggerConfig}. */
+    public static class Builder {
+      private boolean enabled = true;
+      private int batchSize = 10;
+      private long batchFlushIntervalMs = 1000;
+      private List<String> eventAllowlist = ImmutableList.of();
+      private List<String> eventDenylist = ImmutableList.of();
+      private int maxContentLength = 0;
+      private int queueMaxSize = 10000;
+      private long shutdownTimeoutMs = 10000;
+
+      private Builder() {}
+
+      @CanIgnoreReturnValue
+      public Builder setEnabled(boolean enabled) {
+        this.enabled = enabled;
+        return this;
+      }
+
+      @CanIgnoreReturnValue
+      public Builder setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+        return this;
+      }
+
+      @CanIgnoreReturnValue
+      public Builder setBatchFlushIntervalMs(long batchFlushIntervalMs) {
+        this.batchFlushIntervalMs = batchFlushIntervalMs;
+        return this;
+      }
+
+      @CanIgnoreReturnValue
+      public Builder setEventAllowlist(List<String> eventAllowlist) {
+        this.eventAllowlist = eventAllowlist;
+        return this;
+      }
+
+      @CanIgnoreReturnValue
+      public Builder setEventDenylist(List<String> eventDenylist) {
+        this.eventDenylist = eventDenylist;
+        return this;
+      }
+
+      @CanIgnoreReturnValue
+      public Builder setMaxContentLength(int maxContentLength) {
+        this.maxContentLength = maxContentLength;
+        return this;
+      }
+
+      // Added: queueMaxSize and shutdownTimeoutMs were configurable fields with
+      // defaults but had no setters, so callers could never change them.
+      @CanIgnoreReturnValue
+      public Builder setQueueMaxSize(int queueMaxSize) {
+        this.queueMaxSize = queueMaxSize;
+        return this;
+      }
+
+      @CanIgnoreReturnValue
+      public Builder setShutdownTimeoutMs(long shutdownTimeoutMs) {
+        this.shutdownTimeoutMs = shutdownTimeoutMs;
+        return this;
+      }
+
+      public BigQueryLoggerConfig build() {
+        return new BigQueryLoggerConfig(this);
+      }
+    }
+  }
+
+  // --- Trace Helpers ---
+
+  @SuppressWarnings("MustBeClosed")
+  private void pushSpan(String name) {
+    Span span = tracer.spanBuilder(name).startSpan();
+    Scope scope = span.makeCurrent();
+    spanStack.get().add(span);
+    scopeStack.get().add(scope);
+  }
+
+  private void popSpan() {
+    List<Span> spans = spanStack.get();
+    List<Scope> scopes = scopeStack.get();
+
+    if (!spans.isEmpty()) {
+      Span span = spans.remove(spans.size() - 1);
+      span.end();
+    }
+    if (!scopes.isEmpty()) {
+      Scope scope = scopes.remove(scopes.size() - 1);
+      scope.close();
+    }
+  }
+
+  private @Nullable String getCurrentSpanId() {
+    List<Span> spans = spanStack.get();
+    if (!spans.isEmpty()) {
+      SpanContext ctx = spans.get(spans.size() - 1).getSpanContext();
+      if (ctx.isValid()) {
+        return ctx.getSpanId();
+      }
+    }
+    return null;
+  }
+
+  private @Nullable String getCurrentTraceId() {
+    List<Span> spans = spanStack.get();
+    if (!spans.isEmpty()) {
+      SpanContext ctx = spans.get(spans.size() - 1).getSpanContext();
+      if (ctx.isValid()) {
+        return ctx.getTraceId();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Completable close() {
+    return Completable.fromAction(
+            () -> {
+              batchProcessor.shutdown();
+            })
+        .andThen(super.close());
+  }
+
+  // --- Callbacks ---
+
+  @Override
+  public Maybe<Content> beforeRunCallback(InvocationContext invocationContext) {
+    return Maybe.fromAction(
+        () -> {
+          pushSpan("invocation_" + invocationContext.invocationId());
+          logEvent(
+              "INVOCATION_STARTING", invocationContext.agent().name(), null, invocationContext);
+        });
+  }
+
+  @Override
+  public Completable afterRunCallback(InvocationContext invocationContext) {
+    return Completable.fromAction(
+        () -> {
+          logEvent(
+              "INVOCATION_COMPLETED", invocationContext.agent().name(), null, invocationContext);
+          popSpan();
+        });
+  }
+
+  @Override
+  public Maybe<Content> beforeAgentCallback(BaseAgent agent, CallbackContext callbackContext) {
+    return Maybe.fromAction(
+        () -> {
+          pushSpan("agent_" + agent.name());
+          logEvent("AGENT_STARTING", agent.name(), null, callbackContext);
+        });
+  }
+
+  @Override
+  public Maybe<Content> afterAgentCallback(BaseAgent agent, CallbackContext callbackContext) {
+    return Maybe.fromAction(
+        () -> {
+          logEvent("AGENT_COMPLETED", agent.name(), null, callbackContext);
+          popSpan();
+        });
+  }
+
+  @Override
+  public Maybe<LlmResponse> beforeModelCallback(
+      CallbackContext callbackContext, LlmRequest.Builder llmRequest) {
+    return Maybe.fromAction(
+        () -> {
+          pushSpan("llm_request");
+          LlmRequest request = llmRequest.build();
+          logEvent(
+              "LLM_REQUEST",
+              callbackContext.agentName(),
+              ImmutableMap.of("model", request.model().orElse("default")),
+              callbackContext);
+        });
+  }
+
+  @Override
+  public Maybe<LlmResponse> afterModelCallback(
+      CallbackContext callbackContext, LlmResponse llmResponse) {
+    return Maybe.fromAction(
+        () -> {
+          logEvent("LLM_RESPONSE", callbackContext.agentName(), ImmutableMap.of(), callbackContext);
+          popSpan();
+        });
+  }
+
+  @Override
+  public Maybe<Map<String, Object>> beforeToolCallback(
+      BaseTool tool, Map<String, Object> toolArgs, ToolContext toolContext) {
+    return Maybe.fromAction(
+        () -> {
+          pushSpan("tool_" + tool.name());
+          logEvent(
+              "TOOL_CALL",
+              toolContext.agentName(),
+              ImmutableMap.of("tool_name", tool.name(), "arguments", toolArgs),
+              toolContext);
+        });
+  }
+
+  @Override
+  public Maybe<Map<String, Object>> afterToolCallback(
+      BaseTool tool,
+      Map<String, Object> toolArgs,
+      ToolContext toolContext,
+      Map<String, Object> result) {
+    return Maybe.fromAction(
+        () -> {
+          logEvent(
+              "TOOL_RESPONSE",
+              toolContext.agentName(),
+              ImmutableMap.of("tool_name", tool.name(), "result", result),
+              toolContext);
+          popSpan();
+        });
+  }
+
+  // Builds one analytics row (timestamp, event type, agent, trace/span ids, context ids,
+  // optional truncated content) and hands it to the batch processor. Filters by the
+  // configured allowlist/denylist first.
+  private void logEvent(
+      String eventType,
+      String agentName,
+      @Nullable ImmutableMap<String, Object> content,
+      Object context) {
+    if (!config.enabled) {
+      return;
+    }
+
+    if (!config.eventAllowlist.isEmpty() && !config.eventAllowlist.contains(eventType)) {
+      return;
+    }
+
+    if (config.eventDenylist.contains(eventType)) {
+      return;
+    }
+
+    ImmutableMap.Builder<String, Object> row = ImmutableMap.builder();
+    row.put("timestamp", Instant.now().toString()); // ISO-8601
+    row.put("event_type", eventType);
+    row.put("agent", agentName);
+    String traceId = getCurrentTraceId();
+    if (traceId != null) {
+      row.put("trace_id", traceId);
+    }
+    String spanId = getCurrentSpanId();
+    if (spanId != null) {
+      row.put("span_id", spanId);
+    }
+
+    if (context instanceof InvocationContext ctx) {
+      row.put("session_id", ctx.session().id());
+      row.put("invocation_id", ctx.invocationId());
+      row.put("user_id", ctx.userId());
+    } else if (context instanceof CallbackContext ctx) {
+      row.put("invocation_id", ctx.invocationId());
+    }
+
+    if (content != null) {
+      String str = content.toString();
+      String truncated = truncateContent(str, config.maxContentLength);
+      row.put("content", truncated);
+      row.put("is_truncated", truncated.length() < str.length());
+    }
+
+    batchProcessor.enqueue(row.buildOrThrow());
+  }
+
+  private static String truncateContent(String content, int maxLength) {
+    if (maxLength > 0 && content.length() > maxLength) {
+      // "...[TRUNCATED]" is 14 chars (was 13 — off by one, which made the
+      // truncated result exceed maxLength by a character).
+      int truncateAt = Math.max(0, maxLength - 14);
+      return content.substring(0, truncateAt) + "...[TRUNCATED]";
+    }
+    return content;
+  }
+}
diff --git a/core/src/test/java/com/google/adk/plugins/bigquery/BatchProcessorTest.java b/core/src/test/java/com/google/adk/plugins/bigquery/BatchProcessorTest.java
new file mode 100644
index 000000000..0455185eb
--- /dev/null
+++ b/core/src/test/java/com/google/adk/plugins/bigquery/BatchProcessorTest.java
@@ -0,0 +1,113 @@
+package com.google.adk.plugins.bigquery;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public final class BatchProcessorTest {
+
+  private FakeAnalyticsWriter fakeWriter;
+  private BatchProcessor processor;
+
+  @Before
+  public void setUp() {
+    fakeWriter = new FakeAnalyticsWriter();
+    // batchSize=2, flushIntervalMs=50, queueMaxSize=2, shutdownTimeoutMs=100
+    processor = new BatchProcessor(fakeWriter, 2, 50, 2, 100);
+  }
+
+  @After
+  public void tearDown() {
+    if (processor != null) {
+      processor.shutdown();
+    }
+  }
+
+  @Test
+  public void start_startsWriter() throws Exception {
+    processor.start();
+    assertThat(fakeWriter.isReady()).isTrue();
+  }
+
+  // Polls until the fake writer has received a batch or the timeout elapses.
+  private void waitForFlush(int waitMs) throws InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (fakeWriter.batch.isEmpty() && System.currentTimeMillis() - startTime < waitMs) {
+      Thread.sleep(10);
+    }
+  }
+
+  @Test
+  public void flush_writesBatchIfReady() throws Exception {
+    processor.start();
+
+    ImmutableMap<String, Object> event1 = ImmutableMap.of("k1", "v1");
+    ImmutableMap<String, Object> event2 = ImmutableMap.of("k2", "v2");
+
+    processor.enqueue(event1);
+    processor.enqueue(event2);
+
+    waitForFlush(1000);
+
+    assertThat(fakeWriter.batch).containsExactly(event1, event2);
+  }
+
+  @Test
+  public void flush_doesNotWriteIfNotReady() throws Exception {
+    processor.start();
+    // Force writer to not be ready after it was started
+    fakeWriter.ready = false;
+
+    processor.enqueue(ImmutableMap.of("k1", "v1"));
+
+    // Wait a bit to ensure flush would have happened
+    Thread.sleep(150);
+
+    assertThat(fakeWriter.batch).isEmpty();
+  }
+
+  @Test
+  public void flush_doesNotWriteIfQueueEmpty() throws Exception {
+    processor.start();
+
+    // No events enqueued
+
+    // Wait a bit to ensure flush would have happened
+    Thread.sleep(150);
+
+    assertThat(fakeWriter.batch).isEmpty();
+  }
+
+  @Test
+  public void enqueue_dropsWhenQueueFull() throws Exception {
+    processor.shutdown(); // Stop the default instance from setUp
+
+    // Queue max size = 1, interval = 200ms
+    processor = new BatchProcessor(fakeWriter, 2, 200, 1, 100);
+    processor.start();
+
+    ImmutableMap<String, Object> event1 = ImmutableMap.of("k1", "v1");
+    ImmutableMap<String, Object> event2 = ImmutableMap.of("k2", "v2");
+
+    // The first event succeeds
+    processor.enqueue(event1);
+    // The second event should be dropped because queue is full
+    processor.enqueue(event2);
+
+    waitForFlush(1000);
+
+    // Only event1 was kept, event2 was dropped
+    assertThat(fakeWriter.batch).containsExactly(event1);
+  }
+
+  @Test
+  public void shutdown_cancelsSchedulerAndClosesWriter() throws Exception {
+    processor.start();
+    processor.shutdown();
+    assertThat(fakeWriter.isReady()).isFalse();
+  }
+}
diff --git a/core/src/test/java/com/google/adk/plugins/bigquery/BigQueryAgentAnalyticsPluginTest.java b/core/src/test/java/com/google/adk/plugins/bigquery/BigQueryAgentAnalyticsPluginTest.java
new file mode 100644
index 000000000..daca83780
--- /dev/null
+++ b/core/src/test/java/com/google/adk/plugins/bigquery/BigQueryAgentAnalyticsPluginTest.java
@@ -0,0 +1,190 @@
+package com.google.adk.plugins.bigquery;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.adk.agents.CallbackContext;
+import com.google.adk.agents.InvocationContext;
+import com.google.adk.models.LlmRequest;
+import com.google.adk.models.LlmResponse;
+import com.google.adk.sessions.Session;
+import com.google.adk.testing.TestBaseAgent;
+import com.google.adk.tools.BaseTool;
+import com.google.adk.tools.ToolContext;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.reactivex.rxjava3.core.Flowable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public final class BigQueryAgentAnalyticsPluginTest {
+  private FakeAnalyticsWriter writer;
+  private BigQueryAgentAnalyticsPlugin plugin;
+  private InvocationContext invocationContext;
+  private CallbackContext callbackContext;
+  private TestBaseAgent agent;
+  private BaseTool tool;
+  private ToolContext toolContext;
+  private LlmRequest.Builder llmRequestBuilder;
+  private LlmResponse llmResponse;
+
+  @Before
+  public void setUp() throws Exception {
+    writer = new FakeAnalyticsWriter();
+    BigQueryAgentAnalyticsPlugin.BigQueryLoggerConfig config =
+        BigQueryAgentAnalyticsPlugin.BigQueryLoggerConfig.builder()
+            .setBatchSize(1) // flush immediately
+            .setBatchFlushIntervalMs(1)
+            .build();
+    plugin = new BigQueryAgentAnalyticsPlugin(writer, config);
+
+    invocationContext = mock(InvocationContext.class);
+    when(invocationContext.invocationId()).thenReturn("inv-123");
+    when(invocationContext.userId()).thenReturn("user-1");
+    Session session = Session.builder("session-1").build();
+    when(invocationContext.session()).thenReturn(session);
+
+    agent =
+        new TestBaseAgent(
+            "agent-1",
+            "description-1",
+            ImmutableList.of(),
+            ImmutableList.of(),
+            () -> Flowable.empty());
+    when(invocationContext.agent()).thenReturn(agent);
+
+    callbackContext = mock(CallbackContext.class);
+    when(callbackContext.invocationId()).thenReturn("inv-123");
+    when(callbackContext.agentName()).thenReturn("agent-1");
+
+    tool = mock(BaseTool.class);
+    when(tool.name()).thenReturn("tool-1");
+
+    toolContext = mock(ToolContext.class);
+    when(toolContext.invocationId()).thenReturn("inv-123");
+    when(toolContext.agentName()).thenReturn("agent-1");
+
+    llmRequestBuilder = LlmRequest.builder().model("gemini-1.5");
+    llmResponse = LlmResponse.builder().build();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (plugin != null) {
+      plugin.close().test().await();
+    }
+  }
+
+  @Test
+  public void beforeRunCallback_logsInvocationStarting() throws Exception {
+    plugin.beforeRunCallback(invocationContext).test().await().assertComplete();
+    Thread.sleep(200);
+
+    assertThat(writer.batch).hasSize(1);
+    assertThat(writer.batch.get(0)).containsEntry("event_type", "INVOCATION_STARTING");
+    assertThat(writer.batch.get(0)).containsEntry("agent", "agent-1");
+    assertThat(writer.batch.get(0)).containsEntry("invocation_id", "inv-123");
+    assertThat(writer.batch.get(0)).containsEntry("session_id", "session-1");
+    assertThat(writer.batch.get(0)).containsEntry("user_id", "user-1");
+  }
+
+  @Test
+  public void afterRunCallback_logsInvocationCompleted() throws Exception {
+    plugin.afterRunCallback(invocationContext).test().await().assertComplete();
+    Thread.sleep(200);
+
+    assertThat(writer.batch).hasSize(1);
+    assertThat(writer.batch.get(0)).containsEntry("event_type", "INVOCATION_COMPLETED");
+  }
+
+  @Test
+  public void beforeAgentCallback_logsAgentStarting() throws Exception {
+    plugin.beforeAgentCallback(agent, callbackContext).test().await().assertComplete();
+    Thread.sleep(200);
+
+    assertThat(writer.batch).hasSize(1);
+    assertThat(writer.batch.get(0)).containsEntry("event_type", "AGENT_STARTING");
+    assertThat(writer.batch.get(0)).containsEntry("agent", "agent-1");
+  }
+
+  @Test
+  public void afterAgentCallback_logsAgentCompleted() throws Exception {
+    plugin.afterAgentCallback(agent, callbackContext).test().await().assertComplete();
+    Thread.sleep(200);
+
+    assertThat(writer.batch).hasSize(1);
+    assertThat(writer.batch.get(0)).containsEntry("event_type", "AGENT_COMPLETED");
+  }
+
+  @Test
+  public void beforeModelCallback_logsLlmRequest() throws Exception {
+    plugin.beforeModelCallback(callbackContext, llmRequestBuilder).test().await().assertComplete();
+    Thread.sleep(200);
+
+    assertThat(writer.batch).hasSize(1);
+    assertThat(writer.batch.get(0)).containsEntry("event_type", "LLM_REQUEST");
+    assertThat(writer.batch.get(0)).containsKey("content");
+  }
+
+  @Test
+  public void afterModelCallback_logsLlmResponse() throws Exception {
+    plugin.afterModelCallback(callbackContext, llmResponse).test().await().assertComplete();
+    Thread.sleep(200);
+
+    assertThat(writer.batch).hasSize(1);
+    assertThat(writer.batch.get(0)).containsEntry("event_type", "LLM_RESPONSE");
+  }
+
+  @Test
+  public void beforeToolCallback_logsToolCall() throws Exception {
+    Map<String, Object> args = new HashMap<>();
+    args.put("key", "val");
+    plugin.beforeToolCallback(tool, args, toolContext).test().await().assertComplete();
+    Thread.sleep(200);
+
+    assertThat(writer.batch).hasSize(1);
+    assertThat(writer.batch.get(0)).containsEntry("event_type", "TOOL_CALL");
+    assertThat(writer.batch.get(0)).containsKey("content");
+  }
+
+  @Test
+  public void afterToolCallback_logsToolResponse() throws Exception {
+    Map<String, Object> args = new HashMap<>();
+    Map<String, Object> result = new HashMap<>();
+    plugin.afterToolCallback(tool, args, toolContext, result).test().await().assertComplete();
+    Thread.sleep(200);
+
+    assertThat(writer.batch).hasSize(1);
+    assertThat(writer.batch.get(0)).containsEntry("event_type", "TOOL_RESPONSE");
+  }
+
+  @Test
+  public void configLimits_areRespected() throws Exception {
+    BigQueryAgentAnalyticsPlugin.BigQueryLoggerConfig config =
+        BigQueryAgentAnalyticsPlugin.BigQueryLoggerConfig.builder()
+            .setBatchSize(1)
+            .setBatchFlushIntervalMs(1)
+            .setMaxContentLength(20)
+            .build();
+    // Close the setUp instance before replacing it; otherwise its batch-processor
+    // scheduler thread keeps running and tearDown only closes the new instance.
+    plugin.close().test().await();
+    plugin = new BigQueryAgentAnalyticsPlugin(writer, config);
+
+    Map<String, Object> args = new HashMap<>();
+    args.put("key", "a_very_long_string_that_should_be_truncated_now");
+    plugin.beforeToolCallback(tool, args, toolContext).test().await().assertComplete();
+    Thread.sleep(200);
+
+    assertThat(writer.batch).hasSize(1);
+    String content = (String) writer.batch.get(0).get("content");
+    assertThat(content).endsWith("...[TRUNCATED]");
+  }
+}
diff --git a/core/src/test/java/com/google/adk/plugins/bigquery/FakeAnalyticsWriter.java b/core/src/test/java/com/google/adk/plugins/bigquery/FakeAnalyticsWriter.java
new file mode 100644
index 000000000..8fac20209
--- /dev/null
+++ b/core/src/test/java/com/google/adk/plugins/bigquery/FakeAnalyticsWriter.java
@@ -0,0 +1,32 @@
+package com.google.adk.plugins.bigquery;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FakeAnalyticsWriter implements AnalyticsWriter {
+  boolean ready = false;
+  List<ImmutableMap<String, Object>> batch = new ArrayList<>();
+
+  @Override
+  public void start() throws IOException {
+    batch.clear();
+    ready = true;
+  }
+
+  @Override
+  public void writeBatch(List<ImmutableMap<String, Object>> inputBatch) {
+    batch.addAll(inputBatch);
+  }
+
+  @Override
+  public boolean isReady() {
+    return ready;
+  }
+
+  @Override
+  public void close() {
+    ready = false;
+  }
+}