# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.
from __future__ import print_function
import os
import numpy as np
import scipy.constants
from pyiron.atomistics.structure.atoms import Atoms
from pyiron.sphinx.base import InputWriter, Output
from pyiron.base.job.generic import GenericJob
from pyiron.atomistics.job.atomistic import AtomisticGenericJob
from pyiron.base.generic.parameters import GenericParameters
from pyiron.atomistics.master.parallel import AtomisticParallelMaster
from pyiron.base.master.parallel import JobGenerator
__author__ = "Jan Janssen"
__copyright__ = (
"Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
"Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "development"
__date__ = "Sep 1, 2018"
BOHR_TO_ANGSTROM = (
scipy.constants.physical_constants["Bohr radius"][0] / scipy.constants.angstrom
)
HARTREE_TO_EV = scipy.constants.physical_constants["Hartree energy in eV"][0]
HARTREE_OVER_BOHR_TO_EV_OVER_ANGSTROM = HARTREE_TO_EV / BOHR_TO_ANGSTROM
[docs]class SxUniqDispl(AtomisticGenericJob):
def __init__(self, project, job_name):
super(SxUniqDispl, self).__init__(project, job_name)
self.__version__ = "0.1"
self.__name__ = "SxUniqDispl"
self.input = GenericParameters(table_name="displacement")
self.input["displacement"] = 0.01
self.structure_lst = []
self._id_pyi_to_spx = []
self._id_spx_to_pyi = []
@property
def id_spx_to_pyi(self):
if self.structure is None:
return None
if len(self._id_spx_to_pyi) == 0:
self._initialize_order()
return self._id_spx_to_pyi
@property
def id_pyi_to_spx(self):
if self.structure is None:
return None
if len(self._id_pyi_to_spx) == 0:
self._initialize_order()
return self._id_pyi_to_spx
def _initialize_order(self):
for elm_species in self.structure.get_species_objects():
self._id_pyi_to_spx.append(
np.arange(len(self.structure))[
self.structure.get_chemical_symbols() == elm_species.Abbreviation
]
)
self._id_pyi_to_spx = np.array(
[ooo for oo in self._id_pyi_to_spx for ooo in oo]
)
self._id_spx_to_pyi = np.array([0] * len(self._id_pyi_to_spx))
for i, p in enumerate(self._id_pyi_to_spx):
self._id_spx_to_pyi[p] = i
[docs] def list_structures(self):
if self.status.finished:
return self.structure_lst
else:
return []
[docs] def write_structure(self, cwd, file_name="structure_wrapper.sx"):
structure_file_name = "structure.sx"
iw = InputWriter()
iw.structure = self.structure
iw.write_structure(file_name=structure_file_name, cwd=cwd)
with open(os.path.join(cwd, file_name), "w") as f:
f.writelines(["structure { include <" + structure_file_name + ">; }"])
[docs] def collect_output(self):
self.structure_lst = self.extract_structure(
working_directory=self.working_directory
)
with self.project_hdf5.open("output") as hdf_out:
for ind, struct in enumerate(self.structure_lst):
struct.to_hdf(hdf=hdf_out, group_name="structure_" + str(ind))
[docs] def to_hdf(self, hdf=None, group_name=None):
"""
Store the ExampleJob object in the HDF5 File
Args:
hdf (ProjectHDFio): HDF5 group object - optional
group_name (str): HDF5 subgroup name - optional
"""
super(SxUniqDispl, self).to_hdf(hdf=hdf, group_name=group_name)
with self.project_hdf5.open("input") as hdf5_input:
self.input.to_hdf(hdf5_input)
[docs] def from_hdf(self, hdf=None, group_name=None):
"""
Restore the ExampleJob object in the HDF5 File
Args:
hdf (ProjectHDFio): HDF5 group object - optional
group_name (str): HDF5 subgroup name - optional
"""
super(SxUniqDispl, self).from_hdf(hdf=hdf, group_name=group_name)
with self.project_hdf5.open("input") as hdf5_input:
self.input.from_hdf(hdf5_input)
if "output" in self.project_hdf5.list_groups():
with self.project_hdf5.open("output") as hdf5_output:
self.structure_lst = [
Atoms().from_hdf(hdf5_output, group_name)
for group_name in hdf5_output.list_groups()
]
[docs]class SxDynMat(GenericJob):
def __init__(self, project, job_name):
super(SxDynMat, self).__init__(project, job_name)
self.__version__ = "0.1"
self.__name__ = "SxDynMat"
self._child_lst = []
self._child_id_lst = []
@property
def child_id_lst(self):
return self._child_id_lst
@child_id_lst.setter
def child_id_lst(self, child_id_lst):
self._child_id_lst = child_id_lst
@property
def child_lst(self):
if len(self._child_lst) != len(self._child_id_lst):
self._child_lst = [
self.project.load(job_id) for job_id in self._child_id_lst
]
forces_of_first_child = self._child_lst[0].output.forces[-1]
self._forces_lst = [
job.output.forces[-1] - forces_of_first_child for job in self._child_lst
]
self._structure_lst = [job.get_structure() for job in self._child_lst]
return self._child_lst
[docs] @staticmethod
def matrix_to_str(matrix):
"""
Function to convert an numpy matrix to an Sphinx input compatible matrix.
Args:
matrix (numpy.d2type): the matrix to be converted
Returns:
str: the matrix representation in the Sphinx input.
"""
output_str = "["
for i in matrix:
output_str += "["
for j in i:
output_str += str(j) + ", "
output_str = output_str[:-2] + "], "
output_str = output_str[:-2] + "]"
return output_str
[docs] @staticmethod
def vector_to_str(vector):
"""
Function to convert an numpy vector to an Sphinx input compatible vector.
Args:
vector (numpy.d2type): the vector to be converted
Returns:
str: the vector representation in the Sphinx input.
"""
output_str = "["
for i in vector:
output_str += str(i) + ", "
output_str = output_str[:-2] + "]"
return output_str
[docs] def write_sxdynmat(self, file_name="sxdynmat.sx", cwd=None):
forces_lst = [job.output.forces[-1] for job in self.child_lst]
structure_lst = [job.get_structure() for job in self.child_lst]
phono_dat_str = "format phononDat;\n\n"
initial_structure = structure_lst[0]
phono_dat_str += "pseudoPot {\n"
for species_obj in initial_structure.get_species_objects():
phono_dat_str += (
" species { reciprocalMass=1/" + str(species_obj.AtomicMass) + "; }\n"
)
phono_dat_str += "}\n\n"
first_structure = True
for structure, force_mat in zip(structure_lst, forces_lst):
phono_dat_str += "structure {\n"
if first_structure:
phono_dat_str += (
" cell = "
+ self.matrix_to_str(structure.cell * 1 / BOHR_TO_ANGSTROM)
+ ";\n"
)
first_structure = False
for species_obj in structure.get_species_objects():
if species_obj.Parent:
species = species_obj.Parent
else:
species = species_obj.Abbreviation
phono_dat_str += "species {\n"
for elm_pos, elm_species, elm_forces in zip(
structure.positions, structure.get_chemical_elements(), force_mat
):
if elm_species.Parent:
element = elm_species.Parent
else:
element = elm_species.Abbreviation
if element == species:
phono_dat_str += (
" atom { coords = "
+ self.vector_to_str(elm_pos * 1 / BOHR_TO_ANGSTROM)
+ "; force = "
+ self.vector_to_str(
elm_forces * 1 / HARTREE_OVER_BOHR_TO_EV_OVER_ANGSTROM
)
+ "; }\n"
)
phono_dat_str += "}\n"
phono_dat_str += "}\n"
if cwd is not None:
file_name = os.path.join(cwd, file_name)
with open(file_name, "w") as f:
f.write(phono_dat_str)
[docs] def get_hesse_matrix(self):
if "output" in self.project_hdf5.list_groups():
return self.project_hdf5["output/hesse"]
[docs] def collect_output(self):
with self.project_hdf5.open("output") as hdf_out:
hdf_out["hesse"] = np.loadtxt(
os.path.join(self.working_directory, "HesseMatrix_sphinx")
)
[docs] def collect_logfiles(self):
pass
[docs]class SxPhononsJobGenerator(JobGenerator):
@property
def parameter_list(self):
"""
Returns:
(list)
"""
return [
["supercell_phonon_%d" % ind, sc]
for ind, sc in enumerate(self._job._displacement_job.structure_lst)
]
[docs] @staticmethod
def job_name(parameter):
return parameter[0]
[docs] def modify_job(self, job, parameter):
job.structure = parameter[1]
return job
[docs]class SxPhonons(AtomisticParallelMaster):
def __init__(self, project, job_name):
super(SxPhonons, self).__init__(project, job_name)
self.__version__ = "0.1"
self.__name__ = "SxPhonons"
self.input["displacement"] = (0.01, "atoms displacement, Ang")
self._displacement_job = None
self._dynmat_job = None
self._job_generator = SxPhononsJobGenerator(self)
[docs] def run_static(self):
if self._displacement_job is None:
self._displacement_job = self.project.create_job(
self.project.job_type.SxUniqDispl, self.job_name + "_dis"
)
self._displacement_job.input["displacement"] = self.input["displacement"]
self._displacement_job.structure = self.structure
self._displacement_job.run()
super(SxPhonons, self).run_static()
[docs] def collect_output(self):
"""
Returns:
"""
self._dynmat_job = self.project.create_job(
self.project.job_type.SxDynMat, self.job_name + "_dyn"
)
self._dynmat_job.child_id_lst = self.child_ids
self._dynmat_job.run()
with self.project_hdf5.open("output") as hdf_out:
hdf_out["hesse"] = self._dynmat_job.get_hesse_matrix()
[docs] def get_hesse_matrix(self):
if "output" in self.project_hdf5.list_groups():
return self.project_hdf5["output/hesse"]
[docs]class SxHarmPotTst(AtomisticGenericJob):
def __init__(self, project, job_name):
super(SxHarmPotTst, self).__init__(project, job_name)
self.__version__ = "0.1"
self.__name__ = "SxHarmPotTst"
self.input = GenericParameters(table_name="interaction")
self.input["interaction_radius"] = 4.0
self.input["maximum_noise"] = 0.26
self._positions_lst = []
self._forces_lst = []
self._md_job_id = None
self._md_job = None
@property
def md_job(self):
if self._md_job is None and self._md_job_id is not None:
self._md_job = self.project.load(self._md_job_id)
return self._md_job
@md_job.setter
def md_job(self, job):
if job.status == "finished":
self._md_job_id = job.job_id
self._md_job = job
self._positions_lst = job["output/generic/positions"]
self._forces_lst = job["output/generic/forces"]
else:
raise ValueError("Job not finished!")
[docs] def write_harmpot(self, cwd, file_name="harmpot.sx"):
harm_pot_str = (
"format harmpot;\n\n"
+ "valenceCharge=0;\n"
+ "harmonicPotential {\n"
+ ' //include "refSym.sx";\n'
+ ' //include "equivalence.sx";\n'
+ " maxDist="
+ str(float(self.input["maximum_noise"]) / BOHR_TO_ANGSTROM)
+ ";\n"
+ ' include "shells.sx";\n'
+ "}\n"
+ 'include "structure_wrapper.sx";'
)
if cwd is not None:
file_name = os.path.join(cwd, file_name)
with open(file_name, "w") as f:
f.write(harm_pot_str)
[docs] def write_structure(self, cwd, file_name="structure_wrapper.sx"):
structure_file_name = "structure.sx"
iw = InputWriter()
iw.structure = self._md_job.structure
iw.write_structure(file_name=structure_file_name, cwd=cwd)
with open(os.path.join(cwd, file_name), "w") as f:
f.writelines(["structure { include <" + structure_file_name + ">; }"])
[docs] def validate_ready_to_run(self):
if len(self._positions_lst) == 0 or len(self._forces_lst) == 0:
raise ValueError()
[docs] def get_hesse_matrix(self):
if "output" in self.project_hdf5.list_groups():
return self.project_hdf5["output/hesse"]
[docs] def collect_output(self):
with self.project_hdf5.open("output") as hdf_out:
hdf_out["hesse"] = np.loadtxt(
os.path.join(self.working_directory, "HesseMatrix_sphinx")
)
[docs] def to_hdf(self, hdf=None, group_name=None):
"""
Store the ExampleJob object in the HDF5 File
Args:
hdf (ProjectHDFio): HDF5 group object - optional
group_name (str): HDF5 subgroup name - optional
"""
super(SxHarmPotTst, self).to_hdf(hdf=hdf, group_name=group_name)
with self.project_hdf5.open("input") as hdf5_input:
self.input.to_hdf(hdf5_input)
if len(self._positions_lst) != 0:
hdf5_input["positions"] = self._positions_lst
if len(self._forces_lst) != 0:
hdf5_input["forces"] = self._forces_lst
if self._md_job_id is not None:
hdf5_input["md_job_id"] = self._md_job_id
[docs] def from_hdf(self, hdf=None, group_name=None):
"""
Restore the ExampleJob object in the HDF5 File
Args:
hdf (ProjectHDFio): HDF5 group object - optional
group_name (str): HDF5 subgroup name - optional
"""
super(SxHarmPotTst, self).from_hdf(hdf=hdf, group_name=group_name)
with self.project_hdf5.open("input") as hdf5_input:
self.input.from_hdf(hdf5_input)
if "positions" in hdf5_input.list_nodes():
self._positions_lst = hdf5_input["positions"]
if "forces" in hdf5_input.list_nodes():
self._forces_lst = hdf5_input["forces"]
if "md_job_id" in hdf5_input.list_nodes():
self._md_job_id = hdf5_input["md_job_id"]