From a27043ba0393d59e87a0e7095279d3f93935996c Mon Sep 17 00:00:00 2001 From: uchenily Date: Fri, 30 Jan 2026 14:04:03 +0800 Subject: [PATCH 1/3] check arrow flight --- be/src/service/internal_service.cpp | 86 +++++++++++++++++------------ be/src/service/internal_service.h | 2 +- 2 files changed, 52 insertions(+), 36 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index f118e8b412211c..e1b9b2998f1589 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -48,6 +48,7 @@ #include #include #include +#include #include #include #include @@ -228,14 +229,16 @@ PInternalService::PInternalService(ExecEnv* exec_env) config::brpc_light_work_pool_max_queue_size != -1 ? config::brpc_light_work_pool_max_queue_size : std::max(10240, CpuInfo::num_cores() * 320), - "brpc_light"), - _arrow_flight_work_pool(config::brpc_arrow_flight_work_pool_threads != -1 - ? config::brpc_arrow_flight_work_pool_threads - : std::max(512, CpuInfo::num_cores() * 2), - config::brpc_arrow_flight_work_pool_max_queue_size != -1 - ? config::brpc_arrow_flight_work_pool_max_queue_size - : std::max(20480, CpuInfo::num_cores() * 640), - "brpc_arrow_flight") { + "brpc_light") { + if (config::arrow_flight_sql_port != -1) { + _arrow_flight_work_pool.emplace(config::brpc_arrow_flight_work_pool_threads != -1 + ? config::brpc_arrow_flight_work_pool_threads + : std::max(512, CpuInfo::num_cores() * 2), + config::brpc_arrow_flight_work_pool_max_queue_size != -1 + ? config::brpc_arrow_flight_work_pool_max_queue_size + : std::max(20480, CpuInfo::num_cores() * 640), + "brpc_arrow_flight"); + } REGISTER_HOOK_METRIC(heavy_work_pool_queue_size, [this]() { return _heavy_work_pool.get_queue_size(); }); REGISTER_HOOK_METRIC(light_work_pool_queue_size, @@ -254,14 +257,16 @@ PInternalService::PInternalService(ExecEnv* exec_env) REGISTER_HOOK_METRIC(light_work_max_threads, []() { return config::brpc_light_work_pool_threads; }); - REGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size, - [this]() { return _arrow_flight_work_pool.get_queue_size(); }); - REGISTER_HOOK_METRIC(arrow_flight_work_active_threads, - [this]() { return _arrow_flight_work_pool.get_active_threads(); }); - REGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size, - []() { return config::brpc_arrow_flight_work_pool_max_queue_size; }); - REGISTER_HOOK_METRIC(arrow_flight_work_max_threads, - []() { return config::brpc_arrow_flight_work_pool_threads; }); + if (_arrow_flight_work_pool) { + REGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size, + [this]() { return _arrow_flight_work_pool->get_queue_size(); }); + REGISTER_HOOK_METRIC(arrow_flight_work_active_threads, + [this]() { return _arrow_flight_work_pool->get_active_threads(); }); + REGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size, + []() { return config::brpc_arrow_flight_work_pool_max_queue_size; }); + REGISTER_HOOK_METRIC(arrow_flight_work_max_threads, + []() { return config::brpc_arrow_flight_work_pool_threads; }); + } _exec_env->load_stream_mgr()->set_heavy_work_pool(&_heavy_work_pool); @@ -285,10 +290,12 @@ PInternalService::~PInternalService() { DEREGISTER_HOOK_METRIC(heavy_work_max_threads); DEREGISTER_HOOK_METRIC(light_work_max_threads); - DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size); - DEREGISTER_HOOK_METRIC(arrow_flight_work_active_threads); - DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size); - DEREGISTER_HOOK_METRIC(arrow_flight_work_max_threads); + if (_arrow_flight_work_pool) { + DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size); + DEREGISTER_HOOK_METRIC(arrow_flight_work_active_threads); + DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size); + DEREGISTER_HOOK_METRIC(arrow_flight_work_max_threads); + } CHECK_EQ(0, bthread_key_delete(btls_key)); CHECK_EQ(0, bthread_key_delete(AsyncIO::btls_io_ctx_key)); @@ -670,7 +677,12 @@ void PInternalService::fetch_arrow_data(google::protobuf::RpcController* control const PFetchArrowDataRequest* request, PFetchArrowDataResult* result, google::protobuf::Closure* done) { - bool ret = _arrow_flight_work_pool.try_offer([request, result, done]() { + if (!_arrow_flight_work_pool) { + offer_failed(result, done, _light_work_pool); + return; + } + + bool ret = _arrow_flight_work_pool->try_offer([request, result, done]() { auto ctx = vectorized::GetArrowResultBatchCtx::create_shared(result, done); TUniqueId unique_id = UniqueId(request->finst_id()).to_thrift(); // query_id or instance_id std::shared_ptr arrow_buffer; @@ -684,7 +696,7 @@ void PInternalService::fetch_arrow_data(google::protobuf::RpcController* control } }); if (!ret) { - offer_failed(result, done, _arrow_flight_work_pool); + offer_failed(result, done, *_arrow_flight_work_pool); return; } } @@ -917,7 +929,11 @@ void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController const PFetchArrowFlightSchemaRequest* request, PFetchArrowFlightSchemaResult* result, google::protobuf::Closure* done) { - bool ret = _arrow_flight_work_pool.try_offer([request, result, done]() { + if (!_arrow_flight_work_pool) { + offer_failed(result, done, _light_work_pool); + return; + } + bool ret = _arrow_flight_work_pool->try_offer([request, result, done]() { brpc::ClosureGuard closure_guard(done); std::shared_ptr schema; std::shared_ptr buffer; @@ -949,7 +965,7 @@ void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController st.to_protobuf(result->mutable_status()); }); if (!ret) { - offer_failed(result, done, _arrow_flight_work_pool); + offer_failed(result, done, *_arrow_flight_work_pool); return; } } @@ -1685,17 +1701,17 @@ void PInternalService::transmit_block(google::protobuf::RpcController* controlle // pool here. _transmit_block(controller, request, response, done, Status::OK(), 0); } else { - bool ret = _light_work_pool.try_offer([this, controller, request, response, done, - receive_time]() { - response->set_receive_time(receive_time); - // Sometimes transmit block function is the last owner of PlanFragmentExecutor - // It will release the object. And the object maybe a JNIContext. - // JNIContext will hold some TLS object. It could not work correctly under bthread - // Context. So that put the logic into pthread. - // But this is rarely happens, so this config is disabled by default. - _transmit_block(controller, request, response, done, Status::OK(), - GetCurrentTimeNanos() - receive_time); - }); + bool ret = _light_work_pool.try_offer( + [this, controller, request, response, done, receive_time]() { + response->set_receive_time(receive_time); + // Sometimes transmit block function is the last owner of PlanFragmentExecutor + // It will release the object. And the object maybe a JNIContext. + // JNIContext will hold some TLS object. It could not work correctly under bthread + // Context. So that put the logic into pthread. + // But this is rarely happens, so this config is disabled by default. + _transmit_block(controller, request, response, done, Status::OK(), + GetCurrentTimeNanos() - receive_time); + }); if (!ret) { offer_failed(response, done, _light_work_pool); return; diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index b1e11ec34b73c4..299c81ef5be382 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -275,7 +275,7 @@ class PInternalService : public PBackendService { // otherwise as light interface FifoThreadPool _heavy_work_pool; FifoThreadPool _light_work_pool; - FifoThreadPool _arrow_flight_work_pool; + std::optional _arrow_flight_work_pool; }; // `StorageEngine` mixin for `PInternalService` From 3c5fc0c7add8aa1ffba24e676dd675921ff5796d Mon Sep 17 00:00:00 2001 From: uchenily Date: Fri, 30 Jan 2026 14:20:23 +0800 Subject: [PATCH 2/3] enable_arrow_flight --- be/src/common/config.cpp | 2 ++ be/src/common/config.h | 4 +++- be/src/service/arrow_flight/flight_sql_service.cpp | 4 ---- be/src/service/doris_main.cpp | 12 +++++++----- be/src/service/internal_service.cpp | 2 +- 5 files changed, 13 insertions(+), 11 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 3e49c887940710..2dd71786141811 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -63,6 +63,8 @@ DEFINE_Int32(be_port, "9060"); // port for brpc DEFINE_Int32(brpc_port, "8060"); +DEFINE_Bool(enable_arrow_flight, "true"); + DEFINE_Int32(arrow_flight_sql_port, "8050"); DEFINE_Int32(cdc_client_port, "9096"); diff --git a/be/src/common/config.h b/be/src/common/config.h index f08a260e20f815..f3b0f7e4dbcf21 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -97,9 +97,11 @@ DECLARE_Int32(be_port); DECLARE_Int32(brpc_port); // port for arrow flight sql -// Default -1, do not start arrow flight sql server. DECLARE_Int32(arrow_flight_sql_port); +// Whether to enable arrow flight service +DECLARE_Bool(enable_arrow_flight); + // port for cdc client scan oltp cdc data DECLARE_Int32(cdc_client_port); diff --git a/be/src/service/arrow_flight/flight_sql_service.cpp b/be/src/service/arrow_flight/flight_sql_service.cpp index e49f355ad308a7..827ba71985a234 100644 --- a/be/src/service/arrow_flight/flight_sql_service.cpp +++ b/be/src/service/arrow_flight/flight_sql_service.cpp @@ -121,10 +121,6 @@ arrow::Result> FlightSqlServer: } Status FlightSqlServer::init(int port) { - if (port == -1) { - LOG(INFO) << "Arrow Flight Service not start"; - return Status::OK(); - } _inited = true; arrow::flight::Location bind_location; RETURN_DORIS_STATUS_IF_ERROR( diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 8c8c14f92150e6..9fc3d9cb23ce60 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -590,11 +590,13 @@ int main(int argc, char** argv) { status.to_string()); // 5. arrow flight service - std::shared_ptr flight_server = - std::move(doris::flight::FlightSqlServer::create()).ValueOrDie(); - status = flight_server->init(doris::config::arrow_flight_sql_port); - stop_work_if_error( - status, "Arrow Flight Service did not start correctly, exiting, " + status.to_string()); + if (doris::config::enable_arrow_flight) { + std::shared_ptr flight_server = + std::move(doris::flight::FlightSqlServer::create()).ValueOrDie(); + status = flight_server->init(doris::config::arrow_flight_sql_port); + stop_work_if_error(status, "Arrow Flight Service did not start correctly, exiting, " + + status.to_string()); + } // 6. start daemon thread to do clean or gc jobs doris::Daemon daemon; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index e1b9b2998f1589..081c447b19c62b 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -230,7 +230,7 @@ PInternalService::PInternalService(ExecEnv* exec_env) ? config::brpc_light_work_pool_max_queue_size : std::max(10240, CpuInfo::num_cores() * 320), "brpc_light") { - if (config::arrow_flight_sql_port != -1) { + if (config::enable_arrow_flight) { _arrow_flight_work_pool.emplace(config::brpc_arrow_flight_work_pool_threads != -1 ? config::brpc_arrow_flight_work_pool_threads : std::max(512, CpuInfo::num_cores() * 2), From 79c9241a37881b9c53087e89b212b7782328c4d4 Mon Sep 17 00:00:00 2001 From: uchenily Date: Fri, 30 Jan 2026 14:40:17 +0800 Subject: [PATCH 3/3] reformat --- be/src/service/internal_service.cpp | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 081c447b19c62b..fa10afbb1ed41f 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1701,17 +1701,17 @@ void PInternalService::transmit_block(google::protobuf::RpcController* controlle // pool here. _transmit_block(controller, request, response, done, Status::OK(), 0); } else { - bool ret = _light_work_pool.try_offer( - [this, controller, request, response, done, receive_time]() { - response->set_receive_time(receive_time); - // Sometimes transmit block function is the last owner of PlanFragmentExecutor - // It will release the object. And the object maybe a JNIContext. - // JNIContext will hold some TLS object. It could not work correctly under bthread - // Context. So that put the logic into pthread. - // But this is rarely happens, so this config is disabled by default. - _transmit_block(controller, request, response, done, Status::OK(), - GetCurrentTimeNanos() - receive_time); - }); + bool ret = _light_work_pool.try_offer([this, controller, request, response, done, + receive_time]() { + response->set_receive_time(receive_time); + // Sometimes transmit block function is the last owner of PlanFragmentExecutor + // It will release the object. And the object maybe a JNIContext. + // JNIContext will hold some TLS object. It could not work correctly under bthread + // Context. So that put the logic into pthread. + // But this is rarely happens, so this config is disabled by default. + _transmit_block(controller, request, response, done, Status::OK(), + GetCurrentTimeNanos() - receive_time); + }); if (!ret) { offer_failed(response, done, _light_work_pool); return;