233 lines
5.2 KiB
Haxe
Raw Normal View History

2021-12-20 01:55:30 +01:00
package kernel.net;
import kernel.peripherals.Peripherals.Peripheral;
import kernel.Log;
import kernel.KernelEvents;
import haxe.Exception;
import util.Promise;
import kernel.Timer;
import util.EventBus;
import cc.OS;
using Lambda;
using util.Extender.LambdaExtender;
/**
Class responsible for everything network related.
Used to send and receive packages.
**/
class Net {
	/** Shared instance. It is not assigned inside this class. **/
	public static var instance:Net;

	/** Channel that every modem opens in addition to the computer's own ID; route discovery is sent on it. **/
	public static inline final BRODCAST_PORT:Int = 65533;

	/** Value handed to `Timer` when waiting for a response (presumably seconds - confirm against `kernel.Timer`). **/
	public static inline final MESSAGE_TIMEOUT:Int = 3;

	/** ID of this computer; used as the sender ID and as the channel the modems listen on. **/
	private var networkID:Int = OS.getComputerID();

	/** Bus on which responses are emitted, keyed by the stringified message ID. **/
	private var responseBus:util.EventBus<Package> = new EventBus();

	/** Registered protocol handlers, keyed by protocol name. **/
	private var protoHandlers:Map<String, Package->Void> = new Map();

	/** Modems returned by `Peripheral.instance.getModems()` at construction time. **/
	private var allModems:Array<kernel.peripherals.Modem>;

	/** Maps a destination ID to the modem through which it was discovered. **/
	private var routingTable:Map<Int, kernel.peripherals.Modem> = new Map();

	/**
		Subscribes to modem messages, fetches the modems, opens them and
		starts neighbor discovery.
	**/
	@:allow(kernel.Init)
	private function new() {
		KernelEvents.instance.onModemMessage.handle(params -> {
			// Sender and receiver are taken from the modem channels, not from the message body.
			var pack:Package = {
				fromID: params.replyChannel,
				toID: params.channel,
				msgID: params.message.msgID,
				type: params.message.type,
				data: params.message.data
			};
			handelIncomming(pack, params.addr);
		});

		allModems = Peripheral.instance.getModems();
		open();
		discoverNeighbors();
	}

	/**
		Dispatches a received (or looped back) package according to its type.

		@param pack The package to process.
		@param addr Address of the modem the package arrived on; omitted for loopback.
	**/
	private function handelIncomming(pack:Package, ?addr:String) {
		switch pack.type {
			case Data(_) | DataNoResponse(_):
				routeTo(pack);
			case Response | RouteDiscoverResponse:
				responseBus.emit(Std.string(pack.msgID), pack);
			case RouteDiscover:
				handleRoute(pack, addr);
		}
	}

	/**
		Builds a route discovery package addressed to the broadcast port.
	**/
	private function newRoutPackage():Package {
		var pack:Package = {
			type: RouteDiscover,
			toID: BRODCAST_PORT,
			msgID: generateMessageID(),
			fromID: networkID,
			data: null
		};
		return pack;
	}

	/**
		Sends a route discovery package over every modem and records a route
		for each response received before the timeout fires.
	**/
	private function discoverNeighbors() {
		for (modem in allModems) {
			var discover = newRoutPackage();
			var listener = responseBus.on(Std.string(discover.msgID), response -> {
				addRoute(response.fromID, modem.addr);
			});
			// Stop listening for answers once the timeout fires.
			new Timer(MESSAGE_TIMEOUT, () -> {
				responseBus.removeListner(listener);
			});
			modem.transmit(BRODCAST_PORT, networkID, discover);
		}
	}

	/**
		Handles a route discovery request: records the route to the sender and
		answers on the modem(s) whose address matches `addr`.

		@param pack The received discovery package.
		@param addr Address of the modem the request arrived on.
	**/
	private function handleRoute(pack:Package, addr:String) {
		addRoute(pack.fromID, addr);

		// Respond to peer
		var response:Package = {
			toID: pack.fromID,
			fromID: networkID,
			msgID: pack.msgID,
			type: RouteDiscoverResponse,
			data: null
		};

		for (responseModem in allModems.filter(m -> m.addr == addr)) {
			responseModem.transmit(pack.fromID, networkID, response);
		}
	}

	/**
		Records that `toID` is reachable through the modem with address `addr`.
		No route is stored when none of the known modems has that address, so
		the routing table never contains a null modem.

		@param toID ID of the reachable computer.
		@param addr Address of the modem to route through.
	**/
	private function addRoute(toID:Int, addr:String) {
		var modem = allModems.find(item -> item.addr == addr);
		if (modem == null) {
			Log.warn("Can't add route to " + toID + ": no modem with address " + addr);
			return;
		}
		Log.debug("Added new route to " + toID + " via " + addr);
		routingTable.set(toID, modem);
	}

	/**
		Returns the IDs of all computers that currently have a routing table entry.
	**/
	public function getAllNeighbors():Array<Int> {
		return [for (id in routingTable.keys()) id];
	}

	/**
		Generates a random message ID.
	**/
	private function generateMessageID():Int {
		return Std.random(2147483647); // TODO: better unique number
	}

	/**
		Opens this computer's own channel and the broadcast channel on every modem.
	**/
	private function open() {
		for (m in allModems) {
			m.open(networkID);
			m.open(BRODCAST_PORT);
		}
	}

	/**
		Sends a message without waiting for a response or for confirmation
		that it reached its destination.

		@param dest ID of the receiving computer.
		@param proto Name of the protocol the receiver should dispatch on.
		@param data Payload of the message.
	**/
	public function sendAndForget(dest:Int, proto:String, data:Dynamic) {
		var pack:Package = {
			toID: dest,
			fromID: networkID,
			msgID: generateMessageID(),
			type: DataNoResponse(proto),
			data: data
		};
		sendRaw(pack);
	}

	/**
		Sends a response to a previously received package. Packages of type
		`DataNoResponse` are not answered; a warning is logged instead.

		@param pack The package to respond to.
		@param data Payload of the response.
	**/
	public function respondTo(pack:Package, data:Dynamic) {
		if (pack.type.match(DataNoResponse(_))) {
			Log.warn("Responed to a no response package. Ignoring");
			return;
		}
		sendRaw(pack.createResponse(data));
	}

	/**
		Passes a data package to the handler registered for its protocol.
		Packages of other types, and packages whose protocol has no handler,
		are dropped.

		@param pack The package to dispatch.
	**/
	private function routeTo(pack:Package) {
		var proto:String = switch pack.type {
			case Data(proto) | DataNoResponse(proto):
				proto;
			case _:
				return;
		}

		// A single lookup covers both "not registered" and "registered as null".
		var handler = protoHandlers.get(proto);
		if (handler == null) {
			return;
		}
		handler(pack);
	}

	/**
		Sends a package. Packages addressed to this computer are handled
		locally; others go out through the modem stored in the routing table.
		A package without a known route is dropped with a warning.

		@param pack The package to send.
	**/
	private function sendRaw(pack:Package) {
		if (pack.toID == networkID) {
			// Loopback
			handelIncomming(pack);
			return;
		}

		var modem = routingTable.get(pack.toID);
		if (modem == null) {
			// Route not found
			// TODO: forward package or report not reachable
			Log.warn("No route to " + pack.toID + ". Dropping package");
			return;
		}
		modem.transmit(pack.toID, pack.fromID, pack);
	}

	/**
		Sends a message and waits for the matching response.

		@param dest ID of the receiving computer.
		@param proto Name of the protocol the receiver should dispatch on.
		@param data Payload of the message.
		@return A promise resolved with the response package, or rejected with
		an `Exception("Timeout")` when the timeout fires first.
	**/
	public function sendAndAwait(dest:Int, proto:String, data:Dynamic):Promise<Package> {
		return new Promise<Package>((resolve, reject) -> {
			var pack:Package = {
				toID: dest,
				fromID: networkID,
				msgID: generateMessageID(),
				type: Data(proto),
				data: data
			};

			var timeout:Timer = null;
			var listener = responseBus.once(Std.string(pack.msgID), p -> {
				resolve(p);
				if (timeout != null) {
					timeout.cancle();
				}
			});

			// The timer is created before sending so that a response delivered
			// during sendRaw (loopback) already finds it set and can cancel it.
			timeout = new Timer(MESSAGE_TIMEOUT, () -> {
				responseBus.removeListner(listener);
				reject(new Exception("Timeout"));
			});

			sendRaw(pack);
		});
	}

	/**
		Registers the handler for a protocol. When a handler is already
		registered for `proto` the existing one is kept and a warning is logged.

		@param proto Name of the protocol.
		@param cb Callback invoked with every data package of that protocol.
	**/
	public function registerProto(proto:String, cb:Package->Void) {
		if (protoHandlers.exists(proto)) {
			// Failed. Handler already exist.
			// TODO: return error
			Log.warn("Handler for protocol " + proto + " already registered. Ignoring");
			return;
		}
		protoHandlers[proto] = cb;
	}

	/**
		Removes the handler registered for a protocol, if any.

		@param proto Name of the protocol.
	**/
	public function removeProto(proto:String) {
		protoHandlers.remove(proto);
	}
}