Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions be/src/exec/scan/scanner_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,28 @@ class TaskExecutorSimplifiedScanScheduler final : public ScannerScheduler {

Status submit_scan_task(SimplifiedScanTask scan_task) override {
if (!_is_stop) {
if (scan_task.scanner_context == nullptr) {
return Status::InternalError<false>("scanner pool {} got null scanner context.",
_sched_name);
}
if (scan_task.scan_task == nullptr) {
return Status::InternalError<false>("scanner pool {} got null scan task.",
_sched_name);
}
auto task_handle = scan_task.scanner_context->task_handle();
if (task_handle == nullptr) {
return Status::InternalError<false>(
"scanner pool {} got null task handle, scan task first schedule: {}, "
"scanner context: {}",
_sched_name, scan_task.scan_task->is_first_schedule,
scan_task.scanner_context->debug_string());
}
std::shared_ptr<SplitRunner> split_runner;
if (scan_task.scan_task->is_first_schedule) {
split_runner = std::make_shared<ScannerSplitRunner>("scanner_split_runner",
scan_task.scan_func);
RETURN_IF_ERROR(split_runner->init());
auto result = _task_executor->enqueue_splits(
scan_task.scanner_context->task_handle(), false, {split_runner});
auto result = _task_executor->enqueue_splits(task_handle, false, {split_runner});
if (!result.has_value()) {
LOG(WARNING) << "enqueue_splits failed: " << result.error();
return result.error();
Expand All @@ -309,8 +324,7 @@ class TaskExecutorSimplifiedScanScheduler final : public ScannerScheduler {
if (split_runner == nullptr) {
return Status::OK();
}
RETURN_IF_ERROR(_task_executor->re_enqueue_split(
scan_task.scanner_context->task_handle(), false, split_runner));
RETURN_IF_ERROR(_task_executor->re_enqueue_split(task_handle, false, split_runner));
}
scan_task.scan_task->split_runner = split_runner;
return Status::OK();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,24 @@ extern ::doris::MetricPrototype METRIC_thread_pool_task_execution_count_total;
extern ::doris::MetricPrototype METRIC_thread_pool_task_wait_worker_time_ns_total;
extern ::doris::MetricPrototype METRIC_thread_pool_task_wait_worker_count_total;

namespace {

Result<std::shared_ptr<TimeSharingTaskHandle>> get_time_sharing_task_handle(
const std::shared_ptr<TaskHandle>& task_handle, const char* operation) {
if (task_handle == nullptr) {
return ResultError(Status::InternalError("{} got null task handle", operation));
}

auto handle = std::dynamic_pointer_cast<TimeSharingTaskHandle>(task_handle);
if (handle == nullptr) {
return ResultError(Status::InternalError("{} got invalid task handle type, task id: {}",
operation, task_handle->task_id().to_string()));
}
return handle;
}

} // namespace

SplitThreadPoolToken::SplitThreadPoolToken(TimeSharingTaskExecutor* pool,
TimeSharingTaskExecutor::ExecutionMode mode,
std::shared_ptr<SplitQueue> split_queue,
Expand Down Expand Up @@ -743,7 +761,7 @@ Status TimeSharingTaskExecutor::add_task(const TaskId& task_id,
}

Status TimeSharingTaskExecutor::remove_task(std::shared_ptr<TaskHandle> task_handle) {
auto handle = std::dynamic_pointer_cast<TimeSharingTaskHandle>(task_handle);
auto handle = DORIS_TRY(get_time_sharing_task_handle(task_handle, "remove_task"));
std::vector<std::shared_ptr<PrioritizedSplitRunner>> splits_to_destroy;

{
Expand Down Expand Up @@ -806,7 +824,11 @@ Result<std::vector<SharedListenableFuture<Void>>> TimeSharingTaskExecutor::enque
}
}};
std::vector<SharedListenableFuture<Void>> finished_futures;
auto handle = std::dynamic_pointer_cast<TimeSharingTaskHandle>(task_handle);
auto handle_result = get_time_sharing_task_handle(task_handle, "enqueue_splits");
if (!handle_result.has_value()) {
return ResultError(handle_result.error());
}
auto handle = handle_result.value();
{
std::unique_lock<std::mutex> lock(_mutex);
for (const auto& task_split : splits) {
Expand Down Expand Up @@ -839,7 +861,7 @@ Result<std::vector<SharedListenableFuture<Void>>> TimeSharingTaskExecutor::enque
Status TimeSharingTaskExecutor::re_enqueue_split(std::shared_ptr<TaskHandle> task_handle,
bool intermediate,
const std::shared_ptr<SplitRunner>& split) {
auto handle = std::dynamic_pointer_cast<TimeSharingTaskHandle>(task_handle);
auto handle = DORIS_TRY(get_time_sharing_task_handle(task_handle, "re_enqueue_split"));
std::shared_ptr<PrioritizedSplitRunner> prioritized_split =
handle->get_split(split, intermediate);
prioritized_split->reset_level_priority();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <future>
#include <mutex>
#include <random>
#include <string>
#include <thread>

#include "common/exception.h"
Expand Down Expand Up @@ -339,6 +340,20 @@ class QueueOnlySplitRunner : public SplitRunner {
std::atomic<bool> _finished {false};
};

class TestingTaskHandle final : public TaskHandle {
public:
explicit TestingTaskHandle(std::string task_id) : _task_id(std::move(task_id)) {}

Status init() override { return Status::OK(); }

bool is_closed() const override { return false; }

TaskId task_id() const override { return _task_id; }

private:
TaskId _task_id;
};

class TimeSharingTaskExecutorTest : public testing::Test {
protected:
void SetUp() override {}
Expand Down Expand Up @@ -424,6 +439,48 @@ TEST_F(TimeSharingTaskExecutorTest, test_remove_task_clears_queued_task_count) {
executor.stop();
}

TEST_F(TimeSharingTaskExecutorTest, test_invalid_task_handle_returns_error) {
auto ticker = std::make_shared<TestingTicker>();

TimeSharingTaskExecutor::ThreadConfig thread_config;
thread_config.thread_name = "invalid_task_handle";

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new test leaves thread_config.max_thread_num and thread_config.min_thread_num uninitialized here. ThreadConfig only defaults max_queue_size; the constructor copies the two indeterminate ints into _max_threads/_min_threads, and init() immediately calls _try_create_thread(_min_threads, ...). That makes this test nondeterministic: it can try to create an arbitrary number of worker threads or fail depending on stack contents. Please initialize both fields, like the adjacent tests do, before constructing the executor.

thread_config.workload_group = "normal";
TimeSharingTaskExecutor executor(thread_config, 0, 1, 1, ticker);
ASSERT_TRUE(executor.init().ok());

auto split = std::make_shared<QueueOnlySplitRunner>();

auto null_enqueue_result = executor.enqueue_splits(nullptr, false, {split});
ASSERT_FALSE(null_enqueue_result.has_value());
EXPECT_NE(std::string(null_enqueue_result.error().msg()).find("null task handle"),
std::string::npos);

Status null_re_enqueue_status = executor.re_enqueue_split(nullptr, false, split);
ASSERT_FALSE(null_re_enqueue_status.ok());
EXPECT_NE(std::string(null_re_enqueue_status.msg()).find("null task handle"),
std::string::npos);

Status null_remove_status = executor.remove_task(nullptr);
ASSERT_FALSE(null_remove_status.ok());
EXPECT_NE(std::string(null_remove_status.msg()).find("null task handle"), std::string::npos);

auto invalid_task_handle = std::make_shared<TestingTaskHandle>("invalid_task");
auto invalid_enqueue_result = executor.enqueue_splits(invalid_task_handle, false, {split});
ASSERT_FALSE(invalid_enqueue_result.has_value());
EXPECT_NE(std::string(invalid_enqueue_result.error().msg()).find("invalid task handle type"),
std::string::npos);

Status invalid_re_enqueue_status = executor.re_enqueue_split(invalid_task_handle, false, split);
ASSERT_FALSE(invalid_re_enqueue_status.ok());
EXPECT_NE(std::string(invalid_re_enqueue_status.msg()).find("invalid task handle type"),
std::string::npos);

Status invalid_remove_status = executor.remove_task(invalid_task_handle);
ASSERT_FALSE(invalid_remove_status.ok());
EXPECT_NE(std::string(invalid_remove_status.msg()).find("invalid task handle type"),
std::string::npos);
}

TEST_F(TimeSharingTaskExecutorTest, test_tasks_complete) {
auto ticker = std::make_shared<TestingTicker>();

Expand Down
Loading