Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
import org.apache.rocketmq.store.ha.autoswitch.BrokerMetadata;
import org.apache.rocketmq.store.ha.autoswitch.TempBrokerMetadata;
Expand All @@ -75,6 +76,7 @@ public class ReplicasManager {
private final BrokerController brokerController;
private final AutoSwitchHAService haService;
private final BrokerConfig brokerConfig;
private final MessageStoreConfig messageStoreConfig;
private final String brokerAddress;
private final BrokerOuterAPI brokerOuterAPI;
private List<String> controllerAddresses;
Expand Down Expand Up @@ -112,6 +114,7 @@ public ReplicasManager(final BrokerController brokerController) {
new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("ReplicasManager_scan_thread_", brokerController.getBrokerIdentity()));
this.haService = (AutoSwitchHAService) brokerController.getMessageStore().getHaService();
this.brokerConfig = brokerController.getBrokerConfig();
this.messageStoreConfig = brokerController.getMessageStoreConfig();
this.availableControllerAddresses = new ConcurrentHashMap<>();
this.syncStateSet = new HashSet<>();
this.brokerAddress = brokerController.getBrokerAddr();
Expand Down Expand Up @@ -373,6 +376,32 @@ private void handleSlaveSynchronize(final BrokerRole role) {
}

private boolean brokerElect() {
if (this.messageStoreConfig.isAsyncLearner()) {
LOGGER.info("Current broker is AsyncLearner, skip election and try to fetch master metadata.");
try {
Pair<GetReplicaInfoResponseHeader, SyncStateSet> result = this.brokerOuterAPI.getReplicaInfo(
this.controllerLeaderAddress,
this.brokerConfig.getBrokerName()
);

GetReplicaInfoResponseHeader info = result.getObject1();
Long masterId = info.getMasterBrokerId();
String masterAddr = info.getMasterAddress();

if (masterId != null && StringUtils.isNotEmpty(masterAddr)) {
LOGGER.info("AsyncLearner found master [Id:{}, Addr:{}] from controller.", masterId, masterAddr);
changeToSlave(masterAddr, info.getMasterEpoch(), masterId);
return true;
} else {
LOGGER.warn("AsyncLearner failed to find a master. Retrying...");
return false;
}
} catch (Exception e) {
LOGGER.error("AsyncLearner failed to fetch replica info", e);
return false;
}
}

// Broker try to elect itself as a master in broker set.
try {
Pair<ElectMasterResponseHeader, Set<Long>> tryElectResponsePair = this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(),
Expand Down
Loading