
Commit 0d645c3

feat: add layer-wise KV cache H2D copy optimization.
1 parent ca6dba2 commit 0d645c3

File tree

16 files changed: +188 −163 lines

xllm/core/framework/kv_cache/kv_cache_store.cpp

Lines changed: 4 additions & 8 deletions
@@ -128,16 +128,14 @@ uint32_t KVCacheStore::batch_put(
     return block_transfer_info.size();
   }
 
-  uint64_t success_cnt = str_keys.size();
+  uint64_t success_cnt = block_transfer_info.size() - str_keys.size();
   auto results = client_ptr_->BatchPut(str_keys, slices, rep_config_);
 
   for (int i = 0; i < str_keys.size(); i++) {
     if (!results[i].has_value()) {
-      success_cnt = i;
-      // LOG(ERROR) << "success_cnt: " << success_cnt
-      //            << ", failed to BatchPut: " << toString(results[i].error());
       break;
     }
+    success_cnt++;
   }
   return success_cnt;
 }

@@ -179,15 +177,13 @@ uint32_t KVCacheStore::batch_get(
     return 0;
   }
 
-  uint64_t success_cnt = str_keys.size();
+  uint64_t success_cnt = 0;
   auto results = client_ptr_->BatchGet(str_keys, slices);
   for (int i = 0; i < str_keys.size(); i++) {
     if (!results[i].has_value()) {
-      success_cnt = i;
-      // LOG(ERROR) << "success_cnt: " << success_cnt
-      //            << ", failed to BatchGet: " << toString(results[i].error());
       break;
     }
+    success_cnt++;
   }
   return success_cnt;
 }
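
Both hunks replace the old error-index bookkeeping with incremental counting: success_cnt starts from a base (in batch_put, presumably the block_transfer_info entries that needed no store and so produced no key; zero in batch_get) and grows by one per successful key until the first failure. A minimal standalone sketch of the pattern, with std::optional standing in for the client result type, which this diff does not show:

#include <cstdint>
#include <optional>
#include <vector>

// Count consecutive successes from the front of `results`, on top of `base`.
// Mirrors the updated batch_put/batch_get accounting: the first failure stops
// the scan, so anything after it is deliberately left uncounted.
uint64_t count_prefix_successes(const std::vector<std::optional<int>>& results,
                                uint64_t base) {
  uint64_t success_cnt = base;
  for (size_t i = 0; i < results.size(); i++) {
    if (!results[i].has_value()) {
      break;  // first failure ends the successful prefix
    }
    success_cnt++;
  }
  return success_cnt;
}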

xllm/core/framework/model/model_input_params.h

Lines changed: 4 additions & 0 deletions
@@ -127,6 +127,8 @@ struct ModelInputParams {
     params.paged_kv_indices = safe_to(paged_kv_indices, device);
     params.paged_kv_last_page_len = safe_to(paged_kv_last_page_len, device);
 
+    params.batch_id = batch_id;
+
     return params;
   }

@@ -212,6 +214,8 @@
 
 #if defined(USE_NPU)
   std::shared_ptr<NPULayerSynchronizerImpl> layer_synchronizer = nullptr;
+  std::shared_ptr<NPULayerSynchronizerImpl> layer_wise_load_synchronizer =
+      nullptr;
 #endif
 
   DpEpPaddingData dp_ep_padding_data;
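
to_device now carries batch_id across (copied directly, since it is not a tensor) and the params gain a second per-layer synchronizer dedicated to KV cache loading. The consumer side of layer_wise_load_synchronizer is not part of this diff; a hedged sketch of the intended call site, assuming a layer_id in scope and the synchronize_layer method declared in npu_layer_synchronizer.h below:

// Hypothetical consumer in a decoder layer's forward pass (not in this
// commit): block until this layer's KV cache H2D copy has completed before
// attention reads the cache.
#if defined(USE_NPU)
if (input_params.layer_wise_load_synchronizer != nullptr &&
    !input_params.layer_wise_load_synchronizer->synchronize_layer(layer_id)) {
  LOG(ERROR) << "KV cache load sync failed for layer " << layer_id;
}
#endif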

xllm/core/platform/device.cpp

Lines changed: 2 additions & 2 deletions
@@ -110,8 +110,8 @@ int Device::synchronize_default_stream() {
   return 0;
 }
 
-std::unique_ptr<Stream> Device::get_stream_from_pool() {
-  return std::make_unique<Stream>();
+std::unique_ptr<Stream> Device::get_stream_from_pool(const int32_t timeout) {
+  return std::make_unique<Stream>(timeout);
 }
 
 }  // namespace xllm

xllm/core/platform/device.h

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ class Device {
   int64_t free_memory();
 
   int synchronize_default_stream();
-  std::unique_ptr<Stream> get_stream_from_pool();
+  std::unique_ptr<Stream> get_stream_from_pool(const int32_t timeout = -1);
 
  private:
   struct DeviceMem {
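
get_stream_from_pool now threads a timeout into the Stream it hands out; the default of -1 presumably preserves the old wait-forever behavior (ACL treats a negative timeout as unbounded), while a positive value is in milliseconds. A usage sketch under those assumptions, with device standing in for a Device instance:

// Hypothetical caller (not in this commit): a pooled stream whose
// synchronize() gives up after 5 seconds instead of hanging indefinitely.
auto stream = device.get_stream_from_pool(/*timeout=*/5000);
// ... enqueue async H2D copies on *stream ...
if (stream->synchronize() != 0) {
  LOG(ERROR) << "stream synchronize failed or timed out";
}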

xllm/core/platform/npu/npu_layer_synchronizer.cpp

Lines changed: 7 additions & 4 deletions
@@ -19,8 +19,11 @@ limitations under the License.
 
 namespace xllm {
 
-NPULayerSynchronizerImpl::NPULayerSynchronizerImpl(const int64_t num_layers)
-    : events_(num_layers, nullptr), event_record_flags_(num_layers) {
+NPULayerSynchronizerImpl::NPULayerSynchronizerImpl(const int64_t num_layers,
+                                                   const int32_t timeout)
+    : events_(num_layers, nullptr),
+      event_record_flags_(num_layers),
+      timeout_(timeout) {
   uint32_t flags = ACL_EVENT_SYNC;
   for (int64_t i = 0; i < num_layers; ++i) {
     auto ret = aclrtCreateEventWithFlag(&events_[i], flags);

@@ -45,9 +48,9 @@ std::atomic<bool>* NPULayerSynchronizerImpl::get_event_flag(
 
 bool NPULayerSynchronizerImpl::synchronize_layer(const int64_t layer_index) {
   while (!event_record_flags_[layer_index].load(std::memory_order_acquire));
-  auto ret = aclrtSynchronizeEvent(events_[layer_index]);
+  auto ret = aclrtSynchronizeEventWithTimeout(events_[layer_index], timeout_);
   if (ret != ACL_SUCCESS) {
-    LOG(ERROR) << "Synchronize event failed.";
+    LOG(ERROR) << "Synchronize event failed: " << ret;
     return false;
   }
   return true;

xllm/core/platform/npu/npu_layer_synchronizer.h

Lines changed: 3 additions & 1 deletion
@@ -24,7 +24,8 @@ namespace xllm {
 
 class NPULayerSynchronizerImpl {
  public:
-  NPULayerSynchronizerImpl(const int64_t num_layers);
+  NPULayerSynchronizerImpl(const int64_t num_layers,
+                           const int32_t timeout = -1);
   virtual ~NPULayerSynchronizerImpl();
 
   aclrtEvent* get_event(const int64_t layer_index);

@@ -34,6 +35,7 @@
  private:
   std::vector<aclrtEvent> events_;
   std::vector<std::atomic<bool>> event_record_flags_;
+  const int32_t timeout_;
 };
 
 }  // namespace xllm
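
Taken together with the .cpp change above, the synchronizer is a per-layer event/flag pair: the copy side records an event after enqueueing a layer's H2D copy and publishes it through the atomic flag, while synchronize_layer spin-waits on the flag and then waits on the event, now bounded by timeout_. A hedged producer-side sketch built only from the accessors visible above (copy_stream, num_layers, and the elided memcpy are illustrative, not from this commit):

// Hypothetical producer: after enqueueing layer i's KV cache copy on
// copy_stream, record that layer's event and publish the flag. The release
// store pairs with the acquire load in synchronize_layer.
for (int64_t i = 0; i < num_layers; ++i) {
  // ... aclrtMemcpyAsync(...) for layer i's KV blocks on copy_stream ...
  aclrtRecordEvent(*synchronizer->get_event(i), copy_stream);
  synchronizer->get_event_flag(i)->store(true, std::memory_order_release);
}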

xllm/core/platform/stream.cpp

Lines changed: 8 additions & 5 deletions
@@ -18,17 +18,20 @@ limitations under the License.
 namespace xllm {
 
 #if defined(USE_NPU)
-Stream::Stream() : stream_(c10_npu::getNPUStreamFromPool()) {}
+Stream::Stream(const int32_t timeout)
+    : stream_(c10_npu::getNPUStreamFromPool()), timeout_(timeout) {}
 #elif defined(USE_MLU)
-Stream::Stream() : stream_(torch_mlu::getStreamFromPool()) {}
+Stream::Stream(const int32_t timeout)
+    : stream_(torch_mlu::getStreamFromPool()), timeout_(timeout) {}
 #elif defined(USE_CUDA)
-Stream::Stream() : stream_(c10::cuda::getStreamFromPool()) {}
+Stream::Stream(const int32_t timeout)
+    : stream_(c10::cuda::getStreamFromPool()), timeout_(timeout) {}
 #endif
 
 int Stream::synchronize() const {
 #if defined(USE_NPU)
-  return aclrtSynchronizeStream(stream_.stream());
-#else
+  return aclrtSynchronizeStreamWithTimeout(stream_.stream(), timeout_);
+#elif defined(USE_MLU)
   stream_.unwrap().synchronize();
   return 0;
 #endif
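
On NPU, synchronize() now uses the timeout-aware ACL call, so a stalled copy surfaces as a non-zero return code rather than blocking forever; with the default timeout of -1 it should behave like the old aclrtSynchronizeStream. Note that the former #else branch became #elif defined(USE_MLU), which leaves a USE_CUDA build with no return path in this function; a minimal branch it would presumably need (not in this commit, and CUDA has no timed stream-synchronize to mirror):

#elif defined(USE_CUDA)
  // Hypothetical CUDA fallback: plain synchronize, ignoring timeout_.
  stream_.synchronize();
  return 0;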

xllm/core/platform/stream.h

Lines changed: 7 additions & 1 deletion
@@ -38,7 +38,7 @@ namespace xllm {
 
 class Stream {
  public:
-  Stream();
+  Stream(const int32_t timeout = -1);
   ~Stream() = default;
 
   Stream(const Stream&) = delete;

@@ -48,6 +48,11 @@
 
   int synchronize() const;
   c10::StreamGuard set_stream_guard() const;
+#if defined(USE_NPU)
+  c10_npu::NPUStream* get_stream() { return &stream_; }
+#elif defined(USE_MLU)
+  torch_mlu::MLUStream* get_stream() { return &stream_; }
+#endif
 
  private:
 #if defined(USE_NPU)

@@ -57,6 +62,7 @@
 #elif defined(USE_CUDA)
   c10::cuda::CUDAStream stream_;
 #endif
+  const int32_t timeout_;
 };
 
 }  // namespace xllm
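
The new get_stream() accessor exposes the backend stream object so callers can enqueue work on it directly, e.g. the layer-wise H2D copies this commit is about. A hedged NPU-side sketch (dst, src, and size are placeholders):

// Hypothetical use (not in this commit): enqueue an async host-to-device
// copy of a KV cache block on the pooled stream's underlying aclrtStream.
#if defined(USE_NPU)
aclrtMemcpyAsync(dst, size, src, size, ACL_MEMCPY_HOST_TO_DEVICE,
                 stream->get_stream()->stream());
#endif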
