From 86e841854ae737dd95ec25ed1097c2961a0ebc3a Mon Sep 17 00:00:00 2001 From: ccwss <1782935682@qq.com> Date: Fri, 16 Jan 2026 16:14:59 +0800 Subject: [PATCH] Fix AsyncLearner startup logic and disable master election --- .../broker/controller/ReplicasManager.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java index 93d48de1dd9..0ba37c8055f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java @@ -52,6 +52,7 @@ import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader; import org.apache.rocketmq.store.config.BrokerRole; +import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService; import org.apache.rocketmq.store.ha.autoswitch.BrokerMetadata; import org.apache.rocketmq.store.ha.autoswitch.TempBrokerMetadata; @@ -75,6 +76,7 @@ public class ReplicasManager { private final BrokerController brokerController; private final AutoSwitchHAService haService; private final BrokerConfig brokerConfig; + private final MessageStoreConfig messageStoreConfig; private final String brokerAddress; private final BrokerOuterAPI brokerOuterAPI; private List controllerAddresses; @@ -112,6 +114,7 @@ public ReplicasManager(final BrokerController brokerController) { new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("ReplicasManager_scan_thread_", brokerController.getBrokerIdentity())); this.haService = (AutoSwitchHAService) brokerController.getMessageStore().getHaService(); this.brokerConfig = brokerController.getBrokerConfig(); + this.messageStoreConfig = brokerController.getMessageStoreConfig(); this.availableControllerAddresses = new ConcurrentHashMap<>(); this.syncStateSet = new HashSet<>(); this.brokerAddress = brokerController.getBrokerAddr(); @@ -373,6 +376,32 @@ private void handleSlaveSynchronize(final BrokerRole role) { } private boolean brokerElect() { + if (this.messageStoreConfig.isAsyncLearner()) { + LOGGER.info("Current broker is AsyncLearner, skip election and try to fetch master metadata."); + try { + Pair result = this.brokerOuterAPI.getReplicaInfo( + this.controllerLeaderAddress, + this.brokerConfig.getBrokerName() + ); + + GetReplicaInfoResponseHeader info = result.getObject1(); + Long masterId = info.getMasterBrokerId(); + String masterAddr = info.getMasterAddress(); + + if (masterId != null && StringUtils.isNotEmpty(masterAddr)) { + LOGGER.info("AsyncLearner found master [Id:{}, Addr:{}] from controller.", masterId, masterAddr); + changeToSlave(masterAddr, info.getMasterEpoch(), masterId); + return true; + } else { + LOGGER.warn("AsyncLearner failed to find a master. Retrying..."); + return false; + } + } catch (Exception e) { + LOGGER.error("AsyncLearner failed to fetch replica info", e); + return false; + } + } + // Broker try to elect itself as a master in broker set. try { Pair> tryElectResponsePair = this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(),