From 82b97c5384750db0290f98c126cccf0a057a8582 Mon Sep 17 00:00:00 2001 From: Eric Bowden Date: Mon, 4 May 2026 17:14:25 -0500 Subject: [PATCH] Add invoker-mode and async leader-change support to AeronCluster Adds Context.RunAgentInvokers and AddNewLeaderIngressPublication, wires invoker-mode into AsyncConnect.Poll, adds async-add publication handling to MemberIngress (AsyncAddPublication/AsyncGetPublication), and refactors AsyncConnect.UpdateMembers to reuse the prior leader publication when the new leader's endpoint is unchanged. UpdateMemberEndpoints now uses AddNewLeaderIngressPublication on the failover path (IngressEndpoints case). Match Java's behavior so invoker mode does not deadlock during initial connect or leader change. --- src/Adaptive.Cluster/Client/AeronCluster.cs | 111 ++++++++++++++++++-- 1 file changed, 104 insertions(+), 7 deletions(-) diff --git a/src/Adaptive.Cluster/Client/AeronCluster.cs b/src/Adaptive.Cluster/Client/AeronCluster.cs index 5b06ccd..7487767 100644 --- a/src/Adaptive.Cluster/Client/AeronCluster.cs +++ b/src/Adaptive.Cluster/Client/AeronCluster.cs @@ -1029,6 +1029,27 @@ internal static Publication AddIngressPublication(Context ctx, string channel, i } } + private Publication AddNewLeaderIngressPublication(Context ctx, string channel, int streamId) + { + long registrationId = AsyncAddIngressPublication(ctx, channel, streamId); + long deadlineNs = nanoClock.NanoTime() + ctx.MessageTimeoutNs(); + do + { + Publication publication = GetIngressPublication(ctx, registrationId); + if (null != publication) + { + return publication; + } + _idleStrategy.Idle(ctx.RunAgentInvokers()); + } + while (nanoClock.NanoTime() < deadlineNs); + + throw new AeronTimeoutException( + "failed to add new leader ingress publication (leaderMemberId=" + _leaderMemberId + + ", leadershipTermId=" + _leadershipTermId + ", channel=" + channel + ", streamId=" + streamId + + ") within " + ctx.MessageTimeoutNs() + "ns"); + } + internal static long AsyncAddIngressPublication(Context ctx, string channel, int streamId) { if (ctx.IsIngressExclusive()) @@ -1059,8 +1080,15 @@ private void UpdateMemberEndpoints(string ingressEndpoints, int leaderMemberId) var map = ParseIngressEndpoints(_ctx, ingressEndpoints); var newLeader = map.Get(leaderMemberId); - newLeader.CreateIngressPublication(); - _publication = newLeader.publication; + + ChannelUri channelUri = ChannelUri.Parse(_ctx.IngressChannel()); + if (channelUri.IsUdp) + { + channelUri.Put(ENDPOINT_PARAM_NAME, newLeader.endpoint); + } + + _publication = newLeader.publication = + AddNewLeaderIngressPublication(_ctx, channelUri.ToString(), _ctx.IngressStreamId()); _endpointByIdMap = map; } @@ -1969,6 +1997,27 @@ public AgentInvoker AgentInvoker() return agentInvoker; } + internal int RunAgentInvokers() + { + int workDone = 0; + + if (null != _aeron) + { + AgentInvoker conductorAgentInvoker = _aeron.ConductorAgentInvoker; + if (null != conductorAgentInvoker) + { + workDone += conductorAgentInvoker.Invoke(); + } + } + + if (null != agentInvoker) + { + workDone += agentInvoker.Invoke(); + } + + return workDone; + } + /// /// Close the context and free applicable resources. /// @@ -2168,6 +2217,7 @@ public static string StepName(int step) public AeronCluster Poll() { CheckDeadline(); + ctx.RunAgentInvokers(); switch (state) { @@ -2340,6 +2390,11 @@ private void AwaitPublicationConnected() { foreach (MemberIngress member in memberByIdMap.Values) { + if (null == member.publication && NULL_VALUE != member.registrationId) + { + member.AsyncGetPublication(); + } + if (null != member.publication && member.publication.IsConnected) { ingressPublication = member.publication; @@ -2448,15 +2503,28 @@ private void PrepareChallengeResponse(byte[] encodedCredentials) private void UpdateMembers() { + leaderMemberId = egressPoller.LeaderMemberId(); + MemberIngress oldLeader = memberByIdMap.Remove(leaderMemberId); + CloseHelper.Dispose(ingressPublication); + ingressPublication = null; CloseHelper.CloseAll(memberByIdMap.Values); - - leaderMemberId = egressPoller.LeaderMemberId(); + memberByIdMap = ParseIngressEndpoints(ctx, egressPoller.Detail()); - MemberIngress leader = memberByIdMap.Get(leaderMemberId); - leader.CreateIngressPublication(); - ingressPublication = leader.publication; + MemberIngress newLeader = memberByIdMap.Get(leaderMemberId); + if (null != oldLeader && + null == oldLeader.publicationException && + newLeader.endpoint.Equals(oldLeader.endpoint)) + { + ingressPublication = newLeader.publication = oldLeader.publication; + newLeader.registrationId = oldLeader.registrationId; + } + else + { + CloseHelper.Dispose(oldLeader); + newLeader.AsyncAddPublication(); + } State(AWAIT_PUBLICATION_CONNECTED); } @@ -2508,6 +2576,35 @@ internal void CreateIngressPublication() publication = AddIngressPublication(ctx, channelUri.ToString(), ctx.IngressStreamId()); } + + internal void AsyncAddPublication() + { + ChannelUri channelUri = ChannelUri.Parse(ctx.IngressChannel()); + if (channelUri.IsUdp) + { + channelUri.Put(ENDPOINT_PARAM_NAME, endpoint); + } + + registrationId = AsyncAddIngressPublication(ctx, channelUri.ToString(), ctx.IngressStreamId()); + publication = null; + } + + internal void AsyncGetPublication() + { + try + { + publication = GetIngressPublication(ctx, registrationId); + if (null != publication) + { + registrationId = NULL_VALUE; + } + } + catch (RegistrationException ex) + { + publicationException = ex; + registrationId = NULL_VALUE; + } + } public void Dispose() {