From 4d38e67abd636c3f4c497c7210fc8a23143d9007 Mon Sep 17 00:00:00 2001 From: Jon Nordby Date: Sun, 13 Nov 2016 01:27:22 +0100 Subject: [PATCH 1/6] MQTT: Dirty but can receive messages --- examples/mqtt_gpio.eve | 17 ++++ package.json | 2 + src/runtime/databases/node/mqtt.ts | 152 +++++++++++++++++++++++++++++ src/runtime/server.ts | 4 + 4 files changed, 175 insertions(+) create mode 100644 examples/mqtt_gpio.eve create mode 100644 src/runtime/databases/node/mqtt.ts diff --git a/examples/mqtt_gpio.eve b/examples/mqtt_gpio.eve new file mode 100644 index 000000000..f841912df --- /dev/null +++ b/examples/mqtt_gpio.eve @@ -0,0 +1,17 @@ + +# MQTT GPIO + +```eve +search @mqtt + m = [#message topic payload] +bind @browser + [#div text: "MQTT message at {{topic}} : {{payload}}"] +``` + +```eve disabled +search + i = range[from: 1 to: 2] + +commit @mqtt + [#message topic: "gpio/test/" + i, payload: false] +``` diff --git a/package.json b/package.json index 7cc0eaa09..8d3def418 100644 --- a/package.json +++ b/package.json @@ -6,6 +6,7 @@ "@types/glob": "^5.0.30", "@types/minimist": "^1.1.29", "@types/mkdirp": "^0.3.29", + "@types/mqtt": "0.0.32", "@types/node": "^6.0.41", "@types/request": "0.0.31", "@types/tape": "^4.2.28", @@ -17,6 +18,7 @@ "glob": "^7.1.1", "minimist": "^1.2.0", "mkdirp": "^0.5.1", + "mqtt": "^2.0.1", "node-uuid": "^1.4.7", "request": "^2.75.0", "typescript": "^2.0.3", diff --git a/src/runtime/databases/node/mqtt.ts b/src/runtime/databases/node/mqtt.ts new file mode 100644 index 000000000..d16a2a011 --- /dev/null +++ b/src/runtime/databases/node/mqtt.ts @@ -0,0 +1,152 @@ +//--------------------------------------------------------------------- +// Node Server Database +//--------------------------------------------------------------------- + +import {InsertAction} from "../../actions" +import {Changes} from "../../changes"; +import {Evaluation, Database} from "../../runtime"; + +import * as url from "url"; +import * as mqtt from "mqtt"; + +export class MqttDatabase extends Database { + + receiving: boolean; + requestId: number; + client: mqtt.Client; + + constructor() { + super(); + this.requestId = 0; + this.receiving = false; + this.client = null; + } + + setup() { + console.log('mqttdatabase setting up'); + let broker = process.env.EVE_MQTT_BROKER || 'mqtt://localhost:1883'; + let parsed = url.parse(broker); + let auth = (parsed.auth || ':').split(':'); + let options = { + port: parsed.port, + clientId: 'eve' + Math.random().toString(16).substr(2, 8), + username: auth[0], + password: auth[1] + }; + let u = "mqtt://"+parsed.host; + let client = mqtt.connect(u, options); + let onMessage = this.handleMqttMessage.bind(this); + this.client = client; + client.on('error', function(err) { + console.error('MQTT error', err); + }); + client.on('connect', function() { + console.log('MQTT connected'); + // TODO: be smarter, only subscribe to things there are bindings against + client.subscribe("#", function(s) { + client.on('message', onMessage); + console.log('MQTT subscribed', s); + }); + }); + } + + handleMqttMessage(topic, message, packet) { + console.log('MQTT got message', topic, message); + + if(!this.receiving) { + return console.log("Nothing is listening to MQTT messages"); + } + + let payload = message.toString(); + let parsed = payload; + if (payload == 'true') { + parsed = true; + } else if (payload == 'false') { + parsed = false; + } else if (payload[0] == '{') { + try { + parsed = JSON.parse(payload); + } catch (err) { + console.error("JSON parsing of MQTT message failed", err); + } + } else { + + } + + let scopes = ["mqtt"]; + let requestId = `request|${this.requestId++}|${(new Date()).getTime()}` + let actions = [ + new InsertAction("mqtt|tag", requestId, "tag", "message", undefined, scopes), + new InsertAction("mqtt|topic", requestId, "topic", topic, undefined, scopes), + ]; + +// TODO: implement entry setting like server.ts does? +// if(parsed && typeof parsed === "object") { +// let bodyId = `${requestId}|body`; +// for(let key of Object.keys(body)) { +// actions.push(new InsertAction("mqtt|message-entry", bodyId, key, body[key], undefined, scopes)); +// } +// body = bodyId; +// } + actions.push(new InsertAction("mqtt|message-payload", requestId, "payload", parsed, undefined, scopes)) + + let evaluation = this.evaluations[0]; + evaluation.executeActions(actions); + } + + analyze(evaluation: Evaluation, db: Database) { + for(let block of db.blocks) { + for(let scan of block.parse.scanLike) { + if(scan.type === "record" && scan.scopes.indexOf("mqtt") > -1) { + for(let attribute of scan.attributes) { + if(attribute.attribute === "tag" && attribute.value.value === "message") { + console.log('MQTT found listener'); + this.receiving = true; + } + } + } + } + } + } + + sendMessage(requestId, msg) { + console.log('MQTT senc message'); +// let response = this.requestToResponse[requestId]; +// response.statusCode = status; +// response.end(body); + } + + onFixpoint(evaluation: Evaluation, changes: Changes) { + console.log('MQTT fixpoint'); + let name = evaluation.databaseToName(this); + let result = changes.result({[name]: true}); + let handled = {}; + let index = this.index; + let actions = []; + for(let insert of result.insert) { + let [e,a,v] = insert; + if(!handled[e]) { + handled[e] = true; + if(index.lookup(e,"tag", "message") && !index.lookup(e, "tag", "sent")) { + console.log('MQTT insert msg', e, a, v); + + +// let responses = index.asValues(e, "response"); +// if(responses === undefined) continue; +// let [response] = responses; +// let {topic, payload} = index.asObject(response); +// actions.push(new InsertAction("server|sender", e, "tag", "sent", undefined, [name])); + let msg = "HARDCODED"; + this.sendMessage(e, msg); + } + } + } + if(actions.length) { + process.nextTick(() => { + evaluation.executeActions(actions); + }) + } + } +} + + diff --git a/src/runtime/server.ts b/src/runtime/server.ts index 2fb0182b1..cb1fb283e 100644 --- a/src/runtime/server.ts +++ b/src/runtime/server.ts @@ -14,6 +14,7 @@ import {ActionImplementations} from "./actions"; import {PersistedDatabase} from "./databases/persisted"; import {HttpDatabase} from "./databases/node/http"; import {ServerDatabase} from "./databases/node/server"; +import {MqttDatabase} from "./databases/node/mqtt"; import {RuntimeClient} from "./runtimeClient"; //--------------------------------------------------------------------- @@ -34,6 +35,8 @@ const contentTypes = { const BROWSER = !argv["server"]; const PORT = process.env.PORT || 8080; const serverDatabase = new ServerDatabase(); +const mqttDatabase = new MqttDatabase(); +mqttDatabase.setup(); const shared = new PersistedDatabase(); global["browser"] = false; @@ -94,6 +97,7 @@ class ServerRuntimeClient extends RuntimeClient { constructor(socket:WebSocket, withIDE = true) { const dbs = { "http": new HttpDatabase(), + "mqtt": mqttDatabase, "shared": shared, } super(dbs, withIDE); From 9466b1d0739c3553b479f57d7ff8e1b7258c8836 Mon Sep 17 00:00:00 2001 From: Jon Nordby Date: Sun, 13 Nov 2016 15:56:12 +0100 Subject: [PATCH 2/6] MQTT: Can send messages --- examples/mqtt_gpio.eve | 6 ++-- src/runtime/databases/node/mqtt.ts | 52 +++++++++++++++++++++--------- 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/examples/mqtt_gpio.eve b/examples/mqtt_gpio.eve index f841912df..b478ea677 100644 --- a/examples/mqtt_gpio.eve +++ b/examples/mqtt_gpio.eve @@ -8,10 +8,10 @@ bind @browser [#div text: "MQTT message at {{topic}} : {{payload}}"] ``` -```eve disabled +```eve search - i = range[from: 1 to: 2] + i = range[from: 1 to: 5] commit @mqtt - [#message topic: "gpio/test/" + i, payload: false] + [#message #outgoing topic: "gpio/test/" + i, payload: i] ``` diff --git a/src/runtime/databases/node/mqtt.ts b/src/runtime/databases/node/mqtt.ts index d16a2a011..281f8b718 100644 --- a/src/runtime/databases/node/mqtt.ts +++ b/src/runtime/databases/node/mqtt.ts @@ -63,14 +63,17 @@ export class MqttDatabase extends Database { parsed = true; } else if (payload == 'false') { parsed = false; - } else if (payload[0] == '{') { + } else if (payload[0] == '{' || payload[0] == '[') { try { parsed = JSON.parse(payload); } catch (err) { console.error("JSON parsing of MQTT message failed", err); } } else { - + try { + parsed = parseFloat(payload); + } catch (_) { + } } let scopes = ["mqtt"]; @@ -109,11 +112,19 @@ export class MqttDatabase extends Database { } } - sendMessage(requestId, msg) { - console.log('MQTT senc message'); -// let response = this.requestToResponse[requestId]; -// response.statusCode = status; -// response.end(body); + sendMessage(requestId, topic, payload) { + console.log('MQTT sendMessage', topic); + + let serialized = payload.toString(); + if (typeof payload == 'boolean') { + serialized = payload ? 'true' : 'false'; + } else if (typeof payload == 'object') { + serialized = (payload) ? JSON.stringify(payload) : 'null'; + } else { + // treat as string + } + + this.client.publish(topic, serialized); } onFixpoint(evaluation: Evaluation, changes: Changes) { @@ -127,17 +138,26 @@ export class MqttDatabase extends Database { let [e,a,v] = insert; if(!handled[e]) { handled[e] = true; - if(index.lookup(e,"tag", "message") && !index.lookup(e, "tag", "sent")) { - console.log('MQTT insert msg', e, a, v); + let isOutgoingMessage = index.lookup(e,"tag", "message") && index.lookup(e,"tag", "outgoing"); + let isSent = index.lookup(e, "tag", "sent"); + if(isOutgoingMessage && !isSent) { + console.log('MQTT outgoing msg'); + + // TODO: error/warn if multiple payloads (not supported) + let payloads = index.asValues(e, "payload"); + if (payloads === undefined) { + console.error("no payloads for outgoing message") + continue; + } + let [payload] = payloads; + + // TODO: support multiple topics, or error/warn + let topics = index.asValues(e, "topic"); + let [topic] = topics; + actions.push(new InsertAction("mqtt|message-sent", e, "tag", "sent", undefined, [name])); -// let responses = index.asValues(e, "response"); -// if(responses === undefined) continue; -// let [response] = responses; -// let {topic, payload} = index.asObject(response); -// actions.push(new InsertAction("server|sender", e, "tag", "sent", undefined, [name])); - let msg = "HARDCODED"; - this.sendMessage(e, msg); + this.sendMessage(e, topic, payload); } } } From 487aa18436f81a51bbb90ad42c6699b4d4f03e38 Mon Sep 17 00:00:00 2001 From: Jon Nordby Date: Sun, 13 Nov 2016 16:20:18 +0100 Subject: [PATCH 3/6] MQTT: Clean up code a bit Move (de)serialization to separate functions, remove unneccesary console.log --- src/runtime/databases/node/mqtt.ts | 79 ++++++++++++++++-------------- 1 file changed, 41 insertions(+), 38 deletions(-) diff --git a/src/runtime/databases/node/mqtt.ts b/src/runtime/databases/node/mqtt.ts index 281f8b718..c7d9f640c 100644 --- a/src/runtime/databases/node/mqtt.ts +++ b/src/runtime/databases/node/mqtt.ts @@ -9,6 +9,39 @@ import {Evaluation, Database} from "../../runtime"; import * as url from "url"; import * as mqtt from "mqtt"; + +function serializeMessage(payload : any) : string { + let serialized = payload.toString(); + if (typeof payload == 'boolean') { + serialized = payload ? 'true' : 'false'; + } else if (typeof payload == 'object') { + serialized = (payload) ? JSON.stringify(payload) : 'null'; + } else { + // treat as string + } + return serialized; +} + +function deserializeMessage(payload : string) : any { + let parsed : any = payload; + if (payload == 'true') { + parsed = true; + } else if (payload == 'false') { + parsed = false; + } else if (payload[0] == '{' || payload[0] == '[') { + try { + parsed = JSON.parse(payload); + } catch (_) { + } + } else { + try { + parsed = parseFloat(payload); + } catch (_) { + } + } + return parsed; +} + export class MqttDatabase extends Database { receiving: boolean; @@ -23,58 +56,40 @@ export class MqttDatabase extends Database { } setup() { - console.log('mqttdatabase setting up'); let broker = process.env.EVE_MQTT_BROKER || 'mqtt://localhost:1883'; let parsed = url.parse(broker); let auth = (parsed.auth || ':').split(':'); let options = { - port: parsed.port, + port: parsed.port || 1883, clientId: 'eve' + Math.random().toString(16).substr(2, 8), username: auth[0], password: auth[1] }; - let u = "mqtt://"+parsed.host; - let client = mqtt.connect(u, options); + let cleanedUrl = "mqtt://"+parsed.host; + let client = mqtt.connect(cleanedUrl, options); let onMessage = this.handleMqttMessage.bind(this); this.client = client; client.on('error', function(err) { console.error('MQTT error', err); }); client.on('connect', function() { - console.log('MQTT connected'); // TODO: be smarter, only subscribe to things there are bindings against client.subscribe("#", function(s) { client.on('message', onMessage); - console.log('MQTT subscribed', s); + console.log('MQTT subscribed to', cleanedUrl); }); }); } handleMqttMessage(topic, message, packet) { - console.log('MQTT got message', topic, message); + console.log('MQTT got message', topic, message.length); if(!this.receiving) { return console.log("Nothing is listening to MQTT messages"); } - let payload = message.toString(); - let parsed = payload; - if (payload == 'true') { - parsed = true; - } else if (payload == 'false') { - parsed = false; - } else if (payload[0] == '{' || payload[0] == '[') { - try { - parsed = JSON.parse(payload); - } catch (err) { - console.error("JSON parsing of MQTT message failed", err); - } - } else { - try { - parsed = parseFloat(payload); - } catch (_) { - } - } + // NOTE: assumes UTF-8, no support for binary/Buffer data + let parsed = deserializeMessage(message.toString()); let scopes = ["mqtt"]; let requestId = `request|${this.requestId++}|${(new Date()).getTime()}` @@ -114,21 +129,11 @@ export class MqttDatabase extends Database { sendMessage(requestId, topic, payload) { console.log('MQTT sendMessage', topic); - - let serialized = payload.toString(); - if (typeof payload == 'boolean') { - serialized = payload ? 'true' : 'false'; - } else if (typeof payload == 'object') { - serialized = (payload) ? JSON.stringify(payload) : 'null'; - } else { - // treat as string - } - + const serialized = serializeMessage(payload); this.client.publish(topic, serialized); } onFixpoint(evaluation: Evaluation, changes: Changes) { - console.log('MQTT fixpoint'); let name = evaluation.databaseToName(this); let result = changes.result({[name]: true}); let handled = {}; @@ -141,8 +146,6 @@ export class MqttDatabase extends Database { let isOutgoingMessage = index.lookup(e,"tag", "message") && index.lookup(e,"tag", "outgoing"); let isSent = index.lookup(e, "tag", "sent"); if(isOutgoingMessage && !isSent) { - console.log('MQTT outgoing msg'); - // TODO: error/warn if multiple payloads (not supported) let payloads = index.asValues(e, "payload"); if (payloads === undefined) { From 564a3bc26953a824bd7a87b0ccfe2a58f0472bb7 Mon Sep 17 00:00:00 2001 From: Jon Nordby Date: Sun, 13 Nov 2016 16:23:23 +0100 Subject: [PATCH 4/6] MQTT: Add #incoming to incoming messages To allow differentiating from #outgoing --- src/runtime/databases/node/mqtt.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/runtime/databases/node/mqtt.ts b/src/runtime/databases/node/mqtt.ts index c7d9f640c..2569f120b 100644 --- a/src/runtime/databases/node/mqtt.ts +++ b/src/runtime/databases/node/mqtt.ts @@ -95,6 +95,7 @@ export class MqttDatabase extends Database { let requestId = `request|${this.requestId++}|${(new Date()).getTime()}` let actions = [ new InsertAction("mqtt|tag", requestId, "tag", "message", undefined, scopes), + new InsertAction("mqtt|tag", requestId, "tag", "incoming", undefined, scopes), new InsertAction("mqtt|topic", requestId, "topic", topic, undefined, scopes), ]; From 1a5abdc48aacebed4eae09f1fb667bd76bfb6d11 Mon Sep 17 00:00:00 2001 From: Jon Nordby Date: Sun, 13 Nov 2016 16:27:40 +0100 Subject: [PATCH 5/6] MQTT: Rename example, since it is generic --- examples/mqtt.eve | 21 +++++++++++++++++++++ examples/mqtt_gpio.eve | 17 ----------------- 2 files changed, 21 insertions(+), 17 deletions(-) create mode 100644 examples/mqtt.eve delete mode 100644 examples/mqtt_gpio.eve diff --git a/examples/mqtt.eve b/examples/mqtt.eve new file mode 100644 index 000000000..f7ba70985 --- /dev/null +++ b/examples/mqtt.eve @@ -0,0 +1,21 @@ + +# MQTT + +## Listening to incoming messages + +```eve +search @mqtt + m = [#message #incoming topic payload] +bind @browser + [#div text: "MQTT message at {{topic}} : {{payload}}"] +``` + +## Sending messages + +```eve +search + i = range[from: 1 to: 3] + +commit @mqtt + [#message #outgoing topic: "eve/test/" + i, payload: i] +``` diff --git a/examples/mqtt_gpio.eve b/examples/mqtt_gpio.eve deleted file mode 100644 index b478ea677..000000000 --- a/examples/mqtt_gpio.eve +++ /dev/null @@ -1,17 +0,0 @@ - -# MQTT GPIO - -```eve -search @mqtt - m = [#message topic payload] -bind @browser - [#div text: "MQTT message at {{topic}} : {{payload}}"] -``` - -```eve -search - i = range[from: 1 to: 5] - -commit @mqtt - [#message #outgoing topic: "gpio/test/" + i, payload: i] -``` From 38e5fbd1d0511c758b0e35c357e1fa4598452948 Mon Sep 17 00:00:00 2001 From: Jon Nordby Date: Sun, 13 Nov 2016 18:18:05 +0100 Subject: [PATCH 6/6] MQTT: Add example of temperature data From public MQTT of hackerspace Bitraf, http://iot.bitraf.no --- examples/mqtt_temperature.eve | 40 +++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 examples/mqtt_temperature.eve diff --git a/examples/mqtt_temperature.eve b/examples/mqtt_temperature.eve new file mode 100644 index 000000000..7aa84d6a8 --- /dev/null +++ b/examples/mqtt_temperature.eve @@ -0,0 +1,40 @@ + +# MQTT +## Temperature +From mqtt://mqtt.bitraf.no see http://iot.bitraf.no +### Define the sensors +```eve +commit @shared + [#bitraf #temperature name: "lab" topic: "bitraf/temperature/1"] + [#bitraf #temperature name: "office" topic: "bitraf/temperature/2/value"] + [#bitraf #temperature name: "outside" topic: "bitraf/temperature/3/value"] +``` +### Show latest sensor values +```eve +search @shared + sensor = [#bitraf #temperature name topic] +search @mqtt + message = [#message #incoming topic: topic, payload] + sort[value: payload, per: topic] = 1 +bind @browser + [#div text: "{{name}}: {{payload}} C"] +``` + +## Testing + +```eve disabled +search @shared @mqtt + sensor = [#bitraf #temperature name topic] + message = [#message #incoming topic: topic, payload] + ix = sort[value: name] +bind @view + [#history | values: payload] +``` + +```eve disabled +search + i = range[from: 1 to: 3] + +commit @mqtt + [#message #outgoing topic: "eve/test/" + i, payload: i] +```