Skip to content
This repository was archived by the owner on Mar 25, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sim/src/test_dmclock.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ namespace crimson {
};

using DmcQueue = dmc::PushPriorityQueue<ClientId,sim::TestRequest>;
using DmcServiceTracker = dmc::ServiceTracker<ServerId,dmc::BorrowingTracker>;

using DmcServer = sim::SimulatedServer<DmcQueue,
dmc::ReqParams,
dmc::PhaseType,
DmcAccum>;

using DmcClient = sim::SimulatedClient<dmc::ServiceTracker<ServerId>,
using DmcClient = sim::SimulatedClient<DmcServiceTracker,
dmc::ReqParams,
dmc::PhaseType,
DmcAccum>;
Expand Down
12 changes: 6 additions & 6 deletions sim/src/test_dmclock_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,9 @@ int main(int argc, char* argv[]) {


void test::client_data(std::ostream& out,
test::MySim* sim,
test::MySim::ClientFilter client_disp_filter,
int head_w, int data_w, int data_prec) {
test::MySim* sim,
test::MySim::ClientFilter client_disp_filter,
int head_w, int data_w, int data_prec) {
// report how many ops were done by reservation and proportion for
// each client

Expand Down Expand Up @@ -270,9 +270,9 @@ void test::client_data(std::ostream& out,


void test::server_data(std::ostream& out,
test::MySim* sim,
test::MySim::ServerFilter server_disp_filter,
int head_w, int data_w, int data_prec) {
test::MySim* sim,
test::MySim::ServerFilter server_disp_filter,
int head_w, int data_w, int data_prec) {
out << std::setw(head_w) << "res_ops:";
int total_r = 0;
for (uint i = 0; i < sim->get_server_count(); ++i) {
Expand Down
155 changes: 118 additions & 37 deletions src/dmclock_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,132 @@

namespace crimson {
namespace dmclock {
struct ServerInfo {

// OrigTracker is a best-effort implementation of the the original
// dmClock calculations of delta and rho. It adheres to an
// interface, implemented via a template type, that allows it to
// be replaced with an alternative. The interface consists of the
// static create, prepare_req, resp_update, and get_last_delta
// functions.
class OrigTracker {
Counter delta_prev_req;
Counter rho_prev_req;
uint32_t my_delta;
uint32_t my_rho;

ServerInfo(Counter _delta_prev_req,
Counter _rho_prev_req) :
delta_prev_req(_delta_prev_req),
rho_prev_req(_rho_prev_req),
public:

OrigTracker(Counter global_delta,
Counter global_rho) :
delta_prev_req(global_delta),
rho_prev_req(global_rho),
my_delta(0),
my_rho(0)
{
// empty
{ /* empty */ }

static inline OrigTracker create(Counter the_delta, Counter the_rho) {
return OrigTracker(the_delta, the_rho);
}

inline void req_update(Counter delta, Counter rho) {
delta_prev_req = delta;
rho_prev_req = rho;
inline ReqParams prepare_req(Counter& the_delta, Counter& the_rho) {
Counter delta_out = 1 + the_delta - delta_prev_req - my_delta;
Counter rho_out = 1 + the_rho - rho_prev_req - my_rho;
delta_prev_req = the_delta;
rho_prev_req = the_rho;
my_delta = 0;
my_rho = 0;
return ReqParams(uint32_t(delta_out), uint32_t(rho_out));
}

inline void resp_update(PhaseType phase) {
inline void resp_update(PhaseType phase,
Counter& the_delta,
Counter& the_rho) {
++the_delta;
++my_delta;
if (phase == PhaseType::reservation) ++my_rho;
if (phase == PhaseType::reservation) {
++the_rho;
++my_rho;
}
}

inline Counter get_last_delta() const {
return delta_prev_req;
}
};
}; // struct OrigTracker


// BorrowingTracker always returns a positive delta and rho. If
// not enough responses have come in to allow that, we will borrow
// a future response and repay it later.
class BorrowingTracker {
Counter delta_prev_req;
Counter rho_prev_req;
Counter delta_borrow;
Counter rho_borrow;

public:

BorrowingTracker(Counter global_delta, Counter global_rho) :
delta_prev_req(global_delta),
rho_prev_req(global_rho),
delta_borrow(0),
rho_borrow(0)
{ /* empty */ }

static inline BorrowingTracker create(Counter the_delta,
Counter the_rho) {
return BorrowingTracker(the_delta, the_rho);
}

inline Counter calc_with_borrow(const Counter& global,
const Counter& previous,
Counter& borrow) {
Counter result = global - previous;
if (0 == result) {
// if no replies have come in, borrow one from the future
++borrow;
return 1;
} else if (result > borrow) {
// if we can give back all of what we borrowed, do so
result -= borrow;
borrow = 0;
return result;
} else {
// can only return part of what was borrowed in order to
// return positive
borrow = borrow - result + 1;
return 1;
}
}

inline ReqParams prepare_req(Counter& the_delta, Counter& the_rho) {
Counter delta_out =
calc_with_borrow(the_delta, delta_prev_req, delta_borrow);
Counter rho_out =
calc_with_borrow(the_rho, rho_prev_req, rho_borrow);
delta_prev_req = the_delta;
rho_prev_req = the_rho;
return ReqParams(uint32_t(delta_out), uint32_t(rho_out));
}

inline void resp_update(PhaseType phase,
Counter& the_delta,
Counter& the_rho) {
++the_delta;
if (phase == PhaseType::reservation) {
++the_rho;
}
}

inline Counter get_last_delta() const {
return delta_prev_req;
}
}; // struct BorrowingTracker


// S is server identifier type
template<typename S>
// T is the server info class that adheres to ServerTrackerIfc interface
template<typename S, typename T = BorrowingTracker>
class ServiceTracker {
// we don't want to include gtest.h just for FRIEND_TEST
friend class dmclock_client_server_erase_Test;
Expand All @@ -64,15 +158,15 @@ namespace crimson {

Counter delta_counter; // # reqs completed
Counter rho_counter; // # reqs completed via reservation
std::map<S,ServerInfo> server_map;
std::map<S,T> server_map;
mutable std::mutex data_mtx; // protects Counters and map

using DataGuard = std::lock_guard<decltype(data_mtx)>;

// clean config

std::deque<MarkPoint> clean_mark_points;
Duration clean_age; // age at which ServerInfo cleaned
Duration clean_age; // age at which server tracker cleaned

// NB: All threads declared at end, so they're destructed firs!

Expand Down Expand Up @@ -119,38 +213,25 @@ namespace crimson {
// this code can only run if a request did not precede the
// response or if the record was cleaned up b/w when
// the request was made and now
ServerInfo si(delta_counter, rho_counter);
si.resp_update(phase);
server_map.emplace(server_id, si);
} else {
it->second.resp_update(phase);
}

++delta_counter;
if (PhaseType::reservation == phase) {
++rho_counter;
auto i = server_map.emplace(server_id,
T::create(delta_counter, rho_counter));
it = i.first;
}
it->second.resp_update(phase, delta_counter, rho_counter);
}


/*
* Returns the ReqParams for the given server.
*/
ReqParams get_req_params(const S& server) {
DataGuard g(data_mtx);
auto it = server_map.find(server);
if (server_map.end() == it) {
server_map.emplace(server, ServerInfo(delta_counter, rho_counter));
server_map.emplace(server,
T::create(delta_counter, rho_counter));
return ReqParams(1, 1);
} else {
Counter delta =
1 + delta_counter - it->second.delta_prev_req - it->second.my_delta;
Counter rho =
1 + rho_counter - it->second.rho_prev_req - it->second.my_rho;

it->second.req_update(delta_counter, rho_counter);

return ReqParams(uint32_t(delta), uint32_t(rho));
return it->second.prepare_req(delta_counter, rho_counter);
}
}

Expand Down Expand Up @@ -182,7 +263,7 @@ namespace crimson {
i != server_map.end();
/* empty */) {
auto i2 = i++;
if (i2->second.delta_prev_req <= earliest) {
if (i2->second.get_last_delta() <= earliest) {
server_map.erase(i2);
}
}
Expand Down
Loading