diff --git a/api/src/main/java/io/grpc/ClientStreamTracer.java b/api/src/main/java/io/grpc/ClientStreamTracer.java index 8e11e781e7c..d654fd33358 100644 --- a/api/src/main/java/io/grpc/ClientStreamTracer.java +++ b/api/src/main/java/io/grpc/ClientStreamTracer.java @@ -57,6 +57,45 @@ public void streamCreated(@Grpc.TransportAttr Attributes transportAttrs, Metadat public void createPendingStream() { } + /** + * Called when an attempt-level delay segment (such as waiting for a load balancing pick or + * connection establishment) starts. + * + *

<p>This method is invoked synchronously on the attempt thread. Implementations should start + * internal timers or child tracing spans (named strictly {@code "Attempt Delay"}) carrying the + * canonical {@code grpc.delay_type} attribute. + * + * @param delayType canonical low-cardinality label categorizing the delay (e.g., "connecting") + * @param delayReason high-cardinality diagnostic string describing granular runtime conditions + * @since 1.82.0 + */ + public void recordAttemptDelayStart(String delayType, String delayReason) { + } + + /** + * Called when an attempt-level delay reason changes while the overall delay type remains + * constant (for example, when a priority load balancing policy fails over between tiers). + * + *

<p>Implementations should record structured events (such as {@code "Delay state transition"}) + * on the active delay span without recreating the span or resetting cumulative timers. + * + * @param delayReason updated high-cardinality diagnostic string describing new conditions + * @since 1.82.0 + */ + public void recordAttemptDelayReasonChanged(String delayReason) { + } + + /** + * Called when an attempt-level delay segment ends (pick success, type change, or cancellation). + * + *

Implementations should simultaneously close active child tracing spans and record elapsed + * duration to the {@code grpc.client.attempt.delay.duration} histogram. + * + * @since 1.82.0 + */ + public void recordAttemptDelayEnd() { + } + /** * Headers has been sent to the socket. */ diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index 5dd44a492ee..e5c3d053ee7 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -547,25 +547,32 @@ public static final class PickResult { // True if the result is created by withDrop() private final boolean drop; @Nullable private final String authorityOverride; + @Nullable private final String delayType; + @Nullable private final String delayReason; private PickResult( @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, Status status, boolean drop) { - this.subchannel = subchannel; - this.streamTracerFactory = streamTracerFactory; - this.status = checkNotNull(status, "status"); - this.drop = drop; - this.authorityOverride = null; + this(subchannel, streamTracerFactory, status, drop, null, null, null); } private PickResult( @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, Status status, boolean drop, @Nullable String authorityOverride) { + this(subchannel, streamTracerFactory, status, drop, authorityOverride, null, null); + } + + private PickResult( + @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, + Status status, boolean drop, @Nullable String authorityOverride, + @Nullable String delayType, @Nullable String delayReason) { this.subchannel = subchannel; this.streamTracerFactory = streamTracerFactory; this.status = checkNotNull(status, "status"); this.drop = drop; this.authorityOverride = authorityOverride; + this.delayType = delayType; + this.delayReason = delayReason; } /** @@ -677,7 +684,7 @@ public 
static PickResult withSubchannel(Subchannel subchannel) { */ public PickResult copyWithSubchannel(Subchannel subchannel) { return new PickResult(checkNotNull(subchannel, "subchannel"), streamTracerFactory, - status, drop, authorityOverride); + status, drop, authorityOverride, delayType, delayReason); } /** @@ -688,7 +695,9 @@ public PickResult copyWithSubchannel(Subchannel subchannel) { */ public PickResult copyWithStreamTracerFactory( @Nullable ClientStreamTracer.Factory streamTracerFactory) { - return new PickResult(subchannel, streamTracerFactory, status, drop, authorityOverride); + return new PickResult( + subchannel, streamTracerFactory, status, drop, authorityOverride, delayType, + delayReason); } /** @@ -725,6 +734,31 @@ public static PickResult withNoResult() { return NO_RESULT; } + /** + * No decision could be made. The RPC will stay buffered with a specific delay type and reason. + * + * @param delayType low-cardinality root cause label (e.g., "connecting") + * @param delayReason high-cardinality diagnostic string for trace events + * @since 1.82.0 + */ + public static PickResult withNoResult(String delayType, String delayReason) { + Preconditions.checkNotNull(delayType, "delayType"); + Preconditions.checkNotNull(delayReason, "delayReason"); + return new PickResult(null, null, Status.OK, false, null, delayType, delayReason); + } + + /** Returns the delay type label if any. */ + @Nullable + public String getDelayType() { + return delayType; + } + + /** Returns the diagnostic delay reason if any. */ + @Nullable + public String getDelayReason() { + return delayReason; + } + /** Returns the authority override if any. 
*/ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11656") @Nullable diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 5569e1eecf8..3c0550229c3 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -37,6 +37,7 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedHashSet; +import java.util.Objects; import java.util.concurrent.Executor; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -157,7 +158,9 @@ public final ClientStream newStream( synchronized (lock) { PickerState newerState = pickerState; if (state == newerState) { - return createPendingStream(args, tracers, pickResult); + String delayType = determineQueuingDelayType(pickResult); + String delayReason = determineQueuingDelayReason(pickResult); + return createPendingStream(args, tracers, pickResult, delayType, delayReason); } state = newerState; } @@ -173,8 +176,8 @@ public final ClientStream newStream( */ @GuardedBy("lock") private PendingStream createPendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, - PickResult pickResult) { - PendingStream pendingStream = new PendingStream(args, tracers); + PickResult pickResult, @Nullable String delayType, @Nullable String delayReason) { + PendingStream pendingStream = new PendingStream(args, tracers, delayType, delayReason); if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) { pendingStream.lastPickStatus = pickResult.getStatus(); } @@ -245,7 +248,7 @@ public final void shutdownNow(Status status) { } if (savedReportTransportTerminated != null) { for (PendingStream stream : savedPendingStreams) { - Runnable runnable = stream.setStream( + Runnable runnable = stream.setStreamAndEndDelay( new FailingClientStream(status, RpcProgress.REFUSED, stream.tracers)); if 
(runnable != null) { // Drain in-line instead of using an executor as failing stream just throws everything @@ -303,6 +306,7 @@ final void reprocess(@Nullable SubchannelPicker picker) { final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, callOptions.isWaitForReady()); if (transport != null) { + stream.endDelay(); Executor executor = defaultAppExecutor; // createRealStream may be expensive. It will start real streams on the transport. If // there are pending requests, they will be serialized too, which may be expensive. Since @@ -315,7 +319,11 @@ final void reprocess(@Nullable SubchannelPicker picker) { executor.execute(runnable); } toRemove.add(stream); - } // else: stay pending + } else { // stay pending + String delayType = determineQueuingDelayType(pickResult); + String delayReason = determineQueuingDelayReason(pickResult); + stream.updateDelay(delayType, delayReason); + } } synchronized (lock) { @@ -356,16 +364,108 @@ public InternalLogId getLogId() { return logId; } + private static String determineQueuingDelayType(@Nullable PickResult pickResult) { + if (pickResult == null) { + return "connecting"; + } + if (pickResult.getSubchannel() != null) { + return "subchannel_state_mismatch"; + } + if (!pickResult.getStatus().isOk()) { + return "picker_failing_with_wait_for_ready"; + } + if (pickResult.getDelayType() != null) { + return pickResult.getDelayType(); + } + return "connecting"; + } + + private static String determineQueuingDelayReason(@Nullable PickResult pickResult) { + if (pickResult == null) { + return "client channel: waiting for picker"; + } + if (pickResult.getSubchannel() != null) { + return "subchannel returned by LB picker has no connected subchannel"; + } + if (!pickResult.getStatus().isOk()) { + return "wait_for_ready RPC failed with status: " + pickResult.getStatus(); + } + if (pickResult.getDelayReason() != null) { + return pickResult.getDelayReason(); + } + return "client channel: waiting for picker"; + } + 
private class PendingStream extends DelayedStream { private final PickSubchannelArgs args; private final Context context = Context.current(); private final ClientStreamTracer[] tracers; private volatile Status lastPickStatus; + @Nullable private String activeDelayType; + @Nullable private String activeDelayReason; - private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) { + private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, + @Nullable String initialType, @Nullable String initialReason) { super("connecting_and_lb"); this.args = args; this.tracers = tracers; + this.activeDelayType = initialType; + this.activeDelayReason = initialReason; + if (initialType != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.recordAttemptDelayStart(initialType, initialReason != null ? initialReason : ""); + } + } + } + + /** + * Updates active attempt delay telemetry state upon load balancing state transitions. + * + *

<p>If {@code newType} differs from the active delay type, active segment timers and child
+     * spans are ended and a new segment is initiated with {@code newReason} as its reason. If
+     * only {@code newReason} changes, a transition event is reported on the active segment.
+     */
+    void updateDelay(@Nullable String newType, @Nullable String newReason) {
+      if (!Objects.equals(activeDelayType, newType)) {
+        // Delay type changed (e.g., from RLS lookup to connecting). End the previous delay.
+        if (activeDelayType != null) {
+          for (ClientStreamTracer tracer : tracers) {
+            tracer.recordAttemptDelayEnd();
+          }
+        }
+        activeDelayType = newType;
+        activeDelayReason = newReason; // Reported by the start below; not a reason transition.
+        if (newType != null) {
+          for (ClientStreamTracer tracer : tracers) {
+            tracer.recordAttemptDelayStart(newType, newReason != null ? newReason : "");
+          }
+        }
+      }
+      if (newType != null && newReason != null && !Objects.equals(activeDelayReason, newReason)) {
+        // Delay type is unchanged, but the reason changed (e.g., priority failover).
+        activeDelayReason = newReason;
+        for (ClientStreamTracer tracer : tracers) {
+          tracer.recordAttemptDelayReasonChanged(newReason);
+        }
+      }
+    }
+
+    /**
+     * Ends the active attempt delay segment, if any; a call with no active segment does nothing.
+     */
+    void endDelay() {
+      if (activeDelayType != null) {
+        for (ClientStreamTracer tracer : tracers) {
+          tracer.recordAttemptDelayEnd();
+        }
+        activeDelayType = null;
+        activeDelayReason = null;
+      }
+    }
+
+    Runnable setStreamAndEndDelay(ClientStream stream) {
+      endDelay();
+      return setStream(stream);
     }
 
     /** Runnable may be null. */
@@ -386,11 +486,12 @@ private Runnable createRealStream(ClientTransport transport, String authorityOve
         // been called on the delayed stream.
realStream.setAuthority(authorityOverride); } - return setStream(realStream); + return setStreamAndEndDelay(realStream); } @Override public void cancel(Status reason) { + endDelay(); super.cancel(reason); synchronized (lock) { if (reportTransportTerminated != null) { diff --git a/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java b/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java index e7679ea14cc..3ecdcdfbcaa 100644 --- a/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java +++ b/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java @@ -39,6 +39,21 @@ public void createPendingStream() { delegate().createPendingStream(); } + @Override + public void recordAttemptDelayStart(String delayType, String delayReason) { + delegate().recordAttemptDelayStart(delayType, delayReason); + } + + @Override + public void recordAttemptDelayReasonChanged(String delayReason) { + delegate().recordAttemptDelayReasonChanged(delayReason); + } + + @Override + public void recordAttemptDelayEnd() { + delegate().recordAttemptDelayEnd(); + } + @Override public void outboundHeaders() { delegate().outboundHeaders(); diff --git a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java index ab60a024e1f..6c880aeb353 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java @@ -167,7 +167,10 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { if (noOldAddrs) { // Make tests happy; they don't properly assume starting in CONNECTING rawConnectivityState = CONNECTING; - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState( + CONNECTING, + new FixedResultPicker( + PickResult.withNoResult("connecting", "pick_first: address list updated"))); } if (rawConnectivityState == READY) { @@ 
-340,10 +343,13 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo // the current address of a valid index exists. if ((!enableHappyEyeballs && !addressIndex.isValid()) || (addressIndex.isValid() && !subchannels.containsKey( - addressIndex.getCurrentAddress()))) { + addressIndex.getCurrentAddress()))) { addressIndex.seekTo(getAddress(subchannelData.subchannel)); } - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState( + CONNECTING, + new FixedResultPicker( + PickResult.withNoResult("connecting", "pick_first: attempting to connect"))); break; case READY: @@ -668,7 +674,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { if (connectionRequested.compareAndSet(false, true)) { helper.getSynchronizationContext().execute(pickFirstLeafLoadBalancer::requestConnection); } - return PickResult.withNoResult(); + return PickResult.withNoResult( + "connecting", "pick_first: requesting connection"); } } diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java index cf4b4c94e04..705cfdbc1dc 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java @@ -38,6 +38,8 @@ * list and sticking to the first that works. */ final class PickFirstLoadBalancer extends LoadBalancer { + private static final PickResult CONNECTING_RESULT = + PickResult.withNoResult("connecting", "pick_first: attempting to connect"); private final Helper helper; private Subchannel subchannel; private ConnectivityState currentState = IDLE; @@ -83,7 +85,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) { // The channel state does not get updated when doing name resolving today, so for the moment // let LB report CONNECTION and call subchannel.requestConnection() immediately. 
- updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState(CONNECTING, new FixedResultPicker(connectingResult())); subchannel.requestConnection(); } else { subchannel.updateAddresses(servers); @@ -135,7 +137,7 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo case CONNECTING: // It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave // the current picker in-place. But ignoring the potential optimization is simpler. - picker = new FixedResultPicker(PickResult.withNoResult()); + picker = new FixedResultPicker(connectingResult()); break; case READY: picker = new FixedResultPicker(PickResult.withSubchannel(subchannel)); @@ -169,6 +171,10 @@ public void requestConnection() { } } + private PickResult connectingResult() { + return CONNECTING_RESULT; + } + /** Picker that requests connection during the first pick, and returns noResult. */ private final class RequestConnectionPicker extends SubchannelPicker { private final AtomicBoolean connectionRequested = new AtomicBoolean(false); @@ -178,7 +184,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { if (connectionRequested.compareAndSet(false, true)) { helper.getSynchronizationContext().execute(PickFirstLoadBalancer.this::requestConnection); } - return PickResult.withNoResult(); + return PickResult.withNoResult( + "connecting", "pick_first: requesting connection"); } } diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index d7e1d4ca4f6..e88550da24b 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -51,6 +51,10 @@ import io.grpc.StringMarshaller; import io.grpc.SynchronizationContext; import io.grpc.internal.ClientStreamListener.RpcProgress; +import java.util.ArrayList; +import 
java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -772,6 +776,169 @@ public void pendingStream_appendTimeoutInsight_waitForReady_withLastPickFailure( + " connecting_and_lb_delay=[0-9]+ns, was_still_waiting]"); } + @Test + public void streamDelayMetrics() { + FakeStreamTracer fakeTracer = new FakeStreamTracer(); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { fakeTracer }; + + delayedTransport.reprocess(fakePicker( + PickResult.withNoResult("connecting", "pick_first: attempting to connect"))); + delayedTransport.newStream(method, headers, callOptions, customTracers); + + assertEquals(Collections.singletonList("connecting"), fakeTracer.startedDelayTypes); + assertEquals(Collections.singletonList("pick_first: attempting to connect"), + fakeTracer.startedDelayReasons); + + delayedTransport.reprocess(fakePicker( + PickResult.withNoResult("rls_lookup_pending", "RLS request pending."))); + + assertEquals(1, fakeTracer.delayEndedCount); + assertEquals(Arrays.asList("connecting", "rls_lookup_pending"), + fakeTracer.startedDelayTypes); + assertEquals(Arrays.asList("pick_first: attempting to connect", "RLS request pending."), + fakeTracer.startedDelayReasons); + + delayedTransport.reprocess(mockPicker); + + assertEquals(2, fakeTracer.delayEndedCount); + } + + @Test + public void streamDelayMetrics_cancelled() { + FakeStreamTracer fakeTracer = new FakeStreamTracer(); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { fakeTracer }; + + delayedTransport.reprocess(fakePicker( + PickResult.withNoResult("connecting", "pick_first: attempting to connect"))); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); + stream.start(streamListener); + + assertEquals(Collections.singletonList("connecting"), fakeTracer.startedDelayTypes); + + 
stream.cancel(Status.CANCELLED); + + assertEquals(1, fakeTracer.delayEndedCount); + } + + @Test + public void streamDelayMetrics_shutdownNow() { + FakeStreamTracer fakeTracer = new FakeStreamTracer(); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { fakeTracer }; + + delayedTransport.reprocess(fakePicker( + PickResult.withNoResult("connecting", "pick_first: attempting to connect"))); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); + stream.start(streamListener); + + assertEquals(Collections.singletonList("connecting"), fakeTracer.startedDelayTypes); + + delayedTransport.shutdownNow(Status.UNAVAILABLE); + + assertEquals(1, fakeTracer.delayEndedCount); + } + + @Test + public void streamDelayMetrics_cadenceReasonUpdate_doesNotStartNewTypeSegment() { + FakeStreamTracer fakeTracer = new FakeStreamTracer(); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { fakeTracer }; + + delayedTransport.reprocess(fakePicker( + PickResult.withNoResult("connecting", "attempt 1"))); + delayedTransport.newStream(method, headers, callOptions, customTracers); + + assertEquals(Collections.singletonList("connecting"), fakeTracer.startedDelayTypes); + assertEquals(Collections.singletonList("attempt 1"), fakeTracer.startedDelayReasons); + + delayedTransport.reprocess(fakePicker( + PickResult.withNoResult("connecting", "attempt 2"))); + + assertEquals(Collections.singletonList("connecting"), fakeTracer.startedDelayTypes); + assertEquals(Collections.singletonList("attempt 1"), fakeTracer.startedDelayReasons); + assertEquals(Collections.singletonList("attempt 2"), fakeTracer.changedDelayReasons); + assertEquals(0, fakeTracer.delayEndedCount); + } + + @Test + public void streamDelayMetrics_channelFallback_clientChannelInit() { + FakeStreamTracer fakeTracer = new FakeStreamTracer(); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { fakeTracer }; + + // No picker reprocessed yet (lastPicker == null) + 
delayedTransport.newStream(method, headers, callOptions, customTracers); + + assertEquals(Collections.singletonList("connecting"), fakeTracer.startedDelayTypes); + assertEquals(Collections.singletonList("client channel: waiting for picker"), + fakeTracer.startedDelayReasons); + } + + @Test + public void streamDelayMetrics_channelFallback_subchannelStateMismatch() { + FakeStreamTracer fakeTracer = new FakeStreamTracer(); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { fakeTracer }; + + io.grpc.LoadBalancer.Subchannel disconnectedSubchannel = + mock(io.grpc.LoadBalancer.Subchannel.class); + when(disconnectedSubchannel.getInternalSubchannel()) + .thenReturn(newTransportProvider(null)); + + delayedTransport.reprocess(fakePicker(PickResult.withSubchannel(disconnectedSubchannel))); + delayedTransport.newStream(method, headers, callOptions, customTracers); + + assertEquals(Collections.singletonList("subchannel_state_mismatch"), + fakeTracer.startedDelayTypes); + assertEquals(Collections.singletonList( + "subchannel returned by LB picker has no connected subchannel"), + fakeTracer.startedDelayReasons); + } + + @Test + public void streamDelayMetrics_channelFallback_waitForReadyFailed() { + FakeStreamTracer fakeTracer = new FakeStreamTracer(); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { fakeTracer }; + + delayedTransport.reprocess(fakePicker(PickResult.withError(Status.UNAVAILABLE))); + CallOptions wfrOptions = callOptions.withWaitForReady(); + delayedTransport.newStream(method, headers, wfrOptions, customTracers); + + assertEquals(Collections.singletonList("picker_failing_with_wait_for_ready"), + fakeTracer.startedDelayTypes); + assertEquals(Collections.singletonList( + "wait_for_ready RPC failed with status: " + Status.UNAVAILABLE), + fakeTracer.startedDelayReasons); + } + + private static final class FakeStreamTracer extends ClientStreamTracer { + final List startedDelayTypes = new ArrayList<>(); + final List startedDelayReasons = 
new ArrayList<>(); + final List changedDelayReasons = new ArrayList<>(); + int delayEndedCount = 0; + + @Override + public void recordAttemptDelayStart(String delayType, String delayReason) { + startedDelayTypes.add(delayType); + startedDelayReasons.add(delayReason); + } + + @Override + public void recordAttemptDelayReasonChanged(String delayReason) { + changedDelayReasons.add(delayReason); + } + + @Override + public void recordAttemptDelayEnd() { + delayEndedCount++; + } + } + + private static SubchannelPicker fakePicker(final PickResult result) { + return new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return result; + } + }; + } + private static TransportProvider newTransportProvider(final ClientTransport transport) { return new TransportProvider() { @Override diff --git a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java index 1e130423a45..1c3182237d3 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java @@ -147,8 +147,10 @@ public void pickAfterResolved() throws Exception { verify(mockSubchannel).requestConnection(); // Calling pickSubchannel() twice gave the same result - assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), - pickerCaptor.getValue().pickSubchannel(mockArgs)); + PickResult result = pickerCaptor.getValue().pickSubchannel(mockArgs); + assertThat(result.getDelayType()).isEqualTo("connecting"); + assertThat(result.getDelayReason()).isEqualTo("pick_first: attempting to connect"); + assertEquals(result, pickerCaptor.getValue().pickSubchannel(mockArgs)); verifyNoMoreInteractions(mockHelper); } diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 5ed84ade2f8..024ea84f60e 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java 
+++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -124,7 +124,8 @@ final class GrpclbState { static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() { @Override public PickResult picked(PickSubchannelArgs args) { - return PickResult.withNoResult(); + return PickResult.withNoResult( + "connecting", "grpclb: waiting for backend server list"); } @Override diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java index 87ad61c9f27..1243e0fff59 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java @@ -232,6 +232,17 @@ static OpenTelemetryMetricsResource createMetricInstruments(Meter meter, .build()); } + if (isDelayObservabilityEnabled() + && isMetricEnabled("grpc.client.attempt.delay.duration", enableMetrics, disableDefault)) { + builder.clientAttemptDelayCounter( + meter.histogramBuilder( + "grpc.client.attempt.delay.duration") + .setUnit("s") + .setDescription("Time taken before a client call attempt starts") + .setExplicitBucketBoundariesAdvice(LATENCY_BUCKETS) + .build()); + } + if (isMetricEnabled("grpc.client.attempt.sent_total_compressed_message_size", enableMetrics, disableDefault)) { builder.clientTotalSentCompressedMessageSizeCounter( @@ -349,6 +360,17 @@ static OpenTelemetryMetricsResource createMetricInstruments(Meter meter, return builder.build(); } + /** + * Checks whether experimental client attempt and call delay observability is globally enabled. + * + *

Guarded strictly by the {@code GRPC_EXPERIMENTAL_ENABLE_DELAY_OBSERVABILITY} environment + * variable (defaults to {@code false}). When disabled, delay spans and + * duration histograms are suppressed to avoid runtime overhead. + */ + static boolean isDelayObservabilityEnabled() { + return GrpcUtil.getFlag("GRPC_EXPERIMENTAL_ENABLE_DELAY_OBSERVABILITY", false); + } + static boolean isMetricEnabled(String metricName, Map enableMetrics, boolean disableDefault) { Boolean explicitlyEnabled = enableMetrics.get(metricName); diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index f783b9495dd..008a9754b29 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -50,12 +50,14 @@ import io.grpc.internal.StatsTraceContext.ServerCallMethodListener; import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter; import io.opentelemetry.api.baggage.Baggage; +import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.context.Context; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; @@ -203,6 +205,8 @@ private static final class ClientTracer extends ClientStreamTracer { volatile String backendService; long attemptNanos; Code statusCode; + @Nullable private volatile Stopwatch activeDelayStopwatch; + @Nullable private volatile String activeDelayType; ClientTracer(CallAttemptsTracerFactory attemptsState, OpenTelemetryMetricsModule module, StreamInfo info, String target, String fullMethodName, @@ -216,6 +220,55 @@ private static final 
class ClientTracer extends ClientStreamTracer { this.stopwatch = module.stopwatchSupplier.get().start(); } + /** Closes any active delay segment once the stream has been created. */ + @Override + public void streamCreated(io.grpc.Attributes transportAtts, Metadata headers) { + recordAttemptDelayEnd(); + } + + /** + * Starts timing a delay segment of the given type. A repeated start with the same type keeps + * the running stopwatch; a different type records the previous segment first. + * + * @param delayType low-cardinality label recorded as the {@code grpc.delay_type} attribute + * @param delayReason unused here; reasons are not attached to metrics + */ + @Override + public void recordAttemptDelayStart(String delayType, String delayReason) { + if (!GrpcOpenTelemetry.isDelayObservabilityEnabled() + || module.resource.clientAttemptDelayCounter() == null) { + // Nothing can be recorded without the flag and the histogram, so skip the stopwatch. + return; + } + if (activeDelayStopwatch != null && Objects.equals(activeDelayType, delayType)) { + // Do not reset the stopwatch if the delay type is unchanged. + return; + } + recordAttemptDelayEnd(); + if (delayType == null) { + // A segment without a type is never recorded; do not leave a stopwatch running for it. + return; + } + activeDelayType = delayType; + activeDelayStopwatch = module.stopwatchSupplier.get().start(); + } + + /** No-op for metrics: the reason string is not used as a metric attribute. */ + @Override + public void recordAttemptDelayReasonChanged(String delayReason) { + // Reason strings are high-cardinality diagnostics intended for tracing spans. + } + + /** + * Stops the active delay stopwatch, if any, and records the elapsed time to the attempt delay + * histogram together with the method, target and delay type attributes. Does nothing when no + * segment is active. + */ + @Override + public void recordAttemptDelayEnd() { + Stopwatch delayStopwatch = activeDelayStopwatch; + String delayType = activeDelayType; + if (delayStopwatch == null) { + return; + } + // Clear the segment first so the tracer is never left holding a stopped stopwatch. + activeDelayStopwatch = null; + activeDelayType = null; + delayStopwatch.stop(); + long delayNanos = delayStopwatch.elapsed(TimeUnit.NANOSECONDS); + if (delayType == null || module.resource.clientAttemptDelayCounter() == null) { + return; + } + AttributesBuilder builder = Attributes.builder() + .put(METHOD_KEY, fullMethodName) + .put(TARGET_KEY, target) + .put("grpc.delay_type", delayType); + if (module.customLabelEnabled) { + builder.put( + CUSTOM_LABEL_KEY, info.getCallOptions().getOption(Grpc.CALL_OPTION_CUSTOM_LABEL)); + } + for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) { + plugin.addLabels(builder); + } + module.resource.clientAttemptDelayCounter() + .record(delayNanos * SECONDS_PER_NANO, builder.build(), attemptsState.otelContext); + } + @Override public void inboundHeaders(Metadata headers) { for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) { @@ -262,6 +315,7 @@ public void inboundTrailers(Metadata trailers) { @Override public void 
streamClosed(Status status) { + recordAttemptDelayEnd(); stopwatch.stop(); attemptNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); Deadline deadline = info.getCallOptions().getDeadline(); diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsResource.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsResource.java index d32ae1e67f5..085498d746e 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsResource.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsResource.java @@ -35,6 +35,9 @@ abstract class OpenTelemetryMetricsResource { @Nullable abstract DoubleHistogram clientAttemptDurationCounter(); + @Nullable + abstract DoubleHistogram clientAttemptDelayCounter(); + @Nullable abstract LongHistogram clientTotalSentCompressedMessageSizeCounter(); @@ -79,6 +82,8 @@ abstract static class Builder { abstract Builder clientAttemptDurationCounter(DoubleHistogram counter); + abstract Builder clientAttemptDelayCounter(DoubleHistogram counter); + abstract Builder clientTotalSentCompressedMessageSizeCounter(LongHistogram counter); abstract Builder clientTotalReceivedCompressedMessageSizeCounter( diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java index d214e99bd75..32aab870f0f 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java @@ -41,6 +41,7 @@ import io.grpc.opentelemetry.internal.OpenTelemetryConstants; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.baggage.Baggage; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.StatusCode; @@ -48,6 +49,7 @@ import 
io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.opentelemetry.context.propagation.ContextPropagators; +import java.util.Objects; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.logging.Level; import java.util.logging.Logger; @@ -192,6 +194,8 @@ private final class ClientTracer extends ClientStreamTracer { private final Span parentSpan; volatile int seqNo; boolean isPendingStream; + @Nullable private volatile Span activeDelaySpan; + @Nullable private volatile String activeDelayType; ClientTracer(Span span, Span parentSpan) { this.span = checkNotNull(span, "span"); @@ -200,6 +204,7 @@ private final class ClientTracer extends ClientStreamTracer { @Override public void streamCreated(Attributes transportAtts, Metadata headers) { + recordAttemptDelayEnd(); contextPropagators.getTextMapPropagator().inject(Context.current().with(span), headers, metadataSetter); if (isPendingStream) { @@ -212,6 +217,56 @@ public void createPendingStream() { isPendingStream = true; } + @Override + public void recordAttemptDelayStart(String delayType, String delayReason) { + if (!GrpcOpenTelemetry.isDelayObservabilityEnabled()) { + return; + } + if (activeDelaySpan != null && Objects.equals(activeDelayType, delayType)) { + // Do not recreate the span if the delay type is unchanged (e.g., priority failover). + recordAttemptDelayReasonChanged(delayReason); + return; + } + // Close any previous delay segment before starting a new canonical segment. + recordAttemptDelayEnd(); + activeDelayType = delayType; + // All attempt queuing segments use the strict child span name "Attempt Delay". 
+ Span delaySpan = otelTracer.spanBuilder("Attempt Delay") + .setParent(Context.current().with(span)) + .setAttribute("grpc.delay_type", delayType) + .startSpan(); + activeDelaySpan = delaySpan; + delaySpan.addEvent( + "Delay state transition", + io.opentelemetry.api.common.Attributes.of( + AttributeKey.stringKey("grpc.delay_type"), delayType, + AttributeKey.stringKey("grpc.delay_reason"), delayReason)); + } + + /** + * Adds a {@code "Delay state transition"} event carrying the new reason to the active delay + * span. Does nothing when the feature flag is off or no delay span is active. + * + * @param delayReason diagnostic string stored in the {@code grpc.delay_reason} event attribute + */ + @Override + public void recordAttemptDelayReasonChanged(String delayReason) { + // Read each volatile field once: recordAttemptDelayEnd() nulls them out, so re-reading + // after the null check could dereference null. + Span delaySpan = activeDelaySpan; + String type = activeDelayType; + if (!GrpcOpenTelemetry.isDelayObservabilityEnabled() || delaySpan == null) { + return; + } + delaySpan.addEvent( + "Delay state transition", + io.opentelemetry.api.common.Attributes.of( + AttributeKey.stringKey("grpc.delay_type"), type != null ? type : "", + AttributeKey.stringKey("grpc.delay_reason"), delayReason)); + } + + @Override + public void recordAttemptDelayEnd() { + Span delaySpan = activeDelaySpan; + if (delaySpan != null) { + // End active child span upon pick completion or transport cancellation. 
+ delaySpan.end(); + activeDelaySpan = null; + activeDelayType = null; + } + } + @Override public void outboundMessageSent( int seqNo, long optionalWireSize, long optionalUncompressedSize) { @@ -238,6 +293,7 @@ public void inboundUncompressedSize(long bytes) { @Override public void streamClosed(io.grpc.Status status) { + recordAttemptDelayEnd(); endSpanWithStatus(span, status); } } diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index 7c9db875196..dcda716e771 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -40,11 +40,19 @@ import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptors; import io.grpc.ClientStreamTracer; +import io.grpc.ConnectivityState; +import io.grpc.EquivalentAddressGroup; import io.grpc.Grpc; import io.grpc.KnownLength; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancerProvider; +import io.grpc.LoadBalancerRegistry; import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.NameResolver; +import io.grpc.NameResolverProvider; +import io.grpc.NameResolverRegistry; import io.grpc.Server; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; @@ -54,8 +62,10 @@ import io.grpc.ServiceDescriptor; import io.grpc.Status; import io.grpc.Status.Code; +import io.grpc.StatusOr; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.inprocess.InProcessSocketAddress; import io.grpc.internal.FakeClock; import io.grpc.internal.StatsTraceContext.ServerCallMethodListener; import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter; @@ -79,10 +89,15 @@ import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; import java.io.IOException; import 
java.io.InputStream; +import java.net.SocketAddress; +import java.net.URI; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; @@ -204,6 +219,7 @@ public String parse(InputStream stream) { @Before public void setUp() throws Exception { + System.setProperty("GRPC_EXPERIMENTAL_ENABLE_DELAY_OBSERVABILITY", "true"); testMeter = openTelemetryTesting.getOpenTelemetry() .getMeter(OpenTelemetryConstants.INSTRUMENTATION_SCOPE); @@ -211,6 +227,7 @@ public void setUp() throws Exception { @After public void tearDown() { + System.clearProperty("GRPC_EXPERIMENTAL_ENABLE_DELAY_OBSERVABILITY"); if (channel != null) { channel.shutdownNow(); } @@ -1611,6 +1628,200 @@ public void customLabel_present() { point -> point.hasAttribute(attributeKey, customValue)))); } + @Test + public void clientAttemptDelayDuration_recorded() { + Map enabledMetrics = ImmutableMap.of( + "grpc.client.attempt.delay.duration", true + ); + OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments( + testMeter, enabledMetrics, disableDefaultMetrics); + OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList()); + CallAttemptsTracerFactory callAttemptsTracerFactory = + new CallAttemptsTracerFactory( + module, "target:///", STREAM_INFO.getCallOptions(), method.getFullMethodName(), + emptyList(), Context.root()); + + ClientStreamTracer tracer = + callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); + tracer.recordAttemptDelayStart("connecting", "connecting reason"); + fakeClock.forwardTime(250, TimeUnit.MILLISECONDS); + tracer.recordAttemptDelayEnd(); + + assertThat(openTelemetryTesting.getMetrics()) + .anySatisfy( 
+ metric -> assertThat(metric) + .hasName("grpc.client.attempt.delay.duration") + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> { + point.hasSum(0.25); + point.hasAttribute(METHOD_KEY, method.getFullMethodName()); + point.hasAttribute(TARGET_KEY, "target:///"); + point.hasAttribute( + AttributeKey.stringKey("grpc.delay_type"), "connecting"); + }))); + } + + @Test + public void clientAttemptDelayDuration_endToEnd_inProcessTransport() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + LoadBalancerProvider slowLbProvider = new LoadBalancerProvider() { + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return 5; + } + + @Override + public String getPolicyName() { + return "slow_metrics_connecting_policy"; + } + + @Override + public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) { + return new LoadBalancer() { + @Override + public Status acceptResolvedAddresses(LoadBalancer.ResolvedAddresses resolvedAddresses) { + helper.updateBalancingState(ConnectivityState.CONNECTING, new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withNoResult("connecting", + "Simulated slow TLS handshake with backend"); + } + }); + latch.countDown(); + return Status.OK; + } + + @Override + public void handleNameResolutionError(Status error) {} + + @Override + public void shutdown() {} + }; + } + }; + LoadBalancerRegistry.getDefaultRegistry().register(slowLbProvider); + + NameResolverProvider customResolverProvider = new NameResolverProvider() { + @Override + protected boolean isAvailable() { + return true; + } + + @Override + protected int priority() { + return 5; + } + + @Override + public String getDefaultScheme() { + return "inprocmetricse2e"; + } + + @Override + public Collection> getProducedSocketAddressTypes() { + return Collections.singleton(InProcessSocketAddress.class); + } + + @Override + 
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) { + return new NameResolver() { + @Override + public String getServiceAuthority() { + return "inprocmetricse2e"; + } + + @Override + public void start(Listener2 listener) { + listener.onResult(ResolutionResult.newBuilder() + .setAddressesOrError(StatusOr.fromValue(Collections.singletonList( + new EquivalentAddressGroup( + new InProcessSocketAddress("test-metrics-e2e"))))) + .build()); + } + + @Override + public void shutdown() {} + }; + } + }; + NameResolverRegistry.getDefaultRegistry().register(customResolverProvider); + + GrpcOpenTelemetry grpcOpenTelemetry = GrpcOpenTelemetry.newBuilder() + .sdk(openTelemetryTesting.getOpenTelemetry()) + .enableMetrics(Collections.singleton("grpc.client.attempt.delay.duration")) + .build(); + + InProcessChannelBuilder channelBuilder = + InProcessChannelBuilder.forTarget("inprocmetricse2e:///test-metrics-e2e") + .defaultLoadBalancingPolicy("slow_metrics_connecting_policy"); + grpcOpenTelemetry.configureChannelBuilder(channelBuilder); + ManagedChannel channel = channelBuilder.build(); + try { + ClientCall call = channel.newCall(method, CallOptions.DEFAULT); + call.start(new ClientCall.Listener() {}, new Metadata()); + call.request(1); + + latch.await(5, TimeUnit.SECONDS); + Thread.sleep(50); + call.cancel("End test delay segment", null); + } finally { + channel.shutdownNow(); + channel.awaitTermination(5, TimeUnit.SECONDS); + LoadBalancerRegistry.getDefaultRegistry().deregister(slowLbProvider); + NameResolverRegistry.getDefaultRegistry().deregister(customResolverProvider); + } + + assertThat(openTelemetryTesting.getMetrics()) + .anySatisfy( + metric -> assertThat(metric) + .hasName("grpc.client.attempt.delay.duration") + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> { + point.hasAttribute(METHOD_KEY, method.getFullMethodName()); + point.hasAttribute(TARGET_KEY, "inprocmetricse2e:///test-metrics-e2e"); + 
point.hasAttribute( + AttributeKey.stringKey("grpc.delay_type"), "connecting"); + }))); + } + + @Test + public void clientAttemptDelayStart_featureFlagDisabled_zeroMetrics() { + System.setProperty("GRPC_EXPERIMENTAL_ENABLE_DELAY_OBSERVABILITY", "false"); + try { + Map enabledMetrics = ImmutableMap.of( + "grpc.client.attempt.delay.duration", true + ); + OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments( + testMeter, enabledMetrics, disableDefaultMetrics); + OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList()); + CallAttemptsTracerFactory callAttemptsTracerFactory = + new CallAttemptsTracerFactory( + module, "target:///", STREAM_INFO.getCallOptions(), method.getFullMethodName(), + emptyList(), Context.root()); + + ClientStreamTracer tracer = + callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); + tracer.recordAttemptDelayStart("connecting", "connecting reason"); + fakeClock.forwardTime(250, TimeUnit.MILLISECONDS); + tracer.recordAttemptDelayEnd(); + + assertThat(openTelemetryTesting.getMetrics()) + .extracting("name") + .doesNotContain("grpc.client.attempt.delay.duration"); + } finally { + System.setProperty("GRPC_EXPERIMENTAL_ENABLE_DELAY_OBSERVABILITY", "true"); + } + } + @Test public void serverBasicMetrics() { OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java index e6759aadb1e..9a723ea1e60 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java @@ -42,10 +42,18 @@ import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptors; import 
io.grpc.ClientStreamTracer; +import io.grpc.ConnectivityState; +import io.grpc.EquivalentAddressGroup; import io.grpc.KnownLength; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancerProvider; +import io.grpc.LoadBalancerRegistry; import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.NameResolver; +import io.grpc.NameResolverProvider; +import io.grpc.NameResolverRegistry; import io.grpc.NoopServerCall; import io.grpc.Server; import io.grpc.ServerCall; @@ -55,14 +63,17 @@ import io.grpc.ServerServiceDefinition; import io.grpc.ServerStreamTracer; import io.grpc.Status; +import io.grpc.StatusOr; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.inprocess.InProcessSocketAddress; import io.grpc.opentelemetry.OpenTelemetryTracingModule.CallAttemptsTracerFactory; import io.grpc.opentelemetry.internal.OpenTelemetryConstants; import io.grpc.testing.GrpcCleanupRule; import io.grpc.testing.GrpcServerRule; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.baggage.Baggage; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanBuilder; import io.opentelemetry.api.trace.SpanContext; @@ -83,9 +94,16 @@ import io.opentelemetry.sdk.trace.data.SpanData; import java.io.IOException; import java.io.InputStream; +import java.net.SocketAddress; +import java.net.URI; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -184,6 +202,7 @@ public String parse(InputStream stream) { @Before public void setUp() { + System.setProperty("GRPC_EXPERIMENTAL_ENABLE_DELAY_OBSERVABILITY", "true"); 
tracerRule = openTelemetryRule.getOpenTelemetry().getTracer( OpenTelemetryConstants.INSTRUMENTATION_SCOPE); TracerProvider mockTracerProvider = mock(TracerProvider.class); @@ -199,6 +218,11 @@ public void setUp() { when(mockTracer.spanBuilder(any())).thenReturn(mockSpanBuilder); } + @After + public void tearDown() { + System.clearProperty("GRPC_EXPERIMENTAL_ENABLE_DELAY_OBSERVABILITY"); + } + // Use mock instead of OpenTelemetryRule to verify inOrder and propagator. @Test public void clientBasicTracingMocking() { @@ -381,6 +405,221 @@ public void clientBasicTracingRule() { assertEquals(attemptSpanData.hasEnded(), true); } + @Test + public void clientAttemptDelayTracing_reasonChangedInvariant() { + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule( + openTelemetryRule.getOpenTelemetry()); + Span clientSpan = tracerRule.spanBuilder("test-client-span").startSpan(); + CallAttemptsTracerFactory callTracer = + tracingModule.newClientCallTracer(clientSpan, method); + ClientStreamTracer clientStreamTracer = + callTracer.newClientStreamTracer(STREAM_INFO, new Metadata()); + + clientStreamTracer.recordAttemptDelayStart("connecting", "reason1"); + clientStreamTracer.recordAttemptDelayReasonChanged("reason2"); + clientStreamTracer.recordAttemptDelayStart("connecting", "reason3"); + clientStreamTracer.recordAttemptDelayEnd(); + clientStreamTracer.streamClosed(Status.OK); + callTracer.callEnded(Status.OK); + clientSpan.end(); + + List spans = openTelemetryRule.getSpans(); + assertEquals(3, spans.size()); + SpanData delaySpanData = spans.get(0); + + assertEquals("Attempt Delay", delaySpanData.getName()); + assertEquals("connecting", delaySpanData.getAttributes().get( + AttributeKey.stringKey("grpc.delay_type"))); + assertEquals(3, delaySpanData.getEvents().size()); + + EventData event1 = delaySpanData.getEvents().get(0); + assertEquals("Delay state transition", event1.getName()); + assertEquals("connecting", event1.getAttributes().get( + 
AttributeKey.stringKey("grpc.delay_type"))); + assertEquals("reason1", event1.getAttributes().get( + AttributeKey.stringKey("grpc.delay_reason"))); + + EventData event2 = delaySpanData.getEvents().get(1); + assertEquals("Delay state transition", event2.getName()); + assertEquals("connecting", event2.getAttributes().get( + AttributeKey.stringKey("grpc.delay_type"))); + assertEquals("reason2", event2.getAttributes().get( + AttributeKey.stringKey("grpc.delay_reason"))); + + EventData event3 = delaySpanData.getEvents().get(2); + assertEquals("Delay state transition", event3.getName()); + assertEquals("connecting", event3.getAttributes().get( + AttributeKey.stringKey("grpc.delay_type"))); + assertEquals("reason3", event3.getAttributes().get( + AttributeKey.stringKey("grpc.delay_reason"))); + } + + @Test + public void clientAttemptDelayTracing_endToEnd_inProcessTransport() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + LoadBalancerProvider slowLbProvider = new LoadBalancerProvider() { + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return 5; + } + + @Override + public String getPolicyName() { + return "slow_connecting_policy"; + } + + @Override + public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) { + return new LoadBalancer() { + @Override + public Status acceptResolvedAddresses(LoadBalancer.ResolvedAddresses resolvedAddresses) { + helper.updateBalancingState(ConnectivityState.CONNECTING, new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withNoResult("connecting", + "Simulated slow TLS handshake with backend"); + } + }); + latch.countDown(); + return Status.OK; + } + + @Override + public void handleNameResolutionError(Status error) {} + + @Override + public void shutdown() {} + }; + } + }; + LoadBalancerRegistry.getDefaultRegistry().register(slowLbProvider); + + NameResolverProvider 
customResolverProvider = new NameResolverProvider() { + @Override + protected boolean isAvailable() { + return true; + } + + @Override + protected int priority() { + return 5; + } + + @Override + public String getDefaultScheme() { + return "inproce2e"; + } + + @Override + public Collection> getProducedSocketAddressTypes() { + return Collections.singleton(InProcessSocketAddress.class); + } + + @Override + public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) { + return new NameResolver() { + @Override + public String getServiceAuthority() { + return "inproce2e"; + } + + @Override + public void start(Listener2 listener) { + listener.onResult(ResolutionResult.newBuilder() + .setAddressesOrError(StatusOr.fromValue(Collections.singletonList( + new EquivalentAddressGroup(new InProcessSocketAddress("test-e2e"))))) + .build()); + } + + @Override + public void shutdown() {} + }; + } + }; + NameResolverRegistry.getDefaultRegistry().register(customResolverProvider); + + GrpcOpenTelemetry grpcOpenTelemetry = GrpcOpenTelemetry.newBuilder() + .sdk(openTelemetryRule.getOpenTelemetry()) + .enableTracing(true) + .build(); + + InProcessChannelBuilder channelBuilder = + InProcessChannelBuilder.forTarget("inproce2e:///test-e2e") + .defaultLoadBalancingPolicy("slow_connecting_policy"); + grpcOpenTelemetry.configureChannelBuilder(channelBuilder); + ManagedChannel channel = channelBuilder.build(); + try { + ClientCall call = channel.newCall(method, CallOptions.DEFAULT); + call.start(new ClientCall.Listener() {}, new Metadata()); + call.request(1); + + latch.await(5, TimeUnit.SECONDS); + Thread.sleep(50); + call.cancel("End test delay segment", null); + } finally { + channel.shutdownNow(); + channel.awaitTermination(5, TimeUnit.SECONDS); + LoadBalancerRegistry.getDefaultRegistry().deregister(slowLbProvider); + NameResolverRegistry.getDefaultRegistry().deregister(customResolverProvider); + } + + List spans = openTelemetryRule.getSpans(); + SpanData delaySpanData = 
null; + for (SpanData s : spans) { + if ("Attempt Delay".equals(s.getName())) { + delaySpanData = s; + break; + } + } + assertNotNull(delaySpanData); + assertEquals("connecting", + delaySpanData.getAttributes().get(AttributeKey.stringKey("grpc.delay_type"))); + + boolean foundTransition = false; + for (EventData event : delaySpanData.getEvents()) { + if ("Delay state transition".equals(event.getName()) + && "Simulated slow TLS handshake with backend".equals( + event.getAttributes().get(AttributeKey.stringKey("grpc.delay_reason")))) { + foundTransition = true; + break; + } + } + assertTrue(foundTransition); + } + + @Test + public void clientAttemptDelayStart_featureFlagDisabled_zeroChildSpans() { + System.setProperty("GRPC_EXPERIMENTAL_ENABLE_DELAY_OBSERVABILITY", "false"); + try { + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule( + openTelemetryRule.getOpenTelemetry()); + Span clientSpan = tracerRule.spanBuilder("test-client-span").startSpan(); + CallAttemptsTracerFactory callTracer = + tracingModule.newClientCallTracer(clientSpan, method); + ClientStreamTracer clientStreamTracer = + callTracer.newClientStreamTracer(STREAM_INFO, new Metadata()); + + clientStreamTracer.recordAttemptDelayStart("connecting", "attempting to connect"); + clientStreamTracer.recordAttemptDelayEnd(); + clientStreamTracer.streamClosed(Status.OK); + callTracer.callEnded(Status.OK); + clientSpan.end(); + + List spans = openTelemetryRule.getSpans(); + assertEquals(2, spans.size()); + for (SpanData span : spans) { + assertTrue(!span.getName().equals("Attempt Delay")); + } + } finally { + System.setProperty("GRPC_EXPERIMENTAL_ENABLE_DELAY_OBSERVABILITY", "true"); + } + } + @Test public void clientInterceptor() { testClientInterceptors(false); diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index a2846fd04c8..ca3ec3b9db5 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ 
b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -1028,7 +1028,9 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { SubchannelPicker picker = (childPolicyWrapper != null) ? childPolicyWrapper.getPicker() : null; if (picker == null) { - return PickResult.withNoResult(); + // Child policy is connecting. Preserve leaf delay type. + return PickResult.withNoResult( + "connecting", "RLS child policy connecting"); } // Happy path PickResult pickResult = picker.pickSubchannel(args); @@ -1037,6 +1039,11 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { Arrays.asList(helper.getChannelTarget(), lookupService, childPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)), Arrays.asList(determineCustomLabel(args))); + } else if (pickResult.getDelayType() != null) { + return PickResult.withNoResult( + pickResult.getDelayType(), + "RLS child (" + childPolicyWrapper.getTarget() + ") delayed: " + + pickResult.getDelayReason()); } return pickResult; } else if (response.hasError()) { @@ -1050,7 +1057,10 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { convertRlsServerStatus(response.getStatus(), lbPolicyConfig.getRouteLookupConfig().lookupService())); } else { - return PickResult.withNoResult(); + // RLS control-plane query is pending. 
+ return PickResult.withNoResult( + "rls_lookup_pending", + "Route Lookup Service query pending on " + lookupService); } } @@ -1058,7 +1068,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { private PickResult useFallback(PickSubchannelArgs args) { SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker(); if (picker == null) { - return PickResult.withNoResult(); + return PickResult.withNoResult( + "connecting", "RLS fallback child policy connecting"); } PickResult pickResult = picker.pickSubchannel(args); if (pickResult.hasResult()) { @@ -1066,6 +1077,11 @@ private PickResult useFallback(PickSubchannelArgs args) { Arrays.asList(helper.getChannelTarget(), lookupService, fallbackChildPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)), Arrays.asList(determineCustomLabel(args))); + } else if (pickResult.getDelayType() != null) { + return PickResult.withNoResult( + pickResult.getDelayType(), + "RLS fallback (" + fallbackChildPolicyWrapper.getTarget() + ") delayed: " + + pickResult.getDelayReason()); } return pickResult; } diff --git a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java index b349aecdbf3..c5f06195964 100644 --- a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java +++ b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java @@ -27,6 +27,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -607,6 +608,70 @@ public void get_updatesLbState() throws Exception { assertThat(fakeThrottler.getNumUnthrottled()).isEqualTo(1); } + @Test + public void rls_pendingLookup_returnsDelayAttributes() throws Exception { + setUpRlsLbClient(); + ArgumentCaptor pickerCaptor = + 
ArgumentCaptor.forClass(SubchannelPicker.class); + verify(helper) + .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); + PickResult pickResult = pickerCaptor.getValue().pickSubchannel( + new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod().toBuilder() + .setFullMethodName("service1/create").build(), + new Metadata(), + CallOptions.DEFAULT, + new PickDetailsConsumer() {})); + assertThat(pickResult.getDelayType()).isEqualTo("rls_lookup_pending"); + assertThat(pickResult.getDelayReason()).contains("Route Lookup Service query pending"); + } + + @Test + public void rls_childPolicyDelayed_returnsDelayAttributes() throws Exception { + setUpRlsLbClient(); + RlsProtoData.RouteLookupRequestKey routeLookupRequestKey = + RlsProtoData.RouteLookupRequestKey.create( + ImmutableMap.of( + "server", "bigtable.googleapis.com", "service-key", "service1", + "method-key", "create")); + rlsServerImpl.setLookupTable( + ImmutableMap.of( + routeLookupRequestKey, + RouteLookupResponse.create( + ImmutableList.of("target1"), "header"))); + + CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequestKey); + assertThat(resp.hasData()).isFalse(); + fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); + + resp = getInSyncContext(routeLookupRequestKey); + assertThat(resp.hasData()).isTrue(); + + LbPolicyConfiguration.ChildPolicyWrapper wrapper = resp.getChildPolicyWrapper(); + wrapper.getHelper().updateBalancingState(ConnectivityState.CONNECTING, new SubchannelPicker() { + @Override + public PickResult pickSubchannel(LoadBalancer.PickSubchannelArgs args) { + return PickResult.withNoResult("connecting", "TCP handshake in progress"); + } + }); + + ArgumentCaptor pickerCaptor = + ArgumentCaptor.forClass(SubchannelPicker.class); + verify(helper, atLeastOnce()) + .updateBalancingState(any(), pickerCaptor.capture()); + PickResult pickResult = pickerCaptor.getValue().pickSubchannel( + new PickSubchannelArgsImpl( + 
TestMethodDescriptors.voidMethod().toBuilder() + .setFullMethodName("service1/create").build(), + new Metadata(), + CallOptions.DEFAULT, + new PickDetailsConsumer() {})); + assertThat(pickResult.hasResult()).isFalse(); + assertThat(pickResult.getDelayType()).isEqualTo("connecting"); + assertThat(pickResult.getDelayReason()).isEqualTo( + "RLS child (target1) delayed: TCP handshake in progress"); + } + @Test public void timeout_not_changing_picked_subchannel() throws Exception { setUpRlsLbClient(); diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index a52390743a6..79c9a95b119 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -262,6 +262,8 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception { PickResult res = picker.pickSubchannel(searchSubchannelArgs); assertThat(res.getStatus().isOk()).isTrue(); assertThat(res.getSubchannel()).isNull(); + assertThat(res.getDelayType()).isEqualTo("rls_lookup_pending"); + assertThat(res.getDelayReason()).contains("Route Lookup Service query pending"); // Cache is warm, but still unconnected res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); @@ -493,6 +495,8 @@ public void lb_working_withoutDefaultTarget() throws Exception { PickResult res = picker.pickSubchannel(searchSubchannelArgs); assertThat(res.getStatus().isOk()).isTrue(); assertThat(res.getSubchannel()).isNull(); + assertThat(res.getDelayType()).isEqualTo("rls_lookup_pending"); + assertThat(res.getDelayReason()).contains("Route Lookup Service query pending"); // Cache is warm, but still unconnected res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); diff --git a/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java 
b/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java index 9c9998571e5..78f234a8a30 100644 --- a/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java +++ b/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java @@ -38,6 +38,21 @@ public void createPendingStream() { delegate().createPendingStream(); } + @Override + public void recordAttemptDelayStart(String delayType, String delayReason) { + delegate().recordAttemptDelayStart(delayType, delayReason); + } + + @Override + public void recordAttemptDelayReasonChanged(String delayReason) { + delegate().recordAttemptDelayReasonChanged(delayReason); + } + + @Override + public void recordAttemptDelayEnd() { + delegate().recordAttemptDelayEnd(); + } + @Override public void outboundHeaders() { delegate().outboundHeaders(); diff --git a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java index 22940e875ac..346d63ff45c 100644 --- a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java +++ b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java @@ -41,13 +41,19 @@ * EquivalentAddressGroup}s from the {@link NameResolver}. */ final class RoundRobinLoadBalancer extends MultiChildLoadBalancer { + private static final PickResult CONNECTING_RESULT = PickResult.withNoResult("connecting", + "round_robin connecting: TCP/TLS handshake in progress to child balancers"); private final AtomicInteger sequence = new AtomicInteger(new Random().nextInt()); - private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); + private SubchannelPicker currentPicker = new FixedResultPicker(connectingResult()); public RoundRobinLoadBalancer(Helper helper) { super(helper); } + private PickResult connectingResult() { + return CONNECTING_RESULT; + } + /** * Updates picker with the list of active subchannels (state == READY). 
*/ @@ -68,7 +74,7 @@ protected void updateOverallBalancingState() { } if (isConnecting) { - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState(CONNECTING, new FixedResultPicker(connectingResult())); } else { updateBalancingState(TRANSIENT_FAILURE, createReadyPicker(getChildLbStates())); } diff --git a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java index 18854ca1bb6..c0f5bbc526f 100644 --- a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java +++ b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java @@ -86,7 +86,9 @@ public class RoundRobinLoadBalancerTest { private static final Attributes.Key MAJOR_KEY = Attributes.Key.create("major-key"); private static final SubchannelPicker EMPTY_PICKER = - new FixedResultPicker(PickResult.withNoResult()); + new FixedResultPicker( + PickResult.withNoResult("connecting", + "round_robin connecting: TCP/TLS handshake in progress to child balancers")); @Rule public final MockitoRule mocks = MockitoJUnit.rule(); @@ -576,6 +578,16 @@ private void deliverSubchannelState(Subchannel subchannel, ConnectivityStateInfo testHelperInst.deliverSubchannelState(subchannel, newState); } + @Test + public void roundRobin_delayAttributes() { + acceptAddresses(servers, affinity); + verify(mockHelper, atLeastOnce()) + .updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + PickResult res = pickerCaptor.getValue().pickSubchannel(mockArgs); + assertThat(res.getDelayType()).isEqualTo("connecting"); + assertThat(res.getDelayReason()).contains("TCP/TLS handshake in progress"); + } + private static class FakeSocketAddress extends SocketAddress { final String name; diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index 29b18fb6aa7..0d1c51cdf93 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ 
b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -17,6 +17,7 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.xds.XdsLbPolicies.CDS_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; @@ -118,6 +119,12 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { errorPrefix() + "Unable to find non-dynamic cluster")); } // The dynamic cluster must not have loaded yet + helper.updateBalancingState( + CONNECTING, + new FixedResultPicker( + PickResult.withNoResult( + "cds_dynamic_discovery", + "waiting for CDS resource definition for cluster " + clusterName))); return Status.OK; } if (!clusterConfigOr.hasValue()) { diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index 64105144240..3a0eaf2638e 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -200,7 +200,8 @@ public void shutdown() { private final class ClusterImplLbHelper extends ForwardingLoadBalancerHelper { private final AtomicLong inFlights; private ConnectivityState currentState = ConnectivityState.IDLE; - private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); + private SubchannelPicker currentPicker = new FixedResultPicker( + PickResult.withNoResult("connecting", "cluster_impl: initializing")); private List dropPolicies = Collections.emptyList(); private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS; @Nullable diff --git a/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java index 22b5aaa7d73..6759ce1d6f4 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java +++ 
b/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java @@ -150,7 +150,13 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { Status.UNAVAILABLE.withDescription("CDS encountered error: unable to find " + "available subchannel for cluster " + clusterName)); } - return childPicker.pickSubchannel(args); + PickResult childResult = childPicker.pickSubchannel(args); + if (!childResult.hasResult() && childResult.getDelayType() != null) { + String reason = "xds_cluster_manager: child '" + clusterName + "': " + + (childResult.getDelayReason() != null ? childResult.getDelayReason() : ""); + return PickResult.withNoResult(childResult.getDelayType(), reason); + } + return childResult; } @Override diff --git a/xds/src/main/java/io/grpc/xds/LazyLoadBalancer.java b/xds/src/main/java/io/grpc/xds/LazyLoadBalancer.java index b5f09c4ea93..8dbf021775b 100644 --- a/xds/src/main/java/io/grpc/xds/LazyLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/LazyLoadBalancer.java @@ -107,7 +107,8 @@ private final class LazyPicker extends SubchannelPicker { public PickResult pickSubchannel(PickSubchannelArgs args) { // activate() is a no-op after shutdown() helper.getSynchronizationContext().execute(LazyDelegate.this::activate); - return PickResult.withNoResult(); + return PickResult.withNoResult( + "connecting", "lazy: waiting for connection"); } } } diff --git a/xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java b/xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java index ddaeb4f4be5..f638b23b565 100644 --- a/xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java @@ -54,7 +54,8 @@ final class LeastRequestLoadBalancer extends MultiChildLoadBalancer { private final ThreadSafeRandom random; - private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); + private SubchannelPicker currentPicker = new FixedResultPicker( + PickResult.withNoResult("connecting", 
"least_request: initializing")); private int choiceCount = DEFAULT_CHOICE_COUNT; LeastRequestLoadBalancer(Helper helper) { @@ -113,7 +114,10 @@ protected void updateOverallBalancingState() { } } if (isConnecting) { - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState( + CONNECTING, + new FixedResultPicker( + PickResult.withNoResult("connecting", "least_request: connecting"))); } else { // Give it all the failing children and let it randomly pick among them updateBalancingState(TRANSIENT_FAILURE, @@ -246,7 +250,8 @@ public boolean equals(Object o) { static final class EmptyPicker extends SubchannelPicker { @Override public PickResult pickSubchannel(PickSubchannelArgs args) { - return PickResult.withNoResult(); + return PickResult.withNoResult( + "connecting", "least_request: waiting for subchannel"); } @Override diff --git a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java index ca142af0af3..f75c13055fe 100644 --- a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java @@ -330,7 +330,11 @@ public void updateBalancingState(final ConnectivityState newState, } ConnectivityState oldState = connectivityState; connectivityState = newState; - picker = newPicker; + if (newState == CONNECTING || newState == IDLE) { + picker = new PriorityPicker(newPicker, priority); + } else { + picker = newPicker; + } if (deletionTimer != null && deletionTimer.isPending()) { return; @@ -365,4 +369,44 @@ protected Helper delegate() { } } } + + private static final class PriorityPicker extends SubchannelPicker { + private final SubchannelPicker delegate; + private final String priority; + + PriorityPicker(SubchannelPicker delegate, String priority) { + this.delegate = checkNotNull(delegate, "delegate"); + this.priority = checkNotNull(priority, "priority"); + } + + @Override + public PickResult 
pickSubchannel(PickSubchannelArgs args) { + PickResult childResult = delegate.pickSubchannel(args); + if (!childResult.hasResult() && childResult.getDelayType() != null) { + String childReason = childResult.getDelayReason(); + String composedType = priority + ":" + childResult.getDelayType(); + String reason = "waiting on priority group " + priority + " (" + + (childReason != null ? childReason : "connecting") + ")"; + return PickResult.withNoResult(composedType, reason); + } + return childResult; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PriorityPicker that = (PriorityPicker) o; + return delegate.equals(that.delegate) && priority.equals(that.priority); + } + + @Override + public int hashCode() { + return Objects.hash(delegate, priority); + } + } } diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index 513f4d643ea..eb8ba235d82 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -356,6 +356,8 @@ public static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) { } private static final class RingHashPicker extends SubchannelPicker { + private static final PickResult RING_HASH_CONNECTING_RESULT = + PickResult.withNoResult("connecting", "ring_hash: waiting for connection"); private final SynchronizationContext syncContext; private final List ring; // Avoid synchronization between pickSubchannel and subchannel's connectivity state change, @@ -453,7 +455,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { // RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs // are failed unless there is a READY connection. 
if (subchannelView.connectivityState == CONNECTING) { - return PickResult.withNoResult(); + return RING_HASH_CONNECTING_RESULT; } if (subchannelView.connectivityState == IDLE) { @@ -463,7 +465,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } }); - return PickResult.withNoResult(); // Indicates that this should be retried after backoff + // Indicates that this should be retried after backoff + return RING_HASH_CONNECTING_RESULT; } } } else { @@ -487,7 +490,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } } if (requestedConnection) { - return PickResult.withNoResult(); + return RING_HASH_CONNECTING_RESULT; } } diff --git a/xds/src/main/java/io/grpc/xds/WeightedRandomPicker.java b/xds/src/main/java/io/grpc/xds/WeightedRandomPicker.java index 904f3872b6d..c8be54b819d 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedRandomPicker.java +++ b/xds/src/main/java/io/grpc/xds/WeightedRandomPicker.java @@ -132,7 +132,12 @@ public final PickResult pickSubchannel(PickSubchannelArgs args) { checkNotNull(childPicker, "childPicker not found"); } - return childPicker.pickSubchannel(args); + PickResult res = childPicker.pickSubchannel(args); + if (!res.hasResult() && res.getDelayType() != null) { + return PickResult.withNoResult(res.getDelayType(), + "weighted_target: " + (res.getDelayReason() != null ? 
res.getDelayReason() : "")); + } + return res; } @Override diff --git a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java index 75a8411b5a4..656f358308f 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java @@ -107,7 +107,8 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer { private final Ticker ticker; private String locality = ""; private String backendService = ""; - private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); + private SubchannelPicker currentPicker = new FixedResultPicker( + PickResult.withNoResult("connecting", "weighted_round_robin: initializing")); // The metric instruments are only registered once and shared by all instances of this LB. static { @@ -227,7 +228,9 @@ protected void updateOverallBalancingState() { if (isConnecting) { updateBalancingState( - ConnectivityState.CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + ConnectivityState.CONNECTING, + new FixedResultPicker( + PickResult.withNoResult("connecting", "weighted_round_robin: connecting"))); } else { updateBalancingState( ConnectivityState.TRANSIENT_FAILURE, createReadyPicker(getChildLbStates())); diff --git a/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java index 9468a9daf9d..1901e4ed0d5 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java @@ -165,7 +165,8 @@ private void updateOverallBalancingState() { if (overallState == TRANSIENT_FAILURE) { picker = new WeightedRandomPicker(errorPickers); } else { - picker = new FixedResultPicker(PickResult.withNoResult()); + picker = new FixedResultPicker( + PickResult.withNoResult("connecting", "weighted_target: connecting")); 
} } else { picker = new WeightedRandomPicker(childPickers); @@ -197,7 +198,8 @@ private static ConnectivityState aggregateState( private final class ChildHelper extends ForwardingLoadBalancerHelper { String name; ConnectivityState currentState = CONNECTING; - SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); + SubchannelPicker currentPicker = new FixedResultPicker( + PickResult.withNoResult("connecting", "weighted_target: initializing")); private ChildHelper(String name) { this.name = name; diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index ff4813fe6a8..178d31977dc 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -25,6 +25,7 @@ import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_RDS; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -343,6 +344,36 @@ public void dynamicCluster() { assertThat(this.lastXdsConfig.getClusters()).doesNotContainKey(clusterName); } + @Test + public void discoverDynamicCluster_pending_emitsToken() { + String clusterName = "cluster2"; + CdsConfig cdsConfig = new CdsConfig(clusterName, /*dynamic=*/ true); + + XdsConfig xdsConfig = new XdsConfig(null, null, null, ImmutableMap.of()); + + loadBalancer.acceptResolvedAddresses(ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes(Attributes.newBuilder() + .set(XdsAttributes.XDS_CONFIG, xdsConfig) + .set( + XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY, + new XdsConfig.XdsClusterSubscriptionRegistry() { + @Override + public XdsConfig.Subscription subscribeToCluster(String clusterName) { + return mock(XdsConfig.Subscription.class); + } + }) 
+ .build()) + .setLoadBalancingPolicyConfig(cdsConfig) + .build()); + + verify(helper).updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getDelayType()).isEqualTo("cds_dynamic_discovery"); + assertThat(result.getDelayReason()) + .isEqualTo("waiting for CDS resource definition for cluster cluster2"); + } + @Test public void discoverAggregateCluster_createsPriorityLbPolicy() { CdsLoadBalancerProvider cdsLoadBalancerProvider = new CdsLoadBalancerProvider(lbRegistry); @@ -638,6 +669,18 @@ private void startXdsDepManager(final CdsConfig cdsConfig) { fakeClock.forwardTime(10, TimeUnit.MINUTES); } + @Test + public void cds_resolutionError_updatesAttemptDelay() { + loadBalancer.handleNameResolutionError( + Status.UNAVAILABLE.withDescription("cds lookup failed")); + verify(helper, atLeastOnce()).updateBalancingState( + eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult pick = pickerCaptor.getValue().pickSubchannel( + mock(PickSubchannelArgs.class)); + assertThat(pick.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE); + assertThat(pick.getStatus().getDescription()).contains("cds lookup failed"); + } + private static void assertPickerStatus(SubchannelPicker picker, Status expectedStatus) { PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class)); Status actualStatus = result.getStatus(); diff --git a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java index 988bc720e45..fd5e28923e3 100644 --- a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java @@ -437,6 +437,18 @@ public void handleNameResolutionError() { } } + @Test + public void handleNameResolutionError_updatesDelayAttributes() { + priorityLb.handleNameResolutionError( + 
Status.UNAVAILABLE.withDescription("priority dns error")); + verify(helper, atLeastOnce()).updateBalancingState( + eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult pick = pickerCaptor.getValue().pickSubchannel( + mock(PickSubchannelArgs.class)); + assertThat(pick.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE); + assertThat(pick.getStatus().getDescription()).contains("priority dns error"); + } + @Test public void typicalPriorityFailOverFlow() { PriorityChildConfig priorityChildConfig0 = @@ -634,7 +646,8 @@ public void connectingResetFailOverIfSeenReadyOrIdleSinceTransientFailure() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); // Nothing important about this verify, other than to provide a baseline - verify(helper).updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); + verify(helper, times(2)) + .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); assertThat(fooBalancers).hasSize(1); assertThat(fooHelpers).hasSize(1); Helper helper0 = Iterables.getOnlyElement(fooHelpers); @@ -650,7 +663,7 @@ public void connectingResetFailOverIfSeenReadyOrIdleSinceTransientFailure() { helper0.updateBalancingState( CONNECTING, EMPTY_PICKER); - verify(helper, times(2)) + verify(helper, times(3)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); // failover happens @@ -676,7 +689,7 @@ public void failoverTimerNotRestartedOnDupConnecting() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); // Nothing important about this verify, other than to provide a baseline - inOrder.verify(helper) + inOrder.verify(helper, times(2)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); assertThat(fooBalancers).hasSize(1); assertThat(fooHelpers).hasSize(1); @@ -694,7 +707,7 @@ public void failoverTimerNotRestartedOnDupConnecting() { fakeClock.forwardTime(5, TimeUnit.SECONDS); assertThat(fooBalancers).hasSize(2); 
assertThat(fooHelpers).hasSize(2); - inOrder.verify(helper, times(2)) + inOrder.verify(helper, times(3)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); Helper helper1 = Iterables.getLast(fooHelpers); @@ -972,7 +985,7 @@ public void raceBetweenShutdownAndChildLbBalancingStateUpdate() { .setAddresses(ImmutableList.of()) .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); - verify(helper).updateBalancingState(eq(CONNECTING), isA(SubchannelPicker.class)); + verify(helper, times(2)).updateBalancingState(eq(CONNECTING), isA(SubchannelPicker.class)); // LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first, // any further balancing state update should be ignored. @@ -1010,7 +1023,76 @@ public void noDuplicateOverallBalancingStateUpdate() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); - verify(helper, times(4)).updateBalancingState(any(), any()); + verify(helper, times(6)).updateBalancingState(any(), any()); + } + + @Test + public void priorityPicker_prependsToken() throws Exception { + PriorityChildConfig priorityChildConfig0 = + new PriorityChildConfig(newChildConfig(fooLbProvider, new Object()), true); + PriorityLbConfig priorityLbConfig = + new PriorityLbConfig(ImmutableMap.of("p0", priorityChildConfig0), ImmutableList.of("p0")); + + priorityLb.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setLoadBalancingPolicyConfig(priorityLbConfig) + .build()); + + Helper helper0 = Iterables.getOnlyElement(fooHelpers); // priority p0 + + SubchannelPicker fakeChildPicker = new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withNoResult("connecting", "child_reason"); + } + }; + helper0.updateBalancingState(CONNECTING, fakeChildPicker); + + verify(helper, atLeastOnce()) + .updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + + SubchannelPicker priorityPicker = 
pickerCaptor.getValue(); + PickResult result = priorityPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + + assertThat(result.getDelayType()).isEqualTo("p0:connecting"); + assertThat(result.getDelayReason()) + .isEqualTo("waiting on priority group p0 (child_reason)"); + } + + @Test + public void priorityPicker_nestedPriorities_composesTokens() throws Exception { + PriorityChildConfig priorityChildConfig0 = + new PriorityChildConfig(newChildConfig(fooLbProvider, new Object()), true); + PriorityLbConfig priorityLbConfig = + new PriorityLbConfig(ImmutableMap.of("p0", priorityChildConfig0), ImmutableList.of("p0")); + + priorityLb.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setLoadBalancingPolicyConfig(priorityLbConfig) + .build()); + + Helper helper0 = Iterables.getOnlyElement(fooHelpers); // priority p0 + + SubchannelPicker nestedChildPicker = new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withNoResult("p1:connecting", + "waiting on priority group p1 (child_reason)"); + } + }; + helper0.updateBalancingState(CONNECTING, nestedChildPicker); + + verify(helper, atLeastOnce()) + .updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + + SubchannelPicker priorityPicker = pickerCaptor.getValue(); + PickResult result = priorityPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + + assertThat(result.getDelayType()).isEqualTo("p0:p1:connecting"); + assertThat(result.getDelayReason()).isEqualTo( + "waiting on priority group p0 (waiting on priority group p1 (child_reason))"); } private void assertLatestConnectivityState(ConnectivityState expectedState) { diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index b515ed81158..da11df24af4 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ 
b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -160,6 +160,8 @@ public void subchannelLazyConnectUntilPicked() { PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); + assertThat(result.getDelayType()).isEqualTo("connecting"); + assertThat(result.getDelayReason()).isEqualTo("ring_hash: waiting for connection"); Subchannel subchannel = Iterables.getOnlyElement(subchannels.values()); int expectedTimes = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() && !PickFirstLoadBalancerProvider.isEnabledHappyEyeballs() ? 1 : 2; @@ -524,6 +526,8 @@ public void pickWithRandomHash_atLeastOneSubchannelConnecting() { PickResult result = picker.pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); // buffer request + assertThat(result.getDelayType()).isEqualTo("connecting"); + assertThat(result.getDelayReason()).isEqualTo("ring_hash: waiting for connection"); verifyConnection(0); } @@ -546,6 +550,8 @@ public void pickWithRandomHash_firstSubchannelInTransientFailure_remainingSubcha PickResult result = picker.pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); // buffer request + assertThat(result.getDelayType()).isEqualTo("connecting"); + assertThat(result.getDelayReason()).isEqualTo("ring_hash: waiting for connection"); verifyConnection(1); } diff --git a/xds/src/test/java/io/grpc/xds/WeightedRandomPickerTest.java b/xds/src/test/java/io/grpc/xds/WeightedRandomPickerTest.java index 691615762bf..89e00cc2960 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedRandomPickerTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedRandomPickerTest.java @@ -28,6 +28,7 @@ import io.grpc.xds.WeightedRandomPicker.WeightedChildPicker; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import 
org.junit.Rule; import org.junit.Test; @@ -248,4 +249,24 @@ public void allZeroWeights() { assertThat(xdsPicker.pickSubchannel(pickSubchannelArgs)).isSameInstanceAs(pickResult3); assertThat(fakeRandom.bound).isEqualTo(4); } + + @Test + public void pickSubchannelEnrichesDelayReason() { + final PickResult pending = + PickResult.withNoResult("connecting", "TCP handshake in progress"); + SubchannelPicker delayedPicker = new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return pending; + } + }; + WeightedRandomPicker picker = new WeightedRandomPicker( + Collections.singletonList(new WeightedChildPicker(10, delayedPicker))); + + PickResult res = picker.pickSubchannel(pickSubchannelArgs); + assertThat(res.hasResult()).isFalse(); + assertThat(res.getDelayType()).isEqualTo("connecting"); + assertThat(res.getDelayReason()) + .isEqualTo("weighted_target: TCP handshake in progress"); + } }