Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ DEFINE_Int32(be_port, "9060");
// port for brpc
DEFINE_Int32(brpc_port, "8060");

DEFINE_Bool(enable_arrow_flight, "true");

DEFINE_Int32(arrow_flight_sql_port, "8050");

DEFINE_Int32(cdc_client_port, "9096");
Expand Down
4 changes: 3 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ DECLARE_Int32(be_port);
DECLARE_Int32(brpc_port);

// port for arrow flight sql
// Default -1, do not start arrow flight sql server.
DECLARE_Int32(arrow_flight_sql_port);

// Whether to enable arrow flight service
DECLARE_Bool(enable_arrow_flight);

// port for cdc client scan oltp cdc data
DECLARE_Int32(cdc_client_port);

Expand Down
4 changes: 0 additions & 4 deletions be/src/service/arrow_flight/flight_sql_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,6 @@ arrow::Result<std::unique_ptr<arrow::flight::FlightDataStream>> FlightSqlServer:
}

Status FlightSqlServer::init(int port) {
if (port == -1) {
LOG(INFO) << "Arrow Flight Service not start";
return Status::OK();
}
_inited = true;
arrow::flight::Location bind_location;
RETURN_DORIS_STATUS_IF_ERROR(
Expand Down
12 changes: 7 additions & 5 deletions be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,11 +590,13 @@ int main(int argc, char** argv) {
status.to_string());

// 5. arrow flight service
std::shared_ptr<doris::flight::FlightSqlServer> flight_server =
std::move(doris::flight::FlightSqlServer::create()).ValueOrDie();
status = flight_server->init(doris::config::arrow_flight_sql_port);
stop_work_if_error(
status, "Arrow Flight Service did not start correctly, exiting, " + status.to_string());
if (doris::config::enable_arrow_flight) {
std::shared_ptr<doris::flight::FlightSqlServer> flight_server =
std::move(doris::flight::FlightSqlServer::create()).ValueOrDie();
status = flight_server->init(doris::config::arrow_flight_sql_port);
stop_work_if_error(status, "Arrow Flight Service did not start correctly, exiting, " +
status.to_string());
}

// 6. start daemon thread to do clean or gc jobs
doris::Daemon daemon;
Expand Down
64 changes: 40 additions & 24 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include <exception>
#include <filesystem>
#include <memory>
#include <optional>
#include <set>
#include <sstream>
#include <string>
Expand Down Expand Up @@ -228,14 +229,16 @@ PInternalService::PInternalService(ExecEnv* exec_env)
config::brpc_light_work_pool_max_queue_size != -1
? config::brpc_light_work_pool_max_queue_size
: std::max(10240, CpuInfo::num_cores() * 320),
"brpc_light"),
_arrow_flight_work_pool(config::brpc_arrow_flight_work_pool_threads != -1
? config::brpc_arrow_flight_work_pool_threads
: std::max(512, CpuInfo::num_cores() * 2),
config::brpc_arrow_flight_work_pool_max_queue_size != -1
? config::brpc_arrow_flight_work_pool_max_queue_size
: std::max(20480, CpuInfo::num_cores() * 640),
"brpc_arrow_flight") {
"brpc_light") {
if (config::enable_arrow_flight) {
_arrow_flight_work_pool.emplace(config::brpc_arrow_flight_work_pool_threads != -1
? config::brpc_arrow_flight_work_pool_threads
: std::max(512, CpuInfo::num_cores() * 2),
config::brpc_arrow_flight_work_pool_max_queue_size != -1
? config::brpc_arrow_flight_work_pool_max_queue_size
: std::max(20480, CpuInfo::num_cores() * 640),
"brpc_arrow_flight");
}
REGISTER_HOOK_METRIC(heavy_work_pool_queue_size,
[this]() { return _heavy_work_pool.get_queue_size(); });
REGISTER_HOOK_METRIC(light_work_pool_queue_size,
Expand All @@ -254,14 +257,16 @@ PInternalService::PInternalService(ExecEnv* exec_env)
REGISTER_HOOK_METRIC(light_work_max_threads,
[]() { return config::brpc_light_work_pool_threads; });

REGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size,
[this]() { return _arrow_flight_work_pool.get_queue_size(); });
REGISTER_HOOK_METRIC(arrow_flight_work_active_threads,
[this]() { return _arrow_flight_work_pool.get_active_threads(); });
REGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size,
[]() { return config::brpc_arrow_flight_work_pool_max_queue_size; });
REGISTER_HOOK_METRIC(arrow_flight_work_max_threads,
[]() { return config::brpc_arrow_flight_work_pool_threads; });
if (_arrow_flight_work_pool) {
REGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size,
[this]() { return _arrow_flight_work_pool->get_queue_size(); });
REGISTER_HOOK_METRIC(arrow_flight_work_active_threads,
[this]() { return _arrow_flight_work_pool->get_active_threads(); });
REGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size,
[]() { return config::brpc_arrow_flight_work_pool_max_queue_size; });
REGISTER_HOOK_METRIC(arrow_flight_work_max_threads,
[]() { return config::brpc_arrow_flight_work_pool_threads; });
}

_exec_env->load_stream_mgr()->set_heavy_work_pool(&_heavy_work_pool);

Expand All @@ -285,10 +290,12 @@ PInternalService::~PInternalService() {
DEREGISTER_HOOK_METRIC(heavy_work_max_threads);
DEREGISTER_HOOK_METRIC(light_work_max_threads);

DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size);
DEREGISTER_HOOK_METRIC(arrow_flight_work_active_threads);
DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size);
DEREGISTER_HOOK_METRIC(arrow_flight_work_max_threads);
if (_arrow_flight_work_pool) {
DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size);
DEREGISTER_HOOK_METRIC(arrow_flight_work_active_threads);
DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size);
DEREGISTER_HOOK_METRIC(arrow_flight_work_max_threads);
}

CHECK_EQ(0, bthread_key_delete(btls_key));
CHECK_EQ(0, bthread_key_delete(AsyncIO::btls_io_ctx_key));
Expand Down Expand Up @@ -670,7 +677,12 @@ void PInternalService::fetch_arrow_data(google::protobuf::RpcController* control
const PFetchArrowDataRequest* request,
PFetchArrowDataResult* result,
google::protobuf::Closure* done) {
bool ret = _arrow_flight_work_pool.try_offer([request, result, done]() {
if (!_arrow_flight_work_pool) {
offer_failed(result, done, _light_work_pool);
return;
}

bool ret = _arrow_flight_work_pool->try_offer([request, result, done]() {
auto ctx = vectorized::GetArrowResultBatchCtx::create_shared(result, done);
TUniqueId unique_id = UniqueId(request->finst_id()).to_thrift(); // query_id or instance_id
std::shared_ptr<vectorized::ArrowFlightResultBlockBuffer> arrow_buffer;
Expand All @@ -684,7 +696,7 @@ void PInternalService::fetch_arrow_data(google::protobuf::RpcController* control
}
});
if (!ret) {
offer_failed(result, done, _arrow_flight_work_pool);
offer_failed(result, done, *_arrow_flight_work_pool);
return;
}
}
Expand Down Expand Up @@ -917,7 +929,11 @@ void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController
const PFetchArrowFlightSchemaRequest* request,
PFetchArrowFlightSchemaResult* result,
google::protobuf::Closure* done) {
bool ret = _arrow_flight_work_pool.try_offer([request, result, done]() {
if (!_arrow_flight_work_pool) {
offer_failed(result, done, _light_work_pool);
return;
}
bool ret = _arrow_flight_work_pool->try_offer([request, result, done]() {
brpc::ClosureGuard closure_guard(done);
std::shared_ptr<arrow::Schema> schema;
std::shared_ptr<vectorized::ArrowFlightResultBlockBuffer> buffer;
Expand Down Expand Up @@ -949,7 +965,7 @@ void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController
st.to_protobuf(result->mutable_status());
});
if (!ret) {
offer_failed(result, done, _arrow_flight_work_pool);
offer_failed(result, done, *_arrow_flight_work_pool);
return;
}
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/internal_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ class PInternalService : public PBackendService {
// otherwise as light interface
FifoThreadPool _heavy_work_pool;
FifoThreadPool _light_work_pool;
FifoThreadPool _arrow_flight_work_pool;
std::optional<FifoThreadPool> _arrow_flight_work_pool;
};

// `StorageEngine` mixin for `PInternalService`
Expand Down