pyduino.pyduino2

  1from typing import Union
  2import logging
  3from time import sleep, time
  4import pandas as pd
  5from collections import OrderedDict
  6from pyduino.log import Log
  7import pandas as pd
  8from multiprocessing import Pool, Process
  9from functools import partial
 10import os
 11from pyduino.utils import bcolors, get_servers, get_meta
 12import requests
 13from urllib.parse import urljoin
 14import logging
 15from datetime import datetime
 16from pyduino.paths import PATHS
 17
 18__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
 19logging.basicConfig(
 20    filename='pyduino.log',
 21    filemode='w',
 22    level=PATHS.SYSTEM_PARAMETERS.get('log_level', logging.INFO),
 23)
 24
 25
 26STEP = 1/8
 27HEADER_DELAY = 5
 28COLN = 48 #Number of columns to parse from Arduino (used for sanity tests)
 29CACHEPATH = "cache.csv"
 30
 31#From https://stackoverflow.com/a/312464/6451772
 32def chunks(lst, n):
 33    """Yield successive n-sized chunks from lst."""
 34    for i in range(0, len(lst), n):
 35        yield lst[i:i + n]
 36
 37class Reactor:
 38    """
 39    Master of HTTP Server to Serial handler. 
 40    """
 41
 42    def __init__(self, url):
 43        self.connected = False
 44        self.url = url
 45        self.meta = get_meta(url)
 46        self.id = self.meta["id"]
 47
 48    def http_get(self,route):
 49        return requests.get(urljoin(self.url,route))
 50    
 51    def http_post(self,route,command,await_response):
 52        return requests.post(urljoin(self.url,route),json={
 53            "command": command,
 54            "await_response": await_response
 55        })
 56
 57    def connect(self):
 58        """
 59        Starts connection
 60        """
 61        resp = self.http_get("connect")
 62        if resp.ok:
 63            self.connected = True
 64        return resp.ok
 65    
 66    def reboot(self,retry_time=5):
 67        try:
 68            resp = self.http_get("reboot")
 69        except requests.ConnectionError:
 70            self.connected = False
 71        print(bcolors.WARNING,"Rebooting reactor",self.id,"at",self.url,bcolors.ENDC)
 72        while not self.connected:
 73            try:
 74                self.connect()
 75            except requests.ConnectionError:
 76                print(bcolors.WARNING,"Waiting for reactor",self.id,bcolors.ENDC)
 77                sleep(retry_time)
 78
 79    def reset(self):
 80        """
 81        Resets connection.
 82        """
 83        resp = self.http_get("reset")
 84        return resp.ok
 85
 86    def close(self):
 87        """
 88        Sends 'fim'.
 89        """
 90        self.http_post("send","fim",False,0)
 91
 92    def send(self, msg):
 93        """
 94        Sends command and awaits for a response
 95        """
 96        resp = self.http_post("send",msg,True)
 97        return resp.json()["response"]
 98
 99    def _send(self, msg):
100        """
101        Sends command and doesn't await for a response
102        """
103        resp = self.http_post("send",msg,False)
104        return resp.ok
105    
106    def set_in_chunks(self,params,chunksize=4):
107        """
108        Sets params into chunks.
109        """
110        ch = chunks(params,chunksize)
111        for chunk in ch:
112            self.set(dict(chunk))
113            sleep(2)
114    
115    def __repr__(self):
116        return f"{bcolors.OKCYAN}<Reactor {self.id} at {self.meta['hostname']}({self.url})>{bcolors.ENDC}"
117
118    def set(self, data=None, **kwargs):
119        """
120        Reactor.set({"440": 50, "brilho": 100})
121        """
122        data = {**(data or {}), **kwargs}
123        args = ",".join(f'{k},{v}' for k, v in data.items())
124        cmd = f"set({args})"
125        self._send(cmd)
126    
127    def horacerta(self):
128        """
129        Synchronizes Arduino clock with the client computer.
130        """
131        now = datetime.now()
132        logging.info(f"Set clock on {self.meta['hostname']}")
133        self.set_in_chunks([["ano",now.year],["mes",now.month],["dia",now.day],["hora",now.hour],["minuto",now.minute]],chunksize=2)
134        sleep(2)
135        logging.debug(f"Clock set on {self.meta['hostname']}")
136        self._send("horacerta")
137
138
139def send_wrapper(reactor,command,await_response):
140    id, reactor = reactor
141    if await_response:
142        return (id,reactor.send(command))
143    else:
144        return (id,reactor._send(command))
145
146def reboot_wrapper(reactor):
147    id,reactor = reactor
148    reactor.reboot(retry_time=PATHS.SYSTEM_PARAMETERS["reboot_wait_time"])
149    return True
150
151def set_in_chunks(X):
152    reactor,row,chunksize = X
153    reactor.set_in_chunks(list(row.items()),chunksize)
154
155class ReactorManager:
156    """
157    A class that manages multiple reactors.
158
159    Attributes:
160        pinged (bool): Indicates if the reactors have been pinged.
161        network (str): The network address of the reactors.
162        port (int): The port number of the reactors.
163        exclude (list): A list of reactors to exclude.
164        reactors (dict): A dictionary of reactor objects.
165        servers (dict): A dictionary of server addresses.
166        header (list): A list of header values.
167        payload (dict): A dictionary of payload values.
168        connected (bool): Indicates if the reactors are connected.
169        log (log): A log object for logging data.
170
171    Methods:
172        __init__(self, include: dict = None): Initializes the ReactorManager object.
173        ids(self) -> list: Returns a list of reactor IDs.
174        send(self, command, await_response=True, **kwargs): Sends a command to the reactors.
175        send_parallel(self, command, delay, await_response=True): Sends a command to the reactors in parallel.
176        set(self, data=None, **kwargs): Sets data on the reactors.
177        get(self, key=None): Gets data from the reactors.
178        connect(self): Connects to the reactors.
179        reset(self): Resets the reactors.
180        reboot(self): Reboots the reactors.
181        horacerta(self): Updates Arduino clocks with the clock of the current system.
182        log_init(self, **kwargs): Creates log directories for each Arduino.
183        dados(self, save_cache=True): Gets data from the Arduinos.
184        log_dados(self, save_cache=True): Logs output of `dados` in CSV format.
185        set_preset_state(self, path="preset_state.csv", sep="\t", chunksize=4, params=PATHS.REACTOR_PARAMETERS, **kwargs): Prepares Arduinos with preset parameters from a CSV file.
186        calibrate(self, deltaT=120, dir="calibrate"): Runs `curva` and dumps the result into txts.
187    """
188    
189    pinged = False
190    def __init__(self, include: dict = None):
191        """
192        Initializes the ReactorManager object.
193
194        Args:
195            include (dict): A dictionary of reactor IDs and their corresponding server addresses.
196        """
197        self.network = PATHS.SLAVE_PARAMETERS["network"]
198        self.port = PATHS.SLAVE_PARAMETERS["port"]
199        self.exclude = PATHS.SLAVE_PARAMETERS.get("exclude", None)
200        self.reactors = {}
201
202        if include is None:
203            self.servers = get_servers(self.network,self.port,self.exclude)
204        else:
205            self.servers = include
206
207        for id, host in self.servers.items():
208            self.reactors[id] = Reactor(host)
209        
210        logging.info("Connection completed")
211
212        #Info
213        print("\n".join(map(lambda x: f"Reactor {bcolors.OKCYAN}{x.id}{bcolors.ENDC} at {bcolors.OKCYAN}{x.meta['hostname']}({x.url}){bcolors.ENDC}",self.reactors.values())))
214        self.header = None
215        self.payload = None
216
217        if PATHS.SYSTEM_PARAMETERS.get("sync_clocks", True):
218            self.horacerta()
219        self.ids = sorted(list(self.reactors.keys()))
220
221    def send(self,command,await_response=True,**kwargs):
222        out = {}
223        for k,r in self.reactors.items():
224            if await_response:
225                out[k] = r.send(command,**kwargs)
226            else:
227                r._send(command)
228        return out
229    
230    def send_parallel(self,command,await_response=True):
231        with Pool(7) as p:
232            out = p.map(partial(send_wrapper,command=command,await_response=await_response),list(self.reactors.items()))
233        return dict(out)
234
235    def set(self, data=None, **kwargs):
236        for k,r in self.reactors.items():
237            r.set(data=data, **kwargs)
238    def get(self,key=None):
239        for k,r in self.reactors.items():
240            r.get(key=key)
241    def connect(self):
242        for k,r in self.reactors.items():
243            r.connect()
244    def reset(self):
245        for k,r in self.reactors.items():
246            r.reset()
247        self.connected = True
248    def reboot(self):
249        self.connected = False
250        with Pool(7) as p:
251            response = p.map(reboot_wrapper,list(self.reactors.items()))
252        self.connected = True
253    def horacerta(self):
254        """
255        Updates Arduino clocks with clock of current system.
256        """
257        print("[INFO]", f"{bcolors.BOLD}Syncronizing clocks{bcolors.ENDC}")
258        for k,r in self.reactors.items():
259            r.horacerta()
260
261    #Logging
262
263    def log_init(self,**kwargs):
264        """
265        Creates log directories for each Arduino.
266
267        Args:
268            name (str): Name of the subdirectory in the log folder where the files will be saved.
269        """
270        self.log = Log(subdir=list(self.reactors.keys()),**kwargs)
271        print(f"Log will be saved on: {bcolors.OKGREEN}{self.log.prefix}{bcolors.ENDC}")
272    
273    @property
274    def brilho(self):
275        """
276        Convenience method to get brilho from reactors.
277        """
278        out = self.send_parallel(f"get({self.brilho_param.lower()})")
279        out = {k: float(v.strip()) for k,v in out.items()}
280        return out
281
282    def dados(self,save_cache=True):
283        """
284        Get data from Arduinos.
285
286        Args:
287            save_cache (bool): Whether or not so save a cache file with the last reading with `log.log.cache_data`.
288        """
289
290        if self.header is None:
291            self.header = list(self.reactors.values())[0].send("cabecalho").split(" ")
292
293        len_empty = None
294        while len_empty!=0:
295            rows = self.send_parallel("dados",await_response=True).items()
296            #Checking if any reactor didn't respond.
297            empty = list(filter(lambda x: x[1] is None,rows))
298            len_empty = len(empty)
299            if len_empty!=0:
300                #Error treatment in case some reactor fails to respond.
301                empty = list(map(lambda x: x[0],empty))
302                print(bcolors.FAIL+"[FAIL]","The following reactors didn't respond:"+"\n\t"+"\n\t".join(list(map(str,empty))))
303                print("Resetting serial")
304                sleep(10)
305                for i in empty:
306                    self.reactors[i].reset()
307                    self.reactors[i].connect()
308                print("Recovering last state")
309                self.set_preset_state(path=PATHS.INITIAL_STATE_PATH)
310                self.set_preset_state(path=CACHEPATH)
311                sleep(10)
312                print("Done"+bcolors.ENDC)
313
314        rows = dict(map(lambda x: (x[0],OrderedDict(zip(self.header,x[1].split(" ")))),rows))
315        if save_cache:
316            self.log.cache_data(rows) #Index set to False because ID already exists in rows.
317        return rows
318
319    def log_dados(self,save_cache=True):
320        """
321        Logs output of `dados` in csv format.
322
323        Args:
324            save_cache (bool): Whether or not so save a cache file with the last reading with `log.log.cache_data`.
325        """
326        self.garbage()
327        header = list(self.reactors.values())[0].send("cabecalho").split(" ")
328
329        rows = self.send_parallel("dados",delay=13)
330        rows = list(map(lambda x: (x[0],OrderedDict(zip(header,x[1].split(" ")))),rows))
331
332        for _id,row in rows:
333            self.log.log_rows(rows=[row],subdir=_id)
334        rows = dict(rows)
335        if save_cache:
336            self.log.cache_data(rows) #Index set to False because ID already exists in rows.
337        return rows
338    
339    def set_preset_state(self,path="preset_state.csv",sep="\t",chunksize=4, params=PATHS.REACTOR_PARAMETERS, **kwargs):
340        """
341        Prepare Arduinos with preset parameters from a csv file.
342        Args:
343            path (str): Path to the csv file.
344            chunksize (int): How many to commands to send in a single line. A large value can cause Serial errors.
345            sep (str): Column separator used in the csv.
346        """
347        df = pd.read_csv(path,sep=sep,index_col='ID',**kwargs)
348        df = df[df.index.isin(self.ids)]
349        df.columns = df.columns.str.lower() #Commands must be sent in lowercase
350        #Dropping empty columns
351        df.dropna(axis=1,inplace=True)
352        if params:
353            cols = list(set(df.columns)&set(params))
354            df = df.loc[:,cols]
355        #Setting schema
356        schema = set(df.columns)&PATHS.SCHEMA.keys()
357        schema = {k:PATHS.SCHEMA[k] for k in schema}
358        df = df.astype(schema)
359        with Pool(7) as p:
360            p.map(set_in_chunks,map(lambda x: (self.reactors[x[0]],x[1],chunksize),df.to_dict(orient="index").items()))
361        #Saving relevant parameters' values
362        cols = list(set(df.columns)&set(PATHS.RELEVANT_PARAMETERS))
363        self.preset_state = df.loc[:,cols]
364        self.payload = self.preset_state.to_dict('index').copy()
365
366    def calibrate(self,deltaT=120,dir="calibrate"):
367        """
368        Runs `curva` and dumps the result into txts.
369        """
370        if not os.path.exists(dir):
371            os.mkdir(dir)
372        out = {}
373        self.send("curva",await_response=False)
374        sleep(deltaT)
375        for name,reactor in self.reactors.items():
376            out[name] = reactor._conn.read_until('*** fim da curva dos LEDs ***'.encode('ascii'))
377            with open(os.path.join(dir,f"reator_{name}.txt"),"w") as f:
378                f.write(out[name].decode('ascii'))
379        return out
380    
381    def __repr__(self):
382        return f"{bcolors.OKCYAN}<Manager of reactors {''.join(str(self.ids))}>{bcolors.ENDC}"
383
384
385if __name__ == '__main__':
386    r = ReactorManager()
STEP = 0.125
HEADER_DELAY = 5
COLN = 48
CACHEPATH = 'cache.csv'
def chunks(lst, n):
33def chunks(lst, n):
34    """Yield successive n-sized chunks from lst."""
35    for i in range(0, len(lst), n):
36        yield lst[i:i + n]

Yield successive n-sized chunks from lst.

class Reactor:
 38class Reactor:
 39    """
 40    Master of HTTP Server to Serial handler. 
 41    """
 42
 43    def __init__(self, url):
 44        self.connected = False
 45        self.url = url
 46        self.meta = get_meta(url)
 47        self.id = self.meta["id"]
 48
 49    def http_get(self,route):
 50        return requests.get(urljoin(self.url,route))
 51    
 52    def http_post(self,route,command,await_response):
 53        return requests.post(urljoin(self.url,route),json={
 54            "command": command,
 55            "await_response": await_response
 56        })
 57
 58    def connect(self):
 59        """
 60        Starts connection
 61        """
 62        resp = self.http_get("connect")
 63        if resp.ok:
 64            self.connected = True
 65        return resp.ok
 66    
 67    def reboot(self,retry_time=5):
 68        try:
 69            resp = self.http_get("reboot")
 70        except requests.ConnectionError:
 71            self.connected = False
 72        print(bcolors.WARNING,"Rebooting reactor",self.id,"at",self.url,bcolors.ENDC)
 73        while not self.connected:
 74            try:
 75                self.connect()
 76            except requests.ConnectionError:
 77                print(bcolors.WARNING,"Waiting for reactor",self.id,bcolors.ENDC)
 78                sleep(retry_time)
 79
 80    def reset(self):
 81        """
 82        Resets connection.
 83        """
 84        resp = self.http_get("reset")
 85        return resp.ok
 86
 87    def close(self):
 88        """
 89        Sends 'fim'.
 90        """
 91        self.http_post("send","fim",False,0)
 92
 93    def send(self, msg):
 94        """
 95        Sends command and awaits for a response
 96        """
 97        resp = self.http_post("send",msg,True)
 98        return resp.json()["response"]
 99
100    def _send(self, msg):
101        """
102        Sends command and doesn't await for a response
103        """
104        resp = self.http_post("send",msg,False)
105        return resp.ok
106    
107    def set_in_chunks(self,params,chunksize=4):
108        """
109        Sets params into chunks.
110        """
111        ch = chunks(params,chunksize)
112        for chunk in ch:
113            self.set(dict(chunk))
114            sleep(2)
115    
116    def __repr__(self):
117        return f"{bcolors.OKCYAN}<Reactor {self.id} at {self.meta['hostname']}({self.url})>{bcolors.ENDC}"
118
119    def set(self, data=None, **kwargs):
120        """
121        Reactor.set({"440": 50, "brilho": 100})
122        """
123        data = {**(data or {}), **kwargs}
124        args = ",".join(f'{k},{v}' for k, v in data.items())
125        cmd = f"set({args})"
126        self._send(cmd)
127    
128    def horacerta(self):
129        """
130        Synchronizes Arduino clock with the client computer.
131        """
132        now = datetime.now()
133        logging.info(f"Set clock on {self.meta['hostname']}")
134        self.set_in_chunks([["ano",now.year],["mes",now.month],["dia",now.day],["hora",now.hour],["minuto",now.minute]],chunksize=2)
135        sleep(2)
136        logging.debug(f"Clock set on {self.meta['hostname']}")
137        self._send("horacerta")

Master of HTTP Server to Serial handler.

Reactor(url)
43    def __init__(self, url):
44        self.connected = False
45        self.url = url
46        self.meta = get_meta(url)
47        self.id = self.meta["id"]
connected
url
meta
id
def http_get(self, route):
49    def http_get(self,route):
50        return requests.get(urljoin(self.url,route))
def http_post(self, route, command, await_response):
52    def http_post(self,route,command,await_response):
53        return requests.post(urljoin(self.url,route),json={
54            "command": command,
55            "await_response": await_response
56        })
def connect(self):
58    def connect(self):
59        """
60        Starts connection
61        """
62        resp = self.http_get("connect")
63        if resp.ok:
64            self.connected = True
65        return resp.ok

Starts connection

def reboot(self, retry_time=5):
67    def reboot(self,retry_time=5):
68        try:
69            resp = self.http_get("reboot")
70        except requests.ConnectionError:
71            self.connected = False
72        print(bcolors.WARNING,"Rebooting reactor",self.id,"at",self.url,bcolors.ENDC)
73        while not self.connected:
74            try:
75                self.connect()
76            except requests.ConnectionError:
77                print(bcolors.WARNING,"Waiting for reactor",self.id,bcolors.ENDC)
78                sleep(retry_time)
def reset(self):
80    def reset(self):
81        """
82        Resets connection.
83        """
84        resp = self.http_get("reset")
85        return resp.ok

Resets connection.

def close(self):
87    def close(self):
88        """
89        Sends 'fim'.
90        """
91        self.http_post("send","fim",False,0)

Sends 'fim'.

def send(self, msg):
93    def send(self, msg):
94        """
95        Sends command and awaits for a response
96        """
97        resp = self.http_post("send",msg,True)
98        return resp.json()["response"]

Sends command and awaits for a response

def set_in_chunks(self, params, chunksize=4):
107    def set_in_chunks(self,params,chunksize=4):
108        """
109        Sets params into chunks.
110        """
111        ch = chunks(params,chunksize)
112        for chunk in ch:
113            self.set(dict(chunk))
114            sleep(2)

Sets params into chunks.

def set(self, data=None, **kwargs):
119    def set(self, data=None, **kwargs):
120        """
121        Reactor.set({"440": 50, "brilho": 100})
122        """
123        data = {**(data or {}), **kwargs}
124        args = ",".join(f'{k},{v}' for k, v in data.items())
125        cmd = f"set({args})"
126        self._send(cmd)

Reactor.set({"440": 50, "brilho": 100})

def horacerta(self):
128    def horacerta(self):
129        """
130        Synchronizes Arduino clock with the client computer.
131        """
132        now = datetime.now()
133        logging.info(f"Set clock on {self.meta['hostname']}")
134        self.set_in_chunks([["ano",now.year],["mes",now.month],["dia",now.day],["hora",now.hour],["minuto",now.minute]],chunksize=2)
135        sleep(2)
136        logging.debug(f"Clock set on {self.meta['hostname']}")
137        self._send("horacerta")

Synchronizes Arduino clock with the client computer.

def send_wrapper(reactor, command, await_response):
140def send_wrapper(reactor,command,await_response):
141    id, reactor = reactor
142    if await_response:
143        return (id,reactor.send(command))
144    else:
145        return (id,reactor._send(command))
def reboot_wrapper(reactor):
147def reboot_wrapper(reactor):
148    id,reactor = reactor
149    reactor.reboot(retry_time=PATHS.SYSTEM_PARAMETERS["reboot_wait_time"])
150    return True
def set_in_chunks(X):
152def set_in_chunks(X):
153    reactor,row,chunksize = X
154    reactor.set_in_chunks(list(row.items()),chunksize)
class ReactorManager:
156class ReactorManager:
157    """
158    A class that manages multiple reactors.
159
160    Attributes:
161        pinged (bool): Indicates if the reactors have been pinged.
162        network (str): The network address of the reactors.
163        port (int): The port number of the reactors.
164        exclude (list): A list of reactors to exclude.
165        reactors (dict): A dictionary of reactor objects.
166        servers (dict): A dictionary of server addresses.
167        header (list): A list of header values.
168        payload (dict): A dictionary of payload values.
169        connected (bool): Indicates if the reactors are connected.
170        log (log): A log object for logging data.
171
172    Methods:
173        __init__(self, include: dict = None): Initializes the ReactorManager object.
174        ids(self) -> list: Returns a list of reactor IDs.
175        send(self, command, await_response=True, **kwargs): Sends a command to the reactors.
176        send_parallel(self, command, delay, await_response=True): Sends a command to the reactors in parallel.
177        set(self, data=None, **kwargs): Sets data on the reactors.
178        get(self, key=None): Gets data from the reactors.
179        connect(self): Connects to the reactors.
180        reset(self): Resets the reactors.
181        reboot(self): Reboots the reactors.
182        horacerta(self): Updates Arduino clocks with the clock of the current system.
183        log_init(self, **kwargs): Creates log directories for each Arduino.
184        dados(self, save_cache=True): Gets data from the Arduinos.
185        log_dados(self, save_cache=True): Logs output of `dados` in CSV format.
186        set_preset_state(self, path="preset_state.csv", sep="\t", chunksize=4, params=PATHS.REACTOR_PARAMETERS, **kwargs): Prepares Arduinos with preset parameters from a CSV file.
187        calibrate(self, deltaT=120, dir="calibrate"): Runs `curva` and dumps the result into txts.
188    """
189    
190    pinged = False
191    def __init__(self, include: dict = None):
192        """
193        Initializes the ReactorManager object.
194
195        Args:
196            include (dict): A dictionary of reactor IDs and their corresponding server addresses.
197        """
198        self.network = PATHS.SLAVE_PARAMETERS["network"]
199        self.port = PATHS.SLAVE_PARAMETERS["port"]
200        self.exclude = PATHS.SLAVE_PARAMETERS.get("exclude", None)
201        self.reactors = {}
202
203        if include is None:
204            self.servers = get_servers(self.network,self.port,self.exclude)
205        else:
206            self.servers = include
207
208        for id, host in self.servers.items():
209            self.reactors[id] = Reactor(host)
210        
211        logging.info("Connection completed")
212
213        #Info
214        print("\n".join(map(lambda x: f"Reactor {bcolors.OKCYAN}{x.id}{bcolors.ENDC} at {bcolors.OKCYAN}{x.meta['hostname']}({x.url}){bcolors.ENDC}",self.reactors.values())))
215        self.header = None
216        self.payload = None
217
218        if PATHS.SYSTEM_PARAMETERS.get("sync_clocks", True):
219            self.horacerta()
220        self.ids = sorted(list(self.reactors.keys()))
221
222    def send(self,command,await_response=True,**kwargs):
223        out = {}
224        for k,r in self.reactors.items():
225            if await_response:
226                out[k] = r.send(command,**kwargs)
227            else:
228                r._send(command)
229        return out
230    
231    def send_parallel(self,command,await_response=True):
232        with Pool(7) as p:
233            out = p.map(partial(send_wrapper,command=command,await_response=await_response),list(self.reactors.items()))
234        return dict(out)
235
236    def set(self, data=None, **kwargs):
237        for k,r in self.reactors.items():
238            r.set(data=data, **kwargs)
239    def get(self,key=None):
240        for k,r in self.reactors.items():
241            r.get(key=key)
242    def connect(self):
243        for k,r in self.reactors.items():
244            r.connect()
245    def reset(self):
246        for k,r in self.reactors.items():
247            r.reset()
248        self.connected = True
249    def reboot(self):
250        self.connected = False
251        with Pool(7) as p:
252            response = p.map(reboot_wrapper,list(self.reactors.items()))
253        self.connected = True
254    def horacerta(self):
255        """
256        Updates Arduino clocks with clock of current system.
257        """
258        print("[INFO]", f"{bcolors.BOLD}Syncronizing clocks{bcolors.ENDC}")
259        for k,r in self.reactors.items():
260            r.horacerta()
261
262    #Logging
263
264    def log_init(self,**kwargs):
265        """
266        Creates log directories for each Arduino.
267
268        Args:
269            name (str): Name of the subdirectory in the log folder where the files will be saved.
270        """
271        self.log = Log(subdir=list(self.reactors.keys()),**kwargs)
272        print(f"Log will be saved on: {bcolors.OKGREEN}{self.log.prefix}{bcolors.ENDC}")
273    
274    @property
275    def brilho(self):
276        """
277        Convenience method to get brilho from reactors.
278        """
279        out = self.send_parallel(f"get({self.brilho_param.lower()})")
280        out = {k: float(v.strip()) for k,v in out.items()}
281        return out
282
283    def dados(self,save_cache=True):
284        """
285        Get data from Arduinos.
286
287        Args:
288            save_cache (bool): Whether or not so save a cache file with the last reading with `log.log.cache_data`.
289        """
290
291        if self.header is None:
292            self.header = list(self.reactors.values())[0].send("cabecalho").split(" ")
293
294        len_empty = None
295        while len_empty!=0:
296            rows = self.send_parallel("dados",await_response=True).items()
297            #Checking if any reactor didn't respond.
298            empty = list(filter(lambda x: x[1] is None,rows))
299            len_empty = len(empty)
300            if len_empty!=0:
301                #Error treatment in case some reactor fails to respond.
302                empty = list(map(lambda x: x[0],empty))
303                print(bcolors.FAIL+"[FAIL]","The following reactors didn't respond:"+"\n\t"+"\n\t".join(list(map(str,empty))))
304                print("Resetting serial")
305                sleep(10)
306                for i in empty:
307                    self.reactors[i].reset()
308                    self.reactors[i].connect()
309                print("Recovering last state")
310                self.set_preset_state(path=PATHS.INITIAL_STATE_PATH)
311                self.set_preset_state(path=CACHEPATH)
312                sleep(10)
313                print("Done"+bcolors.ENDC)
314
315        rows = dict(map(lambda x: (x[0],OrderedDict(zip(self.header,x[1].split(" ")))),rows))
316        if save_cache:
317            self.log.cache_data(rows) #Index set to False because ID already exists in rows.
318        return rows
319
320    def log_dados(self,save_cache=True):
321        """
322        Logs output of `dados` in csv format.
323
324        Args:
325            save_cache (bool): Whether or not so save a cache file with the last reading with `log.log.cache_data`.
326        """
327        self.garbage()
328        header = list(self.reactors.values())[0].send("cabecalho").split(" ")
329
330        rows = self.send_parallel("dados",delay=13)
331        rows = list(map(lambda x: (x[0],OrderedDict(zip(header,x[1].split(" ")))),rows))
332
333        for _id,row in rows:
334            self.log.log_rows(rows=[row],subdir=_id)
335        rows = dict(rows)
336        if save_cache:
337            self.log.cache_data(rows) #Index set to False because ID already exists in rows.
338        return rows
339    
340    def set_preset_state(self,path="preset_state.csv",sep="\t",chunksize=4, params=PATHS.REACTOR_PARAMETERS, **kwargs):
341        """
342        Prepare Arduinos with preset parameters from a csv file.
343        Args:
344            path (str): Path to the csv file.
345            chunksize (int): How many to commands to send in a single line. A large value can cause Serial errors.
346            sep (str): Column separator used in the csv.
347        """
348        df = pd.read_csv(path,sep=sep,index_col='ID',**kwargs)
349        df = df[df.index.isin(self.ids)]
350        df.columns = df.columns.str.lower() #Commands must be sent in lowercase
351        #Dropping empty columns
352        df.dropna(axis=1,inplace=True)
353        if params:
354            cols = list(set(df.columns)&set(params))
355            df = df.loc[:,cols]
356        #Setting schema
357        schema = set(df.columns)&PATHS.SCHEMA.keys()
358        schema = {k:PATHS.SCHEMA[k] for k in schema}
359        df = df.astype(schema)
360        with Pool(7) as p:
361            p.map(set_in_chunks,map(lambda x: (self.reactors[x[0]],x[1],chunksize),df.to_dict(orient="index").items()))
362        #Saving relevant parameters' values
363        cols = list(set(df.columns)&set(PATHS.RELEVANT_PARAMETERS))
364        self.preset_state = df.loc[:,cols]
365        self.payload = self.preset_state.to_dict('index').copy()
366
367    def calibrate(self,deltaT=120,dir="calibrate"):
368        """
369        Runs `curva` and dumps the result into txts.
370        """
371        if not os.path.exists(dir):
372            os.mkdir(dir)
373        out = {}
374        self.send("curva",await_response=False)
375        sleep(deltaT)
376        for name,reactor in self.reactors.items():
377            out[name] = reactor._conn.read_until('*** fim da curva dos LEDs ***'.encode('ascii'))
378            with open(os.path.join(dir,f"reator_{name}.txt"),"w") as f:
379                f.write(out[name].decode('ascii'))
380        return out
381    
382    def __repr__(self):
383        return f"{bcolors.OKCYAN}<Manager of reactors {''.join(str(self.ids))}>{bcolors.ENDC}"

A class that manages multiple reactors.

Attributes:
  • pinged (bool): Indicates if the reactors have been pinged.
  • network (str): The network address of the reactors.
  • port (int): The port number of the reactors.
  • exclude (list): A list of reactors to exclude.
  • reactors (dict): A dictionary of reactor objects.
  • servers (dict): A dictionary of server addresses.
  • header (list): A list of header values.
  • payload (dict): A dictionary of payload values.
  • connected (bool): Indicates if the reactors are connected.
  • log (log): A log object for logging data.
Methods:

__init__(self, include: dict = None): Initializes the ReactorManager object. ids(self) -> list: Returns a list of reactor IDs. send(self, command, await_response=True, *kwargs): Sends a command to the reactors. send_parallel(self, command, delay, await_response=True): Sends a command to the reactors in parallel. set(self, data=None, *kwargs): Sets data on the reactors. get(self, key=None): Gets data from the reactors. connect(self): Connects to the reactors. reset(self): Resets the reactors. reboot(self): Reboots the reactors. horacerta(self): Updates Arduino clocks with the clock of the current system. log_init(self, *kwargs): Creates log directories for each Arduino. dados(self, save_cache=True): Gets data from the Arduinos. log_dados(self, save_cache=True): Logs output of dados in CSV format. set_preset_state(self, path="preset_state.csv", sep=" ", chunksize=4, params=PATHS.REACTOR_PARAMETERS, *kwargs): Prepares Arduinos with preset parameters from a CSV file. calibrate(self, deltaT=120, dir="calibrate"): Runs curva and dumps the result into txts.

ReactorManager(include: dict = None)
191    def __init__(self, include: dict = None):
192        """
193        Initializes the ReactorManager object.
194
195        Args:
196            include (dict): A dictionary of reactor IDs and their corresponding server addresses.
197        """
198        self.network = PATHS.SLAVE_PARAMETERS["network"]
199        self.port = PATHS.SLAVE_PARAMETERS["port"]
200        self.exclude = PATHS.SLAVE_PARAMETERS.get("exclude", None)
201        self.reactors = {}
202
203        if include is None:
204            self.servers = get_servers(self.network,self.port,self.exclude)
205        else:
206            self.servers = include
207
208        for id, host in self.servers.items():
209            self.reactors[id] = Reactor(host)
210        
211        logging.info("Connection completed")
212
213        #Info
214        print("\n".join(map(lambda x: f"Reactor {bcolors.OKCYAN}{x.id}{bcolors.ENDC} at {bcolors.OKCYAN}{x.meta['hostname']}({x.url}){bcolors.ENDC}",self.reactors.values())))
215        self.header = None
216        self.payload = None
217
218        if PATHS.SYSTEM_PARAMETERS.get("sync_clocks", True):
219            self.horacerta()
220        self.ids = sorted(list(self.reactors.keys()))

Initializes the ReactorManager object.

Arguments:
  • include (dict): A dictionary of reactor IDs and their corresponding server addresses.
pinged = False
network
port
exclude
reactors
header
payload
ids
def send(self, command, await_response=True, **kwargs):
222    def send(self,command,await_response=True,**kwargs):
223        out = {}
224        for k,r in self.reactors.items():
225            if await_response:
226                out[k] = r.send(command,**kwargs)
227            else:
228                r._send(command)
229        return out
def send_parallel(self, command, await_response=True):
231    def send_parallel(self,command,await_response=True):
232        with Pool(7) as p:
233            out = p.map(partial(send_wrapper,command=command,await_response=await_response),list(self.reactors.items()))
234        return dict(out)
def set(self, data=None, **kwargs):
236    def set(self, data=None, **kwargs):
237        for k,r in self.reactors.items():
238            r.set(data=data, **kwargs)
def get(self, key=None):
239    def get(self,key=None):
240        for k,r in self.reactors.items():
241            r.get(key=key)
def connect(self):
242    def connect(self):
243        for k,r in self.reactors.items():
244            r.connect()
def reset(self):
245    def reset(self):
246        for k,r in self.reactors.items():
247            r.reset()
248        self.connected = True
def reboot(self):
249    def reboot(self):
250        self.connected = False
251        with Pool(7) as p:
252            response = p.map(reboot_wrapper,list(self.reactors.items()))
253        self.connected = True
def horacerta(self):
254    def horacerta(self):
255        """
256        Updates Arduino clocks with clock of current system.
257        """
258        print("[INFO]", f"{bcolors.BOLD}Syncronizing clocks{bcolors.ENDC}")
259        for k,r in self.reactors.items():
260            r.horacerta()

Updates Arduino clocks with clock of current system.

def log_init(self, **kwargs):
264    def log_init(self,**kwargs):
265        """
266        Creates log directories for each Arduino.
267
268        Args:
269            name (str): Name of the subdirectory in the log folder where the files will be saved.
270        """
271        self.log = Log(subdir=list(self.reactors.keys()),**kwargs)
272        print(f"Log will be saved on: {bcolors.OKGREEN}{self.log.prefix}{bcolors.ENDC}")

Creates log directories for each Arduino.

Arguments:
  • name (str): Name of the subdirectory in the log folder where the files will be saved.
brilho
274    @property
275    def brilho(self):
276        """
277        Convenience method to get brilho from reactors.
278        """
279        out = self.send_parallel(f"get({self.brilho_param.lower()})")
280        out = {k: float(v.strip()) for k,v in out.items()}
281        return out

Convenience method to get brilho from reactors.

def dados(self, save_cache=True):
283    def dados(self,save_cache=True):
284        """
285        Get data from Arduinos.
286
287        Args:
288            save_cache (bool): Whether or not so save a cache file with the last reading with `log.log.cache_data`.
289        """
290
291        if self.header is None:
292            self.header = list(self.reactors.values())[0].send("cabecalho").split(" ")
293
294        len_empty = None
295        while len_empty!=0:
296            rows = self.send_parallel("dados",await_response=True).items()
297            #Checking if any reactor didn't respond.
298            empty = list(filter(lambda x: x[1] is None,rows))
299            len_empty = len(empty)
300            if len_empty!=0:
301                #Error treatment in case some reactor fails to respond.
302                empty = list(map(lambda x: x[0],empty))
303                print(bcolors.FAIL+"[FAIL]","The following reactors didn't respond:"+"\n\t"+"\n\t".join(list(map(str,empty))))
304                print("Resetting serial")
305                sleep(10)
306                for i in empty:
307                    self.reactors[i].reset()
308                    self.reactors[i].connect()
309                print("Recovering last state")
310                self.set_preset_state(path=PATHS.INITIAL_STATE_PATH)
311                self.set_preset_state(path=CACHEPATH)
312                sleep(10)
313                print("Done"+bcolors.ENDC)
314
315        rows = dict(map(lambda x: (x[0],OrderedDict(zip(self.header,x[1].split(" ")))),rows))
316        if save_cache:
317            self.log.cache_data(rows) #Index set to False because ID already exists in rows.
318        return rows

Get data from Arduinos.

Arguments:
  • save_cache (bool): Whether or not so save a cache file with the last reading with log.log.cache_data.
def log_dados(self, save_cache=True):
320    def log_dados(self,save_cache=True):
321        """
322        Logs output of `dados` in csv format.
323
324        Args:
325            save_cache (bool): Whether or not so save a cache file with the last reading with `log.log.cache_data`.
326        """
327        self.garbage()
328        header = list(self.reactors.values())[0].send("cabecalho").split(" ")
329
330        rows = self.send_parallel("dados",delay=13)
331        rows = list(map(lambda x: (x[0],OrderedDict(zip(header,x[1].split(" ")))),rows))
332
333        for _id,row in rows:
334            self.log.log_rows(rows=[row],subdir=_id)
335        rows = dict(rows)
336        if save_cache:
337            self.log.cache_data(rows) #Index set to False because ID already exists in rows.
338        return rows

Logs output of dados in csv format.

Arguments:
  • save_cache (bool): Whether or not so save a cache file with the last reading with log.log.cache_data.
def set_preset_state( self, path='preset_state.csv', sep='\t', chunksize=4, params=['branco', 'full', '440', '470', '495', '530', '595', '634', '660', '684', 'cor', 'modopainel', 'brilho', 'bomdia', 'boanoite', 'tau', 'step', 'modotemp', 'temp', 'densidade', 'mododil', 'ar', 'ima', 'modoco2', 'co2', 'dtco2'], **kwargs):
340    def set_preset_state(self,path="preset_state.csv",sep="\t",chunksize=4, params=PATHS.REACTOR_PARAMETERS, **kwargs):
341        """
342        Prepare Arduinos with preset parameters from a csv file.
343        Args:
344            path (str): Path to the csv file.
345            chunksize (int): How many to commands to send in a single line. A large value can cause Serial errors.
346            sep (str): Column separator used in the csv.
347        """
348        df = pd.read_csv(path,sep=sep,index_col='ID',**kwargs)
349        df = df[df.index.isin(self.ids)]
350        df.columns = df.columns.str.lower() #Commands must be sent in lowercase
351        #Dropping empty columns
352        df.dropna(axis=1,inplace=True)
353        if params:
354            cols = list(set(df.columns)&set(params))
355            df = df.loc[:,cols]
356        #Setting schema
357        schema = set(df.columns)&PATHS.SCHEMA.keys()
358        schema = {k:PATHS.SCHEMA[k] for k in schema}
359        df = df.astype(schema)
360        with Pool(7) as p:
361            p.map(set_in_chunks,map(lambda x: (self.reactors[x[0]],x[1],chunksize),df.to_dict(orient="index").items()))
362        #Saving relevant parameters' values
363        cols = list(set(df.columns)&set(PATHS.RELEVANT_PARAMETERS))
364        self.preset_state = df.loc[:,cols]
365        self.payload = self.preset_state.to_dict('index').copy()

Prepare Arduinos with preset parameters from a csv file.

Arguments:
  • path (str): Path to the csv file.
  • chunksize (int): How many to commands to send in a single line. A large value can cause Serial errors.
  • sep (str): Column separator used in the csv.
def calibrate(self, deltaT=120, dir='calibrate'):
367    def calibrate(self,deltaT=120,dir="calibrate"):
368        """
369        Runs `curva` and dumps the result into txts.
370        """
371        if not os.path.exists(dir):
372            os.mkdir(dir)
373        out = {}
374        self.send("curva",await_response=False)
375        sleep(deltaT)
376        for name,reactor in self.reactors.items():
377            out[name] = reactor._conn.read_until('*** fim da curva dos LEDs ***'.encode('ascii'))
378            with open(os.path.join(dir,f"reator_{name}.txt"),"w") as f:
379                f.write(out[name].decode('ascii'))
380        return out

Runs curva and dumps the result into txts.