Source code for pyiron.interactive.sxextoptint

# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.

import numpy as np
import subprocess
import os
import time
import posixpath
import warnings
from pyiron.base.settings.generic import Settings
from pyiron.base.generic.parameters import GenericParameters
from pyiron.atomistics.job.interactivewrapper import (
    InteractiveWrapper,
    ReferenceJobOutput,
)
from pyiron.atomistics.job.interactive import InteractiveInterface
from pyiron.base.job.executable import Executable
from pyiron.sphinx.base import InputWriter

__author__ = "Jan Janssen, Osamu Waseda"
__copyright__ = (
    "Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
    "Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "development"
__date__ = "Sep 1, 2018"


s = Settings()


[docs]class SxExtOpt(InteractiveInterface): def __init__( self, structure, working_directory=None, maxDist=5, ionic_steps=1000, ionic_energy=1.0e-3, ionic_forces=1.0e-2, max_step_length=1.0e-1, soft_mode_damping=1, executable=None, ssa=False, ): super().__init__() self.__name__ = "SxExtOpt" if working_directory is None: warnings.warn("WARNING: working_directory not set; current folder is used") working_directory = os.getcwd() self._interactive_library = None self._interactive_library_read = None self.working_directory = working_directory if executable is None: executable = Executable( path_binary_codes=s._configuration["resource_paths"], codename="SxExtOptInteractive", module=self.__module__.split(".")[1], overwrite_nt_flag=False, ).executable_path self._start_process( structure=structure, executable=executable, maxDist=maxDist, ionic_steps=ionic_steps, ionic_energy=ionic_energy, ionic_forces=ionic_forces, max_step_length=max_step_length, soft_mode_damping=soft_mode_damping, selective_dynamics="selective_dynamics" in structure._tag_list.keys(), ssa=ssa, ) self._cell = structure.cell if ssa: self._elements = structure.get_parent_symbols() else: magmom = structure.get_initial_magnetic_moments() magmom[magmom!=None] = np.round(magmom[magmom!=None], decimals=1) magmom = np.char.mod('%s', magmom) self._elements = np.char.add(structure.get_parent_symbols(), magmom) self._elements = np.char.replace(self._elements, '-', 'm') self._elements = np.char.replace(self._elements, '.', 'p') self._positions = structure.positions self._converged = False def _start_process( self, structure, executable, maxDist=5, ionic_steps=1000, ionic_energy=1.0e-3, ionic_forces=1.0e-2, max_step_length=1.0e-1, soft_mode_damping=1, selective_dynamics=False, ssa=False, ): if selective_dynamics: input_writer_obj = InputWriter() input_writer_obj.structure = structure if ssa: input_writer_obj.structure.set_initial_magnetic_moments(len(structure)*[None]) input_writer_obj.write_structure( file_name="structure.sx", cwd=self.working_directory, structure_str=None, symmetry_enabled=True, keep_angstrom=True, ) self._write_input( working_directory=self.working_directory, maxDist=maxDist, ionic_steps=ionic_steps, ionic_energy=ionic_energy, ionic_forces=ionic_forces, max_step_length=max_step_length, soft_mode_damping=soft_mode_damping, selective_dynamics=selective_dynamics, ) shell = os.name == "nt" try: with open( posixpath.join(self.working_directory, "out.txt"), mode="w" ) as f_out: with open( posixpath.join(self.working_directory, "error.txt"), mode="w" ) as f_err: self._process = subprocess.Popen( [executable], cwd=self.working_directory, shell=shell, stdout=f_out, stderr=f_err, universal_newlines=True, ) except subprocess.CalledProcessError as e: raise ValueError("run_job.py crashed") while not self._interactive_pipes_initialized(self.working_directory): time.sleep(1) self._interactive_initialize_interface() @staticmethod def _write_input( working_directory, maxDist=5, ionic_steps=1000, ionic_energy=1.0e-3, ionic_forces=1.0e-2, max_step_length=1.0e-1, soft_mode_damping=1, selective_dynamics=False, ): with open(os.path.join(working_directory, "optim.sx"), "w") as f: content = ( "main { ricQN { ric { maxDist = %f; withAngles; } maxSteps = %i; dEnergy = %f; dF = %f; maxStepLength = %f; softModeDamping = %f;}}" % ( maxDist, ionic_steps, ionic_energy, ionic_forces, max_step_length, soft_mode_damping, ) ) if selective_dynamics: content += "structure { include <structure.sx>; }" f.write(content) @staticmethod def _interactive_pipes_initialized(working_directory): return os.path.exists( os.path.join(working_directory, "control") ) and os.path.exists(os.path.join(working_directory, "response")) def _interactive_write_line(self, line): self._interactive_library.write("%s\n" % line) self._interactive_library.flush() def _interactive_initialize_interface(self): self._interactive_library_read = open( os.path.join(self.working_directory, "control"), "r" ) self._interactive_library = open( os.path.join(self.working_directory, "response"), "w" )
[docs] def interactive_close(self): if self.interactive_is_activated(): self._interactive_library.close() self._interactive_library_read.close() self._delete_named_pipes(working_directory=self.working_directory)
@staticmethod def _delete_named_pipes(working_directory): for file in ["control", "response"]: file_path = posixpath.join(working_directory, file) if os.path.exists(file_path): os.remove(file_path)
[docs] def interactive_is_activated(self): if self._interactive_library is None: return False else: return True
def _write_cell(self, cell): for c in cell: self._interactive_write_line("%.16f %.16f %.16f" % (c[0], c[1], c[2])) def _write_number_of_atoms(self, count): self._interactive_write_line("%s" % (count)) def _write_positions(self, positions, elements): for pos, el in zip(positions, elements): self._interactive_write_line( "%.16f %.16f %.16f %s" % (pos[0], pos[1], pos[2], str(el)) ) def _write_energy(self): self._interactive_write_line("0") def _write_forces(self, forces): for f in forces: self._interactive_write_line("%.16f %.16f %.16f" % (f[0], f[1], f[2])) def _read_positions(self, count): return [ [float(c) for c in self._interactive_library_read.readline().split()] for i in range(count) ]
[docs] def set_forces(self, forces): line = self._interactive_library_read.readline().split() if len(line) == 0 or line[0] == "end": print("Ending calculation") self._converged = True elif line[0] == "get": if line[1] == "cell": self._write_cell(cell=self._cell) elif line[1] == "natoms": self._write_number_of_atoms(count=len(self._positions)) elif line[1] == "positions": self._write_positions( positions=self._positions, elements=self._elements ) elif line[1] == "energy": self._write_energy() elif line[1] == "forces": self._write_forces(forces=forces) else: raise ValueError("Unknown command:", line) self.set_forces(forces) elif line[0] == "run": self.set_forces(forces) elif line[0] == "set": if line[1] == "positions": self._positions = np.array(self._read_positions(count=len(forces))) else: raise ValueError("Unknown command:", line) else: raise ValueError("Unknown command:", line)
[docs] def get_positions(self): return self._positions
[docs] def end(self): while not self.converged: self.set_forces(np.zeros_like(self._positions))
@property def converged(self): return self._converged def __del__(self): self.end() if self.interactive_is_activated(): self.interactive_close() else: self._delete_named_pipes(working_directory=self.working_directory)
[docs]class SxExtOptInteractive(InteractiveWrapper): def __init__(self, project, job_name): super(SxExtOptInteractive, self).__init__(project, job_name) self.__name__ = "SxExtOptInteractive" self.__version__ = ( None ) # Reset the version number to the executable is set automatically self._executable_activate() self.input = Input() self.output = SxExtOptOutput(job=self) self._interactive_interface = None self._interactive_number_of_steps = 0 self._coarse_run = False
[docs] def set_input_to_read_only(self): """ This function enforces read-only mode for the input classes, but it has to be implement in the individual classes. """ super(SxExtOptInteractive, self).set_input_to_read_only() self.input.read_only = True
[docs] def write_input(self): pass
[docs] def run_static(self): """ The run if modal function is called by run to execute the simulation, while waiting for the output. For this we use subprocess.check_output() """ self._create_working_directory() self._interactive_interface = SxExtOpt( structure=self.ref_job.structure, working_directory=self.working_directory, maxDist=int(self.input["maxDist"]), ionic_steps=int(self.input["ionic_steps"]), ionic_energy=float(self.input["ionic_energy"]), ionic_forces=float(self.input["ionic_forces"]), max_step_length=float(self.input["max_step_length"]), soft_mode_damping=float(self.input["soft_mode_damping"]), executable=self.executable.executable_path, ssa=self.input['ssa'], ) try: self._coarse_run = self.ref_job.coarse_run except AttributeError: pass self.status.running = True self._logger.info("job status: %s", self.status) new_positions = self.ref_job.structure.positions self.ref_job_initialize() while ( self._interactive_number_of_steps < self.input["ionic_steps"] and not self._interactive_interface.converged ): str_temp = self.ref_job.structure str_temp.positions = new_positions self.ref_job.structure = str_temp if self.ref_job.server.run_mode.interactive: self._logger.debug("SxExtOpt: step start!") self.ref_job.run() if ( self._coarse_run and np.max(np.linalg.norm(self.get_forces(), axis=-1), axis=-1) < self.input["ionic_forces"] ): self._coarse_run = False self.ref_job.coarse_run = False self.ref_job.run() self._logger.debug("SxExtOpt: step finished!") else: self.ref_job.run(run_again=True) if ( self._coarse_run and np.max(np.linalg.norm(self.get_forces(), axis=-1), axis=-1) < self.input["ionic_forces"] ): self._coarse_run = False self.ref_job.coarse_run = False self.ref_job.run(run_again=True) self._interactive_interface.set_forces(forces=self.get_forces()) new_positions = self._interactive_interface.get_positions() self._interactive_number_of_steps += 1 self.status.collect = True if self.ref_job.server.run_mode.interactive: self.ref_job.interactive_close() self._interactive_interface.interactive_close() self.run()
[docs] def get_forces(self): ff = np.array(self.ref_job.output.forces[-1]) if hasattr(self.ref_job.structure, "selective_dynamics"): ff[np.array(self.ref_job.structure.selective_dynamics) == False] = 0 return ff return ff
[docs] def convergence_check(self): """ Validate the convergence of the calculation. Returns: (bool): If the calculation is converged """ if self._interactive_number_of_steps < self.input["ionic_steps"]: return True else: return False
[docs]class Input(GenericParameters): """ class to control the generic input for a Sphinx calculation. Args: input_file_name (str): name of the input file table_name (str): name of the GenericParameters table """ def __init__(self, input_file_name=None, table_name="input"): super(Input, self).__init__( input_file_name=input_file_name, table_name=table_name, comment_char="//", separator_char="=", end_value_char=";", )
[docs] def load_default(self): """ Loads the default file content """ file_content = ( "ionic_steps = 1000 // maximum number of ionic steps\n" "ionic_energy = 1.0e-3\n" "ionic_forces = 1.0e-2\n" "maxDist = 5 // maximum possible distance for considering neighbors\n" "max_step_length = 1.0e-1 // maximum displacement at each step\n" "ssa = False // ignore different magnetic moment values when internal symmetries are considered\n" "soft_mode_damping = 1.0 // Tikhonov damper\n" ) self.load_string(file_content)
[docs]class SxExtOptOutput(ReferenceJobOutput): def __init__(self, job): super(SxExtOptOutput, self).__init__(job=job)