diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java index 00437956a51..afe368085a6 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java @@ -25,8 +25,11 @@ import android.os.IBinder; import android.os.Parcel; import android.os.Process; + import androidx.annotation.BinderThread; import androidx.annotation.MainThread; + +import com.google.common.base.Preconditions; import com.google.common.base.Ticker; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -59,7 +62,9 @@ import io.grpc.internal.StatsTraceContext; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; + import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -80,6 +85,12 @@ public final class BinderClientTransport extends BinderTransport /** Number of ongoing calls which keep this transport "in-use". */ private final AtomicInteger numInUseStreams; + /** Last in-use state that was reported to the listener */ + private final AtomicBoolean listenerInUse; + + /** Synchronizes transport listener callbacks */ + private final Object listenerNotifyLock; + private final long readyTimeoutMillis; private final PingTracker pingTracker; private final boolean preAuthorizeServer; @@ -121,7 +132,9 @@ public BinderClientTransport( preAuthServerOverride != null ? preAuthServerOverride : factory.preAuthorizeServers; this.handshake = factory.useLegacyAuthStrategy ? new LegacyClientHandshake() : new V2ClientHandshake(); - numInUseStreams = new AtomicInteger(); + this.numInUseStreams = new AtomicInteger(); + this.listenerInUse = new AtomicBoolean(); + this.listenerNotifyLock = new Object(); pingTracker = new PingTracker(Ticker.systemTicker(), (id) -> sendPing(id)); serviceBinding = new ServiceBinding( @@ -265,9 +278,7 @@ public synchronized ClientStream newStream( return newFailingClientStream(failure, attributes, headers, tracers); } - if (inbound.countsForInUse() && numInUseStreams.getAndIncrement() == 0) { - clientTransportListener.transportInUse(true); - } + updateInUseStreamsCountIfNeeded(inbound.countsForInUse(), 1); Outbound.ClientOutbound outbound = new Outbound.ClientOutbound(this, callId, method, headers, statsTraceContext); if (method.getType().clientSendsOneMessage()) { @@ -279,9 +290,7 @@ public synchronized ClientStream newStream( @Override protected void unregisterInbound(Inbound inbound) { - if (inbound.countsForInUse() && numInUseStreams.decrementAndGet() == 0) { - clientTransportListener.transportInUse(false); - } + updateInUseStreamsCountIfNeeded(inbound.countsForInUse(), -1); super.unregisterInbound(inbound); } @@ -305,21 +314,38 @@ public synchronized void shutdownNow(Status reason) { @Override @GuardedBy("this") void notifyShutdown(Status status) { - clientTransportListener.transportShutdown(status); + // Defer to listener executor with external synchronization + scheduleOnListener( + new Runnable() { + @Override + public void run() { + clientTransportListener.transportShutdown(status); + } + }); } @Override @GuardedBy("this") void notifyTerminated() { if (numInUseStreams.getAndSet(0) > 0) { - clientTransportListener.transportInUse(false); + if (listenerInUse.compareAndSet(true, false)) { + scheduleTransportInUseNotification(false); + } else { + listenerInUse.set(false); + } } if (readyTimeoutFuture != null) { readyTimeoutFuture.cancel(false); readyTimeoutFuture = null; } serviceBinding.unbind(); - clientTransportListener.transportTerminated(); + scheduleOnListener( + new Runnable() { + @Override + public void run() { + clientTransportListener.transportTerminated(); + } + }); } @Override @@ -439,8 +465,11 @@ public void handleSetupTransport() { @GuardedBy("this") private void onHandshakeComplete() { setState(TransportState.READY); - attributes = clientTransportListener.filterTransport(attributes); - clientTransportListener.transportReady(); + final Attributes currentAttrs = attributes; + // Perform filter on listener thread with external synchronization, then update attrs and + // notify ready without holding transport lock to avoid deadlocks. + scheduleFilterTransportAndReady(currentAttrs); + if (readyTimeoutFuture != null) { readyTimeoutFuture.cancel(false); readyTimeoutFuture = null; @@ -452,6 +481,100 @@ private synchronized void handleAuthResult(Throwable t) { Status.INTERNAL.withDescription("Could not evaluate SecurityPolicy").withCause(t), true); } + /** + * Updates in-use stream count and notifies listener only on transitions between 0 and >0, without + * acquiring the transport lock. + */ + private void updateInUseStreamsCountIfNeeded(boolean countsForInUse, int delta) { + Preconditions.checkArgument(delta == -1 || delta == 1, "stream count delta must be -1 or +1"); + if (!countsForInUse) { + return; + } + + if (delta > 0) { + numInUseStreams.incrementAndGet(); + } else { + // Decrement with floor at 0 + int prev = numInUseStreams.get(); + + while (true) { + int current = prev; + int newValue = current > 0 ? current - 1 : 0; + if (numInUseStreams.compareAndSet(current, newValue)) { + break; + } + prev = numInUseStreams.get(); + } + } + reconcileInUseState(); + } + + /** Reconcile listenerInUse with the current stream count to avoid stale toggles under races. */ + private void reconcileInUseState() { + boolean nowInUse = numInUseStreams.get() > 0; + boolean prev = listenerInUse.get(); + + if(prev != nowInUse && listenerInUse.compareAndSet(prev, nowInUse)) { + scheduleTransportInUseNotification(nowInUse); + } + } + + private void scheduleTransportInUseNotification(final boolean inUse) { + getScheduledExecutorService() + .execute( + new Runnable() { + @Override + public void run() { + // Provide external synchronization as required by Listener contract, + // without taking the transport lock to avoid potential deadlocks. + synchronized (listenerNotifyLock) { + if (listenerInUse.get() == inUse) { + clientTransportListener.transportInUse(inUse); + } + } + } + }); + } + + private void scheduleFilterTransportAndReady(final Attributes attrsSnapshot) { + getScheduledExecutorService() + .execute( + new Runnable() { + @Override + public void run() { + final Attributes filtered; + synchronized (listenerNotifyLock) { + filtered = clientTransportListener.filterTransport(attrsSnapshot); + } + + synchronized (BinderClientTransport.class) { + attributes = filtered; + } + + scheduleOnListener( + new Runnable() { + @Override + public void run() { + clientTransportListener.transportReady(); + } + }); + } + }); + } + + private void scheduleOnListener(final Runnable task) { + getScheduledExecutorService() + .execute( + new Runnable() { + @Override + public void run() { + synchronized (listenerNotifyLock) { + task.run(); + } + } + }); + } + @GuardedBy("this") @Override protected void handlePingResponse(Parcel parcel) {