auto update

dependabot/npm_and_yarn/ushio/www/session/lodash-4.17.15
iotcat 5 years ago
parent de66c8e79d
commit 1cc5dcb4cd
  1. 256
      yimian/iot/mqtt/smartfarm/mqtt-server.js

@ -5,7 +5,7 @@
var mqtt_server = function (o_params) {
var o = {
MaxWaitTime: 10000,
MaxWaitTime: 1000,
port: 30032,
sql: {
host: 'cn.db.yimian.xyz',
@ -14,7 +14,10 @@ var mqtt_server = function (o_params) {
port: '3306',
database: 'smartfarm'
},
debug: true,
debug: false,
intervalTime: 3000,
CheckMinTime: 1000,
MaxTryTimes: 3,
};
@ -28,6 +31,13 @@ var mqtt_server = function (o_params) {
const redis = require('redis');
/* tmp global var */
var timer = null;
var fStatus = {
node0: 'disconnect',
node1: 'disconnect',
station: 'disconnect',
waterSys: 'disconnect'
};
var cache = {
node0: {
status: null,
@ -36,7 +46,8 @@ var mqtt_server = function (o_params) {
temperature: null,
humidity: null,
BeginTime: null,
EndTime: null
EndTime: null,
qos: null,
},
node1: {
status: null,
@ -45,25 +56,30 @@ var mqtt_server = function (o_params) {
temperature: null,
humidity: null,
BeginTime: null,
EndTime: null
EndTime: null,
qos: null,
},
station: {
status: null,
batteryLevel: null,
waterSwitch: null,
light: null,
temperature: null,
humidity: null,
rainfall: null,
CO: null,
NH3: null,
airPressure: null,
BeginTime: null,
EndTime: null
EndTime: null,
qos: null,
},
waterSys: {
status: null,
batteryLevel: null,
waterSwitch: null,
temperature: null,
humidity: null,
pump0: null,
pump1: null,
BeginTime: null,
EndTime: null
EndTime: null,
qos: null,
}
}
@ -88,6 +104,9 @@ var mqtt_server = function (o_params) {
/* mqtt events */
mqtt_broker.on('ready', function(){
console.log((new Date((new Date()).getTime())).toLocaleString()+' - mqtt broker ready at port '+o.port);
});
mqtt_broker.on('published', function (packet, client) {
switch (packet.topic) {
/* node0 */
@ -106,6 +125,9 @@ var mqtt_server = function (o_params) {
case 'res/node0/humidity':
tools.cache.push('humidity', packet.payload.toString(), 'node0');
break;
case 'qos/node0':
tools.cache.push('qos', packet.payload.toString(), 'node0');
break;
/* node 1*/
case 'res/node1/status':
@ -124,6 +146,56 @@ var mqtt_server = function (o_params) {
case 'res/node1/humidity':
tools.cache.push('humidity', packet.payload.toString(), 'node1');
break;
case 'qos/node1':
tools.cache.push('qos', packet.payload.toString(), 'node1');
break;
/* station*/
case 'res/station/status':
tools.cache.push('status', packet.payload.toString(), 'station');
break;
case 'res/station/batteryLevel':
tools.cache.push('batteryLevel', packet.payload.toString(), 'station');
break;
case 'res/station/light':
tools.cache.push('light', packet.payload.toString(), 'station');
break;
case 'res/station/temperature':
tools.cache.push('temperature', packet.payload.toString(), 'station');
break;
case 'res/station/humidity':
tools.cache.push('humidity', packet.payload.toString(), 'station');
break;
case 'res/station/rainfall':
tools.cache.push('rainfall', packet.payload.toString(), 'station');
break;
case 'res/station/CO':
tools.cache.push('CO', packet.payload.toString(), 'station');
break;
case 'res/station/NH3':
tools.cache.push('NH3', packet.payload.toString(), 'station');
break;
case 'res/station/airPressure':
tools.cache.push('airPressure', packet.payload.toString(), 'station');
break;
case 'qos/station':
tools.cache.push('qos', packet.payload.toString(), 'station');
break;
/* waterSys*/
case 'res/waterSys/status':
tools.cache.push('status', packet.payload.toString(), 'waterSys');
break;
case 'res/waterSys/pump0':
tools.cache.push('pump0', packet.payload.toString(), 'waterSys');
break;
case 'res/waterSys/pump1':
tools.cache.push('pump1', packet.payload.toString(), 'waterSys');
break;
case 'qos/waterSys':
tools.cache.push('qos', packet.payload.toString(), 'waterSys');
break;
}
});
@ -134,24 +206,61 @@ var mqtt_server = function (o_params) {
cache:{
push: function(t, s, name){
var obj = cache[name];
var f = tools.sql[name].push;
rc.hset('sf/'+name, t, s);
if(obj.BeginTime && (new Date()).valueOf() - obj.BeginTime > o.MaxWaitTime) tools.obj.reset(o);
if(t != 'qos') rc.hset('sf/'+name, t, s);
rc.hset('sf/LastConnectTime', name, (new Date()).valueOf());
if(cache[name].BeginTime && (new Date()).valueOf() - cache[name].BeginTime > o.MaxWaitTime) tools.obj.reset(cache[name]);
if(tools.obj.getNum(obj, null) == Object.keys(obj).length){
obj.BeginTime = (new Date()).valueOf();
rc.hset('sf/'+name, 'BeginTime', obj.BeginTime);
if(tools.obj.getNum(cache[name], null) == Object.keys(cache[name]).length){
cache[name].BeginTime = (new Date()).valueOf();
rc.hset('sf/'+name, 'BeginTime', cache[name].BeginTime);
}
if(obj.hasOwnProperty(t)) obj[t] = s;
if(t == 'qos'){
rc.hset('sf/clientStatus', name, 'connect');
if(s == -1){
rc.hset("sf/"+name, 'qos', '-1');
cache[name].qos = -1;
}else{
rc.get('sf/sync/'+s, function(err, res){
if(err){
rc.hset("sf/"+name, 'qos', '-1');
cache[name].qos = -1;
}else{
rc.hset("sf/"+name, 'qos', (new Date()).valueOf() - res);
cache[name].qos = (new Date()).valueOf() - res;
}
if(tools.obj.getNum(cache[name], null) == 1){
cache[name].EndTime = (new Date()).valueOf();
rc.hset('sf/'+name, 'EndTime', cache[name].EndTime);
rc.publish('sf/chnnel/'+name, 'ok');
if(o.debug) console.log(cache[name]);
if(o.debug && false){
rc.hkeys("sf/"+name, function (err, replies) {
console.log(replies.length + " replies:");
replies.forEach(function (reply, i) {
rc.hget("sf/"+name, reply, function(err, res){
console.log(reply+': '+res);
});
});
});
}
f(cache[name]);
tools.obj.reset(cache[name]);
}
});
}
}
else if(cache[name].hasOwnProperty(t)) cache[name][t] = s;
if(tools.obj.getNum(obj, null) == 1){
obj.EndTime = (new Date()).valueOf();
rc.hset('sf/'+name, 'EndTime', obj.EndTime);
if(tools.obj.getNum(cache[name], null) == 1){
cache[name].EndTime = (new Date()).valueOf();
rc.hset('sf/'+name, 'EndTime', cache[name].EndTime);
rc.publish('sf/chnnel/'+name, 'ok');
console.log(obj);
if(o.debug){
if(o.debug) console.log(cache[name]);
if(o.debug && false){
rc.hkeys("sf/"+name, function (err, replies) {
console.log(replies.length + " replies:");
replies.forEach(function (reply, i) {
@ -161,7 +270,8 @@ var mqtt_server = function (o_params) {
});
});
}
f(obj);
f(cache[name]);
tools.obj.reset(cache[name]);
}
},
},
@ -189,9 +299,44 @@ var mqtt_server = function (o_params) {
node:{
push: function(id, obj){
var addSql = 'INSERT INTO node(timestamp,id,status,qos,batterylevel,waterswitch,temperature,humidity) VALUES(?,?,?,?,?,?,?,?)';
var addSqlParams = [new Date(obj.BeginTime).toISOString().slice(0, 19).replace('T', ' '), id, obj.status, 1, obj.batteryLevel, obj.waterSwitch, obj.temperature, obj.humidity];
var addSqlParams = [new Date(obj.BeginTime).toISOString().slice(0, 19).replace('T', ' '), id, obj.status, obj.qos, obj.batteryLevel, obj.waterSwitch, obj.temperature, obj.humidity];
sqlCnt.query(addSql,addSqlParams,function (err, result) {
if(err){
console.log('[INSERT ERROR] - ',err.message);
return;
}
});
},
},
station:{
push: function(obj){
var addSql = 'INSERT INTO station(timestamp,status,qos,batterylevel,light,temperature,humidity,rainfall,co,nh3,airpressure) VALUES(?,?,?,?,?,?,?,?,?,?,?)';
var addSqlParams = [new Date(obj.BeginTime).toISOString().slice(0, 19).replace('T', ' '), obj.status, obj.qos, obj.batteryLevel, obj.light, obj.temperature, obj.humidity, obj.rainfall, obj.CO, obj.NH3, obj.airPressure];
sqlCnt.query(addSql,addSqlParams,function (err, result) {
if(err){
console.log('[INSERT ERROR] - ',err.message);
return;
}
});
},
},
waterSys:{
push: function(obj){
var addSql = 'INSERT INTO watersys(timestamp,status,qos,pump0,pump1) VALUES(?,?,?,?,?)';
var addSqlParams = [new Date(obj.BeginTime).toISOString().slice(0, 19).replace('T', ' '), obj.status, obj.qos, obj.pump0, obj.pump1];
sqlCnt.query(addSql,addSqlParams,function (err, result) {
if(err){
console.log('[INSERT ERROR] - ',err.message);
return;
}
});
},
},
mqttLog:{
push: function(obj){
var addSql = 'INSERT INTO mqtt_log(timestamp,client,event,delay) VALUES(?,?,?,?)';
var addSqlParams = [(new Date()).toISOString().slice(0, 19).replace('T', ' '), obj.client, obj.event, obj.delay];
sqlCnt.query(addSql,addSqlParams,function (err, result) {
tools.obj.reset(obj);
if(err){
console.log('[INSERT ERROR] - ',err.message);
return;
@ -201,6 +346,65 @@ var mqtt_server = function (o_params) {
},
},
}
/* timer */
timer = function(){
return setInterval(function(){
var key = Math.floor(Math.random()*100);
rc.set('sf/sync/'+key, (new Date()).valueOf());
mqtt_broker.publish({ topic:'qos/sync', payload: key.toString()});
}, o.intervalTime);
}();
setInterval(function(){
rc.hkeys('sf/LastConnectTime', function(err, keys){
if(!err){
keys.forEach(function(key, i){
rc.hget('sf/clientStatus', key, function(err3, clientStatus){
if(!err3){
if(clientStatus != 'disconnect'){
rc.hget('sf/LastConnectTime', key, function(err2, val){
if(!err2){
if((new Date()).valueOf() - val > o.MaxWaitTime + o.MaxTryTimes * o.intervalTime){
rc.hset('sf/clientStatus', key, 'disconnect');
}
}
});
}
}
});
});
}
});
rc.hkeys('sf/clientStatus', function(err, keys){
if(!err){
keys.forEach(function(key, i){
rc.hget('sf/clientStatus', key, function(err2, val){
if(!err2){
if(val != fStatus[key]){
rc.hget('sf/'+key, 'qos', function(err3, qos){
if(!err3){
tools.sql.mqttLog.push({
client: key,
event: val,
delay: qos
});
fStatus[key] = val;
console.log((new Date((new Date()).getTime())).toLocaleString()+' - '+key+': '+val);
}
});
}
}
});
});
}
})
}, o.CheckMinTime);
return o;
};

Loading…
Cancel
Save