Skip to content
This repository was archived by the owner on Sep 9, 2025. It is now read-only.

Commit 699c233

Browse files
author
hamidr
committed
Bug fix: Free watchers after calling call backs returned not while it's processing.
1 parent f0f1d9a commit 699c233

File tree

5 files changed

+97
-38
lines changed

5 files changed

+97
-38
lines changed

includes/event_loop/event_loop_ev.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ namespace async_redis {
1717

1818
public:
1919
using action = std::function<void()>;
20-
using socket_identifier_t = std::unordered_map<socket_id, std::unique_ptr<socket_queue>>::iterator;
20+
using socket_identifier_t = socket_queue *;
2121

2222
private:
2323
struct timer_watcher
@@ -38,6 +38,7 @@ namespace async_redis {
3838

3939
ev_io write_watcher;
4040
ev_io read_watcher;
41+
bool free_me = false;
4142

4243
std::queue<action> write_handlers;
4344
std::queue<action> read_handlers;
@@ -79,7 +80,6 @@ namespace async_redis {
7980

8081
private:
8182
struct ev_loop* loop_;
82-
std::unordered_map<socket_id, std::unique_ptr<socket_queue>> watchers_;
8383
};
8484
}
8585
}

includes/network/async_socket.hpp

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,11 @@ namespace async_redis {
4444
id_ = io_.watch(fd_);
4545
}
4646

47-
inline ~async_socket() {
48-
close();
49-
io_.unwatch(id_);
47+
~async_socket() {
48+
if (is_connected_) {
49+
close();
50+
io_.unwatch(id_);
51+
}
5052
}
5153

5254
inline int send(const string& data) {
@@ -75,6 +77,9 @@ namespace async_redis {
7577

7678
void async_write(const string& data, const ready_cb_t& cb)
7779
{
80+
if (!is_connected())
81+
return;
82+
7883
return io_.async_write(id_, [this, data, cb]() {
7984
send(data);
8085
cb();
@@ -83,8 +88,18 @@ namespace async_redis {
8388

8489
void async_read(const recv_cb_t& cb)
8590
{
91+
if (!is_connected())
92+
return;
93+
8694
return io_.async_read(id_, [&, cb]() {
8795
auto l = receive(buffer_, max_length);
96+
if (l == 0) {
97+
is_connected_ = false;
98+
io_.unwatch(id_);
99+
id_ = nullptr;
100+
close();
101+
fd_ = -1;
102+
}
88103
cb(buffer_, l);
89104
});
90105
}
@@ -145,7 +160,7 @@ namespace async_redis {
145160
private:
146161
bool is_connected_ = false;
147162
InputOutputHanler& io_;
148-
socket_identifier_t id_;
163+
socket_identifier_t id_ = nullptr;
149164
int fd_ = -1;
150165
enum { max_length = 1024 };
151166
char buffer_[max_length]; //TODO: move to user land!

includes/network/tcp_server.hpp

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,21 +37,44 @@ namespace tcp_server {
3737
}
3838

3939
private:
40-
void chunk_received(const char* data, int len, std::shared_ptr<tcp_socket>& socket) {
41-
ssize_t acc = 0;
42-
bool is_finished = false;
40+
void chunk_received(const char* data, int len, std::shared_ptr<tcp_socket>& socket)
41+
{
42+
std::string command;
4343

44-
if (len == 0) {
44+
if (len <= 0) {
4545
conns_.erase(socket);
4646
return;
4747
}
4848

49-
socket->async_write("hello world!", [this, &socket]() {
50-
loop_.async_timeout(.1, [this, &socket]() {
51-
conns_.erase(socket);
52-
});
53-
});
49+
for(int n = 0; n < len; ++n) {
5450

51+
char c = data[n];
52+
switch(c)
53+
{
54+
case '\r':
55+
case '\n':
56+
continue;
57+
break;
58+
59+
default:
60+
command.push_back(c);
61+
}
62+
}
63+
64+
fprintf(stdout, ("cmd: " + command + "\n").data());
65+
fflush(stdout);
66+
67+
if (command == "close") {
68+
socket->async_write("good bye!", [this, &socket]() {
69+
loop_.async_timeout(1, [this, &socket]() {
70+
conns_.erase(socket);
71+
});
72+
});
73+
return; // dont read
74+
}
75+
76+
auto receiver = std::bind(&tcp_server::chunk_received, this, std::placeholders::_1, std::placeholders::_2, socket);
77+
socket->async_read(receiver);
5578
}
5679

5780
private:

src/event_loop/event_loop_ev.cpp

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,8 @@ void event_loop_ev::run()
1818
ev_run (loop_, 0);
1919
}
2020

21-
void event_loop_ev::async_write(socket_identifier_t& id, const action& cb)
21+
void event_loop_ev::async_write(socket_identifier_t& watcher, const action& cb)
2222
{
23-
socket_queue *watcher = id->second.get();
24-
2523
auto &handlers = watcher->write_handlers;
2624
handlers.push(cb);
2725

@@ -31,10 +29,8 @@ void event_loop_ev::async_write(socket_identifier_t& id, const action& cb)
3129
}
3230
}
3331

34-
void event_loop_ev::async_read(socket_identifier_t& id, const action& cb)
32+
void event_loop_ev::async_read(socket_identifier_t& watcher, const action& cb)
3533
{
36-
socket_queue *watcher = id->second.get();
37-
3834
auto &handlers = watcher->read_handlers;
3935
handlers.push(cb);
4036

@@ -50,14 +46,20 @@ void event_loop_ev::async_timeout(double time, const action& cb )
5046
ev_timer_start (loop_, &w->timer);
5147
}
5248

53-
void event_loop_ev::read_handler(EV_P_ ev_io* w, int revents) {
54-
/* LOG_THIS; */
49+
void event_loop_ev::read_handler(EV_P_ ev_io* w, int revents)
50+
{
5551
if (!(revents & EV_READ)) {
5652
// LOG_ERR("WRONG EVENT ON read_handler");
5753
return;
5854
}
5955

6056
socket_queue* sq = reinterpret_cast<socket_queue*>(w->data);
57+
58+
if (sq->free_me) {
59+
delete sq;
60+
return;
61+
}
62+
6163
auto &handlers = sq->read_handlers;
6264

6365
if (handlers.size() != 0)
@@ -69,17 +71,25 @@ void event_loop_ev::read_handler(EV_P_ ev_io* w, int revents) {
6971

7072
if (handlers.size() == 0)
7173
ev_io_stop(loop, &sq->read_watcher);
74+
75+
if (sq->free_me)
76+
delete sq;
7277
}
7378

7479
void event_loop_ev::write_handler(EV_P_ ev_io* w, int revents)
7580
{
76-
/* LOG_THIS; */
7781
if (!(revents & EV_WRITE)) {
7882
// LOG_ERR("WRONG EVENT ON read_handler");
7983
return;
8084
}
8185

8286
socket_queue* sq = reinterpret_cast<socket_queue*>(w->data);
87+
88+
if (sq->free_me) {
89+
delete sq;
90+
return;
91+
}
92+
8393
auto &handlers = sq->write_handlers;
8494

8595
if (handlers.size() != 0)
@@ -91,6 +101,9 @@ void event_loop_ev::write_handler(EV_P_ ev_io* w, int revents)
91101

92102
if (handlers.size() == 0)
93103
ev_io_stop(loop, &sq->write_watcher);
104+
105+
if (sq->free_me)
106+
delete sq;
94107
}
95108

96109
void event_loop_ev::timer_handler(EV_P_ ev_timer* w, int revents)
@@ -102,31 +115,24 @@ void event_loop_ev::timer_handler(EV_P_ ev_timer* w, int revents)
102115

103116
void event_loop_ev::stop(ev_io& io)
104117
{
105-
/* LOG_THIS; */
118+
ev_clear_pending(loop_, &io);
106119
ev_io_stop(loop_, &io);
107120
}
108121

109122
void event_loop_ev::start(ev_io& io)
110123
{
111-
/* LOG_THIS; */
112124
ev_io_start(loop_, &io);
113125
}
114126

115127
event_loop_ev::socket_identifier_t event_loop_ev::watch(int fd)
116128
{
117-
auto iter = watchers_.find(fd);
118-
119-
if (iter == watchers_.end()) {
120-
auto w = watchers_.emplace(fd, std::make_unique<event_loop_ev::socket_queue>(*this, fd));
121-
return w.first;
122-
}
123-
124-
return iter;
129+
socket_identifier_t ptr = new event_loop_ev::socket_queue(*this, fd);
130+
return ptr;
125131
}
126132

127133
void event_loop_ev::unwatch(socket_identifier_t& id)
128134
{
129-
watchers_.erase(id);
135+
id->free_me = true;
130136
}
131137

132138
}}

test/main.cpp

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,27 @@ int main(int argc, char** args)
4343
return;
4444
}
4545

46-
client.set("h1", "value1", [&](parser_t paresed) {
46+
client.set("h1", "wwww", [&](parser_t paresed) {
4747
std::cout << paresed->to_string() << std::endl;
4848
client.get("h1", [&](parser_t p) {
4949
std::cout << p->to_string() << std::endl;
50-
client.set("h2", "fooooo", [](parser_t p2) {
51-
std::cout << p2->to_string() << std::endl;
50+
51+
client.set("wtff", "hello", [&](parser_t paresed) {
52+
client.get("wtff", [](parser_t p2) {
53+
std::cout << p2->to_string() << std::endl;
54+
});
55+
56+
client.get("h1", [](parser_t p2) {
57+
std::cout << p2->to_string() << std::endl;
58+
});
59+
60+
client.get("wtff", [&](parser_t p2) {
61+
std::cout << p2->to_string() << std::endl;
62+
63+
client.get("h1", [](parser_t p2) {
64+
std::cout << p2->to_string() << std::endl;
65+
});
66+
});
5267
});
5368
});
5469
});

0 commit comments

Comments
 (0)