From de66c8e79de314a61664822721b7c296bd00c1b4 Mon Sep 17 00:00:00 2001 From: iotcat Date: Mon, 1 Jul 2019 15:01:13 +0800 Subject: [PATCH] auto update --- yimian/iot/mqtt/smartfarm/autoStart.sh | 2 +- yimian/iot/mqtt/smartfarm/index.js | 4 + yimian/iot/mqtt/smartfarm/mqtt-server.js | 370 +++++++++++++---------- 3 files changed, 217 insertions(+), 159 deletions(-) create mode 100644 yimian/iot/mqtt/smartfarm/index.js diff --git a/yimian/iot/mqtt/smartfarm/autoStart.sh b/yimian/iot/mqtt/smartfarm/autoStart.sh index 3622a679..d2154e18 100755 --- a/yimian/iot/mqtt/smartfarm/autoStart.sh +++ b/yimian/iot/mqtt/smartfarm/autoStart.sh @@ -1,2 +1,2 @@ #!/bin/bash -forever /home/yimian/iot/mqtt/smartfarm/mqtt-server.js +forever /home/yimian/iot/mqtt/smartfarm/index.js diff --git a/yimian/iot/mqtt/smartfarm/index.js b/yimian/iot/mqtt/smartfarm/index.js new file mode 100644 index 00000000..627eba69 --- /dev/null +++ b/yimian/iot/mqtt/smartfarm/index.js @@ -0,0 +1,4 @@ +const smartfarm = require('./mqtt-server'); +var broker = smartfarm.broker(); + + diff --git a/yimian/iot/mqtt/smartfarm/mqtt-server.js b/yimian/iot/mqtt/smartfarm/mqtt-server.js index 16836991..7bc7771f 100755 --- a/yimian/iot/mqtt/smartfarm/mqtt-server.js +++ b/yimian/iot/mqtt/smartfarm/mqtt-server.js @@ -1,158 +1,212 @@ -const mosca = require('mosca'); -const mysql = require('mysql'); -const fs = require('fs'); - - -var g_MaxWaitTime = 10000 - -var settings = { - port: 30032 -}; - -var server = new mosca.Server(settings); - -var connection = mysql.createConnection({ - host : 'cn.db.yimian.xyz', - user : 'smartfarm', - password : fs.readFileSync("smartfarm.db.key").toString().replace(/\s+/g,""), - port: '3306', - database: 'smartfarm' -}); - -connection.connect(); - - - -server.on('published', function (packet, client) { - switch (packet.topic) { - case 'res/node0/status': - //console.log(packet.payload.toString()); - pushElement('status', packet.payload.toString(), node0Cache, storeNode0Data); - break; - case 'res/node0/batteryLevel': - //console.log(packet.payload.toString()); - pushElement('batteryLevel', packet.payload.toString(), node0Cache, storeNode0Data); - break; - case 'res/node0/waterSwitch': - //console.log(packet.payload.toString()); - pushElement('waterSwitch', packet.payload.toString(), node0Cache, storeNode0Data); - break; - - case 'res/node0/temperature': - //console.log(packet.payload.toString()); - pushElement('temperature', packet.payload.toString(), node0Cache, storeNode0Data); - break; - case 'res/node0/humidity': - //console.log(packet.payload.toString()); - pushElement('humidity', packet.payload.toString(), node0Cache, storeNode0Data); - break; - - case 'res/node1/status': - //console.log(packet.payload.toString()); - pushElement('status', packet.payload.toString(), node1Cache, storeNode1Data); - break; - case 'res/node1/batteryLevel': - //console.log(packet.payload.toString()); - pushElement('batteryLevel', packet.payload.toString(), node1Cache, storeNode1Data); - break; - case 'res/node1/waterSwitch': - //console.log(packet.payload.toString()); - pushElement('waterSwitch', packet.payload.toString(), node1Cache, storeNode1Data); - break; - - case 'res/node1/temperature': - //console.log(packet.payload.toString()); - pushElement('temperature', packet.payload.toString(), node1Cache, storeNode1Data); - break; - case 'res/node1/humidity': - //console.log(packet.payload.toString()); - pushElement('humidity', packet.payload.toString(), node1Cache, storeNode1Data); - break; - - } -}); - -var node0Cache = { - status: null, - batteryLevel: null, - waterSwitch: null, - temperature: null, - humidity: null, - BeginTime: null, - EndTime: null, -} - -var node1Cache = { - status: null, - batteryLevel: null, - waterSwitch: null, - temperature: null, - humidity: null, - BeginTime: null, - EndTime: null, -} - - - - -var pushElement = function(t, s, o, f){ - if(o.BeginTime && Date.parse(new Date()) - o.BeginTime > g_MaxWaitTime) objReset(o); - - if(getNumInObj(o, null) == Object.keys(o).length){ - o.BeginTime = Date.parse(new Date()); - } - - if(o.hasOwnProperty(t)) o[t] = s; - - if(getNumInObj(o, null) == 1){ - o.EndTime = Date.parse(new Date()); - console.log(o); - f(o); - } - -}; - -var objReset = function(o){ - - for(i in o){ - - o[i] = null; - } -} - - -var getNumInObj = function(o, t){ - var c = 0; - for(i in o){ - - if(o[i] == t) c ++; - } - return c; -} - -var sql = 'SELECT * FROM api'; - -connection.query(sql,function (err, result) { - if(err){ - console.log('[SELECT ERROR] - ',err.message); - return; - } - console.log(result); -}); - -var storeNode0Data = (o)=>{storeNodeData(0, o)}; -var storeNode1Data = (o)=>{storeNodeData(1, o)}; - -var storeNodeData = function(id, o){ - -var addSql = 'INSERT INTO node(timestamp,id,status,qos,batterylevel,waterswitch,temperature,humidity) VALUES(?,?,?,?,?,?,?,?)'; - var addSqlParams = [new Date(o.BeginTime).toISOString().slice(0, 19).replace('T', ' '), id, o.status, 1, o.batteryLevel, o.waterSwitch, o.temperature, o.humidity]; - - connection.query(addSql,addSqlParams,function (err, result) { - objReset(o); - if(err){ - console.log('[INSERT ERROR] - ',err.message); - return; - } - }); -} + + + + + +var mqtt_server = function (o_params) { + var o = { + MaxWaitTime: 10000, + port: 30032, + sql: { + host: 'cn.db.yimian.xyz', + user: 'smartfarm', + password: null, + port: '3306', + database: 'smartfarm' + }, + debug: true, + + }; + + /* merge paras */ + Object.assign(o, o_params); + + /* require packages */ + const mosca = require('mosca'); + const mysql = require('mysql'); + const fs = require('fs'); + const redis = require('redis'); + + /* tmp global var */ + var cache = { + node0: { + status: null, + batteryLevel: null, + waterSwitch: null, + temperature: null, + humidity: null, + BeginTime: null, + EndTime: null + }, + node1: { + status: null, + batteryLevel: null, + waterSwitch: null, + temperature: null, + humidity: null, + BeginTime: null, + EndTime: null + }, + station: { + status: null, + batteryLevel: null, + waterSwitch: null, + temperature: null, + humidity: null, + BeginTime: null, + EndTime: null + }, + waterSys: { + status: null, + batteryLevel: null, + waterSwitch: null, + temperature: null, + humidity: null, + BeginTime: null, + EndTime: null + } + } + + /* mqtt ini */ + var mqtt_broker = new mosca.Server({port: o.port}); + + /* mysql ini */ + var sqlCnt = mysql.createConnection({ + host : o.sql.host, + user : o.sql.user, + password : o.sql.password || fs.readFileSync("smartfarm.db.key").toString().replace(/\s+/g,""), + port: o.sql.port, + database: o.sql.database + }); + sqlCnt.connect(); + + /* redis ini */ + var rc = redis.createClient(); + rc.on("error", function (err) { + console.log("Redis Error " + err); + }); + + + /* mqtt events */ + mqtt_broker.on('published', function (packet, client) { + switch (packet.topic) { + /* node0 */ + case 'res/node0/status': + tools.cache.push('status', packet.payload.toString(), 'node0'); + break; + case 'res/node0/batteryLevel': + tools.cache.push('batteryLevel', packet.payload.toString(), 'node0'); + break; + case 'res/node0/waterSwitch': + tools.cache.push('waterSwitch', packet.payload.toString(), 'node0'); + break; + case 'res/node0/temperature': + tools.cache.push('temperature', packet.payload.toString(), 'node0'); + break; + case 'res/node0/humidity': + tools.cache.push('humidity', packet.payload.toString(), 'node0'); + break; + + /* node 1*/ + case 'res/node1/status': + tools.cache.push('status', packet.payload.toString(), 'node1'); + break; + case 'res/node1/batteryLevel': + tools.cache.push('batteryLevel', packet.payload.toString(), 'node1'); + break; + case 'res/node1/waterSwitch': + tools.cache.push('waterSwitch', packet.payload.toString(), 'node1'); + break; + + case 'res/node1/temperature': + tools.cache.push('temperature', packet.payload.toString(), 'node1'); + break; + case 'res/node1/humidity': + tools.cache.push('humidity', packet.payload.toString(), 'node1'); + break; + + } + }); + + + /* tools */ + var tools = { + + cache:{ + push: function(t, s, name){ + var obj = cache[name]; + var f = tools.sql[name].push; + rc.hset('sf/'+name, t, s); + if(obj.BeginTime && (new Date()).valueOf() - obj.BeginTime > o.MaxWaitTime) tools.obj.reset(o); + + if(tools.obj.getNum(obj, null) == Object.keys(obj).length){ + obj.BeginTime = (new Date()).valueOf(); + rc.hset('sf/'+name, 'BeginTime', obj.BeginTime); + } + + if(obj.hasOwnProperty(t)) obj[t] = s; + + if(tools.obj.getNum(obj, null) == 1){ + obj.EndTime = (new Date()).valueOf(); + rc.hset('sf/'+name, 'EndTime', obj.EndTime); + rc.publish('sf/chnnel/'+name, 'ok'); + console.log(obj); + if(o.debug){ + rc.hkeys("sf/"+name, function (err, replies) { + console.log(replies.length + " replies:"); + replies.forEach(function (reply, i) { + rc.hget("sf/"+name, reply, function(err, res){ + console.log(reply+': '+res); + }); + }); + }); + } + f(obj); + } + }, + }, + obj: { + reset: function(obj){ + for(i in obj){ + obj[i] = null; + } + }, + getNum: function(obj, t){ + var c = 0; + for(i in obj){ + if(obj[i] == t) c ++; + } + return c; + }, + }, + sql: { + node0:{ + push: (obj)=>{tools.sql.node.push(0, obj)}, + }, + node1:{ + push: (obj)=>{tools.sql.node.push(1, obj)}, + }, + node:{ + push: function(id, obj){ + var addSql = 'INSERT INTO node(timestamp,id,status,qos,batterylevel,waterswitch,temperature,humidity) VALUES(?,?,?,?,?,?,?,?)'; + var addSqlParams = [new Date(obj.BeginTime).toISOString().slice(0, 19).replace('T', ' '), id, obj.status, 1, obj.batteryLevel, obj.waterSwitch, obj.temperature, obj.humidity]; + sqlCnt.query(addSql,addSqlParams,function (err, result) { + tools.obj.reset(obj); + if(err){ + console.log('[INSERT ERROR] - ',err.message); + return; + } + }); + }, + }, + }, + } + return o; +}; + + + + +/* exports */ +exports.broker = mqtt_server; +