Skip to content

Commit 5ac51ad

Browse files
DragonFivemaxiaolong.maxwell
authored andcommitted
feat: add rec proto,serivce and utils for rec framework[2/6].
1 parent 2e2a304 commit 5ac51ad

File tree

15 files changed

+613
-54
lines changed

15 files changed

+613
-54
lines changed

xllm/api_service/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ cc_library(
88
api_service_impl.h
99
call.h
1010
completion_service_impl.h
11+
rec_completion_service_impl.h
1112
chat_service_impl.h
1213
embedding_service_impl.h
1314
image_generation_service_impl.h
@@ -23,6 +24,7 @@ cc_library(
2324
api_service.cpp
2425
call.cpp
2526
completion_service_impl.cpp
27+
rec_completion_service_impl.cpp
2628
chat_service_impl.cpp
2729
embedding_service_impl.cpp
2830
image_generation_service_impl.cpp

xllm/api_service/api_service.cpp

Lines changed: 71 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ limitations under the License.
2727
#include "core/common/metrics.h"
2828
#include "core/runtime/dit_master.h"
2929
#include "core/runtime/llm_master.h"
30+
// TODO. add following when next pr.
31+
// #include "core/runtime/rec_master.h"
3032
#include "core/runtime/vlm_master.h"
3133
#include "core/util/closure_guard.h"
3234
#include "embedding.pb.h"
@@ -70,6 +72,11 @@ APIService::APIService(Master* master,
7072
image_generation_service_impl_ =
7173
std::make_unique<ImageGenerationServiceImpl>(
7274
dynamic_cast<DiTMaster*>(master), model_names);
75+
} else if (FLAGS_backend == "rec") {
76+
// TODO. delete this when next pr.
77+
using RecMaster = LLMMaster;
78+
rec_completion_service_impl_ = std::make_unique<RecCompletionServiceImpl>(
79+
dynamic_cast<RecMaster*>(master), model_names);
7380
}
7481
models_service_impl_ =
7582
ServiceImplFactory<ModelsServiceImpl>::create_service_impl(
@@ -80,81 +87,97 @@ void APIService::Completions(::google::protobuf::RpcController* controller,
8087
const proto::CompletionRequest* request,
8188
proto::CompletionResponse* response,
8289
::google::protobuf::Closure* done) {
83-
// TODO with xllm-service
84-
}
85-
86-
void APIService::CompletionsHttp(::google::protobuf::RpcController* controller,
87-
const proto::HttpRequest* request,
88-
proto::HttpResponse* response,
89-
::google::protobuf::Closure* done) {
9090
xllm::ClosureGuard done_guard(
9191
done,
9292
std::bind(request_in_metric, nullptr),
9393
std::bind(request_out_metric, (void*)controller));
9494
if (!request || !response || !controller) {
95-
LOG(ERROR) << "brpc request | respose | controller is null";
95+
LOG(ERROR) << "brpc request | respose | controller is null.";
9696
return;
9797
}
98+
auto ctrl = reinterpret_cast<brpc::Controller*>(controller);
9899

99-
auto arena = response->GetArena();
100+
if (FLAGS_backend == "llm" || FLAGS_backend == "vlm") {
101+
CHECK(completion_service_impl_) << " completion service is invalid.";
102+
std::shared_ptr<Call> call = std::make_shared<CompletionCall>(
103+
ctrl,
104+
done_guard.release(),
105+
const_cast<proto::CompletionRequest*>(request),
106+
response);
107+
completion_service_impl_->process_async(call);
108+
} else if (FLAGS_backend == "rec") {
109+
CHECK(rec_completion_service_impl_)
110+
<< " rec completion service is invalid.";
111+
std::shared_ptr<Call> call = std::make_shared<CompletionCall>(
112+
ctrl,
113+
done_guard.release(),
114+
const_cast<proto::CompletionRequest*>(request),
115+
response);
116+
rec_completion_service_impl_->process_async(call);
117+
}
118+
}
119+
120+
namespace {
121+
template <typename Call, typename Service>
122+
void CommonCompletionsImpl(std::unique_ptr<Service>& service,
123+
xllm::ClosureGuard& guard,
124+
::google::protobuf::Arena* arena,
125+
brpc::Controller* ctrl) {
100126
auto req_pb =
101-
google::protobuf::Arena::CreateMessage<proto::CompletionRequest>(arena);
127+
google::protobuf::Arena::CreateMessage<typename Call::ReqType>(arena);
102128
auto resp_pb =
103-
google::protobuf::Arena::CreateMessage<proto::CompletionResponse>(arena);
129+
google::protobuf::Arena::CreateMessage<typename Call::ResType>(arena);
104130

105-
auto ctrl = reinterpret_cast<brpc::Controller*>(controller);
106131
std::string error;
107132
json2pb::Json2PbOptions options;
108133
butil::IOBuf& buf = ctrl->request_attachment();
109134
butil::IOBufAsZeroCopyInputStream iobuf_stream(buf);
110135
auto st = json2pb::JsonToProtoMessage(&iobuf_stream, req_pb, options, &error);
111136
if (!st) {
112137
ctrl->SetFailed(error);
113-
LOG(ERROR) << "parse json to proto failed: " << error;
138+
LOG(ERROR) << "parse json to proto failed: " << buf.to_string();
114139
return;
115140
}
116141

117-
std::shared_ptr<Call> call = std::make_shared<CompletionCall>(
118-
ctrl, done_guard.release(), req_pb, resp_pb);
119-
completion_service_impl_->process_async(call);
142+
auto call = std::make_shared<Call>(ctrl, guard.release(), req_pb, resp_pb);
143+
service->process_async(call);
120144
}
145+
} // namespace
121146

122-
void APIService::ChatCompletions(::google::protobuf::RpcController* controller,
123-
const proto::ChatRequest* request,
124-
proto::ChatResponse* response,
147+
void APIService::CompletionsHttp(::google::protobuf::RpcController* controller,
148+
const proto::HttpRequest* request,
149+
proto::HttpResponse* response,
125150
::google::protobuf::Closure* done) {
126-
// TODO with xllm-service
127-
}
128-
129-
namespace {
130-
template <typename ChatCall, typename Service>
131-
void ChatCompletionsImpl(std::unique_ptr<Service>& service,
132-
xllm::ClosureGuard& guard,
133-
::google::protobuf::Arena* arena,
134-
brpc::Controller* ctrl) {
135-
auto req_pb =
136-
google::protobuf::Arena::CreateMessage<typename ChatCall::ReqType>(arena);
137-
auto resp_pb =
138-
google::protobuf::Arena::CreateMessage<typename ChatCall::ResType>(arena);
151+
xllm::ClosureGuard done_guard(
152+
done,
153+
std::bind(request_in_metric, nullptr),
154+
std::bind(request_out_metric, (void*)controller));
155+
if (!request || !response || !controller) {
156+
LOG(ERROR) << "brpc request | respose | controller is null";
157+
return;
158+
}
139159

140-
std::string attachment = std::move(ctrl->request_attachment().to_string());
141-
std::string error;
160+
auto arena = response->GetArena();
161+
auto ctrl = reinterpret_cast<brpc::Controller*>(controller);
142162

143-
google::protobuf::util::JsonParseOptions options;
144-
options.ignore_unknown_fields = true;
145-
auto json_status =
146-
google::protobuf::util::JsonStringToMessage(attachment, req_pb, options);
147-
if (!json_status.ok()) {
148-
ctrl->SetFailed(json_status.ToString());
149-
LOG(ERROR) << "parse json to proto failed: " << json_status.ToString();
150-
return;
163+
if (FLAGS_backend == "llm" || FLAGS_backend == "vlm") {
164+
CHECK(completion_service_impl_) << " completion service is invalid.";
165+
CommonCompletionsImpl<CompletionCall, CompletionServiceImpl>(
166+
completion_service_impl_, done_guard, arena, ctrl);
167+
} else if (FLAGS_backend == "rec") {
168+
CHECK(rec_completion_service_impl_)
169+
<< " rec completion service is invalid.";
170+
CommonCompletionsImpl<CompletionCall, RecCompletionServiceImpl>(
171+
rec_completion_service_impl_, done_guard, arena, ctrl);
151172
}
173+
}
152174

153-
auto call = std::make_shared<ChatCall>(
154-
ctrl, guard.release(), req_pb, resp_pb, arena != nullptr /*use_arena*/);
155-
service->process_async(call);
175+
void APIService::ChatCompletions(::google::protobuf::RpcController* controller,
176+
const proto::ChatRequest* request,
177+
proto::ChatResponse* response,
178+
::google::protobuf::Closure* done) {
179+
// TODO with xllm-service
156180
}
157-
} // namespace
158181

159182
void APIService::ChatCompletionsHttp(
160183
::google::protobuf::RpcController* controller,
@@ -175,12 +198,11 @@ void APIService::ChatCompletionsHttp(
175198
if (FLAGS_backend == "llm") {
176199
auto arena = response->GetArena();
177200
CHECK(chat_service_impl_) << " chat service is invalid.";
178-
ChatCompletionsImpl<ChatCall, ChatServiceImpl>(
201+
CommonCompletionsImpl<ChatCall, ChatServiceImpl>(
179202
chat_service_impl_, done_guard, arena, ctrl);
180203
} else if (FLAGS_backend == "vlm") {
181204
CHECK(mm_chat_service_impl_) << " mm chat service is invalid.";
182-
// TODO: fix me - temporarily using heap allocation instead of arena
183-
ChatCompletionsImpl<MMChatCall, MMChatServiceImpl>(
205+
CommonCompletionsImpl<MMChatCall, MMChatServiceImpl>(
184206
mm_chat_service_impl_, done_guard, nullptr, ctrl);
185207
}
186208
}

xllm/api_service/api_service.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ limitations under the License.
2121
#include "image_generation_service_impl.h"
2222
#include "models_service_impl.h"
2323
#include "qwen3_rerank_service_impl.h"
24+
#include "rec_completion_service_impl.h"
2425
#include "rerank_service_impl.h"
2526
#include "xllm_service.pb.h"
2627

@@ -124,6 +125,7 @@ class APIService : public proto::XllmAPIService {
124125
std::unique_ptr<ModelsServiceImpl> models_service_impl_;
125126
std::unique_ptr<ImageGenerationServiceImpl> image_generation_service_impl_;
126127
std::unique_ptr<RerankServiceImpl> rerank_service_impl_;
128+
std::unique_ptr<RecCompletionServiceImpl> rec_completion_service_impl_;
127129
};
128130

129131
} // namespace xllm

0 commit comments

Comments
 (0)