package kernel.net; import kernel.peripherals.Peripherals.Peripheral; import kernel.Log; import kernel.KernelEvents; import haxe.Exception; import util.Promise; import kernel.Timer; import util.EventBus; import cc.OS; using Lambda; using util.Extender.LambdaExtender; /** Class responsible for everything network related. Used to send and recceive packages. **/ class Net{ public static var instance:Net; @:allow(kernel.Init) private function new () { KernelEvents.instance.on("modem_message",(params)->{ var pack = Package.fromEvent(params); handelIncomming(pack,params[1]); }); allModems = Peripheral.instance.getModems(); open(); discoverNeighbors(); } public static inline final BRODCAST_PORT:Int = 65533; public static inline final MESSAGE_TIMEOUT:Int = 3; private var networkID:Int = OS.getComputerID(); private var responseBus: util.EventBus = new EventBus(); private var protoHandlers: Map Void> = new Map(); private var allModems:Array; private var routingTable: Map = new Map(); private function handelIncomming(pack: Package, ?addr:String) { switch pack.type { case Data(_): routeTo(pack); case DataNoResponse(_): routeTo(pack); case Response | RouteDiscoverResponse: responseBus.emit(Std.string(pack.msgID),pack); case RouteDiscover: handleRoute(pack,addr); } } private function newRoutPackage(): Package { var pack: Package = { type: RouteDiscover, toID: BRODCAST_PORT, msgID: generateMessageID(), fromID: networkID, data: null, } return pack; } private function discoverNeighbors() { for (modem in allModems) { var pack = newRoutPackage(); var timeout: Timer = null; var responeListner = responseBus.on(Std.string(pack.msgID),pack -> { addRoute(pack.fromID,modem.addr); }); timeout = new Timer(MESSAGE_TIMEOUT,() -> { responseBus.removeListner(responeListner); }); modem.transmit(BRODCAST_PORT,OS.getComputerID(),pack); } } private function handleRoute(pack: Package, addr: String) { addRoute(pack.fromID,addr); // Respond to peer var response: Package = { toID: pack.fromID, fromID: OS.getComputerID(), msgID: pack.msgID, type: RouteDiscoverResponse, data: null } for (reponseModem in allModems.filter(m -> m.addr == addr)) { reponseModem.transmit(pack.fromID,networkID,response); } } private function addRoute(toID: Int,addr: String) { Log.debug("Added new route to "+toID+" via "+addr); routingTable.set(toID,allModems.find(item -> item.addr == addr)); } public function getAllNeighbors(): Array { return routingTable.mapi((index, item) -> index); } private function generateMessageID(): Int { return Std.random(2147483647); // TODO: better uniqe number } /** Open all wireless and wired modem. **/ private function open() { for (m in allModems) { m.open(networkID); m.open(BRODCAST_PORT); } } /** Send a message. Dont care if its reaches its destination nor it has a response. **/ public function sendAndForget(dest:Int,proto:String,data: Dynamic){ var pack: Package = { toID: dest, fromID: networkID, msgID: generateMessageID(), type: DataNoResponse(proto), data: data } sendRaw(pack); } public function respondTo(pack: Package,data: Dynamic) { if (pack.type.match(DataNoResponse(_))){ Log.warn("Responed to a no response package. Ignoring"); return; } var response = pack.createResponse(data); sendRaw(response); } private function routeTo(pack: Package) { var proto: String = switch pack.type { case Data(proto): proto; case DataNoResponse(proto): proto; case _: return; } if (!protoHandlers.exists(proto) && protoHandlers[proto] != null){ return; } protoHandlers[proto](pack); } private function sendRaw(pack: Package){ if (pack.toID == networkID){ // Loopback handelIncomming(pack); }else{ if (routingTable.exists(pack.toID)){ routingTable[pack.toID].transmit(pack.toID,pack.fromID,pack); }else{ // Route not found // TODO: forward package or report not reachable } } } public function sendAndAwait(dest: Int,proto:String,data: Dynamic): Promise { return new Promise((resolve, reject) -> { var pack: Package = { toID: dest, fromID: networkID, msgID: generateMessageID(), type: Data(proto), data: data } var timeout: Timer = null; var responeListner = responseBus.once(Std.string(pack.msgID),p -> { resolve(p); if (timeout != null){ timeout.cancle(); } }); timeout = new Timer(MESSAGE_TIMEOUT,() -> { responseBus.removeListner(responeListner); reject(new Exception("Timeout")); }); sendRaw(pack); }); } public function registerProto(proto: String,cb: Package -> Void) { if (protoHandlers.exists(proto)){ // Failed. Handler already exist. // TODO: return error return; } protoHandlers[proto] = cb; } public function removeProto(proto: String) { protoHandlers.remove(proto); } }