diff --git a/tools/server/server-http.cpp b/tools/server/server-http.cpp
index e2f91467faf96..139643fc38749 100644
--- a/tools/server/server-http.cpp
+++ b/tools/server/server-http.cpp
@@ -381,3 +381,87 @@ void server_http_context::post(const std::string & path, server_http_context::ha
     });
 }
+
+//
+// server_http_client
+//
+
+server_http_client::server_http_client(
+        const std::string & method,
+        const std::string & host,
+        int port,
+        const std::string & path,
+        const std::map<std::string, std::string> & headers,
+        const std::string & body,
+        const std::function<bool()> should_stop) {
+    // shared between reader and writer threads
+    auto cli  = std::make_shared<httplib::Client>(host, port);
+    auto pipe = std::make_shared<pipe_t<msg_t>>();
+
+    // setup Client
+    cli->set_connection_timeout(0, 200000); // 200 milliseconds
+    this->status = 500; // to be overwritten upon response
+    this->cleanup = [pipe]() {
+        pipe->close_read();
+        pipe->close_write();
+    };
+
+    // wire up the receive end of the pipe
+    this->next = [pipe, should_stop](std::string & out) -> bool {
+        msg_t msg;
+        bool has_next = pipe->read(msg, should_stop);
+        if (!msg.data.empty()) {
+            out = std::move(msg.data);
+        }
+        return has_next;
+    };
+
+    // wire up the HTTP client
+    // note: do NOT capture `this` pointer, as it may be destroyed before the thread ends
+    httplib::ResponseHandler response_handler = [pipe, cli](const httplib::Response & response) {
+        msg_t msg;
+        msg.status = response.status;
+        for (const auto & [key, value] : response.headers) {
+            msg.headers[key] = value;
+        }
+        pipe->write(std::move(msg)); // send headers first
+        return true;
+    };
+    httplib::ContentReceiverWithProgress content_receiver = [pipe](const char * data, size_t data_length, size_t, size_t) {
+        return pipe->write({{}, 0, std::string(data, data_length)}); // send data chunks
+    };
+
+    // prepare the request to destination server
+    httplib::Request req;
+    {
+        req.method = method;
+        req.path   = path;
+        for (const auto & [key, value] : headers) {
+            req.set_header(key, value);
+        }
+        req.body = body;
+        req.response_handler = response_handler;
+        req.content_receiver = content_receiver;
+    }
+
+    // start the proxy thread
+    SRV_DBG("start proxy thread %s %s\n", req.method.c_str(), req.path.c_str());
+    this->thread = std::thread([cli, pipe, req]() {
+        auto result = cli->send(std::move(req));
+        if (result.error() != httplib::Error::Success) {
+            auto err_str = httplib::to_string(result.error());
+            SRV_ERR("http client error: %s\n", err_str.c_str());
+            pipe->write({{}, 500, ""}); // header
+            pipe->write({{}, 0, "proxy error: " + err_str}); // body
+        }
+        pipe->close_write(); // signal EOF to reader
+        SRV_DBG("%s", "client request thread ended\n");
+    });
+    this->thread.detach();
+
+    // wait for the first chunk (headers)
+    msg_t header;
+    pipe->read(header, should_stop);
+    SRV_DBG("%s", "received response headers\n");
+    this->status  = header.status;
+    this->headers = header.headers;
+}
diff --git a/tools/server/server-http.h b/tools/server/server-http.h
index 2e632e666d8b0..0198f0189319a 100644
--- a/tools/server/server-http.h
+++ b/tools/server/server-http.h
@@ -7,6 +7,8 @@
 #include <map>
 #include <memory>
 #include <string>
+#include <queue>
+#include <thread>
 
 // generator-like API for HTTP response generation
 // this object response with one of the 2 modes:
@@ -74,3 +76,73 @@ struct server_http_context {
     // for debugging
     std::string listening_address;
 };
+
+// simple HTTP client with blocking API
+struct server_http_client : server_http_res {
+    std::function<void()> cleanup = nullptr;
+public:
+    server_http_client(const std::string & method,
+                       const std::string & host,
+                       int port,
+                       const std::string & path,
+                       const std::map<std::string, std::string> & headers,
+                       const std::string & body,
+                       const std::function<bool()> should_stop);
+    ~server_http_client() {
+        if (cleanup) {
+            cleanup();
+        }
+    }
+private:
+    std::thread thread;
+    struct msg_t {
+        std::map<std::string, std::string> headers;
+        int status = 0;
+        std::string data;
+    };
+    // simple implementation of a pipe
+    template<typename T>
+    struct pipe_t {
+        std::mutex mutex;
+        std::condition_variable cv;
+        std::queue<T> queue;
+        std::atomic<bool> writer_closed{false};
+        std::atomic<bool> reader_closed{false};
+        void close_write() {
+            writer_closed.store(true);
+            cv.notify_all();
+        }
+        void close_read() {
+            reader_closed.store(true);
+            cv.notify_all();
+        }
+        bool read(T & output, const std::function<bool()> & should_stop) {
+            std::unique_lock lk(mutex);
+            constexpr auto poll_interval = std::chrono::milliseconds(500);
+            while (true) {
+                if (!queue.empty()) {
+                    output = std::move(queue.front());
+                    queue.pop();
+                    return true;
+                }
+                if (writer_closed.load()) {
+                    return false; // clean EOF
+                }
+                if (should_stop()) {
+                    close_read(); // signal broken pipe to writer
+                    return false; // cancelled / reader no longer alive
+                }
+                cv.wait_for(lk, poll_interval);
+            }
+        }
+        bool write(T && data) {
+            std::lock_guard lk(mutex);
+            if (reader_closed.load()) {
+                return false; // broken pipe
+            }
+            queue.push(std::move(data));
+            cv.notify_one();
+            return true;
+        }
+    };
+};
diff --git a/tools/server/server.cpp b/tools/server/server.cpp
index 1c9e9a58d7daf..ca3f3e326332a 100644
--- a/tools/server/server.cpp
+++ b/tools/server/server.cpp
@@ -5562,7 +5562,7 @@ int main(int argc, char ** argv) {
     ctx_http.post("/completions",         ex_wrapper(routes.post_completions));
     ctx_http.post("/v1/completions",      ex_wrapper(routes.post_completions_oai));
     ctx_http.post("/chat/completions",    ex_wrapper(routes.post_chat_completions));
-    ctx_http.post("/v1/chat/completions", ex_wrapper(routes.post_chat_completions));
+    //ctx_http.post("/v1/chat/completions", ex_wrapper(routes.post_chat_completions));
     ctx_http.post("/api/chat",            ex_wrapper(routes.post_chat_completions)); // ollama specific endpoint
     ctx_http.post("/infill",              ex_wrapper(routes.post_infill));
     ctx_http.post("/embedding",           ex_wrapper(routes.post_embeddings)); // legacy
@@ -5581,6 +5581,11 @@ int main(int argc, char ** argv) {
     // Save & load slots
     ctx_http.get ("/slots",               ex_wrapper(routes.get_slots));
     ctx_http.post("/slots/:id_slot",      ex_wrapper(routes.post_slots));
+    ctx_http.post("/v1/chat/completions", [&params](const server_http_req & req) {
+        return std::unique_ptr<server_http_res>(new server_http_client(
+            "POST", params.hostname, 8080, "/chat/completions", req.headers, req.body, req.should_stop
+        ));
+    });
 
     //
     // Start the server