# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.
from ase import Atoms
from ase.constraints import dict2constraint
import copy
import importlib
import numpy as np
from pyiron.atomistics.job.interactive import GenericInteractive
from pyiron.atomistics.structure.atoms import pyiron_to_ase, Atoms as PAtoms
try:
from ase.cell import Cell
except ImportError:
Cell = None
__author__ = "Jan Janssen"
__copyright__ = (
"Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
"Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "development"
__date__ = "Sep 1, 2018"
[docs]def ase_structure_todict(structure):
atoms_dict = {
"symbols": structure.get_chemical_symbols(),
"positions": structure.get_positions(),
"pbc": structure.get_pbc(),
"celldisp": structure.get_celldisp(),
"constraint": [c.todict() for c in structure.constraints],
"info": copy.deepcopy(structure.info),
}
if Cell is not None:
atoms_dict["cell"] = structure.get_cell().todict()
else:
atoms_dict["cell"] = structure.get_cell()
if structure.has("tags"):
atoms_dict["tags"] = structure.get_tags()
if structure.has("masses"):
atoms_dict["masses"] = structure.get_masses()
if structure.has("momenta"):
atoms_dict["momenta"] = structure.get_momenta()
if structure.has("initial_magmoms"):
atoms_dict["magmoms"] = structure.get_initial_magnetic_moments()
if structure.has("initial_charges"):
atoms_dict["charges"] = structure.get_initial_charges()
if structure.calc is not None:
calculator_dict = structure.calc.todict()
calculator_dict["calculator_class"] = (
str(structure.calc.__class__).replace("'", " ").split()[1]
)
calculator_dict["label"] = structure.calc.label
atoms_dict["calculator"] = calculator_dict
return atoms_dict
[docs]def ase_calculator_fromdict(class_path, class_dict):
module_loaded = importlib.import_module(".".join(class_path.split(".")[:-1]))
module_class = getattr(module_loaded, class_path.split(".")[-1])
return module_class(**class_dict)
[docs]def ase_structure_fromdict(atoms_dict):
def cell_fromdict(celldict):
celldict.pop("pbc", None)
if Cell is not None:
return Cell(**celldict)
else:
return celldict
atoms_dict_copy = copy.deepcopy(atoms_dict)
if "calculator" in atoms_dict_copy.keys():
calculator_dict = atoms_dict_copy["calculator"]
calculator_class = calculator_dict["calculator_class"]
del calculator_dict["calculator_class"]
atoms_dict_copy["calculator"] = ase_calculator_fromdict(
calculator_class, calculator_dict
)
if "constraint" in atoms_dict_copy.keys():
atoms_dict_copy["constraint"] = [
dict2constraint(const_dict)
for const_dict in atoms_dict_copy["constraint"]
]
atoms_dict_copy["cell"] = cell_fromdict(celldict=atoms_dict_copy["cell"])
atoms = Atoms(**atoms_dict_copy)
if atoms.calc is not None:
atoms.calc.read(atoms.calc.label)
return atoms
[docs]class AseJob(GenericInteractive):
def __init__(self, project, job_name):
super(AseJob, self).__init__(project, job_name)
self.__name__ = "AseJob"
self.__version__ = (
None
) # Reset the version number to the executable is set automatically
@property
def structure(self):
return GenericInteractive.structure.fget(self)
@structure.setter
def structure(self, structure):
if isinstance(structure, PAtoms):
structure = pyiron_to_ase(structure)
GenericInteractive.structure.fset(self, structure)
[docs] def to_hdf(self, hdf=None, group_name=None):
super(AseJob, self).to_hdf(hdf=hdf, group_name=group_name)
with self.project_hdf5.open("input") as hdf_input:
hdf_input["structure"] = ase_structure_todict(self._structure)
[docs] def from_hdf(self, hdf=None, group_name=None):
super(AseJob, self).from_hdf(hdf=hdf, group_name=group_name)
with self.project_hdf5.open("input") as hdf_input:
self.structure = ase_structure_fromdict(hdf_input["structure"])
[docs] def run_static(self):
pre_run_mode = self.server.run_mode
self.server.run_mode.interactive = True
self.run_if_interactive()
self.interactive_close()
self.server.run_mode = pre_run_mode
[docs] def run_if_interactive(self):
if self.structure.calc is None:
self.set_calculator()
super(AseJob, self).run_if_interactive()
self.interactive_collect()
[docs] def set_calculator(self):
raise NotImplementedError(
"The _set_calculator function is not implemented for this code."
)
[docs] def interactive_structure_setter(self, structure):
self.structure.calc.calculate(structure)
[docs] def interactive_positions_setter(self, positions):
self.structure.positions = positions
[docs] def interactive_initialize_interface(self):
self.status.running = True
self._structure.calc.set_label(self.working_directory + "/")
self._interactive_library = True
[docs] def interactive_close(self):
if self.interactive_is_activated():
super(AseJob, self).interactive_close()
with self.project_hdf5.open("output") as h5:
if "interactive" in h5.list_groups():
for key in h5["interactive"].list_nodes():
h5["generic/" + key] = h5["interactive/" + key]
[docs] def interactive_forces_getter(self):
return self.structure.get_forces()
[docs] def interactive_pressures_getter(self):
return -self.structure.get_stress(voigt=False)
[docs] def interactive_energy_pot_getter(self):
return self.structure.get_potential_energy()
[docs] def interactive_energy_tot_getter(self):
return self.structure.get_potential_energy()
[docs] def interactive_indices_getter(self):
element_lst = sorted(list(set(self.structure.get_chemical_symbols())))
return np.array(
[element_lst.index(el) for el in self.structure.get_chemical_symbols()]
)
[docs] def interactive_positions_getter(self):
return self.structure.positions.copy()
[docs] def interactive_steps_getter(self):
return len(self.interactive_cache[list(self.interactive_cache.keys())[0]])
[docs] def interactive_time_getter(self):
return self.interactive_steps_getter()
[docs] def interactive_volume_getter(self):
return self.structure.get_volume()
[docs] def interactive_cells_getter(self):
return self.structure.cell.copy()
[docs] def collect_output(self):
pass
[docs] def run_if_scheduler(self):
self._create_working_directory()
super(AseJob, self).run_if_scheduler()
[docs] def interactive_index_organizer(self):
index_merge_lst = self._interactive_species_lst.tolist() + list(
np.unique(self._structure_current.get_chemical_symbols())
)
el_lst = sorted(set(index_merge_lst), key=index_merge_lst.index)
current_structure_index = [
el_lst.index(el)
for el in self._structure_current.get_chemical_symbols()
]
previous_structure_index = [
el_lst.index(el)
for el in self._structure_previous.get_chemical_symbols()
]
if not np.array_equal(
np.array(current_structure_index),
np.array(previous_structure_index),
):
self._logger.debug("Generic library: indices changed!")
self.interactive_indices_setter(self._structure_current.indices)
[docs] def get_structure(self, iteration_step=-1, wrap_atoms=True):
"""
Gets the structure from a given iteration step of the simulation (MD/ionic relaxation). For static calculations
there is only one ionic iteration step
Args:
iteration_step (int): Step for which the structure is requested
wrap_atoms (bool):
Returns:
atomistics.structure.atoms.Atoms object
"""
if (
self.server.run_mode.interactive
or self.server.run_mode.interactive_non_modal
):
# Warning: We only copy symbols, positions and cell information - no tags.
if self.output.indices is not None and len(self.output.indices) != 0:
indices = self.output.indices[iteration_step]
else:
return None
if len(self._interactive_species_lst) == 0:
el_lst = list(
np.unique(self._structure_current.get_chemical_symbols())
)
else:
el_lst = self._interactive_species_lst.tolist()
if indices is not None:
if wrap_atoms:
positions = self.output.positions[iteration_step]
else:
if len(self.output.unwrapped_positions) > max([iteration_step, 0]):
positions = self.output.unwrapped_positions[iteration_step]
else:
positions = (
self.output.positions[iteration_step]
+ self.output.total_displacements[iteration_step]
)
atoms = Atoms(
symbols=np.array([el_lst[el] for el in indices]),
positions=positions,
cell=self.output.cells[iteration_step],
pbc=self.structure.pbc,
)
# Update indicies to match the indicies in the cache.
atoms.indices = indices
return atoms
else:
return None
else:
if (
self.get("output/generic/cells") is not None
and len(self.get("output/generic/cells")) != 0
):
return super(AseJob, self).get_structure(
iteration_step=iteration_step, wrap_atoms=wrap_atoms
)
else:
return None
[docs]class AseAdapter(object):
def __init__(self, ham, fast_mode=False):
self._ham = ham
self._fast_mode = fast_mode
if self._ham.server.run_mode.interactive and fast_mode:
self.interactive_cache = {
"velocities": [],
"energy_kin": [],
"momenta": [],
"positions": [],
"energy_tot": [],
"energy_pot": []
}
self._ham.run()
self._ham.interactive_cache = {}
elif self._ham.server.run_mode.interactive:
self.interactive_cache = {
"velocities": [],
"energy_kin": [],
"momenta": []
}
self.constraints = []
try:
self.arrays = {
"positions": self._ham.structure.positions.copy(),
"numbers": self._ham.structure.numbers,
}
except AttributeError:
self.arrays = {
"positions": self._ham.structure.positions.copy(),
"numbers": self._ham.structure.get_atomic_numbers(),
}
@property
def communicator(self):
return None
[docs] def get_masses(self):
return np.array(self._ham.structure.get_masses())
[docs] def get_positions(self):
return self.arrays["positions"]
[docs] def set_positions(self, positions):
self.arrays["positions"] = positions
[docs] def get_forces(self, md=True):
if self._fast_mode:
self._ham.interactive_positions_setter(self.arrays["positions"])
self.interactive_cache["positions"].append(self.arrays["positions"])
self._ham.interactive_execute()
self.interactive_cache["energy_pot"].append(self._ham.interactive_energy_pot_getter())
return np.array(self._ham.interactive_forces_getter())
else:
self._ham.structure.positions = self.arrays["positions"]
if self._ham.server.run_mode.interactive:
self._ham.run()
else:
self._ham.run(delete_existing_job=True)
return self._ham.output.forces[-1]
[docs] def interactive_close(self):
self._ham.interactive_store_in_cache(
"velocities", self.interactive_cache["velocities"]
)
self._ham.interactive_store_in_cache(
"energy_kin", self.interactive_cache["energy_kin"]
)
if self._fast_mode:
self._ham.interactive_store_in_cache(
"positions", self.interactive_cache["positions"]
)
self._ham.interactive_store_in_cache(
"energy_pot", self.interactive_cache["energy_pot"][::2]
)
self._ham.interactive_store_in_cache(
"energy_tot",
(
np.array(self.interactive_cache["energy_pot"][::2])
+ np.array(self.interactive_cache["energy_kin"])
).tolist(),
)
else:
self._ham.interactive_store_in_cache(
"energy_tot",
(
np.array(self._ham.output.energy_pot)[::2]
+ np.array(self.interactive_cache["energy_kin"])
).tolist(),
)
self._ham.interactive_close()
[docs] def get_number_of_atoms(self):
return self._ham.structure.get_number_of_atoms()
# ASE functions
[docs] def get_kinetic_energy(self):
"""Get the kinetic energy."""
momenta = self.arrays.get("momenta")
if momenta is None:
return 0.0
return 0.5 * np.vdot(momenta, self.get_velocities())
[docs] def set_momenta(self, momenta, apply_constraint=True):
"""Set momenta."""
if apply_constraint and len(self.constraints) > 0 and momenta is not None:
momenta = np.array(momenta) # modify a copy
for constraint in self.constraints:
if hasattr(constraint, "adjust_momenta"):
constraint.adjust_momenta(self, momenta)
self.set_array("momenta", momenta, float, (3,))
self.interactive_cache["velocities"].append(self.get_velocities())
self.interactive_cache["energy_kin"].append(self.get_kinetic_energy())
[docs] def set_velocities(self, velocities):
"""Set the momenta by specifying the velocities."""
self.set_momenta(self.get_masses()[:, np.newaxis] * velocities)
[docs] def get_momenta(self):
"""Get array of momenta."""
if "momenta" in self.arrays:
return self.arrays["momenta"].copy()
else:
return np.zeros((len(self), 3))
[docs] def set_array(self, name, a, dtype=None, shape=None):
"""Update array.
If *shape* is not *None*, the shape of *a* will be checked.
If *a* is *None*, then the array is deleted."""
b = self.arrays.get(name)
if b is None:
if a is not None:
self.new_array(name, a, dtype, shape)
else:
if a is None:
del self.arrays[name]
else:
a = np.asarray(a)
if a.shape != b.shape:
raise ValueError(
"Array has wrong shape %s != %s." % (a.shape, b.shape)
)
b[:] = a
[docs] def get_angular_momentum(self):
"""Get total angular momentum with respect to the center of mass."""
com = self.get_center_of_mass()
positions = self.get_positions()
positions -= com # translate center of mass to origin
return np.cross(positions, self.get_momenta()).sum(0)
[docs] def new_array(self, name, a, dtype=None, shape=None):
"""Add new array.
If *shape* is not *None*, the shape of *a* will be checked."""
if dtype is not None:
a = np.array(a, dtype, order="C")
if len(a) == 0 and shape is not None:
a.shape = (-1,) + shape
else:
if not a.flags["C_CONTIGUOUS"]:
a = np.ascontiguousarray(a)
else:
a = a.copy()
if name in self.arrays:
raise RuntimeError
for b in self.arrays.values():
if len(a) != len(b):
raise ValueError("Array has wrong length: %d != %d." % (len(a), len(b)))
break
if shape is not None and a.shape[1:] != shape:
raise ValueError(
"Array has wrong shape %s != %s." % (a.shape, (a.shape[0:1] + shape))
)
self.arrays[name] = a
[docs] def has(self, name):
"""Check for existence of array.
name must be one of: 'tags', 'momenta', 'masses', 'initial_magmoms',
'initial_charges'."""
# XXX extend has to calculator properties
return name in self.arrays
[docs] def get_center_of_mass(self, scaled=False):
"""Get the center of mass.
If scaled=True the center of mass in scaled coordinates
is returned."""
m = self.get_masses()
com = np.dot(m, self.arrays["positions"]) / m.sum()
if scaled:
if self._fast_mode:
return np.linalg.solve(self._ham.structure.cells[-1].T, com)
else:
return np.linalg.solve(self._ham.output.cells[-1].T, com)
else:
return com
[docs] def get_velocities(self):
"""Get array of velocities."""
momenta = self.arrays.get("momenta")
if momenta is None:
return None
m = self.get_masses()
# m = self.arrays.get('masses')
# if m is None:
# m = atomic_masses[self.arrays['numbers']]
return momenta / m.reshape(-1, 1)
def __len__(self):
return len(self._ham.structure)