package kernel.net;

import haxe.ds.ReadOnlyArray;

using tink.CoreApi;

import kernel.net.Package.NetworkID;
import kernel.peripherals.Peripherals.Peripheral;
import kernel.Log;
import kernel.Timer;
import cc.OS;

using Lambda;
using util.Extender.LambdaExtender;

/**
	Class responsible for everything network related.
	Used to send and receive packages.
**/
class Net {
	/**
		Depends on: KernelEvents
	**/
	public static var instance:Net;

	/** Destination ID that is accepted by every node in addition to its own ID. **/
	public static inline final BRODCAST_PORT:Int = 65533;

	/** Delay handed to `Timer` before an awaited message is rejected (presumably seconds - confirm against `kernel.Timer`). **/
	public static inline final MESSAGE_TIMEOUT:Int = 3;

	/** Initial TTL of every locally created package. Decremented on each forward. **/
	public static inline final DEFAULT_TTL:Int = 10;

	/** The ID of this node, taken from the computer ID. **/
	public final networkID:NetworkID = OS.getComputerID();

	/** Pending response callbacks of `sendAndAwait`, keyed by message ID. **/
	private final responseBus:Map<Int, Callback<Package>> = new Map();

	/** Locally registered protocol handlers, keyed by protocol name. **/
	private final protoHandlers:Map<String, Callback<Package>> = new Map();

	/** All interfaces packages are received on (modems plus loopback). **/
	private var interfaces:Array<INetworkInterface>;

	/**
		Collects all modems and the loopback interface, starts listening on
		each of them and registers the built-in `ping` protocol.
	**/
	@:allow(kernel.Init)
	private function new() {
		this.interfaces = [for (e in Peripheral.instance.getModems()) e]; // TODO: is this the way to do it?
		this.interfaces.push(Loopback.instance);

		for (interf in interfaces) {
			setupInterf(interf);
		}

		setupPingHandle();
	}

	/**
		Registers the `ping` protocol which echoes the received data back to the sender.
	**/
	private function setupPingHandle() {
		this.registerProto('ping', pack -> {
			this.respondTo(pack, pack.data);
		});
	}

	/**
		Subscribes to incoming packages of `interf` and starts listening on
		our own ID as well as on the broadcast port.
	**/
	private function setupInterf(interf:INetworkInterface) {
		interf.onMessage.handle(pack -> handle(pack, interf));
		interf.listen(networkID);
		interf.listen(BRODCAST_PORT);
	}

	/**
		Called when a new package comes in.
		Packages addressed to us (or to the broadcast port) are dispatched by type.
		Everything else is forwarded.
	**/
	private function handle(pack:Package, interf:INetworkInterface) {
		if (pack.toID == this.networkID || pack.toID == Net.BRODCAST_PORT) {
			switch pack.type {
				case Data(_) | DataNoResponse(_):
					// Let a local process handle it
					routeToProto(pack);
				case Response:
					// Got a response to a sent message. Invoke the callback.
					// Responses without a pending callback (e.g. after a timeout) are dropped.
					if (responseBus.exists(pack.msgID)) {
						responseBus[pack.msgID].invoke(pack);
					}
				case RouteDiscover(_) | RouteDiscoverResponse(_) | RouteDiscoverUpdate(_):
					// Delegate to Routing
					Routing.instance.handleRoutePackage(pack, interf);
			}
		} else {
			// New message received but it is not meant for us. Forward if possible.
			forwardPackage(pack);
		}
	}

	/**
		Creates a random message ID. Collisions are possible.
	**/
	private function generateMessageID():Int {
		return Std.random(2147483647); // TODO: better unique number
	}

	/**
		Send a message. Don't care if it reaches its destination nor if it has a response.
	**/
	public function sendAndForget(dest:Int, proto:String, data:Dynamic) {
		var pack:Package = {
			toID: dest,
			fromID: networkID,
			msgID: generateMessageID(),
			type: DataNoResponse(proto),
			data: data,
			ttl: Net.DEFAULT_TTL,
		}

		sendRaw(pack);
	}

	/**
		Forwards a package that is not addressed to us.
		The package is dropped when its TTL is used up or when there is no route.
	**/
	private function forwardPackage(pack:Package) {
		// `<=` instead of `==` so a package arriving with a negative TTL is dropped as well.
		if (pack.ttl <= 0) {
			// Drop package
			return;
		}

		pack.ttl--;

		if (!sendRaw(pack)) {
			// Can't forward. Deliberately best effort: the package is dropped.
		}
	}

	/**
		Sends `data` as the response to `pack`.
		Packages of type `DataNoResponse` are not answered. A warning is logged instead.
	**/
	public function respondTo(pack:Package, data:Dynamic) {
		if (pack.type.match(DataNoResponse(_))) {
			Log.warn("Responed to a no response package. Ignoring");
			return;
		}

		var response = pack.createResponse(data);

		sendRaw(response);
	}

	/**
		Send the package to the locally registered handler based on the proto.
		Logs a warning if no handler is registered for it.
	**/
	private function routeToProto(pack:Package) {
		var proto = switch pack.type {
			case Data(proto):
				proto;
			case DataNoResponse(proto):
				proto;
			default:
				return;
		}

		if (!protoHandlers.exists(proto)) {
			Log.warn('Trying to route package to proto: $proto but nothing was register');
			return;
		}

		protoHandlers[proto].invoke(pack);
	}

	/**
		Just send the package to the right modem.
		Returns true if the message was sent, false if there is no route to the destination.
	**/
	private function sendRaw(pack:Package):Bool {
		var route = Routing.instance.getRouteToID(pack.toID);

		if (route == null) {
			return false;
		}

		route.interf.send(route.rep, this.networkID, pack);
		return true;
	}

	/**
		Send a message and wait for a response.
		The promise resolves with the response package. It is rejected when no
		response arrives before the `MESSAGE_TIMEOUT` timer fires or when there
		is no route to `dest`.
	**/
	public function sendAndAwait(dest:NetworkID, proto:String, data:Dynamic):Promise<Package> {
		return new Promise((resolve, reject) -> {
			var pack:Package = {
				toID: dest,
				fromID: networkID,
				msgID: generateMessageID(),
				type: Data(proto),
				data: data,
				ttl: Net.DEFAULT_TTL,
			}

			var timeout:Timer = null;

			// Callback and timer are registered before sending so a response
			// delivered while `sendRaw` is still running is not missed.
			responseBus[pack.msgID] = ((response:Package) -> {
				// One shot: drop the entry so it does not leak and a duplicate
				// response can not resolve again.
				responseBus.remove(pack.msgID);

				if (timeout != null) {
					timeout.cancle();
				}

				resolve(response);
			});

			timeout = new Timer(MESSAGE_TIMEOUT, () -> {
				responseBus.remove(pack.msgID);
				reject(new Error(InternalError, "Message timeout"));
			});

			if (!sendRaw(pack)) {
				// Nothing was sent, so no response can arrive. Clean up right away
				// instead of leaving the timer to reject a second time.
				responseBus.remove(pack.msgID);
				timeout.cancle();
				reject(new Error("ID unreachable"));
			}

			// No callback link for you
			return null;
		});
	}

	/**
		Registers `callback` as the handler for packages of the protocol `proto`.
		If a handler already exists it is kept and a warning is logged.
	**/
	public function registerProto(proto:String, callback:Callback<Package>) {
		if (protoHandlers.exists(proto)) {
			// Failed. Handler already exists.
			// TODO: return error
			Log.warn('Handler for proto: $proto is already registered. Ignoring');
			return;
		}

		protoHandlers[proto] = callback;
	}

	/**
		Removes the handler registered for `proto`, if any.
	**/
	public function removeProto(proto:String) {
		protoHandlers.remove(proto);
	}

	/**
		Sends a ping package to the given id.
		Resolves to true if there was a response and to false otherwise. Never rejects.
	**/
	public function ping(toID:NetworkID):Promise<Bool> {
		return new Promise((resolve, reject) -> {
			this.sendAndAwait(toID, "ping", null).handle(outcome -> {
				switch outcome {
					case Success(_):
						resolve(true);
					case Failure(_):
						resolve(false);
				}
			});

			return null;
		});
	}

	/**
		Returns the names of all protocols that currently have a handler registered.
	**/
	public function getActiveProtocols():ReadOnlyArray<String> {
		return [for (proto in protoHandlers.keys()) proto];
	}
}