diff --git a/examples/mqtt.eve b/examples/mqtt.eve new file mode 100644 index 000000000..f7ba70985 --- /dev/null +++ b/examples/mqtt.eve @@ -0,0 +1,21 @@ + +# MQTT + +## Listening to incoming messages + +```eve +search @mqtt + m = [#message #incoming topic payload] +bind @browser + [#div text: "MQTT message at {{topic}} : {{payload}}"] +``` + +## Sending messages + +```eve +search + i = range[from: 1 to: 3] + +commit @mqtt + [#message #outgoing topic: "eve/test/" + i, payload: i] +``` diff --git a/examples/mqtt_temperature.eve b/examples/mqtt_temperature.eve new file mode 100644 index 000000000..7aa84d6a8 --- /dev/null +++ b/examples/mqtt_temperature.eve @@ -0,0 +1,40 @@ + +# MQTT +## Temperature +From mqtt://mqtt.bitraf.no see http://iot.bitraf.no +### Define the sensors +```eve +commit @shared + [#bitraf #temperature name: "lab" topic: "bitraf/temperature/1"] + [#bitraf #temperature name: "office" topic: "bitraf/temperature/2/value"] + [#bitraf #temperature name: "outside" topic: "bitraf/temperature/3/value"] +``` +### Show latest sensor values +```eve +search @shared + sensor = [#bitraf #temperature name topic] +search @mqtt + message = [#message #incoming topic: topic, payload] + sort[value: payload, per: topic] = 1 +bind @browser + [#div text: "{{name}}: {{payload}} C"] +``` + +## Testing + +```eve disabled +search @shared @mqtt + sensor = [#bitraf #temperature name topic] + message = [#message #incoming topic: topic, payload] + ix = sort[value: name] +bind @view + [#history | values: payload] +``` + +```eve disabled +search + i = range[from: 1 to: 3] + +commit @mqtt + [#message #outgoing topic: "eve/test/" + i, payload: i] +``` diff --git a/package.json b/package.json index 7cc0eaa09..8d3def418 100644 --- a/package.json +++ b/package.json @@ -6,6 +6,7 @@ "@types/glob": "^5.0.30", "@types/minimist": "^1.1.29", "@types/mkdirp": "^0.3.29", + "@types/mqtt": "0.0.32", "@types/node": "^6.0.41", "@types/request": "0.0.31", "@types/tape": "^4.2.28", @@ -17,6 +18,7 @@ "glob": "^7.1.1", "minimist": "^1.2.0", "mkdirp": "^0.5.1", + "mqtt": "^2.0.1", "node-uuid": "^1.4.7", "request": "^2.75.0", "typescript": "^2.0.3", diff --git a/src/runtime/databases/node/mqtt.ts b/src/runtime/databases/node/mqtt.ts new file mode 100644 index 000000000..2569f120b --- /dev/null +++ b/src/runtime/databases/node/mqtt.ts @@ -0,0 +1,176 @@ +//--------------------------------------------------------------------- +// Node Server Database +//--------------------------------------------------------------------- + +import {InsertAction} from "../../actions" +import {Changes} from "../../changes"; +import {Evaluation, Database} from "../../runtime"; + +import * as url from "url"; +import * as mqtt from "mqtt"; + + +function serializeMessage(payload : any) : string { + let serialized = payload.toString(); + if (typeof payload == 'boolean') { + serialized = payload ? 'true' : 'false'; + } else if (typeof payload == 'object') { + serialized = (payload) ? JSON.stringify(payload) : 'null'; + } else { + // treat as string + } + return serialized; +} + +function deserializeMessage(payload : string) : any { + let parsed : any = payload; + if (payload == 'true') { + parsed = true; + } else if (payload == 'false') { + parsed = false; + } else if (payload[0] == '{' || payload[0] == '[') { + try { + parsed = JSON.parse(payload); + } catch (_) { + } + } else { + try { + parsed = parseFloat(payload); + } catch (_) { + } + } + return parsed; +} + +export class MqttDatabase extends Database { + + receiving: boolean; + requestId: number; + client: mqtt.Client; + + constructor() { + super(); + this.requestId = 0; + this.receiving = false; + this.client = null; + } + + setup() { + let broker = process.env.EVE_MQTT_BROKER || 'mqtt://localhost:1883'; + let parsed = url.parse(broker); + let auth = (parsed.auth || ':').split(':'); + let options = { + port: parsed.port || 1883, + clientId: 'eve' + Math.random().toString(16).substr(2, 8), + username: auth[0], + password: auth[1] + }; + let cleanedUrl = "mqtt://"+parsed.host; + let client = mqtt.connect(cleanedUrl, options); + let onMessage = this.handleMqttMessage.bind(this); + this.client = client; + client.on('error', function(err) { + console.error('MQTT error', err); + }); + client.on('connect', function() { + // TODO: be smarter, only subscribe to things there are bindings against + client.subscribe("#", function(s) { + client.on('message', onMessage); + console.log('MQTT subscribed to', cleanedUrl); + }); + }); + } + + handleMqttMessage(topic, message, packet) { + console.log('MQTT got message', topic, message.length); + + if(!this.receiving) { + return console.log("Nothing is listening to MQTT messages"); + } + + // NOTE: assumes UTF-8, no support for binary/Buffer data + let parsed = deserializeMessage(message.toString()); + + let scopes = ["mqtt"]; + let requestId = `request|${this.requestId++}|${(new Date()).getTime()}` + let actions = [ + new InsertAction("mqtt|tag", requestId, "tag", "message", undefined, scopes), + new InsertAction("mqtt|tag", requestId, "tag", "incoming", undefined, scopes), + new InsertAction("mqtt|topic", requestId, "topic", topic, undefined, scopes), + ]; + +// TODO: implement entry setting like server.ts does? +// if(parsed && typeof parsed === "object") { +// let bodyId = `${requestId}|body`; +// for(let key of Object.keys(body)) { +// actions.push(new InsertAction("mqtt|message-entry", bodyId, key, body[key], undefined, scopes)); +// } +// body = bodyId; +// } + actions.push(new InsertAction("mqtt|message-payload", requestId, "payload", parsed, undefined, scopes)) + + let evaluation = this.evaluations[0]; + evaluation.executeActions(actions); + } + + analyze(evaluation: Evaluation, db: Database) { + for(let block of db.blocks) { + for(let scan of block.parse.scanLike) { + if(scan.type === "record" && scan.scopes.indexOf("mqtt") > -1) { + for(let attribute of scan.attributes) { + if(attribute.attribute === "tag" && attribute.value.value === "message") { + console.log('MQTT found listener'); + this.receiving = true; + } + } + } + } + } + } + + sendMessage(requestId, topic, payload) { + console.log('MQTT sendMessage', topic); + const serialized = serializeMessage(payload); + this.client.publish(topic, serialized); + } + + onFixpoint(evaluation: Evaluation, changes: Changes) { + let name = evaluation.databaseToName(this); + let result = changes.result({[name]: true}); + let handled = {}; + let index = this.index; + let actions = []; + for(let insert of result.insert) { + let [e,a,v] = insert; + if(!handled[e]) { + handled[e] = true; + let isOutgoingMessage = index.lookup(e,"tag", "message") && index.lookup(e,"tag", "outgoing"); + let isSent = index.lookup(e, "tag", "sent"); + if(isOutgoingMessage && !isSent) { + // TODO: error/warn if multiple payloads (not supported) + let payloads = index.asValues(e, "payload"); + if (payloads === undefined) { + console.error("no payloads for outgoing message") + continue; + } + let [payload] = payloads; + + // TODO: support multiple topics, or error/warn + let topics = index.asValues(e, "topic"); + let [topic] = topics; + + actions.push(new InsertAction("mqtt|message-sent", e, "tag", "sent", undefined, [name])); + + this.sendMessage(e, topic, payload); + } + } + } + if(actions.length) { + process.nextTick(() => { + evaluation.executeActions(actions); + }) + } + } +} + + diff --git a/src/runtime/server.ts b/src/runtime/server.ts index 2fb0182b1..cb1fb283e 100644 --- a/src/runtime/server.ts +++ b/src/runtime/server.ts @@ -14,6 +14,7 @@ import {ActionImplementations} from "./actions"; import {PersistedDatabase} from "./databases/persisted"; import {HttpDatabase} from "./databases/node/http"; import {ServerDatabase} from "./databases/node/server"; +import {MqttDatabase} from "./databases/node/mqtt"; import {RuntimeClient} from "./runtimeClient"; //--------------------------------------------------------------------- @@ -34,6 +35,8 @@ const contentTypes = { const BROWSER = !argv["server"]; const PORT = process.env.PORT || 8080; const serverDatabase = new ServerDatabase(); +const mqttDatabase = new MqttDatabase(); +mqttDatabase.setup(); const shared = new PersistedDatabase(); global["browser"] = false; @@ -94,6 +97,7 @@ class ServerRuntimeClient extends RuntimeClient { constructor(socket:WebSocket, withIDE = true) { const dbs = { "http": new HttpDatabase(), + "mqtt": mqttDatabase, "shared": shared, } super(dbs, withIDE);