diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 3e49c887940710..2dd71786141811 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -63,6 +63,8 @@ DEFINE_Int32(be_port, "9060"); // port for brpc DEFINE_Int32(brpc_port, "8060"); +DEFINE_Bool(enable_arrow_flight, "true"); + DEFINE_Int32(arrow_flight_sql_port, "8050"); DEFINE_Int32(cdc_client_port, "9096"); diff --git a/be/src/common/config.h b/be/src/common/config.h index f08a260e20f815..f3b0f7e4dbcf21 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -97,9 +97,11 @@ DECLARE_Int32(be_port); DECLARE_Int32(brpc_port); // port for arrow flight sql -// Default -1, do not start arrow flight sql server. DECLARE_Int32(arrow_flight_sql_port); +// Whether to enable arrow flight service +DECLARE_Bool(enable_arrow_flight); + // port for cdc client scan oltp cdc data DECLARE_Int32(cdc_client_port); diff --git a/be/src/service/arrow_flight/flight_sql_service.cpp b/be/src/service/arrow_flight/flight_sql_service.cpp index e49f355ad308a7..827ba71985a234 100644 --- a/be/src/service/arrow_flight/flight_sql_service.cpp +++ b/be/src/service/arrow_flight/flight_sql_service.cpp @@ -121,10 +121,6 @@ arrow::Result> FlightSqlServer: } Status FlightSqlServer::init(int port) { - if (port == -1) { - LOG(INFO) << "Arrow Flight Service not start"; - return Status::OK(); - } _inited = true; arrow::flight::Location bind_location; RETURN_DORIS_STATUS_IF_ERROR( diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 8c8c14f92150e6..9fc3d9cb23ce60 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -590,11 +590,13 @@ int main(int argc, char** argv) { status.to_string()); // 5. arrow flight service - std::shared_ptr flight_server = - std::move(doris::flight::FlightSqlServer::create()).ValueOrDie(); - status = flight_server->init(doris::config::arrow_flight_sql_port); - stop_work_if_error( - status, "Arrow Flight Service did not start correctly, exiting, " + status.to_string()); + if (doris::config::enable_arrow_flight) { + std::shared_ptr flight_server = + std::move(doris::flight::FlightSqlServer::create()).ValueOrDie(); + status = flight_server->init(doris::config::arrow_flight_sql_port); + stop_work_if_error(status, "Arrow Flight Service did not start correctly, exiting, " + + status.to_string()); + } // 6. start daemon thread to do clean or gc jobs doris::Daemon daemon; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index f118e8b412211c..fa10afbb1ed41f 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -48,6 +48,7 @@ #include #include #include +#include #include #include #include @@ -228,14 +229,16 @@ PInternalService::PInternalService(ExecEnv* exec_env) config::brpc_light_work_pool_max_queue_size != -1 ? config::brpc_light_work_pool_max_queue_size : std::max(10240, CpuInfo::num_cores() * 320), - "brpc_light"), - _arrow_flight_work_pool(config::brpc_arrow_flight_work_pool_threads != -1 - ? config::brpc_arrow_flight_work_pool_threads - : std::max(512, CpuInfo::num_cores() * 2), - config::brpc_arrow_flight_work_pool_max_queue_size != -1 - ? config::brpc_arrow_flight_work_pool_max_queue_size - : std::max(20480, CpuInfo::num_cores() * 640), - "brpc_arrow_flight") { + "brpc_light") { + if (config::enable_arrow_flight) { + _arrow_flight_work_pool.emplace(config::brpc_arrow_flight_work_pool_threads != -1 + ? config::brpc_arrow_flight_work_pool_threads + : std::max(512, CpuInfo::num_cores() * 2), + config::brpc_arrow_flight_work_pool_max_queue_size != -1 + ? config::brpc_arrow_flight_work_pool_max_queue_size + : std::max(20480, CpuInfo::num_cores() * 640), + "brpc_arrow_flight"); + } REGISTER_HOOK_METRIC(heavy_work_pool_queue_size, [this]() { return _heavy_work_pool.get_queue_size(); }); REGISTER_HOOK_METRIC(light_work_pool_queue_size, @@ -254,14 +257,16 @@ PInternalService::PInternalService(ExecEnv* exec_env) REGISTER_HOOK_METRIC(light_work_max_threads, []() { return config::brpc_light_work_pool_threads; }); - REGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size, - [this]() { return _arrow_flight_work_pool.get_queue_size(); }); - REGISTER_HOOK_METRIC(arrow_flight_work_active_threads, - [this]() { return _arrow_flight_work_pool.get_active_threads(); }); - REGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size, - []() { return config::brpc_arrow_flight_work_pool_max_queue_size; }); - REGISTER_HOOK_METRIC(arrow_flight_work_max_threads, - []() { return config::brpc_arrow_flight_work_pool_threads; }); + if (_arrow_flight_work_pool) { + REGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size, + [this]() { return _arrow_flight_work_pool->get_queue_size(); }); + REGISTER_HOOK_METRIC(arrow_flight_work_active_threads, + [this]() { return _arrow_flight_work_pool->get_active_threads(); }); + REGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size, + []() { return config::brpc_arrow_flight_work_pool_max_queue_size; }); + REGISTER_HOOK_METRIC(arrow_flight_work_max_threads, + []() { return config::brpc_arrow_flight_work_pool_threads; }); + } _exec_env->load_stream_mgr()->set_heavy_work_pool(&_heavy_work_pool); @@ -285,10 +290,12 @@ PInternalService::~PInternalService() { DEREGISTER_HOOK_METRIC(heavy_work_max_threads); DEREGISTER_HOOK_METRIC(light_work_max_threads); - DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size); - DEREGISTER_HOOK_METRIC(arrow_flight_work_active_threads); - DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size); - DEREGISTER_HOOK_METRIC(arrow_flight_work_max_threads); + if (_arrow_flight_work_pool) { + DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size); + DEREGISTER_HOOK_METRIC(arrow_flight_work_active_threads); + DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size); + DEREGISTER_HOOK_METRIC(arrow_flight_work_max_threads); + } CHECK_EQ(0, bthread_key_delete(btls_key)); CHECK_EQ(0, bthread_key_delete(AsyncIO::btls_io_ctx_key)); @@ -670,7 +677,12 @@ void PInternalService::fetch_arrow_data(google::protobuf::RpcController* control const PFetchArrowDataRequest* request, PFetchArrowDataResult* result, google::protobuf::Closure* done) { - bool ret = _arrow_flight_work_pool.try_offer([request, result, done]() { + if (!_arrow_flight_work_pool) { + offer_failed(result, done, _light_work_pool); + return; + } + + bool ret = _arrow_flight_work_pool->try_offer([request, result, done]() { auto ctx = vectorized::GetArrowResultBatchCtx::create_shared(result, done); TUniqueId unique_id = UniqueId(request->finst_id()).to_thrift(); // query_id or instance_id std::shared_ptr arrow_buffer; @@ -684,7 +696,7 @@ void PInternalService::fetch_arrow_data(google::protobuf::RpcController* control } }); if (!ret) { - offer_failed(result, done, _arrow_flight_work_pool); + offer_failed(result, done, *_arrow_flight_work_pool); return; } } @@ -917,7 +929,11 @@ void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController const PFetchArrowFlightSchemaRequest* request, PFetchArrowFlightSchemaResult* result, google::protobuf::Closure* done) { - bool ret = _arrow_flight_work_pool.try_offer([request, result, done]() { + if (!_arrow_flight_work_pool) { + offer_failed(result, done, _light_work_pool); + return; + } + bool ret = _arrow_flight_work_pool->try_offer([request, result, done]() { brpc::ClosureGuard closure_guard(done); std::shared_ptr schema; std::shared_ptr buffer; @@ -949,7 +965,7 @@ void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController st.to_protobuf(result->mutable_status()); }); if (!ret) { - offer_failed(result, done, _arrow_flight_work_pool); + offer_failed(result, done, *_arrow_flight_work_pool); return; } } diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index b1e11ec34b73c4..299c81ef5be382 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -275,7 +275,7 @@ class PInternalService : public PBackendService { // otherwise as light interface FifoThreadPool _heavy_work_pool; FifoThreadPool _light_work_pool; - FifoThreadPool _arrow_flight_work_pool; + std::optional _arrow_flight_work_pool; }; // `StorageEngine` mixin for `PInternalService`