# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.
from builtins import input
import os
import importlib
from six import with_metaclass
import sys
from configparser import ConfigParser
from pathlib2 import Path
from pyiron.base.settings.logger import setup_logger
from pyiron.base.database.generic import DatabaseAccess
from pyiron.base.settings.install import install_pyiron
"""
The settings file provides the attributes of the configuration as properties.
"""
__author__ = "Jan Janssen"
__copyright__ = (
"Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
"Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "production"
__date__ = "Sep 1, 2017"
class Singleton(type):
    """
    Metaclass that hands out one shared instance per class.

    Implemented with suggestions from
    http://stackoverflow.com/questions/6760685/creating-a-singleton-in-python

    The shared instance is rebuilt whenever the class is called with a keyword argument ``config`` that is
    not None; every other call returns the instance created last.
    """

    # class object -> its current shared instance
    _instances = {}

    def __call__(cls, *args, **kwargs):
        # Only a keyword argument counts - a positionally passed config does not force a rebuild.
        rebuild_requested = kwargs.get("config") is not None
        if rebuild_requested or cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]
class Settings(with_metaclass(Singleton)):
    """
    The settings object can either search for a configuration file and use the default configuration only when no
    other configuration file is found, or it can be forced to use the default configuration file.

    The configuration is assembled in this order, later sources overriding earlier ones:

    1. the built-in defaults,
    2. the file named by the environment variable ``PYIRONCONFIG`` (or ``~/.pyiron``); if that file does not
       exist, the ``PYIRON*`` environment variables are used instead,
    3. the ``config`` dictionary handed to the constructor.

    Args:
        config (dict): Provide a dict with the configuration.
    """

    def __init__(self, config=None):
        # Default config dictionary
        self._configuration = {
            "user": "pyiron",
            "resource_paths": ["~/pyiron/resources"],
            "project_paths": ["~/pyiron/projects"],
            "sql_connection_string": None,
            "sql_table_name": "jobs_pyiron",
            "sql_view_connection_string": None,
            "sql_view_table_name": None,
            "sql_view_user": None,
            "sql_view_user_key": None,
            "sql_file": None,
            "sql_host": None,
            "sql_type": "SQLite",
            "sql_user_key": None,
            "sql_database": None,
            "project_check_enabled": True,
            "disable_database": False,
        }
        environment = os.environ
        if "PYIRONCONFIG" in environment:
            config_file = environment["PYIRONCONFIG"]
        else:
            config_file = os.path.expanduser(os.path.join("~", ".pyiron"))
        if os.path.isfile(config_file):
            self._config_parse_file(config_file)
        elif any("PYIRON" in e for e in environment):
            self._configuration = self.get_config_from_environment(
                environment=environment,
                config=self._configuration
            )
        else:
            print("Fall back to default configuration: "
                  "{'resource_paths': ['~/pyiron/resources'], "
                  "'project_paths': ['~/pyiron/projects']}")

        # Take dictionary as primary source - overwrite everything
        self._read_external_config(config=config)

        # Project paths always end with exactly one "/", resource paths never get one appended.
        self._configuration["project_paths"] = [
            self._as_directory(path) for path in self._configuration["project_paths"]
        ]
        self._configuration["resource_paths"] = [
            convert_path(path) for path in self._configuration["resource_paths"]
        ]

        # Build the SQLalchemy connection strings
        if not self.database_is_disabled:
            self._configuration = self.convert_database_config(
                config=self._configuration
            )
        self._database = None
        self._use_local_database = False
        self._queue_adapter = self._init_queue_adapter(
            resource_path_lst=self._configuration["resource_paths"]
        )
        self.logger = setup_logger()
        self._publication_lst = {}
        self.publication_add(self.publication)

    @property
    def database(self):
        """
        Database access object, or None while no connection has been opened.
        """
        return self._database

    @property
    def queue_adapter(self):
        """
        Queue adapter created from the resource paths, or None if no queue configuration was found.
        """
        return self._queue_adapter

    @property
    def project_check_enabled(self):
        """
        Value of the configuration entry ``project_check_enabled``.
        """
        return self._configuration["project_check_enabled"]

    @property
    def database_is_disabled(self):
        """
        Value of the configuration entry ``disable_database``.
        """
        return self._configuration["disable_database"]

    @property
    def publication_lst(self):
        """
        List of publications currently in use.

        Returns:
            list: list of publications
        """
        all_publication = []
        for v in self._publication_lst.values():
            if isinstance(v, list):
                all_publication += v
            else:
                all_publication.append(v)
        return all_publication

    def publication_add(self, pub_dict):
        """
        Add a publication to the list of publications. Keys which are already registered are left untouched.

        Args:
            pub_dict (dict): The key should be the name of the code used and the value a list of publications to cite.
        """
        for key, value in pub_dict.items():
            if key not in self._publication_lst:
                self._publication_lst[key] = value

    @property
    def login_user(self):
        """
        Get the username of the current user

        Returns:
            str: username
        """
        return self._configuration["user"]

    @property
    def resource_paths(self):
        """
        Get the path where the potentials for the individual Hamiltons are located

        Returns:
            list: list of paths
        """
        return self._configuration["resource_paths"]

    def open_connection(self):
        """
        Internal function to open the connection to the database. Only after this function is called the database is
        accessible. Does nothing if a connection is already open or the database is disabled.
        """
        if self._database is None and not self.database_is_disabled:
            self._database = DatabaseAccess(
                self._configuration["sql_connection_string"],
                self._configuration["sql_table_name"],
            )

    def switch_to_local_database(self, file_name="pyiron.db", cwd=None):
        """
        Switch to a local SQLite based database.

        Args:
            file_name (str): SQLite database file name
            cwd (str/None): directory where the SQLite database file is located in
        """
        if not self._use_local_database and not self.database_is_disabled:
            if cwd is None and not os.path.isabs(file_name):
                file_name = os.path.join(os.path.abspath(os.path.curdir), file_name)
            elif cwd is not None:
                file_name = os.path.join(cwd, file_name)
            self.close_connection()
            self._database = DatabaseAccess(
                "sqlite:///" + file_name, self._configuration["sql_table_name"]
            )
            self._use_local_database = True
        else:
            print("Database is already in local mode or disabled!")

    def switch_to_central_database(self):
        """
        Switch to central database
        """
        if self._use_local_database and not self.database_is_disabled:
            self.close_connection()
            self._database = DatabaseAccess(
                self._configuration["sql_connection_string"],
                self._configuration["sql_table_name"],
            )
            self._use_local_database = False
        else:
            print("Database is already in central mode or disabled!")

    def switch_to_viewer_mode(self):
        """
        Switch from user mode to viewer mode - if viewer_mode is enabled pyiron has read only access to the database.
        """
        if self._configuration["sql_view_connection_string"] is None or self.database_is_disabled:
            print("Viewer Mode is not available on this pyiron installation.")
            return
        # A connection that has not been opened yet cannot be in viewer mode.
        if self._database is not None and self._database.viewer_mode:
            print("Database is already in viewer mode!")
            return
        self.close_connection()
        self._database = DatabaseAccess(
            self._configuration["sql_view_connection_string"],
            self._configuration["sql_view_table_name"],
        )
        self._database.viewer_mode = True

    def switch_to_user_mode(self):
        """
        Switch from viewer mode to user mode - if viewer_mode is enabled pyiron has read only access to the database.
        """
        if self._configuration["sql_view_connection_string"] is None or self.database_is_disabled:
            print("Viewer Mode is not available on this pyiron installation.")
            return
        # A connection that has not been opened yet cannot be in viewer mode.
        if self._database is None or not self._database.viewer_mode:
            print("Database is already in user mode!")
            return
        self.close_connection()
        self._database = DatabaseAccess(
            self._configuration["sql_connection_string"],
            self._configuration["sql_table_name"],
        )
        # The fresh connection is the read-write one, so the viewer flag has to be cleared.
        self._database.viewer_mode = False

    def close_connection(self):
        """
        Internal function to close the connection to the database.
        """
        # hasattr guard: the attribute does not exist before __init__ has reached the database setup
        if hasattr(self, "_database") and self._database is not None:
            self._database.conn.close()
            self._database = None

    def top_path(self, full_path):
        """
        Validate that the full_path is a sub directory of one of the pyiron environments loaded.

        Args:
            full_path (str): path

        Returns:
            str/None: the matching project path, or None if the project check is disabled

        Raises:
            ValueError: if the project check is enabled and no configured project path matches
        """
        if not self.project_check_enabled:
            return None
        if not full_path.endswith("/"):
            full_path += "/"
        for path in self._configuration["project_paths"]:
            # NOTE(review): this is a substring test, so a project path occurring anywhere inside
            # full_path matches, not only as a prefix - confirm whether callers rely on that.
            if path in full_path:
                return path
        raise ValueError(
            "the current path {0} is not included in the .pyiron configuration. {1}".format(
                full_path, self._configuration["project_paths"]
            )
        )

    # private functions
    @staticmethod
    def _as_directory(path):
        """
        Convert a path with convert_path() and make sure the result ends with exactly one "/".

        Args:
            path (str): input path

        Returns:
            str: converted path with a trailing slash
        """
        converted = convert_path(path)
        return converted if converted.endswith("/") else converted + "/"

    @staticmethod
    def _init_queue_adapter(resource_path_lst):
        """
        Initialize the queue adapter if a folder queues is found in one of the resource paths which contains a
        queue configuration file (queue.yaml). The first resource path which qualifies is used.

        Args:
            resource_path_lst (list): List of resource paths

        Returns:
            pysqa.QueueAdapter/None: queue adapter, or None if no queue configuration was found
        """
        for resource_path in resource_path_lst:
            queue_directory = os.path.join(resource_path, "queues")
            if os.path.isfile(os.path.join(queue_directory, "queue.yaml")):
                # pysqa is only imported when a queue configuration actually exists
                queueadapter = getattr(importlib.import_module("pysqa"), "QueueAdapter")
                return queueadapter(directory=queue_directory)
        return None

    def _config_parse_file(self, config_file):
        """
        Read the first section (or [DEFAULT] if there is no other section) of the configuration file and store
        the recognised options in the configuration dictionary of this object.

        Args:
            config_file(str): config file to parse

        Raises:
            ValueError: if the type Postgres or MySQL is selected without USER, PASSWD, HOST and NAME
        """
        import warnings

        parser = ConfigParser(inline_comment_prefixes=(";",))
        # read config
        parser.read(config_file)
        # load first section or default section [DEFAULT]
        sections = parser.sections()
        section = sections[0] if sections else "DEFAULT"

        def has(option):
            return parser.has_option(section, option)

        def get(option):
            return parser.get(section, option)

        def get_paths(option):
            # comma separated list of paths
            return [convert_path(c.strip()) for c in get(option).split(",")]

        configuration = self._configuration
        # identify SQL type
        if has("TYPE"):
            configuration["sql_type"] = get("TYPE")
        # read variables
        if has("PROJECT_PATHS"):
            configuration["project_paths"] = get_paths("PROJECT_PATHS")
        elif has("TOP_LEVEL_DIRS"):  # for backwards compatibility
            configuration["project_paths"] = get_paths("TOP_LEVEL_DIRS")
        else:
            # The default project path stays in place; warn instead of failing so that configuration
            # files without a project path keep working.
            warnings.warn(
                "No project path identified! Using {0} instead.".format(
                    configuration["project_paths"]
                )
            )
        if has("PROJECT_CHECK_ENABLED"):
            configuration["project_check_enabled"] = parser.getboolean(
                section, "PROJECT_CHECK_ENABLED"
            )
        if has("DISABLE_DATABASE"):
            configuration["disable_database"] = parser.getboolean(
                section, "DISABLE_DATABASE"
            )
        if has("RESOURCE_PATHS"):
            configuration["resource_paths"] = get_paths("RESOURCE_PATHS")
        if configuration["sql_type"] in ["Postgres", "MySQL"]:
            if not all(has(option) for option in ("USER", "PASSWD", "HOST", "NAME")):
                raise ValueError(
                    "If type Postgres or MySQL are selected the options USER, PASSWD, HOST and NAME are "
                    "required in the configuration file."
                )
            configuration["user"] = get("USER")
            configuration["sql_user_key"] = get("PASSWD")
            configuration["sql_host"] = get("HOST")
            configuration["sql_database"] = get("NAME")
            configuration["sql_file"] = None
            # the read-only account is optional, but only used when it is specified completely
            if all(has(option) for option in ("VIEWERUSER", "VIEWERPASSWD", "VIEWER_TABLE")):
                configuration["sql_view_table_name"] = get("VIEWER_TABLE")
                configuration["sql_view_user"] = get("VIEWERUSER")
                configuration["sql_view_user_key"] = get("VIEWERPASSWD")
        elif configuration["sql_type"] == "SQLalchemy":
            configuration["sql_connection_string"] = get("CONNECTION")
        else:  # finally we assume an SQLite connection
            if has("FILE"):
                configuration["sql_file"] = get("FILE").replace("\\", "/")
            # DATABASE_FILE is read after FILE, so it wins if both are given
            if has("DATABASE_FILE"):
                configuration["sql_file"] = get("DATABASE_FILE").replace("\\", "/")
        if has("JOB_TABLE"):
            configuration["sql_table_name"] = get("JOB_TABLE")

    @staticmethod
    def convert_database_config(config):
        """
        Build the SQLalchemy connection strings from the individual database settings.

        Args:
            config (dict): configuration dictionary - modified in place

        Returns:
            dict: the same dictionary with ``sql_connection_string`` (and for Postgres with a viewer user
                  ``sql_view_connection_string``) filled in
        """

        def credential_url(scheme, user, key):
            return (
                scheme + user + ":" + key + "@" + config["sql_host"] + "/" + config["sql_database"]
            )

        sql_type = config["sql_type"]
        if sql_type == "Postgres":
            config["sql_connection_string"] = credential_url(
                "postgresql://", config["user"], config["sql_user_key"]
            )
            if config["sql_view_user"] is not None:
                config["sql_view_connection_string"] = credential_url(
                    "postgresql://", config["sql_view_user"], config["sql_view_user_key"]
                )
        elif sql_type == "MySQL":
            config["sql_connection_string"] = credential_url(
                "mysql+pymysql://", config["user"], config["sql_user_key"]
            )
        elif sql_type == "SQLalchemy" and config["sql_connection_string"] is not None:
            # The connection string was supplied verbatim - it must not be replaced by the SQLite fallback.
            pass
        else:
            # SQLite is raising ugly error messages when the database directory does not exist.
            if config["sql_file"] is None:
                config["sql_file"] = "/".join(
                    [config["resource_paths"][0], "sqlite.db"]
                )
            database_directory = os.path.dirname(config["sql_file"])
            if database_directory != "" and not os.path.exists(database_directory):
                os.makedirs(database_directory)
            config["sql_connection_string"] = "sqlite:///" + config["sql_file"].replace("\\", "/")
        return config

    def _read_external_config(self, config):
        """
        Merge a user supplied dictionary into the configuration. Anything which is not a dict is ignored.

        Args:
            config (dict/None): configuration entries; ``resource_paths`` and ``project_paths`` may be given
                                as a list or as a single string

        Raises:
            TypeError: if ``resource_paths`` or ``project_paths`` is neither a list nor a string
        """
        if not isinstance(config, dict):
            return
        for key, value in config.items():
            if key not in ["resource_paths", "project_paths"] or isinstance(value, list):
                self._configuration[key] = value
            elif isinstance(value, str):
                # a single path is wrapped, the rest of the class expects lists
                self._configuration[key] = [value]
            else:
                raise TypeError(
                    "Config dictionary parameter type not recognized ", key, value
                )

    @staticmethod
    def get_config_from_environment(environment, config):
        """
        Update the configuration from ``PYIRON*`` environment variables.

        Args:
            environment (dict): mapping of environment variable names to their values, e.g. os.environ
            config (dict): configuration dictionary - modified in place

        Returns:
            dict: the updated configuration dictionary
        """
        env_key_mapping = {
            "PYIRONUSER": "user",
            "PYIRONRESOURCEPATHS": "resource_paths",
            "PYIRONPROJECTPATHS": "project_paths",
            "PYIRONSQLCONNECTIONSTRING": "sql_connection_string",
            "PYIRONSQLTABLENAME": "sql_table_name",
            "PYIRONSQLVIEWCONNECTIONSTRING": "sql_view_connection_string",
            "PYIRONSQLVIEWTABLENAME": "sql_view_table_name",
            "PYIRONSQLVIEWUSER": "sql_view_user",
            "PYIRONSQLVIEWUSERKEY": "sql_view_user_key",
            "PYIRONSQLFILE": "sql_file",
            # legacy spelling (missing "L") - kept so existing environments continue to work
            "PYIRONSQHOST": "sql_host",
            "PYIRONSQLHOST": "sql_host",
            "PYIRONSQLTYPE": "sql_type",
            "PYIRONSQLUSERKEY": "sql_user_key",
            "PYIRONSQLDATABASE": "sql_database",
            "PYIRONPROJECTCHECKENABLED": "project_check_enabled",
            "PYIRONDISABLE": "disable_database",
        }
        boolean_keys = ["PYIRONPROJECTCHECKENABLED", "PYIRONDISABLE"]
        path_list_keys = ["PYIRONRESOURCEPATHS", "PYIRONPROJECTPATHS"]
        for k, v in env_key_mapping.items():
            if k not in environment:
                continue
            if k in boolean_keys:
                config[v] = environment[k].lower() in ['t', 'true', 'y', 'yes']
            elif k in path_list_keys:
                # NOTE(review): ':' is hard-coded as separator - paths containing a colon are split
                config[v] = environment[k].split(':')
            else:
                config[v] = environment[k]
        return config

    @property
    def publication(self):
        """
        Publication entry for pyiron itself, in the format expected by publication_add().
        """
        return {
            "pyiron": {
                "pyiron-paper": {
                    "author": [
                        "Jan Janssen",
                        "Sudarsan Surendralal",
                        "Yury Lysogorskiy",
                        "Mira Todorova",
                        "Tilmann Hickel",
                        "Ralf Drautz",
                        "Jörg Neugebauer",
                    ],
                    "title": "pyiron: An integrated development environment for computational "
                    "materials science",
                    "journal": "Computational Materials Science",
                    "volume": "161",
                    "pages": "24 - 36",
                    "issn": "0927-0256",
                    "doi": "https://doi.org/10.1016/j.commatsci.2018.07.043",
                    "url": "http://www.sciencedirect.com/science/article/pii/S0927025618304786",
                    "year": "2019",
                }
            }
        }
def convert_path(path):
    """
    Turn a path into an absolute path which only uses forward slashes.

    Args:
        path(str): input path; a leading "~" is expanded to the home directory

    Returns:
        str: absolute path in POSIX format
    """
    expanded = os.path.expanduser(path)
    absolute = os.path.abspath(expanded)
    return absolute.replace("\\", "/")