Skip to content

Commit 6a62f62

Browse files
committed
Beta!
1 parent 5b4f424 commit 6a62f62

File tree

6 files changed

+162
-25
lines changed

6 files changed

+162
-25
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
*.so.*
66
*.dylib
77

8-
tests/out
8+
test/out
99
build
1010
CMakeCache.txt
1111
CMakeFiles

examples/connect.lua

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#!/usr/bin/env tarantool
2+
3+
-- Load Tarantool mqtt
4+
local mqtt = require('mqtt')
5+
6+
-- Create instance
7+
connection = mqtt.new()
8+
9+
-- Connect to the server
10+
connection:connect({host='127.0.0.1', port=1883})
11+
12+
-- Set callback for recv new messages
13+
connection:on_message(function (message_id, topic, payload, gos, retain)
14+
print('New message', message_id, topic, payload, gos, retain)
15+
end)
16+
17+
-- Subscribe to a system topic
18+
connection:subscribe('$SYS/#')
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#!/usr/bin/env tarantool
2+
3+
local mqtt = require('mqtt')
4+
local fiber = require('fiber')
5+
local yaml = require('yaml')
6+
7+
box.cfg {wal_mode = 'none'}
8+
9+
-- Queue out
10+
queue = box.schema.space.create('queue')
11+
queue:create_index('id')
12+
13+
-- Create instance
14+
connection = mqtt.new()
15+
16+
-- Connect to the server
17+
local ok, emsg = connection:connect({host='127.0.0.1', port=1883})
18+
if not ok then
19+
error('connect ->', emsg)
20+
end
21+
22+
-- Subscribe to a system topic
23+
local ok, emsg = connection:subscribe('tarantool/box/info')
24+
if not ok then
25+
error('subscribe -> ',emsg)
26+
end
27+
28+
--
29+
-- Work
30+
--
31+
32+
-- Consumer
33+
connection:on_message(function (message_id, topic, payload, gos, retain)
34+
queue:auto_increment{yaml.decode(payload)}
35+
if queue:len() > 10 then
36+
print('Queue [[')
37+
for _, v in pairs(queue:select{}) do
38+
print(_, yaml.encode(v[2]))
39+
queue:delete(v[1])
40+
end
41+
print(']]')
42+
end
43+
end)
44+
45+
-- Producer
46+
fiber.create(function()
47+
while true do
48+
connection:publish('tarantool/box/info', yaml.encode{box.info()})
49+
fiber.sleep(0.5)
50+
end
51+
end)

mqtt/init.lua

Lines changed: 59 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,14 @@ local new = function(id, clean_session)
5959
return setmetatable({
6060
VERSION = mqtt:version(), -- Str fmt: X.X.X
6161
RECONNECT_INTERVAL = 0.5,
62+
POLL_INTERVAL = 0.0,
6263

6364
mqtt = mqtt,
6465
connected = false,
6566
auto_reconect = true,
6667
fiber = nil,
68+
69+
subscribers = {},
6770
}, mqtt_mt)
6871
end
6972

@@ -87,7 +90,17 @@ mqtt_mt = {
8790
end,
8891
__try_reconnect = function(self)
8992
while not self.connected do
93+
94+
-- Reconnect
9095
self.connected, _ = self.mqtt:reconnect()
96+
97+
-- Restoring subscribing
98+
if self.connected then
99+
for topic, opts in pairs(self.subscribers) do
100+
self.connected, _ = self.mqtt:subscribe(topic, opts.qos)
101+
end
102+
end
103+
91104
fiber.sleep(self.RECONNECT_INTERVAL)
92105
end
93106
end,
@@ -102,10 +115,11 @@ mqtt_mt = {
102115
if self.auto_reconect then
103116
self:__try_reconnect()
104117
else
105-
log.error("mqtt[STOPED]: lost connection, error: '%s'", emsg)
118+
log.error(
119+
"mqtt: the client is not currently connected, error %s", emsg)
106120
end
107121
end
108-
fiber.sleep(0.3)
122+
fiber.sleep(self.POLL_INTERVAL)
109123
end
110124
end,
111125

@@ -164,27 +178,61 @@ mqtt_mt = {
164178

165179
-- Start work
166180
self.fiber = fiber.create(
167-
function()
168-
self:__try_connect(opts)
169-
self:__poll_forever()
170-
end
171-
)
181+
function()
182+
self:__try_connect(opts)
183+
self:__poll_forever()
184+
end)
172185

173186
return true
174187
end,
175188

189+
--
190+
-- Reconnect to a broker.
191+
--
192+
-- This function provides an easy way of reconnecting to a broker after a
193+
-- connection has been lost. It uses the values that were provided in the
194+
-- <connect> call. It must not be called before
195+
-- <connect>.
196+
--
197+
-- NOTE
198+
-- After reconnecting you must call<subscribe> for subscribing
199+
-- to a topics.
200+
--
201+
reconnect = function()
202+
return self.mqtt:reconnect()
203+
end,
204+
176205
--
177206
-- Subscribe to a topic.
178207
--
179208
-- Parameters:
180209
-- sub - the subscription pattern.
181210
-- qos - the requested Quality of Service for this subscription.
182211
--
183-
--Returns:
184-
-- true or false, integer mid
212+
-- Returns:
213+
-- true or false, integer mid or error message
214+
--
215+
subscribe = function(self, topic, qos)
216+
local ok, mid_or_emsg = self.mqtt:subscribe(topic, qos)
217+
if ok then
218+
self.subscribers[topic] = {qos = qos}
219+
end
220+
return ok, mid_or_emsg
221+
end,
222+
223+
--
224+
-- Unsubscribe from a topic.
225+
--
226+
-- Parameters:
227+
-- topic - the unsubscription pattern.
228+
--
229+
-- Returns:
230+
-- true or false, integer mid or error message
185231
--
186-
subscribe = function(self, channel, qos)
187-
return self.mqtt:subscribe(channel, qos)
232+
unsubscribe = function(self, topic)
233+
local ok, emsg = self.mqtt:unsubscribe(topic)
234+
self.subscribers[topic] = nil
235+
return ok, emsg
188236
end,
189237

190238
--
Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,53 +4,73 @@ local mqtt = require('mqtt')
44
local fiber = require('fiber')
55
local yaml = require('yaml')
66
local os = require('os')
7+
local log = require('log')
78

89
box.cfg{wal_mode = 'none'}
910
fan_in = box.schema.space.create('fan_in', {if_not_exists = true})
1011
fan_in:create_index('id')
1112

1213
conn = mqtt.new()
1314

15+
function die(msg)
16+
error('[-] ' .. msg)
17+
os.exit(1)
18+
end
19+
1420
function W(ok, msg)
1521
if not ok then
16-
error('[-] ' .. msg)
22+
die(msg)
23+
end
24+
return ok, msg
25+
end
26+
27+
function W2(ok, msg)
28+
if not ok then
29+
log.error(msg)
1730
end
1831
return ok, msg
1932
end
2033

34+
local TIME = '' .. fiber.time()
35+
2136
W(conn:on_message(
2237
function(mid, topic, payload, gos, retain)
38+
W2(conn:unsubscribe('channel2/messages'))
2339
fan_in:auto_increment{topic, payload, gos, retain}
24-
print("[+] Message - OK")
40+
W2(conn:publish('channel/messages', TIME))
2541
end))
2642

2743
W(conn:connect({host="127.0.0.1", port=1883}))
2844
W(conn:subscribe('channel/#'))
2945
W(conn:subscribe('channel2/#'))
3046

3147
print('[+] Starting tnt loop ...')
48+
49+
W(conn:publish('channel2/messages', TIME, 0, false))
50+
3251
fiber.create(function()
33-
local time = '' .. fiber.time()
3452
while true do
35-
W(conn:publish('channel/messages', time, 0, false))
36-
W(conn:publish('channel2/messages', time, 0, false))
37-
if fan_in:len() == 10 then
53+
54+
if fan_in:len() > 10 then
3855
for k, v in pairs(fan_in:select{}) do
56+
3957
if v[2] ~= 'channel/messages' and v[2] ~= 'channel2/messages' then
40-
error('expected channel{2}/messages, got ' .. v[2])
58+
die('expected channel{2}/messages, got ' .. v[2])
4159
end
42-
if v[3] ~= time then
43-
error('expected ' .. time .. ', got ' .. v[3])
60+
if v[3] ~= TIME then
61+
die('expected ' .. time .. ', got ' .. v[3])
4462
end
4563
if v[4] ~= 0 then
46-
error('expected 0, got ' .. v[4])
64+
die('expected 0, got ' .. v[4])
4765
end
4866
if v[5] ~= false then
49-
error('expected false, got ' .. v[5])
67+
die('expected false, got ' .. v[5])
5068
end
51-
print('[+] Test -- OK')
52-
os.exit(0)
69+
--os.exit(0)
70+
print('[+] Test -- OK', yaml.encode(v))
71+
fan_in:delete{v[1]}
5372
end
73+
os.exit(0)
5474
end
5575
fiber.sleep(0.0)
5676
end
File renamed without changes.

0 commit comments

Comments
 (0)