diff --git a/docs/cn/io.md b/docs/cn/io.md index baadf27164..17c60d8ff5 100644 --- a/docs/cn/io.md +++ b/docs/cn/io.md @@ -14,6 +14,8 @@ linux一般使用non-blocking IO提高IO并发度。当IO并发度很低时,no 由于epoll的[一个bug](https://web.archive.org/web/20150423184820/https://patchwork.kernel.org/patch/1970231/)(开发brpc时仍有)及epoll_ctl较大的开销,EDISP使用Edge triggered模式。当收到事件时,EDISP给一个原子变量加1,只有当加1前的值是0时启动一个bthread处理对应fd上的数据。在背后,EDISP把所在的pthread让给了新建的bthread,使其有更好的cache locality,可以尽快地读取fd上的数据。而EDISP所在的bthread会被偷到另外一个pthread继续执行,这个过程即是bthread的work stealing调度。要准确理解那个原子变量的工作方式可以先阅读[atomic instructions](atomic_instructions.md),再看[Socket::StartInputEvent](https://github.com/apache/brpc/blob/master/src/brpc/socket.cpp)。这些方法使得brpc读取同一个fd时产生的竞争是[wait-free](http://en.wikipedia.org/wiki/Non-blocking_algorithm#Wait-freedom)的。 +在当前实现里,`Transport::ProcessEvent` 会按 `EventDispatcherUnsched()` 选择启动方式:返回 `false` 时走 `bthread_start_urgent`,返回 `true` 时走 `bthread_start_background`。此外,RDMA 在轮询模式与事件模式对 `last_msg` 的处理不同:`rdma_use_polling=false` 时不会在 `RdmaTransport::QueueMessage` 里处理 `last_msg`,轮询模式下会继续处理。 + [InputMessenger](https://github.com/apache/brpc/blob/master/src/brpc/input_messenger.h)负责从fd上切割和处理消息,它通过用户回调函数理解不同的格式。Parse一般是把消息从二进制流上切割下来,运行时间较固定;Process则是进一步解析消息(比如反序列化为protobuf)后调用用户回调,时间不确定。若一次从某个fd读取出n个消息(n > 1),InputMessenger会启动n-1个bthread分别处理前n-1个消息,最后一个消息则会在原地被Process。InputMessenger会逐一尝试多种协议,由于一个连接上往往只有一种消息格式,InputMessenger会记录下上次的选择,而避免每次都重复尝试。 可以看到,fd间和fd内的消息都会在brpc中获得并发,这使brpc非常擅长大消息的读取,在高负载时仍能及时处理不同来源的消息,减少长尾的存在。 diff --git a/docs/cn/rdma.md b/docs/cn/rdma.md index e775459893..6211f819d0 100644 --- a/docs/cn/rdma.md +++ b/docs/cn/rdma.md @@ -47,7 +47,27 @@ RDMA要求数据收发所使用的内存空间必须被注册(memory register RDMA是硬件相关的通信技术,有很多独特的概念,比如device、port、GID、LID、MaxSge等。这些参数在初始化时会从对应的网卡中读取出来,并且做出默认的选择(参见src/brpc/rdma/rdma_helper.cpp)。有时默认的选择并非用户的期望,则可以通过flag参数方式指定。 
-RDMA支持事件驱动和轮询两种模式,默认是事件驱动模式,通过设置rdma_use_polling可以开启轮询模式。轮询模式下还可以设置轮询器数目(rdma_poller_num),以及是否主动放弃CPU(rdma_poller_yield)。轮询模式下还可以设置一个回调函数,在每次轮询时调用,可以配合io_uring/spdk等使用。在配合使用spdk等驱动的时候,因为spdk只支持轮询模式,并且只能在单线程使用(或者叫Run To Completion模式上使用)执行一个任务过程中不允许被调度到别的线程上,所以这时候需要设置(rdma_edisp_unsched)为true,使事件驱动程序一直占用一个worker线程,不能调度别的任务。 +RDMA支持事件驱动和轮询两种模式,默认是事件驱动模式,通过设置rdma_use_polling可以开启轮询模式。轮询模式下还可以设置轮询器数目(rdma_poller_num),以及是否主动放弃CPU(rdma_poller_yield)。轮询模式下还可以设置一个回调函数,在每次轮询时调用,可以配合io_uring/spdk等使用。 + +`event_dispatcher_edisp_unsched` 是全局开关,同时影响普通模式(TCP)和 RDMA 模式的 EventDispatcher 调度行为。 +它用于替代 `rdma_edisp_unsched`。当前保留 `rdma_edisp_unsched` 仅用于兼容历史命令行,未来版本会移除。两者语义一致:值为 `true` 时都表示 EventDispatcher 不可被调度。 + +历史说明:之前 RDMA 路径里出现过一次 `if` 判断 bug,导致行为和 flag 语义不一致;当前逻辑已修复,并按统一语义生效。 + +最终生效条件统一为: +`event_dispatcher_edisp_unsched || rdma_edisp_unsched` + +启动时不会再改写用户传入的 flag,运行时严格按用户配置值生效。 + +推荐使用方式: +1. 新部署:只配置 `event_dispatcher_edisp_unsched`。 +2. 存量部署:`rdma_edisp_unsched` 仅作过渡兼容,逐步迁移到 `event_dispatcher_edisp_unsched`。 +3. 避免脚本中给出“冲突值”;在统一 OR 语义下,只要任一 flag 为 `true`,EventDispatcher 就不可调度。 + +行为示例: +1. 仅设置 `-rdma_edisp_unsched=true`:`rdma_edisp_unsched=true`、`event_dispatcher_edisp_unsched=false`;TCP和RDMA均不可调度。 +2. 仅设置 `-event_dispatcher_edisp_unsched=true`:`rdma_edisp_unsched=false`、`event_dispatcher_edisp_unsched=true`;TCP和RDMA均不可调度。 +3. 
同时设置 `-rdma_edisp_unsched=true -event_dispatcher_edisp_unsched=false`:`rdma_edisp_unsched=true`、`event_dispatcher_edisp_unsched=false`;TCP和RDMA均不可调度。 # 参数 @@ -73,5 +93,6 @@ RDMA支持事件驱动和轮询两种模式,默认是事件驱动模式,通 * rdma_use_polling: 是否使用RDMA的轮询模式,默认false。 * rdma_poller_num: 轮询模式下的poller数目,默认1。 * rdma_poller_yield: 轮询模式下的poller是否主动放弃CPU,默认是false。 -* rdma_edisp_unsched: 让事件驱动器不可以被调度,默认是false。 +* event_dispatcher_edisp_unsched: 全局开关,控制EventDispatcher是否不可被调度(true时不可调度),默认是false。 +* rdma_edisp_unsched: 废弃兼容参数(未来版本计划移除)。当前仍参与统一生效判断,默认是false。 * rdma_disable_bthread: 禁用bthread,默认是false。 diff --git a/docs/en/io.md b/docs/en/io.md index d048bcea5b..70ac85024f 100644 --- a/docs/en/io.md +++ b/docs/en/io.md @@ -14,6 +14,8 @@ A message is a bounded binary data read from a connection, which may be a reques Because of a [bug](https://web.archive.org/web/20150423184820/https://patchwork.kernel.org/patch/1970231/) of epoll (at the time of developing brpc) and overhead of epoll_ctl, edge triggered mode is used in EDISP. After receiving an event, an atomic variable associated with the fd is added by one atomically. If the variable is zero before addition, a bthread is started to handle the data from the fd. The pthread worker in which EDISP runs is yielded to the newly created bthread to make it start reading ASAP and have a better cache locality. The bthread in which EDISP runs will be stolen to another pthread and keep running, this mechanism is work stealing used in bthreads. To understand exactly how that atomic variable works, you can read [atomic instructions](atomic_instructions.md) first, then check [Socket::StartInputEvent](https://github.com/apache/brpc/blob/master/src/brpc/socket.cpp). These methods make contentions on dispatching events of one fd [wait-free](http://en.wikipedia.org/wiki/Non-blocking_algorithm#Wait-freedom). 
+In the current implementation, `Transport::ProcessEvent` chooses the start mode based on `EventDispatcherUnsched()`: when it returns `false`, `bthread_start_urgent` is used; when it returns `true`, `bthread_start_background`. In addition, RDMA handles `last_msg` differently between polling and event modes: when `rdma_use_polling=false`, `RdmaTransport::QueueMessage` does not process `last_msg`; in polling mode it does. + [InputMessenger](https://github.com/apache/brpc/blob/master/src/brpc/input_messenger.h) cuts messages and uses customizable callbacks to handle different format of data. `Parse` callback cuts messages from binary data and has relatively stable running time; `Process` parses messages further(such as parsing by protobuf) and calls users' callbacks, which vary in running time. If n(n > 1) messages are read from the fd, InputMessenger launches n-1 bthreads to handle first n-1 messages respectively, and processes the last message in-place. InputMessenger tries protocols one by one. Since one connections often has only one type of messages, InputMessenger remembers current protocol to avoid trying for protocols next time. It can be seen that messages from different fds or even same fd are processed concurrently in brpc, which makes brpc good at handling large messages and reducing long tails on processing messages from different sources under high workloads. diff --git a/docs/en/rdma.md b/docs/en/rdma.md index 99f1ecd781..b36ac2033c 100644 --- a/docs/en/rdma.md +++ b/docs/en/rdma.md @@ -47,6 +47,26 @@ The application can manage memory by itself and send data with IOBuf::append_use RDMA is hardware-related. It has some different concepts such as device, port, GID, LID, MaxSge and so on. These parameters can be read from NICs at initialization, and brpc will make the default choice (see src/brpc/rdma/rdma_helper.cpp). Sometimes the default choice is not the expectation, then it can be changed in the flag way. 
+`event_dispatcher_edisp_unsched` is a global flag that affects EventDispatcher scheduling in both normal (TCP) mode and RDMA mode. +It replaces `rdma_edisp_unsched`, which is kept only for command-line compatibility and is planned for removal in a future release. The two flags have the same semantics: `true` means the EventDispatcher is unschedulable. + +Historical note: an earlier `if`-condition bug on the RDMA path inverted the flag's effect, so behavior did not match the flag semantics; the logic is now fixed and follows the unified semantics. + +The effective unsched condition is unified as: +`event_dispatcher_edisp_unsched || rdma_edisp_unsched` + +Startup does not rewrite user-provided flag values; runtime behavior is determined directly from them. + +Recommended usage: +1. New deployments: set only `event_dispatcher_edisp_unsched`. +2. Existing deployments: keep `rdma_edisp_unsched` temporarily, but migrate to `event_dispatcher_edisp_unsched`. +3. Avoid conflicting values in scripts; with the unified OR semantics, either flag being `true` makes the EventDispatcher unschedulable. + +Examples: +1. Only `-rdma_edisp_unsched=true`: `rdma_edisp_unsched=true`, `event_dispatcher_edisp_unsched=false`; both TCP and RDMA are unschedulable. +2. Only `-event_dispatcher_edisp_unsched=true`: `rdma_edisp_unsched=false`, `event_dispatcher_edisp_unsched=true`; both TCP and RDMA are unschedulable. +3. Both `-rdma_edisp_unsched=true -event_dispatcher_edisp_unsched=false`: `rdma_edisp_unsched=true`, `event_dispatcher_edisp_unsched=false`; both TCP and RDMA are unschedulable. + # Parameters Configurable parameters: @@ -71,5 +91,6 @@ Configurable parameters: * rdma_use_polling: Whether to use RDMA polling mode, default is false. * rdma_poller_num: The number of pollers in polling mode, default is 1. * rdma_poller_yield: Whether pollers in polling mode voluntarily relinquish the CPU, default is false. 
-* rdma_edisp_unsched`: Prevents the event driver from being scheduled, default is false. +* event_dispatcher_edisp_unsched: Global switch for EventDispatcher scheduling (true means unschedulable), default is false. +* rdma_edisp_unsched: Deprecated compatibility flag (planned for removal in a future release). It still participates in the unified unsched condition, default is false. * rdma_disable_bthread: Disables bthread, default is false. diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp index bbd946846f..a4f253bcb6 100644 --- a/src/brpc/event_dispatcher.cpp +++ b/src/brpc/event_dispatcher.cpp @@ -30,6 +30,16 @@ DECLARE_int32(task_group_ntags); namespace brpc { DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher"); +DEFINE_bool(event_dispatcher_edisp_unsched, false, + "Disable event dispatcher schedule"); + +#if BRPC_WITH_RDMA +namespace rdma { +DEFINE_bool(rdma_edisp_unsched, false, + "Deprecated and will be removed in a future release, " + "use event_dispatcher_edisp_unsched instead"); +} // namespace rdma +#endif DEFINE_bool(usercode_in_pthread, false, "Call user's callback in pthreads, use bthreads otherwise"); @@ -41,6 +51,15 @@ static bvar::LatencyRecorder* g_edisp_read_lantency = NULL; static bvar::LatencyRecorder* g_edisp_write_lantency = NULL; static pthread_once_t g_edisp_once = PTHREAD_ONCE_INIT; +bool EventDispatcherUnsched() { +#if BRPC_WITH_RDMA + return FLAGS_event_dispatcher_edisp_unsched || + rdma::FLAGS_rdma_edisp_unsched; +#else + return FLAGS_event_dispatcher_edisp_unsched; +#endif +} + static void StopAndJoinGlobalDispatchers() { for (int i = 0; i < FLAGS_task_group_ntags; ++i) { for (int j = 0; j < FLAGS_event_dispatcher_num; ++j) { diff --git a/src/brpc/event_dispatcher.h b/src/brpc/event_dispatcher.h index fd91d3c53f..3fdc9f17b9 100644 --- a/src/brpc/event_dispatcher.h +++ b/src/brpc/event_dispatcher.h @@ -19,6 +19,7 @@ #ifndef BRPC_EVENT_DISPATCHER_H #define BRPC_EVENT_DISPATCHER_H +#include // DECLARE_bool 
#include "butil/macros.h" // DISALLOW_COPY_AND_ASSIGN #include "bthread/types.h" // bthread_t, bthread_attr_t #include "brpc/versioned_ref_with_id.h" @@ -26,6 +27,8 @@ namespace brpc { +DECLARE_bool(event_dispatcher_edisp_unsched); + // Unique identifier of a IOEventData. // Users shall store EventDataId instead of EventData and call EventData::Address() // to convert the identifier to an unique_ptr at each access. Whenever a @@ -87,8 +90,9 @@ namespace rdma { class RdmaEndpoint; } -// Dispatch edge-triggered events of file descriptors to consumers -// running in separate bthreads. +// Dispatch edge-triggered events of file descriptors to consumers. +// By default callbacks run in spawned bthreads; when usercode-in-coroutine is +// enabled, the callback may run inline in the current coroutine. class EventDispatcher { friend class Socket; friend class rdma::RdmaEndpoint; @@ -188,6 +192,11 @@ template friend class IOEvent; EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag); +// Unified unsched switch for the transport layer. +// false -> urgent start (the dispatcher yields its worker and may be stolen), +// true -> background start (the dispatcher keeps running on its worker). +bool EventDispatcherUnsched(); + // IOEvent class manages the IO events of a file descriptor conveniently. 
template class IOEvent { diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index 3cc2107f23..a939332f4c 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -63,7 +63,6 @@ BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate); DEFINE_bool(rdma_use_polling, false, "Use polling mode for RDMA."); DEFINE_int32(rdma_poller_num, 1, "Poller number in RDMA polling mode."); DEFINE_bool(rdma_poller_yield, false, "Yield thread in RDMA polling mode."); -DEFINE_bool(rdma_edisp_unsched, false, "Disable event dispatcher schedule"); DEFINE_bool(rdma_disable_bthread, false, "Disable bthread in RDMA"); static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h index eb4714ef0d..54a008f1f7 100644 --- a/src/brpc/rdma/rdma_endpoint.h +++ b/src/brpc/rdma/rdma_endpoint.h @@ -38,7 +38,6 @@ namespace rdma { DECLARE_bool(rdma_use_polling); DECLARE_int32(rdma_poller_num); -DECLARE_bool(rdma_edisp_unsched); DECLARE_bool(rdma_disable_bthread); class RdmaConnect : public AppConnect { diff --git a/src/brpc/rdma_transport.cpp b/src/brpc/rdma_transport.cpp index 8fe88c6b4b..88d89a7b06 100644 --- a/src/brpc/rdma_transport.cpp +++ b/src/brpc/rdma_transport.cpp @@ -18,6 +18,7 @@ #if BRPC_WITH_RDMA #include "brpc/rdma_transport.h" +#include "brpc/event_dispatcher.h" #include "brpc/tcp_transport.h" #include "brpc/rdma/rdma_endpoint.h" #include "brpc/rdma/rdma_helper.h" @@ -127,13 +128,13 @@ void RdmaTransport::ProcessEvent(bthread_attr_t attr) { bthread_t tid; if (FLAGS_usercode_in_coroutine) { OnEdge(_socket); - } else if (rdma::FLAGS_rdma_edisp_unsched == false) { - auto rc = bthread_start_background(&tid, &attr, OnEdge, _socket); + } else if (!EventDispatcherUnsched()) { + auto rc = bthread_start_urgent(&tid, &attr, OnEdge, _socket); if (rc != 0) { LOG(FATAL) << "Fail to start ProcessEvent"; OnEdge(_socket); } - } else if 
(bthread_start_urgent(&tid, &attr, OnEdge, _socket) != 0) { + } else if (bthread_start_background(&tid, &attr, OnEdge, _socket) != 0) { LOG(FATAL) << "Fail to start ProcessEvent"; OnEdge(_socket); } @@ -235,4 +236,4 @@ bool RdmaTransport::OptionsAvailableOverRdma(const ServerOptions* opt) { return true; } } // namespace brpc -#endif \ No newline at end of file +#endif diff --git a/src/brpc/tcp_transport.cpp b/src/brpc/tcp_transport.cpp index 37db7a8966..27e6ae87be 100644 --- a/src/brpc/tcp_transport.cpp +++ b/src/brpc/tcp_transport.cpp @@ -16,6 +16,7 @@ // under the License. #include "brpc/tcp_transport.h" +#include "brpc/event_dispatcher.h" namespace brpc { DECLARE_bool(usercode_in_coroutine); @@ -68,7 +69,13 @@ void TcpTransport::ProcessEvent(bthread_attr_t attr) { bthread_t tid; if (FLAGS_usercode_in_coroutine) { OnEdge(_socket); - } else if (bthread_start_urgent(&tid, &attr, OnEdge, _socket) != 0) { + } else if (!EventDispatcherUnsched()) { + auto rc = bthread_start_urgent(&tid, &attr, OnEdge, _socket); + if (rc != 0) { + LOG(FATAL) << "Fail to start ProcessEvent"; + OnEdge(_socket); + } + } else if (bthread_start_background(&tid, &attr, OnEdge, _socket) != 0) { LOG(FATAL) << "Fail to start ProcessEvent"; OnEdge(_socket); } @@ -96,4 +103,4 @@ void TcpTransport::QueueMessage(InputMessageClosure& input_msg, } } -} // namespace brpc \ No newline at end of file +} // namespace brpc