package kernel.net;

using tink.CoreApi;

import kernel.net.Package.NetworkID;
import kernel.peripherals.Peripherals.Peripheral;
import kernel.Log;
import kernel.KernelEvents;
import haxe.Exception;
import kernel.Timer;
import cc.OS;

using Lambda;
using util.Extender.LambdaExtender;

/**
	Class responsible for everything network related.
	Used to send and receive packages.
**/
class Net {
	public static var instance:Net;
	public static inline final BRODCAST_PORT:Int = 65533;
	public static inline final MESSAGE_TIMEOUT:Int = 3;

	/** Fires with the ID of a peer every time a route to it is added or refreshed. **/
	public final onNewNeigbor:Signal<Int>;

	/** ID of this computer on the network, taken from `OS.getComputerID()`. **/
	public final networkID:NetworkID = OS.getComputerID();

	// NOTE(review): the generic type parameters below were reconstructed from how
	// the fields are used in this class. In particular the modem element type
	// (`kernel.peripherals.Modem`) is an assumption - confirm it against the
	// return type of `Peripheral.instance.getModems()`.

	/** Pending `sendAndAwait` requests, keyed by message ID. **/
	private final responseBus:Map<Int, Callback<Package>> = new Map();

	/** Locally registered protocol handlers, keyed by protocol name. **/
	private final protoHandlers:Map<String, Callback<Package>> = new Map();

	private var allModems:Array<kernel.peripherals.Modem>;

	/** Maps a destination ID to the modem through which it was last seen. **/
	private final routingTable:Map<NetworkID, kernel.peripherals.Modem> = new Map();

	private final onNewNeigborTrigger:SignalTrigger<Int> = Signal.trigger();

	@:allow(kernel.Init)
	private function new() {
		onNewNeigbor = onNewNeigborTrigger.asSignal();

		// Convert every raw modem message into a Package and feed it into the router.
		KernelEvents.instance.onModemMessage.handle(params -> {
			var pack:Package = {
				fromID: params.replyChannel,
				toID: params.channel,
				msgID: params.message.msgID,
				type: params.message.type,
				data: params.message.data,
			};

			handelIncomming(pack, params.addr);
		});

		allModems = Peripheral.instance.getModems();
		open();
		discoverNeighbors();
	}

	/**
		Called when a new package comes in.

		@param pack The received package.
		@param addr Address of the modem the package arrived on. Omitted for loopback packages.
	**/
	private function handelIncomming(pack:Package, ?addr:String) {
		switch pack.type {
			case Data(_):
				routeToProto(pack);
			case DataNoResponse(_):
				routeToProto(pack);
			case Response:
				// Responses nobody is waiting for (e.g. after a timeout) are dropped.
				if (responseBus.exists(pack.msgID)) {
					responseBus[pack.msgID].invoke(pack);
				}
			case RouteDiscover(_) | RouteDiscoverResponse(_):
				handleRouteDiscover(pack, addr);
		}
	}

	/**
		Build a broadcast route-discovery package that advertises all IDs
		currently present in the routing table.
	**/
	private function newRoutDiscoverPackage():Package {
		var pack:Package = {
			type: RouteDiscover(getAllNeighbors()),
			toID: BRODCAST_PORT,
			msgID: generateMessageID(),
			fromID: networkID,
			data: null,
		};

		return pack;
	}

	/**
		Send out discover packages on all modems.
	**/
	private function discoverNeighbors() {
		for (modem in allModems) {
			var pack = newRoutDiscoverPackage();
			modem.transmit(BRODCAST_PORT, networkID, pack);
		}
	}

	/**
		Handle incoming packages that involve route discovery.

		Records a route to the sender and to every ID the sender advertises.
		A `RouteDiscover` is answered with a `RouteDiscoverResponse` on the modem
		it arrived on; a `RouteDiscoverResponse` is never answered.

		@param pack The discovery package.
		@param addr Address of the modem the package arrived on.
	**/
	private function handleRouteDiscover(pack:Package, addr:String) {
		addRoute(pack.fromID, addr);

		// Advertise every ID we know EXCEPT the peer itself. The previous `==`
		// filter sent the peer nothing but its own ID.
		var forward = getAllNeighbors().filter(i -> i != pack.fromID);

		switch pack.type {
			case RouteDiscoverResponse(reachableIDs):
				for (id in reachableIDs) {
					addRoute(id, addr);
				}

				return; // Don't respond to a response
			case RouteDiscover(reachableIDs):
				for (id in reachableIDs) {
					addRoute(id, addr);
				}
			default:
				throw new Error("Expcted RouteDiscover or RouteDiscoverResponse type");
		}

		// Respond to peer
		var response:Package = {
			toID: pack.fromID,
			fromID: networkID,
			msgID: pack.msgID,
			type: RouteDiscoverResponse(forward),
			data: null
		};

		for (reponseModem in allModems.filter(m -> m.addr == addr)) {
			reponseModem.transmit(pack.fromID, networkID, response);
		}
	}

	/**
		Called when a route to a client has been discovered.
		It is possible to be called multiple times with the same id but different addr;
		the most recent modem wins.

		Routes to our own ID and routes via an address that matches none of the
		known modems are ignored, so the routing table never contains a null modem.

		@param toID ID of the reachable computer.
		@param addr Address of the local modem through which it is reachable.
	**/
	private function addRoute(toID:Int, addr:String) {
		if (toID == networkID) {
			// Peers list us among their neighbours; we are not our own neighbour.
			return;
		}

		var modem = allModems.find(item -> item.addr == addr);

		if (modem == null) {
			Log.warn("Ignoring route to " + toID + ": no modem with address " + addr);
			return;
		}

		Log.info("Added new route to " + toID + " via " + addr);
		routingTable.set(toID, modem);
		this.onNewNeigborTrigger.trigger(toID);
	}

	/**
		@return The IDs of all computers that currently have an entry in the routing table.
	**/
	public function getAllNeighbors():Array<NetworkID> {
		// Iterate the keys explicitly: the IDs are the map keys, not the
		// positions of the values.
		return [for (id in routingTable.keys()) id];
	}

	private function generateMessageID():Int {
		return Std.random(2147483647); // TODO: better unique number
	}

	/**
		Open all wireless and wired modems on our own ID and on the broadcast port.
	**/
	private function open() {
		for (m in allModems) {
			m.open(networkID);
			m.open(BRODCAST_PORT);
		}
	}

	/**
		Send a message. Don't care whether it reaches its destination or gets a response.

		@param dest ID of the receiving computer.
		@param proto Name of the protocol handler on the receiving side.
		@param data Payload.
	**/
	public function sendAndForget(dest:Int, proto:String, data:Dynamic) {
		var pack:Package = {
			toID: dest,
			fromID: networkID,
			msgID: generateMessageID(),
			type: DataNoResponse(proto),
			data: data
		};

		sendRaw(pack);
	}

	/**
		Send a response to a previously received package.
		Packages of type `DataNoResponse` are not answered; a warning is logged instead.

		@param pack The package being answered.
		@param data Payload of the response.
	**/
	public function respondTo(pack:Package, data:Dynamic) {
		if (pack.type.match(DataNoResponse(_))) {
			Log.warn("Responed to a no response package. Ignoring");
			return;
		}

		var response = pack.createResponse(data);

		sendRaw(response);
	}

	/**
		Send the package to the locally registered handler based on the proto.
		Packages for which no handler is registered are dropped.
	**/
	private function routeToProto(pack:Package) {
		var proto:String = switch pack.type {
			case Data(proto):
				proto;
			case DataNoResponse(proto):
				proto;
			case _:
				return;
		}

		// A missing key yields null, so one lookup covers both "not registered"
		// and "registered as null". The previous guard could never be true and
		// let unknown protocols crash on a null handler.
		var handler = protoHandlers[proto];

		if (handler == null) {
			return;
		}

		handler.invoke(pack);
	}

	/**
		Just send the package to the right modem.
		Packages addressed to ourselves are delivered directly (loopback).
	**/
	private function sendRaw(pack:Package) {
		if (pack.toID == networkID) {
			// Loopback
			handelIncomming(pack);
			return;
		}

		if (routingTable.exists(pack.toID)) {
			routingTable[pack.toID].transmit(pack.toID, pack.fromID, pack);
		} else {
			// Route not found
			// TODO: forward package or report not reachable
		}
	}

	/**
		Send a message and wait for a response.

		@param dest ID of the receiving computer.
		@param proto Name of the protocol handler on the receiving side.
		@param data Payload.
		@return A promise that resolves with the response package, or is rejected
		with a "Message timeout" error when the `MESSAGE_TIMEOUT` timer fires first.
	**/
	public function sendAndAwait(dest:NetworkID, proto:String, data:Dynamic):Promise<Package> {
		return new Promise<Package>((resolve, reject) -> {
			var pack:Package = {
				toID: dest,
				fromID: networkID,
				msgID: generateMessageID(),
				type: Data(proto),
				data: data
			};

			var timeout:Timer = null;

			responseBus[pack.msgID] = ((reponse:Package) -> {
				// Unregister first so an answered request does not leak an entry
				// and a duplicate response cannot resolve twice.
				responseBus.remove(pack.msgID);

				if (timeout != null) {
					timeout.cancle();
				}

				resolve(reponse);
			});

			timeout = new Timer(MESSAGE_TIMEOUT, () -> {
				responseBus.remove(pack.msgID);
				reject(new Error(InternalError, "Message timeout"));
			});

			sendRaw(pack);

			// No callback link for you
			return null;
		});
	}

	/**
		Register a handler for a protocol.
		If a handler for `proto` already exists the call is silently ignored.

		@param proto Protocol name.
		@param callback Invoked with every incoming package of that protocol.
	**/
	public function registerProto(proto:String, callback:Callback<Package>) {
		if (protoHandlers.exists(proto)) {
			// Failed. Handler already exists.
			// TODO: return error
			return;
		}

		protoHandlers[proto] = callback;
	}

	/**
		Remove the handler registered for `proto`, if any.
	**/
	public function removeProto(proto:String) {
		protoHandlers.remove(proto);
	}
}