package kernel.net;

using tink.CoreApi;

import kernel.peripherals.Peripherals.Peripheral;
import kernel.Log;
import kernel.KernelEvents;
import haxe.Exception;
import util.Promise;
import kernel.Timer;
import util.EventBus;
import cc.OS;

using Lambda;
using util.Extender.LambdaExtender;

/**
	Class responsible for everything network related.
	Used to send and receive packages.
**/
class Net {
	public static var instance:Net;

	/** Channel that is opened on every modem and used as destination for route discovery packages. **/
	public static inline final BRODCAST_PORT:Int = 65533;

	/** Timeout handed to `Timer` while waiting for a response (presumably seconds - confirm against `kernel.Timer`). **/
	public static inline final MESSAGE_TIMEOUT:Int = 3;

	/** Fires with the network ID of a computer each time a route to it has been stored. **/
	public final onNewNeigbor:Signal<Int>;

	private final onNewNeigborTrigger:SignalTrigger<Int> = Signal.trigger();

	private var networkID:Int = OS.getComputerID();

	// NOTE(review): if util.EventBus is generic its type parameter is missing here - confirm.
	private var responseBus:util.EventBus = new EventBus();
	private var protoHandlers:Map<String, Package->Void> = new Map();

	// NOTE(review): the modem element type is not visible in this file; `kernel.peripherals.Modem`
	// is assumed to be the element type returned by `Peripheral.instance.getModems()` - confirm.
	private var allModems:Array<kernel.peripherals.Modem>;
	private var routingTable:Map<Int, kernel.peripherals.Modem> = new Map();

	/**
		Hooks into the kernel modem message event, loads all modems,
		opens the needed channels on them and starts neighbor discovery.
	**/
	@:allow(kernel.Init)
	private function new() {
		onNewNeigbor = onNewNeigborTrigger.asSignal();

		// Convert every raw modem message into a Package and dispatch it.
		KernelEvents.instance.onModemMessage.handle(params -> {
			var pack:Package = {
				fromID: params.replyChannel,
				toID: params.channel,
				msgID: params.message.msgID,
				type: params.message.type,
				data: params.message.data
			};

			handelIncomming(pack, params.addr);
		});

		allModems = Peripheral.instance.getModems();
		open();
		discoverNeighbors();
	}

	/**
		Dispatch an incoming package based on its type.

		@param pack the received (or looped back) package
		@param addr address of the modem the package arrived on; absent for loopback packages
	**/
	private function handelIncomming(pack:Package, ?addr:String) {
		switch pack.type {
			case Data(_) | DataNoResponse(_):
				routeTo(pack);
			case Response | RouteDiscoverResponse:
				// Listeners waiting for a response are keyed by the message ID.
				responseBus.emit(Std.string(pack.msgID), pack);
			case RouteDiscover:
				handleRoute(pack, addr);
		}
	}

	/**
		Create a fresh route discovery package addressed to the broadcast channel.
	**/
	private function newRoutPackage():Package {
		var pack:Package = {
			type: RouteDiscover,
			toID: BRODCAST_PORT,
			msgID: generateMessageID(),
			fromID: networkID,
			data: null
		};

		return pack;
	}

	/**
		Send a route discovery package over every modem and record a route for
		each response that arrives before the timer removes the listener.
	**/
	private function discoverNeighbors() {
		for (modem in allModems) {
			var request = newRoutPackage();

			// `on` (not `once`): several neighbors may answer the same broadcast.
			var listener = responseBus.on(Std.string(request.msgID), response -> {
				addRoute(response.fromID, modem.addr);
			});

			// Stop listening for discovery responses once the timer fires.
			new Timer(MESSAGE_TIMEOUT, () -> {
				responseBus.removeListner(listener);
			});

			modem.transmit(BRODCAST_PORT, networkID, request);
		}
	}

	/**
		Handle a route discovery request: remember the route to the sender
		and answer over the modem(s) the request arrived on.

		@param pack the received route discovery package
		@param addr address of the modem the package arrived on
	**/
	private function handleRoute(pack:Package, addr:String) {
		addRoute(pack.fromID, addr);

		// Respond to peer
		var response:Package = {
			toID: pack.fromID,
			fromID: networkID,
			msgID: pack.msgID,
			type: RouteDiscoverResponse,
			data: null
		};

		for (reponseModem in allModems.filter(m -> m.addr == addr)) {
			reponseModem.transmit(pack.fromID, networkID, response);
		}
	}

	/**
		Store the modem with the given address as the route to `toID`
		and notify `onNewNeigbor` listeners.

		Nothing is stored (and no signal is fired) when no modem has the given
		address, so the routing table never contains a null modem.

		@param toID network ID of the reachable computer
		@param addr address of the modem through which it is reachable
	**/
	private function addRoute(toID:Int, addr:String) {
		var modem = allModems.find(item -> item.addr == addr);

		if (modem == null) {
			Log.warn("Can't add route to " + toID + ". No modem with address " + addr);
			return;
		}

		// Store the route before notifying, so listeners can already send to the new neighbor.
		routingTable.set(toID, modem);
		Log.debug("Added new route to " + toID + " via " + addr);
		this.onNewNeigborTrigger.trigger(toID);
	}

	/**
		@return the network IDs of all computers currently present in the routing table
	**/
	public function getAllNeighbors():Array<Int> {
		return [for (id in routingTable.keys()) id];
	}

	private function generateMessageID():Int {
		return Std.random(2147483647); // TODO: better unique number
	}

	/**
		Open the own network ID channel and the broadcast channel on all modems.
	**/
	private function open() {
		for (m in allModems) {
			m.open(networkID);
			m.open(BRODCAST_PORT);
		}
	}

	/**
		Send a message. Don't care if it reaches its destination nor if it has a response.

		@param dest network ID of the receiver
		@param proto name of the protocol the receiver dispatches on
		@param data payload of the package
	**/
	public function sendAndForget(dest:Int, proto:String, data:Dynamic) {
		var pack:Package = {
			toID: dest,
			fromID: networkID,
			msgID: generateMessageID(),
			type: DataNoResponse(proto),
			data: data
		};

		sendRaw(pack);
	}

	/**
		Send a response for a received package.
		Packages of type `DataNoResponse` are not answered; a warning is logged instead.

		@param pack the package to respond to
		@param data payload of the response
	**/
	public function respondTo(pack:Package, data:Dynamic) {
		if (pack.type.match(DataNoResponse(_))) {
			Log.warn("Responed to a no response package. Ignoring");
			return;
		}

		var response = pack.createResponse(data);

		sendRaw(response);
	}

	/**
		Hand a data package to the handler registered for its protocol.
		Packages of other types, or for a protocol without a handler, are dropped.
	**/
	private function routeTo(pack:Package) {
		var proto:String = switch pack.type {
			case Data(proto): proto;
			case DataNoResponse(proto): proto;
			case _: return;
		};

		// A missing key yields null, so one check covers "not registered" and "registered as null".
		var handler = protoHandlers[proto];

		if (handler == null) {
			return;
		}

		handler(pack);
	}

	/**
		Deliver a package: locally when it is addressed to this computer,
		otherwise over the modem stored in the routing table.
	**/
	private function sendRaw(pack:Package) {
		if (pack.toID == networkID) {
			// Loopback
			handelIncomming(pack);
			return;
		}

		var modem = routingTable[pack.toID];

		if (modem == null) {
			// Route not found
			// TODO: forward package or report not reachable
			Log.warn("No route to " + pack.toID + ". Package dropped");
			return;
		}

		modem.transmit(pack.toID, pack.fromID, pack);
	}

	/**
		Send a message and wait for its response.

		@param dest network ID of the receiver
		@param proto name of the protocol the receiver dispatches on
		@param data payload of the package
		@return a promise resolved with the response package, or rejected with a
		"Timeout" exception when the timer fires before a response arrived
	**/
	// NOTE(review): if util.Promise is generic its type parameter is missing on the return type - confirm.
	public function sendAndAwait(dest:Int, proto:String, data:Dynamic):Promise {
		return new Promise((resolve, reject) -> {
			var pack:Package = {
				toID: dest,
				fromID: networkID,
				msgID: generateMessageID(),
				type: Data(proto),
				data: data
			};

			var timeout:Timer = null;

			var responeListner = responseBus.once(Std.string(pack.msgID), p -> {
				resolve(p);
				if (timeout != null) {
					timeout.cancle();
				}
			});

			timeout = new Timer(MESSAGE_TIMEOUT, () -> {
				responseBus.removeListner(responeListner);
				reject(new Exception("Timeout"));
			});

			// Send only after listener and timer are in place: a loopback response arrives synchronously.
			sendRaw(pack);
		});
	}

	/**
		Register the handler that receives all data packages of a protocol.
		An already registered handler is kept; the new one is ignored.

		@param proto name of the protocol
		@param cb handler called with every incoming package of that protocol
	**/
	public function registerProto(proto:String, cb:Package->Void) {
		if (protoHandlers.exists(proto)) {
			// Failed. Handler already exist.
			// TODO: return error
			Log.warn("Handler for protocol " + proto + " already registered. Ignoring");
			return;
		}

		protoHandlers[proto] = cb;
	}

	/**
		Remove the handler registered for a protocol, if any.
	**/
	public function removeProto(proto:String) {
		protoHandlers.remove(proto);
	}
}