Skip to content

Commit 88b12ae

Browse files
committed
feat: remove redundant thread pool in WorkerImpl.
1 parent 2689dca commit 88b12ae

File tree

14 files changed

+64
-105
lines changed

14 files changed

+64
-105
lines changed

xllm/core/distributed_runtime/worker_service.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ limitations under the License.
3434

3535
namespace xllm {
3636

37+
// Number of transfer_slice entries handed to worker_->prefetch_from_storage()
// per iteration of the copy loop in WorkerService::PrefetchFromStorage; it is
// used both as the loop step and as the width of each slice.
constexpr uint32_t COPY_BATCH_SIZE = 1;
38+
3739
WorkerService::WorkerService(runtime::Options options,
3840
const torch::Device& device)
3941
: options_(options), device_(device), initialized_(false) {
@@ -499,15 +501,14 @@ void WorkerService::PrefetchFromStorage(
499501
auto close_future = stream_handler->get_close_future();
500502
bool is_completed = false;
501503

502-
for (size_t i = 0; i < transfer_slice.size();
503-
i += stream_copy_batch_size_) {
504+
for (size_t i = 0; i < transfer_slice.size(); i += COPY_BATCH_SIZE) {
504505
auto current_slice = transfer_slice.slice(
505-
i, std::min(i + stream_copy_batch_size_, transfer_slice.size()));
506+
i, std::min(i + COPY_BATCH_SIZE, transfer_slice.size()));
506507

507508
auto success_cnt = worker_->prefetch_from_storage(current_slice);
508509

509510
if (success_cnt != current_slice.size() ||
510-
i + stream_copy_batch_size_ >= transfer_slice.size()) {
511+
i + COPY_BATCH_SIZE >= transfer_slice.size()) {
511512
is_completed = true;
512513
}
513514

xllm/core/distributed_runtime/worker_service.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,8 @@ class WorkerService : public proto::DistributeWorker {
154154
std::unique_ptr<std::thread> polling_thread_;
155155

156156
std::unique_ptr<ThreadPool> threadpool_;
157-
ThreadPool copy_threadpool_{5};
158157

159-
uint32_t stream_copy_batch_size_ = 2;
158+
ThreadPool copy_threadpool_{5};
160159
};
161160

162161
} // namespace xllm

xllm/core/framework/block/block_manager_pool.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ void BlockManagerPool::deallocate(Sequence* sequence) {
118118
int32_t dp_rank = get_dp_rank(sequence);
119119
cache(sequence);
120120
if (!host_block_managers_.empty()) {
121-
record_offload_blocks(sequence);
121+
save_offload_blocks(sequence);
122122
}
123123
block_managers_[dp_rank]->deallocate(sequence->kv_state().kv_blocks());
124124
// release the blocks after prefix cache insertion
@@ -140,7 +140,7 @@ BlockManagerPool::get_load_block_transfer_infos() {
140140
return &load_block_transfer_infos_;
141141
}
142142

143-
void BlockManagerPool::set_offload_callback(
143+
void BlockManagerPool::postprocess_offload(
144144
std::vector<std::vector<folly::SemiFuture<uint32_t>>>& futures) {
145145
DCHECK(futures.size() == block_managers_.size());
146146
for (int i = 0; i < futures.size(); i++) {
@@ -356,7 +356,7 @@ void BlockManagerPool::allocate_host_shared(Sequence* sequence) {
356356
}
357357
}
358358

359-
void BlockManagerPool::record_offload_blocks(Sequence* sequence) {
359+
void BlockManagerPool::save_offload_blocks(Sequence* sequence) {
360360
DCHECK(sequence != nullptr);
361361

362362
auto* blocks = sequence->kv_state().mutable_kv_blocks();

xllm/core/framework/block/block_manager_pool.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class BlockManagerPool final : public KVCacheManager {
6464
get_offload_block_transfer_infos() override;
6565
std::vector<std::vector<BlockTransferInfo>>* get_load_block_transfer_infos()
6666
override;
67-
void set_offload_callback(
67+
void postprocess_offload(
6868
std::vector<std::vector<folly::SemiFuture<uint32_t>>>& futures) override;
6969
void reset_transfer_infos() override;
7070

@@ -77,6 +77,9 @@ class BlockManagerPool final : public KVCacheManager {
7777
std::vector<size_t> num_free_blocks() const override;
7878
std::vector<size_t> num_used_blocks() const override;
7979
double kv_cache_utilization() const override;
80+
bool allow_host_block_extend() override {
81+
return !host_block_managers_.empty();
82+
};
8083

8184
// get the options for the block manager
8285
const Options& options() const { return options_; }
@@ -86,7 +89,7 @@ class BlockManagerPool final : public KVCacheManager {
8689
int32_t get_dp_rank(Sequence* sequence) const;
8790

8891
void allocate_host_shared(Sequence* sequence);
89-
void record_offload_blocks(Sequence* sequence);
92+
void save_offload_blocks(Sequence* sequence);
9093

9194
void process_beam_search(Sequence* sequence, bool need_swap = false);
9295

xllm/core/framework/block/kv_cache_manager.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class KVCacheManager {
5151
virtual std::vector<std::vector<BlockTransferInfo>>*
5252
get_load_block_transfer_infos() = 0;
5353

54-
virtual void set_offload_callback(
54+
virtual void postprocess_offload(
5555
std::vector<std::vector<folly::SemiFuture<uint32_t>>>& futures) = 0;
5656

5757
virtual void reset_transfer_infos() = 0;
@@ -62,6 +62,7 @@ class KVCacheManager {
6262
virtual std::vector<size_t> num_free_blocks() const = 0;
6363
virtual std::vector<size_t> num_used_blocks() const = 0;
6464
virtual double kv_cache_utilization() const = 0;
65+
// Default implementation: always returns false. Subclasses may override to
// opt in.
// NOTE(review): the exact meaning of "host block extend" is not visible from
// this declaration -- confirm against the callers.
virtual bool allow_host_block_extend() { return false; };
6566

6667
protected:
6768
KVCacheManager() = default;

xllm/core/framework/kv_cache/kv_cache_store.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,10 @@ bool KVCacheStore::init(const StoreConfig& config,
6464
return false;
6565
}
6666
} else {
67-
LOG(FATAL) << "rdma must RegisterLocalMemory, but got register size: "
67+
LOG(ERROR) << "rdma must RegisterLocalMemory, but got register size: "
6868
<< config_.total_size
6969
<< ", and data ptr: " << uint64_t(config_.tensor_data);
70+
return false;
7071
}
7172
}
7273
is_initialized_ = true;

xllm/core/framework/request/request.h

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,7 @@ class Request : public RequestBase {
7373

7474
size_t total_num_blocks();
7575

76-
void set_preempted() {
77-
state_.preempted = true;
78-
for (auto& seq : sequences_group_->sequences()) {
79-
seq->preempted();
80-
}
81-
}
76+
// Marks this request as preempted by setting state_.preempted; the flag is
// read back through preempted().
void set_preempted() { state_.preempted = true; }
8277

8378
bool preempted() const { return state_.preempted; }
8479

xllm/core/framework/request/sequence.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,6 @@ void Sequence::reset() {
383383
kv_state_.reset();
384384
host_kv_state_.reset();
385385
volatile_num_prompt_tokens_ = num_tokens_;
386-
preempted_ = false;
387386
}
388387

389388
void Sequence::add_shared_kv_blocks(std::vector<Block>&& blocks) {

xllm/core/framework/request/sequence.h

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -192,9 +192,6 @@ class Sequence final {
192192
void close() { closed_ = true; }
193193
bool is_closed() const { return closed_; }
194194

195-
void preempted() { preempted_ = true; }
196-
bool is_preempted() const { return preempted_; }
197-
198195
// time between two tokens
199196
int64_t tbt(const absl::Time& now);
200197
// set sequence ttft
@@ -324,9 +321,6 @@ class Sequence final {
324321
// is the sequence closed.
325322
bool closed_ = false;
326323

327-
// is the sequence preempted.
328-
bool preempted_ = false;
329-
330324
// dp_rank
331325
int32_t dp_rank_ = -1;
332326

xllm/core/framework/xtensor/xtensor_manager_pool.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,9 @@ class XTensorManagerPool final : public KVCacheManager {
7474
"manager pool";
7575
}
7676

77-
void set_offload_callback(
77+
void postprocess_offload(
7878
std::vector<std::vector<folly::SemiFuture<uint32_t>>>& futures) override {
79-
LOG(FATAL) << "set_offload_callback is not implemented for page "
79+
LOG(FATAL) << "postprocess_offload is not implemented for page "
8080
"manager pool";
8181
}
8282

0 commit comments

Comments
 (0)