Source code for pyiron.base.job.wrapper

# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.

import os
import logging
from pyiron.base.project.generic import Project
from pyiron.base.settings.generic import Settings
from pyiron.base.database.filetable import get_hamilton_from_file, get_hamilton_version_from_file, \

The job wrapper is called from the script, it restores the job from hdf5 and executes it.

__author__ = "Joerg Neugebauer"
__copyright__ = (
    "Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
    "Computational Materials Design (CM) Department"
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = ""
__status__ = "production"
__date__ = "Sep 1, 2017"

s = Settings()

[docs]class JobWrapper(object): """ The job wrapper is called from the script, it restores the job from hdf5 and executes it. Args: working_directory (str): working directory of the job job_id (int/ None): job ID hdf5_file (str): path to the HDF5 file of the job h5_path (str): path inside the HDF5 file to load the job submit_on_remote (bool): submit to queuing system on remote host debug (bool): enable debug mode [True/False] (optional) """ def __init__(self, working_directory, job_id=None, hdf5_file=None, h5_path=None, submit_on_remote=False, debug=False): self.working_directory = working_directory self._remote_flag = submit_on_remote pr = Project(path=os.path.join(working_directory, '..', '..')) if job_id is not None: self.job = pr.load(int(job_id)) else: projectpath = s.top_path(hdf5_file) if projectpath is None: project = os.path.dirname(hdf5_file) else: project = os.path.relpath(os.path.dirname(hdf5_file), projectpath) job_name = h5_path[1:] self.job = pr.load_from_jobpath( job_id=None, db_entry={ "job": job_name, "subjob": h5_path, "projectpath": projectpath, "project": project + '/', "status": get_job_status_from_file( hdf5_file=hdf5_file, job_name=job_name ), "hamilton": get_hamilton_from_file( hdf5_file=hdf5_file, job_name=job_name ), "hamversion": get_hamilton_version_from_file( hdf5_file=hdf5_file, job_name=job_name ) }, convert_to_object=True) # setup logger self._logger = self.setup_logger(debug=debug)
[docs] @staticmethod def setup_logger(debug=False): """ Setup the error logger Args: debug (bool): the level of logging, enable debug mode [True/False] (optional) Returns: logger: logger object instance """ logger = logging.getLogger("pyiron_log") logger.setLevel(logging.INFO) if debug: logger.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.INFO) formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) ch.setFormatter(formatter) logger.addHandler(ch) return logger
[docs] def run(self): """ The job wrapper run command, sets the job status to 'running' and executes run_if_modal(). """ if self._remote_flag and self.job.server.queue is not None: self.job.run_if_scheduler() else: self.job.run_static()
[docs]def job_wrapper_function(working_directory, job_id=None, file_path=None, submit_on_remote=False, debug=False): """ Job Wrapper function - creates a JobWrapper object and calls run() on that object Args: working_directory (str): directory where the HDF5 file of the job is located job_id (int/ None): job id file_path (str): path to the HDF5 file debug (bool): enable debug mode submit_on_remote (bool): submit to queuing system on remote host """ if job_id is not None: job = JobWrapper( working_directory=working_directory, job_id=job_id, submit_on_remote=submit_on_remote, debug=debug ) elif file_path is not None: hdf5_file = '.'.join(file_path.split('.')[:-1]) + '.' + file_path.split('.')[-1].split('/')[0] h5_path = '/'.join(file_path.split('.')[-1].split('/')[1:]) job = JobWrapper( working_directory, job_id=None, hdf5_file=hdf5_file, h5_path='/' + h5_path, submit_on_remote=submit_on_remote, debug=debug ) else: raise ValueError