diff --git a/core/src/main/java/com/google/adk/models/chat/ChatCompletionsHttpClient.java b/core/src/main/java/com/google/adk/models/chat/ChatCompletionsHttpClient.java
new file mode 100644
index 000000000..62393485d
--- /dev/null
+++ b/core/src/main/java/com/google/adk/models/chat/ChatCompletionsHttpClient.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.adk.models.chat;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.adk.JsonBaseModel;
+import com.google.adk.models.LlmRequest;
+import com.google.adk.models.LlmResponse;
+import com.google.common.collect.ImmutableMap;
+import com.google.genai.types.HttpOptions;
+import io.reactivex.rxjava3.core.BackpressureStrategy;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.FlowableEmitter;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import okhttp3.Call;
+import okhttp3.Callback;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An HTTP client for interacting with OpenAI-compatible chat completions endpoints.
+ *
+ *
Supports both non-streaming responses (single {@link LlmResponse} emission) and streaming
+ * Server-Sent Events (SSE) responses (multiple incremental {@link LlmResponse} emissions). See the
+ * OpenAI Chat Completions API
+ * reference for the wire protocol.
+ */
+public class ChatCompletionsHttpClient {
+ private static final Logger logger = LoggerFactory.getLogger(ChatCompletionsHttpClient.class);
+ private static final ObjectMapper objectMapper = JsonBaseModel.getMapper();
+
+ private static final MediaType JSON = MediaType.get("application/json; charset=utf-8");
+
+ /**
+ * Default OkHttp call timeout used when the caller does not supply an {@link HttpOptions}
+ * timeout. Five minutes is long enough for most non-streaming completions and short enough to
+ * prevent indefinite hangs in the common case where the caller does not configure timeouts.
+ * Callers who need infinite (e.g. long batch jobs or open streams) can opt in by passing an
+ * {@link HttpOptions} with {@code timeout() == 0}.
+ */
+ private static final Duration DEFAULT_CALL_TIMEOUT = Duration.ofMinutes(5);
+
+ /**
+ * Shared OkHttpClient instance whose connection pool and thread dispatcher are reused across all
+ * {@link ChatCompletionsHttpClient} instances. Each instance forks this client via {@link
+ * OkHttpClient#newBuilder()} to apply per-instance timeouts without leaking pools.
+ */
+ private static final OkHttpClient SHARED_POOL_CLIENT = new OkHttpClient();
+
+ private final OkHttpClient client;
+ private final String baseUrl;
+ private final ImmutableMap headers;
+
+ /**
+ * Constructs a new {@link ChatCompletionsHttpClient} that facilitates API interaction with the
+ * standard {@code /chat/completions} REST endpoint.
+ *
+ * @param baseUrl The base URL of the chat completions endpoint. Must not be {@code null}. The
+ * trailing {@code /chat/completions} path is appended automatically if not already present.
+ * @param headers The headers to include in the outgoing requests. May be {@code null}, in which
+ * case no extra headers are added. The {@code Content-Type} header is set automatically and
+ * cannot be overridden by this map.
+ * @param httpOptions Optional HTTP configuration. A missing timeout defaults to 5 minutes ({@link
+ * #DEFAULT_CALL_TIMEOUT}). A timeout of 0 means no timeout (infinite wait).
+ */
+ public ChatCompletionsHttpClient(
+ String baseUrl, Map headers, HttpOptions httpOptions) {
+ this.baseUrl = Objects.requireNonNull(baseUrl, "baseUrl cannot be null");
+ // Defensive copy of caller-supplied headers; null is treated as no extra headers.
+ this.headers = headers == null ? ImmutableMap.of() : ImmutableMap.copyOf(headers);
+
+ // Apply custom timeouts per instance. All internal timeouts are bounded by callTimeout.
+ OkHttpClient.Builder builder = SHARED_POOL_CLIENT.newBuilder();
+ builder.connectTimeout(Duration.ZERO);
+ builder.readTimeout(Duration.ZERO);
+ builder.writeTimeout(Duration.ZERO);
+ builder.callTimeout(resolveCallTimeout(httpOptions));
+ this.client = builder.build();
+ }
+
+ /** Resolves the call timeout from HttpOptions. */
+ private static Duration resolveCallTimeout(HttpOptions httpOptions) {
+ if (httpOptions == null || httpOptions.timeout().isEmpty()) {
+ return DEFAULT_CALL_TIMEOUT;
+ }
+ long timeoutMs = httpOptions.timeout().get();
+ // 0 is treated as no timeout (Duration.ZERO).
+ return timeoutMs == 0L ? Duration.ZERO : Duration.ofMillis(timeoutMs);
+ }
+
+ /**
+ * Generates a conversational response from the chat completions endpoint based on the provided
+ * messages. This encapsulates building the HTTP payload, sending the request to the completions
+ * endpoint, and initiating the handling of the response data.
+ *
+ * @param llmRequest The request containing the model, configuration, and sequence of messages.
+ * @param stream Whether to request a streaming response.
+ * @return A {@link Flowable} emitting the discrete (or combined) {@link LlmResponse} objects.
+ */
+ public Flowable generateContent(LlmRequest llmRequest, boolean stream) {
+ return Flowable.defer(
+ () -> {
+ ChatCompletionsRequest dtoRequest =
+ ChatCompletionsRequest.fromLlmRequest(llmRequest, stream);
+ String jsonPayload = objectMapper.writeValueAsString(dtoRequest);
+ logger.trace(
+ "Chat Completion Request: model={}, stream={}, messagesCount={}",
+ dtoRequest.model,
+ dtoRequest.stream,
+ dtoRequest.messages != null ? dtoRequest.messages.size() : 0);
+
+ Request.Builder requestBuilder =
+ new Request.Builder()
+ .url(
+ baseUrl.endsWith("/")
+ ? baseUrl + "chat/completions"
+ : baseUrl + "/chat/completions")
+ .post(RequestBody.create(jsonPayload, JSON));
+
+ for (Map.Entry entry : headers.entrySet()) {
+ requestBuilder.addHeader(entry.getKey(), entry.getValue());
+ }
+ // Defensively force Content-Type to JSON by replacing instead of appending.
+ requestBuilder.header("Content-Type", JSON.toString());
+
+ Request request = requestBuilder.build();
+ if (stream) {
+ return createStreamingFlowable(request);
+ } else {
+ return createNonStreamingFlowable(request);
+ }
+ });
+ }
+
+ /** Placeholder for streaming responses. Errors with {@link UnsupportedOperationException}. */
+ @SuppressWarnings("UnusedVariable")
+ private Flowable createStreamingFlowable(Request request) {
+ return Flowable.error(
+ new UnsupportedOperationException("Streaming is not yet implemented in this client."));
+ }
+
+ /**
+ * Wraps an OkHttp {@link Callback} in a reactive {@link Flowable} for single-turn, non-streaming
+ * responses.
+ */
+ private Flowable createNonStreamingFlowable(Request request) {
+ return Flowable.create(
+ emitter -> {
+ Call call = client.newCall(request);
+ emitter.setCancellable(call::cancel);
+ call.enqueue(new NonStreamingCallback(emitter));
+ },
+ BackpressureStrategy.BUFFER);
+ }
+
+ /**
+ * Handles OkHttp failure and success callbacks, pushing {@link LlmResponse} results to the given
+ * emitter.
+ */
+ private static final class NonStreamingCallback implements Callback {
+ private final FlowableEmitter emitter;
+
+ NonStreamingCallback(FlowableEmitter emitter) {
+ this.emitter = emitter;
+ }
+
+ @Override
+ public void onFailure(Call call, IOException e) {
+ emitter.tryOnError(e);
+ }
+
+ @Override
+ public void onResponse(Call call, Response response) {
+ try (ResponseBody body = response.body()) {
+ if (!response.isSuccessful()) {
+ String bodyStr = body != null ? body.string() : "";
+ emitter.tryOnError(
+ new IOException("Unexpected code " + response + " - body: " + bodyStr));
+ return;
+ }
+ if (body == null) {
+ emitter.tryOnError(new IOException("Empty response body"));
+ return;
+ }
+
+ String jsonResponse = body.string();
+ ChatCompletionsResponse.ChatCompletion completion =
+ objectMapper.readValue(jsonResponse, ChatCompletionsResponse.ChatCompletion.class);
+ emitter.onNext(completion.toLlmResponse());
+ emitter.onComplete();
+ } catch (Exception e) {
+ emitter.tryOnError(e);
+ }
+ }
+ }
+}
diff --git a/core/src/test/java/com/google/adk/models/chat/ChatCompletionsHttpClientTest.java b/core/src/test/java/com/google/adk/models/chat/ChatCompletionsHttpClientTest.java
new file mode 100644
index 000000000..0c05d004e
--- /dev/null
+++ b/core/src/test/java/com/google/adk/models/chat/ChatCompletionsHttpClientTest.java
@@ -0,0 +1,444 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.adk.models.chat;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.adk.JsonBaseModel;
+import com.google.adk.models.LlmRequest;
+import com.google.adk.models.LlmResponse;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.genai.types.Content;
+import com.google.genai.types.FinishReason;
+import com.google.genai.types.HttpOptions;
+import com.google.genai.types.Part;
+import io.reactivex.rxjava3.subscribers.TestSubscriber;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import okhttp3.Call;
+import okhttp3.Callback;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Protocol;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import okio.Buffer;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+@RunWith(JUnit4.class)
+public final class ChatCompletionsHttpClientTest {
+ private static final ObjectMapper objectMapper = JsonBaseModel.getMapper();
+ private static final MediaType JSON = MediaType.get("application/json");
+
+ /**
+ * Bounded wait for {@link TestSubscriber#await(long, TimeUnit)} so a buggy callback wiring cannot
+ * hang the test JVM. The mock callbacks fire synchronously in the same thread, so this value is
+ * intentionally short -- on a successful run the await returns in microseconds, and on a hung run
+ * we fail fast instead of stalling the test suite.
+ */
+ private static final long AWAIT_TIMEOUT_MILLIS = 500L;
+
+ @Rule public final MockitoRule mocks = MockitoJUnit.rule();
+
+ @Mock private OkHttpClient mockHttpClient;
+ @Mock private Call mockCall;
+
+ private ChatCompletionsHttpClient client;
+
+ @Before
+ public void setUp() throws Exception {
+ client = new ChatCompletionsHttpClient("https://example.com/", ImmutableMap.of(), null);
+ swapInMockHttpClient(client);
+ }
+
+ /**
+ * Reflectively replaces the production {@link OkHttpClient} on a {@link
+ * ChatCompletionsHttpClient} with the test's mock so callbacks can be captured. Used by both
+ * setUp and tests that construct their own client (e.g. timeout tests, header tests).
+ */
+ private void swapInMockHttpClient(ChatCompletionsHttpClient target) throws Exception {
+ when(mockHttpClient.newCall(any())).thenReturn(mockCall);
+ Field clientField = ChatCompletionsHttpClient.class.getDeclaredField("client");
+ clientField.setAccessible(true);
+ clientField.set(target, mockHttpClient);
+ }
+
+ private Response createMockResponse(String body, MediaType mediaType) {
+ return createMockResponse(body, mediaType, 200, "OK");
+ }
+
+ private Response createMockResponse(String body, MediaType mediaType, int code, String message) {
+ Response.Builder builder =
+ new Response.Builder()
+ .request(new Request.Builder().url("https://example.com/chat/completions").build())
+ .protocol(Protocol.HTTP_1_1)
+ .code(code)
+ .message(message);
+ // OkHttp's Response.Builder rejects a null body via its Kotlin @NotNull contract; omit
+ // the body() call entirely to model an empty/null response body.
+ if (body != null) {
+ builder.body(ResponseBody.create(body, mediaType));
+ }
+ return builder.build();
+ }
+
+ /** Returns a minimal {@link LlmRequest} suitable for tests that don't care about the payload. */
+ private static LlmRequest minimalRequest() {
+ return LlmRequest.builder()
+ .model("gpt-4")
+ .contents(ImmutableList.of(Content.builder().parts(Part.fromText("hello")).build()))
+ .build();
+ }
+
+ @Test
+ public void generateContent_nonStreaming_sendsCorrectPayload() throws Exception {
+ String responseBody =
+ """
+ {
+ "choices": [
+ {
+ "message": {
+ "role": "assistant",
+ "content": "Hi"
+ },
+ "finish_reason": "stop"
+ }
+ ]
+ }
+ """;
+
+ Response mockResponse = createMockResponse(responseBody, JSON);
+
+ ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+ doNothing().when(mockCall).enqueue(callbackCaptor.capture());
+
+ TestSubscriber testSubscriber =
+ client.generateContent(minimalRequest(), false).test();
+
+ callbackCaptor.getValue().onResponse(mockCall, mockResponse);
+ testSubscriber.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ LlmResponse response = testSubscriber.values().get(0);
+
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Request.class);
+ verify(mockHttpClient).newCall(requestCaptor.capture());
+ Request capturedRequest = requestCaptor.getValue();
+ assertThat(capturedRequest.url().encodedPath()).isEqualTo("/chat/completions");
+
+ Buffer buffer = new Buffer();
+ capturedRequest.body().writeTo(buffer);
+ JsonNode requestBodyJson = objectMapper.readTree(buffer.readUtf8());
+ assertThat(requestBodyJson.get("model").asText()).isEqualTo("gpt-4");
+ assertThat(requestBodyJson.get("messages").get(0).get("role").asText()).isEqualTo("user");
+ assertThat(requestBodyJson.get("messages").get(0).get("content").asText()).isEqualTo("hello");
+
+ LlmResponse expectedResponse =
+ LlmResponse.builder()
+ .content(
+ Content.builder()
+ .role("model")
+ .parts(ImmutableList.of(Part.fromText("Hi")))
+ .build())
+ .finishReason(new FinishReason(FinishReason.Known.STOP.toString()))
+ .customMetadata(ImmutableList.of())
+ .build();
+
+ assertThat(response).isEqualTo(expectedResponse);
+ }
+
+ @Test
+ public void generateContent_nonStreaming_propagateFailure() throws Exception {
+ ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+ doNothing().when(mockCall).enqueue(callbackCaptor.capture());
+
+ TestSubscriber testSubscriber =
+ client.generateContent(minimalRequest(), false).test();
+
+ callbackCaptor.getValue().onFailure(mockCall, new IOException("Network Error"));
+ testSubscriber.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ testSubscriber.assertError(IOException.class);
+ }
+
+ // -- New tests for fixes #5, #6, #9, and additional coverage. --------------------------
+
+ /**
+ * Verifies that an HTTP error status (e.g. 500) propagates as a stream error and that the error
+ * message includes the response body so callers can debug. Covers the {@code
+ * !response.isSuccessful()} branch of the non-streaming path. The streaming counterpart lives in
+ * the streaming follow-up CL.
+ */
+ @Test
+ public void generateContent_nonStreaming_propagatesHttpErrorStatus() throws Exception {
+ Response mockResponse =
+ createMockResponse("{\"error\":\"server exploded\"}", JSON, 500, "Internal Server Error");
+
+ ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+ doNothing().when(mockCall).enqueue(callbackCaptor.capture());
+
+ TestSubscriber testSubscriber =
+ client.generateContent(minimalRequest(), false).test();
+
+ callbackCaptor.getValue().onResponse(mockCall, mockResponse);
+ testSubscriber.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ testSubscriber.assertError(
+ e ->
+ e instanceof IOException
+ && e.getMessage().contains("Unexpected code")
+ && e.getMessage().contains("server exploded"));
+ }
+
+ /**
+ * Verifies that an empty response body propagates as a stream error rather than silently emitting
+ * an empty value. The exact exception class depends on OkHttp's behavior:
+ *
+ *
+ * - If OkHttp produces a {@code null} body, our code surfaces an {@link IOException} with the
+ * message {@code "Empty response body"}.
+ *
- If OkHttp produces an empty (non-null) body, Jackson surfaces a {@link
+ * com.fasterxml.jackson.databind.exc.MismatchedInputException} ("No content to map").
+ *
+ *
+ * Both outcomes satisfy the contract: empty body must NOT silently produce a successful empty
+ * {@link LlmResponse}.
+ */
+ @Test
+ public void generateContent_nonStreaming_propagatesEmptyBody() throws Exception {
+ Response mockResponse = createMockResponse(null, JSON);
+
+ ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+ doNothing().when(mockCall).enqueue(callbackCaptor.capture());
+
+ TestSubscriber testSubscriber =
+ client.generateContent(minimalRequest(), false).test();
+
+ callbackCaptor.getValue().onResponse(mockCall, mockResponse);
+ testSubscriber.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ testSubscriber.assertNoValues();
+ testSubscriber.assertError(Throwable.class);
+ }
+
+ /**
+ * Verifies that caller-supplied headers reach the wire on the captured {@link Request}. This is
+ * the most common production failure mode (missing or wrong Authorization header), so it gets its
+ * own test rather than being implicit in other tests.
+ */
+ @Test
+ public void generateContent_sendsCustomHeaders() throws Exception {
+ ChatCompletionsHttpClient clientWithHeaders =
+ new ChatCompletionsHttpClient(
+ "https://example.com/",
+ ImmutableMap.of("Authorization", "Bearer test-token", "X-Custom", "value"),
+ null);
+ swapInMockHttpClient(clientWithHeaders);
+
+ String responseBody =
+ """
+ {"choices":[{"message":{"role":"assistant","content":"Hi"},"finish_reason":"stop"}]}
+ """;
+ Response mockResponse = createMockResponse(responseBody, JSON);
+
+ ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+ doNothing().when(mockCall).enqueue(callbackCaptor.capture());
+
+ TestSubscriber testSubscriber =
+ clientWithHeaders.generateContent(minimalRequest(), false).test();
+
+ callbackCaptor.getValue().onResponse(mockCall, mockResponse);
+ testSubscriber.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Request.class);
+ verify(mockHttpClient).newCall(requestCaptor.capture());
+ Request capturedRequest = requestCaptor.getValue();
+ assertThat(capturedRequest.header("Authorization")).isEqualTo("Bearer test-token");
+ assertThat(capturedRequest.header("X-Custom")).isEqualTo("value");
+ // Verifies fix #5: Content-Type is forced to application/json regardless of caller input.
+ assertThat(capturedRequest.header("Content-Type")).contains("application/json");
+ }
+
+ /**
+ * Verifies fix #5: even when a caller passes a conflicting {@code Content-Type} header, the
+ * client overrides it with {@code application/json} so the upstream API does not reject the
+ * request as a malformed payload.
+ */
+ @Test
+ public void generateContent_overridesCallerContentType() throws Exception {
+ ChatCompletionsHttpClient clientWithBadHeader =
+ new ChatCompletionsHttpClient(
+ "https://example.com/", ImmutableMap.of("Content-Type", "text/plain"), null);
+ swapInMockHttpClient(clientWithBadHeader);
+
+ String responseBody =
+ """
+ {"choices":[{"message":{"role":"assistant","content":"Hi"},"finish_reason":"stop"}]}
+ """;
+ Response mockResponse = createMockResponse(responseBody, JSON);
+
+ ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+ doNothing().when(mockCall).enqueue(callbackCaptor.capture());
+
+ TestSubscriber testSubscriber =
+ clientWithBadHeader.generateContent(minimalRequest(), false).test();
+
+ callbackCaptor.getValue().onResponse(mockCall, mockResponse);
+ testSubscriber.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Request.class);
+ verify(mockHttpClient).newCall(requestCaptor.capture());
+ Request capturedRequest = requestCaptor.getValue();
+ // Should be exactly one Content-Type header, not two.
+ assertThat(capturedRequest.headers("Content-Type")).hasSize(1);
+ assertThat(capturedRequest.header("Content-Type")).contains("application/json");
+ }
+
+ /**
+ * Verifies that a {@code baseUrl} without a trailing slash still produces the correct {@code
+ * /chat/completions} path. Covers the false branch of the trailing-slash check in {@link
+ * ChatCompletionsHttpClient#generateContent}.
+ */
+ @Test
+ public void generateContent_handlesBaseUrlWithoutTrailingSlash() throws Exception {
+ ChatCompletionsHttpClient clientNoSlash =
+ new ChatCompletionsHttpClient("https://example.com", ImmutableMap.of(), null);
+ swapInMockHttpClient(clientNoSlash);
+
+ String responseBody =
+ """
+ {"choices":[{"message":{"role":"assistant","content":"Hi"},"finish_reason":"stop"}]}
+ """;
+ Response mockResponse = createMockResponse(responseBody, JSON);
+
+ ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+ doNothing().when(mockCall).enqueue(callbackCaptor.capture());
+
+ TestSubscriber testSubscriber =
+ clientNoSlash.generateContent(minimalRequest(), false).test();
+
+ callbackCaptor.getValue().onResponse(mockCall, mockResponse);
+ testSubscriber.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Request.class);
+ verify(mockHttpClient).newCall(requestCaptor.capture());
+ assertThat(requestCaptor.getValue().url().encodedPath()).isEqualTo("/chat/completions");
+ }
+
+ /**
+ * Verifies fix #9: passing {@code null} for the headers parameter is treated as no extra headers,
+ * not as an NPE.
+ */
+ @Test
+ public void constructor_nullHeaders_isTreatedAsEmpty() throws Exception {
+ ChatCompletionsHttpClient clientWithNullHeaders =
+ new ChatCompletionsHttpClient("https://example.com/", null, null);
+ swapInMockHttpClient(clientWithNullHeaders);
+
+ String responseBody =
+ """
+ {"choices":[{"message":{"role":"assistant","content":"Hi"},"finish_reason":"stop"}]}
+ """;
+ Response mockResponse = createMockResponse(responseBody, JSON);
+
+ ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+ doNothing().when(mockCall).enqueue(callbackCaptor.capture());
+
+ TestSubscriber testSubscriber =
+ clientWithNullHeaders.generateContent(minimalRequest(), false).test();
+
+ callbackCaptor.getValue().onResponse(mockCall, mockResponse);
+ testSubscriber.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ testSubscriber.assertNoErrors();
+ testSubscriber.assertValueCount(1);
+ }
+
+ // -- Fix #6: tri-state timeout policy. --------------------------------------------------
+
+ /**
+ * Verifies fix #6 case (a): when {@code httpOptions} is {@code null}, the client applies the
+ * 5-minute default call timeout to prevent indefinite hangs in callers that did not explicitly
+ * configure a timeout.
+ */
+ @Test
+ public void constructor_nullHttpOptions_appliesDefaultFiveMinuteTimeout() {
+ ChatCompletionsHttpClient defaultClient =
+ new ChatCompletionsHttpClient("https://example.com/", ImmutableMap.of(), null);
+
+ OkHttpClient internal = readInternalClient(defaultClient);
+ assertThat(internal.callTimeoutMillis())
+ .isEqualTo((int) Duration.ofMinutes(5).toMillis()); // 300_000
+ }
+
+ /**
+ * Verifies fix #6 case (b): when the caller explicitly sets {@code httpOptions.timeout() == 0},
+ * the client respects this as the explicit opt-in to infinite hang. This is the migration path
+ * for long-running streams or batch jobs that need no timeout.
+ */
+ @Test
+ public void constructor_zeroTimeout_respectsInfiniteHang() {
+ HttpOptions zeroTimeout = HttpOptions.builder().timeout(0).build();
+ ChatCompletionsHttpClient infiniteClient =
+ new ChatCompletionsHttpClient("https://example.com/", ImmutableMap.of(), zeroTimeout);
+
+ OkHttpClient internal = readInternalClient(infiniteClient);
+ assertThat(internal.callTimeoutMillis()).isEqualTo(0); // OkHttp: 0 = no timeout
+ }
+
+ /**
+ * Verifies fix #6 case (c): when the caller sets a positive timeout, that value (in milliseconds)
+ * is used as the call timeout.
+ */
+ @Test
+ public void constructor_explicitTimeout_appliesIt() {
+ HttpOptions tenSeconds = HttpOptions.builder().timeout(10_000).build();
+ ChatCompletionsHttpClient timedClient =
+ new ChatCompletionsHttpClient("https://example.com/", ImmutableMap.of(), tenSeconds);
+
+ OkHttpClient internal = readInternalClient(timedClient);
+ assertThat(internal.callTimeoutMillis()).isEqualTo(10_000);
+ }
+
+ /** Reflectively reads the internal {@link OkHttpClient} to inspect the resolved timeout. */
+ private static OkHttpClient readInternalClient(ChatCompletionsHttpClient target) {
+ try {
+ Field clientField = ChatCompletionsHttpClient.class.getDeclaredField("client");
+ clientField.setAccessible(true);
+ return (OkHttpClient) clientField.get(target);
+ } catch (ReflectiveOperationException e) {
+ throw new LinkageError("Failed to read internal client", e);
+ }
+ }
+}