pyduino.utils

  1from pyduino.paths import PATHS
  2import yaml
  3from nmap import PortScanner
  4import requests
  5import numpy as np
  6from collections import OrderedDict
  7from urllib.parse import urljoin
  8import logging
  9
 10logging.basicConfig(filename='pyduino.log', filemode='w', level=logging.DEBUG)
 11
 12class bcolors:
 13    HEADER = '\033[95m'
 14    OKBLUE = '\033[94m'
 15    OKCYAN = '\033[96m'
 16    OKGREEN = '\033[92m'
 17    WARNING = '\033[93m'
 18    FAIL = '\033[91m'
 19    ENDC = '\033[0m'
 20    BOLD = '\033[1m'
 21    UNDERLINE = '\033[4m'
 22
 23def get_param(data, key: str, ids: set = False) -> OrderedDict:
 24
 25    """
 26    Retrieve a specific parameter from a dictionary of data.
 27
 28    Parameters:
 29    - data: The dictionary containing the data.
 30    - key: The key of the parameter to retrieve.
 31    - ids: (optional) A set of IDs to filter the data. If not provided, all data will be returned.
 32
 33    Returns:
 34    - An ordered dictionary containing the filtered data.
 35
 36    """
 37    filtered = OrderedDict(list(map(lambda x: (x[0], x[1][key]), data.items())))
 38    if not ids:
 39        return filtered
 40    else:
 41        return OrderedDict(filter(lambda x: x[0] in ids,filtered.items()))
 42
 43def yaml_get(filename):
 44    """
 45    Loads hyperparameters from a YAML file.
 46    """
 47    y = None
 48    with open(filename) as f:
 49        y = yaml.load(f.read(),yaml.Loader)
 50    return y
 51
 52
 53def ReLUP(x):
 54    """Computes probability from an array X after passing it through a ReLU unit (negatives are zero).
 55
 56    Args:
 57        x (numpy.array): Input Array
 58    """
 59    x_relu = x.copy()
 60    x_relu[x_relu<0] = 0
 61
 62    if x_relu.sum() == 0:
 63        return np.ones_like(x_relu)/len(x_relu)
 64    else:
 65        return x_relu/x_relu.sum()
 66
 67def get_meta(url):
 68    resp = requests.get(urljoin(url, "ping"))
 69    if resp.ok :
 70        return resp.json()
 71    else:
 72        logging.error(f"Unable to connect to {url}")
 73        raise ConnectionRefusedError(url)
 74
 75def get_servers(
 76        net=PATHS.SYSTEM_PARAMETERS.get("network", "192.168.1.1/24"),
 77        port=PATHS.SYSTEM_PARAMETERS.get("port", 5000),
 78        exclude=PATHS.SYSTEM_PARAMETERS.get("exclude", None)
 79    )->dict:
 80    """
 81    Get a dictionary of available servers in the network.
 82
 83    Args:
 84        net (str): The network address range to scan for servers. Default is "192.168.0.1/24".
 85        port (str): The port number to scan for servers. Default is "5000".
 86        exclude (str): IP addresses to exclude from the scan. Default is None.
 87
 88    Returns:
 89        dict: A dictionary of available servers, where the keys are the server IDs and the values are the server URLs.
 90
 91    """
 92    logging.debug(f"Searching for devices on {net}:{port}")
 93    port_scanner = PortScanner()
 94    args = "--open" if exclude is None else f"--open --exclude {exclude}"
 95    results = port_scanner.scan(net, str(port), arguments=args, timeout=60)
 96    hosts = list(map(lambda x: f"http://{x}:{str(port)}", results["scan"].keys()))
 97    servers = {}
 98    for host in hosts:
 99        try:
100            v = requests.get(host, timeout=2).text == "REACTOR SERVER"
101            meta = get_meta(host)
102            if v:
103                if meta["id"] in servers:
104                    logging.warning(f"Duplicate ID found: {meta['id']}")
105                servers[meta["id"]] = host
106        except:
107            pass
108    logging.debug(f"Found {len(servers)} devices")
109    return servers
110
class TriangleWave:
    """Triangle wave oscillating between `p_i` and `p_f` with period `2*N`."""

    def __init__(self,p_0: float, p_i: float, p_f: float, N: int):
        # Raw string: the LaTeX below contains "\r" (\right) and "\f" (\frac),
        # which a normal string literal would turn into control characters.
        r"""Generates a triangular wave according to the formula:

        Q\left(x\right)=(N-\operatorname{abs}(\operatorname{mod}\left(x,2N\right)-N))\left(\frac{p_{f}-p_{i}}{N}\right)+p_{i}

        Args:
            p_0 (float): Initial value at n=0
            p_i (float): Lower bound
            p_f (float): Upper bound
            N (int): Steps to reach upper bound

        Raises:
            ZeroDivisionError: if `p_f` equals `p_i`.
        """
        self.N = N
        self.p_0 = p_0
        self.p_i = p_i
        self.p_f = p_f

        # Phase factor: the offset at which Q equals p_0, so that y(0) == p_0
        # (for p_0 between p_i and p_f).
        self.a = N*(self.p_0 - self.p_i)/(self.p_f - self.p_i)

    def Q(self,x: int):
        """Evaluate the unshifted wave: Q(0) = p_i, Q(N) = p_f, period 2*N."""
        # Distance of x (folded into one period) from the peak at N.
        distance_from_peak = abs((x%(2*self.N))-self.N)
        return (self.N - distance_from_peak)*(self.p_f - self.p_i)/self.N + self.p_i

    def y(self,x: int):
        """Evaluate the wave at step `x`, shifted by the phase factor `a`."""
        return self.Q(x + self.a)
135
def partition(X: np.ndarray, n: int) -> list:
    """
    Partitions the rows of `X` into consecutive blocks of `n` rows.

    Every block has exactly `n` rows except possibly the last, which holds the
    remaining `X.shape[0] % n` rows. An array with zero rows yields an empty
    list.

    Args:
        X (numpy.array): Input 2D array
        n (int): Number of rows per block (must be >= 1)

    Returns:
        list: A list containing the array partitions, in row order. Each
            partition is an independent copy, so modifying it does not
            affect `X`.

    Raises:
        AssertionError: if `X` is not 2-dimensional.
        ValueError: if `n` is smaller than 1.
    """
    assert X.ndim == 2, "X must be a matrix"
    if n < 1:
        raise ValueError("n must be a positive integer")
    # Slicing past the end is clamped by numpy, so the last block is simply
    # shorter; no padding/trimming is needed and empty input works naturally.
    return [X[start:start + n].copy() for start in range(0, X.shape[0], n)]
class bcolors:
13class bcolors:
14    HEADER = '\033[95m'
15    OKBLUE = '\033[94m'
16    OKCYAN = '\033[96m'
17    OKGREEN = '\033[92m'
18    WARNING = '\033[93m'
19    FAIL = '\033[91m'
20    ENDC = '\033[0m'
21    BOLD = '\033[1m'
22    UNDERLINE = '\033[4m'
HEADER = '\x1b[95m'
OKBLUE = '\x1b[94m'
OKCYAN = '\x1b[96m'
OKGREEN = '\x1b[92m'
WARNING = '\x1b[93m'
FAIL = '\x1b[91m'
ENDC = '\x1b[0m'
BOLD = '\x1b[1m'
UNDERLINE = '\x1b[4m'
def get_param(data, key: str, ids: set = False) -> collections.OrderedDict:
24def get_param(data, key: str, ids: set = False) -> OrderedDict:
25
26    """
27    Retrieve a specific parameter from a dictionary of data.
28
29    Parameters:
30    - data: The dictionary containing the data.
31    - key: The key of the parameter to retrieve.
32    - ids: (optional) A set of IDs to filter the data. If not provided, all data will be returned.
33
34    Returns:
35    - An ordered dictionary containing the filtered data.
36
37    """
38    filtered = OrderedDict(list(map(lambda x: (x[0], x[1][key]), data.items())))
39    if not ids:
40        return filtered
41    else:
42        return OrderedDict(filter(lambda x: x[0] in ids,filtered.items()))

Retrieve a specific parameter from a dictionary of data.

Parameters:

  • data: The dictionary containing the data.
  • key: The key of the parameter to retrieve.
  • ids: (optional) A set of IDs to filter the data. If not provided, all data will be returned.

Returns:

  • An ordered dictionary containing the filtered data.
def yaml_get(filename):
44def yaml_get(filename):
45    """
46    Loads hyperparameters from a YAML file.
47    """
48    y = None
49    with open(filename) as f:
50        y = yaml.load(f.read(),yaml.Loader)
51    return y

Loads hyperparameters from a YAML file.

def ReLUP(x):
54def ReLUP(x):
55    """Computes probability from an array X after passing it through a ReLU unit (negatives are zero).
56
57    Args:
58        x (numpy.array): Input Array
59    """
60    x_relu = x.copy()
61    x_relu[x_relu<0] = 0
62
63    if x_relu.sum() == 0:
64        return np.ones_like(x_relu)/len(x_relu)
65    else:
66        return x_relu/x_relu.sum()

Computes probability from an array X after passing it through a ReLU unit (negatives are zero).

Arguments:
  • x (numpy.array): Input Array
def get_meta(url):
68def get_meta(url):
69    resp = requests.get(urljoin(url, "ping"))
70    if resp.ok :
71        return resp.json()
72    else:
73        logging.error(f"Unable to connect to {url}")
74        raise ConnectionRefusedError(url)
def get_servers(net='192.168.1.1/24', port=5000, exclude=None) -> dict:
 76def get_servers(
 77        net=PATHS.SYSTEM_PARAMETERS.get("network", "192.168.1.1/24"),
 78        port=PATHS.SYSTEM_PARAMETERS.get("port", 5000),
 79        exclude=PATHS.SYSTEM_PARAMETERS.get("exclude", None)
 80    )->dict:
 81    """
 82    Get a dictionary of available servers in the network.
 83
 84    Args:
 85        net (str): The network address range to scan for servers. Default is "192.168.0.1/24".
 86        port (str): The port number to scan for servers. Default is "5000".
 87        exclude (str): IP addresses to exclude from the scan. Default is None.
 88
 89    Returns:
 90        dict: A dictionary of available servers, where the keys are the server IDs and the values are the server URLs.
 91
 92    """
 93    logging.debug(f"Searching for devices on {net}:{port}")
 94    port_scanner = PortScanner()
 95    args = "--open" if exclude is None else f"--open --exclude {exclude}"
 96    results = port_scanner.scan(net, str(port), arguments=args, timeout=60)
 97    hosts = list(map(lambda x: f"http://{x}:{str(port)}", results["scan"].keys()))
 98    servers = {}
 99    for host in hosts:
100        try:
101            v = requests.get(host, timeout=2).text == "REACTOR SERVER"
102            meta = get_meta(host)
103            if v:
104                if meta["id"] in servers:
105                    logging.warning(f"Duplicate ID found: {meta['id']}")
106                servers[meta["id"]] = host
107        except:
108            pass
109    logging.debug(f"Found {len(servers)} devices")
110    return servers

Get a dictionary of available servers in the network.

Arguments:
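  • net (str): The network address range to scan for servers. Defaults to the "network" system parameter, or "192.168.1.1/24" if it is not set.
  • port (int or str): The port number to scan for servers. Defaults to the "port" system parameter, or 5000 if it is not set.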
  • exclude (str): IP addresses to exclude from the scan. Default is None.
Returns:

dict: A dictionary of available servers, where the keys are the server IDs and the values are the server URLs.

class TriangleWave:
112class TriangleWave:
113    def __init__(self,p_0: float, p_i: float, p_f: float, N: int):
114        """Generates a triangular wave according to the formula:
115
116        Q\left(x\right)=(N-\operatorname{abs}(\operatorname{mod}\left(x,2N\right)-N))\left(\frac{p_{f}-p_{i}}{N}\right)+p_{i}
117
118        Args:
119            p_0 (float): Initial value at n=0
120            p_i (float): Lower bound
121            p_f (float): Upper bound
122            N (int): Steps to reach upper bound
123        """
124        self.N = N
125        self.p_0 = p_0
126        self.p_i = p_i
127        self.p_f = p_f
128
129        self.a = N*(self.p_0 - self.p_i)/(self.p_f - self.p_i)#Phase factor
130    
131    def Q(self,x: int):
132        return (self.N - abs((x%(2*self.N))-self.N))*(self.p_f - self.p_i)/self.N + self.p_i
133    
134    def y(self,x: int):
135        return self.Q(x + self.a)
TriangleWave(p_0: float, p_i: float, p_f: float, N: int)
113    def __init__(self,p_0: float, p_i: float, p_f: float, N: int):
114        """Generates a triangular wave according to the formula:
115
116        Q\left(x\right)=(N-\operatorname{abs}(\operatorname{mod}\left(x,2N\right)-N))\left(\frac{p_{f}-p_{i}}{N}\right)+p_{i}
117
118        Args:
119            p_0 (float): Initial value at n=0
120            p_i (float): Lower bound
121            p_f (float): Upper bound
122            N (int): Steps to reach upper bound
123        """
124        self.N = N
125        self.p_0 = p_0
126        self.p_i = p_i
127        self.p_f = p_f
128
129        self.a = N*(self.p_0 - self.p_i)/(self.p_f - self.p_i)#Phase factor

Generates a triangular wave according to the formula:

Q\left(x\right)=(N-\operatorname{abs}(\operatorname{mod}\left(x,2N\right)-N))\left(\frac{p_{f}-p_{i}}{N}\right)+p_{i}

Arguments:
  • p_0 (float): Initial value at n=0
  • p_i (float): Lower bound
  • p_f (float): Upper bound
  • N (int): Steps to reach upper bound
N
p_0
p_i
p_f
a
def Q(self, x: int):
131    def Q(self,x: int):
132        return (self.N - abs((x%(2*self.N))-self.N))*(self.p_f - self.p_i)/self.N + self.p_i
def y(self, x: int):
134    def y(self,x: int):
135        return self.Q(x + self.a)
def partition(X: numpy.ndarray, n: int) -> list:
137def partition(X: np.ndarray, n: int) -> list:
138    """
139    Partitions the array `X` in blocks of size `n` except the last.
140
141    Args:
142        X (numpy.array): Input 2D array
143        n (int): Number of partitions
144    
145    Returns:
146        list: A list containing the array partitions.
147    """
148    assert X.ndim == 2, "X must be a matrix"
149    #Number of partitions
150    r = X.shape[0] % n
151    m = X.shape[0] // n + (r > 0)
152    X_enlarged = np.pad(X, ((0, n*m - X.shape[0]), (0,0)), constant_values=0)
153    X_split = np.array_split(X_enlarged, m)
154    if r > 0:
155        X_split[-1] = X_split[-1][:r,:]
156    return X_split

Partitions the rows of the array X into consecutive blocks of n rows; the last block may contain fewer rows.

Arguments:
  • X (numpy.array): Input 2D array
  • n (int): Number of rows in each block (the last block may have fewer)
Returns:

list: A list containing the array partitions.