@@ -418,13 +418,12 @@ void WorkerService::PullKVCache(::google::protobuf::RpcController* controller,
418418
419419void WorkerService::TransferBlocks (
420420 ::google::protobuf::RpcController* controller,
421- const ::xllm:: proto::BlockTransferInfos* req,
422- ::xllm:: proto::TransferStatus* resp,
421+ const proto::BlockTransferInfos* req,
422+ proto::TransferStatus* resp,
423423 ::google::protobuf::Closure* done) {
424424 brpc::ClosureGuard done_guard (done);
425425 std::vector<BlockTransferInfo> block_transfer_info;
426- uint64_t batch_id;
427- proto_to_block_transfer_info (*req, batch_id, block_transfer_info);
426+ uint64_t batch_id = proto_to_block_transfer_info (*req, block_transfer_info);
428427
429428 if (batch_id == 0x0 ) {
430429 resp->set_success_cnt (worker_->transfer_kv_blocks (block_transfer_info));
@@ -434,6 +433,114 @@ void WorkerService::TransferBlocks(
434433 return ;
435434}
436435
436+ class ServerStreamHandler : public brpc ::StreamInputHandler {
437+ private:
438+ std::promise<void > close_promise_;
439+ std::atomic<bool > promise_set_{false };
440+
441+ public:
442+ ~ServerStreamHandler () {
443+ if (!promise_set_.exchange (true )) {
444+ try {
445+ close_promise_.set_value ();
446+ } catch (const std::exception& e) {
447+ LOG (WARNING) << " Exception in destructor: " << e.what ();
448+ }
449+ }
450+ }
451+
452+ std::future<void > get_close_future () { return close_promise_.get_future (); }
453+
454+ int on_received_messages (brpc::StreamId id,
455+ butil::IOBuf* const messages[],
456+ size_t size) override {
457+ LOG (WARNING) << " ServerStreamHandler::on_received_messages not implement." ;
458+ return 0 ;
459+ }
460+
461+ void on_closed (brpc::StreamId id) override {
462+ if (!promise_set_.exchange (true )) {
463+ close_promise_.set_value ();
464+ }
465+ }
466+
467+ void on_idle_timeout (brpc::StreamId id) override {
468+ if (!promise_set_.exchange (true )) {
469+ LOG (WARNING) << " Stream idle timeout: " << id;
470+ close_promise_.set_value ();
471+ }
472+ }
473+ };
474+
475+ void WorkerService::PrefetchFromStorage (
476+ google::protobuf::RpcController* controller,
477+ const proto::BlockTransferInfos* req,
478+ proto::Status* resp,
479+ google::protobuf::Closure* done) {
480+ brpc::ClosureGuard done_guard (done);
481+ brpc::Controller* cntl = static_cast <brpc::Controller*>(controller);
482+
483+ auto stream_handler = std::make_unique<ServerStreamHandler>();
484+ auto stream_id = std::make_unique<brpc::StreamId>();
485+ brpc::StreamOptions stream_options;
486+ stream_options.handler = stream_handler.get ();
487+ if (brpc::StreamAccept (stream_id.get (), *cntl, &stream_options) != 0 ) {
488+ resp->set_ok (false );
489+ LOG (ERROR) << " Failed to accept stream!" ;
490+ return ;
491+ }
492+
493+ std::vector<BlockTransferInfo> block_transfer_info;
494+ proto_to_block_transfer_info (*req, block_transfer_info);
495+
496+ copy_threadpool_.schedule (
497+ [this ,
498+ block_transfer_info = std::move (block_transfer_info),
499+ stream_id = std::move (stream_id),
500+ stream_handler = std::move (stream_handler)]() mutable {
501+ Slice<BlockTransferInfo> transfer_slice{block_transfer_info};
502+ auto close_future = stream_handler->get_close_future ();
503+ bool is_completed = false ;
504+
505+ for (size_t i = 0 ; i < transfer_slice.size ();
506+ i += stream_copy_batch_size_) {
507+ auto current_slice = transfer_slice.slice (
508+ i, std::min (i + stream_copy_batch_size_, transfer_slice.size ()));
509+
510+ auto success_cnt = worker_->prefetch_from_storage (current_slice);
511+
512+ if (success_cnt != current_slice.size () ||
513+ i + stream_copy_batch_size_ >= transfer_slice.size ()) {
514+ is_completed = true ;
515+ }
516+
517+ butil::IOBuf buf;
518+ buf.append (std::to_string (success_cnt));
519+ if (brpc::StreamWrite (*stream_id.get (), buf) != 0 ) {
520+ brpc::StreamClose (*stream_id.get ());
521+ is_completed = false ;
522+ break ;
523+ }
524+
525+ if (is_completed) {
526+ if (success_cnt != 0 ) {
527+ butil::IOBuf buf_end;
528+ buf_end.append (" 0" );
529+ brpc::StreamWrite (*stream_id.get (), buf_end);
530+ }
531+ break ;
532+ }
533+ }
534+ if (is_completed) {
535+ close_future.wait ();
536+ }
537+ brpc::StreamClose (*stream_id.get ());
538+ });
539+
540+ resp->set_ok (true );
541+ return ;
542+ }
543+
437544void WorkerService::GetDeviceInfo (::google::protobuf::RpcController* controller,
438545 const proto::Empty* req,
439546 proto::DeviceInfo* resp,
0 commit comments