improved route discovery

This commit is contained in:
Djeeberjr 2022-02-24 02:05:43 +01:00
parent 33731778ea
commit e695fc9b6d
2 changed files with 73 additions and 43 deletions

View File

@ -2,11 +2,11 @@ package kernel.net;
using tink.CoreApi; using tink.CoreApi;
import kernel.net.Package.NetworkID;
import kernel.peripherals.Peripherals.Peripheral; import kernel.peripherals.Peripherals.Peripheral;
import kernel.Log; import kernel.Log;
import kernel.KernelEvents; import kernel.KernelEvents;
import haxe.Exception; import haxe.Exception;
import util.Promise;
import kernel.Timer; import kernel.Timer;
import cc.OS; import cc.OS;
@ -22,14 +22,14 @@ class Net {
public static inline final BRODCAST_PORT:Int = 65533; public static inline final BRODCAST_PORT:Int = 65533;
public static inline final MESSAGE_TIMEOUT:Int = 3; public static inline final MESSAGE_TIMEOUT:Int = 3;
public final onNewNeigbor: Signal<Int>; public final onNewNeigbor:Signal<Int>;
private final networkID:Int = OS.getComputerID(); public final networkID:NetworkID = OS.getComputerID();
private var responseBus:Map<Int,Callback<Package>> = new Map(); private final responseBus:Map<Int, Callback<Package>> = new Map();
private var protoHandlers:Map<String, Callback<Package>> = new Map(); private final protoHandlers:Map<String, Callback<Package>> = new Map();
private var allModems:Array<kernel.peripherals.Modem>; private var allModems:Array<kernel.peripherals.Modem>;
private var routingTable:Map<Int, kernel.peripherals.Modem> = new Map(); private final routingTable:Map<NetworkID, kernel.peripherals.Modem> = new Map();
private final onNewNeigborTrigger: SignalTrigger<Int> = Signal.trigger(); private final onNewNeigborTrigger:SignalTrigger<NetworkID> = Signal.trigger();
@:allow(kernel.Init) @:allow(kernel.Init)
private function new() { private function new() {
@ -52,24 +52,27 @@ class Net {
discoverNeighbors(); discoverNeighbors();
} }
/**
Called when a new package comes in.
**/
private function handelIncomming(pack:Package, ?addr:String) { private function handelIncomming(pack:Package, ?addr:String) {
switch pack.type { switch pack.type {
case Data(_): case Data(_):
routeTo(pack); routeToProto(pack);
case DataNoResponse(_): case DataNoResponse(_):
routeTo(pack); routeToProto(pack);
case Response | RouteDiscoverResponse: case Response:
if (responseBus.exists(pack.msgID)){ if (responseBus.exists(pack.msgID)) {
responseBus[pack.msgID].invoke(pack); responseBus[pack.msgID].invoke(pack);
} }
case RouteDiscover: case RouteDiscover(_) | RouteDiscoverResponse(_):
handleRoute(pack, addr); handleRouteDiscover(pack, addr);
} }
} }
private function newRoutPackage():Package { private function newRoutDiscoverPackage():Package {
var pack:Package = { var pack:Package = {
type: RouteDiscover, type: RouteDiscover(getAllNeighbors()),
toID: BRODCAST_PORT, toID: BRODCAST_PORT,
msgID: generateMessageID(), msgID: generateMessageID(),
fromID: networkID, fromID: networkID,
@ -79,33 +82,45 @@ class Net {
return pack; return pack;
} }
/**
Send out discover packages on all modems.
**/
private function discoverNeighbors() { private function discoverNeighbors() {
for (modem in allModems) { for (modem in allModems) {
var pack = newRoutPackage(); var pack = newRoutDiscoverPackage();
var timeout:Timer = null; modem.transmit(BRODCAST_PORT, networkID, pack);
responseBus[pack.msgID] = ((response:Package)->{
addRoute(response.fromID, modem.addr);
});
timeout = new Timer(MESSAGE_TIMEOUT, () -> {
responseBus.remove(pack.msgID);
});
modem.transmit(BRODCAST_PORT, OS.getComputerID(), pack);
} }
} }
private function handleRoute(pack:Package, addr:String) { /**
Handle incomming packages that involve route discovery.
**/
private function handleRouteDiscover(pack:Package, addr:String) {
addRoute(pack.fromID, addr); addRoute(pack.fromID, addr);
var forward = getAllNeighbors().filter(i -> i == pack.fromID);
switch pack.type{
case RouteDiscoverResponse(reachableIDs):
for (id in reachableIDs) {
addRoute(id, addr);
}
return; // Dont respond to a response
case RouteDiscover(reachableIDs):
for (id in reachableIDs) {
addRoute(id, addr);
}
default:
throw new Error("Expcted RouteDiscover or RouteDiscoverResponse type");
}
// Respond to peer // Respond to peer
var response:Package = { var response:Package = {
toID: pack.fromID, toID: pack.fromID,
fromID: OS.getComputerID(), fromID: networkID,
msgID: pack.msgID, msgID: pack.msgID,
type: RouteDiscoverResponse, type: RouteDiscoverResponse(forward),
data: null data: null
} }
@ -114,8 +129,12 @@ class Net {
} }
} }
/**
Called when a route to a client has been disoverd.
Its posible to be called multiple times with the same id but different addr.
**/
private function addRoute(toID:Int, addr:String) { private function addRoute(toID:Int, addr:String) {
Log.debug("Added new route to " + toID + " via " + addr); Log.info("Added new route to " + toID + " via " + addr);
routingTable.set(toID, allModems.find(item -> item.addr == addr)); routingTable.set(toID, allModems.find(item -> item.addr == addr));
this.onNewNeigborTrigger.trigger(toID); this.onNewNeigborTrigger.trigger(toID);
} }
@ -164,7 +183,10 @@ class Net {
sendRaw(response); sendRaw(response);
} }
private function routeTo(pack:Package) { /**
Send to package to the localy register handler based on the proto
**/
private function routeToProto(pack:Package) {
var proto:String = switch pack.type { var proto:String = switch pack.type {
case Data(proto): case Data(proto):
proto; proto;
@ -181,6 +203,9 @@ class Net {
protoHandlers[proto].invoke(pack); protoHandlers[proto].invoke(pack);
} }
/**
Just send the package to the right modem.
**/
private function sendRaw(pack:Package) { private function sendRaw(pack:Package) {
if (pack.toID == networkID) { if (pack.toID == networkID) {
// Loopback // Loopback
@ -195,7 +220,11 @@ class Net {
} }
} }
public function sendAndAwait(dest:Int, proto:String, data:Dynamic):Promise<Package> { /**
Send a message and wait for a response.
**/
public function sendAndAwait(dest:NetworkID, proto:String, data:Dynamic):Promise<Package> {
return new Promise<Package>((resolve, reject) -> { return new Promise<Package>((resolve, reject) -> {
var pack:Package = { var pack:Package = {
toID: dest, toID: dest,
@ -207,7 +236,7 @@ class Net {
var timeout:Timer = null; var timeout:Timer = null;
responseBus[pack.msgID] = ((reponse: Package) ->{ responseBus[pack.msgID] = ((reponse:Package) -> {
resolve(reponse); resolve(reponse);
if (timeout != null) { if (timeout != null) {
timeout.cancle(); timeout.cancle();
@ -216,10 +245,13 @@ class Net {
timeout = new Timer(MESSAGE_TIMEOUT, () -> { timeout = new Timer(MESSAGE_TIMEOUT, () -> {
responseBus.remove(pack.msgID); responseBus.remove(pack.msgID);
reject(new Exception("Timeout")); reject(new Error(InternalError,"Message timeout"));
}); });
sendRaw(pack); sendRaw(pack);
// No callback link for you
return null;
}); });
} }
@ -236,8 +268,4 @@ class Net {
public function removeProto(proto:String) { public function removeProto(proto:String) {
protoHandlers.remove(proto); protoHandlers.remove(proto);
} }
public function getNeigbors():Array<Int> {
return this.routingTable.mapi((index, item) -> {return index;});
}
} }

View File

@ -1,19 +1,21 @@
package kernel.net; package kernel.net;
typedef NetworkID = Int;
enum PackageTypes { enum PackageTypes {
Data(proto:String); Data(proto:String);
DataNoResponse(proto:String); DataNoResponse(proto:String);
Response; Response;
RouteDiscover(); RouteDiscover(reachableIDs: Array<NetworkID>);
RouteDiscoverResponse(); RouteDiscoverResponse(reachableIDs: Array<NetworkID>);
} }
/** /**
Representing a network package. Representing a network package.
**/ **/
@:structInit class Package { @:structInit class Package {
public final fromID:Int; public final fromID:NetworkID;
public final toID:Int; public final toID:NetworkID;
public final msgID:Int; public final msgID:Int;
public final type:PackageTypes; public final type:PackageTypes;
public final data:Dynamic; public final data:Dynamic;