pyduino.pyduino2
import logging
import os
from collections import OrderedDict
from datetime import datetime
from functools import partial
from multiprocessing import Pool, Process
from time import sleep, time
from typing import Union
from urllib.parse import urljoin

import pandas as pd
import requests

from pyduino.log import Log
from pyduino.paths import PATHS
from pyduino.utils import bcolors, get_servers, get_meta

__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
logging.basicConfig(
    filename='pyduino.log',
    filemode='w',
    level=PATHS.SYSTEM_PARAMETERS.get('log_level', logging.INFO),
)


STEP = 1/8
HEADER_DELAY = 5
COLN = 48  # Number of columns to parse from Arduino (used for sanity tests)
CACHEPATH = "cache.csv"


# From https://stackoverflow.com/a/312464/6451772
def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]


class Reactor:
    """
    Master of HTTP Server to Serial handler.

    Every operation is an HTTP request against the routes ``connect``,
    ``reboot``, ``reset`` and ``send`` resolved relative to ``url``.
    """

    def __init__(self, url):
        self.connected = False
        self.url = url
        self.meta = get_meta(url)
        self.id = self.meta["id"]

    def http_get(self, route):
        """GET ``route`` (joined onto the reactor URL) and return the response."""
        return requests.get(urljoin(self.url, route))

    def http_post(self, route, command, await_response):
        """POST ``command`` and the ``await_response`` flag as JSON to ``route``."""
        return requests.post(urljoin(self.url, route), json={
            "command": command,
            "await_response": await_response,
        })

    def connect(self):
        """
        Starts connection.

        Returns:
            The ``ok`` flag of the HTTP response; ``connected`` is set to True
            only when the request succeeded.
        """
        resp = self.http_get("connect")
        if resp.ok:
            self.connected = True
        return resp.ok

    def reboot(self, retry_time=5):
        """
        Requests a reboot and blocks until the reactor accepts a connection again.

        Args:
            retry_time: Seconds to wait between reconnection attempts.
        """
        try:
            self.http_get("reboot")
        except requests.ConnectionError:
            # The server may drop the connection before answering.
            pass
        # Mark as disconnected even if the request was answered; otherwise an
        # already-connected reactor would skip the reconnection loop entirely.
        self.connected = False
        print(bcolors.WARNING,"Rebooting reactor",self.id,"at",self.url,bcolors.ENDC)
        while not self.connected:
            try:
                self.connect()
            except requests.ConnectionError:
                print(bcolors.WARNING,"Waiting for reactor",self.id,bcolors.ENDC)
            if not self.connected:
                # Also wait when the server answers but refuses the connection,
                # so the loop never spins without a pause.
                sleep(retry_time)

    def reset(self):
        """
        Resets connection.
        """
        resp = self.http_get("reset")
        return resp.ok

    def close(self):
        """
        Sends 'fim'.
        """
        # http_post takes exactly (route, command, await_response); a fourth
        # positional argument raised TypeError.
        self.http_post("send", "fim", False)

    def send(self, msg):
        """
        Sends command and waits for a response.

        Returns:
            The ``"response"`` field of the JSON body.
        """
        resp = self.http_post("send", msg, True)
        return resp.json()["response"]

    def _send(self, msg):
        """
        Sends command without waiting for a response.

        Returns:
            The ``ok`` flag of the HTTP response.
        """
        resp = self.http_post("send", msg, False)
        return resp.ok

    def set_in_chunks(self, params, chunksize=4):
        """
        Sets params in chunks of ``chunksize`` pairs, pausing 2 s after each chunk.

        Args:
            params: List of ``(key, value)`` pairs.
            chunksize: Number of pairs sent per ``set`` command.
        """
        for chunk in chunks(params, chunksize):
            self.set(dict(chunk))
            sleep(2)

    def __repr__(self):
        return f"{bcolors.OKCYAN}<Reactor {self.id} at {self.meta['hostname']}({self.url})>{bcolors.ENDC}"

    def set(self, data=None, **kwargs):
        """
        Reactor.set({"440": 50, "brilho": 100})

        Sends ``set(k1,v1,k2,v2,...)`` built from ``data`` merged with ``kwargs``.
        """
        data = {**(data or {}), **kwargs}
        args = ",".join(f'{k},{v}' for k, v in data.items())
        cmd = f"set({args})"
        self._send(cmd)

    def horacerta(self):
        """
        Synchronizes Arduino clock with the client computer.
        """
        now = datetime.now()
        logging.info(f"Set clock on {self.meta['hostname']}")
        self.set_in_chunks([["ano",now.year],["mes",now.month],["dia",now.day],["hora",now.hour],["minuto",now.minute]],chunksize=2)
        sleep(2)
        logging.debug(f"Clock set on {self.meta['hostname']}")
        self._send("horacerta")


def send_wrapper(reactor, command, await_response):
    """Pool worker: send ``command`` to one ``(id, Reactor)`` pair; return ``(id, result)``."""
    reactor_id, target = reactor
    if await_response:
        return (reactor_id, target.send(command))
    return (reactor_id, target._send(command))


def reboot_wrapper(reactor):
    """Pool worker: reboot one ``(id, Reactor)`` pair using the configured wait time."""
    _, target = reactor
    target.reboot(retry_time=PATHS.SYSTEM_PARAMETERS["reboot_wait_time"])
    return True


def set_in_chunks(X):
    """Pool worker: unpack ``(reactor, row, chunksize)`` and apply the row in chunks."""
    reactor, row, chunksize = X
    reactor.set_in_chunks(list(row.items()), chunksize)


class ReactorManager:
    """
    A class that manages multiple reactors.

    Attributes:
        pinged (bool): Class-level flag, False by default.
        network (str): The network address of the reactors.
        port (int): The port number of the reactors.
        exclude (list): A list of reactors to exclude.
        reactors (dict): Reactor objects keyed by reactor ID.
        servers (dict): Server addresses keyed by reactor ID.
        ids (list): Sorted reactor IDs.
        header (list): Column names from the `cabecalho` command (filled by `dados`).
        payload (dict): Relevant preset values per reactor (filled by `set_preset_state`).
        connected (bool): False initially; set to True by `reset` and `reboot`.
        log (Log): Created by `log_init`.
    """

    pinged = False

    def __init__(self, include: dict = None):
        """
        Initializes the ReactorManager object.

        Args:
            include (dict): A dictionary of reactor IDs and their corresponding server addresses.
                When None, servers are discovered with `get_servers`.
        """
        self.network = PATHS.SLAVE_PARAMETERS["network"]
        self.port = PATHS.SLAVE_PARAMETERS["port"]
        self.exclude = PATHS.SLAVE_PARAMETERS.get("exclude", None)
        self.reactors = {}
        # Initialised here so reading it before reset()/reboot() does not fail.
        self.connected = False

        if include is None:
            self.servers = get_servers(self.network,self.port,self.exclude)
        else:
            self.servers = include

        for reactor_id, host in self.servers.items():
            self.reactors[reactor_id] = Reactor(host)

        logging.info("Connection completed")

        #Info
        print("\n".join(map(lambda x: f"Reactor {bcolors.OKCYAN}{x.id}{bcolors.ENDC} at {bcolors.OKCYAN}{x.meta['hostname']}({x.url}){bcolors.ENDC}",self.reactors.values())))
        self.header = None
        self.payload = None
        self.ids = sorted(self.reactors)

        if PATHS.SYSTEM_PARAMETERS.get("sync_clocks", True):
            self.horacerta()

    def send(self, command, await_response=True, **kwargs):
        """Sends ``command`` to every reactor sequentially; returns responses by ID when awaited."""
        out = {}
        for k, r in self.reactors.items():
            if await_response:
                out[k] = r.send(command, **kwargs)
            else:
                r._send(command)
        return out

    def send_parallel(self, command, await_response=True):
        """Sends ``command`` to every reactor through a process pool; returns results by ID."""
        with Pool(7) as p:
            out = p.map(partial(send_wrapper,command=command,await_response=await_response),list(self.reactors.items()))
        return dict(out)

    def set(self, data=None, **kwargs):
        """Applies the same ``set`` call to every reactor."""
        for r in self.reactors.values():
            r.set(data=data, **kwargs)

    def get(self, key=None):
        """
        Reads ``key`` from every reactor and returns the responses by ID.

        Reactor defines no ``get`` method, so this issues the same ``get(<key>)``
        command that the `brilho` property sends.
        """
        # NOTE(review): the bare "get" sent when key is None is an assumption
        # about the firmware protocol - confirm.
        command = "get" if key is None else f"get({key})"
        return self.send(command)

    def connect(self):
        """Connects every reactor."""
        for r in self.reactors.values():
            r.connect()

    def reset(self):
        """Resets every reactor and marks the manager as connected."""
        for r in self.reactors.values():
            r.reset()
        self.connected = True

    def reboot(self):
        """Reboots every reactor in parallel and marks the manager as connected afterwards."""
        self.connected = False
        with Pool(7) as p:
            p.map(reboot_wrapper,list(self.reactors.items()))
        self.connected = True

    def horacerta(self):
        """
        Updates Arduino clocks with clock of current system.
        """
        print("[INFO]", f"{bcolors.BOLD}Syncronizing clocks{bcolors.ENDC}")
        for r in self.reactors.values():
            r.horacerta()

    #Logging

    def log_init(self, **kwargs):
        """
        Creates log directories for each Arduino.

        Args:
            name (str): Name of the subdirectory in the log folder where the files will be saved.
        """
        self.log = Log(subdir=list(self.reactors.keys()),**kwargs)
        print(f"Log will be saved on: {bcolors.OKGREEN}{self.log.prefix}{bcolors.ENDC}")

    @property
    def brilho(self):
        """
        Convenience method to get brilho from reactors.
        """
        # NOTE(review): `brilho_param` is not assigned anywhere in this module;
        # it must be set by the caller before this property is read.
        out = self.send_parallel(f"get({self.brilho_param.lower()})")
        out = {k: float(v.strip()) for k,v in out.items()}
        return out

    def dados(self, save_cache=True):
        """
        Get data from Arduinos.

        Reactors that answer None are reset and reconnected, the preset and
        cached states are re-applied, and the read is retried until all answer.

        Args:
            save_cache (bool): Whether or not to save a cache file with the last reading with `log.log.cache_data`.
        """
        if self.header is None:
            self.header = list(self.reactors.values())[0].send("cabecalho").split(" ")

        while True:
            rows = self.send_parallel("dados",await_response=True).items()
            # A None response means the reactor did not answer.
            silent = [rid for rid, answer in rows if answer is None]
            if not silent:
                break
            print(bcolors.FAIL+"[FAIL]","The following reactors didn't respond:"+"\n\t"+"\n\t".join(list(map(str,silent))))
            print("Resetting serial")
            sleep(10)
            for rid in silent:
                self.reactors[rid].reset()
                self.reactors[rid].connect()
            print("Recovering last state")
            self.set_preset_state(path=PATHS.INITIAL_STATE_PATH)
            self.set_preset_state(path=CACHEPATH)
            sleep(10)
            print("Done"+bcolors.ENDC)

        rows = {rid: OrderedDict(zip(self.header, answer.split(" "))) for rid, answer in rows}
        if save_cache:
            self.log.cache_data(rows)
        return rows

    def log_dados(self, save_cache=True):
        """
        Logs output of `dados` in csv format.

        Args:
            save_cache (bool): Whether or not to save a cache file with the last reading with `log.log.cache_data`.

        Returns:
            dict: One OrderedDict (header -> value) per reactor ID.
        """
        # `garbage` is not defined on this class; call it only when a subclass
        # or the caller provides it.
        garbage = getattr(self, "garbage", None)
        if callable(garbage):
            garbage()
        header = list(self.reactors.values())[0].send("cabecalho").split(" ")

        # send_parallel accepts no `delay` argument and returns a dict by ID.
        responses = self.send_parallel("dados")
        rows = {rid: OrderedDict(zip(header, answer.split(" "))) for rid, answer in responses.items()}

        for rid, row in rows.items():
            self.log.log_rows(rows=[row],subdir=rid)
        if save_cache:
            self.log.cache_data(rows)
        return rows

    def set_preset_state(self,path="preset_state.csv",sep="\t",chunksize=4, params=PATHS.REACTOR_PARAMETERS, **kwargs):
        """
        Prepare Arduinos with preset parameters from a csv file.

        Args:
            path (str): Path to the csv file.
            sep (str): Column separator used in the csv.
            chunksize (int): How many commands to send in a single line. A large value can cause Serial errors.
            params: Column names to keep; a falsy value keeps every column.
            **kwargs: Forwarded to `pandas.read_csv`.
        """
        df = pd.read_csv(path,sep=sep,index_col='ID',**kwargs)
        df = df[df.index.isin(self.ids)]
        df.columns = df.columns.str.lower() #Commands must be sent in lowercase
        # Dropping empty columns (not in place: df is a filtered selection).
        df = df.dropna(axis=1)
        if params:
            wanted = set(params)
            # Keep the file's column order so commands are sent deterministically.
            df = df.loc[:, [c for c in df.columns if c in wanted]]
        #Setting schema
        schema = {c: PATHS.SCHEMA[c] for c in df.columns if c in PATHS.SCHEMA}
        df = df.astype(schema)
        jobs = [(self.reactors[rid], row, chunksize) for rid, row in df.to_dict(orient="index").items()]
        with Pool(7) as p:
            p.map(set_in_chunks, jobs)
        #Saving relevant parameters' values
        relevant = set(PATHS.RELEVANT_PARAMETERS)
        self.preset_state = df.loc[:, [c for c in df.columns if c in relevant]]
        self.payload = self.preset_state.to_dict('index').copy()

    def calibrate(self, deltaT=120, dir="calibrate"):
        """
        Runs `curva` and dumps the result into txts.

        Args:
            deltaT: Seconds to wait after sending `curva` before reading.
            dir: Output directory; created (with parents) when missing.

        Returns:
            dict: Raw bytes read per reactor name.
        """
        os.makedirs(dir, exist_ok=True)
        out = {}
        self.send("curva",await_response=False)
        sleep(deltaT)
        terminator = '*** fim da curva dos LEDs ***'.encode('ascii')
        for name, reactor in self.reactors.items():
            # NOTE(review): Reactor as defined in this module has no `_conn`
            # attribute - confirm how the serial stream should be read.
            out[name] = reactor._conn.read_until(terminator)
            with open(os.path.join(dir,f"reator_{name}.txt"),"w") as f:
                f.write(out[name].decode('ascii'))
        return out

    def __repr__(self):
        return f"{bcolors.OKCYAN}<Manager of reactors {self.ids}>{bcolors.ENDC}"


if __name__ == '__main__':
    r = ReactorManager()
def chunks(lst, n):
    """Lazily produce consecutive slices of ``lst``, each at most ``n`` items long."""
    starts = range(0, len(lst), n)
    for start in starts:
        stop = start + n
        yield lst[start:stop]
Yield successive n-sized chunks from lst.
class Reactor:
    """
    Master of HTTP Server to Serial handler.

    Every operation is an HTTP request against the routes ``connect``,
    ``reboot``, ``reset`` and ``send`` resolved relative to ``url``.
    """

    def __init__(self, url):
        self.connected = False
        self.url = url
        self.meta = get_meta(url)
        self.id = self.meta["id"]

    def http_get(self, route):
        """GET ``route`` (joined onto the reactor URL) and return the response."""
        return requests.get(urljoin(self.url, route))

    def http_post(self, route, command, await_response):
        """POST ``command`` and the ``await_response`` flag as JSON to ``route``."""
        return requests.post(urljoin(self.url, route), json={
            "command": command,
            "await_response": await_response,
        })

    def connect(self):
        """
        Starts connection.

        Returns:
            The ``ok`` flag of the HTTP response; ``connected`` is set to True
            only when the request succeeded.
        """
        resp = self.http_get("connect")
        if resp.ok:
            self.connected = True
        return resp.ok

    def reboot(self, retry_time=5):
        """
        Requests a reboot and blocks until the reactor accepts a connection again.

        Args:
            retry_time: Seconds to wait between reconnection attempts.
        """
        try:
            self.http_get("reboot")
        except requests.ConnectionError:
            # The server may drop the connection before answering.
            pass
        # Mark as disconnected even if the request was answered; otherwise an
        # already-connected reactor would skip the reconnection loop entirely.
        self.connected = False
        print(bcolors.WARNING,"Rebooting reactor",self.id,"at",self.url,bcolors.ENDC)
        while not self.connected:
            try:
                self.connect()
            except requests.ConnectionError:
                print(bcolors.WARNING,"Waiting for reactor",self.id,bcolors.ENDC)
            if not self.connected:
                # Also wait when the server answers but refuses the connection,
                # so the loop never spins without a pause.
                sleep(retry_time)

    def reset(self):
        """
        Resets connection.
        """
        resp = self.http_get("reset")
        return resp.ok

    def close(self):
        """
        Sends 'fim'.
        """
        # http_post takes exactly (route, command, await_response); a fourth
        # positional argument raised TypeError.
        self.http_post("send", "fim", False)

    def send(self, msg):
        """
        Sends command and waits for a response.

        Returns:
            The ``"response"`` field of the JSON body.
        """
        resp = self.http_post("send", msg, True)
        return resp.json()["response"]

    def _send(self, msg):
        """
        Sends command without waiting for a response.

        Returns:
            The ``ok`` flag of the HTTP response.
        """
        resp = self.http_post("send", msg, False)
        return resp.ok

    def set_in_chunks(self, params, chunksize=4):
        """
        Sets params in chunks of ``chunksize`` pairs, pausing 2 s after each chunk.

        Args:
            params: List of ``(key, value)`` pairs.
            chunksize: Number of pairs sent per ``set`` command.
        """
        for chunk in chunks(params, chunksize):
            self.set(dict(chunk))
            sleep(2)

    def __repr__(self):
        return f"{bcolors.OKCYAN}<Reactor {self.id} at {self.meta['hostname']}({self.url})>{bcolors.ENDC}"

    def set(self, data=None, **kwargs):
        """
        Reactor.set({"440": 50, "brilho": 100})

        Sends ``set(k1,v1,k2,v2,...)`` built from ``data`` merged with ``kwargs``.
        """
        data = {**(data or {}), **kwargs}
        args = ",".join(f'{k},{v}' for k, v in data.items())
        cmd = f"set({args})"
        self._send(cmd)

    def horacerta(self):
        """
        Synchronizes Arduino clock with the client computer.
        """
        now = datetime.now()
        logging.info(f"Set clock on {self.meta['hostname']}")
        self.set_in_chunks([["ano",now.year],["mes",now.month],["dia",now.day],["hora",now.hour],["minuto",now.minute]],chunksize=2)
        sleep(2)
        logging.debug(f"Clock set on {self.meta['hostname']}")
        self._send("horacerta")
Master of HTTP Server to Serial handler.
def connect(self):
    """
    Open the connection through the ``connect`` route.

    Returns the response's ``ok`` flag and flips ``connected`` to True
    only when the request succeeded.
    """
    response = self.http_get("connect")
    if not response.ok:
        return response.ok
    self.connected = True
    return response.ok
Starts connection
def reboot(self, retry_time=5):
    """
    Requests a reboot and blocks until the reactor accepts a connection again.

    Args:
        retry_time: Seconds to wait between reconnection attempts.
    """
    try:
        self.http_get("reboot")
    except requests.ConnectionError:
        # The server may drop the connection before answering.
        pass
    # Mark as disconnected even if the request was answered; otherwise an
    # already-connected reactor would skip the reconnection loop entirely.
    self.connected = False
    print(bcolors.WARNING,"Rebooting reactor",self.id,"at",self.url,bcolors.ENDC)
    while not self.connected:
        try:
            self.connect()
        except requests.ConnectionError:
            print(bcolors.WARNING,"Waiting for reactor",self.id,bcolors.ENDC)
        if not self.connected:
            # Also wait when the server answers but refuses the connection,
            # so the loop never spins without a pause.
            sleep(retry_time)
def reset(self):
    """
    Reset the connection through the ``reset`` route.

    Returns the ``ok`` flag of the HTTP response.
    """
    return self.http_get("reset").ok
Resets connection.
def send(self, msg):
    """
    Post ``msg`` to the ``send`` route with ``await_response`` set to True.

    Returns the ``"response"`` field of the JSON reply.
    """
    payload = self.http_post("send", msg, True).json()
    return payload["response"]
Sends a command and waits for the response.
def set_in_chunks(self, params, chunksize=4):
    """
    Apply ``params`` (a list of key/value pairs) ``chunksize`` pairs at a time.

    Each batch is sent with one ``set`` call, followed by a 2-second pause.
    """
    for batch in chunks(params, chunksize):
        self.set(dict(batch))
        sleep(2)
Sets params in chunks.
def set(self, data=None, **kwargs):
    """
    Reactor.set({"440": 50, "brilho": 100})

    Merges ``data`` with ``kwargs`` (keyword arguments win on conflict) and
    sends a single ``set(k1,v1,k2,v2,...)`` command without awaiting a reply.
    """
    merged = {**(data or {}), **kwargs}
    pairs = [f'{key},{value}' for key, value in merged.items()]
    joined = ",".join(pairs)
    self._send(f"set({joined})")
Reactor.set({"440": 50, "brilho": 100})
def horacerta(self):
    """
    Push the client computer's current date and time to the Arduino.

    Year, month, day, hour and minute are sent two at a time, then the
    ``horacerta`` command is issued after a 2-second pause.
    """
    now = datetime.now()
    logging.info(f"Set clock on {self.meta['hostname']}")
    clock_fields = [
        ["ano", now.year],
        ["mes", now.month],
        ["dia", now.day],
        ["hora", now.hour],
        ["minuto", now.minute],
    ]
    self.set_in_chunks(clock_fields, chunksize=2)
    sleep(2)
    logging.debug(f"Clock set on {self.meta['hostname']}")
    self._send("horacerta")
Synchronizes Arduino clock with the client computer.
class ReactorManager:
    """
    A class that manages multiple reactors.

    Attributes:
        pinged (bool): Class-level flag, False by default.
        network (str): The network address of the reactors.
        port (int): The port number of the reactors.
        exclude (list): A list of reactors to exclude.
        reactors (dict): Reactor objects keyed by reactor ID.
        servers (dict): Server addresses keyed by reactor ID.
        ids (list): Sorted reactor IDs.
        header (list): Column names from the `cabecalho` command (filled by `dados`).
        payload (dict): Relevant preset values per reactor (filled by `set_preset_state`).
        connected (bool): False initially; set to True by `reset` and `reboot`.
        log (Log): Created by `log_init`.
    """

    pinged = False

    def __init__(self, include: dict = None):
        """
        Initializes the ReactorManager object.

        Args:
            include (dict): A dictionary of reactor IDs and their corresponding server addresses.
                When None, servers are discovered with `get_servers`.
        """
        self.network = PATHS.SLAVE_PARAMETERS["network"]
        self.port = PATHS.SLAVE_PARAMETERS["port"]
        self.exclude = PATHS.SLAVE_PARAMETERS.get("exclude", None)
        self.reactors = {}
        # Initialised here so reading it before reset()/reboot() does not fail.
        self.connected = False

        if include is None:
            self.servers = get_servers(self.network,self.port,self.exclude)
        else:
            self.servers = include

        for reactor_id, host in self.servers.items():
            self.reactors[reactor_id] = Reactor(host)

        logging.info("Connection completed")

        #Info
        print("\n".join(map(lambda x: f"Reactor {bcolors.OKCYAN}{x.id}{bcolors.ENDC} at {bcolors.OKCYAN}{x.meta['hostname']}({x.url}){bcolors.ENDC}",self.reactors.values())))
        self.header = None
        self.payload = None
        self.ids = sorted(self.reactors)

        if PATHS.SYSTEM_PARAMETERS.get("sync_clocks", True):
            self.horacerta()

    def send(self, command, await_response=True, **kwargs):
        """Sends ``command`` to every reactor sequentially; returns responses by ID when awaited."""
        out = {}
        for k, r in self.reactors.items():
            if await_response:
                out[k] = r.send(command, **kwargs)
            else:
                r._send(command)
        return out

    def send_parallel(self, command, await_response=True):
        """Sends ``command`` to every reactor through a process pool; returns results by ID."""
        with Pool(7) as p:
            out = p.map(partial(send_wrapper,command=command,await_response=await_response),list(self.reactors.items()))
        return dict(out)

    def set(self, data=None, **kwargs):
        """Applies the same ``set`` call to every reactor."""
        for r in self.reactors.values():
            r.set(data=data, **kwargs)

    def get(self, key=None):
        """
        Reads ``key`` from every reactor and returns the responses by ID.

        Reactor defines no ``get`` method, so this issues the same ``get(<key>)``
        command that the `brilho` property sends.
        """
        # NOTE(review): the bare "get" sent when key is None is an assumption
        # about the firmware protocol - confirm.
        command = "get" if key is None else f"get({key})"
        return self.send(command)

    def connect(self):
        """Connects every reactor."""
        for r in self.reactors.values():
            r.connect()

    def reset(self):
        """Resets every reactor and marks the manager as connected."""
        for r in self.reactors.values():
            r.reset()
        self.connected = True

    def reboot(self):
        """Reboots every reactor in parallel and marks the manager as connected afterwards."""
        self.connected = False
        with Pool(7) as p:
            p.map(reboot_wrapper,list(self.reactors.items()))
        self.connected = True

    def horacerta(self):
        """
        Updates Arduino clocks with clock of current system.
        """
        print("[INFO]", f"{bcolors.BOLD}Syncronizing clocks{bcolors.ENDC}")
        for r in self.reactors.values():
            r.horacerta()

    #Logging

    def log_init(self, **kwargs):
        """
        Creates log directories for each Arduino.

        Args:
            name (str): Name of the subdirectory in the log folder where the files will be saved.
        """
        self.log = Log(subdir=list(self.reactors.keys()),**kwargs)
        print(f"Log will be saved on: {bcolors.OKGREEN}{self.log.prefix}{bcolors.ENDC}")

    @property
    def brilho(self):
        """
        Convenience method to get brilho from reactors.
        """
        # NOTE(review): `brilho_param` is not assigned anywhere in this class;
        # it must be set by the caller before this property is read.
        out = self.send_parallel(f"get({self.brilho_param.lower()})")
        out = {k: float(v.strip()) for k,v in out.items()}
        return out

    def dados(self, save_cache=True):
        """
        Get data from Arduinos.

        Reactors that answer None are reset and reconnected, the preset and
        cached states are re-applied, and the read is retried until all answer.

        Args:
            save_cache (bool): Whether or not to save a cache file with the last reading with `log.log.cache_data`.
        """
        if self.header is None:
            self.header = list(self.reactors.values())[0].send("cabecalho").split(" ")

        while True:
            rows = self.send_parallel("dados",await_response=True).items()
            # A None response means the reactor did not answer.
            silent = [rid for rid, answer in rows if answer is None]
            if not silent:
                break
            print(bcolors.FAIL+"[FAIL]","The following reactors didn't respond:"+"\n\t"+"\n\t".join(list(map(str,silent))))
            print("Resetting serial")
            sleep(10)
            for rid in silent:
                self.reactors[rid].reset()
                self.reactors[rid].connect()
            print("Recovering last state")
            self.set_preset_state(path=PATHS.INITIAL_STATE_PATH)
            self.set_preset_state(path=CACHEPATH)
            sleep(10)
            print("Done"+bcolors.ENDC)

        rows = {rid: OrderedDict(zip(self.header, answer.split(" "))) for rid, answer in rows}
        if save_cache:
            self.log.cache_data(rows)
        return rows

    def log_dados(self, save_cache=True):
        """
        Logs output of `dados` in csv format.

        Args:
            save_cache (bool): Whether or not to save a cache file with the last reading with `log.log.cache_data`.

        Returns:
            dict: One OrderedDict (header -> value) per reactor ID.
        """
        # `garbage` is not defined on this class; call it only when a subclass
        # or the caller provides it.
        garbage = getattr(self, "garbage", None)
        if callable(garbage):
            garbage()
        header = list(self.reactors.values())[0].send("cabecalho").split(" ")

        # send_parallel accepts no `delay` argument and returns a dict by ID.
        responses = self.send_parallel("dados")
        rows = {rid: OrderedDict(zip(header, answer.split(" "))) for rid, answer in responses.items()}

        for rid, row in rows.items():
            self.log.log_rows(rows=[row],subdir=rid)
        if save_cache:
            self.log.cache_data(rows)
        return rows

    def set_preset_state(self,path="preset_state.csv",sep="\t",chunksize=4, params=PATHS.REACTOR_PARAMETERS, **kwargs):
        """
        Prepare Arduinos with preset parameters from a csv file.

        Args:
            path (str): Path to the csv file.
            sep (str): Column separator used in the csv.
            chunksize (int): How many commands to send in a single line. A large value can cause Serial errors.
            params: Column names to keep; a falsy value keeps every column.
            **kwargs: Forwarded to `pandas.read_csv`.
        """
        df = pd.read_csv(path,sep=sep,index_col='ID',**kwargs)
        df = df[df.index.isin(self.ids)]
        df.columns = df.columns.str.lower() #Commands must be sent in lowercase
        # Dropping empty columns (not in place: df is a filtered selection).
        df = df.dropna(axis=1)
        if params:
            wanted = set(params)
            # Keep the file's column order so commands are sent deterministically.
            df = df.loc[:, [c for c in df.columns if c in wanted]]
        #Setting schema
        schema = {c: PATHS.SCHEMA[c] for c in df.columns if c in PATHS.SCHEMA}
        df = df.astype(schema)
        jobs = [(self.reactors[rid], row, chunksize) for rid, row in df.to_dict(orient="index").items()]
        with Pool(7) as p:
            p.map(set_in_chunks, jobs)
        #Saving relevant parameters' values
        relevant = set(PATHS.RELEVANT_PARAMETERS)
        self.preset_state = df.loc[:, [c for c in df.columns if c in relevant]]
        self.payload = self.preset_state.to_dict('index').copy()

    def calibrate(self, deltaT=120, dir="calibrate"):
        """
        Runs `curva` and dumps the result into txts.

        Args:
            deltaT: Seconds to wait after sending `curva` before reading.
            dir: Output directory; created (with parents) when missing.

        Returns:
            dict: Raw bytes read per reactor name.
        """
        os.makedirs(dir, exist_ok=True)
        out = {}
        self.send("curva",await_response=False)
        sleep(deltaT)
        terminator = '*** fim da curva dos LEDs ***'.encode('ascii')
        for name, reactor in self.reactors.items():
            # NOTE(review): no `_conn` attribute is assigned on the reactor
            # objects built in __init__ - confirm how the stream should be read.
            out[name] = reactor._conn.read_until(terminator)
            with open(os.path.join(dir,f"reator_{name}.txt"),"w") as f:
                f.write(out[name].decode('ascii'))
        return out

    def __repr__(self):
        return f"{bcolors.OKCYAN}<Manager of reactors {self.ids}>{bcolors.ENDC}"
A class that manages multiple reactors.
Attributes:
- pinged (bool): Indicates if the reactors have been pinged.
- network (str): The network address of the reactors.
- port (int): The port number of the reactors.
- exclude (list): A list of reactors to exclude.
- reactors (dict): A dictionary of reactor objects.
- servers (dict): A dictionary of server addresses.
- header (list): A list of header values.
- payload (dict): A dictionary of payload values.
- connected (bool): Indicates if the reactors are connected.
- log (log): A log object for logging data.
Methods:
- `__init__(self, include: dict = None)`: Initializes the ReactorManager object.
- `ids`: Sorted list of reactor IDs (an attribute set in `__init__`).
- `send(self, command, await_response=True, **kwargs)`: Sends a command to the reactors.
- `send_parallel(self, command, await_response=True)`: Sends a command to the reactors in parallel.
- `set(self, data=None, **kwargs)`: Sets data on the reactors.
- `get(self, key=None)`: Gets data from the reactors.
- `connect(self)`: Connects to the reactors.
- `reset(self)`: Resets the reactors.
- `reboot(self)`: Reboots the reactors.
- `horacerta(self)`: Updates Arduino clocks with the clock of the current system.
- `log_init(self, **kwargs)`: Creates log directories for each Arduino.
- `dados(self, save_cache=True)`: Gets data from the Arduinos.
- `log_dados(self, save_cache=True)`: Logs output of `dados` in CSV format.
- `set_preset_state(self, path="preset_state.csv", sep="\t", chunksize=4, params=PATHS.REACTOR_PARAMETERS, **kwargs)`: Prepares Arduinos with preset parameters from a CSV file.
- `calibrate(self, deltaT=120, dir="calibrate")`: Runs `curva` and dumps the result into txts.
def __init__(self, include: dict = None):
    """
    Initializes the ReactorManager object.

    Args:
        include (dict): A dictionary of reactor IDs and their corresponding server addresses.
            When None, servers are discovered with `get_servers`.
    """
    self.network = PATHS.SLAVE_PARAMETERS["network"]
    self.port = PATHS.SLAVE_PARAMETERS["port"]
    self.exclude = PATHS.SLAVE_PARAMETERS.get("exclude", None)
    self.reactors = {}
    # Initialised here so reading it before reset()/reboot() does not fail.
    self.connected = False

    if include is None:
        self.servers = get_servers(self.network,self.port,self.exclude)
    else:
        self.servers = include

    # `reactor_id` rather than `id`, to avoid shadowing the builtin.
    for reactor_id, host in self.servers.items():
        self.reactors[reactor_id] = Reactor(host)

    logging.info("Connection completed")

    #Info
    print("\n".join(map(lambda x: f"Reactor {bcolors.OKCYAN}{x.id}{bcolors.ENDC} at {bcolors.OKCYAN}{x.meta['hostname']}({x.url}){bcolors.ENDC}",self.reactors.values())))
    self.header = None
    self.payload = None
    self.ids = sorted(self.reactors)

    if PATHS.SYSTEM_PARAMETERS.get("sync_clocks", True):
        self.horacerta()
Initializes the ReactorManager object.
Arguments:
- include (dict): A dictionary of reactor IDs and their corresponding server addresses.
def horacerta(self):
    """
    Synchronize every reactor's Arduino clock with the current system clock.
    """
    banner = f"{bcolors.BOLD}Syncronizing clocks{bcolors.ENDC}"
    print("[INFO]", banner)
    for reactor in self.reactors.values():
        reactor.horacerta()
Updates Arduino clocks with clock of current system.
def log_init(self, **kwargs):
    """
    Build the `Log` object with one subdirectory per reactor ID.

    Args:
        name (str): Name of the subdirectory in the log folder where the files will be saved.
    """
    subdirs = list(self.reactors.keys())
    self.log = Log(subdir=subdirs, **kwargs)
    destination = self.log.prefix
    print(f"Log will be saved on: {bcolors.OKGREEN}{destination}{bcolors.ENDC}")
Creates log directories for each Arduino.
Arguments:
- name (str): Name of the subdirectory in the log folder where the files will be saved.
@property
def brilho(self):
    """
    Read the brightness parameter from every reactor in parallel.

    Returns a dict mapping reactor ID to the response parsed as a float.
    """
    # NOTE(review): `brilho_param` is not assigned in the visible class code;
    # presumably it is set by the caller - confirm.
    command = f"get({self.brilho_param.lower()})"
    readings = self.send_parallel(command)
    return {rid: float(raw.strip()) for rid, raw in readings.items()}
Convenience method to get brilho from reactors.
def dados(self, save_cache=True):
    """
    Read one `dados` line from every Arduino and return it keyed by reactor ID.

    The header is fetched once with `cabecalho` and reused. When a reactor
    answers None it is reset and reconnected, the preset and cached states
    are re-applied, and the whole read is repeated until every reactor answers.

    Args:
        save_cache (bool): Whether or not to save a cache file with the last reading with `log.log.cache_data`.
    """
    if self.header is None:
        first_reactor = list(self.reactors.values())[0]
        self.header = first_reactor.send("cabecalho").split(" ")

    while True:
        rows = self.send_parallel("dados", await_response=True).items()
        # A None response means the reactor did not answer.
        silent = [rid for rid, answer in rows if answer is None]
        if not silent:
            break
        print(bcolors.FAIL+"[FAIL]","The following reactors didn't respond:"+"\n\t"+"\n\t".join(list(map(str,silent))))
        print("Resetting serial")
        sleep(10)
        for rid in silent:
            self.reactors[rid].reset()
            self.reactors[rid].connect()
        print("Recovering last state")
        self.set_preset_state(path=PATHS.INITIAL_STATE_PATH)
        self.set_preset_state(path=CACHEPATH)
        sleep(10)
        print("Done"+bcolors.ENDC)

    rows = {rid: OrderedDict(zip(self.header, answer.split(" "))) for rid, answer in rows}
    if save_cache:
        self.log.cache_data(rows)
    return rows
Get data from Arduinos.
Arguments:
- save_cache (bool): Whether or not to save a cache file with the last reading using `log.log.cache_data`.
def log_dados(self, save_cache=True):
    """
    Logs output of `dados` in csv format.

    Args:
        save_cache (bool): Whether or not to save a cache file with the last reading with `log.log.cache_data`.

    Returns:
        dict: One OrderedDict (header -> value) per reactor ID.
    """
    # `garbage` is not defined on ReactorManager; call it only when a
    # subclass or the caller provides it instead of failing outright.
    garbage = getattr(self, "garbage", None)
    if callable(garbage):
        garbage()
    header = list(self.reactors.values())[0].send("cabecalho").split(" ")

    # send_parallel(command, await_response=True) accepts no `delay` argument
    # and returns a dict keyed by reactor ID, so iterate its items.
    responses = self.send_parallel("dados")
    rows = {rid: OrderedDict(zip(header, answer.split(" "))) for rid, answer in responses.items()}

    for rid, row in rows.items():
        self.log.log_rows(rows=[row], subdir=rid)
    if save_cache:
        self.log.cache_data(rows)
    return rows
Logs the output of `dados` in CSV format.
Arguments:
- save_cache (bool): Whether or not to save a cache file with the last reading using `log.log.cache_data`.
def set_preset_state(self,path="preset_state.csv",sep="\t",chunksize=4, params=PATHS.REACTOR_PARAMETERS, **kwargs):
    """
    Prepare Arduinos with preset parameters from a csv file.

    Args:
        path (str): Path to the csv file.
        sep (str): Column separator used in the csv.
        chunksize (int): How many commands to send in a single line. A large value can cause Serial errors.
        params: Column names to keep; a falsy value keeps every column.
        **kwargs: Forwarded to `pandas.read_csv`.
    """
    df = pd.read_csv(path,sep=sep,index_col='ID',**kwargs)
    df = df[df.index.isin(self.ids)]
    df.columns = df.columns.str.lower() #Commands must be sent in lowercase
    # Dropping empty columns (not in place: df is a filtered selection).
    df = df.dropna(axis=1)
    if params:
        wanted = set(params)
        # Keep the file's column order so commands are sent deterministically
        # (a set intersection yields an arbitrary order).
        df = df.loc[:, [c for c in df.columns if c in wanted]]
    #Setting schema
    schema = {c: PATHS.SCHEMA[c] for c in df.columns if c in PATHS.SCHEMA}
    df = df.astype(schema)
    jobs = [(self.reactors[rid], row, chunksize) for rid, row in df.to_dict(orient="index").items()]
    with Pool(7) as p:
        p.map(set_in_chunks, jobs)
    #Saving relevant parameters' values
    relevant = set(PATHS.RELEVANT_PARAMETERS)
    self.preset_state = df.loc[:, [c for c in df.columns if c in relevant]]
    self.payload = self.preset_state.to_dict('index').copy()
Prepare Arduinos with preset parameters from a csv file.
Arguments:
- path (str): Path to the csv file.
- chunksize (int): How many commands to send in a single line. A large value can cause Serial errors.
- sep (str): Column separator used in the csv.
def calibrate(self, deltaT=120, dir="calibrate"):
    """
    Runs `curva` and dumps the result into txts.

    Args:
        deltaT: Seconds to wait after sending `curva` before reading.
        dir: Output directory; created (with parents) when missing.

    Returns:
        dict: Raw bytes read per reactor name.
    """
    # makedirs with exist_ok handles nested paths and avoids the race between
    # the existence check and the directory creation.
    os.makedirs(dir, exist_ok=True)
    out = {}
    self.send("curva", await_response=False)
    sleep(deltaT)
    terminator = '*** fim da curva dos LEDs ***'.encode('ascii')
    for name, reactor in self.reactors.items():
        # NOTE(review): the Reactor class shown in this module defines no
        # `_conn` attribute - confirm how the stream should be read.
        out[name] = reactor._conn.read_until(terminator)
        with open(os.path.join(dir, f"reator_{name}.txt"), "w") as f:
            f.write(out[name].decode('ascii'))
    return out
Runs `curva` and dumps the result into text files.