pyduino.log

import pandas as pd
import os
from pathlib import Path
from datetime import datetime
import io
from glob import glob
from tabulate import tabulate
from collections import OrderedDict

__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
config_file = os.path.join(__location__, "config.yaml")

def datetime_from_str(x):
    return datetime.strptime(str(x), "%Y%m%d%H%M%S")

def datetime_to_str(x):
    return x.strftime("%Y%m%d%H%M%S")
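# Illustrative round trip of the compact timestamp format used throughout this
# module (the value below is hypothetical):
#
#   >>> datetime_to_str(datetime(2024, 1, 2, 3, 4, 5))
#   '20240102030405'
#   >>> datetime_from_str("20240102030405")
#   datetime.datetime(2024, 1, 2, 3, 4, 5)
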
def to_markdown_table(data: OrderedDict) -> str:
    """
    Converts the given data into a markdown table format.

    Args:
        data (OrderedDict[OrderedDict]): The data to be converted into a markdown table.

    Returns:
        str: The markdown table representation of the data.
    """
    rows = []
    for rid, rdata in data.items():
        rdata = OrderedDict({"ID": rid, **rdata})
        rows.append(rdata)
    return tabulate(rows, headers="keys", tablefmt="pipe")

def y_to_table(y):
    return tabulate(list(y.items()), tablefmt="pipe")
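# Illustrative use of the table helpers above, with hypothetical reactor data.
# to_markdown_table prepends an "ID" column and renders a pipe-format table
# roughly like:
#
#   >>> data = OrderedDict({"reactor_0": OrderedDict({"temp": 30.0}),
#   ...                     "reactor_1": OrderedDict({"temp": 32.5})})
#   >>> print(to_markdown_table(data))
#   | ID        |   temp |
#   |:----------|-------:|
#   | reactor_0 |   30   |
#   | reactor_1 |   32.5 |
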
class Log:
    @property
    def timestamp(self):
        """datetime: Current date and time."""
        return datetime.now()

    @property
    def prefix(self):
        return os.path.join(self.path, self.start_timestamp)

    def __init__(self, subdir, path="./log", name=None):
        """
        Logs data into jsonl files with timestamps.

        Example:
            log_obj = Log(['reactor_0','reactor_1'], path='./log', name='experiment_0')

            log/YEAR/MONTH/
            ├─ experiment_0/
            │  ├─ reactor_0.jsonl
            │  ├─ reactor_1.jsonl

        Args:
            subdir (:obj:`list` of :obj:`str`): List of the names for the subdirectories of `path`.
            path (str): Save path for the logs.
            name (str): Name given to this particular instance. If None, the current timestamp is used instead.
        """
        self.today = datetime.now()
        self.path = os.path.join(path, self.today.strftime("%Y"), self.today.strftime("%m"))
        self.start_timestamp = datetime_to_str(self.timestamp) if name is None else name
        self.log_name = name
        Path(os.path.join(self.path, self.start_timestamp)).mkdir(parents=True, exist_ok=True)
        if isinstance(subdir, str):
            # A string is treated as a glob pattern relative to the log prefix.
            self.subdir = list(map(os.path.basename, glob(os.path.join(self.prefix, subdir))))
        elif isinstance(subdir, list):
            self.subdir = subdir
        else:
            raise ValueError("Invalid type for subdir. Must be either a list of strings or a glob string.")
        # Default to the .jsonl extension when a name has no extension.
        self.subdir = list(map(lambda x: str(x) + ".jsonl" if len(os.path.splitext(str(x))[1]) == 0 else str(x), self.subdir))
        self.first_timestamp = None
        self.data_frames = {}

        self.paths = list(map(lambda x: os.path.join(self.prefix, x), self.subdir))

    def backup_config_file(self):
        """Copies `config.yaml` into the experiment's log directory, once per experiment."""
        filename = os.path.join(self.path, self.start_timestamp, f"{self.start_timestamp.replace('/','-')}.yaml")
        if not os.path.exists(filename):
            with open(config_file) as cfile, open(filename, 'w') as wfile:
                wfile.write(cfile.read())

    def log_rows(self, rows, subdir, add_timestamp=True, tags=None):
        """
        Logs rows into jsonl format.

        Args:
            rows (:obj:`list` of :obj:`dict`): List of dictionary-encoded rows, or a pandas dataframe.
            subdir (str): Subdirectory name. Intended to be an element of `self.subdir`.
            add_timestamp (bool, optional): Whether or not to include a timestamp column.
            tags (:obj:`dict` of :obj:`str`): Dictionary of strings to be inserted as constant columns.
        """
        t = self.timestamp
        path = os.path.join(self.path, self.start_timestamp, f"{subdir}.jsonl")

        df = pd.DataFrame()
        if isinstance(rows, list):
            df = pd.DataFrame(rows)
        elif isinstance(rows, pd.DataFrame):
            df = rows.copy()

        if add_timestamp:
            df.loc[:, "log_timestamp"] = datetime_to_str(t)
        if os.path.exists(path):
            if self.first_timestamp is None:
                # Recover the experiment's start time from the first logged row.
                with open(path) as file:
                    head = pd.read_json(io.StringIO(file.readline() + file.readline()), orient="records", lines=True)
                    self.first_timestamp = datetime_from_str(head.log_timestamp[0])
        else:
            self.first_timestamp = t
        df.loc[:, "elapsed_time_hours"] = (t - self.first_timestamp).total_seconds() / 3600.0

        # Inserting constant values
        if tags is not None:
            for key, value in tags.items():
                df.loc[:, key] = value

        with open(path, mode="a") as log_file:
            log_file.write(df.to_json(orient="records", lines=True))

        return df
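    # Minimal usage sketch for log_rows (names and values are hypothetical):
    #
    #   log = Log(['reactor_0', 'reactor_1'], path='./log', name='experiment_0')
    #   log.log_rows(rows=[{"temp": 30.0}], subdir='reactor_0', tags={"run": "a"})
    #
    # Each call appends one line per row to
    # log/YEAR/MONTH/experiment_0/reactor_0.jsonl, adding the "log_timestamp"
    # and "elapsed_time_hours" columns on the way out.
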
    def log_many_rows(self, data, **kwargs):
        """
        Logs one row per reactor into its own jsonl file.

        Args:
            data (:obj:`dict` of :obj:`dict`): Dictionary-encoded data frame, keyed by reactor ID.
            **kwargs: Additional arguments passed to `self.log_rows`.
        """
        self.data_frames = {}
        for _id, row in data.items():
            df = self.log_rows(rows=[row], subdir=_id, **kwargs)
            self.data_frames[_id] = df
        # Keep a concatenated copy for log_optimal and log_average.
        self.data_frames = pd.concat(list(self.data_frames.values()))

    def log_optimal(self, column, maximum=True, **kwargs):
        """
        Logs the optimal row (by `column`) from the last `log_many_rows` call into `opt.jsonl`.
        """
        i = self.data_frames.loc[:, column].astype(float).argmax() if maximum else self.data_frames.loc[:, column].astype(float).argmin()
        self.df_opt = self.data_frames.iloc[i, :]
        self.log_rows(rows=[self.df_opt.to_dict()], subdir='opt', **kwargs)

    def log_average(self, cols: list, **kwargs):
        """
        Calculates the average values of the specified columns across reactors and logs the result into `avg.jsonl`.

        Args:
            cols (list): A list of column names to calculate the average for.
            **kwargs: Additional keyword arguments passed to `self.log_rows`.
        """
        df = self.data_frames.copy()
        df.loc[:, cols] = df.loc[:, cols].astype(float)
        # Round so rows logged within the same instant group together.
        df.elapsed_time_hours = df.elapsed_time_hours.round(decimals=2)
        self.df_avg = df.loc[:, cols + ['elapsed_time_hours']].groupby("elapsed_time_hours").mean().reset_index()
        self.log_rows(rows=self.df_avg, subdir='avg', **kwargs)

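    # Sketch: assuming log_many_rows just logged one row per reactor, the mean
    # of a shared sensor column can be appended to avg.jsonl (hypothetical data):
    #
    #   log.log_many_rows({"reactor_0": {"temp": 30.0}, "reactor_1": {"temp": 32.0}})
    #   log.log_average(cols=["temp"])   # logs temp == 31.0 for this time point
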
    def cache_data(self, rows, path="./cache.jsonl", **kwargs):
        """
        Dumps rows into a single jsonl.

        Args:
            rows (:obj:`list` of :obj:`dict`): List of dictionary-encoded rows.
            path (str): Path to the jsonl file.
        """
        pd.DataFrame(rows).T.to_json(path, orient="records", lines=True, **kwargs)

    def transpose(self, columns, destination, skip=1, **kwargs):
        """
        Maps the reactor jsonl files to one column jsonl file per entry in `columns`.

        Args:
            columns (:obj:`list` of :obj:`str`): List of columns to extract.
            destination (str): Destination path. Creates directories as needed and overwrites any existing files.
            skip (int, optional): How many rows to jump while reading the input files. Defaults to 1.
        """
        dfs = []
        for file in self.paths:
            df = pd.read_json(file, orient="records", lines=True, **kwargs)
            df['FILE'] = file
            dfs.append(df.iloc[::skip, :])
        df = pd.concat(dfs)

        for column in columns:
            Path(destination).mkdir(parents=True, exist_ok=True)
            df.loc[:, ['ID', 'FILE', column, 'elapsed_time_hours']].to_json(os.path.join(destination, f"{column}.jsonl"), orient="records", lines=True)
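    # Sketch: regroup the per-reactor logs into one jsonl per column
    # (hypothetical column name; assumes the logged rows carry an 'ID' column,
    # which the selection above requires):
    #
    #   log.transpose(columns=["temp"], destination="./log/by_column")
    #   # -> ./log/by_column/temp.jsonl with ID, FILE, temp, elapsed_time_hours
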
class LogAggregator:
    def __init__(self, log_paths, timestamp_col="log_timestamp", elapsed_time_col="elapsed_time_hours"):
        """
        Merges logs from various experiments into a single file for each bioreactor.

        Args:
            log_paths (:obj:`list` of :obj:`str`): List of glob strings pointing at the input files for each experiment.
            timestamp_col (str, optional): Column to use as timestamp. Defaults to "log_timestamp".
            elapsed_time_col (str, optional): Column to use as elapsed time. Defaults to "elapsed_time_hours".
        """
        self.glob_list = log_paths
        self.timestamp_col = timestamp_col
        self.elapsed_time_col = elapsed_time_col

    def agg(self, destination, skip=1, **kwargs):
        """
        Aggregates the files matched by `self.glob_list` into one jsonl per basename, shifting the elapsed time of each later experiment so it continues where the previous one ended.

        Args:
            destination (str): Destination path. Creates directories as needed and overwrites any existing files.
            skip (int, optional): How many rows to jump while reading the input files. Defaults to 1.
        """
        dfs = {}
        for path in self.glob_list:
            for file in glob(path):
                basename = os.path.basename(file)
                df = pd.read_json(file, orient="records", lines=True, dtype={self.elapsed_time_col: float}, **kwargs)
                df = df.iloc[::skip, :]
                df['FILE'] = file
                if dfs.get(basename, None) is not None:
                    # Compute the wall-clock gap between the two experiments and
                    # shift the elapsed time so the merged series is monotonic.
                    top_timestamp = datetime_from_str(df.head(1)[self.timestamp_col].iloc[0])
                    bottom_timestamp = datetime_from_str(dfs.get(basename).tail(1)[self.timestamp_col].iloc[0])
                    bottom_elapsed_time = dfs.get(basename).tail(1)[self.elapsed_time_col].iloc[0]
                    deltaT = (top_timestamp - bottom_timestamp).total_seconds() / 3600.0
                    df[self.elapsed_time_col] = df[self.elapsed_time_col] + deltaT + bottom_elapsed_time
                    dfs[basename] = pd.concat([dfs[basename], df])
                else:
                    dfs[basename] = df
        for filename, df in dfs.items():
            Path(destination).mkdir(parents=True, exist_ok=True)
            path = os.path.join(destination, filename)
            df.to_json(path, orient="records", lines=True)