diff --git a/docs/cn/rdma.md b/docs/cn/rdma.md index fd70686bb1..fa3a8650ab 100644 --- a/docs/cn/rdma.md +++ b/docs/cn/rdma.md @@ -88,3 +88,4 @@ RDMA支持事件驱动和轮询两种模式,默认是事件驱动模式,通 * rdma_poller_yield: 轮询模式下的poller是否主动放弃CPU,默认是false。 * event_dispatcher_edisp_unsched: 全局开关,控制EventDispatcher是否不可被调度(true时不可调度),默认是false。 * rdma_disable_bthread: 禁用bthread,默认是false。 +* rdma_extend: 是否允许通信双方协商rdma的高级特性(譬如mtu),默认是false。 diff --git a/docs/en/rdma.md b/docs/en/rdma.md index 98ac6981bc..9bd24028c8 100644 --- a/docs/en/rdma.md +++ b/docs/en/rdma.md @@ -86,3 +86,4 @@ Configurable parameters: * rdma_poller_yield: Whether pollers in polling mode voluntarily relinquish the CPU, default is false. * event_dispatcher_edisp_unsched: Global switch for EventDispatcher scheduling (true means unschedulable), default is false. * rdma_disable_bthread: Disables bthread, default is false. +* rdma_extend: server and client can negotiate the advance feature of rdma(such as mtu), default is false. diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index c69bf8ec07..3bfcf06e88 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -67,6 +67,7 @@ DEFINE_int32(rdma_poller_num, 1, "Poller number in RDMA polling mode."); DEFINE_bool(rdma_poller_yield, false, "Yield thread in RDMA polling mode."); DEFINE_bool(rdma_disable_bthread, false, "Disable bthread in RDMA"); DEFINE_bool(rdma_ece, false, "Open ece in RDMA, should use this feature when rdma nics are from the same merchant."); +DEFINE_bool(rdma_extend, false, "Use the extend fields to negotiate the advance feature of rdma, such as mtu."); static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent @@ -81,14 +82,16 @@ static const size_t RESERVED_WR_NUM = 3; // block size (4B) // sq size (2B) // rq size (2B) +// lid size (2B) // GID (16B) // QP number (4B) +// mtu type (2B) static const char* MAGIC_STR = "RDMA"; static const size_t MAGIC_STR_LEN = 4; static const size_t HELLO_MSG_LEN_MIN = 40; // static const size_t HELLO_MSG_LEN_MAX = 4096; static const size_t ACK_MSG_LEN = 4; -static uint16_t g_rdma_hello_msg_len = 40; // In Byte +static uint16_t g_rdma_hello_msg_len = 42; // In Byte static uint16_t g_rdma_hello_version = 2; static uint16_t g_rdma_impl_version = 1; static uint32_t g_rdma_recv_block_size = 0; @@ -105,10 +108,16 @@ static const uint32_t ACK_MSG_RDMA_OK = 0x1; static butil::Mutex* g_rdma_resource_mutex = NULL; static RdmaResource* g_rdma_resource_list = NULL; +// The HelloMessage should have all base fields, and the new versions of HelloMessage +// maybe have some extern fields. + struct HelloMessage { - void Serialize(void* data) const; - void Deserialize(void* data); + void BaseSerialize(void* data) const; + void ExtSerialize(void* data) const; + void BaseDeserialize(void* data); + uint16_t ExtDeserialize(void* data, uint16_t ext_len); + // base fields uint16_t msg_len; uint16_t hello_ver; uint16_t impl_ver; @@ -118,9 +127,12 @@ struct HelloMessage { uint16_t lid; ibv_gid gid; uint32_t qp_num; + + // extern fields + uint16_t mtu_type; }; -void HelloMessage::Serialize(void* data) const { +void HelloMessage::BaseSerialize(void* data) const { uint16_t* current_pos = (uint16_t*)data; *(current_pos++) = butil::HostToNet16(msg_len); *(current_pos++) = butil::HostToNet16(hello_ver); @@ -132,11 +144,17 @@ void HelloMessage::Serialize(void* data) const { *(current_pos++) = butil::HostToNet16(rq_size); *(current_pos++) = butil::HostToNet16(lid); memcpy(current_pos, gid.raw, 16); - uint32_t* qp_num_pos = (uint32_t*)((char*)current_pos + 16); + current_pos += 8; + uint32_t* qp_num_pos = (uint32_t*)(current_pos); *qp_num_pos = butil::HostToNet32(qp_num); } -void HelloMessage::Deserialize(void* data) { +void HelloMessage::ExtSerialize(void* data) const { + uint16_t* current_pos = (uint16_t*)data; + *(current_pos) = butil::HostToNet16(mtu_type); +} + +void HelloMessage::BaseDeserialize(void* data) { uint16_t* current_pos = (uint16_t*)data; msg_len = butil::NetToHost16(*current_pos++); hello_ver = butil::NetToHost16(*current_pos++); @@ -147,7 +165,26 @@ void HelloMessage::Deserialize(void* data) { rq_size = butil::NetToHost16(*current_pos++); lid = butil::NetToHost16(*current_pos++); memcpy(gid.raw, current_pos, 16); - qp_num = butil::NetToHost32(*(uint32_t*)((char*)current_pos + 16)); + current_pos += 8; + qp_num = butil::NetToHost32(*(uint32_t*)(current_pos)); +} + +uint16_t HelloMessage::ExtDeserialize(void* data, uint16_t ext_len) { + if (ext_len == 0) { + return 0; + } + + uint16_t remain_ext_len = ext_len; + + // try to deserialize mtu_type + if (remain_ext_len < 2) { + LOG(FATAL) << "illegal HelloMessage, remain ext len is " << remain_ext_len << ", should not be less than 2!!!"; + } + uint16_t* current_pos = (uint16_t*)data; + mtu_type = butil::NetToHost16(*current_pos++); + remain_ext_len -= 2; + + return remain_ext_len; } RdmaResource::~RdmaResource() { @@ -435,6 +472,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { << "Start handshake on " << s->_local_side; uint8_t data[g_rdma_hello_msg_len]; + uint16_t local_mtu_type = GetLocalMtuType(); // First initialize CQ and QP resources ep->_state = C_ALLOC_QPCQ; @@ -463,9 +501,15 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { // Only happens in UT local_msg.qp_num = 0; } + local_msg.mtu_type = local_mtu_type; memcpy(data, MAGIC_STR, 4); - local_msg.Serialize((char*)data + 4); - if (ep->WriteToFd(data, g_rdma_hello_msg_len) < 0) { + local_msg.BaseSerialize((char*)data + 4); + // If FLAGS_rdma_extend is not open, only send base fields of HelloMessage + if (FLAGS_rdma_extend) { + local_msg.ExtSerialize((char*)data + HELLO_MSG_LEN_MIN); + } + size_t msg_len = FLAGS_rdma_extend ? g_rdma_hello_msg_len : HELLO_MSG_LEN_MIN; + if (ep->WriteToFd(data, msg_len) < 0) { const int saved_errno = errno; PLOG(WARNING) << "Fail to send hello message to server:" << s->description(); s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s", @@ -502,7 +546,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { return NULL; } HelloMessage remote_msg; - remote_msg.Deserialize(data); + remote_msg.BaseDeserialize(data); if (remote_msg.msg_len < HELLO_MSG_LEN_MIN) { LOG(WARNING) << "Fail to parse Hello Message length from server:" << s->description(); @@ -512,9 +556,27 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { return NULL; } - if (remote_msg.msg_len > HELLO_MSG_LEN_MIN) { - // TODO: Read Hello Message customized data - // Just for future use, should not happen now + // In older versions of brpc, IBV_MTU_1024 is the default mtu type, + // So we set remote_mtu IBV_MTU_1024 at default to be compatible with older versions. + uint16_t remote_mtu_type = IBV_MTU_1024; + if (FLAGS_rdma_extend && remote_msg.msg_len > HELLO_MSG_LEN_MIN) { + // Read Hello Message customized data + uint16_t remote_msg_ext_len = remote_msg.msg_len - HELLO_MSG_LEN_MIN; + uint8_t ext_data[remote_msg_ext_len]; + if (ep->ReadFromFd(ext_data, remote_msg_ext_len) < 0) { + const int saved_errno = errno; + PLOG(WARNING) << "Fail to get Hello Message ext fields from server:" << s->description(); + s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s", + s->description().c_str(), berror(saved_errno)); + ep->_state = FAILED; + return NULL; + } + remote_msg.ExtDeserialize(ext_data, remote_msg_ext_len); + if (remote_msg_ext_len >= 2) { + // mtu_type field is valid + remote_mtu_type = remote_msg.mtu_type; + } + // TODO: other extern fields } if (!HelloNegotiationValid(remote_msg)) { @@ -534,7 +596,9 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { ep->_local_window_capacity, butil::memory_order_relaxed); ep->_state = C_BRINGUP_QP; - if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num) < 0) { + // use the minimum of local mtu type and remote mtu type + uint16_t min_mtu_type = std::min(local_mtu_type, remote_mtu_type); + if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num, min_mtu_type) < 0) { LOG(WARNING) << "Fail to bringup QP, fallback to tcp:" << s->description(); rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF; } else { @@ -582,6 +646,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { << "Start handshake on " << s->description(); uint8_t data[g_rdma_hello_msg_len]; + uint16_t local_mtu_type = GetLocalMtuType(); ep->_state = S_HELLO_WAIT; if (ep->ReadFromFd(data, MAGIC_STR_LEN) < 0) { @@ -605,7 +670,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { return NULL; } - if (ep->ReadFromFd(data, g_rdma_hello_msg_len - MAGIC_STR_LEN) < 0) { + if (ep->ReadFromFd(data, HELLO_MSG_LEN_MIN - MAGIC_STR_LEN) < 0) { const int saved_errno = errno; PLOG(WARNING) << "Fail to read Hello Message from client:" << s->description(); s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s", @@ -615,7 +680,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { } HelloMessage remote_msg; - remote_msg.Deserialize(data); + remote_msg.BaseDeserialize(data); if (remote_msg.msg_len < HELLO_MSG_LEN_MIN) { LOG(WARNING) << "Fail to parse Hello Message length from client:" << s->description(); @@ -624,9 +689,28 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { ep->_state = FAILED; return NULL; } - if (remote_msg.msg_len > HELLO_MSG_LEN_MIN) { - // TODO: Read Hello Message customized header - // Just for future use, should not happen now + + // In older versions of brpc, IBV_MTU_1024 is the default mtu type, + // So we set remote_mtu IBV_MTU_1024 at default to be compatible with older versions. + uint16_t remote_mtu_type = IBV_MTU_1024; + if (FLAGS_rdma_extend && remote_msg.msg_len > HELLO_MSG_LEN_MIN) { + // Read Hello Message customized data + uint16_t remote_msg_ext_len = remote_msg.msg_len - HELLO_MSG_LEN_MIN; + uint8_t ext_data[remote_msg_ext_len]; + if (ep->ReadFromFd(ext_data, remote_msg_ext_len) < 0) { + const int saved_errno = errno; + PLOG(WARNING) << "Fail to get Hello Message ext fields from client:" << s->description(); + s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s", + s->description().c_str(), berror(saved_errno)); + ep->_state = FAILED; + return NULL; + } + remote_msg.ExtDeserialize(ext_data, remote_msg_ext_len); + if (remote_msg_ext_len >= 2) { + // mtu_type field is valid + remote_mtu_type = remote_msg.mtu_type; + } + // TODO: other extern fields } if (!HelloNegotiationValid(remote_msg)) { @@ -652,7 +736,9 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF; } else { ep->_state = S_BRINGUP_QP; - if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num) < 0) { + // use the minimum of local mtu type and remote mtu type + uint16_t min_mtu_type = std::min(local_mtu_type, remote_mtu_type); + if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num, min_mtu_type) < 0) { LOG(WARNING) << "Fail to bringup QP, fallback to tcp:" << s->description(); rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF; @@ -681,10 +767,16 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { // Only happens in UT local_msg.qp_num = 0; } + local_msg.mtu_type = local_mtu_type; } memcpy(data, MAGIC_STR, 4); - local_msg.Serialize((char*)data + 4); - if (ep->WriteToFd(data, g_rdma_hello_msg_len) < 0) { + local_msg.BaseSerialize((char*)data + 4); + // If FLAGS_rdma_extend is not open, only send base fields of HelloMessage + if (FLAGS_rdma_extend) { + local_msg.ExtSerialize((char*)data + HELLO_MSG_LEN_MIN); + } + size_t msg_len = FLAGS_rdma_extend ? g_rdma_hello_msg_len : HELLO_MSG_LEN_MIN; + if (ep->WriteToFd(data, msg_len) < 0) { const int saved_errno = errno; PLOG(WARNING) << "Fail to send Hello Message to client:" << s->description(); s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s", @@ -1232,12 +1324,27 @@ int RdmaEndpoint::AllocateResources() { return 0; } -int RdmaEndpoint::BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num) { +int RdmaEndpoint::BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num, uint16_t mtu_type) { if (BAIDU_UNLIKELY(g_skip_rdma_init)) { // For UT return 0; } + if (mtu_type == IBV_MTU_256) { + LOG(INFO) << "negotiated mtu is 256"; + } else if (mtu_type == IBV_MTU_512) { + LOG(INFO) << "negotiated mtu is 512"; + } else if (mtu_type == IBV_MTU_1024) { + LOG(INFO) << "negotiated mtu is 1024"; + } else if (mtu_type == IBV_MTU_2048) { + LOG(INFO) << "negotiated mtu is 2048"; + } else if (mtu_type == IBV_MTU_4096) { + LOG(INFO) << "negotiated mtu is 4096"; + } else { + LOG(ERROR) << "unknown mtu " << mtu_type; + return -1; + } + ibv_qp_attr attr; attr.qp_state = IBV_QPS_INIT; @@ -1275,7 +1382,7 @@ int RdmaEndpoint::BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num) { } attr.qp_state = IBV_QPS_RTR; - attr.path_mtu = IBV_MTU_1024; // TODO: support more mtu in future + attr.path_mtu = ibv_mtu(mtu_type); attr.ah_attr.grh.dgid = gid; attr.ah_attr.grh.flow_label = 0; attr.ah_attr.grh.sgid_index = GetRdmaGidIndex(); diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h index 54a008f1f7..82c2911fa5 100644 --- a/src/brpc/rdma/rdma_endpoint.h +++ b/src/brpc/rdma/rdma_endpoint.h @@ -193,10 +193,11 @@ friend class Socket; // lid: remote LID // gid: remote GID // qp_num: remote QP number + // mtu_type: the minimum of local mtu_type and remote mtu_type // Return: // 0: success // -1: failed, errno set - int BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num); + int BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num, uint16_t mtu_type); // Get event from comp channel and ack the events int GetAndAckEvents(SocketUniquePtr& s); diff --git a/src/brpc/rdma/rdma_helper.cpp b/src/brpc/rdma/rdma_helper.cpp index 768bf615e2..97afe604ff 100644 --- a/src/brpc/rdma/rdma_helper.cpp +++ b/src/brpc/rdma/rdma_helper.cpp @@ -90,6 +90,8 @@ static int g_comp_vector_index = 0; butil::atomic g_rdma_available(false); +static uint16_t local_mtu_type = IBV_MTU_4096; + DEFINE_int32(rdma_max_sge, 0, "Max SGE num in a WR"); DEFINE_string(rdma_device, "", "The name of the HCA device used " "(Empty means using the first active device)"); @@ -455,6 +457,36 @@ static ibv_context* OpenDevice(int num_total, int* num_available_devices) { return ret_context; } +static uint16_t detect_mtu(struct ibv_context* ctx, int port_num) { + struct ibv_port_attr port_attr; + + if (IbvQueryPort(ctx, port_num, &port_attr)) { + LOG(ERROR) << "IbvQueryPort failed"; + return 0; + } + + LOG(INFO) << "local active mtu type:" << port_attr.active_mtu + << ", max mtu type:" << port_attr.max_mtu; + + uint16_t mtu_type = port_attr.active_mtu; + if (mtu_type == IBV_MTU_256) { + LOG(INFO) << "local mtu is 256"; + } else if (mtu_type == IBV_MTU_512) { + LOG(INFO) << "local mtu is 512"; + } else if (mtu_type == IBV_MTU_1024) { + LOG(INFO) << "local mtu is 1024"; + } else if (mtu_type == IBV_MTU_2048) { + LOG(INFO) << "local mtu is 2048"; + } else if (mtu_type == IBV_MTU_4096) { + LOG(INFO) << "local mtu is 4096"; + } else { + LOG(ERROR) << "unknown mtu type " << mtu_type; + return 0; + } + + return mtu_type; +} + static void GlobalRdmaInitializeOrDieImpl() { if (BAIDU_UNLIKELY(g_skip_rdma_init)) { // Just for UT @@ -549,6 +581,11 @@ static void GlobalRdmaInitializeOrDieImpl() { g_max_sge = attr.max_sge; } + local_mtu_type = detect_mtu(g_context, g_port_num); + if (!local_mtu_type) { + PLOG(ERROR) << "Fail to get local mtu type"; + ExitWithError(); + } // Initialize RDMA memory pool (block_pool) if (!InitBlockPool(RdmaRegisterMemory)) { PLOG(ERROR) << "Fail to initialize RDMA memory pool"; @@ -701,6 +738,10 @@ bool SupportedByRdma(std::string protocol) { return false; } +uint16_t GetLocalMtuType() { + return local_mtu_type; +} + bool InitPollingModeWithTag(bthread_tag_t tag, std::function callback, std::function init_fn, diff --git a/src/brpc/rdma/rdma_helper.h b/src/brpc/rdma/rdma_helper.h index 052763325b..8f0f472e4b 100644 --- a/src/brpc/rdma/rdma_helper.h +++ b/src/brpc/rdma/rdma_helper.h @@ -89,6 +89,7 @@ void GlobalDisableRdma(); // If the given protocol supported by RDMA bool SupportedByRdma(std::string protocol); +uint16_t GetLocalMtuType(); } // namespace rdma } // namespace brpc #else diff --git a/test/brpc_rdma_unittest.cpp b/test/brpc_rdma_unittest.cpp index ccb280f1c8..c1246fdb68 100644 --- a/test/brpc_rdma_unittest.cpp +++ b/test/brpc_rdma_unittest.cpp @@ -42,7 +42,7 @@ #include "echo.pb.h" static const int PORT = 8713; -static const size_t RDMA_HELLO_MSG_LEN = 40; +static const size_t RDMA_HELLO_MSG_BASE_LEN = 40; static uint16_t RDMA_HELLO_VERSION = 2; static uint16_t RDMA_IMPL_VERSION = 1; @@ -57,9 +57,12 @@ DEFINE_bool(rdma_test_enable, false, "Enable tests requring rdma runtime."); namespace rdma { struct HelloMessage { - void Serialize(void* data) const; - void Deserialize(void* data); + void BaseSerialize(void* data) const; + void ExtSerialize(void* data) const; + void BaseDeserialize(void* data); + uint16_t ExtDeserialize(void* data, uint16_t ext_len); + // base fields uint16_t msg_len; uint16_t hello_ver; uint16_t impl_ver; @@ -69,6 +72,9 @@ struct HelloMessage { uint16_t lid; ibv_gid gid; uint32_t qp_num; + + // extern fields + uint16_t mtu_type; }; DECLARE_bool(rdma_trace_verbose); @@ -207,7 +213,7 @@ TEST_F(RdmaTest, client_hello_msg_invalid_magic_str) { Socket* s = GetSocketFromServer(0); ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state); - uint8_t data[RDMA_HELLO_MSG_LEN]; + uint8_t data[RDMA_HELLO_MSG_BASE_LEN]; memcpy(data, "PRPC", 4); // send as normal baidu_std protocol ASSERT_EQ(4, write(sockfd, data, 4)); usleep(100000); // wait for server to handle the msg @@ -280,7 +286,7 @@ TEST_F(RdmaTest, client_hello_msg_invalid_len) { addr.sin_family = AF_INET; addr.sin_port = htons(PORT); Socket* s = NULL; - uint8_t data[RDMA_HELLO_MSG_LEN]; + uint8_t data[RDMA_HELLO_MSG_BASE_LEN]; butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0)); ASSERT_TRUE(sockfd1 >= 0); @@ -325,8 +331,8 @@ TEST_F(RdmaTest, client_hello_msg_invalid_version) { addr.sin_family = AF_INET; addr.sin_port = htons(PORT); Socket* s = NULL; - uint8_t data[RDMA_HELLO_MSG_LEN]; - uint16_t len = butil::HostToNet16(RDMA_HELLO_MSG_LEN); + uint8_t data[RDMA_HELLO_MSG_BASE_LEN]; + uint16_t len = butil::HostToNet16(RDMA_HELLO_MSG_BASE_LEN); uint16_t ver = butil::HostToNet16(1); butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0)); @@ -391,8 +397,8 @@ TEST_F(RdmaTest, client_hello_msg_invalid_sq_rq_block_size) { Socket* s = NULL; uint32_t flags = butil::HostToNet32(0); rdma::HelloMessage msg{}; - uint8_t data[RDMA_HELLO_MSG_LEN]; - msg.msg_len = RDMA_HELLO_MSG_LEN; + uint8_t data[RDMA_HELLO_MSG_BASE_LEN]; + msg.msg_len = RDMA_HELLO_MSG_BASE_LEN; msg.hello_ver = RDMA_HELLO_VERSION; msg.impl_ver = RDMA_IMPL_VERSION; @@ -400,7 +406,7 @@ TEST_F(RdmaTest, client_hello_msg_invalid_sq_rq_block_size) { msg.rq_size = 16; msg.block_size = 8192; memcpy(data, "RDMA", 4); - msg.Serialize(data + 4); + msg.BaseSerialize(data + 4); butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0)); ASSERT_TRUE(sockfd1 >= 0); ASSERT_EQ(0, connect(sockfd1, (sockaddr*)&addr, sizeof(sockaddr))); @@ -425,7 +431,7 @@ TEST_F(RdmaTest, client_hello_msg_invalid_sq_rq_block_size) { msg.rq_size = 10; msg.block_size = 8192; memcpy(data, "RDMA", 4); - msg.Serialize(data + 4); + msg.BaseSerialize(data + 4); butil::fd_guard sockfd2(socket(AF_INET, SOCK_STREAM, 0)); ASSERT_TRUE(sockfd2 >= 0); ASSERT_EQ(0, connect(sockfd2, (sockaddr*)&addr, sizeof(sockaddr))); @@ -450,7 +456,7 @@ TEST_F(RdmaTest, client_hello_msg_invalid_sq_rq_block_size) { msg.rq_size = 16; msg.block_size = 1000; memcpy(data, "RDMA", 4); - msg.Serialize(data + 4); + msg.BaseSerialize(data + 4); butil::fd_guard sockfd3(socket(AF_INET, SOCK_STREAM, 0)); ASSERT_TRUE(sockfd3 >= 0); ASSERT_EQ(0, connect(sockfd3, (sockaddr*)&addr, sizeof(sockaddr))); @@ -483,8 +489,8 @@ TEST_F(RdmaTest, client_close_after_qp_build) { addr.sin_port = htons(PORT); Socket* s = NULL; rdma::HelloMessage msg; - uint8_t data[RDMA_HELLO_MSG_LEN]; - msg.msg_len = RDMA_HELLO_MSG_LEN; + uint8_t data[RDMA_HELLO_MSG_BASE_LEN]; + msg.msg_len = RDMA_HELLO_MSG_BASE_LEN; msg.hello_ver = RDMA_HELLO_VERSION; msg.impl_ver = RDMA_IMPL_VERSION; msg.sq_size = 16; @@ -493,7 +499,7 @@ TEST_F(RdmaTest, client_close_after_qp_build) { msg.qp_num = 0; msg.gid = rdma::GetRdmaGid(); memcpy(data, "RDMA", 4); - msg.Serialize(data + 4); + msg.BaseSerialize(data + 4); butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0)); ASSERT_TRUE(sockfd1 >= 0); @@ -520,8 +526,8 @@ TEST_F(RdmaTest, client_close_during_ack_send) { addr.sin_port = htons(PORT); Socket* s = NULL; rdma::HelloMessage msg; - uint8_t data[RDMA_HELLO_MSG_LEN]; - msg.msg_len = RDMA_HELLO_MSG_LEN; + uint8_t data[RDMA_HELLO_MSG_BASE_LEN]; + msg.msg_len = RDMA_HELLO_MSG_BASE_LEN; msg.hello_ver = RDMA_HELLO_VERSION; msg.impl_ver = RDMA_IMPL_VERSION; msg.sq_size = 16; @@ -530,7 +536,7 @@ TEST_F(RdmaTest, client_close_during_ack_send) { msg.qp_num = 0; msg.gid = rdma::GetRdmaGid(); memcpy(data, "RDMA", 4); - msg.Serialize(data + 4); + msg.BaseSerialize(data + 4); butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0)); ASSERT_TRUE(sockfd1 >= 0); @@ -564,8 +570,8 @@ TEST_F(RdmaTest, client_close_after_ack_send) { addr.sin_port = htons(PORT); Socket* s = NULL; rdma::HelloMessage msg; - uint8_t data[RDMA_HELLO_MSG_LEN]; - msg.msg_len = RDMA_HELLO_MSG_LEN; + uint8_t data[RDMA_HELLO_MSG_BASE_LEN]; + msg.msg_len = RDMA_HELLO_MSG_BASE_LEN; msg.hello_ver = RDMA_HELLO_VERSION; msg.impl_ver = RDMA_IMPL_VERSION; msg.sq_size = 16; @@ -574,7 +580,7 @@ TEST_F(RdmaTest, client_close_after_ack_send) { msg.qp_num = 0; msg.gid = rdma::GetRdmaGid(); memcpy(data, "RDMA", 4); - msg.Serialize(data + 4); + msg.BaseSerialize(data + 4); butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0)); ASSERT_TRUE(sockfd1 >= 0); @@ -629,8 +635,8 @@ TEST_F(RdmaTest, client_send_data_on_tcp_after_ack_send) { addr.sin_port = htons(PORT); Socket* s = NULL; rdma::HelloMessage msg; - uint8_t data[RDMA_HELLO_MSG_LEN]; - msg.msg_len = RDMA_HELLO_MSG_LEN; + uint8_t data[RDMA_HELLO_MSG_BASE_LEN]; + msg.msg_len = RDMA_HELLO_MSG_BASE_LEN; msg.hello_ver = RDMA_HELLO_VERSION; msg.impl_ver = RDMA_IMPL_VERSION; msg.sq_size = 16; @@ -639,7 +645,7 @@ TEST_F(RdmaTest, client_send_data_on_tcp_after_ack_send) { msg.qp_num = 0; msg.gid = rdma::GetRdmaGid(); memcpy(data, "RDMA", 4); - msg.Serialize(data + 4); + msg.BaseSerialize(data + 4); butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0)); ASSERT_TRUE(sockfd1 >= 0); @@ -741,8 +747,8 @@ TEST_F(RdmaTest, server_close_before_hello_send) { butil::fd_guard acc_fd(accept(sockfd, NULL, NULL)); ASSERT_TRUE(acc_fd >= 0); - uint8_t data[RDMA_HELLO_MSG_LEN]; - ASSERT_EQ(RDMA_HELLO_MSG_LEN, read(acc_fd, data, RDMA_HELLO_MSG_LEN)); + uint8_t data[RDMA_HELLO_MSG_BASE_LEN]; + ASSERT_EQ(RDMA_HELLO_MSG_BASE_LEN, read(acc_fd, data, RDMA_HELLO_MSG_BASE_LEN)); close(acc_fd); usleep(100000); ASSERT_EQ(rdma::RdmaEndpoint::FAILED, s->_rdma_ep->_state); @@ -777,8 +783,8 @@ TEST_F(RdmaTest, server_miss_during_magic_str) { butil::fd_guard acc_fd(accept(sockfd, NULL, NULL)); ASSERT_TRUE(acc_fd >= 0); - uint8_t data[RDMA_HELLO_MSG_LEN]; - ASSERT_EQ(RDMA_HELLO_MSG_LEN, read(acc_fd, data, RDMA_HELLO_MSG_LEN)); + uint8_t data[RDMA_HELLO_MSG_BASE_LEN]; + ASSERT_EQ(RDMA_HELLO_MSG_BASE_LEN, read(acc_fd, data, RDMA_HELLO_MSG_BASE_LEN)); ASSERT_EQ(2, write(acc_fd, "RD", 2)); usleep(100000); bthread_id_join(cntl.call_id()); @@ -812,8 +818,8 @@ TEST_F(RdmaTest, server_close_during_magic_str) { butil::fd_guard acc_fd(accept(sockfd, NULL, NULL)); ASSERT_TRUE(acc_fd >= 0); - uint8_t data[RDMA_HELLO_MSG_LEN]; - ASSERT_EQ(RDMA_HELLO_MSG_LEN, read(acc_fd, data, RDMA_HELLO_MSG_LEN)); + uint8_t data[RDMA_HELLO_MSG_BASE_LEN]; + ASSERT_EQ(RDMA_HELLO_MSG_BASE_LEN, read(acc_fd, data, RDMA_HELLO_MSG_BASE_LEN)); ASSERT_EQ(2, write(acc_fd, "RD", 2)); usleep(100000); close(acc_fd); @@ -850,8 +856,8 @@ TEST_F(RdmaTest, server_hello_invalid_magic_str) { butil::fd_guard acc_fd(accept(sockfd, NULL, NULL)); ASSERT_TRUE(acc_fd >= 0); - uint8_t data[RDMA_HELLO_MSG_LEN]; - ASSERT_EQ(RDMA_HELLO_MSG_LEN, read(acc_fd, data, RDMA_HELLO_MSG_LEN)); + uint8_t data[RDMA_HELLO_MSG_BASE_LEN]; + ASSERT_EQ(RDMA_HELLO_MSG_BASE_LEN, read(acc_fd, data, RDMA_HELLO_MSG_BASE_LEN)); ASSERT_EQ(4, write(acc_fd, "ABCD", 4)); usleep(100000); ASSERT_EQ(rdma::RdmaEndpoint::FAILED, s->_rdma_ep->_state); @@ -886,8 +892,8 @@ TEST_F(RdmaTest, server_miss_during_hello_msg) { butil::fd_guard acc_fd(accept(sockfd, NULL, NULL)); ASSERT_TRUE(acc_fd >= 0); - uint8_t data[RDMA_HELLO_MSG_LEN]; - ASSERT_EQ(RDMA_HELLO_MSG_LEN, read(acc_fd, data, RDMA_HELLO_MSG_LEN)); + uint8_t data[RDMA_HELLO_MSG_BASE_LEN]; + ASSERT_EQ(RDMA_HELLO_MSG_BASE_LEN, read(acc_fd, data, RDMA_HELLO_MSG_BASE_LEN)); ASSERT_EQ(4, write(acc_fd, "RDMA", 4)); ASSERT_EQ(2, write(acc_fd, "00", 2)); bthread_id_join(cntl.call_id()); @@ -921,8 +927,8 @@ TEST_F(RdmaTest, server_close_during_hello_msg) { butil::fd_guard acc_fd(accept(sockfd, NULL, NULL)); ASSERT_TRUE(acc_fd >= 0); - uint8_t data[RDMA_HELLO_MSG_LEN]; - ASSERT_EQ(RDMA_HELLO_MSG_LEN, read(acc_fd, data, RDMA_HELLO_MSG_LEN)); + uint8_t data[RDMA_HELLO_MSG_BASE_LEN]; + ASSERT_EQ(RDMA_HELLO_MSG_BASE_LEN, read(acc_fd, data, RDMA_HELLO_MSG_BASE_LEN)); ASSERT_EQ(4, write(acc_fd, "RDMA", 4)); ASSERT_EQ(2, write(acc_fd, "00", 2)); close(acc_fd); @@ -959,13 +965,13 @@ TEST_F(RdmaTest, server_hello_invalid_msg_len) { butil::fd_guard acc_fd(accept(sockfd, NULL, NULL)); ASSERT_TRUE(acc_fd >= 0); - uint8_t data[RDMA_HELLO_MSG_LEN]; - ASSERT_EQ(RDMA_HELLO_MSG_LEN, read(acc_fd, data, RDMA_HELLO_MSG_LEN)); + uint8_t data[RDMA_HELLO_MSG_BASE_LEN]; + ASSERT_EQ(RDMA_HELLO_MSG_BASE_LEN, read(acc_fd, data, RDMA_HELLO_MSG_BASE_LEN)); memcpy(data, "RDMA", 4); uint16_t len = butil::HostToNet16(35); memcpy(data + 4, &len, 2); memset(data + 6, 0, 32); - ASSERT_EQ(RDMA_HELLO_MSG_LEN, write(acc_fd, data, RDMA_HELLO_MSG_LEN)); + ASSERT_EQ(RDMA_HELLO_MSG_BASE_LEN, write(acc_fd, data, RDMA_HELLO_MSG_BASE_LEN)); usleep(100000); ASSERT_EQ(rdma::RdmaEndpoint::FAILED, s->_rdma_ep->_state); bthread_id_join(cntl.call_id()); @@ -999,13 +1005,13 @@ TEST_F(RdmaTest, server_hello_invalid_version) { butil::fd_guard acc_fd(accept(sockfd, NULL, NULL)); ASSERT_TRUE(acc_fd >= 0); - uint8_t data[RDMA_HELLO_MSG_LEN]; - ASSERT_EQ(RDMA_HELLO_MSG_LEN, read(acc_fd, data, RDMA_HELLO_MSG_LEN)); + uint8_t data[RDMA_HELLO_MSG_BASE_LEN]; + ASSERT_EQ(RDMA_HELLO_MSG_BASE_LEN, read(acc_fd, data, RDMA_HELLO_MSG_BASE_LEN)); memcpy(data, "RDMA", 4); - uint16_t len = butil::HostToNet16(RDMA_HELLO_MSG_LEN); + uint16_t len = butil::HostToNet16(RDMA_HELLO_MSG_BASE_LEN); memcpy(data + 4, &len, 2); memset(data + 6, 0, 32); - ASSERT_EQ(RDMA_HELLO_MSG_LEN, write(acc_fd, data, RDMA_HELLO_MSG_LEN)); + ASSERT_EQ(RDMA_HELLO_MSG_BASE_LEN, write(acc_fd, data, RDMA_HELLO_MSG_BASE_LEN)); usleep(100000); ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state); ASSERT_EQ(4, read(acc_fd, data, 4)); @@ -1042,11 +1048,11 @@ TEST_F(RdmaTest, server_hello_invalid_sq_rq_size) { butil::fd_guard acc_fd(accept(sockfd, NULL, NULL)); ASSERT_TRUE(acc_fd >= 0); - uint8_t data[RDMA_HELLO_MSG_LEN]; - ASSERT_EQ(RDMA_HELLO_MSG_LEN, read(acc_fd, data, RDMA_HELLO_MSG_LEN)); + uint8_t data[RDMA_HELLO_MSG_BASE_LEN]; + ASSERT_EQ(RDMA_HELLO_MSG_BASE_LEN, read(acc_fd, data, RDMA_HELLO_MSG_BASE_LEN)); rdma::HelloMessage msg; - msg.msg_len = RDMA_HELLO_MSG_LEN; + msg.msg_len = RDMA_HELLO_MSG_BASE_LEN; msg.hello_ver = 1; msg.impl_ver = 1; msg.sq_size = 0; @@ -1055,8 +1061,8 @@ TEST_F(RdmaTest, server_hello_invalid_sq_rq_size) { msg.qp_num = 0; msg.gid = rdma::GetRdmaGid(); memcpy(data, "RDMA", 4); - msg.Serialize(data + 4); - ASSERT_EQ(RDMA_HELLO_MSG_LEN, write(acc_fd, data, RDMA_HELLO_MSG_LEN)); + msg.BaseSerialize(data + 4); + ASSERT_EQ(RDMA_HELLO_MSG_BASE_LEN, write(acc_fd, data, RDMA_HELLO_MSG_BASE_LEN)); usleep(100000); ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state); @@ -1094,11 +1100,11 @@ TEST_F(RdmaTest, server_miss_after_ack) { butil::fd_guard acc_fd(accept(sockfd, NULL, NULL)); ASSERT_TRUE(acc_fd >= 0); - uint8_t data[RDMA_HELLO_MSG_LEN]; - ASSERT_EQ(RDMA_HELLO_MSG_LEN, read(acc_fd, data, RDMA_HELLO_MSG_LEN)); + uint8_t data[RDMA_HELLO_MSG_BASE_LEN]; + ASSERT_EQ(RDMA_HELLO_MSG_BASE_LEN, read(acc_fd, data, RDMA_HELLO_MSG_BASE_LEN)); rdma::HelloMessage msg; - msg.msg_len = RDMA_HELLO_MSG_LEN; + msg.msg_len = RDMA_HELLO_MSG_BASE_LEN; msg.hello_ver = RDMA_HELLO_VERSION; msg.impl_ver = RDMA_IMPL_VERSION; msg.sq_size = 16; @@ -1107,8 +1113,8 @@ TEST_F(RdmaTest, server_miss_after_ack) { msg.qp_num = 0; msg.gid = rdma::GetRdmaGid(); memcpy(data, "RDMA", 4); - msg.Serialize(data + 4); - ASSERT_EQ(RDMA_HELLO_MSG_LEN, write(acc_fd, data, RDMA_HELLO_MSG_LEN)); + msg.BaseSerialize(data + 4); + ASSERT_EQ(RDMA_HELLO_MSG_BASE_LEN, write(acc_fd, data, RDMA_HELLO_MSG_BASE_LEN)); usleep(100000); ASSERT_EQ(rdma::RdmaEndpoint::ESTABLISHED, s->_rdma_ep->_state); @@ -1146,11 +1152,11 @@ TEST_F(RdmaTest, server_close_after_ack) { butil::fd_guard acc_fd(accept(sockfd, NULL, NULL)); ASSERT_TRUE(acc_fd >= 0); - uint8_t data[RDMA_HELLO_MSG_LEN]; - ASSERT_EQ(RDMA_HELLO_MSG_LEN, read(acc_fd, data, RDMA_HELLO_MSG_LEN)); + uint8_t data[RDMA_HELLO_MSG_BASE_LEN]; + ASSERT_EQ(RDMA_HELLO_MSG_BASE_LEN, read(acc_fd, data, RDMA_HELLO_MSG_BASE_LEN)); rdma::HelloMessage msg; - msg.msg_len = RDMA_HELLO_MSG_LEN; + msg.msg_len = RDMA_HELLO_MSG_BASE_LEN; msg.hello_ver = RDMA_HELLO_VERSION; msg.impl_ver = RDMA_IMPL_VERSION; msg.sq_size = 16; @@ -1159,8 +1165,8 @@ TEST_F(RdmaTest, server_close_after_ack) { msg.qp_num = 0; msg.gid = rdma::GetRdmaGid(); memcpy(data, "RDMA", 4); - msg.Serialize(data + 4); - ASSERT_EQ(RDMA_HELLO_MSG_LEN, write(acc_fd, data, RDMA_HELLO_MSG_LEN)); + msg.BaseSerialize(data + 4); + ASSERT_EQ(RDMA_HELLO_MSG_BASE_LEN, write(acc_fd, data, RDMA_HELLO_MSG_BASE_LEN)); usleep(100000); ASSERT_EQ(rdma::RdmaEndpoint::ESTABLISHED, s->_rdma_ep->_state); @@ -1199,11 +1205,11 @@ TEST_F(RdmaTest, server_send_data_on_tcp_after_ack) { butil::fd_guard acc_fd(accept(sockfd, NULL, NULL)); ASSERT_TRUE(acc_fd >= 0); - uint8_t data[RDMA_HELLO_MSG_LEN]; - ASSERT_EQ(RDMA_HELLO_MSG_LEN, read(acc_fd, data, RDMA_HELLO_MSG_LEN)); + uint8_t data[RDMA_HELLO_MSG_BASE_LEN]; + ASSERT_EQ(RDMA_HELLO_MSG_BASE_LEN, read(acc_fd, data, RDMA_HELLO_MSG_BASE_LEN)); rdma::HelloMessage msg; - msg.msg_len = RDMA_HELLO_MSG_LEN; + msg.msg_len = RDMA_HELLO_MSG_BASE_LEN; msg.hello_ver = RDMA_HELLO_VERSION; msg.impl_ver = RDMA_IMPL_VERSION; msg.sq_size = 16; @@ -1212,12 +1218,12 @@ TEST_F(RdmaTest, server_send_data_on_tcp_after_ack) { msg.qp_num = 0; msg.gid = rdma::GetRdmaGid(); memcpy(data, "RDMA", 4); - msg.Serialize(data + 4); - ASSERT_EQ(RDMA_HELLO_MSG_LEN, write(acc_fd, data, RDMA_HELLO_MSG_LEN)); + msg.BaseSerialize(data + 4); + ASSERT_EQ(RDMA_HELLO_MSG_BASE_LEN, write(acc_fd, data, RDMA_HELLO_MSG_BASE_LEN)); usleep(100000); ASSERT_EQ(rdma::RdmaEndpoint::ESTABLISHED, s->_rdma_ep->_state); - ASSERT_EQ(RDMA_HELLO_MSG_LEN, write(acc_fd, data, RDMA_HELLO_MSG_LEN)); + ASSERT_EQ(RDMA_HELLO_MSG_BASE_LEN, write(acc_fd, data, RDMA_HELLO_MSG_BASE_LEN)); bthread_id_join(cntl.call_id()); ASSERT_EQ(EPROTO, cntl.ErrorCode());