Skip to content
This repository was archived by the owner on Sep 9, 2025. It is now read-only.

Commit 1186cd6

Browse files
author
hamidr
committed
Passing buffer for reading data from userland
1 parent 699c233 commit 1186cd6

File tree

4 files changed

+27
-24
lines changed

4 files changed

+27
-24
lines changed

includes/connection.hpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,12 @@ namespace async_redis {
4040
req_queue_.emplace(reply_cb, nullptr);
4141

4242
if (req_queue_.size() == 1)
43-
socket_->async_read(std::bind(&connection::reply_received, this, std::placeholders::_1, std::placeholders::_2));
43+
socket_->async_read(data_, max_data_size, std::bind(&connection::reply_received, this, std::placeholders::_1));
4444
});
4545
}
4646

4747
private:
48-
void reply_received(const char* data, int len) {
48+
void reply_received(int len) {
4949
ssize_t acc = 0;
5050

5151
while (acc < len && req_queue_.size()) {
@@ -57,7 +57,7 @@ namespace async_redis {
5757
if (0 != len && -1 != len) {
5858

5959
bool is_finished = false;
60-
acc += ParserPolicy(parser).append_chunk(data + acc, len - acc, is_finished);
60+
acc += ParserPolicy(parser).append_chunk(data_ + acc, len - acc, is_finished);
6161

6262
if (!is_finished)
6363
break;
@@ -68,13 +68,15 @@ namespace async_redis {
6868
}
6969

7070
if (req_queue_.size() != 0)
71-
socket_->async_read(std::bind(&connection::reply_received, this, std::placeholders::_1, std::placeholders::_2));
71+
socket_->async_read(data_, max_data_size, std::bind(&connection::reply_received, this, std::placeholders::_1));
7272
}
7373

7474
private:
7575
std::unique_ptr<SocketType> socket_;
7676
InputOutputHandler& event_loop_;
7777
std::queue<std::tuple<reply_cb_t, parser_t>> req_queue_;
78+
enum { max_data_size = 1024 };
79+
char data_[max_data_size];
7880
};
7981

8082
}

includes/network/async_socket.hpp

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ namespace async_redis {
2727
{
2828
public:
2929
using socket_identifier_t = typename InputOutputHanler::socket_identifier_t;
30-
using recv_cb_t = std::function<void (const char*, int)>;
30+
using recv_cb_t = std::function<void (int)>;
3131
using ready_cb_t = std::function<void ()>;
3232
using connect_handler_t = std::function<void (bool)>;
3333

@@ -71,8 +71,11 @@ namespace async_redis {
7171
return ::accept(fd_, nullptr, nullptr);
7272
}
7373

74-
inline bool close() {
75-
return ::close(fd_) == 0;
74+
bool close() {
75+
auto res = ::close(fd_) == 0;
76+
is_connected_ = false;
77+
fd_ = -1;
78+
return res;
7679
}
7780

7881
void async_write(const string& data, const ready_cb_t& cb)
@@ -86,21 +89,18 @@ namespace async_redis {
8689
});
8790
}
8891

89-
void async_read(const recv_cb_t& cb)
92+
void async_read(char *buffer, int max_len, const recv_cb_t& cb)
9093
{
9194
if (!is_connected())
9295
return;
9396

94-
return io_.async_read(id_, [&, cb]() {
95-
auto l = receive(buffer_, max_length);
97+
return io_.async_read(id_, [&, buffer, max_len, cb]() {
98+
auto l = receive(buffer, max_len);
9699
if (l == 0) {
97-
is_connected_ = false;
98100
io_.unwatch(id_);
99-
id_ = nullptr;
100101
close();
101-
fd_ = -1;
102102
}
103-
cb(buffer_, l);
103+
cb(l);
104104
});
105105
}
106106

@@ -162,8 +162,6 @@ namespace async_redis {
162162
InputOutputHanler& io_;
163163
socket_identifier_t id_ = nullptr;
164164
int fd_ = -1;
165-
enum { max_length = 1024 };
166-
char buffer_[max_length]; //TODO: move to user land!
167165
};
168166
}
169167
}

includes/network/tcp_server.hpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,14 @@ namespace tcp_server {
3030
}
3131

3232
void accept(std::shared_ptr<tcp_socket> socket) {
33-
auto receiver = std::bind(&tcp_server::chunk_received, this, std::placeholders::_1, std::placeholders::_2, socket);
34-
socket->async_read(receiver);
33+
auto receiver = std::bind(&tcp_server::chunk_received, this, std::placeholders::_1, socket);
34+
socket->async_read(buffer_, max_buffer_length, receiver);
3535

3636
conns_.emplace(socket, nullptr);
3737
}
3838

3939
private:
40-
void chunk_received(const char* data, int len, std::shared_ptr<tcp_socket>& socket)
40+
void chunk_received(int len, std::shared_ptr<tcp_socket>& socket)
4141
{
4242
std::string command;
4343

@@ -48,7 +48,7 @@ namespace tcp_server {
4848

4949
for(int n = 0; n < len; ++n) {
5050

51-
char c = data[n];
51+
char c = buffer_[n];
5252
switch(c)
5353
{
5454
case '\r':
@@ -73,8 +73,8 @@ namespace tcp_server {
7373
return; // dont read
7474
}
7575

76-
auto receiver = std::bind(&tcp_server::chunk_received, this, std::placeholders::_1, std::placeholders::_2, socket);
77-
socket->async_read(receiver);
76+
auto receiver = std::bind(&tcp_server::chunk_received, this, std::placeholders::_1, socket);
77+
socket->async_read(buffer_, max_buffer_length, receiver);
7878
}
7979

8080
private:
@@ -83,6 +83,8 @@ namespace tcp_server {
8383
socket_t listener_;
8484
InputOutputHandler& loop_;
8585
std::unordered_map<socket_t, void*> conns_;
86+
enum { max_buffer_length = 1024 };
87+
char buffer_[max_buffer_length];
8688
};
8789

8890
}

src/event_loop/event_loop_ev.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ void event_loop_ev::async_timeout(double time, const action& cb )
4848

4949
void event_loop_ev::read_handler(EV_P_ ev_io* w, int revents)
5050
{
51-
if (!(revents & EV_READ)) {
51+
if (revents & EV_ERROR) {
5252
// LOG_ERR("WRONG EVENT ON read_handler");
5353
return;
5454
}
@@ -78,7 +78,7 @@ void event_loop_ev::read_handler(EV_P_ ev_io* w, int revents)
7878

7979
void event_loop_ev::write_handler(EV_P_ ev_io* w, int revents)
8080
{
81-
if (!(revents & EV_WRITE)) {
81+
if (revents & EV_ERROR) {
8282
// LOG_ERR("WRONG EVENT ON read_handler");
8383
return;
8484
}
@@ -133,6 +133,7 @@ event_loop_ev::socket_identifier_t event_loop_ev::watch(int fd)
133133
void event_loop_ev::unwatch(socket_identifier_t& id)
134134
{
135135
id->free_me = true;
136+
id = nullptr;
136137
}
137138

138139
}}

0 commit comments

Comments
 (0)