# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.
from ase.io import write as ase_write
import copy
import numpy as np
import warnings
from pyiron.atomistics.structure.atoms import Atoms
from pyiron.base.generic.parameters import GenericParameters
from pyiron.base.job.generic import GenericJob as GenericJobCore
from pyiron.base.master.generic import GenericMaster
try:
from pyiron.base.project import ProjectGUI
except (ImportError, TypeError, AttributeError):
pass
__author__ = "Jan Janssen"
__copyright__ = (
"Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
"Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "production"
__date__ = "Sep 1, 2017"
[docs]class AtomisticGenericJob(GenericJobCore):
"""
Atomistic Generic Job class extends the Generic Job class with all the functionality to run jobs containing
atomistic structures. From this class all specific atomistic Hamiltonians are derived. Therefore it should contain
the properties/routines common to all atomistic jobs. The functions in this module should be as generic as possible.
Args:
project (ProjectHDFio): ProjectHDFio instance which points to the HDF5 file the job is stored in
job_name (str): name of the job, which has to be unique within the project
Attributes:
.. attribute:: job_name
name of the job, which has to be unique within the project
.. attribute:: status
execution status of the job, can be one of the following [initialized, appended, created, submitted, running,
aborted, collect, suspended, refresh, busy, finished]
.. attribute:: job_id
unique id to identify the job in the pyiron database
.. attribute:: parent_id
job id of the predecessor job - the job which was executed before the current one in the current job series
.. attribute:: master_id
job id of the master job - a meta job which groups a series of jobs, which are executed either in parallel or in
serial.
.. attribute:: child_ids
list of child job ids - only meta jobs have child jobs - jobs which list the meta job as their master
.. attribute:: project
Project instance the jobs is located in
.. attribute:: project_hdf5
ProjectHDFio instance which points to the HDF5 file the job is stored in
.. attribute:: job_info_str
short string to describe the job by it is job_name and job ID - mainly used for logging
.. attribute:: working_directory
working directory of the job is executed in - outside the HDF5 file
.. attribute:: path
path to the job as a combination of absolute file system path and path within the HDF5 file.
.. attribute:: version
Version of the hamiltonian, which is also the version of the executable unless a custom executable is used.
.. attribute:: executable
Executable used to run the job - usually the path to an external executable.
.. attribute:: library_activated
For job types which offer a Python library pyiron can use the python library instead of an external executable.
.. attribute:: server
Server object to handle the execution environment for the job.
.. attribute:: queue_id
the ID returned from the queuing system - it is most likely not the same as the job ID.
.. attribute:: logger
logger object to monitor the external execution and internal pyiron warnings.
.. attribute:: restart_file_list
list of files which are used to restart the calculation from these files.
.. attribute:: job_type
Job type object with all the available job types: ['ExampleJob', 'SerialMaster', 'ParallelMaster', 'ScriptJob',
'ListMaster']
"""
def __init__(self, project, job_name):
super(AtomisticGenericJob, self).__init__(project, job_name)
self.__name__ = "AtomisticGenericJob"
self.__version__ = "0.1"
self._structure = None
self._generic_input = GenericInput()
self.output = GenericOutput(job=self)
self.map_functions = MapFunctions()
@property
def structure(self):
"""
Returns:
"""
return self._structure
@structure.setter
def structure(self, basis):
"""
Args:
basis:
Returns:
"""
self._generic_input["structure"] = "atoms"
self._structure = basis
[docs] def copy_to(
self, project=None, new_job_name=None, input_only=False, new_database_entry=True
):
"""
Args:
destination:
new_job_name:
input_only:
new_database_entry:
Returns:
"""
new_generic_job = super(AtomisticGenericJob, self).copy_to(
project=project,
new_job_name=new_job_name,
input_only=input_only,
new_database_entry=new_database_entry,
)
if not new_generic_job._structure:
new_generic_job._structure = copy.copy(self._structure)
return new_generic_job
[docs] def calc_minimize(
self, e_tol=0, f_tol=1e-4, max_iter=1000, pressure=None, n_print=1
):
"""
Args:
e_tol:
f_tol:
max_iter:
pressure:
n_print:
Returns:
"""
self._generic_input["calc_mode"] = "minimize"
self._generic_input["max_iter"] = max_iter
self._generic_input["pressure"] = pressure
self._generic_input.remove_keys(
["temperature", "n_ionic_steps", "n_print", "velocity"]
)
[docs] def calc_static(self):
"""
Returns:
"""
self._generic_input["calc_mode"] = "static"
self._generic_input.remove_keys(
[
"max_iter",
"pressure",
"temperature",
"n_ionic_steps",
"n_print",
"velocity",
]
)
[docs] def calc_md(
self,
temperature=None,
pressure=None,
n_ionic_steps=1000,
time_step=None,
n_print=100,
temperature_damping_timescale=100.0,
pressure_damping_timescale=None,
seed=None,
tloop=None,
initial_temperature=True,
langevin=False,
):
self._generic_input["calc_mode"] = "md"
self._generic_input["temperature"] = temperature
self._generic_input["n_ionic_steps"] = n_ionic_steps
self._generic_input["n_print"] = n_print
self._generic_input.remove_keys(["max_iter", "pressure"])
[docs] def from_hdf(self, hdf=None, group_name=None):
"""
Recreates instance from the hdf5 file
Args:
hdf (str): Path to the hdf5 file
group_name (str): Name of the group which contains the object
"""
super(AtomisticGenericJob, self).from_hdf(hdf=hdf, group_name=group_name)
with self._hdf5.open("input") as hdf5_input:
try:
self._generic_input.from_hdf(hdf5_input)
except ValueError:
pass
[docs] def to_hdf(self, hdf=None, group_name=None):
"""
Store the GenericJob in an HDF5 file
Args:
hdf (ProjectHDFio): HDF5 group object - optional
group_name (str): HDF5 subgroup name - optional
"""
super(AtomisticGenericJob, self).to_hdf(hdf=hdf, group_name=group_name)
with self._hdf5.open("input") as hdf5_input:
self._generic_input.to_hdf(hdf5_input)
[docs] def store_structure(self):
"""
Returns:
"""
if self.structure is not None:
structure_container = self.create_job(
job_type=self.project.job_type.StructureContainer,
job_name=self.job_name + "_structure",
)
structure_container.structure = self.structure
self.parent_id = structure_container.job_id
else:
ValueError("There is no structure attached to the current Job.")
[docs] def animate_structure(
self,
spacefill=True,
show_cell=True,
stride=1,
center_of_mass=False,
particle_size=0.5,
):
"""
Animates the job if a trajectory is present
Args:
spacefill (bool):
show_cell (bool):
stride (int): show animation every stride [::stride]
use value >1 to make animation faster
default=1
center_of_mass (bool):
Returns:
animation: nglview IPython widget
"""
try:
import nglview
except ImportError:
raise ImportError(
"The animate() function requires the package nglview to be installed"
)
animation = nglview.show_asetraj(
self.trajectory(stride=stride, center_of_mass=center_of_mass)
)
if spacefill:
animation.add_spacefill(radius_type="vdw", scale=0.5, radius=particle_size)
animation.remove_ball_and_stick()
else:
animation.add_ball_and_stick()
if show_cell:
if self.structure.cell is not None:
animation.add_unitcell()
return animation
[docs] def view_structure(self, snapshot=-1, spacefill=True, show_cell=True):
"""
Args:
snapshot (int): Snapshot of the trajectory one wants
spacefill (bool):
show_cell (bool):
Returns:
view: nglview IPython widget
"""
import nglview
atoms = self.get_structure(snapshot)
picture = nglview.show_ase(atoms)
if spacefill:
picture.add_spacefill(radius_type="vdw", scale=0.5)
picture.remove_ball_and_stick()
else:
picture.add_ball_and_stick()
if show_cell:
if atoms.cell is not None:
picture.add_unitcell()
return picture
[docs] def validate_ready_to_run(self):
"""
Returns:
"""
if not self.structure and self._generic_input["structure"] == "atoms":
raise ValueError(
"This job does not contain a valid structure: {}".format(self.job_name)
)
[docs] def db_entry(self):
"""
Generate the initial database entry
Returns:
(dict): db_dict
"""
db_dict = super(AtomisticGenericJob, self).db_entry()
if self.structure:
if isinstance(self.structure, Atoms):
parent_structure = self.structure.get_parent_basis()
else:
parent_structure = self.structure.copy()
db_dict["ChemicalFormula"] = parent_structure.get_chemical_formula()
return db_dict
[docs] def restart(self, job_name=None, job_type=None):
"""
Restart a new job created from an existing calculation.
Args:
project (pyiron.project.Project instance): Project instance at which the new job should be created
job_name (str): Job name
job_type (str): Job type
Returns:
new_ham: New job
"""
new_ham = super(AtomisticGenericJob, self).restart(
job_name=job_name, job_type=job_type
)
if isinstance(new_ham, GenericMaster) and not isinstance(self, GenericMaster):
new_child = self.restart(job_name=None, job_type=None)
new_ham.append(new_child)
new_ham.structure = self.get_structure(iteration_step=-1)
if new_ham.structure is None:
new_ham.structure = self.structure.copy()
new_ham._generic_input['structure'] = 'atoms'
return new_ham
# Required functions
[docs] def continue_with_restart_files(self, job_type=None, job_name=None):
"""
Args:
job_type:
job_name:
Returns:
"""
if job_name is None:
job_name = "{}_continue".format(self.job_name)
new_ham = self.restart(job_type=job_type, job_name=job_name)
if self.status.initialized:
self._job_id = self.save()
new_ham.parent_id = self.job_id
new_ham._generic_input["structure"] = "continue_final"
return new_ham
[docs] def continue_with_final_structure(self, job_type=None, job_name=None):
"""
Args:
job_type:
job_name:
Returns:
"""
if job_name is None:
job_name = "{}_continue".format(self.job_name)
if job_type is None:
job_type = self.__name__
new_ham = self.create_job(job_type, job_name)
if self.status.initialized:
self._job_id = self.save()
new_ham.parent_id = self.job_id
if self.status.finished:
new_ham.structure = self.get_structure(iteration_step=-1)
new_ham._generic_input["structure"] = "atoms"
else:
new_ham._generic_input["structure"] = "continue_final"
return new_ham
[docs] def trajectory(
self, stride=1, center_of_mass=False, atom_indices=None,
snapshot_indices=None, overwrite_positions=None, overwrite_cells=None
):
"""
Args:
stride (int): The trajectories are generated with every 'stride' steps
center_of_mass (list/numpy.ndarray): The center of mass
atom_indices (list/numpy.ndarray): The atom indices for which the trajectory should be generated
snapshot_indices (list/numpy.ndarray): The snapshots for which the trajectory should be generated
overwrite_positions (list/numpy.ndarray): List of positions that are meant to overwrite the existing
trajectory. Useful to wrap coordinates for example
overwrite_cells(list/numpy.ndarray): List of cells that are meant to overwrite the existing
trajectory. Only used when `overwrite_positions` is defined. This must
have the same length of `overwrite_positions`
Returns:
pyiron.atomistics.job.atomistic.Trajectory: Trajectory instance
"""
cells = self.output.cells
if len(self.output.indices) != 0:
indices = self.output.indices
else:
indices = [self.structure.indices] * len(cells) # Use the same indices throughout
if overwrite_positions is not None:
positions = np.array(overwrite_positions).copy()
if overwrite_cells is not None:
if overwrite_cells.shape == (len(positions), 3, 3):
cells = np.array(overwrite_cells).copy()
else:
raise ValueError("overwrite_cells must be compatible with the positions!")
else:
positions = self.output.positions.copy()
conditions = list()
if isinstance(cells, (list, np.ndarray)):
if len(cells) == 0:
conditions.append(True)
else:
conditions.append(cells[0] is None)
conditions.append(cells is None)
if any(conditions):
max_pos = np.max(np.max(positions, axis=0), axis=0)
max_pos[np.abs(max_pos) < 1e-2] = 10
cell = np.eye(3) * max_pos
cells = np.array([cell] * len(positions))
if len(positions) != len(cells):
raise ValueError("The positions must have the same length as the cells!")
if snapshot_indices is not None:
positions = positions[snapshot_indices]
cells = cells[snapshot_indices]
indices = indices[snapshot_indices]
if atom_indices is None:
return Trajectory(
positions[::stride],
self.structure.get_parent_basis(),
center_of_mass=center_of_mass,
cells=cells[::stride],
indices=indices[::stride]
)
else:
sub_struct = self.structure.get_parent_basis()[atom_indices]
if len(sub_struct.species) < len(self.structure.species):
# Then `sub_struct` has had its indices remapped so they run from 0 to the number of species - 1
# But the `indices` array is unaware of this and needs to be remapped to this new space
original_symbols = np.array([el.Abbreviation for el in self.structure.species])
sub_symbols = np.array([el.Abbreviation for el in sub_struct.species])
map_ = np.array([np.argwhere(original_symbols == symbol)[0, 0] for symbol in sub_symbols], dtype=int)
remapped_indices = np.array(indices)
for i_sub, i_original in enumerate(map_):
np.place(remapped_indices, indices == i_original, i_sub)
else:
remapped_indices = indices
return Trajectory(
positions[::stride, atom_indices, :],
sub_struct,
center_of_mass=center_of_mass,
cells=cells[::stride],
indices=remapped_indices[::stride, atom_indices]
)
[docs] def write_traj(
self,
filename,
file_format=None,
parallel=True,
append=False,
stride=1,
center_of_mass=False,
atom_indices=None,
snapshot_indices=None,
overwrite_positions=None,
overwrite_cells=None,
**kwargs
):
"""
Writes the trajectory in a given file file_format based on the `ase.io.write`_ function.
Args:
filename (str): Filename of the output
file_format (str): The specific file_format of the output
parallel (bool): ase parameter
append (bool): ase parameter
stride (int): Writes trajectory every `stride` steps
center_of_mass (bool): True if the positions are centered on the COM
atom_indices (list/numpy.ndarray): The atom indices for which the trajectory should be generated
snapshot_indices (list/numpy.ndarray): The snapshots for which the trajectory should be generated
overwrite_positions (list/numpy.ndarray): List of positions that are meant to overwrite the existing
trajectory. Useful to wrap coordinates for example
overwrite_cells(list/numpy.ndarray): List of cells that are meant to overwrite the existing
trajectory. Only used when `overwrite_positions` is defined. This must
have the same length of `overwrite_positions`
**kwargs: Additional ase arguments
.. _ase.io.write: https://wiki.fysik.dtu.dk/ase/_modules/ase/io/formats.html#write
"""
traj = self.trajectory(
stride=stride,
center_of_mass=center_of_mass,
atom_indices=atom_indices,
snapshot_indices=snapshot_indices,
overwrite_positions=overwrite_positions,
overwrite_cells=overwrite_cells
)
# Using thr ASE output writer
ase_write(
filename=filename,
images=traj,
format=file_format,
parallel=parallel,
append=append,
**kwargs
)
# Compatibility functions
[docs] def get_final_structure(self):
"""
Returns:
"""
warnings.warn(
"get_final_structure() is deprecated - please use get_structure() instead.",
DeprecationWarning,
)
return self.get_structure(iteration_step=-1)
[docs] def set_kpoints(
self,
mesh=None,
scheme="MP",
center_shift=None,
symmetry_reduction=True,
manual_kpoints=None,
weights=None,
reciprocal=True,
):
"""
Args:
mesh:
scheme:
center_shift:
symmetry_reduction:
manual_kpoints:
weights:
reciprocal:
Returns:
"""
raise NotImplementedError(
"The set_kpoints function is not implemented for this code."
)
[docs] def set_encut(self, encut):
"""
Args:
encut:
Returns:
"""
raise NotImplementedError(
"The set_encut function is not implemented for this code."
)
[docs] def get_encut(self):
raise NotImplementedError(
"The set_encut function is not implemented for this code."
)
[docs] def get_structure(self, iteration_step=-1, wrap_atoms=True):
"""
Gets the structure from a given iteration step of the simulation (MD/ionic relaxation). For static calculations
there is only one ionic iteration step
Args:
iteration_step (int): Step for which the structure is requested
wrap_atoms (bool): True if the atoms are to be wrapped back into the unit cell
Returns:
pyiron.atomistics.structure.atoms.Atoms: The required structure
"""
if not (self.structure is not None):
raise AssertionError()
snapshot = self.structure.copy()
conditions = list()
if isinstance(self.output.cells, (list, np.ndarray)):
if len(self.output.cells) == 0:
conditions.append(True)
else:
conditions.append(self.output.cells[0] is None)
conditions.append(self.output.cells is None)
if any(conditions):
snapshot.cell = None
else:
snapshot.cell = self.output.cells[iteration_step]
snapshot.positions = self.output.positions[iteration_step]
indices = self.output.indices
if indices is not None and len(indices) > max([iteration_step, 0]):
snapshot.indices = indices[iteration_step]
if wrap_atoms:
return snapshot.center_coordinates_in_unit_cell()
else:
if len(self.output.unwrapped_positions) > max([iteration_step, 0]):
snapshot.positions = self.output.unwrapped_positions[iteration_step]
else:
snapshot.positions += self.output.total_displacements[iteration_step]
return snapshot
[docs] def map(self, function, parameter_lst):
master = self.create_job(
job_type=self.project.job_type.MapMaster, job_name="map_" + self.job_name
)
master.modify_function = function
master.parameter_list = parameter_lst
return master
[docs] def gui(self):
"""
Returns:
"""
ProjectGUI(self)
def _structure_to_hdf(self):
if self.structure is not None and self._generic_input["structure"] == "atoms":
with self.project_hdf5.open("input") as hdf5_input:
self.structure.to_hdf(hdf5_input)
def _structure_from_hdf(self):
if (
"structure" in self.project_hdf5["input"].list_groups()
and self._generic_input["structure"] == "atoms"
):
with self.project_hdf5.open("input") as hdf5_input:
self.structure = Atoms().from_hdf(hdf5_input)
def _write_chemical_formular_to_database(self):
if self.structure:
parent_structure = self.structure.get_parent_basis()
self.project.db.item_update(
{"ChemicalFormula": parent_structure.get_chemical_formula()},
self._job_id,
)
def _before_successor_calc(self, ham):
if ham._generic_input["structure"] == "continue_final":
ham.structure = self.get_structure(iteration_step=-1)
ham.to_hdf()
[docs]def set_encut(job, parameter):
job.set_encut(parameter)
return job
[docs]def set_kpoints(job, parameter):
job.set_kpoints(parameter)
return job
[docs]def set_structure(job, parameter):
job.structure = parameter
return job
[docs]class MapFunctions(object):
def __init__(self):
self.set_structure = set_structure
self.set_encut = set_encut
self.set_kpoints = set_kpoints
[docs]class Trajectory(object):
"""
A trajectory instance compatible with the ase.io class
Args:
positions (numpy.ndarray): The array of the trajectory in cartesian coordinates
structure (pyiron.atomistics.structure.atoms.Atoms): The initial structure instance from which the species info
is derived
center_of_mass (bool): False (default) if the specified positions are w.r.t. the origin
cells (numpy.ndarray): Optional argument of the cell shape at every time step (Nx3x3 array) when the volume
varies
"""
def __init__(self, positions, structure, center_of_mass=False, cells=None, indices=None):
if center_of_mass:
pos = np.copy(positions)
pos[:, :, 0] = (pos[:, :, 0].T - np.mean(pos[:, :, 0], axis=1)).T
pos[:, :, 1] = (pos[:, :, 1].T - np.mean(pos[:, :, 1], axis=1)).T
pos[:, :, 2] = (pos[:, :, 2].T - np.mean(pos[:, :, 2], axis=1)).T
self._positions = pos
else:
self._positions = positions
self._structure = structure
self._cells = cells
self._indices = indices
def __getitem__(self, item):
new_structure = self._structure.copy()
if self._cells is not None:
new_structure.cell = self._cells[item]
if self._indices is not None:
new_structure.indices = self._indices[item]
new_structure.positions = self._positions[item]
# This step is necessary for using ase.io.write for trajectories
new_structure.arrays["positions"] = new_structure.positions
# new_structure.arrays['cells'] = new_structure.cell
return new_structure
def __len__(self):
return len(self._positions)
[docs]class GenericOutput(object):
def __init__(self, job):
self._job = job
@property
def cells(self):
return self._job["output/generic/cells"]
@property
def energy_pot(self):
return self._job["output/generic/energy_pot"]
@property
def energy_tot(self):
return self._job["output/generic/energy_tot"]
@property
def forces(self):
return self._job["output/generic/forces"]
@property
def force_max(self):
"""
maximum force magnitude of each step which is used for
convergence criterion of structure optimizations
"""
return np.linalg.norm(self.forces, axis=-1).max(axis=-1)
@property
def positions(self):
return self._job["output/generic/positions"]
@property
def pressures(self):
return self._job["output/generic/pressures"]
@property
def steps(self):
return self._job["output/generic/steps"]
@property
def temperature(self):
return self._job["output/generic/temperature"]
@property
def computation_time(self):
return self._job["output/generic/computation_time"]
@property
def unwrapped_positions(self):
unwrapped_positions = self._job["output/generic/unwrapped_positions"]
if unwrapped_positions is not None:
return unwrapped_positions
else:
return self._job.structure.positions+self.total_displacements
@property
def volume(self):
return self._job["output/generic/volume"]
@property
def indices(self):
return self._job["output/generic/indices"]
@property
def displacements(self):
"""
Output for 3-d displacements between successive snapshots, with minimum image convention.
For the total displacements from the initial configuration, use total_displacements
This algorithm collapses if:
- the ID's are not consistent (i.e. you can also not change the number of atoms)
- there are atoms which move by more than half a box length in any direction within two snapshots (due to
periodic boundary conditions)
"""
# Check if the volume changes in any snapshot
vol = np.linalg.det(self.cells)
varying_cell = np.sqrt(np.average((vol - vol[0])**2)) > 1e-5
return self.get_displacements(self._job.structure, self.positions, self.cells, varying_cell=varying_cell)
[docs] @staticmethod
def get_displacements(structure, positions, cells, varying_cell=False):
"""
Output for 3-d displacements between successive snapshots, with minimum image convention.
For the total displacements from the initial configuration, use total_displacements
This algorithm collapses if:
- the ID's are not consistent (i.e. you can also not change the number of atoms)
- there are atoms which move by more than half a box length in any direction within two snapshots (due to
periodic boundary conditions)
Args:
structure (pyiron.atomistics.structure.atoms.Atoms): The initial structure
positions (numpy.ndarray/list): List of positions in cartesian coordinates (N_steps x N_atoms x 3)
cells (numpy.ndarray/list): List of cells (N_steps x 3 x 3)
varying_cell (bool): True if the cell shape varies during the trajectory (raises a warning)
Returns:
numpy.ndarray: Displacements (N_steps x N_atoms x 3)
"""
if not varying_cell:
displacement = np.tensordot(positions, np.linalg.inv(cells[-1]), axes=([2, 0]))
displacement -= np.append(structure.get_scaled_positions(),
displacement).reshape(len(positions) + 1, len(structure), 3)[:-1]
displacement -= np.rint(displacement)
displacement = np.tensordot(displacement, cells[-1], axes=([2, 0]))
else:
warnings.warn("You are computing displacements in a simulation with periodic boundary conditions \n"
"and a varying cell shape.")
displacement = np.array(
[np.tensordot(pos, np.linalg.inv(cell), axes=([1, 1])) for pos, cell in zip(positions, cells)])
displacement -= np.append(structure.get_scaled_positions(),
displacement).reshape(len(positions) + 1, len(structure), 3)[:-1]
displacement -= np.rint(displacement)
displacement = np.einsum('nki,nji->nkj', displacement, cells)
return displacement
@property
def total_displacements(self):
"""
Output for 3-d total displacements from the initial configuration, with minimum image convention.
For the diplacements for the successive snapshots, use displacements
This algorithm collapses if:
- the ID's are not consistent (i.e. you can also not change the number of atoms)
- there are atoms which move by more than half a box length in any direction within two snapshots (due to periodic boundary conditions)
"""
return np.cumsum(self.displacements, axis=0)
def __dir__(self):
hdf5_path = self._job["output/generic"]
if hdf5_path is not None:
return hdf5_path.list_nodes()
else:
return []