Skip to content

Commit c8ba413

Browse files
committed
DPL: recycle buffers when sending metrics
This should reduce the memory churn
1 parent a8b684c commit c8ba413

4 files changed

Lines changed: 55 additions & 8 deletions

File tree

Framework/Core/src/DPLWebSocket.cxx

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,7 @@ struct WriteRequestContext {
516516
/// State attached to a bulk uv_write request through the request's `data`
/// field and read back by ws_client_bulk_write_callback.
struct BulkWriteRequestContext {
517517
/// Buffers passed to uv_write. They are swapped out of the caller's vector,
/// so this context holds them until the write callback releases them.
std::vector<uv_buf_t> buffers;
518518
/// Service registry reference copied from the client context when the
/// request is created.
ServiceRegistryRef ref;
519+
/// Optional recycling target. Only the address is kept (non-owning), so the
/// pointed-to vector has to stay alive until the write callback has run. The
/// callback pushes each non-null chunk here while the vector holds fewer
/// than kMaxFreeChunks entries and frees the rest.
std::vector<char*>* freeList = nullptr; // if non-null, return chunks here instead of freeing
519520
};
520521

521522
void ws_client_write_callback(uv_write_t* h, int status)
@@ -543,11 +544,14 @@ void ws_client_bulk_write_callback(uv_write_t* h, int status)
543544
state.loopReason |= (DeviceState::WS_COMMUNICATION | DeviceState::WS_WRITING);
544545
if (status < 0) {
545546
LOG(error) << "uv_write error: " << uv_err_name(status);
546-
free(h);
547-
return;
548547
}
549-
if (context->buffers.size()) {
550-
for (auto& b : context->buffers) {
548+
// Return chunks to the free list (capped) so flushPending can pre-seed
549+
// the backlog for the next cycle without malloc-ing.
550+
constexpr size_t kMaxFreeChunks = 4;
551+
for (auto& b : context->buffers) {
552+
if (context->freeList && b.base && context->freeList->size() < kMaxFreeChunks) {
553+
context->freeList->push_back(b.base);
554+
} else {
551555
free(b.base);
552556
}
553557
}
@@ -584,4 +588,18 @@ void WSDPLClient::write(std::vector<uv_buf_t>& outputs)
584588
context->buffers.size(), ws_client_bulk_write_callback);
585589
}
586590

591+
void WSDPLClient::write(std::vector<uv_buf_t>& outputs, std::vector<char*>& freeList)
592+
{
593+
if (outputs.empty()) {
594+
return;
595+
}
596+
auto* write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
597+
auto* context = new BulkWriteRequestContext{.ref = mContext->ref};
598+
context->buffers.swap(outputs);
599+
context->freeList = &freeList;
600+
write_req->data = context;
601+
uv_write(write_req, (uv_stream_t*)mStream, &context->buffers.at(0),
602+
context->buffers.size(), ws_client_bulk_write_callback);
603+
}
604+
587605
} // namespace o2::framework

Framework/Core/src/DPLWebSocket.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ struct WSDPLClient : public HTTPParser {
8989
/// Helper to write n buffers containing websockets frames to a server
9090
void write(std::vector<uv_buf_t>& outputs);
9191

92+
/// Like write() above but recycles the chunk memory into freeList once the
93+
/// kernel has consumed the buffers, instead of freeing it.
/// The buffers are swapped out of @a outputs, which is left empty. Only the
/// address of @a freeList is stored with the pending request and it is
/// filled from the write callback, so it must outlive the request.
94+
void write(std::vector<uv_buf_t>& outputs, std::vector<char*>& freeList);
95+
9296
/// Dump headers
9397
void dumpHeaders();
9498
void sendHandshake();

Framework/Core/src/WSDriverClient.cxx

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,8 @@ void on_connect(uv_connect_t* connection, int status)
203203
return false;
204204
}
205205
return true;
206-
}, &name);
206+
},
207+
&name);
207208
});
208209

209210
client->observe("/signpost:disable", [](std::string_view cmd) {
@@ -221,7 +222,8 @@ void on_connect(uv_connect_t* connection, int status)
221222
return false;
222223
}
223224
return true;
224-
}, &name);
225+
},
226+
&name);
225227
});
226228

227229
// Client will be filled in the line after. I can probably have a single
@@ -263,6 +265,12 @@ WSDriverClient::WSDriverClient(ServiceRegistryRef registry, char const* ip, unsi
263265

264266
/// Release the chunk memory still referenced by this client: buffers waiting
/// in the backlog, chunks parked for recycling, and the async handle storage.
WSDriverClient::~WSDriverClient()
{
  // Backlog entries that were never handed to the socket.
  for (uv_buf_t& pending : mBacklog) {
    free(pending.base);
  }
  // Chunks returned by completed writes and not yet reused.
  for (char* recycled : mFreeChunks) {
    free(recycled);
  }
  free(mAwakeMainThread);
}
268276

@@ -329,8 +337,24 @@ void WSDriverClient::flushPending(ServiceRegistryRef mainThreadRef)
329337
printed1 = false;
330338
printed2 = false;
331339
}
332-
mClient->write(mBacklog);
333-
mBacklog.resize(0);
340+
// Return any pre-seeded but unused (zero-length) buffers to the free list
341+
// so we don't send pointless zero-byte writes to the kernel.
342+
while (!mBacklog.empty() && mBacklog.back().len == 0) {
343+
mFreeChunks.push_back(mBacklog.back().base);
344+
mBacklog.pop_back();
345+
}
346+
mClient->write(mBacklog, mFreeChunks);
347+
// Pre-seed mBacklog with one recycled chunk from the previous write's callback
348+
// so that the next encode_websocket_frames reuses memory instead of malloc-ing.
349+
// Only one chunk (64 KB) since encode_websocket_frames appends to outputs.back().
350+
if (!mFreeChunks.empty()) {
351+
mBacklog.push_back(uv_buf_init(mFreeChunks.back(), 0));
352+
mFreeChunks.pop_back();
353+
}
354+
for (auto* chunk : mFreeChunks) {
355+
free(chunk);
356+
}
357+
mFreeChunks.clear();
334358
}
335359

336360
} // namespace o2::framework

Framework/Core/src/WSDriverClient.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class WSDriverClient : public DriverClient
5656
std::mutex mClientMutex;
5757
ServiceRegistryRef mRegistry;
5858
std::vector<uv_buf_t> mBacklog;
59+
std::vector<char*> mFreeChunks; ///< recycled WebSocket buffer chunks (main-thread only)
5960
uv_async_t* mAwakeMainThread = nullptr;
6061
uv_connect_t* mConnection = nullptr;
6162
std::unique_ptr<WSDPLClient> mClient;

0 commit comments

Comments
 (0)