Compare commits

...

5 Commits

Author SHA1 Message Date
ebd9709c3d added networkID to webconsole log 2022-02-24 02:06:50 +01:00
e695fc9b6d improved route discovery 2022-02-24 02:05:43 +01:00
33731778ea improved net 2022-02-22 02:28:53 +01:00
418952bd25 startup changes 2022-02-22 01:48:31 +01:00
e2431666e1 added onNewNeigbor 2022-02-22 01:47:55 +01:00
4 changed files with 102 additions and 65 deletions

View File

@@ -1,3 +1,4 @@
import haxe.MainLoop;
import kernel.Init;
import lib.ui.Observable;
import lib.ui.TextElement;
@@ -11,20 +12,11 @@ class Startup {
public static function main() {
Init.initKernel();
MainLoop.add(() -> {
KernelEvents.instance.startEventLoop();
},0);
exampleUI();
KernelEvents.instance.startEventLoop();
}
static function exampleProgramm() {
var context = WindowManager.instance.createNewContext();
context.clickSignal.handle(data -> {
context.setCursorPos(data.pos.x,data.pos.y);
context.write("x");
});
context.write("Example programm");
}
static function exampleUI() {
@@ -38,7 +30,7 @@ class Startup {
]);
ui.render();
context.clickSignal.handle(data -> {
text.set("Holla mundo");
});

View File

@@ -4,6 +4,7 @@ import kernel.ui.WindowContext;
import kernel.ui.WindowManager;
import lib.TermWriteable;
import lib.TermIO;
import kernel.net.Net;
import util.Debug;
/**
@@ -26,28 +27,28 @@ class Log {
public static function info(msg:Dynamic, ?pos:haxe.PosInfos) {
writer.writeLn("[INFO][" + pos.className + "]: " + Std.string(msg));
#if webconsole
Debug.printWeb("[INFO][" + pos.className + "]: " + Std.string(msg));
Debug.printWeb("["+Net.instance.networkID+"][INFO][" + pos.className + "]: " + Std.string(msg));
#end
}
public static function warn(msg:Dynamic, ?pos:haxe.PosInfos) {
writer.writeLn("[WARN][" + pos.className + "]: " + Std.string(msg), Yellow);
#if webconsole
Debug.printWeb("[WARN][" + pos.className + "]: " + Std.string(msg));
Debug.printWeb("["+Net.instance.networkID+"][WARN][" + pos.className + "]: " + Std.string(msg));
#end
}
public static function error(msg:Dynamic, ?pos:haxe.PosInfos) {
writer.writeLn("[ERRO][" + pos.className + "]: " + Std.string(msg), Red);
#if webconsole
Debug.printWeb("[ERRO][" + pos.className + "]: " + Std.string(msg));
Debug.printWeb("["+Net.instance.networkID+"][ERRO][" + pos.className + "]: " + Std.string(msg));
#end
}
public static function debug(msg:Dynamic, ?pos:haxe.PosInfos) {
writer.writeLn("[DEBG][" + pos.className + "]: " + Std.string(msg), Gray);
#if webconsole
Debug.printWeb("[DEBG][" + pos.className + "]: " + Std.string(msg));
Debug.printWeb("["+Net.instance.networkID+"][DEBG][" + pos.className + "]: " + Std.string(msg));
#end
}

View File

@@ -1,12 +1,13 @@
package kernel.net;
using tink.CoreApi;
import kernel.net.Package.NetworkID;
import kernel.peripherals.Peripherals.Peripheral;
import kernel.Log;
import kernel.KernelEvents;
import haxe.Exception;
import util.Promise;
import kernel.Timer;
import util.EventBus;
import cc.OS;
using Lambda;
@@ -18,9 +19,21 @@ using util.Extender.LambdaExtender;
**/
class Net {
public static var instance:Net;
public static inline final BRODCAST_PORT:Int = 65533;
public static inline final MESSAGE_TIMEOUT:Int = 3;
public final onNewNeigbor:Signal<Int>;
public final networkID:NetworkID = OS.getComputerID();
private final responseBus:Map<Int, Callback<Package>> = new Map();
private final protoHandlers:Map<String, Callback<Package>> = new Map();
private var allModems:Array<kernel.peripherals.Modem>;
private final routingTable:Map<NetworkID, kernel.peripherals.Modem> = new Map();
private final onNewNeigborTrigger:SignalTrigger<NetworkID> = Signal.trigger();
@:allow(kernel.Init)
private function new() {
onNewNeigbor = onNewNeigborTrigger.asSignal();
KernelEvents.instance.onModemMessage.handle(params -> {
var pack:Package = {
fromID: params.replyChannel,
@@ -34,35 +47,32 @@ class Net {
});
allModems = Peripheral.instance.getModems();
open();
discoverNeighbors();
}
public static inline final BRODCAST_PORT:Int = 65533;
public static inline final MESSAGE_TIMEOUT:Int = 3;
private var networkID:Int = OS.getComputerID();
private var responseBus:util.EventBus<Package> = new EventBus();
private var protoHandlers:Map<String, Package->Void> = new Map();
private var allModems:Array<kernel.peripherals.Modem>;
private var routingTable:Map<Int, kernel.peripherals.Modem> = new Map();
/**
Called when a new package comes in.
**/
private function handelIncomming(pack:Package, ?addr:String) {
switch pack.type {
case Data(_):
routeTo(pack);
routeToProto(pack);
case DataNoResponse(_):
routeTo(pack);
case Response | RouteDiscoverResponse:
responseBus.emit(Std.string(pack.msgID), pack);
case RouteDiscover:
handleRoute(pack, addr);
routeToProto(pack);
case Response:
if (responseBus.exists(pack.msgID)) {
responseBus[pack.msgID].invoke(pack);
}
case RouteDiscover(_) | RouteDiscoverResponse(_):
handleRouteDiscover(pack, addr);
}
}
private function newRoutPackage():Package {
private function newRoutDiscoverPackage():Package {
var pack:Package = {
type: RouteDiscover,
type: RouteDiscover(getAllNeighbors()),
toID: BRODCAST_PORT,
msgID: generateMessageID(),
fromID: networkID,
@@ -72,32 +82,45 @@ class Net {
return pack;
}
/**
Send out discover packages on all modems.
**/
private function discoverNeighbors() {
for (modem in allModems) {
var pack = newRoutPackage();
var pack = newRoutDiscoverPackage();
var timeout:Timer = null;
var responeListner = responseBus.on(Std.string(pack.msgID), pack -> {
addRoute(pack.fromID, modem.addr);
});
timeout = new Timer(MESSAGE_TIMEOUT, () -> {
responseBus.removeListner(responeListner);
});
modem.transmit(BRODCAST_PORT, OS.getComputerID(), pack);
modem.transmit(BRODCAST_PORT, networkID, pack);
}
}
private function handleRoute(pack:Package, addr:String) {
/**
Handle incomming packages that involve route discovery.
**/
private function handleRouteDiscover(pack:Package, addr:String) {
addRoute(pack.fromID, addr);
var forward = getAllNeighbors().filter(i -> i == pack.fromID);
switch pack.type{
case RouteDiscoverResponse(reachableIDs):
for (id in reachableIDs) {
addRoute(id, addr);
}
return; // Dont respond to a response
case RouteDiscover(reachableIDs):
for (id in reachableIDs) {
addRoute(id, addr);
}
default:
throw new Error("Expcted RouteDiscover or RouteDiscoverResponse type");
}
// Respond to peer
var response:Package = {
toID: pack.fromID,
fromID: OS.getComputerID(),
fromID: networkID,
msgID: pack.msgID,
type: RouteDiscoverResponse,
type: RouteDiscoverResponse(forward),
data: null
}
@@ -106,9 +129,14 @@ class Net {
}
}
/**
Called when a route to a client has been disoverd.
Its posible to be called multiple times with the same id but different addr.
**/
private function addRoute(toID:Int, addr:String) {
Log.debug("Added new route to " + toID + " via " + addr);
Log.info("Added new route to " + toID + " via " + addr);
routingTable.set(toID, allModems.find(item -> item.addr == addr));
this.onNewNeigborTrigger.trigger(toID);
}
public function getAllNeighbors():Array<Int> {
@@ -155,7 +183,10 @@ class Net {
sendRaw(response);
}
private function routeTo(pack:Package) {
/**
Send to package to the localy register handler based on the proto
**/
private function routeToProto(pack:Package) {
var proto:String = switch pack.type {
case Data(proto):
proto;
@@ -169,9 +200,12 @@ class Net {
return;
}
protoHandlers[proto](pack);
protoHandlers[proto].invoke(pack);
}
/**
Just send the package to the right modem.
**/
private function sendRaw(pack:Package) {
if (pack.toID == networkID) {
// Loopback
@@ -186,7 +220,11 @@ class Net {
}
}
public function sendAndAwait(dest:Int, proto:String, data:Dynamic):Promise<Package> {
/**
Send a message and wait for a response.
**/
public function sendAndAwait(dest:NetworkID, proto:String, data:Dynamic):Promise<Package> {
return new Promise<Package>((resolve, reject) -> {
var pack:Package = {
toID: dest,
@@ -197,30 +235,34 @@ class Net {
}
var timeout:Timer = null;
var responeListner = responseBus.once(Std.string(pack.msgID), p -> {
resolve(p);
responseBus[pack.msgID] = ((reponse:Package) -> {
resolve(reponse);
if (timeout != null) {
timeout.cancle();
}
});
timeout = new Timer(MESSAGE_TIMEOUT, () -> {
responseBus.removeListner(responeListner);
reject(new Exception("Timeout"));
responseBus.remove(pack.msgID);
reject(new Error(InternalError,"Message timeout"));
});
sendRaw(pack);
// No callback link for you
return null;
});
}
public function registerProto(proto:String, cb:Package->Void) {
public function registerProto(proto:String, callback:Callback<Package>) {
if (protoHandlers.exists(proto)) {
// Failed. Handler already exist.
// TODO: return error
return;
}
protoHandlers[proto] = cb;
protoHandlers[proto] = callback;
}
public function removeProto(proto:String) {

View File

@@ -1,19 +1,21 @@
package kernel.net;
typedef NetworkID = Int;
enum PackageTypes {
Data(proto:String);
DataNoResponse(proto:String);
Response;
RouteDiscover();
RouteDiscoverResponse();
RouteDiscover(reachableIDs: Array<NetworkID>);
RouteDiscoverResponse(reachableIDs: Array<NetworkID>);
}
/**
Representing a network package.
**/
@:structInit class Package {
public final fromID:Int;
public final toID:Int;
public final fromID:NetworkID;
public final toID:NetworkID;
public final msgID:Int;
public final type:PackageTypes;
public final data:Dynamic;