Compare commits

...

2 Commits

Author SHA1 Message Date
IoTcat 197d16cc13 rm_data_floder 3 years ago
IoTcat 90ceb57392 pwm_adc_operate 3 years ago
  1. 32
      examples/bus_test.js
  2. 30
      examples/hource_race_led.js
  3. 29
      examples/operate.js
  4. 36
      examples/pwm_operate_adc.js
  5. 230
      src/cli/compiler.js
  6. 109
      src/cli/node.js
  7. 1
      src/director/.gitignore
  8. 10
      src/director/modules/nodetable.js
  9. BIN
      src/drivers/nodemcu/bin/full.bin
  10. BIN
      src/drivers/nodemcu/bin/min.bin
  11. 5
      src/drivers/nodemcu/lua/init.lua
  12. BIN
      src/drivers/nodemcu/lua/lfs.img
  13. BIN
      src/drivers/nodemcu/lua/lfs.zip
  14. 72
      src/drivers/nodemcu/lua/lfs/redis.lua
  15. 7
      src/drivers/nodemcu/lua/lfs/wiot.lua

@ -0,0 +1,32 @@
//import wiot compiler
const wiot = require('../../src/cli/compiler.js');
//node ID array
let nids = ['7534c', 'b840a', '1d17a', '9f163', '8908c', '39747', '308b4', 'd689e', '30e3f'];
//node object array constructed from the above node IDs
let nodes = nids.map(nid=>new wiot.node.nodemcu(nid));
//create a virtual wire
let w_all = new wiot.wire();
//for each node in the nodes array, do
nodes.forEach((node, index) => {
//if(!index){//if is the first node (index=0, nid='7534c'), do
//create a gpio input listener on D3 pin and connect to the virtual wire w
//D3 pin is connected to an on-board button on NodeMCU
//}else{//if is not the first node (index!=0), do
//create another virtual wire
let w = new wiot.wire();
wiot.buffer(w_all/*output wire*/, w/*input wire*/, node);
wiot.gpio(wiot.INPUT, node.D3, w/*output wire*/, node);
//implement a buffer module so only when signal in wire has changed and held
//for 0.2*index seconds, the w_delay will be updated with the value in the w
//create a gpio output module on D4 pin, controlled by the w_delayed virtual wire
//D4 is connected to an on-board LED on NodeMCU
wiot.gpio(wiot.OUTPUT, node.D4, w_all, node);
//}
});

@ -0,0 +1,30 @@
//import wiot compiler
const wiot = require('../../src/cli/compiler.js');
//node ID array
let nids = ['7534c', 'b840a', '1d17a', '9f163', '8908c', '39747', '308b4', 'd689e', '30e3f'];
//node object array constructed from the above node IDs
let nodes = nids.map(nid=>new wiot.node.nodemcu(nid));
//create a virtual wire
let w = new wiot.wire();
//for each node in the nodes array, do
nodes.forEach((node, index) => {
if(!index){//if is the first node (index=0, nid='7534c'), do
//create a gpio input listener on D3 pin and connect to the virtual wire w
//D3 pin is connected to an on-board button on NodeMCU
wiot.gpio(wiot.INPUT, node.D3, w/*output wire*/, node);
}else{//if is not the first node (index!=0), do
//create another virtual wire
let w_delayed = new wiot.wire();
//implement a buffer module so only when signal in wire has changed and held
//for 0.2*index seconds, the w_delay will be updated with the value in the w
wiot.buffer(w_delayed/*output wire*/, w/*input wire*/, node, .2*index/*delay*/);
//create a gpio output module on D4 pin, controlled by the w_delayed virtual wire
//D4 is connected to an on-board LED on NodeMCU
wiot.gpio(wiot.OUTPUT, node.D4, w_delayed, node);
}
});

@ -0,0 +1,29 @@
//import wiot compiler
const wiot = require('../../src/cli/compiler.js');
//node ID array
let nids = ['7534c', 'b840a', '1d17a', '9f163', '8908c',
'39747', '308b4', 'd689e', '30e3f', '0d672'];
//node object array constructed from the above node IDs
let nodes = nids.map(nid=>new wiot.node.nodemcu(nid));
//create a virtual wire
let w = new wiot.wire(),
w1 = new wiot.wire(),
w2 = new wiot.wire();
let node = nodes[0];
let node1 = nodes[1];
wiot.gpio(wiot.INPUT, node.D3, w1, node);
wiot.gpio(wiot.INPUT, node1.D3, w2, node1);
//or ($0+$1)%2+math.floor(($0+$1)/2)
//and math.floor(($0+$1)/2)
//xor ($0+$1)%2
//not ($0+1)%2
wiot.operate('(($0+$1)%2+1)%2', node, w, w1, w2);
wiot.gpio(wiot.OUTPUT, node.D4, w, node);
//wiot.pwm(nodes[0].D4, new wiot.wire(800), new wiot.wire(500), nodes[0]);

@ -0,0 +1,36 @@
//import wiot compiler
const wiot = require('../../src/cli/compiler.js');
//node ID array
let nids = ['7534c', 'b840a', '1d17a', '9f163', '8908c',
'39747', '308b4', 'd689e', '30e3f', '0d672'];
//node object array constructed from the above node IDs
let nodes = nids.map(nid=>new wiot.node.nodemcu(nid));
//create a virtual wire
let w = new wiot.wire(0),
w1 = new wiot.wire(),
w2 = new wiot.wire();
let node = nodes[0];
let node1 = nodes[1];
/* wiot.gpio(wiot.INPUT, node.D3, w1, node);
wiot.gpio(wiot.INPUT, node1.D3, w2, node1);
//or ($0+$1)%2+math.floor(($0+$1)/2)
//and math.floor(($0+$1)/2)
//xor ($0+$1)%2
//not ($0+1)%2
wiot.operate('(($0+$1)%2+1)%2', node, w, w1, w2);
wiot.gpio(wiot.OUTPUT, node.D4, w, node);
*/
//wiot.gpio(wiot.INPUT, node1.D3, w2, node1);
wiot.adc(w1, node);
//wiot.gpio(wiot.OUTPUT, node.D4, w, node);
//wiot.operate('math.floor($0/600)', node, w, w1);
wiot.operate('($0/2)+300', node, w, w1);
wiot.pwm(nodes[0].D4, w, new wiot.wire(500), nodes[0]);

@ -22,11 +22,12 @@ const Reg = (() => {
return () => {
let div = ++cnt,
rem = 0;
let s = '';
let s = '_';
while(div > 0){
rem = Math.floor(div%26);
div = parseInt(div/26);
s += String.fromCharCode(rem + 97);
rem = Math.floor(div%52);
div = parseInt(div/52);
if(rem<26) s += String.fromCharCode(rem + 97);
else s += String.fromCharCode(rem-26 + 65);
}
return s;
}
@ -36,37 +37,43 @@ const sync = sync => {
Object.keys(wiot.__).forEach(nid => {
let s = '';
let seg = wiot.__[nid];
seg.head = uniqueArr(seg.head);
seg.body = uniqueArr(seg.body);
seg.loop = uniqueArr(seg.loop);
seg.footer = uniqueArr(seg.footer);
seg.head.forEach(item => s += item);
Object.keys(seg.regs).forEach(item =>{
seg.regs[item].trigger = uniqueArr(seg.regs[item].trigger);
if(seg.regs[item].trigger.length) s += `${item},F${item}=${seg.regs[item].default},${seg.regs[item].default};`
else s += `${item}=${seg.regs[item].default};`
let node = wiot.__[nid];
node.unique();
node.head.forEach(item => s += item);
Object.keys(node.regs).forEach(item =>{
let default_val = node.regs[item].default;
if(node.regs[item].isPersist){
default_val = `db.get('${item}')`;
s += `${item}=${default_val};if ${item}==nil then ${item}=${node.regs[item].default};end;`
if(node.regs[item].trigger.length) s += `F${item}=${item};`
}else{
s += `${item}=${default_val};`
if(node.regs[item].trigger.length) s += `F${item}=${item};`
}
});
seg.body.forEach(item=> s += item);
node.body.forEach(item=> s += item);
s += 'tmr.create():alarm('+wiot.CLOCK_INTERVAL+',tmr.ALARM_AUTO,function()';
Object.keys(seg.regs).forEach(item =>{
if(seg.regs[item].trigger.length){
Object.keys(node.regs).forEach(item =>{
if(node.regs[item].trigger.length){
s += `if not(F${item}==${item})then `
seg.regs[item].trigger.forEach(funcs=>{
node.regs[item].trigger.forEach(funcs=>{
s += `${funcs}`;
});
if(node.regs[item].isPersist){
s += `db.set('${item}',${item})`;
}
s += `end;`;
s += `F${item}=${item};`
}
});
seg.loop.forEach(item => s += item);
node.loop.forEach(item => s += item);
s += 'end);';
seg.footer.forEach(item => s += item);
node.footer.forEach(item => s += item);
fs.writeFileSync(config.root+'.wiot/compiled_files/'+nid, s);
@ -78,7 +85,7 @@ const sync = sync => {
const wiot = {
INPUT: 'gpio.INPUT',
OUTPUT: 'gpio.OUTPUT',
CLOCK_INTERVAL: 30,
CLOCK_INTERVAL: 100,
__: {
},
@ -133,36 +140,101 @@ const wiot = {
return wire;
},
channel: (wire) => {
if(wire.input.length == 1 && wire.output.length == 1 && wire.input[0] == wire.output[0]) return;
wire.output.forEach(udp_output => {
let node_output = udp_output.node;
node_output.MSGonSend(wire.reg, `${wire.reg}=body;`);
});
/*
let iANDo = wire.input.filter(function(v){ return wire.output.indexOf(v) > -1 });
nodes_output = wire.output.map(udp=>udp.node);
nodes_output = uniqueArr(nodes_output);
nodes_input = wire.input.map(udp=>udp.node);
nodes_input = uniqueArr(nodes_input);
iANDo.forEach(udp=>{
if(nodes_input.length == 1 && nodes_output.length == 1 && nodes_input[0] == nodes_output[0]) return;
if(nodes_input.length == 1 && nodes_output.length == 0) return;
if(nodes_input.length == 0 && nodes_output.length == 1) return;
nodes_output.forEach(node_output => {
node_output.footer.push(node_output.MSGonSend(wire.reg, `${wire.reg}=body;FR${wire.reg}=from;`));
});
});
//console.log(nodes_input.map(d=>d.regs[wire.reg].trigger))
*/
wire.input.forEach(udp_input => {
let node_input = udp_input.node;
wire.output.forEach(udp_output => {
let node_output = udp_output.node;
if(node_input == node_output) return;
nodes_input.forEach(node_input => {
node_input.unique();
nodes_output.forEach(node_output => {
//if(node_input == node_output) return;
node_output.unique();
let arr = node_input.regs[wire.reg].trigger;
let s2 = node_input.MSGSend(node_output.nid, wire.reg, wire.reg);
node_input.trigger(wire, s2);
//console.log(arr)
//console.log(s2)
if(arr.indexOf(s2) !== -1) arr.splice(arr.indexOf(s2), 1);;
s2 = `if not(FR${wire.reg}=='${node_output.nid}')then ${node_input.MSGSend(node_output.nid, wire.reg, wire.reg)}end;`;
if(arr.indexOf(s2) !== -1) arr.splice(arr.indexOf(s2), 1);
s2 = `FR${wire.reg}=nil;`;
if(arr.indexOf(s2) !== -1) arr.splice(arr.indexOf(s2), 1);
//console.log(arr)
})
});
let io = nodes_input.filter(function(v){ return nodes_output.indexOf(v) > -1 });
let i = nodes_input.filter(v=>io.indexOf(v) == -1);
let o = nodes_output.filter(v=>io.indexOf(v) == -1);
//console.log(nodes_input.map(d=>d.regs[wire.reg].trigger))
if(!io.length){
if(!o.length) return;
let pnt = 0;
i.forEach(node => {
let s2 = node.MSGSend(o[pnt%o.length].nid, wire.reg, wire.reg);
node.trigger(wire, s2);
pnt++;
});
}else{
let pnt = 0;
i.forEach(node => {
let s2 = node.MSGSend(io[pnt%io.length].nid, wire.reg, wire.reg);
node.trigger(wire, s2);
pnt++;
});
pnt = 0;
o.forEach(node=>{
let s2 = io[pnt%io.length].MSGSend(node.nid, wire.reg, wire.reg);
io[pnt%io.length].trigger(wire, s2);
pnt++;
});
pnt = 0;
io.forEach((node,index)=>{
if(index){
let s2 = node.MSGSend(io[index-1].nid, wire.reg, wire.reg);
s2 = `if not(FR${wire.reg}=='${io[index-1].nid}')then ${s2}end;`
node.trigger(wire, s2);
}
if(index != io.length-1){
let s2 = node.MSGSend(io[index+1].nid, wire.reg, wire.reg);
s2 = `if not(FR${wire.reg}=='${io[index+1].nid}')then ${s2}end;`
node.trigger(wire, s2);
}
node.trigger(wire, `FR${wire.reg}=nil;`);
});
}
//console.log(nodes_input.map(d=>d.regs[wire.reg].trigger))
}
},
__udpMethod: (node) => ({
trigger: function(wire, cmd){
node.regs[wire.reg].trigger.push(cmd);
},
unique: function(){
node.head = uniqueArr(node.head);
node.body = uniqueArr(node.body);
node.loop = uniqueArr(node.loop);
node.footer = uniqueArr(node.footer);
Object.keys(node.regs).forEach(item =>{
node.regs[item].trigger = uniqueArr(node.regs[item].trigger);
});
},
setReg: function(reg, default_val = 0, isPersist = false){
if(Object.keys(node.regs).indexOf(reg) == -1) {
node.regs[reg] = {
@ -175,7 +247,7 @@ const wiot = {
}
},
MSGonSend: function(name, cmd, bodyMark = 'body', fromMark = 'from'){
node.footer.push(`msg.onSend('${name}',function(${fromMark},${bodyMark})${cmd}end);`);
return `msg.onSend('${name}',function(${fromMark},${bodyMark})${cmd}end);`;
},
MSGSend: function(to, name, body, proxy = false){
return `msg.send('${to}','${name}',${body}${(proxy?',true':'')});`;
@ -185,6 +257,9 @@ const wiot = {
},
init: function(cmd){
node.head.push(cmd);
},
prepare: function(cmd){
node.body.push(cmd);
}
}),
__systemMethod: {
@ -195,13 +270,13 @@ const wiot = {
let o =getObj(...arguments);
o.type = type;
o.__hash = md5(Math.random());
if(o.input){
o.input.output.push(o);
o.input.generate();
if(o.input && o.input.length){
o.input.forEach(wire=>wire.output.push(o));
o.input.forEach(wire=>wire.generate());
}
if(o.output){
o.output.input.push(o);
o.output.generate();
if(o.output && o.output.length){
o.output.forEach(wire=>wire.input.push(o));
o.output.forEach(wire=>wire.generate());
}
genCode(o, wiot.__systemMethod);
@ -220,41 +295,82 @@ wiot.wire = wiot.__systemPrimitive.wire;
/* pre-installed udp defination */
wiot.newUDP('gpio', (mode, pin, wire, node, default_output = 1) => ({
wiot.newUDP('gpio', (mode, pin, wire, node) => ({
node: node,
pin: pin,
mode: mode,
[(mode==wiot.INPUT)?'output':'input']: wire,
default_output: default_output
[(mode==wiot.INPUT)?'output':'input']: [wire]
}), (o, method) => {
node = o.node;
node.init(`gpio.mode(${o.pin},${o.mode});`);
if(o.mode == wiot.INPUT){
node.always(`${o.output.reg}=gpio.read(${o.pin});`);
node.always(`${o.output[0].reg}=gpio.read(${o.pin});`);
}
if(o.mode == wiot.OUTPUT){
node.setReg(o.input.reg, o.default_output);
node.trigger(o.input, `gpio.write(${o.pin},${o.input.reg});`);
node.prepare(`gpio.write(${o.pin},${o.input[0].reg}%2);`);
node.trigger(o.input[0], `gpio.write(${o.pin},${o.input[0].reg}%2);`);
}
});
wiot.newUDP('buffer', (wire_input, wire_output, node, delay_s = 0) => ({
wiot.newUDP('buffer', (wire_output, wire_input, node, delay_s = 0) => ({
node: node,
delay_s: delay_s,
input: wire_input,
output: wire_output
input: [wire_input],
output: [wire_output]
}), (o, method) => {
let node = o.node;
let cnt = method.Reg();
node.setReg(cnt, 0);
node.trigger(o.input, `${cnt}=${(Math.round(o.delay_s*1000/wiot.CLOCK_INTERVAL)||1)+1};`);
node.trigger(o.input[0], `${cnt}=${(Math.round(o.delay_s*1000/wiot.CLOCK_INTERVAL)||1)+1};`);
node.always(`if not(${cnt}==0)then ${cnt}=${cnt}-1;end;`);
node.always(`if ${cnt}==1 then ${o.output.reg}=${o.input.reg};end;`);
node.always(`if ${cnt}==1 then ${o.output[0].reg}=${o.input[0].reg};end;`);
});
wiot.newUDP('operate', (expression, node, wire_output, ...wires_input) => ({
node: node,
expression: expression,
input: wires_input,
output: [wire_output]
}), (o, method) => {
let node = o.node;
let s = o.expression;
o.input.forEach((wire, index) => {
s = s.replace(new RegExp('\\$'+index, 'g'), wire.reg);
});
let func = method.Reg();
node.setReg(func);
node.prepare(`${func}=function()${o.output[0].reg}=${s};end;`);
o.input.forEach(wire => {
node.trigger(wire, `${func}();`);
});
});
wiot.newUDP('pwm', (pin, wire_duty, wire_clock, node) => ({
node: node,
pin: pin,
input: [wire_duty, wire_clock]
}), (o, method) => {
let node = o.node;
node.prepare(`pwm.setup(${o.pin},${o.input[1].reg},${o.input[0].reg});`);
node.prepare(`pwm.setduty(${o.pin},${o.input[0].reg});`);
node.trigger(o.input[0], `pwm.setduty(${o.pin},${o.input[0].reg});`);
node.trigger(o.input[1], `pwm.setclock(${o.pin},${o.input[1].reg});`);
});
wiot.newUDP('adc', (wire_output, node) => ({
node: node,
pin: 0,
output: [wire_output]
}), (o, method) => {
let node = o.node;
node.always(`${o.output[0].reg}=adc.read(${o.pin});`);
});
module.exports = wiot;

@ -20,6 +20,20 @@ module.exports = (yargs) => {
let nid = o.nidGen();
if(argv.nid){
nid = nidMatch(argv.nid);
if(!nid || !nid.length){
error('No node selected! \nPlease use "'+argv.$0+' ls" to check the nid.');
return;
}
if(nid.length > 1){
error('Found multiple nodes. Which one do you want?');
return;
}
nid = nid[0];
}
if(!config.wifi[argv.wifiIndex]){
error('Invalid WiFi index '+argv.wifiIndex+'!\nSee "'+argv.$0+' wifi ls" to find a wifi index.');
return;
@ -29,23 +43,23 @@ module.exports = (yargs) => {
await reset(argv.port);
ban.info('Preparing '+nid+'...10%');
if(argv.config){
if(!argv.nid){
Object.keys(config.nodes).forEach(nid => {
if(config.nodes[nid].nickname == argv.nickname){
error('Nickname "'+argv.nickname+'"" has already been used by '+nid);
throw null;
}
})
Object.keys(config.nodes).forEach(nid => {
if(config.nodes[nid].nickname == argv.nickname){
error('Nickname "'+argv.nickname+'"" has already been used by '+nid);
throw null;
config.nodes[nid] = {
nickname: argv.nickname,
msgport: argv.msgport,
wifiIndex: argv.wifiIndex
}
})
config.nodes[nid] = {
nickname: argv.nickname,
msgport: argv.msgport,
wifiIndex: argv.wifiIndex
let raw_config = JSON.parse(fs.readFileSync(config.root + 'config.json', 'utf-8'));
raw_config.nodes = config.nodes;
fs.writeFileSync(config.root + 'config.json', JSON.stringify(raw_config, null, 2), 'utf-8');
}
let raw_config = JSON.parse(fs.readFileSync(config.root + 'config.json', 'utf-8'));
raw_config.nodes = config.nodes;
fs.writeFileSync(config.root + 'config.json', JSON.stringify(raw_config, null, 2), 'utf-8');
let config_path = config.root + '.wiot/cache/config.json';
let config_obj = {
nid: nid,
@ -95,7 +109,7 @@ module.exports = (yargs) => {
const reset = require(__dirname + '/modules/reset.js');
const error = require(__dirname + '/modules/error.js');
const Table = require('cli-table');
const nidMatch = require(__dirname + '/modules/nidMatch.js');
yargs = yargs
@ -173,6 +187,10 @@ module.exports = (yargs) => {
type: 'boolean',
describe: 'update config.json'
})
.option('nid', {
type: 'string',
describe: 'update existed node firmware'
})
.example([
['$0 node init COM3 -n firstnode', 'flash and prepare the NodeMCU device on COM3 and name it as "firstnode"']
])
@ -249,6 +267,10 @@ module.exports = (yargs) => {
type: 'boolean',
describe: 'update config.json'
})
.option('nid', {
type: 'string',
describe: 'update existed node firmware'
})
.example([
['$0 node prepare COM3 -n firstnode', 'prepare the NodeMCU device on COM3 and name it as "firstnode"']
])
@ -266,6 +288,63 @@ module.exports = (yargs) => {
await o.check(argv);
await o.terminal(argv);
})
.command('update <port> [nid]', "update wIoT system environment on the NodeMCU", yargs => {
let config = require(__dirname + '/modules/getConfig.js')();
if(!config) return;
return yargs
.option('wifiIndex', {
alias: 'w',
default: 0,
type: 'number',
describe: 'determine which wifi config to use'
})
.option('msgport', {
alias: 'm',
default: 6789,
type: 'number',
describe: 'udp port on NodeMCU for MSG communication'
})
.option('heartbeat', {
default: 10000,
type: 'number',
describe: 'heartbeat interval'
})
.option('directorHost', {
default: require('url').parse(config.director).hostname,
type: 'string',
describe: 'director hostname'
})
.option('directorPort', {
default: 6789,
type: 'number',
describe: 'director MSG port'
})
.option('lua', {
alias: 'l',
default: true,
type: 'boolean',
describe: 'update init.lua'
})
.option('img', {
alias: 'i',
default: true,
type: 'boolean',
describe: 'update lfs.img'
})
.option('config', {
alias: 'c',
default: true,
type: 'boolean',
describe: 'update config.json'
})
.example([
['$0 node prepare COM3 -n firstnode', 'prepare the NodeMCU device on COM3 and name it as "firstnode"']
])
}, async argv => {
if(!argv.nid) argv.config = false;
await o.check(argv);
await o.upload(argv);
})
}, async argv => {

@ -0,0 +1 @@
data/

@ -162,8 +162,10 @@ function isJson(str) {
if(data.to != 'director'){
if(nidtable.hasOwnProperty(data.to)){
nidtable[data.to].socket.write(JSON.stringify(data));
flow.log('[FORWARD]', data.to+'<--'+data.from, data.name, data.body);
try{
nidtable[data.to].socket.write(JSON.stringify(data));
flow.log('[FORWARD]', data.to+'<--'+data.from, data.name, data.body);
}catch(e){console.error(e)}
}else{
flow.error('[FORWARD]', '<nid lookup failure>', data.to+'<--'+data.from, data.name, data.body);
}
@ -189,6 +191,7 @@ function isJson(str) {
socket.setTimeout(60 * 1000);
socket.on('timeout', () => {
access.info(nid, socket.remoteAddress+':'+socket.remotePort, '[TIMEOUT]', 'After', socket.timeout, 'ms');
try{
nidtable[nid].socket = null;
nidtable[nid].ip = null;
nidtable[nid].port = null;
@ -197,6 +200,9 @@ function isJson(str) {
cbArr.disconnect.forEach(cb => {
cb(nid, nidtable[nid]);
});
}catch(e){
delete nidtable[nid];
}
socket.destroy();
});

Binary file not shown.

Binary file not shown.

@ -4,4 +4,9 @@ elseif file.exists('lfs.img') then
file.rename('lfs.img', '_lfs.img');
node.LFS.reload('_lfs.img');
end
if adc.force_init_mode(adc.INIT_ADC)
then
node.restart()
return
end
node.LFS.wiot()

Binary file not shown.

Binary file not shown.

@ -0,0 +1,72 @@
local M
do
-- const
local REDIS_PORT = 6379
-- cache
local pairs, tonumber = pairs, tonumber
--
local publish = function(self, chn, s)
self._fd:send(("*3\r\n$7\r\npublish\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n"):format(
#chn, chn, #s, s
))
-- TODO: confirmation? then queue of answers needed
end
local subscribe = function(self, chn, handler)
-- TODO: subscription to all channels, with single handler
self._fd:send(("*2\r\n$9\r\nsubscribe\r\n$%d\r\n%s\r\n"):format(
#chn, chn
))
self._handlers[chn] = handler
-- TODO: confirmation? then queue of answers needed
end
local unsubscribe = function(self, chn)
self._handlers[chn] = false
end
-- NB: pity we can not just augment what net.createConnection returns
local close = function(self)
self._fd:close()
end
local connect = function(host, port)
local _fd = net.createConnection(net.TCP, 0)
local self = {
_fd = _fd,
_handlers = { },
-- TODO: consider metatables?
close = close,
publish = publish,
subscribe = subscribe,
unsubscribe = unsubscribe,
}
_fd:on("connection", function()
--print("+FD")
end)
_fd:on("disconnection", function()
-- FIXME: this suddenly occurs. timeout?
--print("-FD")
end)
_fd:on("receive", function(fd, s) --luacheck: no unused
--print("IN", s)
-- TODO: subscription to all channels
-- lookup message pattern to determine channel and payload
-- NB: pairs() iteration gives no fixed order!
for chn, handler in pairs(self._handlers) do
local p = ("*3\r\n$7\r\nmessage\r\n$%d\r\n%s\r\n$"):format(#chn, chn)
if s:find(p, 1, true) then
-- extract and check message length
-- NB: only the first TCP packet considered!
local _, start, len = s:find("(%d-)\r\n", #p)
if start and tonumber(len) == #s - start - 2 and handler then
handler(chn, s:sub(start + 1, -2)) -- ends with \r\n
end
end
end
end)
_fd:connect(port or REDIS_PORT, host)
return self
end
-- expose
M = {
connect = connect,
}
end
return M

@ -585,6 +585,7 @@ __main = coroutine.create(function(__run)
ns[k] = v;
end
end
db.clear();
--restart the system to run new Func
node.restart();
end
@ -596,6 +597,12 @@ __main = coroutine.create(function(__run)
node.restart();
end
end
--restart API
msgReg['__reset'] = function(from, body)
if from == 'director' then
node.restart();
end
end
--For Debug Purpose
--collectgarbage("collect")

Loading…
Cancel
Save