diff --git a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java index dd37f42b2c5..84dcccaf750 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java @@ -185,6 +185,11 @@ public CompletableFuture asyncPutMessage(MessageExtBrokerInner messageExt.setWaitStoreMsgOK(false); final TopicPublishInfo topicPublishInfo = this.brokerController.getTopicRouteInfoManager().tryToFindTopicPublishInfo(messageExt.getTopic()); + if (null == topicPublishInfo || !topicPublishInfo.ok()) { + LOG.warn("asyncPutMessage: no route info of topic {} when escaping message, msgId={}", + messageExt.getTopic(), messageExt.getMsgId()); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, null, true)); + } final String producerGroup = getProducerGroup(messageExt); final MessageQueue mqSelected = topicPublishInfo.selectOneMessageQueue(); @@ -250,6 +255,11 @@ public CompletableFuture asyncRemotePutMessageToSpecificQueue( messageExt.setWaitStoreMsgOK(false); final TopicPublishInfo topicPublishInfo = this.brokerController.getTopicRouteInfoManager().tryToFindTopicPublishInfo(messageExt.getTopic()); + if (null == topicPublishInfo || !topicPublishInfo.ok()) { + LOG.warn("asyncRemotePutMessageToSpecificQueue: no route info of topic {} when escaping message, msgId={}", + messageExt.getTopic(), messageExt.getMsgId()); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, null, true)); + } List mqs = topicPublishInfo.getMessageQueueList(); if (null == mqs || mqs.isEmpty()) {