From b7d3fe1fb6dada24df2fe1a67fc08a82c086f112 Mon Sep 17 00:00:00 2001 From: yx9o Date: Wed, 8 Apr 2026 00:02:02 +0800 Subject: [PATCH] [ISSUE #10247] Remove duplicate remove call in InvocationChannel --- .../service/channel/InvocationChannel.java | 1 - .../channel/InvocationChannelTest.java | 67 +++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) create mode 100644 proxy/src/test/java/org/apache/rocketmq/proxy/service/channel/InvocationChannelTest.java diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/InvocationChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/InvocationChannel.java index 00e8cea99c9..bbaaddd293e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/InvocationChannel.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/InvocationChannel.java @@ -41,7 +41,6 @@ public ChannelFuture writeAndFlush(Object msg) { if (null != context) { context.handle(responseCommand); } - inFlightRequestMap.remove(responseCommand.getOpaque()); } return super.writeAndFlush(msg); } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/channel/InvocationChannelTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/channel/InvocationChannelTest.java new file mode 100644 index 00000000000..ddede4fbc86 --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/channel/InvocationChannelTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.service.channel; + +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class InvocationChannelTest { + + @Test + public void testWriteAndFlushShouldNotRemoveReRegisteredContext() { + InvocationChannel channel = new InvocationChannel("127.0.0.1:8080", "127.0.0.1:8081"); + AtomicBoolean nextContextHandled = new AtomicBoolean(false); + + channel.registerInvocationContext(1, new InvocationContextInterface() { + @Override + public void handle(RemotingCommand remotingCommand) { + channel.registerInvocationContext(remotingCommand.getOpaque(), new InvocationContextInterface() { + @Override + public void handle(RemotingCommand nextRemotingCommand) { + nextContextHandled.set(true); + } + + @Override + public boolean expired(long expiredTimeSec) { + return false; + } + }); + } + + @Override + public boolean expired(long expiredTimeSec) { + return false; + } + }); + + RemotingCommand response = RemotingCommand.createResponseCommand(0, "OK"); + response.setOpaque(1); + + channel.writeAndFlush(response); + assertTrue(channel.isWritable()); + + channel.writeAndFlush(response); + assertTrue(nextContextHandled.get()); + assertFalse(channel.isWritable()); + } +}