Skip to content
This repository was archived by the owner on Sep 9, 2025. It is now read-only.

Commit 21dd56b

Browse files
author
hamidr
committed
Fix some architecture issues
1 parent 1186cd6 commit 21dd56b

File tree

14 files changed

+262
-218
lines changed

14 files changed

+262
-218
lines changed

includes/connection.hpp

Lines changed: 52 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,57 +24,87 @@ namespace async_redis {
2424

2525
template<typename ...Args>
2626
inline void connect(Args... args) {
27+
if (!socket_->is_valid())
28+
socket_ = std::make_unique<SocketType>(event_loop_);
29+
2730
socket_->template async_connect<SocketType>(0, std::forward<Args>(args)...);
2831
}
2932

3033
bool is_connected() const
3134
{ return socket_ && socket_->is_connected(); }
3235

33-
inline int pressure() const {
34-
return req_queue_.size();
35-
}
36+
inline int pressure() const
37+
{ return req_queue_.size(); }
3638

37-
void send(const string& command, const reply_cb_t& reply_cb) {
39+
void disconnect() {
40+
socket_->close();
41+
decltype(req_queue_) free_me;
42+
free_me.swap(req_queue_);
43+
}
3844

39-
socket_->async_write(command, [this, reply_cb]() {
40-
req_queue_.emplace(reply_cb, nullptr);
45+
bool pipelined_send(string&& pipelined_cmds, std::vector<reply_cb_t>&& callbacks)
46+
{
47+
return
48+
socket_->async_write(pipelined_cmds, [this, cbs = std::move(callbacks)](ssize_t sent_chunk_len) {
49+
if (sent_chunk_len == 0)
50+
return disconnect();
4151

42-
if (req_queue_.size() == 1)
52+
if (!req_queue_.size() && cbs.size())
4353
socket_->async_read(data_, max_data_size, std::bind(&connection::reply_received, this, std::placeholders::_1));
54+
55+
for(auto &&cb : cbs)
56+
req_queue_.emplace(std::move(cb), nullptr);
4457
});
4558
}
4659

47-
private:
48-
void reply_received(int len) {
49-
ssize_t acc = 0;
60+
bool send(const string&& command, const reply_cb_t& reply_cb)
61+
{
62+
bool read_it = !req_queue_.size();
63+
req_queue_.emplace(reply_cb, nullptr);
5064

51-
while (acc < len && req_queue_.size()) {
65+
return
66+
socket_->async_write(std::move(command), [this, read_it](ssize_t sent_chunk_len) {
67+
if (sent_chunk_len == 0)
68+
return disconnect();
69+
70+
if (read_it)
71+
socket_->async_read(data_, max_data_size, std::bind(&connection::reply_received, this, std::placeholders::_1));
72+
});
73+
}
74+
75+
protected:
76+
void reply_received(ssize_t len)
77+
{
78+
if (len == 0)
79+
return disconnect();
80+
81+
ssize_t acc = 0;
82+
while (acc < len && req_queue_.size())
83+
{
5284
auto& request = req_queue_.front();
5385

5486
auto &cb = std::get<0>(request);
5587
auto &parser = std::get<1>(request);
5688

57-
if (0 != len && -1 != len) {
58-
59-
bool is_finished = false;
60-
acc += ParserPolicy(parser).append_chunk(data_ + acc, len - acc, is_finished);
89+
bool is_finished = false;
90+
acc += ParserPolicy(parser).append_chunk(data_ + acc, len - acc, is_finished);
6191

62-
if (!is_finished)
63-
break;
92+
if (!is_finished)
93+
break;
6494

65-
cb(parser);
66-
req_queue_.pop(); //free the resources
67-
}
95+
cb(parser);
96+
req_queue_.pop(); //free the resources
6897
}
6998

70-
if (req_queue_.size() != 0)
99+
if (req_queue_.size())
71100
socket_->async_read(data_, max_data_size, std::bind(&connection::reply_received, this, std::placeholders::_1));
72101
}
73102

74103
private:
75104
std::unique_ptr<SocketType> socket_;
76-
InputOutputHandler& event_loop_;
77105
std::queue<std::tuple<reply_cb_t, parser_t>> req_queue_;
106+
107+
InputOutputHandler& event_loop_;
78108
enum { max_data_size = 1024 };
79109
char data_[max_data_size];
80110
};

includes/event_loop/event_loop_ev.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ namespace async_redis {
1717

1818
public:
1919
using action = std::function<void()>;
20-
using socket_identifier_t = socket_queue *;
20+
using socket_identifier_t = std::shared_ptr<socket_queue>;
2121

2222
private:
2323
struct timer_watcher
@@ -38,7 +38,6 @@ namespace async_redis {
3838

3939
ev_io write_watcher;
4040
ev_io read_watcher;
41-
bool free_me = false;
4241

4342
std::queue<action> write_handlers;
4443
std::queue<action> read_handlers;
@@ -53,7 +52,7 @@ namespace async_redis {
5352
read_watcher.data = this;
5453
}
5554

56-
~socket_queue() {
55+
void stop() {
5756
loop_.stop(write_watcher);
5857
loop_.stop(read_watcher);
5958
}

includes/network/async_socket.hpp

Lines changed: 50 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ namespace async_redis {
2727
{
2828
public:
2929
using socket_identifier_t = typename InputOutputHanler::socket_identifier_t;
30-
using recv_cb_t = std::function<void (int)>;
31-
using ready_cb_t = std::function<void ()>;
30+
using recv_cb_t = std::function<void (ssize_t)>;
31+
using ready_cb_t = std::function<void (ssize_t)>;
3232
using connect_handler_t = std::function<void (bool)>;
3333

3434
async_socket(InputOutputHanler& io)
@@ -45,63 +45,83 @@ namespace async_redis {
4545
}
4646

4747
~async_socket() {
48-
if (is_connected_) {
49-
close();
50-
io_.unwatch(id_);
51-
}
48+
close();
49+
}
50+
51+
inline bool is_valid() {
52+
return fd_ != -1;
5253
}
5354

54-
inline int send(const string& data) {
55-
return ::send(fd_, data.data(), data.size(), 0);
55+
inline ssize_t send(const string& data) {
56+
return send(data.data(), data.size());
5657
}
5758

58-
inline int send(const char *data, size_t len) {
59+
inline ssize_t send(const char *data, size_t len) {
5960
return ::send(fd_, data, len, 0);
6061
}
6162

62-
inline int receive(char *data, size_t len) {
63+
inline ssize_t receive(char *data, size_t len) {
6364
return ::recv(fd_, data, len, 0);
6465
}
6566

66-
inline bool listen() {
67-
return ::listen(fd_, 0) == 0;
67+
inline bool listen(int backlog = 0) {
68+
return ::listen(fd_, backlog) == 0;
6869
}
6970

7071
inline int accept() {
7172
return ::accept(fd_, nullptr, nullptr);
7273
}
7374

74-
bool close() {
75+
bool close()
76+
{
77+
if (!is_connected_)
78+
return true;
79+
80+
if(id_)
81+
io_.unwatch(id_);
82+
7583
auto res = ::close(fd_) == 0;
7684
is_connected_ = false;
7785
fd_ = -1;
7886
return res;
7987
}
8088

81-
void async_write(const string& data, const ready_cb_t& cb)
89+
bool async_write(const string& data, const ready_cb_t& cb)
8290
{
83-
if (!is_connected())
84-
return;
91+
if (!is_connected() || !data.size())
92+
return false;
8593

86-
return io_.async_write(id_, [this, data, cb]() {
87-
send(data);
88-
cb();
94+
io_.async_write(id_, [this, data, cb]() -> void {
95+
auto sent_chunk = send(data);
96+
97+
if(sent_chunk == 0)
98+
close();
99+
100+
if (sent_chunk < data.size() && sent_chunk != -1) {
101+
async_write(data.substr(sent_chunk, data.size()), cb);
102+
return;
103+
}
104+
105+
cb(sent_chunk);
89106
});
107+
108+
return true;
90109
}
91110

92-
void async_read(char *buffer, int max_len, const recv_cb_t& cb)
111+
bool async_read(char *buffer, int max_len, const recv_cb_t& cb)
93112
{
94113
if (!is_connected())
95-
return;
114+
return false;
96115

97-
return io_.async_read(id_, [&, buffer, max_len, cb]() {
116+
io_.async_read(id_, [&, buffer, max_len, cb]() -> void {
98117
auto l = receive(buffer, max_len);
99-
if (l == 0) {
100-
io_.unwatch(id_);
118+
if (l == 0)
101119
close();
102-
}
120+
103121
cb(l);
104122
});
123+
124+
return true;
105125
}
106126

107127
template <typename SocketType, typename... Args>
@@ -141,14 +161,15 @@ namespace async_redis {
141161

142162
if (-1 == fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL) | O_NONBLOCK))
143163
throw nonblocking_socket_exception();
144-
145-
id_ = io_.watch(fd_);
146164
}
147165

166+
//TODO: well i guess retry with create_socket in these functions
148167
int connect_to(socket_t* socket_addr, int len) {
149168
int ret = ::connect(fd_, socket_addr, len);
150-
if (!ret)
169+
if (!ret) {
170+
id_ = io_.watch(fd_);
151171
is_connected_ = true;
172+
}
152173

153174
return ret;
154175
}
@@ -160,7 +181,7 @@ namespace async_redis {
160181
private:
161182
bool is_connected_ = false;
162183
InputOutputHanler& io_;
163-
socket_identifier_t id_ = nullptr;
184+
socket_identifier_t id_;
164185
int fd_ = -1;
165186
};
166187
}

includes/network/tcp_server.hpp

Lines changed: 0 additions & 91 deletions
This file was deleted.

0 commit comments

Comments
 (0)