Source code for pyiron.gpaw.pyiron_ase

# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.

from ase import Atoms
from ase.constraints import dict2constraint
import copy
import importlib
import numpy as np
from pyiron.base.job.interactive import InteractiveBase
from pyiron.atomistics.job.interactive import GenericInteractiveOutput

try:
    from ase.cell import Cell
except ImportError:
    Cell = None

__author__ = "Jan Janssen"
__copyright__ = (
    "Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
    "Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "development"
__date__ = "Sep 1, 2018"


[docs]class AseJob(InteractiveBase): def __init__(self, project, job_name): super(AseJob, self).__init__(project, job_name) self.__name__ = "AseJob" self.__version__ = ( None ) # Reset the version number to the executable is set automatically self._structure = None self.interactive_cache = { "cells": [], "energy_pot": [], "forces": [], "positions": [], "steps": [], "indices": [], "time": [], "volume": [], } self.output = GenericInteractiveOutput(job=self) @staticmethod def _cellfromdict(celldict): if Cell is not None: return Cell(**celldict) else: return celldict def _todict(self): atoms_dict = { "symbols": self._structure.get_chemical_symbols(), "positions": self._structure.get_positions(), "pbc": self._structure.get_pbc(), "celldisp": self._structure.get_celldisp(), "constraint": [c.todict() for c in self._structure.constraints], "info": copy.deepcopy(self._structure.info), } if Cell is not None: atoms_dict["cell"] = self._structure.get_cell().todict() else: atoms_dict["cell"] = self._structure.get_cell() if self._structure.has("tags"): atoms_dict["tags"] = self._structure.get_tags() if self._structure.has("masses"): atoms_dict["masses"] = self._structure.get_masses() if self._structure.has("momenta"): atoms_dict["momenta"] = self._structure.get_momenta() if self._structure.has("initial_magmoms"): atoms_dict["magmoms"] = self._structure.get_initial_magnetic_moments() if self._structure.has("initial_charges"): atoms_dict["charges"] = self._structure.get_initial_charges() if self._structure._calc is not None: calculator_dict = self._structure._calc.todict() calculator_dict["calculator_class"] = ( str(self._structure._calc.__class__).replace("'", " ").split()[1] ) calculator_dict["label"] = self._structure._calc.label atoms_dict["calculator"] = calculator_dict return atoms_dict def _fromdict(self, atoms_dict): atoms_dict_copy = copy.deepcopy(atoms_dict) if "calculator" in atoms_dict_copy.keys(): calculator_dict = atoms_dict_copy["calculator"] calculator_class = calculator_dict["calculator_class"] del calculator_dict["calculator_class"] atoms_dict_copy["calculator"] = self._dict2calculator( calculator_class, calculator_dict ) if "constraint" in atoms_dict_copy.keys(): atoms_dict_copy["constraint"] = [ dict2constraint(const_dict) for const_dict in atoms_dict_copy["constraint"] ] atoms_dict_copy["cell"] = self._cellfromdict(celldict=atoms_dict_copy["cell"]) atoms = Atoms(**atoms_dict_copy) if atoms.calc is not None: atoms.calc.read(atoms.calc.label) return atoms def _create_job_structure(self, debug=False): """ Internal helper function to create the input directories, save the job in the database and write the wrapper. Args: debug (bool): Debug Mode """ self._job_id = self.save() print( "The job " + self.job_name + " was saved and received the ID: " + str(self._job_id) ) self.status.created = True self._calculate_predecessor() @staticmethod def _dict2calculator(class_path, class_dict): module_loaded = importlib.import_module(".".join(class_path.split(".")[:-1])) module_class = getattr(module_loaded, class_path.split(".")[-1]) return module_class(**class_dict) @property def structure(self): return self._structure @structure.setter def structure(self, basis): self._structure = basis
[docs] def to_hdf(self, hdf=None, group_name=None): super(AseJob, self).to_hdf(hdf=hdf, group_name=group_name) with self.project_hdf5.open("input") as hdf_input: hdf_input["structure"] = self._todict()
[docs] def from_hdf(self, hdf=None, group_name=None): super(AseJob, self).from_hdf(hdf=hdf, group_name=group_name) with self.project_hdf5.open("input") as hdf_input: self.structure = self._fromdict(hdf_input["structure"])
[docs] def run_static(self): raise ValueError("ASEJob requires interactive mode.")
[docs] def run_if_interactive(self): self.status.running = True self._structure.calc.set_label(self.working_directory + "/") self.structure.calc.calculate(self.structure) self.interactive_collect()
[docs] def interactive_close(self): self.status.collect = True if ( len(list(self.interactive_cache.keys())) > 0 and len(self.interactive_cache[list(self.interactive_cache.keys())[0]]) != 0 ): self.interactive_flush(path="interactive") with self.project_hdf5.open("output") as h5: if "interactive" in h5.list_groups(): for key in h5["interactive"].list_nodes(): h5["generic/" + key] = h5["interactive/" + key] self.status.finished = True
[docs] def interactive_forces_getter(self): return self._structure.get_forces()
[docs] def interactive_energy_pot_getter(self): return self._structure.get_potential_energy()
[docs] def interactive_indices_getter(self): element_lst = sorted(list(set(self._structure.get_chemical_symbols()))) return np.array( [element_lst.index(el) for el in self._structure.get_chemical_symbols()] )
[docs] def interactive_positions_getter(self): return self._structure.positions.copy()
[docs] def interactive_steps_getter(self): return len(self.interactive_cache[list(self.interactive_cache.keys())[0]])
[docs] def interactive_time_getter(self): return self.interactive_steps_getter()
[docs] def interactive_volume_getter(self): return self._structure.get_volume()
[docs] def interactive_cells_getter(self): return self._structure.cell.copy()
[docs] def interactive_collect(self): if "cells" in self.interactive_cache.keys(): self.interactive_cache["cells"].append(self.interactive_cells_getter()) if "energy_pot" in self.interactive_cache.keys(): self.interactive_cache["energy_pot"].append( self.interactive_energy_pot_getter() ) if "energy_tot" in self.interactive_cache.keys(): self.interactive_cache["energy_tot"].append( self.interactive_energy_tot_getter() ) if "forces" in self.interactive_cache.keys(): self.interactive_cache["forces"].append(self.interactive_forces_getter()) if "positions" in self.interactive_cache.keys(): self.interactive_cache["positions"].append( self.interactive_positions_getter() ) if "pressures" in self.interactive_cache.keys(): self.interactive_cache["pressures"].append( self.interactive_pressures_getter() ) if "steps" in self.interactive_cache.keys(): self.interactive_cache["steps"].append(self.interactive_steps_getter()) if "temperatures" in self.interactive_cache.keys(): self.interactive_cache["temperatures"].append( self.interactive_temperatures_getter() ) if "time" in self.interactive_cache.keys(): self.interactive_cache["time"].append(self.interactive_time_getter()) if "indices" in self.interactive_cache.keys(): self.interactive_cache["indices"].append(self.interactive_indices_getter()) if "spin_constraints" in self.interactive_cache.keys(): self.interactive_cache["spin_constraints"].append( self.interactive_spin_constraints_getter() ) if "magnetic_forces" in self.interactive_cache.keys(): if np.any(self._structure_current.get_initial_magnetic_moments()): self.interactive_cache["magnetic_forces"].append( self.interactive_magnetic_forces_getter() ) else: del self.interactive_cache["magnetic_forces"] if "unwrapped_positions" in self.interactive_cache.keys(): self.interactive_cache["unwrapped_positions"].append( self.interactive_unwrapped_positions_getter() ) if "volume" in self.interactive_cache.keys(): self.interactive_cache["volume"].append(self.interactive_volume_getter()) if ( len(list(self.interactive_cache.keys())) > 0 and len(self.interactive_cache[list(self.interactive_cache.keys())[0]]) % self._interactive_flush_frequency == 0 ): self.interactive_flush(path="interactive")
[docs]class AseAdapter(object): def __init__(self, ham, fast_mode=False): self._ham = ham self._fast_mode = fast_mode if self._ham.server.run_mode.interactive and fast_mode: self.interactive_cache = { "velocities": [], "energy_kin": [], "momenta": [], "positions": [], "energy_pot": [] } self._ham.run() self._ham.interactive_cache = {} elif self._ham.server.run_mode.interactive: self.interactive_cache = { "velocities": [], "energy_kin": [], "momenta": [] } self.constraints = [] try: self.arrays = { "positions": self._ham.structure.positions.copy(), "numbers": self._ham.structure.numbers, } except AttributeError: self.arrays = { "positions": self._ham.structure.positions.copy(), "numbers": self._ham.structure.get_atomic_numbers(), } @property def communicator(self): return None
[docs] def get_masses(self): return np.array(self._ham.structure.get_masses())
[docs] def get_positions(self): return self.arrays["positions"]
[docs] def set_positions(self, positions): self.arrays["positions"] = positions
[docs] def get_forces(self, md=True): if self._fast_mode: self._ham.interactive_positions_setter(self.arrays["positions"]) self.interactive_cache["positions"].append(self.arrays["positions"]) self._ham.interactive_execute() self.interactive_cache["energy_pot"].append(self._ham.interactive_energy_pot_getter()) return np.array(self._ham.interactive_forces_getter()) else: self._ham.structure.positions = self.arrays["positions"] if self._ham.server.run_mode.interactive: self._ham.run() else: self._ham.run(run_again=True) return self._ham.output.forces[-1]
[docs] def interactive_close(self): self._ham.interactive_store_in_cache( "velocities", self.interactive_cache["velocities"] ) self._ham.interactive_store_in_cache( "energy_kin", self.interactive_cache["energy_kin"] ) if self._fast_mode: self._ham.interactive_store_in_cache( "positions", self.interactive_cache["positions"] ) self._ham.interactive_store_in_cache( "energy_pot", self.interactive_cache["energy_pot"][::2] ) self._ham.interactive_store_in_cache( "energy_tot", ( np.array(self.interactive_cache["energy_pot"][::2]) + np.array(self.interactive_cache["energy_kin"]) ).tolist(), ) else: self._ham.interactive_store_in_cache( "energy_tot", ( np.array(self._ham.output.energy_pot)[::2] + np.array(self.interactive_cache["energy_kin"]) ).tolist(), ) self._ham.interactive_close()
[docs] def get_number_of_atoms(self): return self._ham.structure.get_number_of_atoms()
# ASE functions
[docs] def get_kinetic_energy(self): """Get the kinetic energy.""" momenta = self.arrays.get("momenta") if momenta is None: return 0.0 return 0.5 * np.vdot(momenta, self.get_velocities())
[docs] def set_momenta(self, momenta, apply_constraint=True): """Set momenta.""" if apply_constraint and len(self.constraints) > 0 and momenta is not None: momenta = np.array(momenta) # modify a copy for constraint in self.constraints: if hasattr(constraint, "adjust_momenta"): constraint.adjust_momenta(self, momenta) self.set_array("momenta", momenta, float, (3,)) self.interactive_cache["velocities"].append(self.get_velocities()) self.interactive_cache["energy_kin"].append(self.get_kinetic_energy())
[docs] def set_velocities(self, velocities): """Set the momenta by specifying the velocities.""" self.set_momenta(self.get_masses()[:, np.newaxis] * velocities)
[docs] def get_momenta(self): """Get array of momenta.""" if "momenta" in self.arrays: return self.arrays["momenta"].copy() else: return np.zeros((len(self), 3))
[docs] def set_array(self, name, a, dtype=None, shape=None): """Update array. If *shape* is not *None*, the shape of *a* will be checked. If *a* is *None*, then the array is deleted.""" b = self.arrays.get(name) if b is None: if a is not None: self.new_array(name, a, dtype, shape) else: if a is None: del self.arrays[name] else: a = np.asarray(a) if a.shape != b.shape: raise ValueError( "Array has wrong shape %s != %s." % (a.shape, b.shape) ) b[:] = a
[docs] def get_angular_momentum(self): """Get total angular momentum with respect to the center of mass.""" com = self.get_center_of_mass() positions = self.get_positions() positions -= com # translate center of mass to origin return np.cross(positions, self.get_momenta()).sum(0)
[docs] def new_array(self, name, a, dtype=None, shape=None): """Add new array. If *shape* is not *None*, the shape of *a* will be checked.""" if dtype is not None: a = np.array(a, dtype, order="C") if len(a) == 0 and shape is not None: a.shape = (-1,) + shape else: if not a.flags["C_CONTIGUOUS"]: a = np.ascontiguousarray(a) else: a = a.copy() if name in self.arrays: raise RuntimeError for b in self.arrays.values(): if len(a) != len(b): raise ValueError("Array has wrong length: %d != %d." % (len(a), len(b))) break if shape is not None and a.shape[1:] != shape: raise ValueError( "Array has wrong shape %s != %s." % (a.shape, (a.shape[0:1] + shape)) ) self.arrays[name] = a
[docs] def has(self, name): """Check for existence of array. name must be one of: 'tags', 'momenta', 'masses', 'initial_magmoms', 'initial_charges'.""" # XXX extend has to calculator properties return name in self.arrays
[docs] def get_center_of_mass(self, scaled=False): """Get the center of mass. If scaled=True the center of mass in scaled coordinates is returned.""" m = self.get_masses() com = np.dot(m, self.arrays["positions"]) / m.sum() if scaled: if self._fast_mode: return np.linalg.solve(self._ham.structure.cells[-1].T, com) else: return np.linalg.solve(self._ham.output.cells[-1].T, com) else: return com
[docs] def get_velocities(self): """Get array of velocities.""" momenta = self.arrays.get("momenta") if momenta is None: return None m = self.get_masses() # m = self.arrays.get('masses') # if m is None: # m = atomic_masses[self.arrays['numbers']] return momenta / m.reshape(-1, 1)
def __len__(self): return len(self._ham.structure)