Source code for pyiron.base.database.generic

# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.

import numpy as np
import re
import time
from datetime import datetime
from sqlalchemy import (
    Column,
    create_engine,
    DateTime,
    Float,
    Integer,
    MetaData,
    String,
    Table,
    text,
    and_,
    or_,
)
from sqlalchemy.pool import NullPool
from sqlalchemy.sql import select
from sqlalchemy.exc import OperationalError, DatabaseError

"""
DatabaseAccess class deals with accessing the database
"""

__author__ = "Murat Han Celik"
__copyright__ = (
    "Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH"
    " - Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "production"
__date__ = "Sep 1, 2017"


[docs]class AutorestoredConnection: def __init__(self, engine): self.engine = engine self._conn = None
[docs] def execute(self, *args, **kwargs): try: if not self._conn or self._conn.closed: self._conn = self.engine.connect() result = self._conn.execute(*args, **kwargs) except OperationalError: time.sleep(5) result = self.execute(*args, **kwargs) return result
[docs] def close(self): if self._conn is not None: self._conn.close()
[docs]class DatabaseAccess(object): """ A core element of PyIron, which generally deals with accessing the database: getting, sending, changing some data to the db. Args: connection_string (str): SQLalchemy connection string which specifies the database to connect to typical form: dialect+driver://username:password@host:port/database example: 'postgresql://scott:tiger@cmcent56.mpie.de/mdb' table_name (str): database table name, a simple string like: 'simulation' Murat Han Celik """ def __init__(self, connection_string, table_name): """ Initialize the Database connection Args: connection_string (str): SQLalchemy connection string which specifies the database to connect to typical form: dialect+driver://username:password@host:port/database example: 'postgresql://scott:tiger@cmcent56.mpie.de/mdb' table_name (str): database table name, a simple string like: 'simulation' """ self.table_name = table_name self._keep_connection = False self._sql_lite = "sqlite" in connection_string try: if not self._sql_lite: self._engine = create_engine( connection_string, connect_args={"connect_timeout": 15}, poolclass=NullPool, ) self.conn = AutorestoredConnection(self._engine) else: self._engine = create_engine(connection_string) self.conn = self._engine.connect() self.conn.connection.create_function("like", 2, self.regexp) self._keep_connection = True except Exception as except_msg: raise ValueError("Connection to database failed: " + str(except_msg)) self.__reload_db() self.simulation_table = Table( str(table_name), self.metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("parentid", Integer), Column("masterid", Integer), Column("projectpath", String(50)), Column("project", String(255)), Column("job", String(50)), Column("subjob", String(255)), Column("chemicalformula", String(30)), Column("status", String(20)), Column("hamilton", String(20)), Column("hamversion", String(50)), Column("username", String(20)), Column("computer", String(100)), Column("timestart", DateTime), Column("timestop", DateTime), Column("totalcputime", Float), extend_existing=True, ) self.metadata.create_all() self._viewer_mode = False @property def viewer_mode(self): """ Get viewer_mode - if viewer_mode is enable pyiron has read only access to the database. Returns: bool: returns TRUE when viewer_mode is enabled """ return self._viewer_mode @viewer_mode.setter def viewer_mode(self, value): """ Set viewer_mode - if viewer_mode is enable pyiron has read only access to the database. Args: value (bool): TRUE or FALSE """ if isinstance(value, bool): self._viewer_mode = value else: raise TypeError("Viewmode can only be TRUE or FALSE.") # Internal functions def __del__(self): """ Close database connection Returns: """ if not self._keep_connection: self.conn.close() def __reload_db(self): """ Reload database Returns: """ self.metadata = MetaData(bind=self._engine) self.metadata.reflect(self._engine)
[docs] @staticmethod def regexp(expr, item): """ Regex function for SQLite Args: expr: str, regex expression item: str, item which needs to be checked Returns: """ expr = expr.replace("%", "(.)*") expr = expr.replace("_", ".") expr = "^" + expr if expr[-1] != "%": expr += "$" reg = re.compile(expr) if item is not None: return reg.search(item) is not None
# Table functions
[docs] def get_table_headings(self, table_name=None): """ Get column names Args: table_name (str): simple string of a table_name like: 'jobs_username' Returns: list: list of column names like: ['id', 'parentid', 'masterid', 'projectpath', 'project', 'job', 'subjob', 'chemicalformula', 'status', 'hamilton', 'hamversion', 'username', 'computer', 'timestart', 'timestop', 'totalcputime'] """ if table_name is None: table_name = self.table_name self.__reload_db() try: simulation_list = Table( str(table_name), self.metadata, autoload=True, autoload_with=self._engine, ) except Exception: raise ValueError(str(table_name) + " does not exist") return [column.name for column in iter(simulation_list.columns)]
[docs] def add_column(self, col_name, col_type): """ Add an additional column - required for modification on the database Args: col_name (str, list): name of the new column, normal string like: 'myColumn' col_type (str, list: SQL type of the new column, SQL type like: 'varchar(50)' Returns: """ if not self._viewer_mode: if isinstance(col_name, list): col_name = col_name[-1] if isinstance(col_type, list): col_type = col_type[-1] self._engine.execute( "ALTER TABLE %s ADD COLUMN %s %s" % (self.simulation_table.name, col_name, col_type) ) else: raise PermissionError("Not avilable in viewer mode.")
[docs] def change_column_type(self, col_name, col_type): """ Modify data type of an existing column - required for modification on the database Args: col_name (str, list): name of the new column, normal string like: 'myColumn' col_type (str, list: SQL type of the new column, SQL type like: 'varchar(50)' Returns: """ if not self._viewer_mode: if isinstance(col_name, list): col_name = col_name[-1] if isinstance(col_type, list): col_type = col_type[-1] self._engine.execute( "ALTER TABLE %s ALTER COLUMN %s TYPE %s" % (self.simulation_table.name, col_name, col_type) ) else: raise PermissionError("Not avilable in viewer mode.")
[docs] def get_items_sql(self, where_condition=None, sql_statement=None): """ Submit an SQL query to the database Args: where_condition (str): SQL where query, query like: "project LIKE 'lammps.phonons.Ni_fcc%'" sql_statement (str): general SQL query, normal SQL statement Returns: list: get a list of dictionaries, where each dictionary represents one item of the table like: [{u'chemicalformula': u'BO', u'computer': u'localhost', u'hamilton': u'VAMPS', u'hamversion': u'1.1', u'id': 1, u'job': u'testing', u'masterid': None, u'parentid': 0, u'project': u'database.testing', u'projectpath': u'/TESTING', u'status': u'KAAAA', u'subjob': u'testJob', u'timestart': u'2016-05-02 11:31:04.253377', u'timestop': u'2016-05-02 11:31:04.371165', u'totalcputime': 0.117788, u'username': u'User'}, {u'chemicalformula': u'BO', u'computer': u'localhost', u'hamilton': u'VAMPS', u'hamversion': u'1.1', u'id': 2, u'job': u'testing', u'masterid': 0, u'parentid': 0, u'project': u'database.testing', u'projectpath': u'/TESTING', u'status': u'KAAAA', u'subjob': u'testJob', u'timestart': u'2016-05-02 11:31:04.253377', u'timestop': u'2016-05-02 11:31:04.371165', u'totalcputime': 0.117788, u'username': u'User'}.....] """ if where_condition: where_condition = ( where_condition.replace("like", "similar to") if self._engine.dialect.name == "postgresql" else where_condition ) try: query = "select * from " + self.table_name + " where " + where_condition query.replace("%", "%%") result = self.conn.execute(text(query)) except Exception as except_msg: print("EXCEPTION in get_items_sql: ", except_msg) raise ValueError("EXCEPTION in get_items_sql: ", except_msg) elif sql_statement: sql_statement = ( sql_statement.replace("like", "similar to") if self._engine.dialect.name == "postgresql" else sql_statement ) # TODO: make it save against SQL injection result = self.conn.execute(text(sql_statement)) else: result = self.conn.execute(text("select * from " + self.table_name)) row = result.fetchall() if not self._keep_connection: self.conn.close() # change the date of str datatype back into datetime object output_list = [] for col in row: # ensures working with db entries, which are camel case timestop_index = [item.lower() for item in col.keys()].index("timestop") timestart_index = [item.lower() for item in col.keys()].index("timestart") tmp_values = col.values() if ( col.values()[timestop_index] and col.values()[timestart_index] ) is not None: # changes values try: tmp_values[timestop_index] = datetime.strptime( str(tmp_values[timestop_index]), "%Y-%m-%d %H:%M:%S.%f" ) tmp_values[timestart_index] = datetime.strptime( str(tmp_values[timestart_index]), "%Y-%m-%d %H:%M:%S.%f" ) except ValueError: print("error in: ", str(col)) output_list += [dict(zip(col.keys(), tmp_values))] return output_list
# Item functions
[docs] def add_item_dict(self, par_dict): """ Create a new database item Args: par_dict (dict): Dictionary with the item values and column names as keys, like: {'chemicalformula': 'BO', 'computer': 'localhost', 'hamilton': 'VAMPS', 'hamversion': '1.1', 'job': 'testing', 'subjob' : 'SubJob', 'parentid': 0L, 'myCol': 'Blubbablub', 'project': 'database.testing', 'projectpath': '/root/directory/tmp', 'status': 'KAAAA', 'timestart': datetime(2016, 5, 2, 11, 31, 4, 253377), 'timestop': datetime(2016, 5, 2, 11, 31, 4, 371165), 'totalcputime': 0.117788, 'username': 'Test'} Returns: int: Database ID of the item created as an int, like: 3 """ if not self._viewer_mode: try: par_dict = dict( (key.lower(), value) for key, value in par_dict.items() ) # make keys lowercase result = self.conn.execute( self.simulation_table.insert(par_dict) ).inserted_primary_key[-1] if not self._keep_connection: self.conn.close() return result except Exception as except_msg: raise ValueError("Error occurred: " + str(except_msg)) else: raise PermissionError("Not avilable in viewer mode.")
def __get_items(self, col_name, var): """ Get multiple items from the database Args: col_name (str): column to query for, like : 'id' var (str, int): value of the specific column, like: '2' ----> __get_items('id', '2') Returns: dict: Dictionary where the key is the column name, like: [{'chemicalformula': u'BO', 'computer': u'computer', 'hamilton': u'VAMPS', 'hamversion': u'1.1', ------>'id': 2, 'job': u'testing', 'parentid': 0, 'project': u'database.testing', 'projectpath': u'/root/directory/tmp', 'samucol': None, 'status': u'Testing', 'timestart': datetime.datetime(2016, 5, 2, 11, 31, 4, 253377), 'timestop': datetime.datetime(2016, 5, 2, 11, 31, 4, 371165), 'totalcputime': 0.117788, 'username': u'Test'}] """ try: if type(var) is list: var = var[-1] query = select( [self.simulation_table], self.simulation_table.c[str(col_name)] == var ) except Exception: raise ValueError("There is no Column named: " + col_name) try: result = self.conn.execute(query) except (OperationalError, DatabaseError): if not self._sql_lite: self.conn = AutorestoredConnection(self._engine) else: self.conn = self._engine.connect() self.conn.connection.create_function("like", 2, self.regexp) result = self.conn.execute(query) row = result.fetchall() if not self._keep_connection: self.conn.close() return [dict(zip(col.keys(), col.values())) for col in row]
[docs] def item_update(self, par_dict, item_id): """ Modify Item in database Args: par_dict (dict): Dictionary of the parameters to be modified,, where the key is the column name. {'job' : 'maximize', 'subjob' : 'testing', ........} item_id (int, list): Database Item ID (Integer) - '38' can also be [38] Returns: """ if not self._viewer_mode: if type(item_id) is list: item_id = item_id[-1] # sometimes a list is given, make it int if np.issubdtype(type(item_id), np.integer): item_id = int(item_id) # all items must be lower case, ensured here par_dict = dict((key.lower(), value) for key, value in par_dict.items()) query = self.simulation_table.update( self.simulation_table.c["id"] == item_id ).values() try: self.conn.execute(query, par_dict) except (OperationalError, DatabaseError): if not self._sql_lite: self.conn = AutorestoredConnection(self._engine) else: self.conn = self._engine.connect() self.conn.connection.create_function("like", 2, self.regexp) self.conn.execute(query, par_dict) if not self._keep_connection: self.conn.close() else: raise PermissionError("Not avilable in viewer mode.")
[docs] def delete_item(self, item_id): """ Delete Item from database Args: item_id (int): Databse Item ID (Integer), like: 38 Returns: """ if not self._viewer_mode: self.conn.execute( self.simulation_table.delete( self.simulation_table.c["id"] == int(item_id) ) ) if not self._keep_connection: self.conn.close() else: raise PermissionError("Not avilable in viewer mode.")
# Shortcut
[docs] def get_item_by_id(self, item_id): """ Get item from database by searching for a specific item Id. Args: item_id (int): Databse Item ID (Integer), like: 38 Returns: dict: Dictionary where the key is the column name, like: {'chemicalformula': u'BO', 'computer': u'localhost', 'hamilton': u'VAMPS', 'hamversion': u'1.1', 'id': 1, 'job': u'testing', 'masterid': None, 'parentid': 0, 'project': u'database.testing', 'projectpath': u'/root/directory/tmp', 'status': u'KAAAA', 'subjob': u'SubJob', 'timestart': datetime.datetime(2016, 5, 2, 11, 31, 4, 253377), 'timestop': datetime.datetime(2016, 5, 2, 11, 31, 4, 371165), 'totalcputime': 0.117788, 'username': u'Test'} """ # convert item_id to int type # needed since psycopg2 gives otherwise an error for np.int64 type (bigint in database) if item_id is None: return None if isinstance(item_id, (str, float)): item_id = int(item_id) if np.issubdtype(type(item_id), np.integer): try: return self.__get_items("id", int(item_id))[-1] except TypeError as except_msg: raise TypeError( "Wrong data type given as parameter. item_id has to be Integer or String: ", except_msg, ) except IndexError as except_msg: raise IndexError( "Error when trying to find elements by given Job ID: ", except_msg ) else: raise TypeError("THE SQL database ID has to be an integer.")
[docs] def query_for_element(self, element): return or_( *[ self.simulation_table.c["chemicalformula"].like( "%" + element + "[ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789]%" ), self.simulation_table.c["chemicalformula"].like("%" + element), ] )
[docs] def get_items_dict(self, item_dict, return_all_columns=True): """ Args: item_dict (dict): a dict type, which has a certain syntax for this function: a normal dict like {'hamilton': 'VAMPE', 'hamversion': '1.1'} has similarities with a simple query like select * from table_name where hamilton = 'VAMPE AND hamversion = '1.1' as seen it puts an AND for every key, value combination in the dict and searches for it. another syntax is for an OR statement, simply: {'hamilton': ['VAMPE', 'LAMMPS']}, the query would be: select * from table_name where hamilton = 'VAMPE' OR hamilton = 'LAMMPS' and lastly for a LIKE statement, simply: {'project': 'database.%'}, the query would be select * from table_name where project LIKE 'database.%' that means you can simply add the syntax for a like statement like '%' and it will automatically operate a like-search of course you can also use a more complex select method, with everything in use: {'hamilton': ['VAMPE', 'LAMMPS'], 'project': 'databse%', 'hamversion': '1.1'} select * from table_name where (hamilton = 'VAMPE' Or hamilton = 'LAMMPS') AND (project LIKE 'database%') AND hamversion = '1.1' return_all_columns (bool): return all columns or only the 'id' - still the format stays the same. Returns: list: the function returns a list of dicts like get_items_sql, but it does not format datetime: [{'chemicalformula': u'Ni108', 'computer': u'mapc157', 'hamilton': u'LAMMPS', 'hamversion': u'1.1', 'id': 24, 'job': u'DOF_1_0', 'parentid': 21L, 'project': u'lammps.phonons.Ni_fcc', 'projectpath': u'D:/PyIron/PyIron_data/projects', 'status': u'finished', 'timestart': datetime.datetime(2016, 6, 24, 10, 17, 3, 140000), 'timestop': datetime.datetime(2016, 6, 24, 10, 17, 3, 173000), 'totalcputime': 0.033, 'username': u'test'}, {'chemicalformula': u'Ni108', 'computer': u'mapc157', 'hamilton': u'LAMMPS', 'hamversion': u'1.1', 'id': 21, 'job': u'ref', 'parentid': 20L, 'project': u'lammps.phonons.Ni_fcc', 'projectpath': u'D:/PyIron/PyIron_data/projects', 'status': u'finished', 'timestart': datetime.datetime(2016, 6, 24, 10, 17, 2, 429000), 'timestop': datetime.datetime(2016, 6, 24, 10, 17, 2, 463000), 'totalcputime': 0.034, 'username': u'test'},.......] """ if not isinstance(item_dict, dict): raise TypeError("Wrong DataType! Only Dicts are usable!") and_statement = [] # list for the whole sqlalchemy statement # here we go through all keys and values of item_dict for key, value in item_dict.items(): # if a value of item_dict is a list, we have to make an or statement of it if key == "element_lst": part_of_statement = [ self.query_for_element(element=element) for element in value ] elif isinstance(value, list): or_statement = [ self.simulation_table.c[str(key)] == element if "%" not in element else self.simulation_table.c[str(key)].like(element) for element in value ] # here we wrap the given values in an sqlalchemy-type or_statement part_of_statement = [or_(*or_statement)] else: if "%" not in str(value): part_of_statement = [self.simulation_table.c[str(key)] == value] else: part_of_statement = [self.simulation_table.c[str(key)].like(value)] # here all statements are wrapped together for the and statement and_statement += part_of_statement if return_all_columns: query = select([self.simulation_table], and_(*and_statement)) else: query = select([self.simulation_table.columns["id"]], and_(*and_statement)) try: result = self.conn.execute(query) except (OperationalError, DatabaseError): if not self._sql_lite: self.conn = AutorestoredConnection(self._engine) else: self.conn = self._engine.connect() self.conn.connection.create_function("like", 2, self.regexp) result = self.conn.execute(query) row = result.fetchall() if not self._keep_connection: self.conn.close() return [dict(zip(col.keys(), col.values())) for col in row]