Skip to content
This repository was archived by the owner on Sep 9, 2025. It is now read-only.

Commit 8d5b033

Browse files
author
hamidr
committed
Separation of concerns and single responsibility for event loop; Now we can implement file io event too
1 parent 9ce189b commit 8d5b033

File tree

9 files changed

+82
-132
lines changed

9 files changed

+82
-132
lines changed

includes/connection.hpp

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
#include <memory>
66
#include <tuple>
77

8-
#include "network/unix_socket.hpp"
9-
108
namespace async_redis {
119
namespace redis_impl
1210
{
@@ -21,7 +19,7 @@ namespace async_redis {
2119

2220
connection(InputOutputHandler &event_loop)
2321
: event_loop_(event_loop) {
24-
socket_ = std::make_shared<SocketType>(event_loop);
22+
socket_ = std::make_unique<SocketType>(event_loop);
2523
}
2624

2725
template<typename ...Args>
@@ -44,12 +42,10 @@ namespace async_redis {
4442
if (req_queue_.size() == 1)
4543
socket_->async_read(std::bind(&connection::reply_received, this, std::placeholders::_1, std::placeholders::_2));
4644
});
47-
4845
}
4946

5047
private:
51-
void reply_received(const char* data, ssize_t len) {
52-
48+
void reply_received(const char* data, int len) {
5349
ssize_t acc = 0;
5450

5551
while (acc < len && req_queue_.size()) {
@@ -68,7 +64,6 @@ namespace async_redis {
6864

6965
cb(parser);
7066
req_queue_.pop(); //free the resources
71-
7267
}
7368
}
7469

@@ -77,7 +72,7 @@ namespace async_redis {
7772
}
7873

7974
private:
80-
std::shared_ptr<SocketType> socket_;
75+
std::unique_ptr<SocketType> socket_;
8176
InputOutputHandler& event_loop_;
8277
std::queue<std::tuple<reply_cb_t, parser_t>> req_queue_;
8378
};

includes/event_loop/event_loop_ev.h

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,32 +6,26 @@
66
#include <unordered_map>
77
#include <queue>
88

9-
#include "../network/async_socket.hpp"
10-
119
namespace async_redis {
1210
namespace event_loop
1311
{
1412
class event_loop_ev
1513
{
16-
using socket_id = int; //fd;
14+
using socket_id = int;
1715
using string = std::string;
1816
struct socket_queue;
1917

2018
public:
21-
using async_socket = ::async_redis::network::async_socket;
22-
using timeout_cb_t = std::function<void()>;
23-
using ready_cb_t = async_socket::ready_cb_t;
24-
using recv_cb_t = async_socket::recv_cb_t;
25-
19+
using action = std::function<void()>;
2620
using socket_identifier_t = std::unordered_map<socket_id, std::unique_ptr<socket_queue>>::iterator;
2721

2822
private:
2923
struct timer_watcher
3024
{
3125
ev_timer timer;
32-
timeout_cb_t timeout_cb;
26+
action timeout_cb;
3327

34-
timer_watcher(double time, const timeout_cb_t& cb)
28+
timer_watcher(double time, const action& cb)
3529
: timeout_cb(cb)
3630
{
3731
ev_timer_init (&timer, &event_loop_ev::timer_handler, time, 0.);
@@ -40,20 +34,16 @@ namespace async_redis {
4034

4135
struct socket_queue
4236
{
43-
async_socket& socket;
4437
event_loop_ev& loop_;
4538

4639
ev_io write_watcher;
4740
ev_io read_watcher;
4841

49-
using write_action = std::tuple<string, ready_cb_t>;
50-
using read_action = recv_cb_t;
51-
52-
std::queue<write_action> write_handlers;
53-
std::queue<read_action> read_handlers;
42+
std::queue<action> write_handlers;
43+
std::queue<action> read_handlers;
5444

55-
socket_queue(event_loop_ev& loop, int fd, async_socket& s)
56-
: loop_(loop), socket(s)
45+
socket_queue(event_loop_ev& loop, int fd)
46+
: loop_(loop)
5747
{
5848
ev_io_init(&read_watcher, &event_loop_ev::read_handler, fd, EV_READ);
5949
ev_io_init(&write_watcher, &event_loop_ev::write_handler, fd, EV_WRITE);
@@ -73,12 +63,12 @@ namespace async_redis {
7363
event_loop_ev(struct ev_loop *);
7464
void run();
7565

76-
socket_identifier_t watch(int, async_socket&);
66+
socket_identifier_t watch(int);
7767
void unwatch(socket_identifier_t&);
7868

79-
void async_write(socket_identifier_t& id, const string& data, const ready_cb_t& cb);
80-
void async_read(socket_identifier_t& id, const recv_cb_t& cb);
81-
void async_timeout(double time, const timeout_cb_t& cb );
69+
void async_write(socket_identifier_t& id, const action& cb);
70+
void async_read(socket_identifier_t& id, const action& cb);
71+
void async_timeout(double time, const action& cb );
8272

8373
private:
8474
static void read_handler(EV_P_ ev_io* w, int revents);

includes/network/async_socket.hpp

Lines changed: 38 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,33 @@ namespace async_redis {
2222
class connect_socket_exception : socket_excetion {};
2323
class nonblocking_socket_exception : socket_excetion {};
2424

25-
struct async_socket
25+
template <typename InputOutputHanler>
26+
class async_socket
2627
{
27-
using recv_cb_t = std::function<void (const char*, int )>;
28+
public:
29+
using socket_identifier_t = typename InputOutputHanler::socket_identifier_t;
30+
using recv_cb_t = std::function<void (const char*, int)>;
2831
using ready_cb_t = std::function<void ()>;
2932
using connect_handler_t = std::function<void (bool)>;
3033

34+
async_socket(InputOutputHanler& io)
35+
: io_(io)
36+
{ }
37+
38+
async_socket(InputOutputHanler &io, int fd)
39+
: io_(io)
40+
{
41+
fd_ = fd;
42+
is_connected_ = true;
43+
44+
id_ = io_.watch(fd_);
45+
}
46+
47+
inline ~async_socket() {
48+
close();
49+
io_.unwatch(id_);
50+
}
51+
3152
inline int send(const string& data) {
3253
return ::send(fd_, data.data(), data.size(), 0);
3354
}
@@ -52,49 +73,20 @@ namespace async_redis {
5273
return ::close(fd_) == 0;
5374
}
5475

55-
protected:
56-
int fd_ = -1;
57-
};
58-
59-
template <typename InputOutputHanler>
60-
class async_socket_t : public async_socket
61-
{
62-
using socket_identifier_t = typename InputOutputHanler::socket_identifier_t;
63-
64-
public:
65-
async_socket_t(InputOutputHanler& io)
66-
: io_(io)
67-
{ }
68-
69-
async_socket_t(InputOutputHanler &io, int fd)
70-
: io_(io)
76+
void async_write(const string& data, const ready_cb_t& cb)
7177
{
72-
fd_ = fd;
73-
is_connected_ = true;
74-
75-
id_ = io_.watch(fd_, *this);
76-
}
77-
78-
inline ~async_socket_t() {
79-
close();
80-
io_.unwatch(id_);
81-
}
82-
83-
inline
84-
void async_write(const string& data, const ready_cb_t& cb) {
85-
return io_.async_write(id_, data, cb);
86-
}
87-
88-
inline
89-
void async_write_then_read(const string& data, const recv_cb_t& cb) {
90-
return this->async_write(data, [&, cb]() {
91-
this->async_read(cb);
78+
return io_.async_write(id_, [this, data, cb]() {
79+
send(data);
80+
cb();
9281
});
9382
}
9483

95-
inline
96-
void async_read(const recv_cb_t& cb) {
97-
return io_.async_read(id_, cb);
84+
void async_read(const recv_cb_t& cb)
85+
{
86+
return io_.async_read(id_, [&, cb]() {
87+
auto l = receive(buffer_, max_length);
88+
cb(buffer_, l);
89+
});
9890
}
9991

10092
template <typename SocketType, typename... Args>
@@ -115,7 +107,7 @@ namespace async_redis {
115107
template<typename SocketType>
116108
void async_accept(const std::function<void(std::shared_ptr<SocketType>)>& cb)
117109
{
118-
return async_read([&, cb](const char* data, int len) {
110+
return io_.async_read(id_, [&, cb]() {
119111
int fd = this->accept();
120112
cb(std::make_shared<SocketType>(io_, fd));
121113
this->async_accept(cb);
@@ -135,7 +127,7 @@ namespace async_redis {
135127
if (-1 == fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL) | O_NONBLOCK))
136128
throw nonblocking_socket_exception();
137129

138-
id_ = io_.watch(fd_, *this);
130+
id_ = io_.watch(fd_);
139131
}
140132

141133
int connect_to(socket_t* socket_addr, int len) {
@@ -154,6 +146,9 @@ namespace async_redis {
154146
bool is_connected_ = false;
155147
InputOutputHanler& io_;
156148
socket_identifier_t id_;
149+
int fd_ = -1;
150+
enum { max_length = 1024 };
151+
char buffer_[max_length]; //TODO: move to user land!
157152
};
158153
}
159154
}

includes/network/tcp_server.hpp

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,31 +8,12 @@
88
#include <unordered_map>
99

1010
namespace async_redis {
11-
namespace tcp_server {
11+
namespace tcp_server {
1212

13-
using std::string;
14-
class test_parser
15-
{
16-
public:
17-
using parser = std::shared_ptr<string>;
18-
19-
test_parser(parser &ptr)
20-
{
21-
}
22-
23-
int append_chunk(const char* chunk, ssize_t length, bool &is_finished) {
24-
is_finished = true;
25-
return length;
26-
}
27-
};
28-
29-
30-
template<typename InputOutputHandler, typename ParserPolicy>
13+
template<typename InputOutputHandler>
3114
class tcp_server
3215
{
3316
public:
34-
using parser_t = typename ParserPolicy::parser;
35-
using receive_cb_t = std::function<void (parser_t)>;
3617
using tcp_socket = async_redis::network::tcp_socket<InputOutputHandler>;
3718

3819
tcp_server(InputOutputHandler &event_loop)
@@ -55,12 +36,8 @@ namespace async_redis {
5536
conns_.emplace(socket, nullptr);
5637
}
5738

58-
void data_received(parser_t& data) {
59-
LOG_ME(data->second);
60-
}
61-
6239
private:
63-
void chunk_received(const char* data, ssize_t len, std::shared_ptr<tcp_socket>& socket) {
40+
void chunk_received(const char* data, int len, std::shared_ptr<tcp_socket>& socket) {
6441
ssize_t acc = 0;
6542
bool is_finished = false;
6643

@@ -82,8 +59,8 @@ namespace async_redis {
8259

8360
socket_t listener_;
8461
InputOutputHandler& loop_;
85-
std::unordered_map<socket_t, parser_t> conns_;
62+
std::unordered_map<socket_t, void*> conns_;
8663
};
8764

88-
}
65+
}
8966
}

includes/network/tcp_socket.hpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,18 @@ namespace async_redis {
66
namespace network
77
{
88
template<typename InputOutputHanler>
9-
class tcp_socket : public async_socket_t<InputOutputHanler>
9+
class tcp_socket : public async_socket<InputOutputHanler>
1010
{
1111

1212
public:
1313
tcp_socket(InputOutputHanler& io)
14-
: async_socket_t<InputOutputHanler>(io)
14+
: async_socket<InputOutputHanler>(io)
1515
{
1616
this->create_socket(AF_INET);
1717
}
1818

1919
tcp_socket(InputOutputHanler& io, int fd)
20-
: async_socket_t<InputOutputHanler>(io, fd)
20+
: async_socket<InputOutputHanler>(io, fd)
2121
{}
2222

2323
bool bind(const string& host, int port)
@@ -27,13 +27,14 @@ namespace async_redis {
2727
addr.sin_port = ::htons(port);
2828
addr.sin_addr.s_addr = inet_addr(host.data());
2929

30-
// setsockopt (fd_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on));
3130

3231
return this->bind_to((socket_t *)&addr, sizeof(addr)) == 0;
3332
}
3433

3534
int connect(const string& host, int port)
3635
{
36+
//TODO:
37+
// setsockopt (fd_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on));
3738
struct sockaddr_in addr = {0};
3839
addr.sin_family = AF_INET;
3940
addr.sin_port = ::htons(port);

includes/network/unix_socket.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,20 @@ namespace async_redis {
66
namespace network
77
{
88
template<typename InputOutputHanler>
9-
class unix_socket : public async_socket_t<InputOutputHanler>
9+
class unix_socket : public async_socket<InputOutputHanler>
1010
{
1111
public:
1212

1313
inline
1414
unix_socket(InputOutputHanler &io)
15-
: async_socket_t<InputOutputHanler>(io)
15+
: async_socket<InputOutputHanler>(io)
1616
{
1717
this->create_socket(AF_UNIX);
1818
}
1919

2020
inline
2121
unix_socket(InputOutputHanler &io, int fd)
22-
: async_socket_t<InputOutputHanler>(io, fd)
22+
: async_socket<InputOutputHanler>(io, fd)
2323
{}
2424

2525
int connect(const string& path) {

includes/redis_client.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ namespace async_redis {
1818
{
1919
using connection_t = connection<InputOutputHandler, SocketType, ::async_redis::parser::redis_response>;
2020
using reply_cb_t = typename connection_t::reply_cb_t;
21-
using connect_handler_t = async_redis::network::async_socket::connect_handler_t;
21+
using connect_handler_t = typename async_redis::network::async_socket<SocketType>::connect_handler_t;
2222

2323
public:
2424
using parser_t = typename connection_t::parser_t;

0 commit comments

Comments
 (0)