From d3f79605190988125bf7f3e5e535be2fd4bc5c57 Mon Sep 17 00:00:00 2001 From: yanyuan06 Date: Tue, 3 Mar 2026 17:41:07 +0800 Subject: [PATCH] Solve the issue of attachment being overwritten when backuprequest is triggered. --- src/brpc/selective_channel.cpp | 13 +++--- test/brpc_channel_unittest.cpp | 73 +++++++++++++++++++++++++++++++++- 2 files changed, 79 insertions(+), 7 deletions(-) diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp index 567ffa51b8..a59580e321 100644 --- a/src/brpc/selective_channel.cpp +++ b/src/brpc/selective_channel.cpp @@ -364,12 +364,6 @@ void SubDone::Run() { << _cid.value << ": " << berror(rc); return; } - // NOTE: Copying gettable-but-settable fields which are generally set - // during the RPC to reflect details. - main_cntl->_remote_side = _cntl._remote_side; - // connection_type may be changed during CallMethod. - main_cntl->set_connection_type(_cntl.connection_type()); - main_cntl->response_attachment().swap(_cntl.response_attachment()); Resource r; r.response = _cntl._response; r.sub_done = this; @@ -377,6 +371,13 @@ void SubDone::Run() { return; } const int saved_error = main_cntl->ErrorCode(); + + // NOTE: Copying gettable-but-settable fields which are generally set + // during the RPC to reflect details. + main_cntl->_remote_side = _cntl._remote_side; + // connection_type may be changed during CallMethod. + main_cntl->set_connection_type(_cntl.connection_type()); + main_cntl->response_attachment().swap(_cntl.response_attachment()); if (_cntl.Failed()) { if (_cntl.ErrorCode() == ENODATA || _cntl.ErrorCode() == EHOSTDOWN) { diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp index f96650211e..ad6670443a 100644 --- a/test/brpc_channel_unittest.cpp +++ b/test/brpc_channel_unittest.cpp @@ -176,6 +176,7 @@ class MyEchoService : public ::test::EchoService { res->add_code_list(req->code()); } res->set_receiving_socket_id(cntl->_current_call.sending_sock->id()); + if (mockfunc_) mockfunc_(cntl_base, req, res, done); brpc::ProtocolType protocol = cntl->request_protocol(); if ((brpc::PROTOCOL_HTTP == protocol || brpc::PROTOCOL_H2 == protocol) && @@ -198,6 +199,17 @@ class MyEchoService : public ::test::EchoService { EXPECT_TRUE(nullptr != request); EXPECT_TRUE(nullptr != response); } + +public: + using MockFuncType = void(google::protobuf::RpcController*, + const ::test::EchoRequest*, ::test::EchoResponse*, + google::protobuf::Closure*); + void SetMockFunc(std::function&& mockfunc) { + mockfunc_ = std::move(mockfunc); + } + +private: + std::function mockfunc_; }; pthread_once_t register_mock_protocol = PTHREAD_ONCE_INIT; @@ -1408,7 +1420,7 @@ class ChannelTest : public ::testing::Test{ SetUpChannel(subchan, single_server, short_connection); ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i; } - + brpc::Controller cntl; test::EchoRequest req; test::EchoResponse res; @@ -1427,6 +1439,55 @@ class ChannelTest : public ::testing::Test{ EXPECT_EQ(17, cntl.sub(0)->_real_timeout_ms); StopAndJoin(); } + + void TestBackupRequestSelective( + bool single_server, bool async, bool short_connection) { + std::cout << " *** single=" << single_server + << " async=" << async + << " short=" << short_connection << std::endl; + ASSERT_EQ(0, StartAccept(_ep)); + + const size_t NCHANS = 8; + brpc::SelectiveChannel channel; + ASSERT_EQ(0, channel.Init("rr", NULL)); + for (size_t i = 0; i < NCHANS; ++i) { + brpc::Channel* subchan = new brpc::Channel; + SetUpChannel(subchan, single_server, short_connection); + ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i; + } + + brpc::Controller cntl; + test::EchoRequest req; + test::EchoResponse res; + req.set_message(__FUNCTION__); + cntl.set_backup_request_ms(20); + cntl.set_timeout_ms(100); + std::atomic call_cnt(0); + _svc.SetMockFunc([&call_cnt](google::protobuf::RpcController* cntl_base, + const ::test::EchoRequest*, + ::test::EchoResponse*, + google::protobuf::Closure*) { + brpc::Controller* cntl = static_cast(cntl_base); + int see_cnt = call_cnt.fetch_add(1, std::memory_order_relaxed); + if (see_cnt == 0) { + LOG(INFO) << "slow node"; + bthread_usleep(30 * 1000); + } else { + LOG(INFO) << "normal node "; + butil::IOBuf iobuf; + iobuf.append("123"); + cntl->response_attachment().swap(iobuf); + } + }); + butil::Timer tm; + tm.start(); + CallMethod(&channel, &cntl, &req, &res, async); + tm.stop(); + EXPECT_FALSE(cntl.Failed()); + EXPECT_EQ(call_cnt.load(std::memory_order_relaxed), 2); + EXPECT_EQ(cntl.response_attachment().to_string(), "123"); + StopAndJoin(); + } void TestCloseFD(bool single_server, bool async, bool short_connection) { std::cout << " *** single=" << single_server @@ -2713,6 +2774,16 @@ TEST_F(ChannelTest, timeout_selective) { } } +TEST_F(ChannelTest, backuprequest_selective) { + for (int i = 0; i <= 1; ++i) { // Flag SingleServer + for (int j = 0; j <= 1; ++j) { // Flag Asynchronous + for (int k = 0; k <=1; ++k) { // Flag ShortConnection + TestBackupRequestSelective(i, j, k); + } + } + } +} + TEST_F(ChannelTest, close_fd) { for (int i = 0; i <= 1; ++i) { // Flag SingleServer for (int j = 0; j <= 1; ++j) { // Flag Asynchronous