diff --git a/src/Adaptive.Cluster/Client/AeronCluster.cs b/src/Adaptive.Cluster/Client/AeronCluster.cs index 5b06ccd..7487767 100644 --- a/src/Adaptive.Cluster/Client/AeronCluster.cs +++ b/src/Adaptive.Cluster/Client/AeronCluster.cs @@ -1029,6 +1029,27 @@ internal static Publication AddIngressPublication(Context ctx, string channel, i } } + private Publication AddNewLeaderIngressPublication(Context ctx, string channel, int streamId) + { + long registrationId = AsyncAddIngressPublication(ctx, channel, streamId); + long deadlineNs = nanoClock.NanoTime() + ctx.MessageTimeoutNs(); + do + { + Publication publication = GetIngressPublication(ctx, registrationId); + if (null != publication) + { + return publication; + } + _idleStrategy.Idle(ctx.RunAgentInvokers()); + } + while (nanoClock.NanoTime() < deadlineNs); + + throw new AeronTimeoutException( + "failed to add new leader ingress publication (leaderMemberId=" + _leaderMemberId + + ", leadershipTermId=" + _leadershipTermId + ", channel=" + channel + ", streamId=" + streamId + + ") within " + ctx.MessageTimeoutNs() + "ns"); + } + internal static long AsyncAddIngressPublication(Context ctx, string channel, int streamId) { if (ctx.IsIngressExclusive()) @@ -1059,8 +1080,15 @@ private void UpdateMemberEndpoints(string ingressEndpoints, int leaderMemberId) var map = ParseIngressEndpoints(_ctx, ingressEndpoints); var newLeader = map.Get(leaderMemberId); - newLeader.CreateIngressPublication(); - _publication = newLeader.publication; + + ChannelUri channelUri = ChannelUri.Parse(_ctx.IngressChannel()); + if (channelUri.IsUdp) + { + channelUri.Put(ENDPOINT_PARAM_NAME, newLeader.endpoint); + } + + _publication = newLeader.publication = + AddNewLeaderIngressPublication(_ctx, channelUri.ToString(), _ctx.IngressStreamId()); _endpointByIdMap = map; } @@ -1969,6 +1997,27 @@ public AgentInvoker AgentInvoker() return agentInvoker; } + internal int RunAgentInvokers() + { + int workDone = 0; + + if (null != _aeron) + { + AgentInvoker conductorAgentInvoker = _aeron.ConductorAgentInvoker; + if (null != conductorAgentInvoker) + { + workDone += conductorAgentInvoker.Invoke(); + } + } + + if (null != agentInvoker) + { + workDone += agentInvoker.Invoke(); + } + + return workDone; + } + /// /// Close the context and free applicable resources. /// @@ -2168,6 +2217,7 @@ public static string StepName(int step) public AeronCluster Poll() { CheckDeadline(); + ctx.RunAgentInvokers(); switch (state) { @@ -2340,6 +2390,11 @@ private void AwaitPublicationConnected() { foreach (MemberIngress member in memberByIdMap.Values) { + if (null == member.publication && NULL_VALUE != member.registrationId) + { + member.AsyncGetPublication(); + } + if (null != member.publication && member.publication.IsConnected) { ingressPublication = member.publication; @@ -2448,15 +2503,28 @@ private void PrepareChallengeResponse(byte[] encodedCredentials) private void UpdateMembers() { + leaderMemberId = egressPoller.LeaderMemberId(); + MemberIngress oldLeader = memberByIdMap.Remove(leaderMemberId); + CloseHelper.Dispose(ingressPublication); + ingressPublication = null; CloseHelper.CloseAll(memberByIdMap.Values); - - leaderMemberId = egressPoller.LeaderMemberId(); + memberByIdMap = ParseIngressEndpoints(ctx, egressPoller.Detail()); - MemberIngress leader = memberByIdMap.Get(leaderMemberId); - leader.CreateIngressPublication(); - ingressPublication = leader.publication; + MemberIngress newLeader = memberByIdMap.Get(leaderMemberId); + if (null != oldLeader && + null == oldLeader.publicationException && + newLeader.endpoint.Equals(oldLeader.endpoint)) + { + ingressPublication = newLeader.publication = oldLeader.publication; + newLeader.registrationId = oldLeader.registrationId; + } + else + { + CloseHelper.Dispose(oldLeader); + newLeader.AsyncAddPublication(); + } State(AWAIT_PUBLICATION_CONNECTED); } @@ -2508,6 +2576,35 @@ internal void CreateIngressPublication() publication = AddIngressPublication(ctx, channelUri.ToString(), ctx.IngressStreamId()); } + + internal void AsyncAddPublication() + { + ChannelUri channelUri = ChannelUri.Parse(ctx.IngressChannel()); + if (channelUri.IsUdp) + { + channelUri.Put(ENDPOINT_PARAM_NAME, endpoint); + } + + registrationId = AsyncAddIngressPublication(ctx, channelUri.ToString(), ctx.IngressStreamId()); + publication = null; + } + + internal void AsyncGetPublication() + { + try + { + publication = GetIngressPublication(ctx, registrationId); + if (null != publication) + { + registrationId = NULL_VALUE; + } + } + catch (RegistrationException ex) + { + publicationException = ex; + registrationId = NULL_VALUE; + } + } public void Dispose() {