From 1cc5dcb4cdcd95d35117c755eb2c48ca7e21a7a1 Mon Sep 17 00:00:00 2001 From: iotcat Date: Mon, 1 Jul 2019 18:01:06 +0800 Subject: [PATCH] auto update --- yimian/iot/mqtt/smartfarm/mqtt-server.js | 256 ++++++++++++++++++++--- 1 file changed, 230 insertions(+), 26 deletions(-) diff --git a/yimian/iot/mqtt/smartfarm/mqtt-server.js b/yimian/iot/mqtt/smartfarm/mqtt-server.js index 7bc7771f..74b4247d 100755 --- a/yimian/iot/mqtt/smartfarm/mqtt-server.js +++ b/yimian/iot/mqtt/smartfarm/mqtt-server.js @@ -5,7 +5,7 @@ var mqtt_server = function (o_params) { var o = { - MaxWaitTime: 10000, + MaxWaitTime: 1000, port: 30032, sql: { host: 'cn.db.yimian.xyz', @@ -14,7 +14,10 @@ var mqtt_server = function (o_params) { port: '3306', database: 'smartfarm' }, - debug: true, + debug: false, + intervalTime: 3000, + CheckMinTime: 1000, + MaxTryTimes: 3, }; @@ -28,6 +31,13 @@ var mqtt_server = function (o_params) { const redis = require('redis'); /* tmp global var */ + var timer = null; + var fStatus = { + node0: 'disconnect', + node1: 'disconnect', + station: 'disconnect', + waterSys: 'disconnect' + }; var cache = { node0: { status: null, @@ -36,7 +46,8 @@ var mqtt_server = function (o_params) { temperature: null, humidity: null, BeginTime: null, - EndTime: null + EndTime: null, + qos: null, }, node1: { status: null, @@ -45,25 +56,30 @@ var mqtt_server = function (o_params) { temperature: null, humidity: null, BeginTime: null, - EndTime: null + EndTime: null, + qos: null, }, station: { status: null, batteryLevel: null, - waterSwitch: null, + light: null, temperature: null, humidity: null, + rainfall: null, + CO: null, + NH3: null, + airPressure: null, BeginTime: null, - EndTime: null + EndTime: null, + qos: null, }, waterSys: { status: null, - batteryLevel: null, - waterSwitch: null, - temperature: null, - humidity: null, + pump0: null, + pump1: null, BeginTime: null, - EndTime: null + EndTime: null, + qos: null, } } @@ -88,6 +104,9 @@ var mqtt_server = function (o_params) { /* mqtt events */ + mqtt_broker.on('ready', function(){ + console.log((new Date((new Date()).getTime())).toLocaleString()+' - mqtt broker ready at port '+o.port); + }); mqtt_broker.on('published', function (packet, client) { switch (packet.topic) { /* node0 */ @@ -106,6 +125,9 @@ var mqtt_server = function (o_params) { case 'res/node0/humidity': tools.cache.push('humidity', packet.payload.toString(), 'node0'); break; + case 'qos/node0': + tools.cache.push('qos', packet.payload.toString(), 'node0'); + break; /* node 1*/ case 'res/node1/status': @@ -124,6 +146,56 @@ var mqtt_server = function (o_params) { case 'res/node1/humidity': tools.cache.push('humidity', packet.payload.toString(), 'node1'); break; + case 'qos/node1': + tools.cache.push('qos', packet.payload.toString(), 'node1'); + break; + + + /* station*/ + case 'res/station/status': + tools.cache.push('status', packet.payload.toString(), 'station'); + break; + case 'res/station/batteryLevel': + tools.cache.push('batteryLevel', packet.payload.toString(), 'station'); + break; + case 'res/station/light': + tools.cache.push('light', packet.payload.toString(), 'station'); + break; + case 'res/station/temperature': + tools.cache.push('temperature', packet.payload.toString(), 'station'); + break; + case 'res/station/humidity': + tools.cache.push('humidity', packet.payload.toString(), 'station'); + break; + case 'res/station/rainfall': + tools.cache.push('rainfall', packet.payload.toString(), 'station'); + break; + case 'res/station/CO': + tools.cache.push('CO', packet.payload.toString(), 'station'); + break; + case 'res/station/NH3': + tools.cache.push('NH3', packet.payload.toString(), 'station'); + break; + case 'res/station/airPressure': + tools.cache.push('airPressure', packet.payload.toString(), 'station'); + break; + case 'qos/station': + tools.cache.push('qos', packet.payload.toString(), 'station'); + break; + + /* waterSys*/ + case 'res/waterSys/status': + tools.cache.push('status', packet.payload.toString(), 'waterSys'); + break; + case 'res/waterSys/pump0': + tools.cache.push('pump0', packet.payload.toString(), 'waterSys'); + break; + case 'res/waterSys/pump1': + tools.cache.push('pump1', packet.payload.toString(), 'waterSys'); + break; + case 'qos/waterSys': + tools.cache.push('qos', packet.payload.toString(), 'waterSys'); + break; } }); @@ -134,24 +206,61 @@ var mqtt_server = function (o_params) { cache:{ push: function(t, s, name){ - var obj = cache[name]; + var f = tools.sql[name].push; - rc.hset('sf/'+name, t, s); - if(obj.BeginTime && (new Date()).valueOf() - obj.BeginTime > o.MaxWaitTime) tools.obj.reset(o); + if(t != 'qos') rc.hset('sf/'+name, t, s); + rc.hset('sf/LastConnectTime', name, (new Date()).valueOf()); + if(cache[name].BeginTime && (new Date()).valueOf() - cache[name].BeginTime > o.MaxWaitTime) tools.obj.reset(cache[name]); - if(tools.obj.getNum(obj, null) == Object.keys(obj).length){ - obj.BeginTime = (new Date()).valueOf(); - rc.hset('sf/'+name, 'BeginTime', obj.BeginTime); + if(tools.obj.getNum(cache[name], null) == Object.keys(cache[name]).length){ + cache[name].BeginTime = (new Date()).valueOf(); + rc.hset('sf/'+name, 'BeginTime', cache[name].BeginTime); } - if(obj.hasOwnProperty(t)) obj[t] = s; + if(t == 'qos'){ + rc.hset('sf/clientStatus', name, 'connect'); + if(s == -1){ + rc.hset("sf/"+name, 'qos', '-1'); + cache[name].qos = -1; + }else{ + rc.get('sf/sync/'+s, function(err, res){ + if(err){ + rc.hset("sf/"+name, 'qos', '-1'); + cache[name].qos = -1; + }else{ + rc.hset("sf/"+name, 'qos', (new Date()).valueOf() - res); + cache[name].qos = (new Date()).valueOf() - res; + } + if(tools.obj.getNum(cache[name], null) == 1){ + cache[name].EndTime = (new Date()).valueOf(); + rc.hset('sf/'+name, 'EndTime', cache[name].EndTime); + rc.publish('sf/chnnel/'+name, 'ok'); + if(o.debug) console.log(cache[name]); + if(o.debug && false){ + rc.hkeys("sf/"+name, function (err, replies) { + console.log(replies.length + " replies:"); + replies.forEach(function (reply, i) { + rc.hget("sf/"+name, reply, function(err, res){ + console.log(reply+': '+res); + }); + }); + }); + } + f(cache[name]); + tools.obj.reset(cache[name]); + } + }); + } + } + + else if(cache[name].hasOwnProperty(t)) cache[name][t] = s; - if(tools.obj.getNum(obj, null) == 1){ - obj.EndTime = (new Date()).valueOf(); - rc.hset('sf/'+name, 'EndTime', obj.EndTime); + if(tools.obj.getNum(cache[name], null) == 1){ + cache[name].EndTime = (new Date()).valueOf(); + rc.hset('sf/'+name, 'EndTime', cache[name].EndTime); rc.publish('sf/chnnel/'+name, 'ok'); - console.log(obj); - if(o.debug){ + if(o.debug) console.log(cache[name]); + if(o.debug && false){ rc.hkeys("sf/"+name, function (err, replies) { console.log(replies.length + " replies:"); replies.forEach(function (reply, i) { @@ -161,7 +270,8 @@ var mqtt_server = function (o_params) { }); }); } - f(obj); + f(cache[name]); + tools.obj.reset(cache[name]); } }, }, @@ -189,9 +299,44 @@ var mqtt_server = function (o_params) { node:{ push: function(id, obj){ var addSql = 'INSERT INTO node(timestamp,id,status,qos,batterylevel,waterswitch,temperature,humidity) VALUES(?,?,?,?,?,?,?,?)'; - var addSqlParams = [new Date(obj.BeginTime).toISOString().slice(0, 19).replace('T', ' '), id, obj.status, 1, obj.batteryLevel, obj.waterSwitch, obj.temperature, obj.humidity]; + var addSqlParams = [new Date(obj.BeginTime).toISOString().slice(0, 19).replace('T', ' '), id, obj.status, obj.qos, obj.batteryLevel, obj.waterSwitch, obj.temperature, obj.humidity]; + sqlCnt.query(addSql,addSqlParams,function (err, result) { + if(err){ + console.log('[INSERT ERROR] - ',err.message); + return; + } + }); + }, + }, + station:{ + push: function(obj){ + var addSql = 'INSERT INTO station(timestamp,status,qos,batterylevel,light,temperature,humidity,rainfall,co,nh3,airpressure) VALUES(?,?,?,?,?,?,?,?,?,?,?)'; + var addSqlParams = [new Date(obj.BeginTime).toISOString().slice(0, 19).replace('T', ' '), obj.status, obj.qos, obj.batteryLevel, obj.light, obj.temperature, obj.humidity, obj.rainfall, obj.CO, obj.NH3, obj.airPressure]; + sqlCnt.query(addSql,addSqlParams,function (err, result) { + if(err){ + console.log('[INSERT ERROR] - ',err.message); + return; + } + }); + }, + }, + waterSys:{ + push: function(obj){ + var addSql = 'INSERT INTO watersys(timestamp,status,qos,pump0,pump1) VALUES(?,?,?,?,?)'; + var addSqlParams = [new Date(obj.BeginTime).toISOString().slice(0, 19).replace('T', ' '), obj.status, obj.qos, obj.pump0, obj.pump1]; + sqlCnt.query(addSql,addSqlParams,function (err, result) { + if(err){ + console.log('[INSERT ERROR] - ',err.message); + return; + } + }); + }, + }, + mqttLog:{ + push: function(obj){ + var addSql = 'INSERT INTO mqtt_log(timestamp,client,event,delay) VALUES(?,?,?,?)'; + var addSqlParams = [(new Date()).toISOString().slice(0, 19).replace('T', ' '), obj.client, obj.event, obj.delay]; sqlCnt.query(addSql,addSqlParams,function (err, result) { - tools.obj.reset(obj); if(err){ console.log('[INSERT ERROR] - ',err.message); return; @@ -201,6 +346,65 @@ var mqtt_server = function (o_params) { }, }, } + + + + /* timer */ + timer = function(){ + return setInterval(function(){ + var key = Math.floor(Math.random()*100); + rc.set('sf/sync/'+key, (new Date()).valueOf()); + mqtt_broker.publish({ topic:'qos/sync', payload: key.toString()}); + }, o.intervalTime); + }(); + + setInterval(function(){ + rc.hkeys('sf/LastConnectTime', function(err, keys){ + if(!err){ + keys.forEach(function(key, i){ + rc.hget('sf/clientStatus', key, function(err3, clientStatus){ + if(!err3){ + if(clientStatus != 'disconnect'){ + rc.hget('sf/LastConnectTime', key, function(err2, val){ + if(!err2){ + if((new Date()).valueOf() - val > o.MaxWaitTime + o.MaxTryTimes * o.intervalTime){ + rc.hset('sf/clientStatus', key, 'disconnect'); + } + } + }); + } + } + }); + }); + } + }); + rc.hkeys('sf/clientStatus', function(err, keys){ + if(!err){ + keys.forEach(function(key, i){ + rc.hget('sf/clientStatus', key, function(err2, val){ + if(!err2){ + if(val != fStatus[key]){ + rc.hget('sf/'+key, 'qos', function(err3, qos){ + if(!err3){ + tools.sql.mqttLog.push({ + client: key, + event: val, + delay: qos + }); + fStatus[key] = val; + console.log((new Date((new Date()).getTime())).toLocaleString()+' - '+key+': '+val); + } + }); + } + } + }); + }); + } + }) + }, o.CheckMinTime); + + + return o; };