Compare commits

...

5 Commits

Author SHA1 Message Date
ebd9709c3d added networkID to webconsole log 2022-02-24 02:06:50 +01:00
e695fc9b6d improved route discovery 2022-02-24 02:05:43 +01:00
33731778ea improved net 2022-02-22 02:28:53 +01:00
418952bd25 startup changes 2022-02-22 01:48:31 +01:00
e2431666e1 added onNewNeigbor 2022-02-22 01:47:55 +01:00
4 changed files with 102 additions and 65 deletions

View File

@@ -1,3 +1,4 @@
import haxe.MainLoop;
import kernel.Init; import kernel.Init;
import lib.ui.Observable; import lib.ui.Observable;
import lib.ui.TextElement; import lib.ui.TextElement;
@@ -11,20 +12,11 @@ class Startup {
public static function main() { public static function main() {
Init.initKernel(); Init.initKernel();
exampleUI(); MainLoop.add(() -> {
KernelEvents.instance.startEventLoop(); KernelEvents.instance.startEventLoop();
} },0);
static function exampleProgramm() { exampleUI();
var context = WindowManager.instance.createNewContext();
context.clickSignal.handle(data -> {
context.setCursorPos(data.pos.x,data.pos.y);
context.write("x");
});
context.write("Example programm");
} }
static function exampleUI() { static function exampleUI() {

View File

@@ -4,6 +4,7 @@ import kernel.ui.WindowContext;
import kernel.ui.WindowManager; import kernel.ui.WindowManager;
import lib.TermWriteable; import lib.TermWriteable;
import lib.TermIO; import lib.TermIO;
import kernel.net.Net;
import util.Debug; import util.Debug;
/** /**
@@ -26,28 +27,28 @@ class Log {
public static function info(msg:Dynamic, ?pos:haxe.PosInfos) { public static function info(msg:Dynamic, ?pos:haxe.PosInfos) {
writer.writeLn("[INFO][" + pos.className + "]: " + Std.string(msg)); writer.writeLn("[INFO][" + pos.className + "]: " + Std.string(msg));
#if webconsole #if webconsole
Debug.printWeb("[INFO][" + pos.className + "]: " + Std.string(msg)); Debug.printWeb("["+Net.instance.networkID+"][INFO][" + pos.className + "]: " + Std.string(msg));
#end #end
} }
public static function warn(msg:Dynamic, ?pos:haxe.PosInfos) { public static function warn(msg:Dynamic, ?pos:haxe.PosInfos) {
writer.writeLn("[WARN][" + pos.className + "]: " + Std.string(msg), Yellow); writer.writeLn("[WARN][" + pos.className + "]: " + Std.string(msg), Yellow);
#if webconsole #if webconsole
Debug.printWeb("[WARN][" + pos.className + "]: " + Std.string(msg)); Debug.printWeb("["+Net.instance.networkID+"][WARN][" + pos.className + "]: " + Std.string(msg));
#end #end
} }
public static function error(msg:Dynamic, ?pos:haxe.PosInfos) { public static function error(msg:Dynamic, ?pos:haxe.PosInfos) {
writer.writeLn("[ERRO][" + pos.className + "]: " + Std.string(msg), Red); writer.writeLn("[ERRO][" + pos.className + "]: " + Std.string(msg), Red);
#if webconsole #if webconsole
Debug.printWeb("[ERRO][" + pos.className + "]: " + Std.string(msg)); Debug.printWeb("["+Net.instance.networkID+"][ERRO][" + pos.className + "]: " + Std.string(msg));
#end #end
} }
public static function debug(msg:Dynamic, ?pos:haxe.PosInfos) { public static function debug(msg:Dynamic, ?pos:haxe.PosInfos) {
writer.writeLn("[DEBG][" + pos.className + "]: " + Std.string(msg), Gray); writer.writeLn("[DEBG][" + pos.className + "]: " + Std.string(msg), Gray);
#if webconsole #if webconsole
Debug.printWeb("[DEBG][" + pos.className + "]: " + Std.string(msg)); Debug.printWeb("["+Net.instance.networkID+"][DEBG][" + pos.className + "]: " + Std.string(msg));
#end #end
} }

View File

@@ -1,12 +1,13 @@
package kernel.net; package kernel.net;
using tink.CoreApi;
import kernel.net.Package.NetworkID;
import kernel.peripherals.Peripherals.Peripheral; import kernel.peripherals.Peripherals.Peripheral;
import kernel.Log; import kernel.Log;
import kernel.KernelEvents; import kernel.KernelEvents;
import haxe.Exception; import haxe.Exception;
import util.Promise;
import kernel.Timer; import kernel.Timer;
import util.EventBus;
import cc.OS; import cc.OS;
using Lambda; using Lambda;
@@ -18,9 +19,21 @@ using util.Extender.LambdaExtender;
**/ **/
class Net { class Net {
public static var instance:Net; public static var instance:Net;
public static inline final BRODCAST_PORT:Int = 65533;
public static inline final MESSAGE_TIMEOUT:Int = 3;
public final onNewNeigbor:Signal<Int>;
public final networkID:NetworkID = OS.getComputerID();
private final responseBus:Map<Int, Callback<Package>> = new Map();
private final protoHandlers:Map<String, Callback<Package>> = new Map();
private var allModems:Array<kernel.peripherals.Modem>;
private final routingTable:Map<NetworkID, kernel.peripherals.Modem> = new Map();
private final onNewNeigborTrigger:SignalTrigger<NetworkID> = Signal.trigger();
@:allow(kernel.Init) @:allow(kernel.Init)
private function new() { private function new() {
onNewNeigbor = onNewNeigborTrigger.asSignal();
KernelEvents.instance.onModemMessage.handle(params -> { KernelEvents.instance.onModemMessage.handle(params -> {
var pack:Package = { var pack:Package = {
fromID: params.replyChannel, fromID: params.replyChannel,
@@ -34,35 +47,32 @@ class Net {
}); });
allModems = Peripheral.instance.getModems(); allModems = Peripheral.instance.getModems();
open(); open();
discoverNeighbors(); discoverNeighbors();
} }
public static inline final BRODCAST_PORT:Int = 65533; /**
public static inline final MESSAGE_TIMEOUT:Int = 3; Called when a new package comes in.
**/
private var networkID:Int = OS.getComputerID();
private var responseBus:util.EventBus<Package> = new EventBus();
private var protoHandlers:Map<String, Package->Void> = new Map();
private var allModems:Array<kernel.peripherals.Modem>;
private var routingTable:Map<Int, kernel.peripherals.Modem> = new Map();
private function handelIncomming(pack:Package, ?addr:String) { private function handelIncomming(pack:Package, ?addr:String) {
switch pack.type { switch pack.type {
case Data(_): case Data(_):
routeTo(pack); routeToProto(pack);
case DataNoResponse(_): case DataNoResponse(_):
routeTo(pack); routeToProto(pack);
case Response | RouteDiscoverResponse: case Response:
responseBus.emit(Std.string(pack.msgID), pack); if (responseBus.exists(pack.msgID)) {
case RouteDiscover: responseBus[pack.msgID].invoke(pack);
handleRoute(pack, addr); }
case RouteDiscover(_) | RouteDiscoverResponse(_):
handleRouteDiscover(pack, addr);
} }
} }
private function newRoutPackage():Package { private function newRoutDiscoverPackage():Package {
var pack:Package = { var pack:Package = {
type: RouteDiscover, type: RouteDiscover(getAllNeighbors()),
toID: BRODCAST_PORT, toID: BRODCAST_PORT,
msgID: generateMessageID(), msgID: generateMessageID(),
fromID: networkID, fromID: networkID,
@@ -72,32 +82,45 @@ class Net {
return pack; return pack;
} }
/**
Send out discover packages on all modems.
**/
private function discoverNeighbors() { private function discoverNeighbors() {
for (modem in allModems) { for (modem in allModems) {
var pack = newRoutPackage(); var pack = newRoutDiscoverPackage();
var timeout:Timer = null; modem.transmit(BRODCAST_PORT, networkID, pack);
var responeListner = responseBus.on(Std.string(pack.msgID), pack -> {
addRoute(pack.fromID, modem.addr);
});
timeout = new Timer(MESSAGE_TIMEOUT, () -> {
responseBus.removeListner(responeListner);
});
modem.transmit(BRODCAST_PORT, OS.getComputerID(), pack);
} }
} }
private function handleRoute(pack:Package, addr:String) { /**
Handle incomming packages that involve route discovery.
**/
private function handleRouteDiscover(pack:Package, addr:String) {
addRoute(pack.fromID, addr); addRoute(pack.fromID, addr);
var forward = getAllNeighbors().filter(i -> i == pack.fromID);
switch pack.type{
case RouteDiscoverResponse(reachableIDs):
for (id in reachableIDs) {
addRoute(id, addr);
}
return; // Dont respond to a response
case RouteDiscover(reachableIDs):
for (id in reachableIDs) {
addRoute(id, addr);
}
default:
throw new Error("Expcted RouteDiscover or RouteDiscoverResponse type");
}
// Respond to peer // Respond to peer
var response:Package = { var response:Package = {
toID: pack.fromID, toID: pack.fromID,
fromID: OS.getComputerID(), fromID: networkID,
msgID: pack.msgID, msgID: pack.msgID,
type: RouteDiscoverResponse, type: RouteDiscoverResponse(forward),
data: null data: null
} }
@@ -106,9 +129,14 @@ class Net {
} }
} }
/**
Called when a route to a client has been disoverd.
Its posible to be called multiple times with the same id but different addr.
**/
private function addRoute(toID:Int, addr:String) { private function addRoute(toID:Int, addr:String) {
Log.debug("Added new route to " + toID + " via " + addr); Log.info("Added new route to " + toID + " via " + addr);
routingTable.set(toID, allModems.find(item -> item.addr == addr)); routingTable.set(toID, allModems.find(item -> item.addr == addr));
this.onNewNeigborTrigger.trigger(toID);
} }
public function getAllNeighbors():Array<Int> { public function getAllNeighbors():Array<Int> {
@@ -155,7 +183,10 @@ class Net {
sendRaw(response); sendRaw(response);
} }
private function routeTo(pack:Package) { /**
Send to package to the localy register handler based on the proto
**/
private function routeToProto(pack:Package) {
var proto:String = switch pack.type { var proto:String = switch pack.type {
case Data(proto): case Data(proto):
proto; proto;
@@ -169,9 +200,12 @@ class Net {
return; return;
} }
protoHandlers[proto](pack); protoHandlers[proto].invoke(pack);
} }
/**
Just send the package to the right modem.
**/
private function sendRaw(pack:Package) { private function sendRaw(pack:Package) {
if (pack.toID == networkID) { if (pack.toID == networkID) {
// Loopback // Loopback
@@ -186,7 +220,11 @@ class Net {
} }
} }
public function sendAndAwait(dest:Int, proto:String, data:Dynamic):Promise<Package> { /**
Send a message and wait for a response.
**/
public function sendAndAwait(dest:NetworkID, proto:String, data:Dynamic):Promise<Package> {
return new Promise<Package>((resolve, reject) -> { return new Promise<Package>((resolve, reject) -> {
var pack:Package = { var pack:Package = {
toID: dest, toID: dest,
@@ -197,30 +235,34 @@ class Net {
} }
var timeout:Timer = null; var timeout:Timer = null;
var responeListner = responseBus.once(Std.string(pack.msgID), p -> {
resolve(p); responseBus[pack.msgID] = ((reponse:Package) -> {
resolve(reponse);
if (timeout != null) { if (timeout != null) {
timeout.cancle(); timeout.cancle();
} }
}); });
timeout = new Timer(MESSAGE_TIMEOUT, () -> { timeout = new Timer(MESSAGE_TIMEOUT, () -> {
responseBus.removeListner(responeListner); responseBus.remove(pack.msgID);
reject(new Exception("Timeout")); reject(new Error(InternalError,"Message timeout"));
}); });
sendRaw(pack); sendRaw(pack);
// No callback link for you
return null;
}); });
} }
public function registerProto(proto:String, cb:Package->Void) { public function registerProto(proto:String, callback:Callback<Package>) {
if (protoHandlers.exists(proto)) { if (protoHandlers.exists(proto)) {
// Failed. Handler already exist. // Failed. Handler already exist.
// TODO: return error // TODO: return error
return; return;
} }
protoHandlers[proto] = cb; protoHandlers[proto] = callback;
} }
public function removeProto(proto:String) { public function removeProto(proto:String) {

View File

@@ -1,19 +1,21 @@
package kernel.net; package kernel.net;
typedef NetworkID = Int;
enum PackageTypes { enum PackageTypes {
Data(proto:String); Data(proto:String);
DataNoResponse(proto:String); DataNoResponse(proto:String);
Response; Response;
RouteDiscover(); RouteDiscover(reachableIDs: Array<NetworkID>);
RouteDiscoverResponse(); RouteDiscoverResponse(reachableIDs: Array<NetworkID>);
} }
/** /**
Representing a network package. Representing a network package.
**/ **/
@:structInit class Package { @:structInit class Package {
public final fromID:Int; public final fromID:NetworkID;
public final toID:Int; public final toID:NetworkID;
public final msgID:Int; public final msgID:Int;
public final type:PackageTypes; public final type:PackageTypes;
public final data:Dynamic; public final data:Dynamic;