Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions tools/server/server-http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,3 +381,87 @@ void server_http_context::post(const std::string & path, server_http_context::ha
});
}

//
// server_http_client
//

server_http_client::server_http_client(
const std::string & method,
const std::string & host,
int port,
const std::string & path,
const std::map<std::string, std::string> & headers,
const std::string & body,
const std::function<bool()> should_stop) {
// shared between reader and writer threads
auto cli = std::make_shared<httplib::Client>(host, port);
auto pipe = std::make_shared<pipe_t<msg_t>>();

// setup Client
cli->set_connection_timeout(0, 200000); // 200 milliseconds
this->status = 500; // to be overwritten upon response
this->cleanup = [pipe]() {
pipe->close_read();
pipe->close_write();
};

// wire up the receive end of the pipe
this->next = [pipe, should_stop](std::string & out) -> bool {
msg_t msg;
bool has_next = pipe->read(msg, should_stop);
if (!msg.data.empty()) {
out = std::move(msg.data);
}
return has_next;
};

// wire up the HTTP client
// note: do NOT capture `this` pointer, as it may be destroyed before the thread ends
httplib::ResponseHandler response_handler = [pipe, cli](const httplib::Response & response) {
msg_t msg;
msg.status = response.status;
for (const auto & [key, value] : response.headers) {
msg.headers[key] = value;
}
pipe->write(std::move(msg)); // send headers first
return true;
};
httplib::ContentReceiverWithProgress content_receiver = [pipe](const char * data, size_t data_length, size_t, size_t) {
return pipe->write({{}, 0, std::string(data, data_length)}); // send data chunks
};

// prepare the request to destination server
httplib::Request req;
{
req.method = method;
req.path = path;
for (const auto & [key, value] : headers) {
req.set_header(key, value);
}
req.body = body;
req.response_handler = response_handler;
req.content_receiver = content_receiver;
}

// start the proxy thread
SRV_DBG("start proxy thread %s %s\n", req.method.c_str(), req.path.c_str());
this->thread = std::thread([cli, pipe, req]() {
auto result = cli->send(std::move(req));
if (result.error() != httplib::Error::Success) {
auto err_str = httplib::to_string(result.error());
SRV_ERR("http client error: %s\n", err_str.c_str());
pipe->write({{}, 500, ""}); // header
pipe->write({{}, 0, "proxy error: " + err_str}); // body
}
pipe->close_write(); // signal EOF to reader
SRV_DBG("%s", "client request thread ended\n");
});
this->thread.detach();

// wait for the first chunk (headers)
msg_t header;
pipe->read(header, should_stop);
SRV_DBG("%s", "received response headers\n");
this->status = header.status;
this->headers = header.headers;
}
72 changes: 72 additions & 0 deletions tools/server/server-http.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <string>
#include <thread>
#include <atomic>
#include <queue>
#include <mutex>

// generator-like API for HTTP response generation
// this object response with one of the 2 modes:
Expand Down Expand Up @@ -74,3 +76,73 @@ struct server_http_context {
// for debugging
std::string listening_address;
};

// simple HTTP client with blocking API
struct server_http_client : server_http_res {
std::function<void()> cleanup = nullptr;
public:
server_http_client(const std::string & method,
const std::string & host,
int port,
const std::string & path,
const std::map<std::string, std::string> & headers,
const std::string & body,
const std::function<bool()> should_stop);
~server_http_client() {
if (cleanup) {
cleanup();
}
}
private:
std::thread thread;
struct msg_t {
std::map<std::string, std::string> headers;
int status = 0;
std::string data;
};
// simple implementation of a pipe
template<typename T>
struct pipe_t {
std::mutex mutex;
std::condition_variable cv;
std::queue<T> queue;
std::atomic<bool> writer_closed{false};
std::atomic<bool> reader_closed{false};
void close_write() {
writer_closed.store(true);
cv.notify_all();
}
void close_read() {
reader_closed.store(true);
cv.notify_all();
}
bool read(T & output, const std::function<bool()> & should_stop) {
std::unique_lock<std::mutex> lk(mutex);
constexpr auto poll_interval = std::chrono::milliseconds(500);
while (true) {
if (!queue.empty()) {
output = std::move(queue.front());
queue.pop();
return true;
}
if (writer_closed.load()) {
return false; // clean EOF
}
if (should_stop()) {
close_read(); // signal broken pipe to writer
return false; // cancelled / reader no longer alive
}
cv.wait_for(lk, poll_interval);
}
}
bool write(T && data) {
std::lock_guard<std::mutex> lk(mutex);
if (reader_closed.load()) {
return false; // broken pipe
}
queue.push(std::move(data));
cv.notify_one();
return true;
}
};
};
7 changes: 6 additions & 1 deletion tools/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5562,7 +5562,7 @@ int main(int argc, char ** argv) {
ctx_http.post("/completions", ex_wrapper(routes.post_completions));
ctx_http.post("/v1/completions", ex_wrapper(routes.post_completions_oai));
ctx_http.post("/chat/completions", ex_wrapper(routes.post_chat_completions));
ctx_http.post("/v1/chat/completions", ex_wrapper(routes.post_chat_completions));
//ctx_http.post("/v1/chat/completions", ex_wrapper(routes.post_chat_completions));
ctx_http.post("/api/chat", ex_wrapper(routes.post_chat_completions)); // ollama specific endpoint
ctx_http.post("/infill", ex_wrapper(routes.post_infill));
ctx_http.post("/embedding", ex_wrapper(routes.post_embeddings)); // legacy
Expand All @@ -5581,6 +5581,11 @@ int main(int argc, char ** argv) {
// Save & load slots
ctx_http.get ("/slots", ex_wrapper(routes.get_slots));
ctx_http.post("/slots/:id_slot", ex_wrapper(routes.post_slots));
ctx_http.post("/v1/chat/completions", [&params](const server_http_req & req) {
return std::unique_ptr<server_http_client>(new server_http_client(
"POST", params.hostname, 8080, "/chat/completions", req.headers, req.body, req.should_stop
));
});

//
// Start the server
Expand Down
Loading