Source code for pyiron.thermodynamics.sxphonons

# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.

from __future__ import print_function
import os
import numpy as np
import scipy.constants
from pyiron.atomistics.structure.atoms import Atoms
from pyiron.sphinx.base import InputWriter, Output
from pyiron.base.job.generic import GenericJob
from pyiron.atomistics.job.atomistic import AtomisticGenericJob
from pyiron.base.generic.parameters import GenericParameters
from pyiron.atomistics.master.parallel import AtomisticParallelMaster
from pyiron.base.master.parallel import JobGenerator

__author__ = "Jan Janssen"
__copyright__ = (
    "Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
    "Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "development"
__date__ = "Sep 1, 2018"


BOHR_TO_ANGSTROM = (
    scipy.constants.physical_constants["Bohr radius"][0] / scipy.constants.angstrom
)
HARTREE_TO_EV = scipy.constants.physical_constants["Hartree energy in eV"][0]
HARTREE_OVER_BOHR_TO_EV_OVER_ANGSTROM = HARTREE_TO_EV / BOHR_TO_ANGSTROM


[docs]class SxUniqDispl(AtomisticGenericJob): def __init__(self, project, job_name): super(SxUniqDispl, self).__init__(project, job_name) self.__version__ = "0.1" self.__name__ = "SxUniqDispl" self.input = GenericParameters(table_name="displacement") self.input["displacement"] = 0.01 self.structure_lst = [] self._id_pyi_to_spx = [] self._id_spx_to_pyi = [] @property def id_spx_to_pyi(self): if self.structure is None: return None if len(self._id_spx_to_pyi) == 0: self._initialize_order() return self._id_spx_to_pyi @property def id_pyi_to_spx(self): if self.structure is None: return None if len(self._id_pyi_to_spx) == 0: self._initialize_order() return self._id_pyi_to_spx def _initialize_order(self): for elm_species in self.structure.get_species_objects(): self._id_pyi_to_spx.append( np.arange(len(self.structure))[ self.structure.get_chemical_symbols() == elm_species.Abbreviation ] ) self._id_pyi_to_spx = np.array( [ooo for oo in self._id_pyi_to_spx for ooo in oo] ) self._id_spx_to_pyi = np.array([0] * len(self._id_pyi_to_spx)) for i, p in enumerate(self._id_pyi_to_spx): self._id_spx_to_pyi[p] = i
[docs] def set_input_to_read_only(self): """ This function enforces read-only mode for the input classes, but it has to be implement in the individual classes. """ super(SxUniqDispl, self).set_input_to_read_only() self.input.read_only = True
[docs] def list_structures(self): if self.status.finished: return self.structure_lst else: return []
[docs] def write_structure(self, cwd, file_name="structure_wrapper.sx"): structure_file_name = "structure.sx" iw = InputWriter() iw.structure = self.structure iw.write_structure(file_name=structure_file_name, cwd=cwd) with open(os.path.join(cwd, file_name), "w") as f: f.writelines(["structure { include <" + structure_file_name + ">; }"])
[docs] def extract_structure(self, working_directory): structure_lst = [self.structure] parser = Output(self) for f in os.listdir(working_directory): if "input-disp" in f: structure_template = self.structure.copy() parser.collect_relaxed_hist(file_name=f, cwd=working_directory) structure_template.cell = parser._parse_dict["cell"][0] structure_template.positions = parser._parse_dict["positions"][0] structure_lst.append(structure_template) return structure_lst
[docs] def write_input(self): self.write_structure( cwd=self.working_directory, file_name="structure_wrapper.sx" ) lines = [ "#!/bin/bash\n", "sxuniqdispl --log -d " + str(float(self.input["displacement"]) / BOHR_TO_ANGSTROM) + " -i structure_wrapper.sx\n", ] with open(os.path.join(self.working_directory, "sxuniqdispl.sh"), "w") as f: f.writelines(lines)
[docs] def collect_output(self): self.structure_lst = self.extract_structure( working_directory=self.working_directory ) with self.project_hdf5.open("output") as hdf_out: for ind, struct in enumerate(self.structure_lst): struct.to_hdf(hdf=hdf_out, group_name="structure_" + str(ind))
[docs] def to_hdf(self, hdf=None, group_name=None): """ Store the ExampleJob object in the HDF5 File Args: hdf (ProjectHDFio): HDF5 group object - optional group_name (str): HDF5 subgroup name - optional """ super(SxUniqDispl, self).to_hdf(hdf=hdf, group_name=group_name) with self.project_hdf5.open("input") as hdf5_input: self.input.to_hdf(hdf5_input)
[docs] def from_hdf(self, hdf=None, group_name=None): """ Restore the ExampleJob object in the HDF5 File Args: hdf (ProjectHDFio): HDF5 group object - optional group_name (str): HDF5 subgroup name - optional """ super(SxUniqDispl, self).from_hdf(hdf=hdf, group_name=group_name) with self.project_hdf5.open("input") as hdf5_input: self.input.from_hdf(hdf5_input) if "output" in self.project_hdf5.list_groups(): with self.project_hdf5.open("output") as hdf5_output: self.structure_lst = [ Atoms().from_hdf(hdf5_output, group_name) for group_name in hdf5_output.list_groups() ]
[docs]class SxDynMat(GenericJob): def __init__(self, project, job_name): super(SxDynMat, self).__init__(project, job_name) self.__version__ = "0.1" self.__name__ = "SxDynMat" self._child_lst = [] self._child_id_lst = [] @property def child_id_lst(self): return self._child_id_lst @child_id_lst.setter def child_id_lst(self, child_id_lst): self._child_id_lst = child_id_lst @property def child_lst(self): if len(self._child_lst) != len(self._child_id_lst): self._child_lst = [ self.project.load(job_id) for job_id in self._child_id_lst ] forces_of_first_child = self._child_lst[0].output.forces[-1] self._forces_lst = [ job.output.forces[-1] - forces_of_first_child for job in self._child_lst ] self._structure_lst = [job.get_structure() for job in self._child_lst] return self._child_lst
[docs] @staticmethod def matrix_to_str(matrix): """ Function to convert an numpy matrix to an Sphinx input compatible matrix. Args: matrix (numpy.d2type): the matrix to be converted Returns: str: the matrix representation in the Sphinx input. """ output_str = "[" for i in matrix: output_str += "[" for j in i: output_str += str(j) + ", " output_str = output_str[:-2] + "], " output_str = output_str[:-2] + "]" return output_str
[docs] @staticmethod def vector_to_str(vector): """ Function to convert an numpy vector to an Sphinx input compatible vector. Args: vector (numpy.d2type): the vector to be converted Returns: str: the vector representation in the Sphinx input. """ output_str = "[" for i in vector: output_str += str(i) + ", " output_str = output_str[:-2] + "]" return output_str
[docs] def write_sxdynmat(self, file_name="sxdynmat.sx", cwd=None): forces_lst = [job.output.forces[-1] for job in self.child_lst] structure_lst = [job.get_structure() for job in self.child_lst] phono_dat_str = "format phononDat;\n\n" initial_structure = structure_lst[0] phono_dat_str += "pseudoPot {\n" for species_obj in initial_structure.get_species_objects(): phono_dat_str += ( " species { reciprocalMass=1/" + str(species_obj.AtomicMass) + "; }\n" ) phono_dat_str += "}\n\n" first_structure = True for structure, force_mat in zip(structure_lst, forces_lst): phono_dat_str += "structure {\n" if first_structure: phono_dat_str += ( " cell = " + self.matrix_to_str(structure.cell * 1 / BOHR_TO_ANGSTROM) + ";\n" ) first_structure = False for species_obj in structure.get_species_objects(): if species_obj.Parent: species = species_obj.Parent else: species = species_obj.Abbreviation phono_dat_str += "species {\n" for elm_pos, elm_species, elm_forces in zip( structure.positions, structure.get_chemical_elements(), force_mat ): if elm_species.Parent: element = elm_species.Parent else: element = elm_species.Abbreviation if element == species: phono_dat_str += ( " atom { coords = " + self.vector_to_str(elm_pos * 1 / BOHR_TO_ANGSTROM) + "; force = " + self.vector_to_str( elm_forces * 1 / HARTREE_OVER_BOHR_TO_EV_OVER_ANGSTROM ) + "; }\n" ) phono_dat_str += "}\n" phono_dat_str += "}\n" if cwd is not None: file_name = os.path.join(cwd, file_name) with open(file_name, "w") as f: f.write(phono_dat_str)
[docs] def write_input(self): self.write_sxdynmat(cwd=self.working_directory, file_name="sxdynmat.sx") lines = ["#!/bin/bash\n", "sxdynmat --log -i sxdynmat.sx -H\n"] with open(os.path.join(self.working_directory, "sxuniqdispl.sh"), "w") as f: f.writelines(lines)
[docs] def get_hesse_matrix(self): if "output" in self.project_hdf5.list_groups(): return self.project_hdf5["output/hesse"]
[docs] def collect_output(self): with self.project_hdf5.open("output") as hdf_out: hdf_out["hesse"] = np.loadtxt( os.path.join(self.working_directory, "HesseMatrix_sphinx") )
[docs] def collect_logfiles(self): pass
[docs]class SxPhononsJobGenerator(JobGenerator): @property def parameter_list(self): """ Returns: (list) """ return [ ["supercell_phonon_%d" % ind, sc] for ind, sc in enumerate(self._job._displacement_job.structure_lst) ]
[docs] @staticmethod def job_name(parameter): return parameter[0]
[docs] def modify_job(self, job, parameter): job.structure = parameter[1] return job
[docs]class SxPhonons(AtomisticParallelMaster): def __init__(self, project, job_name): super(SxPhonons, self).__init__(project, job_name) self.__version__ = "0.1" self.__name__ = "SxPhonons" self.input["displacement"] = (0.01, "atoms displacement, Ang") self._displacement_job = None self._dynmat_job = None self._job_generator = SxPhononsJobGenerator(self)
[docs] def run_static(self): if self._displacement_job is None: self._displacement_job = self.project.create_job( self.project.job_type.SxUniqDispl, self.job_name + "_dis" ) self._displacement_job.input["displacement"] = self.input["displacement"] self._displacement_job.structure = self.structure self._displacement_job.run() super(SxPhonons, self).run_static()
[docs] def collect_output(self): """ Returns: """ self._dynmat_job = self.project.create_job( self.project.job_type.SxDynMat, self.job_name + "_dyn" ) self._dynmat_job.child_id_lst = self.child_ids self._dynmat_job.run() with self.project_hdf5.open("output") as hdf_out: hdf_out["hesse"] = self._dynmat_job.get_hesse_matrix()
[docs] def get_hesse_matrix(self): if "output" in self.project_hdf5.list_groups(): return self.project_hdf5["output/hesse"]
[docs]class SxHarmPotTst(AtomisticGenericJob): def __init__(self, project, job_name): super(SxHarmPotTst, self).__init__(project, job_name) self.__version__ = "0.1" self.__name__ = "SxHarmPotTst" self.input = GenericParameters(table_name="interaction") self.input["interaction_radius"] = 4.0 self.input["maximum_noise"] = 0.26 self._positions_lst = [] self._forces_lst = [] self._md_job_id = None self._md_job = None @property def md_job(self): if self._md_job is None and self._md_job_id is not None: self._md_job = self.project.load(self._md_job_id) return self._md_job @md_job.setter def md_job(self, job): if job.status == "finished": self._md_job_id = job.job_id self._md_job = job self._positions_lst = job["output/generic/positions"] self._forces_lst = job["output/generic/forces"] else: raise ValueError("Job not finished!")
[docs] def set_input_to_read_only(self): """ This function enforces read-only mode for the input classes, but it has to be implement in the individual classes. """ super(SxHarmPotTst, self).set_input_to_read_only() self.input.read_only = True
[docs] def write_harmpot(self, cwd, file_name="harmpot.sx"): harm_pot_str = ( "format harmpot;\n\n" + "valenceCharge=0;\n" + "harmonicPotential {\n" + ' //include "refSym.sx";\n' + ' //include "equivalence.sx";\n' + " maxDist=" + str(float(self.input["maximum_noise"]) / BOHR_TO_ANGSTROM) + ";\n" + ' include "shells.sx";\n' + "}\n" + 'include "structure_wrapper.sx";' ) if cwd is not None: file_name = os.path.join(cwd, file_name) with open(file_name, "w") as f: f.write(harm_pot_str)
[docs] def write_structure(self, cwd, file_name="structure_wrapper.sx"): structure_file_name = "structure.sx" iw = InputWriter() iw.structure = self._md_job.structure iw.write_structure(file_name=structure_file_name, cwd=cwd) with open(os.path.join(cwd, file_name), "w") as f: f.writelines(["structure { include <" + structure_file_name + ">; }"])
[docs] def validate_ready_to_run(self): if len(self._positions_lst) == 0 or len(self._forces_lst) == 0: raise ValueError()
[docs] def write_input(self): self.write_structure( cwd=self.working_directory, file_name="structure_wrapper.sx" ) self.write_harmpot(cwd=self.working_directory, file_name="harmpot.sx") pos_force_mat = np.concatenate((self._positions_lst, self._forces_lst), axis=2) cont_pos_force_mat = pos_force_mat.reshape(-1, pos_force_mat.shape[-1]) np.savetxt( os.path.join(self.working_directory, "POSITIONs"), cont_pos_force_mat ) lines = [ "#!/bin/bash\n", "sxstructparam -i structure_wrapper.sx -c " + str(float(self.input["interaction_radius"]) / BOHR_TO_ANGSTROM) + " --printReduced=shells.sx --log\n", "sxharmpottst --param=POSITIONs --vasp --printHesse HesseMatrix_sphinx -i harmpot.sx --log --svd\n", ] with open(os.path.join(self.working_directory, "sxharmpottst.sh"), "w") as f: f.writelines(lines)
[docs] def get_hesse_matrix(self): if "output" in self.project_hdf5.list_groups(): return self.project_hdf5["output/hesse"]
[docs] def collect_output(self): with self.project_hdf5.open("output") as hdf_out: hdf_out["hesse"] = np.loadtxt( os.path.join(self.working_directory, "HesseMatrix_sphinx") )
[docs] def to_hdf(self, hdf=None, group_name=None): """ Store the ExampleJob object in the HDF5 File Args: hdf (ProjectHDFio): HDF5 group object - optional group_name (str): HDF5 subgroup name - optional """ super(SxHarmPotTst, self).to_hdf(hdf=hdf, group_name=group_name) with self.project_hdf5.open("input") as hdf5_input: self.input.to_hdf(hdf5_input) if len(self._positions_lst) != 0: hdf5_input["positions"] = self._positions_lst if len(self._forces_lst) != 0: hdf5_input["forces"] = self._forces_lst if self._md_job_id is not None: hdf5_input["md_job_id"] = self._md_job_id
[docs] def from_hdf(self, hdf=None, group_name=None): """ Restore the ExampleJob object in the HDF5 File Args: hdf (ProjectHDFio): HDF5 group object - optional group_name (str): HDF5 subgroup name - optional """ super(SxHarmPotTst, self).from_hdf(hdf=hdf, group_name=group_name) with self.project_hdf5.open("input") as hdf5_input: self.input.from_hdf(hdf5_input) if "positions" in hdf5_input.list_nodes(): self._positions_lst = hdf5_input["positions"] if "forces" in hdf5_input.list_nodes(): self._forces_lst = hdf5_input["forces"] if "md_job_id" in hdf5_input.list_nodes(): self._md_job_id = hdf5_input["md_job_id"]