Source code for pyiron.base.database.filetable

import numpy as np
import os
import pandas
import datetime
import h5io
import sys
from six import with_metaclass
from pyfileindex import PyFileIndex


class Singleton(type):
    """
    Metaclass that hands out at most one instance per class.

    Implemented with suggestions from
    http://stackoverflow.com/questions/6760685/creating-a-singleton-in-python

    The first call of a class using this metaclass constructs the instance with the
    given arguments; every later call returns that same instance and ignores its
    arguments.
    """

    # Registry shared by all classes using this metaclass: class -> its sole instance.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        registry = cls._instances
        if cls in registry:
            return registry[cls]
        registry[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return registry[cls]
def filter_function(file_name):
    """
    Decide whether a file name should be tracked by the file index.

    Args:
        file_name (str): name of the file to check

    Returns:
        bool: True if the name contains the substring '.h5' anywhere, else False
    """
    hdf5_marker = '.h5'
    return hdf5_marker in file_name
class FileTable(with_metaclass(Singleton)):
    """
    Job table kept in a pandas.DataFrame and populated from the '.h5' files found
    below a project directory.

    The class uses the Singleton metaclass, so only the first instantiation runs
    __init__; later calls return the existing instance regardless of the project
    argument.

    Args:
        project (str): path of the project directory to index
    """

    def __init__(self, project):
        self._fileindex = None
        self._job_table = None
        # NOTE(review): os.path.abspath() strips a trailing slash, while the 'project'
        # column built in get_extract() always ends with '/'. Exact (non-recursive)
        # comparisons against this default therefore never match - confirm whether
        # callers always pass a slash-terminated project explicitly.
        self._project = os.path.abspath(project)
        self._columns = [
            'id', 'status', 'chemicalformula', 'job', 'subjob', 'projectpath', 'project',
            'timestart', 'timestop', 'totalcputime', 'computer', 'hamilton', 'hamversion',
            'parentid', 'masterid', 'username',
        ]
        self.force_reset()

    def force_reset(self):
        """
        Rebuild the file index and the job table from scratch.
        """
        self._fileindex = PyFileIndex(path=self._project, filter_function=filter_function)
        df = pandas.DataFrame(self.init_table(fileindex=self._fileindex.dataframe))
        if len(df) != 0:
            self._job_table = df[np.array(self._columns)]
        else:
            # No job files found: keep an empty table that still has all columns.
            self._job_table = pandas.DataFrame({k: [] for k in self._columns})

    def init_table(self, fileindex, working_dir_lst=None):
        """
        Convert file index entries into job dictionaries.

        Args:
            fileindex (pandas.DataFrame): file index with the columns 'path', 'mtime'
                and 'is_directory'
            working_dir_lst (list/None): working directories ('<project><job>_hdf5/')
                of the jobs that are already known. The list is extended in place
                with the working directories of the newly found jobs.

        Returns:
            list: one dictionary per job file, sorted by file path
        """
        if working_dir_lst is None:
            working_dir_lst = []
        fileindex = fileindex[~fileindex.is_directory]
        fileindex = fileindex.iloc[fileindex.path.values.argsort()]
        job_lst = []
        for path, mtime in zip(fileindex.path, fileindex.mtime):
            job_dict = self.get_extract(path, mtime)
            # NOTE(review): ids are derived from the list position, which assumes the
            # ids of the known jobs are exactly 1..len(working_dir_lst) in table order.
            job_dict['id'] = len(working_dir_lst) + 1
            working_dir_lst.append(job_dict['project'][:-1] + job_dict['subjob'] + '_hdf5/')
            # A job located inside the working directory of another job is a child of
            # that job; the master id is the position of that directory plus one.
            try:
                job_dict['masterid'] = working_dir_lst.index(job_dict['project']) + 1
            except ValueError:
                job_dict['masterid'] = None
            job_lst.append(job_dict)
        return job_lst

    @staticmethod
    def get_extract(path, mtime):
        """
        Build the job dictionary for a single HDF5 file.

        Args:
            path (str): path of the HDF5 file
            mtime (float): modification time of the file as POSIX timestamp

        Returns:
            dict: job entry without the keys 'id' and 'masterid'
        """
        basename = os.path.basename(path)
        job = os.path.splitext(basename)[0]
        timestamp = datetime.datetime.fromtimestamp(mtime)
        return {
            'status': get_job_status_from_file(hdf5_file=path, job_name=job),
            'chemicalformula': None,
            'job': job,
            'subjob': '/' + job,
            'projectpath': None,
            'project': os.path.dirname(path) + '/',
            'timestart': timestamp,
            'timestop': timestamp,
            'totalcputime': 0.0,
            'computer': None,
            'username': None,
            'parentid': None,
            'hamilton': get_hamilton_from_file(hdf5_file=path, job_name=job),
            'hamversion': get_hamilton_version_from_file(hdf5_file=path, job_name=job),
        }

    def add_item_dict(self, par_dict):
        """
        Append a job entry to the job table.

        Args:
            par_dict (dict): job entry; keys are converted to lower case and missing
                optional keys are filled with default values

        Returns:
            int: id of the new entry
        """
        par_dict = dict((key.lower(), value) for key, value in par_dict.items())
        if len(self._job_table) != 0:
            job_id = np.max(self._job_table.id.values) + 1
        else:
            job_id = 1
        default_values = {
            'id': job_id,
            'status': 'initialized',
            'chemicalformula': None,
            'timestart': datetime.datetime.now(),
            'computer': None,
            'parentid': None,
            'username': None,
            'timestop': None,
            'totalcputime': None,
            'masterid': None,
        }
        for k, v in default_values.items():
            par_dict.setdefault(k, v)
        self._job_table = pandas.concat(
            [self._job_table, pandas.DataFrame([par_dict])[self._columns]]
        ).reset_index(drop=True)
        return int(par_dict['id'])

    def item_update(self, par_dict, item_id):
        """
        Update columns of an existing job entry.

        Args:
            par_dict (dict): column name -> new value
            item_id (int/str/list): id of the entry; for a list the first element is used
        """
        if isinstance(item_id, list):
            item_id = item_id[0]
        if isinstance(item_id, str):
            # Go through float() so that strings like '3.0' are accepted as well.
            item_id = float(item_id)
        for k, v in par_dict.items():
            self._job_table.loc[self._job_table.id == int(item_id), k] = v

    def delete_item(self, item_id):
        """
        Remove a job entry from the job table.

        Args:
            item_id (int): id of the entry

        Raises:
            ValueError: if no entry with this id exists
        """
        item_id = int(item_id)
        if item_id in [int(v) for v in self._job_table.id.values]:
            self._job_table = self._job_table[self._job_table.id != item_id].reset_index(drop=True)
        else:
            raise ValueError("No job with id {0} in the job table.".format(item_id))

    def get_item_by_id(self, item_id):
        """
        Get the job entry with the given id.

        Args:
            item_id (int): id of the entry

        Returns:
            dict: column name -> value of the first matching row

        Raises:
            IndexError: if no entry with this id exists
        """
        item_id = int(item_id)
        return {k: list(v.values())[0] for k, v in
                self._job_table[self._job_table.id == item_id].to_dict().items()}

    def get_items_dict(self, item_dict, return_all_columns=True):
        """
        Select the job entries matching all conditions in item_dict.

        Args:
            item_dict (dict): column name -> value. Values for 'id', 'parentid' and
                'masterid' are compared as integers. A value containing '%' is
                treated as a substring search with the '%' characters removed;
                all other values have to match exactly.
            return_all_columns (bool): return the full entries (True) or only the ids

        Returns:
            list: list of dictionaries, one per matching entry

        Raises:
            TypeError: if item_dict is not a dictionary
        """
        df = self._job_table
        if not isinstance(item_dict, dict):
            raise TypeError
        for k, v in item_dict.items():
            if k in ['id', 'parentid', 'masterid']:
                df = df[df[k] == int(v)]
            elif "%" not in str(v):
                df = df[df[k] == v]
            else:
                # Literal substring match: the pattern is not a regular expression,
                # and missing values (None/NaN) simply do not match.
                df = df[df[k].str.contains(v.replace('%', ''), regex=False, na=False)]
        df_dict = df.to_dict()
        if return_all_columns:
            return [{k: v[i] for k, v in df_dict.items()} for i in df_dict['id'].keys()]
        else:
            return [{'id': i} for i in df_dict['id'].values()]

    def update(self):
        """
        Refresh the file index and append jobs whose files are not yet in the job table.
        """
        self._fileindex.update()
        index_df = self._fileindex.dataframe
        if len(self._job_table) != 0:
            # The working directories carry a trailing '/' so that they have the same
            # form as the entries init_table() creates and compares against.
            files_lst, working_dir_lst = zip(*[
                [project + subjob[1:] + '.h5', project + subjob[1:] + '_hdf5/']
                for project, subjob in zip(self._job_table.project.values,
                                           self._job_table.subjob.values)
            ])
            df_new = index_df[~index_df.is_directory & ~index_df.path.isin(files_lst)]
        else:
            files_lst, working_dir_lst = [], []
            df_new = index_df[~index_df.is_directory]
        if len(df_new) > 0:
            job_lst = self.init_table(fileindex=df_new, working_dir_lst=list(working_dir_lst))
            df = pandas.DataFrame(job_lst)[self._columns]
            if len(files_lst) != 0 and len(working_dir_lst) != 0:
                self._job_table = pandas.concat([self._job_table, df]).reset_index(drop=True)
            else:
                self._job_table = df

    def get_db_columns(self):
        """
        Get the column names of the job table - alias for get_table_headings().

        Returns:
            numpy.ndarray: column names
        """
        return self.get_table_headings()

    def get_table_headings(self):
        """
        Get the column names of the job table.

        Returns:
            numpy.ndarray: column names
        """
        return self._job_table.columns.values

    def job_table(self, project=None, recursive=True, columns=None, all_columns=False,
                  sort_by="id", max_colwidth=200, job_name_contains=''):
        """
        Get the job table of a project as pandas.DataFrame.

        Args:
            project (str/None): project path - the path given at initialisation by default
            recursive (bool): include all jobs whose project path contains project (True)
                or only those whose project path is equal to project (False)
            columns (list/None): columns to return - ["job", "project", "chemicalformula"]
                by default
            all_columns (bool): return all columns, overrides columns
            sort_by (str): column to sort by, only applied when it is part of columns
            max_colwidth (int): value for the pandas option "display.max_colwidth"
            job_name_contains (str): only return jobs whose name contains this pattern

        Returns:
            pandas.DataFrame: selected jobs; an empty selection is returned with all
                columns of the underlying table
        """
        if project is None:
            project = self._project
        if columns is None:
            columns = ["job", "project", "chemicalformula"]
        if all_columns:
            columns = self._columns
        if len(self._job_table) != 0:
            if recursive:
                # project is a file system path, not a regular expression.
                df = self._job_table[self._job_table.project.str.contains(project, regex=False)]
            else:
                df = self._job_table[self._job_table.project == project]
        else:
            df = self._job_table
        # This changes the pandas display option globally, not only for this table.
        pandas.set_option("display.max_colwidth", max_colwidth)
        if len(df) == 0:
            return df
        if job_name_contains != '':
            df = df[df.job.str.contains(job_name_contains)]
        if sort_by in columns:
            return df[columns].sort_values(by=sort_by)
        return df[columns]

    def get_jobs(self, project=None, recursive=True, columns=None):
        """
        Get the selected columns of the jobs in a project as dictionary of lists.

        Args:
            project (str/None): project path - the path given at initialisation by default
            recursive (bool): search sub projects as well
            columns (list/None): columns to return - ["id", "project"] by default

        Returns:
            dict: column name -> list of values
        """
        if project is None:
            project = self._project
        if columns is None:
            columns = ["id", "project"]
        df = self.job_table(project=project, recursive=recursive, columns=columns)
        if len(df) == 0:
            return {key: list() for key in columns}
        return {key: df[key].tolist() for key in df.keys()}

    def get_job_ids(self, project=None, recursive=True):
        """
        Get the ids of the jobs in a project.

        Args:
            project (str/None): project path - the path given at initialisation by default
            recursive (bool): search sub projects as well

        Returns:
            list: job ids
        """
        return self.get_jobs(project=project, recursive=recursive, columns=['id'])["id"]

    def get_job_id(self, job_specifier, project=None):
        """
        Get the id of a job from its name.

        The job is first searched in the given project only; if it is not found there,
        all projects whose path contains the given project path are searched.

        Args:
            job_specifier (str/int): name of the job or job ID; an integer is returned
                unchanged
            project (str/None): project path - the path given at initialisation by default

        Returns:
            int/None: job ID, None if no job with this name exists

        Raises:
            ValueError: if more than one job matches the name
        """
        if project is None:
            project = self._project
        if isinstance(job_specifier, (int, np.integer)):
            return job_specifier  # is id
        # str.replace() returns a new string, so the result has to be assigned.
        job_specifier = job_specifier.replace(".", "_")
        name_matches = self._job_table.job == job_specifier
        job_id_lst = self._job_table[
            (self._job_table.project == project) & name_matches
        ].id.values
        if len(job_id_lst) == 0:
            # Fall back to the sub projects; the path is matched literally.
            job_id_lst = self._job_table[
                self._job_table.project.str.contains(project, regex=False) & name_matches
            ].id.values
        if len(job_id_lst) == 0:
            return None
        elif len(job_id_lst) == 1:
            return int(job_id_lst[0])
        else:
            raise ValueError(
                "job name '{0}' in this project is not unique".format(job_specifier)
            )

    def get_child_ids(self, job_specifier, project=None, status=None):
        """
        Get the childs for a specific job

        Args:
            job_specifier (str/int): name of the master job or the master jobs job ID
            project (str/None): project path - the path given at initialisation by default
            status (str): filter childs which match a specific status - None by default

        Returns:
            list: sorted list of child IDs, empty if the master job is not found
        """
        if project is None:
            project = self._project
        id_master = self.get_job_id(project=project, job_specifier=job_specifier)
        if id_master is None:
            return []
        selection = self._job_table.masterid == id_master
        if status is not None:
            selection = selection & (self._job_table.status == status)
        return sorted(self._job_table[selection].id.values)

    def set_job_status(self, job_specifier, status, project=None):
        """
        Set the status of a particular job in the job table and in its HDF5 file.

        Args:
            job_specifier (str/int): name of the job or job ID
            status (str): job status can be one of the following ['initialized', 'appended',
                'created', 'submitted', 'running', 'aborted', 'collect', 'suspended',
                'refresh', 'busy', 'finished']
            project (str/None): project path - the path given at initialisation by default
        """
        if project is None:
            project = self._project
        job_id = self.get_job_id(project=project, job_specifier=job_specifier)
        self._job_table.loc[self._job_table.id == job_id, 'status'] = status
        db_entry = self.get_item_by_id(item_id=job_id)
        h5io.write_hdf5(
            db_entry["project"] + db_entry["subjob"] + '.h5',
            status,
            title=db_entry["subjob"][1:] + '/status',
            overwrite="update",
        )

    def get_job_status(self, job_specifier, project=None):
        """
        Get the status of a particular job

        Args:
            job_specifier (str/int): name of the job or job ID
            project (str/None): project path - the path given at initialisation by default

        Returns:
            str/None: job status can be one of the following ['initialized', 'appended',
                'created', 'submitted', 'running', 'aborted', 'collect', 'suspended',
                'refresh', 'busy', 'finished']; None if the job is not found
        """
        if project is None:
            project = self._project
        job_id = self.get_job_id(project=project, job_specifier=job_specifier)
        if job_id is None:
            return None
        try:
            return self._job_table[self._job_table.id == job_id].status.values[0]
        except (KeyError, IndexError):
            # IndexError: an id was passed which is not part of the table.
            return None

    def get_job_working_directory(self, job_specifier, project=None):
        """
        Get the working directory of a particular job

        Args:
            job_specifier (str/int): name of the job or job ID
            project (str/None): project path - the path given at initialisation by default

        Returns:
            str/None: working directory '<project>/<job>_hdf5/<job>', None if the job
                is not found
        """
        if project is None:
            project = self._project
        job_id = self.get_job_id(project=project, job_specifier=job_specifier)
        if job_id is None:
            return None
        try:
            db_entry = self.get_item_by_id(item_id=job_id)
            if db_entry and len(db_entry) > 0:
                job_name = db_entry["subjob"][1:]
                return os.path.join(
                    db_entry["project"],
                    job_name + "_hdf5",
                    job_name,
                )
            else:
                return None
        except (KeyError, IndexError):
            # IndexError: an id was passed which is not part of the table.
            return None
def get_hamilton_from_file(hdf5_file, job_name):
    """
    Read the job type stored under '<job_name>/TYPE' and reduce it to its short name.

    Args:
        hdf5_file (str): path of the HDF5 file
        job_name (str): name of the job, used as the group name inside the file

    Returns:
        str: the text after the last '.' of the stored type, cut at the first "'"
    """
    type_entry = h5io.read_hdf5(hdf5_file, job_name + '/TYPE')
    # Keep only the part after the last dot ...
    tail = type_entry.rpartition(".")[2]
    # ... and drop everything from the first single quote onwards.
    return tail.partition("'")[0]
def get_hamilton_version_from_file(hdf5_file, job_name):
    """
    Read the entry stored under '<job_name>/VERSION' of an HDF5 file.

    Args:
        hdf5_file (str): path of the HDF5 file
        job_name (str): name of the job, used as the group name inside the file

    Returns:
        the stored value, as returned by h5io.read_hdf5()
    """
    version_node = job_name + '/VERSION'
    return h5io.read_hdf5(hdf5_file, version_node)
def get_job_status_from_file(hdf5_file, job_name):
    """
    Read the entry stored under '<job_name>/status' of an HDF5 file.

    Args:
        hdf5_file (str): path of the HDF5 file
        job_name (str): name of the job, used as the group name inside the file

    Returns:
        the stored value, as returned by h5io.read_hdf5()
    """
    status_node = job_name + '/status'
    return h5io.read_hdf5(hdf5_file, status_node)