pyduino.utils
1from pyduino.paths import PATHS 2import yaml 3from nmap import PortScanner 4import requests 5import numpy as np 6from collections import OrderedDict 7from urllib.parse import urljoin 8import logging 9 10logging.basicConfig(filename='pyduino.log', filemode='w', level=logging.DEBUG) 11 12class bcolors: 13 HEADER = '\033[95m' 14 OKBLUE = '\033[94m' 15 OKCYAN = '\033[96m' 16 OKGREEN = '\033[92m' 17 WARNING = '\033[93m' 18 FAIL = '\033[91m' 19 ENDC = '\033[0m' 20 BOLD = '\033[1m' 21 UNDERLINE = '\033[4m' 22 23def get_param(data, key: str, ids: set = False) -> OrderedDict: 24 25 """ 26 Retrieve a specific parameter from a dictionary of data. 27 28 Parameters: 29 - data: The dictionary containing the data. 30 - key: The key of the parameter to retrieve. 31 - ids: (optional) A set of IDs to filter the data. If not provided, all data will be returned. 32 33 Returns: 34 - An ordered dictionary containing the filtered data. 35 36 """ 37 filtered = OrderedDict(list(map(lambda x: (x[0], x[1][key]), data.items()))) 38 if not ids: 39 return filtered 40 else: 41 return OrderedDict(filter(lambda x: x[0] in ids,filtered.items())) 42 43def yaml_get(filename): 44 """ 45 Loads hyperparameters from a YAML file. 46 """ 47 y = None 48 with open(filename) as f: 49 y = yaml.load(f.read(),yaml.Loader) 50 return y 51 52 53def ReLUP(x): 54 """Computes probability from an array X after passing it through a ReLU unit (negatives are zero). 55 56 Args: 57 x (numpy.array): Input Array 58 """ 59 x_relu = x.copy() 60 x_relu[x_relu<0] = 0 61 62 if x_relu.sum() == 0: 63 return np.ones_like(x_relu)/len(x_relu) 64 else: 65 return x_relu/x_relu.sum() 66 67def get_meta(url): 68 resp = requests.get(urljoin(url, "ping")) 69 if resp.ok : 70 return resp.json() 71 else: 72 logging.error(f"Unable to connect to {url}") 73 raise ConnectionRefusedError(url) 74 75def get_servers( 76 net=PATHS.SYSTEM_PARAMETERS.get("network", "192.168.1.1/24"), 77 port=PATHS.SYSTEM_PARAMETERS.get("port", 5000), 78 exclude=PATHS.SYSTEM_PARAMETERS.get("exclude", None) 79 )->dict: 80 """ 81 Get a dictionary of available servers in the network. 82 83 Args: 84 net (str): The network address range to scan for servers. Default is "192.168.0.1/24". 85 port (str): The port number to scan for servers. Default is "5000". 86 exclude (str): IP addresses to exclude from the scan. Default is None. 87 88 Returns: 89 dict: A dictionary of available servers, where the keys are the server IDs and the values are the server URLs. 90 91 """ 92 logging.debug(f"Searching for devices on {net}:{port}") 93 port_scanner = PortScanner() 94 args = "--open" if exclude is None else f"--open --exclude {exclude}" 95 results = port_scanner.scan(net, str(port), arguments=args, timeout=60) 96 hosts = list(map(lambda x: f"http://{x}:{str(port)}", results["scan"].keys())) 97 servers = {} 98 for host in hosts: 99 try: 100 v = requests.get(host, timeout=2).text == "REACTOR SERVER" 101 meta = get_meta(host) 102 if v: 103 if meta["id"] in servers: 104 logging.warning(f"Duplicate ID found: {meta['id']}") 105 servers[meta["id"]] = host 106 except: 107 pass 108 logging.debug(f"Found {len(servers)} devices") 109 return servers 110 111class TriangleWave: 112 def __init__(self,p_0: float, p_i: float, p_f: float, N: int): 113 """Generates a triangular wave according to the formula: 114 115 Q\left(x\right)=(N-\operatorname{abs}(\operatorname{mod}\left(x,2N\right)-N))\left(\frac{p_{f}-p_{i}}{N}\right)+p_{i} 116 117 Args: 118 p_0 (float): Initial value at n=0 119 p_i (float): Lower bound 120 p_f (float): Upper bound 121 N (int): Steps to reach upper bound 122 """ 123 self.N = N 124 self.p_0 = p_0 125 self.p_i = p_i 126 self.p_f = p_f 127 128 self.a = N*(self.p_0 - self.p_i)/(self.p_f - self.p_i)#Phase factor 129 130 def Q(self,x: int): 131 return (self.N - abs((x%(2*self.N))-self.N))*(self.p_f - self.p_i)/self.N + self.p_i 132 133 def y(self,x: int): 134 return self.Q(x + self.a) 135 136def partition(X: np.ndarray, n: int) -> list: 137 """ 138 Partitions the array `X` in blocks of size `n` except the last. 139 140 Args: 141 X (numpy.array): Input 2D array 142 n (int): Number of partitions 143 144 Returns: 145 list: A list containing the array partitions. 146 """ 147 assert X.ndim == 2, "X must be a matrix" 148 #Number of partitions 149 r = X.shape[0] % n 150 m = X.shape[0] // n + (r > 0) 151 X_enlarged = np.pad(X, ((0, n*m - X.shape[0]), (0,0)), constant_values=0) 152 X_split = np.array_split(X_enlarged, m) 153 if r > 0: 154 X_split[-1] = X_split[-1][:r,:] 155 return X_split
class
bcolors:
def
get_param(data, key: str, ids: set = False) -> collections.OrderedDict:
24def get_param(data, key: str, ids: set = False) -> OrderedDict: 25 26 """ 27 Retrieve a specific parameter from a dictionary of data. 28 29 Parameters: 30 - data: The dictionary containing the data. 31 - key: The key of the parameter to retrieve. 32 - ids: (optional) A set of IDs to filter the data. If not provided, all data will be returned. 33 34 Returns: 35 - An ordered dictionary containing the filtered data. 36 37 """ 38 filtered = OrderedDict(list(map(lambda x: (x[0], x[1][key]), data.items()))) 39 if not ids: 40 return filtered 41 else: 42 return OrderedDict(filter(lambda x: x[0] in ids,filtered.items()))
Retrieve a specific parameter from a dictionary of data.
Parameters:
- data: The dictionary containing the data.
- key: The key of the parameter to retrieve.
- ids: (optional) A set of IDs to filter the data. If not provided, all data will be returned.
Returns:
- An ordered dictionary containing the filtered data.
def
yaml_get(filename):
44def yaml_get(filename): 45 """ 46 Loads hyperparameters from a YAML file. 47 """ 48 y = None 49 with open(filename) as f: 50 y = yaml.load(f.read(),yaml.Loader) 51 return y
Loads hyperparameters from a YAML file.
def
ReLUP(x):
54def ReLUP(x): 55 """Computes probability from an array X after passing it through a ReLU unit (negatives are zero). 56 57 Args: 58 x (numpy.array): Input Array 59 """ 60 x_relu = x.copy() 61 x_relu[x_relu<0] = 0 62 63 if x_relu.sum() == 0: 64 return np.ones_like(x_relu)/len(x_relu) 65 else: 66 return x_relu/x_relu.sum()
Computes probability from an array X after passing it through a ReLU unit (negatives are zero).
Arguments:
- x (numpy.array): Input Array
def
get_meta(url):
def
get_servers(net='192.168.1.1/24', port=5000, exclude=None) -> dict:
76def get_servers( 77 net=PATHS.SYSTEM_PARAMETERS.get("network", "192.168.1.1/24"), 78 port=PATHS.SYSTEM_PARAMETERS.get("port", 5000), 79 exclude=PATHS.SYSTEM_PARAMETERS.get("exclude", None) 80 )->dict: 81 """ 82 Get a dictionary of available servers in the network. 83 84 Args: 85 net (str): The network address range to scan for servers. Default is "192.168.0.1/24". 86 port (str): The port number to scan for servers. Default is "5000". 87 exclude (str): IP addresses to exclude from the scan. Default is None. 88 89 Returns: 90 dict: A dictionary of available servers, where the keys are the server IDs and the values are the server URLs. 91 92 """ 93 logging.debug(f"Searching for devices on {net}:{port}") 94 port_scanner = PortScanner() 95 args = "--open" if exclude is None else f"--open --exclude {exclude}" 96 results = port_scanner.scan(net, str(port), arguments=args, timeout=60) 97 hosts = list(map(lambda x: f"http://{x}:{str(port)}", results["scan"].keys())) 98 servers = {} 99 for host in hosts: 100 try: 101 v = requests.get(host, timeout=2).text == "REACTOR SERVER" 102 meta = get_meta(host) 103 if v: 104 if meta["id"] in servers: 105 logging.warning(f"Duplicate ID found: {meta['id']}") 106 servers[meta["id"]] = host 107 except: 108 pass 109 logging.debug(f"Found {len(servers)} devices") 110 return servers
Get a dictionary of available servers in the network.
Arguments:
- net (str): The network address range to scan for servers. Default is "192.168.0.1/24".
- port (str): The port number to scan for servers. Default is "5000".
- exclude (str): IP addresses to exclude from the scan. Default is None.
Returns:
dict: A dictionary of available servers, where the keys are the server IDs and the values are the server URLs.
class
TriangleWave:
112class TriangleWave: 113 def __init__(self,p_0: float, p_i: float, p_f: float, N: int): 114 """Generates a triangular wave according to the formula: 115 116 Q\left(x\right)=(N-\operatorname{abs}(\operatorname{mod}\left(x,2N\right)-N))\left(\frac{p_{f}-p_{i}}{N}\right)+p_{i} 117 118 Args: 119 p_0 (float): Initial value at n=0 120 p_i (float): Lower bound 121 p_f (float): Upper bound 122 N (int): Steps to reach upper bound 123 """ 124 self.N = N 125 self.p_0 = p_0 126 self.p_i = p_i 127 self.p_f = p_f 128 129 self.a = N*(self.p_0 - self.p_i)/(self.p_f - self.p_i)#Phase factor 130 131 def Q(self,x: int): 132 return (self.N - abs((x%(2*self.N))-self.N))*(self.p_f - self.p_i)/self.N + self.p_i 133 134 def y(self,x: int): 135 return self.Q(x + self.a)
TriangleWave(p_0: float, p_i: float, p_f: float, N: int)
113 def __init__(self,p_0: float, p_i: float, p_f: float, N: int): 114 """Generates a triangular wave according to the formula: 115 116 Q\left(x\right)=(N-\operatorname{abs}(\operatorname{mod}\left(x,2N\right)-N))\left(\frac{p_{f}-p_{i}}{N}\right)+p_{i} 117 118 Args: 119 p_0 (float): Initial value at n=0 120 p_i (float): Lower bound 121 p_f (float): Upper bound 122 N (int): Steps to reach upper bound 123 """ 124 self.N = N 125 self.p_0 = p_0 126 self.p_i = p_i 127 self.p_f = p_f 128 129 self.a = N*(self.p_0 - self.p_i)/(self.p_f - self.p_i)#Phase factor
Generates a triangular wave according to the formula:
Q\left(x ight)=(N-\operatorname{abs}(\operatorname{mod}\left(x,2N ight)-N))\left(rac{p_{f}-p_{i}}{N} ight)+p_{i}
Arguments:
- p_0 (float): Initial value at n=0
- p_i (float): Lower bound
- p_f (float): Upper bound
- N (int): Steps to reach upper bound
def
partition(X: numpy.ndarray, n: int) -> list:
137def partition(X: np.ndarray, n: int) -> list: 138 """ 139 Partitions the array `X` in blocks of size `n` except the last. 140 141 Args: 142 X (numpy.array): Input 2D array 143 n (int): Number of partitions 144 145 Returns: 146 list: A list containing the array partitions. 147 """ 148 assert X.ndim == 2, "X must be a matrix" 149 #Number of partitions 150 r = X.shape[0] % n 151 m = X.shape[0] // n + (r > 0) 152 X_enlarged = np.pad(X, ((0, n*m - X.shape[0]), (0,0)), constant_values=0) 153 X_split = np.array_split(X_enlarged, m) 154 if r > 0: 155 X_split[-1] = X_split[-1][:r,:] 156 return X_split
Partitions the array X
in blocks of size n
except the last.
Arguments:
- X (numpy.array): Input 2D array
- n (int): Number of partitions
Returns:
list: A list containing the array partitions.