pyduino.log
import pandas as pd
import os
from pathlib import Path
from datetime import datetime
import io
from glob import glob
from tabulate import tabulate
from collections import OrderedDict

__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
config_file = os.path.join(__location__, "config.yaml")

def datetime_from_str(x):
    return datetime.strptime(str(x), "%Y%m%d%H%M%S")

def datetime_to_str(x):
    return x.strftime("%Y%m%d%H%M%S")

def to_markdown_table(data: OrderedDict) -> str:
    """
    Converts the given data into a markdown table format.

    Args:
        data (OrderedDict[OrderedDict]): The data to be converted into a markdown table.

    Returns:
        str: The markdown table representation of the data.
    """
    rows = []
    for rid, rdata in data.items():
        rdata = OrderedDict({"ID": rid, **rdata})
        rows.append(rdata)
    return tabulate(rows, headers="keys", tablefmt="pipe")

def y_to_table(y):
    return tabulate(list(y.items()), tablefmt="pipe")

class Log:
    @property
    def timestamp(self):
        """datetime: Current date and time."""
        return datetime.now()

    @property
    def prefix(self):
        return os.path.join(self.path, self.start_timestamp)

    def __init__(self, subdir, path="./log", name=None):
        """
        Logs data into jsonls with timestamps.

        Example:
            log_obj = Log(['reactor_0','reactor_1'], path='./log', name='experiment_0')

            log/YEAR/MONTH/
            ├─ experiment_0/
            │  ├─ reactor_0.jsonl
            │  ├─ reactor_1.jsonl

        Args:
            subdir (:obj:`list` of :obj:`str`): List of the names for the subdirectories of `path`.
            path (str): Save path for the logs.
            name (str): Name given to this particular instance. If None, the current timestamp is used.
        """
        self.today = datetime.now()
        self.path = os.path.join(path, self.today.strftime("%Y"), self.today.strftime("%m"))
        self.start_timestamp = datetime_to_str(self.timestamp) if name is None else name
        self.log_name = name
        Path(os.path.join(self.path, self.start_timestamp)).mkdir(parents=True, exist_ok=True)
        if isinstance(subdir, str):
            # A glob string: expand it against the log prefix and keep the basenames.
            self.subdir = list(map(os.path.basename, glob(os.path.join(self.prefix, subdir))))
        elif isinstance(subdir, list):
            self.subdir = subdir
        else:
            raise ValueError("Invalid type for subdir. Must be either a list of strings or a glob string.")
        # Default extension-less names to .jsonl files.
        self.subdir = list(map(lambda x: str(x) + ".jsonl" if len(os.path.splitext(str(x))[1]) == 0 else str(x), self.subdir))
        self.first_timestamp = None
        self.data_frames = {}

        self.paths = list(map(lambda x: os.path.join(self.prefix, x), self.subdir))

    def backup_config_file(self):
        filename = os.path.join(self.path, self.start_timestamp, f"{self.start_timestamp.replace('/', '-')}.yaml")
        if not os.path.exists(filename):
            with open(config_file) as cfile, open(filename, 'w') as wfile:
                wfile.write(cfile.read())

    def log_rows(self, rows, subdir, add_timestamp=True, tags=None):
        """
        Logs rows into jsonl format.

        Args:
            rows (:obj:`list` of :obj:`dict`): List of dictionary-encoded rows or a pandas dataframe.
            subdir (str): Subdirectory name. Intended to be an element of `self.subdir`.
            add_timestamp (bool, optional): Whether or not to include a timestamp column.
            tags (:obj:`dict` of :obj:`str`): Dictionary of strings to be inserted as constant columns.
        """
        t = self.timestamp
        path = os.path.join(self.path, self.start_timestamp, f"{subdir}.jsonl")

        df = pd.DataFrame()
        if isinstance(rows, list):
            df = pd.DataFrame(rows)
        elif isinstance(rows, pd.DataFrame):
            df = rows.copy()

        if add_timestamp:
            df.loc[:, "log_timestamp"] = datetime_to_str(t)
        # Elapsed time is measured from the first timestamp in the existing file, if any.
        if os.path.exists(path):
            if self.first_timestamp is None:
                with open(path) as file:
                    head = pd.read_json(io.StringIO(file.readline() + file.readline()), orient="records", lines=True)
                    self.first_timestamp = datetime_from_str(head.log_timestamp[0])
        else:
            self.first_timestamp = t
        df.loc[:, "elapsed_time_hours"] = (t - self.first_timestamp).total_seconds() / 3600.0

        # Inserting constant values
        if tags is not None:
            for key, value in tags.items():
                df.loc[:, key] = value

        with open(path, mode="a") as log_file:
            log_file.write(df.to_json(orient="records", lines=True))

        return df

    def log_many_rows(self, data, **kwargs):
        """
        Logs rows into jsonl format.

        Args:
            data (:obj:`dict` of :obj:`dict`): Dictionary-encoded data frame.
            **kwargs: Additional arguments passed to `self.log_rows`.
        """
        self.data_frames = {}
        for _id, row in data.items():
            df = self.log_rows(rows=[row], subdir=_id, **kwargs)
            self.data_frames[_id] = df
        self.data_frames = pd.concat(list(self.data_frames.values()))

    def log_optimal(self, column, maximum=True, **kwargs):
        """
        Logs the optimal row (the one maximizing or minimizing `column`) into a single file.
        """
        i = self.data_frames.loc[:, column].astype(float).argmax() if maximum else self.data_frames.loc[:, column].astype(float).argmin()
        self.df_opt = self.data_frames.iloc[i, :]
        self.log_rows(rows=[self.df_opt.to_dict()], subdir='opt', **kwargs)

    def log_average(self, cols: list, **kwargs):
        """
        Calculate the average values of specified columns in the data frames and log the results.

        Args:
            cols (list): A list of column names to calculate the average for.
            **kwargs: Additional keyword arguments to customize the logging process.
        """
        df = self.data_frames.copy()
        df.loc[:, cols] = df.loc[:, cols].astype(float)
        df.elapsed_time_hours = df.elapsed_time_hours.round(decimals=2)
        self.df_avg = df.loc[:, cols + ['elapsed_time_hours']].groupby("elapsed_time_hours").mean().reset_index()
        self.log_rows(rows=self.df_avg, subdir='avg', **kwargs)

    def cache_data(self, rows, path="./cache.jsonl", **kwargs):
        """
        Dumps rows into a single jsonl.

        Args:
            rows (:obj:`list` of :obj:`dict`): List of dictionary-encoded rows.
            path (str): Path to the jsonl file.
        """
        pd.DataFrame(rows).T.to_json(path, orient="records", lines=True, **kwargs)

    def transpose(self, columns, destination, skip=1, **kwargs):
        """
        Maps reactor jsonls to one jsonl per column, for the given columns.

        Args:
            columns (:obj:`list` of :obj:`str`): List of columns to extract.
            destination (str): Destination path. Creates directories as needed and overwrites any existing files.
            skip (int, optional): How many rows to jump while reading the input files. Defaults to 1.
        """
        dfs = []
        for file in self.paths:
            df = pd.read_json(file, orient="records", lines=True, **kwargs)
            df['FILE'] = file
            dfs.append(df.iloc[::skip, :])
        df = pd.concat(dfs)

        for column in columns:
            Path(destination).mkdir(parents=True, exist_ok=True)
            df.loc[:, ['ID', 'FILE', column, 'elapsed_time_hours']].to_json(os.path.join(destination, f"{column}.jsonl"), orient="records", lines=True)


class LogAggregator:
    def __init__(self, log_paths, timestamp_col="log_timestamp", elapsed_time_col="elapsed_time_hours"):
        """
        Merges logs from various experiments into a single file for each bioreactor.

        Args:
            log_paths (:obj:`list` of :obj:`str`): List of glob strings pointing at the input files for each experiment.
            timestamp_col (str, optional): Column to use as timestamp. Defaults to "log_timestamp".
            elapsed_time_col (str, optional): Column to use as elapsed time. Defaults to "elapsed_time_hours".
        """
        self.glob_list = log_paths
        self.timestamp_col = timestamp_col
        self.elapsed_time_col = elapsed_time_col

    def agg(self, destination, skip=1, **kwargs):
        """
        Aggregates every file matched by `self.glob_list` into `destination`, one output file per basename.

        Args:
            destination (str): Destination path. Creates directories as needed and overwrites any existing files.
            skip (int, optional): How many rows to jump while reading the input files. Defaults to 1.
        """
        dfs = {}
        for path in self.glob_list:
            for file in glob(path):
                basename = os.path.basename(file)
                df = pd.read_json(file, orient="records", lines=True, dtype={self.elapsed_time_col: float}, **kwargs)
                df = df.iloc[::skip, :]
                df['FILE'] = file
                if dfs.get(basename, None) is not None:
                    # Shift the new chunk's elapsed time past the end of the already-merged data.
                    top_timestamp = datetime_from_str(df.head(1)[self.timestamp_col].iloc[0])
                    bottom_timestamp = datetime_from_str(dfs.get(basename).tail(1)[self.timestamp_col].iloc[0])
                    bottom_elapsed_time = dfs.get(basename).tail(1)[self.elapsed_time_col].iloc[0]
                    deltaT = (top_timestamp - bottom_timestamp).total_seconds() / 3600.0
                    print("DeltaT", deltaT)
                    print(df[self.elapsed_time_col].head())
                    df[self.elapsed_time_col] = df[self.elapsed_time_col] + deltaT + bottom_elapsed_time
                    print(df[self.elapsed_time_col].head())
                    dfs[basename] = pd.concat([dfs[basename], df])
                else:
                    dfs[basename] = df
        for filename, df in dfs.items():
            Path(destination).mkdir(parents=True, exist_ok=True)
            path = os.path.join(destination, filename)
            df.to_json(path, orient="records", lines=True)
config_file = '/home/runner/work/pyduino-parallel/pyduino-parallel/pyduino/config.yaml'
def datetime_from_str(x):
def datetime_to_str(x):
def to_markdown_table(data: collections.OrderedDict) -> str:
Converts the given data into a markdown table format.
Arguments:
- data (OrderedDict[OrderedDict]): The data to be converted into a markdown table.
Returns:
str: The markdown table representation of the data.
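A minimal usage sketch; the reactor IDs and readings are made-up illustration data, and the output is shown roughly as tabulate's pipe format renders it:

from collections import OrderedDict
from pyduino.log import to_markdown_table

data = OrderedDict({
    "reactor_0": OrderedDict({"temp": 30.0, "brightness": 0.8}),
    "reactor_1": OrderedDict({"temp": 32.5, "brightness": 0.6}),
})
print(to_markdown_table(data))
# | ID        |   temp |   brightness |
# |:----------|-------:|-------------:|
# | reactor_0 |   30   |          0.8 |
# | reactor_1 |   32.5 |          0.6 |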
def y_to_table(y):
class Log:
Log(subdir, path='./log', name=None)
Logs data into jsonls with timestamps.
Example:
log_obj = Log(['reactor_0','reactor_1'], path='./log', name='experiment_0')
log/YEAR/MONTH/
├─ experiment_0/
│  ├─ reactor_0.jsonl
│  ├─ reactor_1.jsonl
Arguments:
- subdir (list of str): List of the names for the subdirectories of path.
- path (str): Save path for the logs.
- name (str): Name given to this particular instance. If None, the current timestamp is used.
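A sketch of constructing a logger; the experiment and reactor names are illustrative, and the import path is assumed from this module's name:

from pyduino.log import Log

log_obj = Log(['reactor_0', 'reactor_1'], path='./log', name='experiment_0')
# Directories are created on construction.
print(log_obj.prefix)  # e.g. ./log/2024/05/experiment_0
print(log_obj.paths)   # [.../experiment_0/reactor_0.jsonl, .../experiment_0/reactor_1.jsonl]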
def log_rows(self, rows, subdir, add_timestamp=True, tags=None):
Logs rows into jsonl format.
Arguments:
- rows (list of dict): List of dictionary-encoded rows, or a pandas dataframe.
- subdir (str): Subdirectory name. Intended to be an element of self.subdir.
- add_timestamp (bool, optional): Whether or not to include a timestamp column.
- tags (dict of str): Dictionary of strings to be inserted as constant columns.
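A sketch of a single call, continuing the constructor example above (the column names are made up):

row = {"temp": 30.1, "brightness": 0.8}
df = log_obj.log_rows(rows=[row], subdir="reactor_0", tags={"experiment": "experiment_0"})
# df now also carries log_timestamp, elapsed_time_hours and the constant 'experiment'
# column, and the same records were appended to .../experiment_0/reactor_0.jsonl.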
def log_many_rows(self, data, **kwargs):
Logs rows into jsonl format.
Arguments:
- data (dict of dict): Dictionary-encoded data frame.
- **kwargs: Additional arguments passed to self.log_rows.
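A sketch with one row per reactor, keyed by subdirectory name (illustrative readings):

log_obj.log_many_rows({
    "reactor_0": {"temp": 30.1, "brightness": 0.8},
    "reactor_1": {"temp": 32.4, "brightness": 0.6},
})
# Afterwards log_obj.data_frames is a single dataframe concatenating all rows just logged.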
def log_optimal(self, column, maximum=True, **kwargs):
Logs the optimal row (the one maximizing or minimizing the given column across all logged rows) into a single file.
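Continuing the sketch above, this assumes data_frames has already been populated by log_many_rows:

log_obj.log_optimal(column="temp", maximum=True)
# Appends the row with the highest 'temp' (reactor_1 in the sketch) to .../experiment_0/opt.jsonl.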
def log_average(self, cols: list, **kwargs):
Calculate the average values of specified columns in the data frames and log the results.
Arguments:
- cols (list): A list of column names to calculate the average for.
- **kwargs: Additional keyword arguments to customize the logging process.
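Continuing the same sketch:

log_obj.log_average(cols=["temp", "brightness"])
# Rows sharing the same elapsed_time_hours (rounded to 2 decimals) are averaged
# and the result is appended to .../experiment_0/avg.jsonl.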
def cache_data(self, rows, path='./cache.jsonl', **kwargs):
Dumps rows into a single jsonl.
Arguments:
- rows (list of dict): List of dictionary-encoded rows.
- path (str): Path to the jsonl file.
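Note the transpose in the implementation: a dict of dicts keyed by ID (rather than the list the docstring mentions) serializes to one JSON line per ID. A sketch under that reading:

log_obj.cache_data(
    {"reactor_0": {"temp": 30.1}, "reactor_1": {"temp": 32.4}},
    path="./cache.jsonl",
)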
def transpose(self, columns, destination, skip=1, **kwargs):
Maps reactor jsonls to one jsonl per column, for the given columns.
Arguments:
- columns (list of str): List of columns to extract.
- destination (str): Destination path. Creates directories as needed and overwrites any existing files.
- skip (int, optional): How many rows to jump while reading the input files. Defaults to 1.
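A sketch, assuming the reactor jsonls carry 'ID' and 'elapsed_time_hours' columns (the method selects both, so they must be present in the input files):

log_obj.transpose(columns=["temp"], destination="./by_column")
# Writes ./by_column/temp.jsonl with one record per input row:
# ID, FILE, temp, elapsed_time_hours.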
class LogAggregator:
LogAggregator(log_paths, timestamp_col='log_timestamp', elapsed_time_col='elapsed_time_hours')
Merges logs from various experiments into a single file for each bioreactor.
Arguments:
- log_paths (list of str): List of glob strings pointing at the input files for each experiment.
- timestamp_col (str, optional): Column to use as timestamp. Defaults to "log_timestamp".
- elapsed_time_col (str, optional): Column to use as elapsed time. Defaults to "elapsed_time_hours".
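A sketch of constructing an aggregator over two experiments (the glob paths are illustrative):

from pyduino.log import LogAggregator

agg = LogAggregator([
    "./log/2024/04/experiment_0/*.jsonl",
    "./log/2024/05/experiment_1/*.jsonl",
])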
def agg(self, destination, skip=1, **kwargs):
Aggregates every file matched by the glob list into destination, one output file per bioreactor, shifting elapsed time so it increases monotonically across experiments.
Arguments:
- destination (str): Destination path. Creates directories as needed and overwrites any existing files.
- skip (int, optional): How many rows to jump while reading the input files. Defaults to 1.
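Continuing the sketch above:

agg.agg(destination="./aggregated")
# Files with the same basename (e.g. reactor_0.jsonl) across experiments are concatenated;
# elapsed_time_hours in later chunks is shifted by the wall-clock gap plus the previous
# chunk's final elapsed time, so it keeps increasing across the merge.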