""" Amber25‑compliant 1‑trajectory MM/GBSA + MM/PBSA pipeline. USAGE run_amber_mmgbsa.py ZID /path/to/picked_complexes/ The directory must contain a file named '*_merged.pdb' that holds receptor + ligand coordinates (no waters/ions). Everything else is generated automatically inside /amber_/. """ from __future__ import annotations import os, sys, glob, subprocess, textwrap, shutil, datetime, multiprocessing as mp from pathlib import Path def say(msg: str) -> None: now = datetime.datetime.now().strftime("%F %T") print(f"[{now}] {msg}", flush=True) def first(pattern: str) -> Path: hits = sorted(glob.glob(pattern)) if not hits: raise FileNotFoundError(f"No match: {pattern}") return Path(hits[0]).resolve() def pick_engine() -> str: return "pmemd.cuda" if shutil.which("pmemd.cuda") else "sander" def shell(cmd: list[str], lbl: str | None = None) -> None: say(lbl or " ".join(cmd)) subprocess.check_call(cmd) def main(zid: str, complex_dir: str | Path) -> None: complex_dir = Path(complex_dir).resolve() # absolute path to the input PDB chosen by the array job merge = first(f"{complex_dir}/{zid}*_merged.pdb") # create work dir: picked_complexes/amber_ work = complex_dir / f"amber_{zid}" work.mkdir(parents=True, exist_ok=True) os.chdir(work) md_engine = pick_engine() say(f"Using engine: {md_engine}") say(f"Input merged PDB: {merge}") #1. Split merged PDB into receptor / ligand if not Path("receptor.pdb").exists(): std_res = { # canonical 20 "ALA","ARG","ASN","ASP","CYS","GLN","GLU","GLY", "HIS","ILE","LEU","LYS","MET","PHE","PRO","SER", "THR","TRP","TYR","VAL", # common prot / tautomer variants "HID","HIE","HIP","ASH","GLH","CYM","CYX","LYN" } ignore_caps_ions = { # peptide caps, terminal patches, mono‑atomic ions "ACE","NME","NMET","CPRO","CTER","NH2", "ZN","NA","K","CL","MG","CA" } # pass 1: count heavy atoms per residue heavy = {} with open(merge) as fh: for line in fh: if not line.startswith(("ATOM","HETATM")): continue res = line[17:20].strip() if res in std_res or res in ignore_caps_ions: continue if line[12:16].startswith(" H"): # skip hydrogens continue heavy[res] = heavy.get(res, 0) + 1 if not heavy: sys.exit("ERROR: no candidate ligand residue found. " "Consider setting env‑var LIG_RES manually.") # choose largest non‑standard residue ligand_res = max(heavy, key=heavy.get) say(f"Chosen ligand residue: {ligand_res} " f"({heavy[ligand_res]} heavy atoms)") if heavy[ligand_res] < 6: sys.exit("ERROR: detected ligand has < 6 heavy atoms – " "likely a cap/ion. Set LIG_RES manually.") with open("receptor.pdb", "w") as rec, open("ligand.pdb", "w") as lig: for line in open(merge): if not line.startswith(("ATOM","HETATM")): continue res = line[17:20].strip() (lig if res == ligand_res else rec).write(line) # 2. Ligand parameters (antechamber + parmchk2) def run_antechamber(charge: int) -> bool: say(f"Attempting antechamber with charge {charge:+d}") cmd = ["antechamber", "-i","ligand.pdb","-fi","pdb", "-o","ligand.mol2","-fo","mol2", "-c","bcc","-at","gaff2", "-nc", str(charge), "-s","2"] try: subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) except subprocess.CalledProcessError as err: say("antechamber crashed – will try another charge") return False bad = False txt = Path("sqm.out").read_text(errors="ignore") if "odd number of electrons" in txt.lower(): bad = True if "exiting due to errors" in txt.lower(): bad = True return not bad override = os.getenv("LIG_CHARGE") or (sys.argv[3] if len(sys.argv) == 4 else None) if override is not None: lig_charge = int(override) good = run_antechamber(lig_charge) if not good: sys.exit(f"ERROR: antechamber failed even with the user‑specified " f"charge {lig_charge:+d}. Open sqm.out for details.") else: # neutral → +1 → −1 → +2 → −2 ... up to ±3 lig_charge, good = 0, False for q in [0, 1, -1, 2, -2, 3, -3]: if run_antechamber(q): lig_charge, good = q, True break if not good: sys.exit("ERROR: could not find a ligand charge that satisfies SQM." " Inspect sqm.out manually.") say(f"✓ antechamber succeeded with ligand charge {lig_charge:+d}") # parmchk2 stays the same and uses the ligand.mol2 we just created subprocess.check_call(["parmchk2","-i","ligand.mol2", "-f","mol2","-o","ligand.frcmod"]) # 3. Build prmtops (LEaP) if not Path("com_solvated.top").exists(): tleap_in = """\ source leaprc.protein.ff14SB source leaprc.gaff2 source leaprc.water.tip3p loadamberparams ligand.frcmod addPdbResMap { { 0 "ACE" "NTER" } { 0 "CTER" "CTER" } { 0 "CPRO" "CYP" } { 0 "NMET" "MET" } } LIG = loadmol2 ligand.mol2 REC = loadpdb receptor.pdb complex = combine {REC LIG} set default PBRadii mbondi2 saveamberparm LIG lig.top lig.crd saveamberparm REC rec.top rec.crd saveamberparm complex com.top com.crd solvateOct complex TIP3PBOX 15.0 saveamberparm complex com_solvated.top com_solvated.crd quit """ Path("tleap.in").write_text(tleap_in) shell(["tleap","-f","tleap.in"], "tleap – building topologies") # 4. MD input files (min → heat → eq → prod) mdin = { "min.in": "&cntrl imin=1,maxcyc=5000,ntb=1,cut=10.0,ntpr=100 /\n", "heat.in": "&cntrl imin=0,irest=0,ntx=1,ntb=2,ntc=2,ntf=2," "tempi=0.0,temp0=300.0,nstlim=25000,dt=0.002," "ntt=3,gamma_ln=2.0,ntpr=500,ntwx=500,ntp=1," "pres0=1.0,taup=2.0,cut=10.0 /\n", "eq.in": "&cntrl imin=0,irest=1,ntx=5,ntb=2,ntc=2,ntf=2," "temp0=300.0,gamma_ln=2.0,nstlim=250000,dt=0.002," "ntt=3,ntpr=1000,ntwx=1000,ntp=1,pres0=1.0,taup=2.0," "cut=10.0 /\n", "prod.in": "&cntrl imin=0,irest=1,ntx=5,ntb=2,ntc=2,ntf=2," "temp0=300.0,gamma_ln=2.0,iwrap=1,ioutfm=1," "nstlim=10000000,dt=0.002,ntt=3,ntpr=5000,ntwx=5000," "ntp=1,pres0=1.0,taup=2.0,cut=10.0 /\n" } for fn, txt in mdin.items(): Path(fn).write_text(txt) def run_md(tag: str, inp: str, start: str, restrt: str, top: str = "com_solvated.top"): shell([md_engine,"-O","-i",inp,"-o",f"{tag}.out", "-p",top,"-c",start,"-r",restrt,"-x",f"{tag}.nc"], f"{md_engine} – {tag}") if not Path("prod.rst7").exists(): run_md("min" , "min.in" , "com_solvated.crd", "min.rst7") run_md("heat", "heat.in", "min.rst7" , "heat.rst7") run_md("eq" , "eq.in" , "heat.rst7" , "eq.rst7") run_md("prod", "prod.in", "eq.rst7" , "prod.rst7") # Strip solvent & auto‑image trajectory if not Path("stripped.nc").exists(): Path("strip.in").write_text("""\ parm com_solvated.top trajin prod.nc autoimage center :1-9999 origin image origin center strip :WAT,Na+,Cl- trajout stripped.nc netcdf go """) shell(["cpptraj","-i","strip.in"], "cpptraj – strip + autoimage") # 6. MM/PB‑GBSA input file if not Path("mmpbsa.in").exists(): Path("mmpbsa.in").write_text("""\ &general startframe=1,endframe=-1,interval=5,keep_files=0,/ &gb igb=5,saltcon=0.15,/ &pb istrng=0.15,fillratio=4.0,/ &decomp idecomp=2,csv_format=1,dec_verbose=3,/ """) # 7. Run MMPBSA.py in parallel cores = str(mp.cpu_count()) shell(["mpirun","-np",cores,"MMPBSA.py.MPI","-O", "-i","mmpbsa.in", "-o", f"{zid}_FINAL_RESULTS.dat", "-eo", f"{zid}_ENERGY_PER_FRAME.csv", "-deo", f"{zid}_DECOMP.csv", "-sp","com_solvated.top", "-cp","com.top","-rp","rec.top","-lp","lig.top", "-y","stripped.nc"], "MMPBSA.py – GB + PB") say("Workflow finished successfully.") say(f"Results: {zid}_FINAL_RESULTS.dat , *_ENERGY_PER_FRAME.csv , *_DECOMP.csv") if __name__ == "__main__": if len(sys.argv) != 3: sys.exit("USAGE: run_amber_mmgbsa.py ") main(sys.argv[1], sys.argv[2])