improved net

This commit is contained in:
Djeeberjr 2022-02-22 02:28:53 +01:00
parent 418952bd25
commit 33731778ea

View File

@ -8,7 +8,6 @@ import kernel.KernelEvents;
import haxe.Exception; import haxe.Exception;
import util.Promise; import util.Promise;
import kernel.Timer; import kernel.Timer;
import util.EventBus;
import cc.OS; import cc.OS;
using Lambda; using Lambda;
@ -20,8 +19,16 @@ using util.Extender.LambdaExtender;
**/ **/
class Net { class Net {
public static var instance:Net; public static var instance:Net;
public static inline final BRODCAST_PORT:Int = 65533;
public static inline final MESSAGE_TIMEOUT:Int = 3;
public final onNewNeigbor: Signal<Int>; public final onNewNeigbor: Signal<Int>;
private final networkID:Int = OS.getComputerID();
private var responseBus:Map<Int,Callback<Package>> = new Map();
private var protoHandlers:Map<String, Callback<Package>> = new Map();
private var allModems:Array<kernel.peripherals.Modem>;
private var routingTable:Map<Int, kernel.peripherals.Modem> = new Map();
private final onNewNeigborTrigger: SignalTrigger<Int> = Signal.trigger(); private final onNewNeigborTrigger: SignalTrigger<Int> = Signal.trigger();
@:allow(kernel.Init) @:allow(kernel.Init)
@ -40,19 +47,11 @@ class Net {
}); });
allModems = Peripheral.instance.getModems(); allModems = Peripheral.instance.getModems();
open(); open();
discoverNeighbors(); discoverNeighbors();
} }
public static inline final BRODCAST_PORT:Int = 65533;
public static inline final MESSAGE_TIMEOUT:Int = 3;
private var networkID:Int = OS.getComputerID();
private var responseBus:util.EventBus<Package> = new EventBus();
private var protoHandlers:Map<String, Package->Void> = new Map();
private var allModems:Array<kernel.peripherals.Modem>;
private var routingTable:Map<Int, kernel.peripherals.Modem> = new Map();
private function handelIncomming(pack:Package, ?addr:String) { private function handelIncomming(pack:Package, ?addr:String) {
switch pack.type { switch pack.type {
case Data(_): case Data(_):
@ -60,7 +59,9 @@ class Net {
case DataNoResponse(_): case DataNoResponse(_):
routeTo(pack); routeTo(pack);
case Response | RouteDiscoverResponse: case Response | RouteDiscoverResponse:
responseBus.emit(Std.string(pack.msgID), pack); if (responseBus.exists(pack.msgID)){
responseBus[pack.msgID].invoke(pack);
}
case RouteDiscover: case RouteDiscover:
handleRoute(pack, addr); handleRoute(pack, addr);
} }
@ -83,12 +84,13 @@ class Net {
var pack = newRoutPackage(); var pack = newRoutPackage();
var timeout:Timer = null; var timeout:Timer = null;
var responeListner = responseBus.on(Std.string(pack.msgID), pack -> {
addRoute(pack.fromID, modem.addr); responseBus[pack.msgID] = ((response:Package)->{
addRoute(response.fromID, modem.addr);
}); });
timeout = new Timer(MESSAGE_TIMEOUT, () -> { timeout = new Timer(MESSAGE_TIMEOUT, () -> {
responseBus.removeListner(responeListner); responseBus.remove(pack.msgID);
}); });
modem.transmit(BRODCAST_PORT, OS.getComputerID(), pack); modem.transmit(BRODCAST_PORT, OS.getComputerID(), pack);
@ -114,8 +116,8 @@ class Net {
private function addRoute(toID:Int, addr:String) { private function addRoute(toID:Int, addr:String) {
Log.debug("Added new route to " + toID + " via " + addr); Log.debug("Added new route to " + toID + " via " + addr);
this.onNewNeigborTrigger.trigger(toID);
routingTable.set(toID, allModems.find(item -> item.addr == addr)); routingTable.set(toID, allModems.find(item -> item.addr == addr));
this.onNewNeigborTrigger.trigger(toID);
} }
public function getAllNeighbors():Array<Int> { public function getAllNeighbors():Array<Int> {
@ -176,7 +178,7 @@ class Net {
return; return;
} }
protoHandlers[proto](pack); protoHandlers[proto].invoke(pack);
} }
private function sendRaw(pack:Package) { private function sendRaw(pack:Package) {
@ -204,15 +206,16 @@ class Net {
} }
var timeout:Timer = null; var timeout:Timer = null;
var responeListner = responseBus.once(Std.string(pack.msgID), p -> {
resolve(p); responseBus[pack.msgID] = ((reponse: Package) ->{
resolve(reponse);
if (timeout != null) { if (timeout != null) {
timeout.cancle(); timeout.cancle();
} }
}); });
timeout = new Timer(MESSAGE_TIMEOUT, () -> { timeout = new Timer(MESSAGE_TIMEOUT, () -> {
responseBus.removeListner(responeListner); responseBus.remove(pack.msgID);
reject(new Exception("Timeout")); reject(new Exception("Timeout"));
}); });
@ -220,17 +223,21 @@ class Net {
}); });
} }
public function registerProto(proto:String, cb:Package->Void) { public function registerProto(proto:String, callback:Callback<Package>) {
if (protoHandlers.exists(proto)) { if (protoHandlers.exists(proto)) {
// Failed. Handler already exist. // Failed. Handler already exist.
// TODO: return error // TODO: return error
return; return;
} }
protoHandlers[proto] = cb; protoHandlers[proto] = callback;
} }
public function removeProto(proto:String) { public function removeProto(proto:String) {
protoHandlers.remove(proto); protoHandlers.remove(proto);
} }
public function getNeigbors():Array<Int> {
return this.routingTable.mapi((index, item) -> {return index;});
}
} }