from pyiron.base.generic.parameters import GenericParameters
from pyiron.atomistics.structure.atoms import Atoms
from pyiron.atomistics.job.atomistic import AtomisticGenericJob, GenericOutput
from pyiron.base.settings.generic import Settings
from yaff import System, log, ForceField
from quickff.tools import set_ffatypes
log.set_level(log.silent)
from molmod.units import *
from molmod.constants import *
from molmod.periodic import periodic as pt
import subprocess
import os, posixpath, numpy as np, h5py, matplotlib.pyplot as pp
[docs]def write_chk(input_dict,working_directory='.'):
# collect data and initialize Yaff system
if 'cell' in input_dict.keys() and input_dict['cell'] is not None:
system = System(input_dict['numbers'], input_dict['pos']*angstrom, ffatypes=input_dict['ffatypes'], ffatype_ids=input_dict['ffatype_ids'], rvecs=input_dict['cell']*angstrom)
else:
system = System(input_dict['numbers'], input_dict['pos']*angstrom, ffatypes=input_dict['ffatypes'], ffatype_ids=input_dict['ffatype_ids'])
# determine masses, bonds and ffaypes from ffatype_rules
system.detect_bonds()
system.set_standard_masses()
# write dictionary to MolMod CHK file
system.to_file(posixpath.join(working_directory,'system.chk'))
[docs]def write_pars(input_dict,working_directory='.'):
with open(posixpath.join(working_directory,'pars.txt'), 'w') as f:
for line in input_dict['ffpars']:
f.write(line)
common = """#! /usr/bin/python
from molmod.units import *
from yaff import *
import h5py, numpy as np
#Setting up system and force field
system = System.from_file('system.chk')
ff = ForceField.generate(system, 'pars.txt', rcut={rcut}*angstrom, alpha_scale={alpha_scale}, gcut_scale={gcut_scale}, smooth_ei={smooth_ei})
#Setting up output
f = h5py.File('output.h5', mode='w')
hdf5 = HDF5Writer(f, step={h5step})
r = h5py.File('restart.h5', mode='w')
restart = RestartWriter(r, step=10000)
hooks = [hdf5, restart]
#Setting up simulation
"""
tail = """
"""
[docs]def write_yopt(input_dict,working_directory='.'):
body = common.format(
rcut=input_dict['rcut']/angstrom, alpha_scale=input_dict['alpha_scale'],
gcut_scale=input_dict['gcut_scale'], smooth_ei=input_dict['smooth_ei'],
h5step=1,
)
body += "dof = CartesianDOF(ff, gpos_rms={gpos_rms}, dpos_rms={dpos_rms})".format(
gpos_rms=input_dict['gpos_rms'],dpos_rms=input_dict['dpos_rms']
)
body += """
opt = CGOptimizer(dof, hooks=[hdf5])
opt.run({nsteps})
system.to_file('opt.chk')
""".format(nsteps=input_dict['nsteps'])
body+= tail
with open(posixpath.join(working_directory,'yscript.py'), 'w') as f:
f.write(body)
[docs]def write_yopt_cell(input_dict,working_directory='.'):
body = common.format(
rcut=input_dict['rcut']/angstrom, alpha_scale=input_dict['alpha_scale'],
gcut_scale=input_dict['gcut_scale'], smooth_ei=input_dict['smooth_ei'],
h5step=1,
)
body += "dof = StrainCellDOF(ff, gpos_rms={gpos_rms}, dpos_rms={dpos_rms}, grvecs_rms={grvecs_rms}, drvecs_rms={drvecs_rms}, do_frozen=False)".format(
gpos_rms=input_dict['gpos_rms'],dpos_rms=input_dict['dpos_rms'],
grvecs_rms=input_dict['grvecs_rms'],drvecs_rms=input_dict['drvecs_rms']
)
body += """
opt = CGOptimizer(dof, hooks=[hdf5])
opt.run({nsteps})
system.to_file('opt.chk')
""".format(nsteps=input_dict['nsteps'])
body+= tail
with open(posixpath.join(working_directory,'yscript.py'), 'w') as f:
f.write(body)
[docs]def write_ysp(input_dict,working_directory='.'):
body = common.format(
rcut=input_dict['rcut']/angstrom, alpha_scale=input_dict['alpha_scale'],
gcut_scale=input_dict['gcut_scale'], smooth_ei=input_dict['smooth_ei'],
h5step=1,
)
body +="""
energy = ff.compute()
system.to_hdf5(f)
f['system/energy'] = energy
"""
body+= tail
with open(posixpath.join(working_directory,'yscript.py'), 'w') as f:
f.write(body)
[docs]def write_yhess(input_dict,working_directory='.'):
body = common.format(
rcut=input_dict['rcut']/angstrom, alpha_scale=input_dict['alpha_scale'],
gcut_scale=input_dict['gcut_scale'], smooth_ei=input_dict['smooth_ei'],
h5step=1,
)
body +="""dof = CartesianDOF(ff)
gpos = np.zeros((len(system.numbers), 3), float)
vtens = np.zeros((3, 3), float)
energy = ff.compute(gpos, vtens)
hessian = estimate_hessian(dof, eps={hessian_eps})
system.to_hdf5(f)
f['system/energy'] = energy
f['system/gpos'] = gpos
f['system/hessian'] = hessian""".format(hessian_eps=input_dict['hessian_eps'])
body+= tail
with open(posixpath.join(working_directory,'yscript.py'), 'w') as f:
f.write(body)
[docs]def write_ynve(input_dict,working_directory='.'):
body = common.format(
rcut=input_dict['rcut']/angstrom, alpha_scale=input_dict['alpha_scale'],
gcut_scale=input_dict['gcut_scale'], smooth_ei=input_dict['smooth_ei'],
h5step=input_dict['h5step'],
)
if input_dict['enhanced'] is not None:
body += """
plumed = ForcePartPlumed(ff.system, fn='plumed.dat')
ff.add_part(plumed)
hooks.append(plumed)
"""
body += """
hooks.append(VerletScreenLog(step=1000))
md = VerletIntegrator(ff, {timestep}*femtosecond, hooks=hooks)
md.run({nsteps})
""".format(timestep=input_dict['timestep']/femtosecond, nsteps=input_dict['nsteps'])
body+= tail
with open(posixpath.join(working_directory,'yscript.py'), 'w') as f:
f.write(body)
[docs]def write_ynvt(input_dict,working_directory='.'):
body = common.format(
rcut=input_dict['rcut']/angstrom, alpha_scale=input_dict['alpha_scale'],
gcut_scale=input_dict['gcut_scale'], smooth_ei=input_dict['smooth_ei'],
h5step=input_dict['h5step'],
)
if input_dict['enhanced'] is not None:
body += """
plumed = ForcePartPlumed(ff.system, fn='plumed.dat')
ff.add_part(plumed)
hooks.append(plumed)
"""
body += """
temp = {temp}*kelvin
thermo = NHCThermostat(temp, timecon={timecon_thermo}*femtosecond)
hooks.append(thermo)
hooks.append(VerletScreenLog(step=1000))
md = VerletIntegrator(ff, {timestep}*femtosecond, hooks=hooks)
md.run({nsteps})
""".format(
temp=input_dict['temp']/kelvin,timestep=input_dict['timestep']/femtosecond,
timecon_thermo=input_dict['timecon_thermo']/femtosecond, nsteps=input_dict['nsteps']
)
body+= tail
with open(posixpath.join(working_directory,'yscript.py'), 'w') as f:
f.write(body)
[docs]def write_ynpt(input_dict,working_directory='.'):
body = common.format(
rcut=input_dict['rcut']/angstrom, alpha_scale=input_dict['alpha_scale'],
gcut_scale=input_dict['gcut_scale'], smooth_ei=input_dict['smooth_ei'],
h5step=input_dict['h5step'],
)
if input_dict['enhanced'] is not None:
body += """
plumed = ForcePartPlumed(ff.system, fn='plumed.dat')
ff.add_part(plumed)
hooks.append(plumed)
"""
body += """
temp = {temp}*kelvin
press = {press}*bar
thermo = NHCThermostat(temp, timecon={timecon_thermo}*femtosecond)
baro = MTKBarostat(ff, temp, press, timecon={timecon_baro}*femtosecond)
TBC = TBCombination(thermo, baro)
hooks.append(TBC)
hooks.append(VerletScreenLog(step=1000))
md = VerletIntegrator(ff, {timestep}*femtosecond, hooks=hooks)
md.run({nsteps})
""".format(
temp=input_dict['temp']/kelvin,timestep=input_dict['timestep']/femtosecond,
press=input_dict['press']/bar,timecon_thermo=input_dict['timecon_thermo']/femtosecond,
timecon_baro=input_dict['timecon_baro']/femtosecond, nsteps=input_dict['nsteps']
)
body+= tail
with open(posixpath.join(working_directory,'yscript.py'), 'w') as f:
f.write(body)
[docs]def write_plumed_enhanced(input_dict,working_directory='.'):
#make copy of input_dict['enhanced'] that includes lower case definitions
#(allowing for case insenstive definition of input_dict['enhanced'])
enhanced = {}
for key, value in input_dict['enhanced'].items():
enhanced[key] = value
enhanced[key.lower()] = value
#write plumed.dat file
with open(posixpath.join(working_directory, 'plumed.dat'), 'w') as f:
#set units to atomic units
f.write('UNITS LENGTH=Bohr ENERGY=kj/mol TIME=atomic \n')
#define ics
for i, kind in enumerate(enhanced['ickinds']):
if isinstance(kind, bytes):
kind = kind.decode()
if len( enhanced['icindices'][i] > 0):
f.write('ic%i: %s ATOMS=%s \n' %(i, kind.upper(), ','.join([str(icidx) for icidx in enhanced['icindices'][i]])))
else:
f.write('ic%i: %s \n' %(i, kind.upper()))
#define metadynamics run
if 'sigma' in enhanced.keys():
if len(enhanced['sigma'])==1:
sigma = '%.2f' %(enhanced['sigma'])
else:
assert len(enhanced['sigma'])>1
sigma = ','.join(['%.2f' %s for s in enhanced['sigma']])
if len(enhanced['height'])==1:
height = '%.2f' %(enhanced['height']/kjmol)
else:
assert len(enhanced['height'])>1
height = ','.join(['%.2f' %h/kjmol for h in enhanced['height']])
f.write('metad: METAD ARG=%s SIGMA=%s HEIGHT=%s PACE=%i FILE=%s \n' %(
','.join([ 'ic%i' %i for i in range(len(enhanced['ickinds'])) ]),
sigma, height, enhanced['pace'], enhanced['file']
))
#setup printing of colvar
f.write('PRINT ARG=%s,metad.bias FILE=%s STRIDE=%i \n' %(
','.join([ 'ic%i' %i for i in range(len(enhanced['ickinds'])) ]),
enhanced['file_colvar'], enhanced['stride']
))
# define umbrella sampling run
if 'kappa' in enhanced.keys():
if len(enhanced['kappa'])==1:
kappa = '%.2f' %(enhanced['kappa']/kjmol)
else:
assert len(enhanced['kappa'])>1
kappa = ','.join(['%.2f' %s/kjmol for s in enhanced['kappa']])
if len(enhanced['loc'])==1:
loc = '%.2f' %(enhanced['loc'])
else:
assert len(enhanced['loc'])>1
loc = ','.join(['%.2f' %h for h in enhanced['loc']])
f.write('umbrella: RESTRAINT ARG=%s KAPPA=%s AT=%s \n' %(
','.join([ 'ic%i' %i for i in range(len(enhanced['ickinds'])) ]),
kappa, loc
))
#setup printing of colvar
f.write('PRINT ARG=%s,umbrella.bias FILE=%s STRIDE=%i \n' %(
','.join([ 'ic%i' %i for i in range(len(enhanced['ickinds'])) ]),
enhanced['file_colvar'], enhanced['stride']
))
[docs]def hdf2dict(h5):
hdict = {}
hdict['structure/numbers'] = h5['system/numbers'][:]
hdict['structure/masses'] = h5['system/masses'][:]
hdict['structure/ffatypes'] = h5['system/ffatypes'][:]
hdict['structure/ffatype_ids'] = h5['system/ffatype_ids'][:]
if 'energy' in h5['system'].keys():
hdict['generic/energy_pot'] = h5['system/energy'][()]/electronvolt
if 'trajectory' in h5.keys() and 'pos' in h5['trajectory'].keys():
hdict['generic/positions'] = h5['trajectory/pos'][:]/angstrom
else:
hdict['generic/positions'] = np.array([h5['system/pos'][:]/angstrom])
if 'trajectory' in h5.keys() and 'pos' in h5['trajectory'].keys():
hdict['structure/positions'] = h5['trajectory/pos'][-1]/angstrom
else:
hdict['structure/positions'] = np.array([h5['system/pos'][:]/angstrom])
if 'trajectory' in h5.keys() and 'cell' in h5['trajectory']:
hdict['generic/cells'] = h5['trajectory/cell'][:]/angstrom
elif 'rvecs' in h5['system'].keys():
hdict['generic/cells'] = np.array([h5['system/rvecs'][:]/angstrom])
else:
hdict['generic/cells'] = None
if 'trajectory' in h5.keys():
if 'counter' in h5['trajectory'].keys():
hdict['generic/steps'] = h5['trajectory/counter'][:]
if 'time' in h5['trajectory'].keys():
hdict['generic/time'] = h5['trajectory/time'][:]
if 'volume' in h5['trajectory']:
hdict['generic/volume'] = h5['trajectory/volume'][:]/angstrom**3
if 'epot' in h5['trajectory'].keys():
hdict['generic/energy_pot'] = h5['trajectory/epot'][:]/electronvolt
if 'ekin' in h5['trajectory'].keys():
hdict['generic/energy_kin'] = h5['trajectory/ekin'][:]/electronvolt
if 'temp' in h5['trajectory'].keys():
hdict['generic/temperature'] = h5['trajectory/temp'][:]
if 'etot' in h5['trajectory'].keys():
hdict['generic/energy_tot'] = h5['trajectory/etot'][:]/electronvolt
if 'econs' in h5['trajectory'].keys():
hdict['generic/energy_cons'] = h5['trajectory/econs'][:]/electronvolt
if 'press' in h5['trajectory'].keys():
hdict['generic/pressure'] = h5['trajectory/press'][:]
if 'gradient' in h5['trajectory'].keys():
hdict['generic/forces'] = -h5['trajectory/gradient'][:]/(electronvolt/angstrom)
if 'hessian' in h5['system'].keys():
hdict['generic/forces'] = -h5['system/gpos'][:]/(electronvolt/angstrom)
hdict['generic/hessian'] = h5['system/hessian'][:]/(electronvolt/angstrom**2)
return hdict
[docs]def read_colvar(output_file,output_dict):
colvar_file = posixpath.join(output_file[:output_file.rfind('/')],'COLVAR')
if os.path.exists(colvar_file):
data = np.loadtxt(colvar_file)
output_dict['enhanced/time'] = data[:,0]
output_dict['enhanced/cv'] = data[:,1:-1]
output_dict['enhanced/bias'] = data[:,-1]
[docs]def collect_output(output_file):
# this routine basically reads and returns the output HDF5 file produced by Yaff
# read output
h5 = h5py.File(output_file, mode='r')
# translate to dict
output_dict = hdf2dict(h5)
# read colvar file if it is there
read_colvar(output_file,output_dict)
return output_dict
[docs]class Yaff(AtomisticGenericJob):
def __init__(self, project, job_name):
super(Yaff, self).__init__(project, job_name)
self.__name__ = "Yaff"
self._executable_activate(enforce=True)
self.input = YaffInput()
self.ffatypes = None
self.ffatype_ids = None
self.enhanced = None # should have more generic name e.g. enhanced
[docs] def calc_minimize(self, cell=False, gpos_tol=1e-8, dpos_tol=1e-6, grvecs_tol=1e-8, drvecs_tol=1e-6, max_iter=1000, n_print=5):
'''
Set up an optimization calculation.
**Arguments**
cell (bool): Set True if the cell also has to be optimized
gpos_tol (float): Convergence criterion for RMS of gradients towards atomic coordinates
dpos_tol (float): Convergence criterion for RMS of differences of atomic coordinates
grvecs_tol (float): Convergence criterion for RMS of gradients towards cell parameters
drvecs_tol (float): Convergence criterion for RMS of differences of cell parameters
max_iter (int): Maximum number of optimization steps
n_print (int): Print frequency
'''
if cell:
self.input['jobtype'] = 'opt_cell'
else:
self.input['jobtype'] = 'opt'
self.input['nsteps'] = max_iter
self.input['h5step'] = n_print
self.input['gpos_rms'] = gpos_tol
self.input['dpos_rms'] = dpos_tol
self.input['grvecs_rms'] = grvecs_tol
self.input['drvecs_rms'] = drvecs_tol
super(Yaff, self).calc_minimize(max_iter=max_iter, n_print=n_print)
[docs] def calc_static(self):
'''
Set up a static force field calculation.
'''
self.input['jobtype'] = 'sp'
super(Yaff, self).calc_static()
[docs] def calc_md(self, temperature=None, pressure=None, nsteps=1000, time_step=1.0*femtosecond, n_print=5,
timecon_thermo=100.0*femtosecond, timecon_baro=1000.0*femtosecond):
'''
Set an MD calculation within Yaff. Nosé Hoover chain is used by default.
**Arguments**
temperature (None/float): Target temperature. If set to None, an NVE calculation is performed.
It is required when the pressure is set
pressure (None/float): Target pressure. If set to None, an NVE or an NVT calculation is performed.
nsteps (int): Number of md steps
time_step (float): Step size between two steps.
n_print (int): Print frequency
timecon_thermo (float): The time associated with the thermostat adjusting the temperature.
timecon_baro (float): The time associated with the barostat adjusting the temperature.
'''
self.input['temp'] = temperature
self.input['press'] = pressure
self.input['nsteps'] = nsteps
self.input['timestep'] = time_step
self.input['h5step'] = n_print
self.input['timecon_thermo'] = timecon_thermo
self.input['timecon_baro'] = timecon_baro
if temperature is None:
self.input['jobtype'] = 'nve'
else:
if pressure is None:
self.input['jobtype'] = 'nvt'
else:
self.input['jobtype'] = 'npt'
super(Yaff, self).calc_md(temperature=temperature, pressure=pressure, n_ionic_steps=nsteps,
time_step=time_step, n_print=n_print,
temperature_damping_timescale=timecon_thermo,
pressure_damping_timescale=timecon_baro)
[docs] def load_chk(self, fn):
'''
Load the atom types, atom type ids and structure by reading a .chk file.
**Arguments**
fn the path to the chk file
'''
system = System.from_file(fn)
system.set_standard_masses()
if len(system.pos.shape)!=2:
raise IOError("Something went wrong, positions in CHK file %s should have Nx3 dimensions" %fn)
if system.cell.rvecs is not None and len(system.cell.rvecs)>0:
self.structure = Atoms(
positions=system.pos.copy()/angstrom,
numbers=system.numbers,
masses=system.masses,
cell=system.cell.rvecs/angstrom,
)
else:
self.structure = Atoms(
positions=system.pos.copy()/angstrom,
numbers=system.numbers,
masses=system.masses,
)
if system.ffatypes is not None:
self.ffatypes = system.ffatypes
if system.ffatype_ids is not None:
self.ffatype_ids = system.ffatype_ids
[docs] def set_mtd(self, ics, height, sigma, pace, fn='HILLS', fn_colvar='COLVAR', stride=10, temp=300):
'''
Setup a Metadynamics run using PLUMED along the internal coordinates
defined in the ICs argument.
**Arguments**
ics a list of entries defining each internal coordinate. Each
of these entries should be of the form (kind, [i, j, ...])
Herein, kind defines the kind of IC as implemented in PLUMED:
i.e. distance, angle, torsion, volume, cell, ... see
https://www.plumed.org/doc-v2.5/user-doc/html/_colvar.html
for more information).
and [i, j, ...] is a list of atom indices, starting from 0, involved in this
IC. If no atom indices are required for e.g. volume, provide an empty list.
An example for a 1D metadynamica using the distance between
atoms 2 and 4:
ics = [('distance', [2,4])]
height the height of the Gaussian hills, can be a single value
(the gaussian hills for each IC have identical height) or
a list of values, one for each IC defined.
sigmas the sigma of the Gaussian hills, can be a single value
(the gaussian hills for each IC have identical height) or
a list of values, one for each IC defined.
pace the number of steps after which the gaussian hills are
updated.
fn the PLUMED output file for the gaussian hills
fn_colvar
the PLUMED output file for logging of collective variables
stride the number of steps after which the internal coordinate
values and bias are printed to the COLVAR output file.
temp the system temperature
'''
for l in ics:
assert len(l)==2
assert isinstance(l[0], str)
assert isinstance(l[1], list) or isinstance(l[1], tuple)
ickinds = np.array([ic[0] for ic in ics],dtype='S22')
icindices = np.array([np.array(ic[1])+1 for ic in ics]) # plumed starts counting from 1
if not isinstance(height,list) and not isinstance(height,np.ndarray):
height = np.array([height])
if not isinstance(sigma,list) and not isinstance(sigma,np.ndarray):
sigma = np.array([sigma])
self.enhanced= {
'ickinds': ickinds, 'icindices': icindices, 'height': height, 'sigma': sigma, 'pace': pace,
'file': fn, 'file_colvar': fn_colvar, 'stride': stride, 'temp': temp
}
[docs] def set_us(self, ics, kappa, loc, fn_colvar='COLVAR', stride=10, temp=300):
'''
Setup an Umbrella sampling run using PLUMED along the internal coordinates
defined in the ICs argument.
**Arguments**
ics a list of entries defining each an internal coordinate. Each
of these entries should be of the form (kind, [i, j, ...])
Herein, kind defines the kind of IC as implemented in PLUMED:
i.e. distance, angle, torsion, volume, cell, ... see
https://www.plumed.org/doc-v2.5/user-doc/html/_colvar.html
for more information).
and [i, j, ...] is a list of atom indices, starting from 0, involved in this
IC. If no atom indices are required for e.g. volume, provide an empty list.
An example for a 1D metadynamica using the distance between
atoms 2 and 4:
ics = [('distance', [2,4])]
kappa the value of the force constant of the harmonic bias potential,
can be a single value (the harmonic bias potential for each IC has identical kappa)
or a list of values, one for each IC defined.
loc the location of the umbrella
(should have a length equal to the number of ICs)
fn_colvar
the PLUMED output file for logging of collective variables
stride the number of steps after which the internal coordinate
values and bias are printed to the COLVAR output file.
temp the system temperature
'''
for l in ics:
assert len(l)==2
assert isinstance(l[0], str)
assert isinstance(l[1], list) or isinstance(l[1], tuple)
ickinds = np.array([ic[0] for ic in ics],dtype='S22')
icindices = np.array([np.array(ic[1])+1 for ic in ics]) # plumed starts counting from 1
if not isinstance(kappa,list) and not isinstance(kappa,np.ndarray):
kappa = np.array([kappa])
if not isinstance(loc,list) and not isinstance(loc,np.ndarray):
loc = np.array([loc])
assert len(loc)==len(ics)
self.enhanced= {
'ickinds': ickinds, 'icindices': icindices, 'kappa': kappa, 'loc': loc,
'file_colvar': fn_colvar, 'stride': stride, 'temp': temp
}
[docs] def detect_ffatypes(self, ffatypes=None, ffatype_rules=None, ffatype_level=None):
'''
Define atom types by explicitely giving them through the
ffatypes keyword, defining atype rules using the ATSELECT
language implemented in Yaff (see the Yaff documentation at
http://molmod.github.io/yaff/ug_atselect.html) or by specifying
the ffatype_level employing the built-in routine in QuickFF.
'''
numbers = np.array([pt[symbol].number for symbol in self.structure.get_chemical_symbols()])
if self.structure.cell is None:
system = System(numbers, self.structure.positions.copy()*angstrom)
else:
system = System(numbers, self.structure.positions.copy()*angstrom, rvecs=self.structure.cell*angstrom)
system.detect_bonds()
if not sum([ffatypes is None, ffatype_rules is None, ffatype_level is None]) == 2:
raise IOError('Exactly one of ffatypes, ffatype_rules and ffatype_level should be defined')
if ffatypes is not None:
system.ffatypes = ffatypes
system.ffatype_ids = None
system._init_derived_ffatypes()
if ffatype_rules is not None:
system.detect_ffatypes(ffatype_rules)
if ffatype_level is not None:
set_ffatypes(system, ffatype_level)
self.ffatypes = system.ffatypes.copy()
self.ffatype_ids = system.ffatype_ids.copy()
[docs] def collect_output(self):
output_dict = collect_output(output_file=posixpath.join(self.working_directory, 'output.h5'))
with self.project_hdf5.open("output") as hdf5_output:
for k, v in output_dict.items():
hdf5_output[k] = v
[docs] def to_hdf(self, hdf=None, group_name=None):
super(Yaff, self).to_hdf(hdf=hdf, group_name=group_name)
with self.project_hdf5.open("input") as hdf5_input:
self.structure.to_hdf(hdf5_input)
self.input.to_hdf(hdf5_input)
hdf5_input['generic/ffatypes'] = np.asarray(self.ffatypes,'S22')
hdf5_input['generic/ffatype_ids'] = self.ffatype_ids
if not self.enhanced is None:
grp = hdf5_input.create_group('generic/enhanced')
for k,v in self.enhanced.items():
grp[k] = v
[docs] def from_hdf(self, hdf=None, group_name=None):
super(Yaff, self).from_hdf(hdf=hdf, group_name=group_name)
with self.project_hdf5.open("input") as hdf5_input:
self.input.from_hdf(hdf5_input)
self.structure = Atoms().from_hdf(hdf5_input)
self.ffatypes = np.char.decode(hdf5_input['generic/ffatypes']) # decode byte string literals
self.ffatype_ids = hdf5_input['generic/ffatype_ids']
if "enhanced" in hdf5_input['generic'].keys():
self.enhanced = {}
for key,val in hdf5_input['generic/enhanced'].items():
if key=='ickinds':
self.enhanced[key] = np.char.decode(val)
else:
self.enhanced[key] = val
[docs] def get_structure(self, iteration_step=-1, wrap_atoms=True):
"""
Overwrite the get_structure routine from AtomisticGenericJob because we want to avoid
defining a unit cell when one does not exist
"""
if not (self.structure is not None):
raise AssertionError()
positions = self.get("output/generic/positions")
cells = self.get("output/generic/cells")
snapshot = self.structure.copy()
snapshot.positions = positions[iteration_step]
if cells is not None:
snapshot.cell = cells[iteration_step]
indices = self.get("output/generic/indices")
if indices is not None:
snapshot.indices = indices[iteration_step]
if wrap_atoms and cells is not None:
return snapshot.center()
else:
return snapshot
# Plot functions are deprecated while yaff is no longer in atomic units!
[docs] def plot(self, ykey, xkey='generic/steps', xunit='au', yunit='au', ref=None, linestyle='-', rolling_average=False):
xs = self['output/%s' %xkey]/parse_unit(xunit)
ys = self['output/%s' %ykey]/parse_unit(yunit)
if rolling_average:
ra = np.zeros(len(ys))
for i, y in enumerate(ys):
if i==0:
ra[i] = ys[0]
else:
ra[i] = (i*ra[i-1]+ys[i])/(i+1)
ys = ra.copy()
self._ref(ys,ref)
pp.clf()
pp.plot(xs, ys, linestyle)
pp.xlabel('%s [%s]' %(xkey, xunit))
pp.ylabel('%s [%s]' %(ykey, yunit))
pp.show()
# Plot functions are deprecated while yaff is no longer in atomic units!
[docs] def plot_multi(self, ykeys, xkey='generic/steps', xunit='au', yunit='au', ref=None, linestyle='-', rolling_average=False):
# Assume that all ykeys have the same length than the xkey
xs = self['output/%s' %xkey]/parse_unit(xunit)
yss = np.array([self['output/%s' %ykey]/parse_unit(yunit) for ykey in ykeys])
if rolling_average:
for ys in yss:
ra = np.zeros(len(ys))
for i, y in enumerate(ys):
if i==0:
ra[i] = ys[0]
else:
ra[i] = (i*ra[i-1]+ys[i])/(i+1)
ys = ra.copy()
if not isinstance(ref,list):
for ys in yss:
self._ref(ys,ref)
else:
assert len(ref)==len(yss)
for n in range(len(ref)):
_ref(yss[n],ref[n])
pp.clf()
for n,ys in enumerate(yss):
pp.plot(xs, ys, linestyle, label=ykeys[n])
pp.xlabel('%s [%s]' %(xkey, xunit))
pp.ylabel('[%s]' %(yunit))
pp.legend()
pp.show()
@staticmethod
def _ref(ys,ref):
if isinstance(ref, int):
ys -= ys[ref]
elif isinstance(ref, float):
ys -= ref
elif isinstance(ref,str):
if ref=='min':
ys -= min(ys)
elif ref=='max':
ys -= max(ys)
elif ref=='mean':
ys -= np.mean(ys)
[docs] def log(self):
with open(posixpath.join(self.working_directory, 'yaff.log')) as f:
print(f.read())
[docs] def get_yaff_system(self, snapshot=0):
numbers = np.array([pt[symbol].number for symbol in self.structure.get_chemical_symbols()])
if snapshot==0:
struct = self.structure
else:
struct = self.get_structure(iteration_step=snapshot, wrap_atoms=False)
pos = struct.positions.reshape(-1,3)*angstrom
cell = struct.cell
if cell is None:
system = System(numbers, pos, ffatypes=self.ffatypes, ffatype_ids=self.ffatype_ids)
else:
system = System(numbers, pos, rvecs=cell*angstrom, ffatypes=self.ffatypes, ffatype_ids=self.ffatype_ids)
system.detect_bonds()
system.set_standard_masses()
return system
[docs] def get_yaff_ff(self, system=None):
if system is None:
system = self.get_yaff_system()
fn_pars = posixpath.join(self.working_directory, 'pars.txt')
if not os.path.isfile(fn_pars):
raise IOError('No pars.txt file find in job working directory. Have you already run the job?')
ff = ForceField.generate(
system, fn_pars, rcut=self.input['rcut'], alpha_scale=self.input['alpha_scale'],
gcut_scale=self.input['gcut_scale'], smooth_ei=self.input['smooth_ei']
)
return ff
[docs] def mtd_sum_hills_1d(self,fn=None):
'''
Creates a fes.dat file for plotting the free energy surface after a mtd simulation.
**Arguments**
fn path to the hills file or hills files (comma separated)
'''
if fn is None:
fn = posixpath.join(self.working_directory, self.enhanced['file'])
fn_out = posixpath.join(self.working_directory, 'fes.dat')
subprocess.check_output(
"ml load PLUMED/2.5.2-intel-2019a-Python-3.7.2; plumed sum_hills --hills {} --outfile {}".format(fn,fn_out),
stderr=subprocess.STDOUT,
universal_newlines=True,
shell=True
)