from pynta.tasks import *
from pynta.mol import get_adsorbate, generate_unique_site_additions, generate_adsorbate_guesses, get_name
from molecule.molecule import Molecule
import ase.build
from ase.io import read, write
from ase import Atoms, Atom
from acat.adsorption_sites import SlabAdsorptionSites
from acat.adsorbate_coverage import SlabAdsorbateCoverage
from acat.settings import site_heights, adsorbate_molecule
import os
import time
import yaml
from copy import deepcopy
import numpy as np
from pynta.calculator import get_lattice_parameter
from fireworks import LaunchPad, Workflow
from fireworks.queue.queue_launcher import rapidfire as rapidfirequeue
from fireworks.features.multi_launcher import launch_multiprocess
from fireworks.utilities.fw_serializers import load_object_from_file
from fireworks.core.rocket_launcher import rapidfire
from fireworks.core.fworker import FWorker
import fireworks.fw_config
import logging
[docs]class Pynta:
def __init__(self,path,rxns_file,surface_type,metal,label,launchpad_path=None,fworker_path=None,
vacuum=8.0,repeats=[(1,1,1),(3,3,4)],slab_path=None,software="Espresso",socket=False,queue=False,njobs_queue=0,a=None,
software_kwargs={'kpts': (3, 3, 1), 'tprnfor': True, 'occupations': 'smearing',
'smearing': 'marzari-vanderbilt',
'degauss': 0.01, 'ecutwfc': 40, 'nosym': True,
'conv_thr': 1e-6, 'mixing_mode': 'local-TF',
"pseudopotentials": {"Cu": 'Cu.pbe-spn-kjpaw_psl.1.0.0.UPF',"H": 'H.pbe-kjpaw_psl.1.0.0.UPF',"O": 'O.pbe-n-kjpaw_psl.1.0.0.UPF',"C": 'C.pbe-n-kjpaw_psl.1.0.0.UPF',"N": 'N.pbe-n-kjpaw_psl.1.0.0.UPF',
}, },
software_kwargs_gas=None,
TS_opt_software_kwargs=None,
lattice_opt_software_kwargs={'kpts': (25,25,25), 'ecutwfc': 70, 'degauss':0.02, 'mixing_mode': 'plain'},
reset_launchpad=False,queue_adapter_path=None,num_jobs=25,max_num_hfsp_opts=None,#max_num_hfsp_opts is mostly for fast testing
Eharmtol=3.0,Eharmfiltertol=30.0,Ntsmin=5):
self.surface_type = surface_type
if launchpad_path:
launchpad = LaunchPad.from_file(launchpad_path)
else:
launchpad = LaunchPad()
if reset_launchpad:
launchpad.reset('', require_password=False)
self.launchpad = launchpad
self.slab_path = slab_path
self.vacuum = vacuum
self.a = a
self.software = software
self.socket = socket
self.repeats = repeats
self.path = os.getcwd() if path is None else path
self.facet = metal + surface_type
self.fws = []
self.metal = metal
self.adsorbate_fw_dict = dict()
self.software_kwargs = software_kwargs
if software_kwargs_gas:
self.software_kwargs_gas = software_kwargs_gas
else:
self.software_kwargs_gas = deepcopy(software_kwargs)
self.software_kwargs_gas["kpts"] = 'gamma'
self.software_kwargs_gas["smearing"] = 'gauss'
self.software_kwargs_gas["degauss"] = 0.005
self.software_kwargs_gas["mixing_beta"] = 0.2
self.software_kwargs_gas["mixing_ndim"] = 10
self.software_kwargs_TS = deepcopy(software_kwargs)
if TS_opt_software_kwargs:
for key,val in TS_opt_software_kwargs.items():
self.software_kwargs_TS[key] = val
self.lattice_opt_software_kwargs = deepcopy(software_kwargs)
if lattice_opt_software_kwargs:
for key,val in lattice_opt_software_kwargs.items():
self.lattice_opt_software_kwargs[key] = val
self.queue = queue
self.fworker = None
self.qadapter = None
if fworker_path:
self.fworker = FWorker.from_file(fworker_path)
else:
self.fworker = FWorker()
self.rxns_file = rxns_file
with open(self.rxns_file,'r') as f:
self.rxns_dict = yaml.safe_load(f)
self.slab = read(self.slab_path) if self.slab_path else None
self.njobs_queue = njobs_queue
self.num_jobs = num_jobs
self.label = label
if queue:
self.qadapter = load_object_from_file(queue_adapter_path)
if self.slab_path is None:
self.nslab = int(np.prod(np.array(self.repeats[0])*np.array(self.repeats[1])))
else:
self.nslab = len(read(self.slab_path))
self.mol_dict = None
self.Eharmtol = Eharmtol
self.Eharmfiltertol = Eharmfiltertol
self.Ntsmin = Ntsmin
self.max_num_hfsp_opts = max_num_hfsp_opts
[docs] def generate_slab(self):
"""
generates and optimizes a small scale slab that can be scaled to a large slab as needed
optimization occurs through fireworks and this process waits until the optimization is completed
"""
slab_type = getattr(ase.build,self.surface_type)
#optimize the lattice constant
if self.a is None:
a = get_lattice_parameter(self.metal,self.surface_type,self.software,self.lattice_opt_software_kwargs)
print("computed lattice constant of: {} Angstroms".format(a))
self.a = a
else:
a = self.a
#construct slab with optimial lattice constant
slab = slab_type(self.metal,self.repeats[1],a,self.vacuum)
slab.pbc = (True, True, False)
write(os.path.join(self.path,"slab_init.xyz"),slab)
self.slab_path = os.path.join(self.path,"slab.xyz")
if self.software != "XTB":
fwslab = optimize_firework(os.path.join(self.path,"slab_init.xyz"),self.software,"slab",
opt_method="BFGSLineSearch",socket=self.socket,software_kwargs=self.software_kwargs,
run_kwargs={"fmax" : 0.01},out_path=os.path.join(self.path,"slab.xyz"),constraints=["freeze half slab"])
wfslab = Workflow([fwslab], name=self.label+"_slab")
self.launchpad.add_wf(wfslab)
self.rapidfire()
while not os.path.exists(self.slab_path): #wait until slab optimizes, this is required anyway and makes the rest of the code simpler
time.sleep(1)
self.slab = read(self.slab_path)
else: #testing
self.slab = slab
write(self.slab_path,slab)
[docs] def analyze_slab(self):
full_slab = self.slab * self.repeats[0]
cas = SlabAdsorptionSites(full_slab, self.surface_type,allow_6fold=False,composition_effect=False,
label_sites=True,
surrogate_metal=self.metal)
self.cas = cas
single_geoms,single_site_bond_params_lists,single_sites_lists = generate_unique_site_additions(full_slab,cas,len(full_slab),site_bond_params_list=[],sites_list=[])
double_geoms_full = []
double_site_bond_params_lists_full = []
double_sites_lists_full = []
for i in range(len(single_geoms)):
double_geoms,double_site_bond_params_lists,double_sites_lists = generate_unique_site_additions(single_geoms[i],
cas,len(full_slab),single_site_bond_params_lists[i],single_sites_lists[i])
double_geoms_full.extend(double_geoms)
double_site_bond_params_lists_full.extend(double_site_bond_params_lists)
double_sites_lists_full.extend(double_sites_lists)
self.single_site_bond_params_lists = single_site_bond_params_lists
self.single_sites_lists = single_sites_lists
self.double_site_bond_params_lists = double_site_bond_params_lists_full
self.double_sites_lists = double_sites_lists_full
[docs] def generate_mol_dict(self):
"""
generates all unique Molecule objects based on the reactions and generates a dictionary
mapping smiles to each unique Molecule object
also updates self.rxns_dict with addtional useful information for each reaction
"""
mols = []
for r in self.rxns_dict:
r["reactant_mols"] = []
r["product_mols"] = []
react = Molecule().from_adjacency_list(r["reactant"])
prod = Molecule().from_adjacency_list(r["product"])
react.clear_labeled_atoms()
prod.clear_labeled_atoms()
for mol in react.split():
mol.multiplicity = mol.get_radical_count() + 1
if not mol.is_surface_site():
mols.append(mol)
r["reactant_mols"].append(mol)
for mol in prod.split():
mol.multiplicity = mol.get_radical_count() + 1
if not mol.is_surface_site():
mols.append(mol)
r["product_mols"].append(mol)
unique_mols = []
for mol in mols:
for m in unique_mols:
if mol.is_isomorphic(m):
break
else:
unique_mols.append(mol)
for mol in unique_mols:
mol.multiplicity = mol.get_radical_count() + 1
mol_dict = {get_name(mol):mol for mol in unique_mols}
self.mol_dict = mol_dict
self.name_to_adjlist_dict = {sm:mol.to_adjacency_list() for sm,mol in mol_dict.items()}
for r in self.rxns_dict:
r["reactant_names"] = []
r["product_names"] = []
for i,rmol in enumerate(r["reactant_mols"]):
for sm,mol in mol_dict.items():
if mol is rmol or mol.is_isomorphic(rmol,save_order=True):
r["reactant_mols"][i] = mol
r["reactant_names"].append(sm)
break
else:
print("rmol")
print(rmol.to_adjacency_list())
raise ValueError
for i,rmol in enumerate(r["product_mols"]):
for sm,mol in mol_dict.items():
if mol is rmol or mol.is_isomorphic(rmol,save_order=True):
r["product_mols"][i] = mol
r["product_names"].append(sm)
break
else:
print("rmol")
print(rmol.to_adjacency_list())
raise ValueError
r["reactant_mols"] = [x.to_adjacency_list() for x in r["reactant_mols"]]
r["product_mols"] = [x.to_adjacency_list() for x in r["product_mols"]]
[docs] def generate_initial_adsorbate_guesses(self,skip_structs=False):
"""
Generates initial guess geometries for adsorbates and gas phase species
Generates maps connecting the molecule objects with these adsorbates
"""
cas = self.cas
structures = dict()
gratom_to_molecule_atom_maps = dict()
gratom_to_molecule_surface_atom_maps = dict()
for sm,mol in self.mol_dict.items():
print(sm)
surf_sites = mol.get_surface_sites()
ads,mol_to_atoms_map = get_adsorbate(mol)
if len(surf_sites) == 0:
if not skip_structs:
ads.pbc = (True,True,False)
ads.center(vacuum=10)
structures[sm] = [ads]
gratom_to_molecule_atom_maps[sm] = {val:key for key,val in mol_to_atoms_map.items()}
gratom_to_molecule_surface_atom_maps[sm] = dict()
else:
if not skip_structs:
#check if this adsorbate is already calculated
if os.path.exists(os.path.join(self.path,"Adsorbates",sm)): #assume initial guesses already generated
structures[sm] = None
else:
structs = generate_adsorbate_guesses(mol,ads,self.slab,self.repeats[0],cas,mol_to_atoms_map,self.metal,
self.single_site_bond_params_lists,self.single_sites_lists,
self.double_site_bond_params_lists,self.double_sites_lists,
self.Eharmtol,self.Eharmfiltertol,self.Ntsmin)
structures[sm] = structs
gratom_to_molecule_atom_maps[sm] = {val:key for key,val in mol_to_atoms_map.items()}
adatoms = []
surf_index_atom_map = dict()
for i,atm in enumerate(mol.atoms):
if atm.is_bonded_to_surface():
surf_index_atom_map[mol_to_atoms_map[i]] = i
gratom_to_molecule_surface_atom_maps[sm] = surf_index_atom_map
self.gratom_to_molecule_atom_maps = gratom_to_molecule_atom_maps
self.gratom_to_molecule_surface_atom_maps = gratom_to_molecule_surface_atom_maps
if not skip_structs:
self.adsorbate_structures = structures
[docs] def setup_adsorbates(self,initial_guess_finished=False):
"""
Attaches each adsorbate structure to the slab and sets up fireworks to
first optimize each possible geometry and then run vibrational calculations on each unique final geometry
"""
if not initial_guess_finished:
adsorbate_dict = dict()
for sp_symbol, adsorbate in self.adsorbate_structures.items():
adsorbate_dict[sp_symbol] = dict()
if adsorbate is not None:
for prefix, structure in enumerate(adsorbate):
adsorbate_dict[sp_symbol][prefix] = structure
else: #read from Adsorbates directory
prefixes = os.listdir(os.path.join(self.path,"Adsorbates",sp_symbol))
for prefix in prefixes:
if prefix == "info.json":
continue
adsorbate_dict[sp_symbol][prefix] = read(os.path.join(self.path,"Adsorbates",sp_symbol,str(prefix),str(prefix)+"_init.xyz"))
big_slab = self.slab * self.repeats[0]
nsmall_slab = len(self.slab)
for adsname,adsorbate in adsorbate_dict.items():
xyzs = []
optfws = []
optfws2 = []
mol = self.mol_dict[adsname]
#check if this adsorbate is already calculated
exists = False
for prefix,structure in adsorbate.items():
if os.path.exists(os.path.join(self.path,"Adsorbates",adsname,str(prefix),str(prefix)+".xyz")):
exists = True
if exists: #if this species already has a completed opt job in any prefix driectory skip it
self.adsorbate_fw_dict[adsname] = []
continue
for prefix,structure in adsorbate.items():
if len(mol.get_surface_sites()) > 0:
big_slab_ads = structure
software_kwargs = deepcopy(self.software_kwargs)
target_site_num = len(mol.get_surface_sites())
if self.software != "XTB":
constraints = ["freeze half slab"]
else:
constraints = ["freeze all "+self.metal]
else: #gas phase
big_slab_ads = structure
target_site_num = None #no slab so can't run site analysis
software_kwargs = deepcopy(self.software_kwargs_gas)
constraints = []
if len(big_slab_ads) == 1 and self.software == "Espresso": #monoatomic species
software_kwargs["command"] = software_kwargs["command"].replace("< PREFIX.pwi > PREFIX.pwo","-ndiag 1 < PREFIX.pwi > PREFIX.pwo")
try:
os.makedirs(os.path.join(self.path,"Adsorbates",adsname,str(prefix)))
except:
pass
write(os.path.join(self.path,"Adsorbates",adsname,str(prefix),str(prefix)+"_init.xyz"),big_slab_ads)
sp_dict = {"name":adsname, "adjlist":mol.to_adjacency_list(),"atom_to_molecule_atom_map": self.gratom_to_molecule_atom_maps[adsname],
"gratom_to_molecule_surface_atom_map": self.gratom_to_molecule_surface_atom_maps[adsname], "nslab": self.nslab}
with open(os.path.join(self.path,"Adsorbates",adsname,"info.json"),'w') as f:
json.dump(sp_dict,f)
xyz = os.path.join(self.path,"Adsorbates",adsname,str(prefix),str(prefix)+".xyz")
xyzs.append(xyz)
fwopt = optimize_firework(os.path.join(self.path,"Adsorbates",adsname,str(prefix),str(prefix)+"_init.xyz"),
self.software,"weakopt_"+str(prefix),
opt_method="MDMin",opt_kwargs={'dt': 0.05},socket=self.socket,software_kwargs=software_kwargs,
run_kwargs={"fmax" : 0.5, "steps" : 70},parents=[],constraints=constraints,
ignore_errors=True, metal=self.metal, facet=self.surface_type, target_site_num=target_site_num, priority=3)
fwopt2 = optimize_firework(os.path.join(self.path,"Adsorbates",adsname,str(prefix),"weakopt_"+str(prefix)+".xyz"),
self.software,str(prefix),
opt_method="QuasiNewton",socket=self.socket,software_kwargs=software_kwargs,
run_kwargs={"fmax" : 0.02, "steps" : 70},parents=[fwopt],constraints=constraints,
ignore_errors=True, metal=self.metal, facet=self.surface_type, target_site_num=target_site_num, priority=3)
optfws.append(fwopt)
optfws.append(fwopt2)
optfws2.append(fwopt2)
vib_obj_dict = {"software": self.software, "label": adsname, "software_kwargs": software_kwargs,
"constraints": ["freeze all "+self.metal]}
cfw = collect_firework(xyzs,True,["vibrations_firework"],[vib_obj_dict],["vib.json"],[],parents=optfws2,label=adsname)
self.adsorbate_fw_dict[adsname] = optfws2
logging.error(self.adsorbate_fw_dict.keys())
self.fws.extend(optfws+[cfw])
else:
ads_path = os.path.join(self.path,"Adsorbates")
for ad in os.listdir(ads_path):
xyzs = []
optfws = []
optfws2 = []
if ad in self.mol_dict.keys():
mol = self.mol_dict[ad]
else:
continue #the species is not in the target reactions so skip it
target_site_num = len(mol.get_surface_sites())
ad_path = os.path.join(ads_path,ad)
completed = False
for prefix in os.listdir(ad_path):
if os.path.exists(os.path.join(ad_path,prefix,prefix+".xyz")):
completed = True
if completed:
self.adsorbate_fw_dict[ad] = []
continue
for prefix in os.listdir(ad_path):
if prefix.split(".")[-1] == "json":
continue
prefix_path = os.path.join(ad_path,prefix)
if target_site_num == 0:
software_kwargs = deepcopy(self.software_kwargs_gas)
constraints = []
if len(mol.atoms) == 1 and self.software == "Espresso": #monoatomic species
software_kwargs["command"] = software_kwargs["command"].replace("< PREFIX.pwi > PREFIX.pwo","-ndiag 1 < PREFIX.pwi > PREFIX.pwo")
else:
software_kwargs = deepcopy(self.software_kwargs)
if self.software != "XTB":
constraints = ["freeze half slab"]
else:
constraints = ["freeze all "+self.metal]
xyz = os.path.join(prefix_path,str(prefix)+".xyz")
init_path = os.path.join(prefix_path,prefix+"_init.xyz")
assert os.path.exists(init_path), init_path
xyzs.append(xyz)
fwopt = optimize_firework(init_path,
self.software,"weakopt_"+str(prefix),
opt_method="MDMin",opt_kwargs={'dt': 0.05},socket=self.socket,software_kwargs=software_kwargs,
run_kwargs={"fmax" : 0.5, "steps" : 70},parents=[],constraints=constraints,
ignore_errors=True, metal=self.metal, facet=self.surface_type, target_site_num=target_site_num, priority=3)
fwopt2 = optimize_firework(os.path.join(self.path,"Adsorbates",ad,str(prefix),"weakopt_"+str(prefix)+".xyz"),
self.software,str(prefix),
opt_method="QuasiNewton",socket=self.socket,software_kwargs=software_kwargs,
run_kwargs={"fmax" : 0.02, "steps" : 70},parents=[fwopt],constraints=constraints,
ignore_errors=True, metal=self.metal, facet=self.surface_type, target_site_num=target_site_num, priority=3)
optfws.append(fwopt)
optfws.append(fwopt2)
optfws2.append(fwopt2)
vib_obj_dict = {"software": self.software, "label": ad, "software_kwargs": software_kwargs,
"constraints": ["freeze all "+self.metal]}
cfw = collect_firework(xyzs,True,["vibrations_firework"],[vib_obj_dict],["vib.json"],[True,False],parents=optfws2,label=ad)
self.adsorbate_fw_dict[ad] = optfws2
logging.error(self.adsorbate_fw_dict.keys())
self.fws.extend(optfws+[cfw])
[docs] def setup_transition_states(self,adsorbates_finished=False):
"""
Sets up fireworks to generate and filter a set of TS estimates, optimize each unique TS estimate,
and run vibrational and IRC calculations on the each unique final transition state
Note the vibrational and IRC calculations are launched at the same time
"""
if self.software != "XTB":
opt_obj_dict = {"software":self.software,"label":"prefix","socket":self.socket,"software_kwargs":self.software_kwargs_TS,
"run_kwargs": {"fmax" : 0.02, "steps" : 70},"constraints": ["freeze half slab"],"sella":True,"order":1,}
else:
opt_obj_dict = {"software":self.software,"label":"prefix","socket":self.socket,"software_kwargs":self.software_kwargs_TS,
"run_kwargs": {"fmax" : 0.02, "steps" : 70},"constraints": ["freeze all "+self.metal],"sella":True,"order":1,}
vib_obj_dict = {"software":self.software,"label":"prefix","socket":self.socket,"software_kwargs":self.software_kwargs,
"constraints": ["freeze all "+self.metal]}
IRC_obj_dict = {"software":self.software,"label":"prefix","socket":self.socket,"software_kwargs":self.software_kwargs,
"run_kwargs": {"fmax" : 0.1, "steps" : 70},"constraints":["freeze all "+self.metal]}
for i,rxn in enumerate(self.rxns_dict):
ts_path = os.path.join(self.path,"TS"+str(i))
os.makedirs(ts_path)
ts_task = MolecularTSEstimate({"rxn": rxn,"ts_path": ts_path,"slab_path": self.slab_path,"adsorbates_path": os.path.join(self.path,"Adsorbates"),
"rxns_file": self.rxns_file,"repeats": self.repeats[0],"path": self.path,"metal": self.metal,"facet": self.surface_type, "out_path": ts_path,
"spawn_jobs": True, "opt_obj_dict": opt_obj_dict, "vib_obj_dict": vib_obj_dict,
"IRC_obj_dict": IRC_obj_dict, "nprocs": 48, "name_to_adjlist_dict": self.name_to_adjlist_dict,
"gratom_to_molecule_atom_maps":{sm: {str(k):v for k,v in d.items()} for sm,d in self.gratom_to_molecule_atom_maps.items()},
"gratom_to_molecule_surface_atom_maps":{sm: {str(k):v for k,v in d.items()} for sm,d in self.gratom_to_molecule_surface_atom_maps.items()},
"nslab":self.nslab,"Eharmtol":self.Eharmtol,"Eharmfiltertol":self.Eharmfiltertol,"Ntsmin":self.Ntsmin,
"max_num_hfsp_opts":self.max_num_hfsp_opts})
reactants = rxn["reactant_names"]
products = rxn["product_names"]
parents = []
if not adsorbates_finished:
for m in reactants+products:
parents.extend(self.adsorbate_fw_dict[m])
fw = Firework([ts_task],parents=parents,name="TS"+str(i)+"est",spec={"_priority": 10})
self.fws.append(fw)
[docs] def launch(self):
"""
Call appropriate rapidfire function
"""
if self.queue:
rapidfirequeue(self.launchpad,self.fworker,self.qadapter,njobs_queue=self.njobs_queue,nlaunches="infinite")
elif not self.queue and self.num_jobs == 1:
rapidfire(self.launchpad,self.fworker,nlaunches="infinite")
else:
launch_multiprocess(self.launchpad,self.fworker,"INFO","infinite",self.num_jobs,5)
[docs] def execute(self,generate_initial_ad_guesses=True,calculate_adsorbates=True,
calculate_transition_states=True,launch=True):
"""
generate and launch a Pynta Fireworks Workflow
if generate_initial_ad_guesses is true generates initial guesses, otherwise assumes they are already there
if calculate_adsorbates is true generates firework jobs for adsorbates, otherwise assumes they are not needed
if calculate_transition_states is true generates fireworks jobs for transition states, otherwise assumes they are not needed
if launch is true launches the fireworks workflow in infinite mode...this generates a process that will continue to spawn jobs
if launch is false the Fireworks workflow is added to the launchpad, where it can be launched separately using fireworks commands
"""
if not calculate_adsorbates: #default handling
generate_initial_ad_guesses = False
if self.slab_path is None: #handle slab
self.generate_slab()
self.analyze_slab()
self.generate_mol_dict()
self.generate_initial_adsorbate_guesses(skip_structs=(not generate_initial_ad_guesses))
#adsorbate optimization
if calculate_adsorbates:
self.setup_adsorbates(initial_guess_finished=(not generate_initial_ad_guesses))
if calculate_transition_states:
#setup transition states
self.setup_transition_states(adsorbates_finished=(not calculate_adsorbates))
wf = Workflow(self.fws, name=self.label)
self.launchpad.add_wf(wf)
if launch:
self.launch()
[docs] def execute_from_initial_ad_guesses(self):
if self.slab_path is None: #handle slab
self.generate_slab()
self.analyze_slab()
self.generate_mol_dict()
self.generate_initial_adsorbate_guesses(skip_structs=True)
#adsorbate optimization
self.setup_adsorbates(initial_guess_finished=True)
#setup transition states
self.setup_transition_states()
wf = Workflow(self.fws, name=self.label)
self.launchpad.add_wf(wf)
self.launch()