diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index e5ed45cbce..35ab100172 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -39,8 +39,7 @@ jobs: - uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - cache: pip - cache-dependency-path: pyproject.toml + - name: Install dependencies # ERROR: Cannot install atomate2 and atomate2[strict,tests]==0.0.1 because these package versions have conflicting dependencies. diff --git a/pyproject.toml b/pyproject.toml index 173f5fd514..fd35919e92 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,9 +50,10 @@ defects = [ ] forcefields = [ "ase>=3.22.1", + "torch<=2.2.1", # incompatibility with dgl if newer versions are used "chgnet>=0.2.2", "mace-torch>=0.3.3", - "matgl>=0.9.0", + "matgl>=1.0.0", "quippy-ase>=0.9.14", ] docs = [ @@ -74,6 +75,7 @@ strict = [ # must use >= for ase to not uninstall main branch install in CI # last known working commit: https://gitlab.com/ase/ase@2bab58f4e "ase>=3.22.1", + "torch==2.2.1", "cclib==1.8.1", "chgnet==0.3.5", "click==8.1.7", diff --git a/src/atomate2/common/jobs/utils.py b/src/atomate2/common/jobs/utils.py index 100db6e40f..c305a85ed2 100644 --- a/src/atomate2/common/jobs/utils.py +++ b/src/atomate2/common/jobs/utils.py @@ -40,7 +40,7 @@ def structure_to_conventional( structure: Structure, symprec: float = SETTINGS.SYMPREC ) -> Structure: """ - Job hat creates a standard conventional structure. + Job that creates a standard conventional structure. Parameters ---------- diff --git a/src/atomate2/common/schemas/phonons.py b/src/atomate2/common/schemas/phonons.py index 9c4b66af0d..ab954b5be0 100644 --- a/src/atomate2/common/schemas/phonons.py +++ b/src/atomate2/common/schemas/phonons.py @@ -128,7 +128,7 @@ class PhononJobDirs(BaseModel): None, description="Directory where optimization run was performed." ) taskdoc_run_job_dir: Optional[str] = Field( - None, description="Directory where taskdoc was generated." + None, description="Directory where task doc was generated." ) diff --git a/src/atomate2/forcefields/__init__.py b/src/atomate2/forcefields/__init__.py index 3e7ac5ac47..a76400755f 100644 --- a/src/atomate2/forcefields/__init__.py +++ b/src/atomate2/forcefields/__init__.py @@ -10,4 +10,5 @@ class MLFF(Enum): # TODO inherit from StrEnum when 3.11+ GAP = "GAP" M3GNet = "M3GNet" CHGNet = "CHGNet" + Forcefield = "Forcefield" # default placeholder option Nequip = "Nequip" diff --git a/src/atomate2/forcefields/jobs.py b/src/atomate2/forcefields/jobs.py index a92ba652de..a210f5908b 100644 --- a/src/atomate2/forcefields/jobs.py +++ b/src/atomate2/forcefields/jobs.py @@ -6,27 +6,71 @@ from dataclasses import dataclass, field from typing import TYPE_CHECKING +from ase.units import GPa as _GPa_to_eV_per_A3 from jobflow import Maker, job +from pymatgen.core.trajectory import Trajectory from atomate2.forcefields import MLFF from atomate2.forcefields.schemas import ForceFieldTaskDocument -from atomate2.forcefields.utils import Relaxer, revert_default_dtype +from atomate2.forcefields.utils import Relaxer, ase_calculator, revert_default_dtype if TYPE_CHECKING: - from collections.abc import Sequence from pathlib import Path + from typing import Callable + from ase.calculators.calculator import Calculator from pymatgen.core.structure import Structure logger = logging.getLogger(__name__) +_FORCEFIELD_DATA_OBJECTS = [Trajectory] + + +def forcefield_job(method: Callable) -> job: + """ + Decorate the ``make`` method of forcefield job makers. + + This is a thin wrapper around :obj:`~jobflow.core.job.Job` that configures common + settings for all forcefield jobs. For example, it ensures that large data objects + (currently only trajectories) are all stored in the atomate2 data store. + It also configures the output schema to be a ForceFieldTaskDocument :obj:`.TaskDoc`. + + Any makers that return forcefield jobs (not flows) should decorate the + ``make`` method with @forcefield_job. For example: + + .. code-block:: python + + class MyForcefieldMaker(Maker): + @forcefield_job + def make(structure): + # code to run forcefield job. + pass + + Parameters + ---------- + method : callable + A Maker.make method. This should not be specified directly and is + implied by the decorator. + + Returns + ------- + callable + A decorated version of the make function that will generate forcefield jobs. + """ + return job( + method, data=_FORCEFIELD_DATA_OBJECTS, output_schema=ForceFieldTaskDocument + ) + @dataclass class ForceFieldRelaxMaker(Maker): """ Base Maker to calculate forces and stresses using any force field. - Should be subclassed to use a specific force field. + Should be subclassed to use a specific force field. By default, + the code attempts to use the `self.force_field_name` attr to look + up a predefined forcefield. To overwrite this behavior, + redefine `self._calculator`. Parameters ---------- @@ -42,19 +86,22 @@ class ForceFieldRelaxMaker(Maker): Keyword arguments that will get passed to :obj:`Relaxer.relax`. optimizer_kwargs : dict Keyword arguments that will get passed to :obj:`Relaxer()`. + calculator_kwargs : dict + Keyword arguments that will get passed to the ASE calculator. task_document_kwargs : dict Additional keyword args passed to :obj:`.ForceFieldTaskDocument()`. """ name: str = "Force field relax" - force_field_name: str = "Force field" + force_field_name: str = f"{MLFF.Forcefield}" relax_cell: bool = True steps: int = 500 relax_kwargs: dict = field(default_factory=dict) optimizer_kwargs: dict = field(default_factory=dict) + calculator_kwargs: dict = field(default_factory=dict) task_document_kwargs: dict = field(default_factory=dict) - @job(output_schema=ForceFieldTaskDocument) + @forcefield_job def make( self, structure: Structure, prev_dir: str | Path | None = None ) -> ForceFieldTaskDocument: @@ -75,7 +122,11 @@ def make( "Behavior may vary..." ) - result = self._relax(structure) + with revert_default_dtype(): + relaxer = Relaxer( + self._calculator(), relax_cell=self.relax_cell, **self.optimizer_kwargs + ) + result = relaxer.relax(structure, steps=self.steps, **self.relax_kwargs) return ForceFieldTaskDocument.from_ase_compatible_result( self.force_field_name, @@ -87,14 +138,19 @@ def make( **self.task_document_kwargs, ) - def _relax(self, structure: Structure) -> dict: - raise NotImplementedError + def _calculator(self) -> Calculator: + """ASE calculator, can be overwritten by user.""" + return ase_calculator(self.force_field_name, **self.calculator_kwargs) @dataclass class ForceFieldStaticMaker(ForceFieldRelaxMaker): """ - Maker to calculate forces and stresses using the CHGNet force field. + Maker to calculate forces and stresses using any force field. + + Note that while `steps = 1` by default, the user could override + this setting along with cell shape relaxation (`relax_cell = False` + by default). Parameters ---------- @@ -108,44 +164,13 @@ class ForceFieldStaticMaker(ForceFieldRelaxMaker): name: str = "Force field static" force_field_name: str = "Force field" + relax_cell: bool = False + steps: int = 1 + relax_kwargs: dict = field(default_factory=dict) + optimizer_kwargs: dict = field(default_factory=dict) + calculator_kwargs: dict = field(default_factory=dict) task_document_kwargs: dict = field(default_factory=dict) - @job(output_schema=ForceFieldTaskDocument) - def make( - self, structure: Structure, prev_dir: str | Path | None = None - ) -> ForceFieldTaskDocument: - """ - Perform a static evaluation using a force field. - - Parameters - ---------- - structure: .Structure - pymatgen structure. - prev_dir : str or Path or None - A previous calculation directory to copy output files from. Unused, just - added to match the method signature of other makers. - """ - if self.steps < 0: - logger.warning( - "WARNING: A negative number of steps is not possible. " - "Behavior may vary..." - ) - - result = self._evaluate_static(structure) - - return ForceFieldTaskDocument.from_ase_compatible_result( - self.force_field_name, - result, - relax_cell=False, - steps=1, - relax_kwargs=None, - optimizer_kwargs=None, - **self.task_document_kwargs, - ) - - def _evaluate_static(self, structure: Structure) -> dict: - raise NotImplementedError - @dataclass class CHGNetRelaxMaker(ForceFieldRelaxMaker): @@ -169,20 +194,15 @@ class CHGNetRelaxMaker(ForceFieldRelaxMaker): """ name: str = f"{MLFF.CHGNet} relax" - force_field_name = f"{MLFF.CHGNet}" + force_field_name: str = f"{MLFF.CHGNet}" relax_cell: bool = True steps: int = 500 relax_kwargs: dict = field(default_factory=dict) optimizer_kwargs: dict = field(default_factory=dict) task_document_kwargs: dict = field(default_factory=dict) - - def _relax(self, structure: Structure) -> dict: - from chgnet.model import StructOptimizer - - relaxer = StructOptimizer(**self.optimizer_kwargs) - return relaxer.relax( - structure, relax_cell=self.relax_cell, steps=self.steps, **self.relax_kwargs - ) + calculator_kwargs: dict = field( + default_factory=lambda: {"stress_weight": _GPa_to_eV_per_A3} + ) @dataclass @@ -199,14 +219,11 @@ class CHGNetStaticMaker(ForceFieldStaticMaker): """ name: str = f"{MLFF.CHGNet} static" - force_field_name = f"{MLFF.CHGNet}" + force_field_name: str = f"{MLFF.CHGNet}" task_document_kwargs: dict = field(default_factory=dict) - - def _evaluate_static(self, structure: Structure) -> dict: - from chgnet.model import StructOptimizer - - relaxer = StructOptimizer() - return relaxer.relax(structure, steps=1) + calculator_kwargs: dict = field( + default_factory=lambda: {"stress_weight": _GPa_to_eV_per_A3} + ) @dataclass @@ -239,20 +256,9 @@ class M3GNetRelaxMaker(ForceFieldRelaxMaker): relax_kwargs: dict = field(default_factory=dict) optimizer_kwargs: dict = field(default_factory=dict) task_document_kwargs: dict = field(default_factory=dict) - - def _relax(self, structure: Structure) -> dict: - import matgl - from matgl.ext.ase import Relaxer - - # Note: the below code was taken from the matgl repo examples. - # Load pre-trained M3GNet model (currently uses the MP-2021.2.8 database) - potential = matgl.load_model("M3GNet-MP-2021.2.8-PES") - - relaxer = Relaxer( - potential=potential, relax_cell=self.relax_cell, **self.optimizer_kwargs - ) - - return relaxer.relax(structure, steps=self.steps, **self.relax_kwargs) + calculator_kwargs: dict = field( + default_factory=lambda: {"stress_weight": _GPa_to_eV_per_A3} + ) @dataclass @@ -276,13 +282,6 @@ class NequipRelaxMaker(ForceFieldRelaxMaker): Keyword arguments that will get passed to :obj:`Relaxer()`. task_document_kwargs : dict Additional keyword args passed to :obj:`.ForceFieldTaskDocument()`. - model_path: str | Path - deployed model checkpoint to load with - :obj:`nequip.calculators.NequipCalculator.from_deployed_model()'`. - model_kwargs: dict[str, Any] - Further keywords (e.g. device: Union[str, torch.device], - species_to_type_name: Optional[Dict[str, str]] = None) for - :obj:`nequip.calculators.NequipCalculator()'`. """ name: str = f"{MLFF.Nequip} relax" @@ -292,20 +291,6 @@ class NequipRelaxMaker(ForceFieldRelaxMaker): relax_kwargs: dict = field(default_factory=dict) optimizer_kwargs: dict = field(default_factory=dict) task_document_kwargs: dict = field(default_factory=dict) - model_path: str | Path = "" - model_kwargs: dict = field(default_factory=dict) - - def _relax(self, structure: Structure) -> dict: - from nequip.ase import NequIPCalculator - - calculator = NequIPCalculator.from_deployed_model( - self.model_path, **self.model_kwargs - ) - relaxer = Relaxer( - calculator, relax_cell=self.relax_cell, **self.optimizer_kwargs - ) - - return relaxer.relax(structure, steps=self.steps, **self.relax_kwargs) @dataclass @@ -321,30 +306,11 @@ class NequipStaticMaker(ForceFieldStaticMaker): The name of the force field. task_document_kwargs : dict Additional keyword args passed to :obj:`.ForceFieldTaskDocument()`. - model_path: str | Path - deployed model checkpoint to load with - :obj:`nequip.calculators.NequipCalculator()'`. - model_kwargs: dict[str, Any] - Further keywords (e.g. device: Union[str, torch.device], - species_to_type_name: Optional[Dict[str, str]] = None) for - :obj:`nequip.calculators.NequipCalculator()'`. """ name: str = f"{MLFF.Nequip} static" force_field_name: str = f"{MLFF.Nequip}" task_document_kwargs: dict = field(default_factory=dict) - model_path: str | Path = "" - model_kwargs: dict = field(default_factory=dict) - - def _evaluate_static(self, structure: Structure) -> dict: - from nequip.ase import NequIPCalculator - - calculator = NequIPCalculator.from_deployed_model( - self.model_path, **self.model_kwargs - ) - relaxer = Relaxer(calculator, relax_cell=False) - - return relaxer.relax(structure, steps=1) @dataclass @@ -365,18 +331,9 @@ class M3GNetStaticMaker(ForceFieldStaticMaker): name: str = f"{MLFF.M3GNet} static" force_field_name: str = f"{MLFF.M3GNet}" task_document_kwargs: dict = field(default_factory=dict) - - def _evaluate_static(self, structure: Structure) -> dict: - import matgl - from matgl.ext.ase import Relaxer - - # Note: the below code was taken from the matgl repo examples. - # Load pre-trained M3GNet model (currently uses the MP-2021.2.8 database) - potential = matgl.load_model("M3GNet-MP-2021.2.8-PES") - - relaxer = Relaxer(potential=potential, relax_cell=False) - - return relaxer.relax(structure, steps=1) + calculator_kwargs: dict = field( + default_factory=lambda: {"stress_weight": _GPa_to_eV_per_A3} + ) @dataclass @@ -417,18 +374,6 @@ class MACERelaxMaker(ForceFieldRelaxMaker): relax_kwargs: dict = field(default_factory=dict) optimizer_kwargs: dict = field(default_factory=dict) task_document_kwargs: dict = field(default_factory=dict) - model: str | Path | Sequence[str | Path] | None = None - model_kwargs: dict = field(default_factory=dict) - - def _relax(self, structure: Structure) -> dict: - from mace.calculators import mace_mp - - with revert_default_dtype(): - calculator = mace_mp(model=self.model, **self.model_kwargs) - relaxer = Relaxer( - calculator, relax_cell=self.relax_cell, **self.optimizer_kwargs - ) - return relaxer.relax(structure, steps=self.steps, **self.relax_kwargs) @dataclass @@ -457,16 +402,6 @@ class MACEStaticMaker(ForceFieldStaticMaker): name: str = f"{MLFF.MACE} static" force_field_name: str = f"{MLFF.MACE}" task_document_kwargs: dict = field(default_factory=dict) - model: str | Path | Sequence[str | Path] | None = None - model_kwargs: dict = field(default_factory=dict) - - def _evaluate_static(self, structure: Structure) -> dict: - from mace.calculators import mace_mp - - with revert_default_dtype(): - calculator = mace_mp(model=self.model, **self.model_kwargs) - relaxer = Relaxer(calculator, relax_cell=False) - return relaxer.relax(structure, steps=1) @dataclass @@ -490,12 +425,6 @@ class GAPRelaxMaker(ForceFieldRelaxMaker): Keyword arguments that will get passed to :obj:`Relaxer()`. task_document_kwargs : dict Additional keyword args passed to :obj:`.ForceFieldTaskDocument()`. - potential_args_str: str - args_str for :obj:`quippy.potential.Potential()'`. - potential_param_file_name: str | Path - param_file_name for :obj:`quippy.potential.Potential()'`. - potential_kwargs: dict - Further keywords for :obj:`quippy.potential.Potential()'`. """ name: str = f"{MLFF.GAP} relax" @@ -504,23 +433,13 @@ class GAPRelaxMaker(ForceFieldRelaxMaker): steps: int = 500 relax_kwargs: dict = field(default_factory=dict) optimizer_kwargs: dict = field(default_factory=dict) + calculator_kwargs: dict = field( + default_factory=lambda: { + "args_str": "IP GAP", + "param_filename": "gap.xml", + } + ) task_document_kwargs: dict = field(default_factory=dict) - potential_args_str: str | Path = "IP GAP" - potential_param_file_name: str = "gap.xml" - potential_kwargs: dict = field(default_factory=dict) - - def _relax(self, structure: Structure) -> dict: - from quippy.potential import Potential - - calculator = Potential( - args_str=self.potential_args_str, - param_filename=str(self.potential_param_file_name), - **self.potential_kwargs, - ) - relaxer = Relaxer( - calculator, **self.optimizer_kwargs, relax_cell=self.relax_cell - ) - return relaxer.relax(structure, steps=self.steps, **self.relax_kwargs) @dataclass @@ -536,28 +455,14 @@ class GAPStaticMaker(ForceFieldStaticMaker): The name of the force field. task_document_kwargs : dict Additional keyword args passed to :obj:`.ForceFieldTaskDocument()`. - potential_args_str: str - args_str for :obj:`quippy.potential.Potential()'`. - potential_param_file_name: str | Path - param_file_name for :obj:`quippy.potential.Potential()'`. - potential_kwargs: dict - Further keywords for :obj:`quippy.potential.Potential()'`. """ name: str = f"{MLFF.GAP} static" force_field_name: str = f"{MLFF.GAP}" task_document_kwargs: dict = field(default_factory=dict) - potential_args_str: str = "IP GAP" - potential_param_file_name: str | Path = "gap.xml" - potential_kwargs: dict = field(default_factory=dict) - - def _evaluate_static(self, structure: Structure) -> dict: - from quippy.potential import Potential - - calculator = Potential( - args_str=self.potential_args_str, - param_filename=str(self.potential_param_file_name), - **self.potential_kwargs, - ) - relaxer = Relaxer(calculator, relax_cell=False) - return relaxer.relax(structure, steps=1) + calculator_kwargs: dict = field( + default_factory=lambda: { + "args_str": "IP GAP", + "param_filename": "gap.xml", + } + ) diff --git a/src/atomate2/forcefields/md.py b/src/atomate2/forcefields/md.py new file mode 100644 index 0000000000..15cf1c850b --- /dev/null +++ b/src/atomate2/forcefields/md.py @@ -0,0 +1,383 @@ +"""Makers to perform MD with forcefields.""" + +from __future__ import annotations + +import contextlib +import io +from collections.abc import Sequence +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +import numpy as np +from ase import units +from ase.md.andersen import Andersen +from ase.md.langevin import Langevin +from ase.md.md import MolecularDynamics +from ase.md.npt import NPT +from ase.md.nptberendsen import NPTBerendsen +from ase.md.nvtberendsen import NVTBerendsen +from ase.md.velocitydistribution import ( + MaxwellBoltzmannDistribution, + Stationary, + ZeroRotation, +) +from ase.md.verlet import VelocityVerlet +from jobflow import Maker +from pymatgen.io.ase import AseAtomsAdaptor +from scipy.interpolate import interp1d +from scipy.linalg import schur + +from atomate2.forcefields import MLFF +from atomate2.forcefields.jobs import forcefield_job +from atomate2.forcefields.schemas import ForcefieldResult, ForceFieldTaskDocument +from atomate2.forcefields.utils import ( + TrajectoryObserver, + ase_calculator, + revert_default_dtype, +) + +if TYPE_CHECKING: + from pathlib import Path + from typing import Literal + + from ase.calculators.calculator import Calculator + from pymatgen.core.structure import Structure + +_valid_dynamics: dict[str, tuple[str, ...]] = { + "nve": ("velocityverlet",), + "nvt": ("nose-hoover", "langevin", "andersen", "berendsen"), + "npt": ("nose-hoover", "berendsen"), +} + +_preset_dynamics: dict = { + "nve_velocityverlet": VelocityVerlet, + "nvt_andersen": Andersen, + "nvt_berendsen": NVTBerendsen, + "nvt_langevin": Langevin, + "nvt_nose-hoover": NPT, + "npt_berendsen": NPTBerendsen, + "npt_nose-hoover": NPT, +} + + +@dataclass +class ForceFieldMDMaker(Maker): + """ + Perform MD with a force field. + + Note the the following units are consistent with the VASP MD implementation: + - `temperature` in Kelvin (TEBEG and TEEND) + - `time_step` in femtoseconds (POTIM) + - `pressure` in kB (PSTRESS) + + The default dynamics is Langevin NVT consistent with VASP MD, with the friction + coefficient set to 10 ps^-1 (LANGEVIN_GAMMA). + + For the rest of preset dynamics (`_valid_dynamics`) and custom dynamics inherited + from ASE (`MolecularDynamics`), the user can specify the dynamics as a string or an + ASE class into the `dynamics` attribute. In this case, please consult the ASE + documentation for the parameters and units to pass into the ASE MD function through + `ase_md_kwargs`. + + Parameters + ---------- + name : str + The name of the MD Maker + force_field_name : str + The name of the forcefield (for provenance) + time_step : float | None = None. + The timestep of the MD run in fs. + If `None`, defaults to 0.5 fs if a structure contains an isotope of + hydrogen and 2 fs otherwise. + n_steps : int = 1000 + The number of MD steps to run + ensemble : str = "nvt" + The ensemble to use. Valid ensembles are nve, nvt, or npt + temperature: float | Sequence | np.ndarray | None. + The temperature in Kelvin. If a sequence or 1D array, the temperature + schedule will be interpolated linearly between the given values. If a + float, the temperature will be constant throughout the run. + pressure: float | Sequence | None = None + The pressure in kilobar. If a sequence or 1D array, the pressure + schedule will be interpolated linearly between the given values. If a + float, the pressure will be constant throughout the run. + dynamics : str | ASE .MolecularDynamics = "langevin" + The dynamical thermostat to use. If dynamics is an ASE .MolecularDynamics + object, this uses the option specified explicitly by the user. + See _valid_dynamics for a list of pre-defined options when + specifying dynamics as a string. + ase_md_kwargs : dict | None = None + Options except for temperature and pressure to pass into the ASE MD function + calculator_kwargs : dict + kwargs to pass to the ASE calculator class + traj_file : str | Path | None = None + If a str or Path, the name of the file to save the MD trajectory to. + If None, the trajectory is not written to disk + traj_file_fmt : Literal["ase","pmg"] + The format of the trajectory file to write. If "ase", writes an + ASE trajectory, if "pmg", writes a Pymatgen trajectory. + traj_interval : int + The step interval for saving the trajectories. + mb_velocity_seed : int | None = None + If an int, a random number seed for generating initial velocities + from a Maxwell-Boltzmann distribution. + zero_linear_momentum : bool = False + Whether to initialize the atomic velocities with zero linear momentum + zero_angular_momentum : bool = False + Whether to initialize the atomic velocities with zero angular momentum + task_document_kwargs: dict + Options to pass to the TaskDoc. Default choice + {"store_trajectory": "partial", "ionic_step_data": ("energy",),} + is consistent with atomate2.vasp.md.MDMaker + """ + + name: str = "Forcefield MD" + force_field_name: str = f"{MLFF.Forcefield}" + time_step: float | None = None + n_steps: int = 1000 + ensemble: Literal["nve", "nvt", "npt"] = "nvt" + dynamics: str | MolecularDynamics = "langevin" + temperature: float | Sequence | np.ndarray | None = 300.0 + pressure: float | Sequence | np.ndarray | None = None + ase_md_kwargs: dict | None = None + calculator_kwargs: dict = field(default_factory=dict) + traj_file: str | Path | None = None + traj_file_fmt: Literal["pmg", "ase"] = "ase" + traj_interval: int = 1 + mb_velocity_seed: int | None = None + zero_linear_momentum: bool = False + zero_angular_momentum: bool = False + task_document_kwargs: dict = field( + default_factory=lambda: { + "store_trajectory": "partial", + "ionic_step_data": ("energy",), # energy is required in ionic steps + } + ) + + @staticmethod + def _interpolate_quantity(values: Sequence | np.ndarray, n_pts: int) -> np.ndarray: + """Interpolate temperature / pressure on a schedule.""" + n_vals = len(values) + return np.interp( + np.linspace(0, n_vals - 1, n_pts + 1), + np.linspace(0, n_vals - 1, n_vals), + values, + ) + + def _get_ensemble_schedule(self) -> None: + if self.ensemble == "nve": + # Disable thermostat and barostat + self.temperature = np.nan + self.pressure = np.nan + self.t_schedule = np.full(self.n_steps + 1, self.temperature) + self.p_schedule = np.full(self.n_steps + 1, self.pressure) + return + + if isinstance(self.temperature, Sequence) or ( + isinstance(self.temperature, np.ndarray) and self.temperature.ndim == 1 + ): + self.t_schedule = self._interpolate_quantity(self.temperature, self.n_steps) + # NOTE: In ASE Langevin dynamics, the temperature are normally + # scalars, but in principle one quantity per atom could be specified by giving + # an array. This is not implemented yet here. + else: + self.t_schedule = np.full(self.n_steps + 1, self.temperature) + + if self.ensemble == "nvt": + self.pressure = np.nan + self.p_schedule = np.full(self.n_steps + 1, self.pressure) + return + + if isinstance(self.pressure, Sequence) or ( + isinstance(self.pressure, np.ndarray) and self.pressure.ndim == 1 + ): + self.p_schedule = self._interpolate_quantity(self.pressure, self.n_steps) + elif isinstance(self.pressure, np.ndarray) and self.pressure.ndim == 4: + self.p_schedule = interp1d( + np.arange(self.n_steps + 1), self.pressure, kind="linear" + ) + else: + self.p_schedule = np.full(self.n_steps + 1, self.pressure) + + def _get_ensemble_defaults(self) -> None: + """Update ASE MD kwargs with defaults consistent with VASP MD.""" + self.ase_md_kwargs = self.ase_md_kwargs or {} + + if self.ensemble == "nve": + self.ase_md_kwargs.pop("temperature", None) + self.ase_md_kwargs.pop("temperature_K", None) + self.ase_md_kwargs.pop("externalstress", None) + elif self.ensemble == "nvt": + self.ase_md_kwargs["temperature_K"] = self.t_schedule[0] + self.ase_md_kwargs.pop("externalstress", None) + elif self.ensemble == "npt": + self.ase_md_kwargs["temperature_K"] = self.t_schedule[0] + self.ase_md_kwargs["externalstress"] = self.p_schedule[0] * 1e3 * units.bar + + if isinstance(self.dynamics, str) and self.dynamics.lower() == "langevin": + self.ase_md_kwargs["friction"] = self.ase_md_kwargs.get( + "friction", + 10.0 * 1e-3 / units.fs, # Same default as in VASP: 10 ps^-1 + ) + + @forcefield_job + def make( + self, + structure: Structure, + prev_dir: str | Path | None = None, + ) -> ForceFieldTaskDocument: + """ + Perform MD on a structure using a force field. + + Parameters + ---------- + structure: .Structure + pymatgen structure. + prev_dir : str or Path or None + A previous calculation directory to copy output files from. Unused, just + added to match the method signature of other makers. + """ + self._get_ensemble_schedule() + self._get_ensemble_defaults() + + if self.time_step is None: + # If a structure contains an isotope of hydrogen, set default `time_step` + # to 0.5 fs, and 2 fs otherwise. + has_h_isotope = any(element.Z == 1 for element in structure.composition) + self.time_step = 0.5 if has_h_isotope else 2.0 + + initial_velocities = structure.site_properties.get("velocities") + + if isinstance(self.dynamics, str): + # Use known dynamics if `self.dynamics` is a str + self.dynamics = self.dynamics.lower() + if self.dynamics not in _valid_dynamics[self.ensemble]: + raise ValueError( + f"{self.dynamics} thermostat not available for {self.ensemble}." + f"Available {self.ensemble} thermostats are:" + " ".join(_valid_dynamics[self.ensemble]) + ) + + if self.ensemble == "nve" and self.dynamics is None: + self.dynamics = "velocityverlet" + md_func = _preset_dynamics[f"{self.ensemble}_{self.dynamics}"] + + elif issubclass(self.dynamics, MolecularDynamics): + # Allow user to explicitly run ASE Dynamics class + md_func = self.dynamics + + atoms = structure.to_ase_atoms() + + if md_func is NPT: + # Note that until md_func is instantiated, isinstance(md_func,NPT) is False + # ASE NPT implementation requires upper triangular cell + u, _ = schur(atoms.get_cell(complete=True), output="complex") + atoms.set_cell(u.real, scale_atoms=True) + + if initial_velocities: + atoms.set_velocities(initial_velocities) + elif not np.isnan(self.t_schedule).any(): + MaxwellBoltzmannDistribution( + atoms=atoms, + temperature_K=self.t_schedule[0], + rng=np.random.default_rng(seed=self.mb_velocity_seed), + ) + if self.zero_linear_momentum: + Stationary(atoms) + if self.zero_angular_momentum: + ZeroRotation(atoms) + + with revert_default_dtype(): + atoms.calc = self._calculator() + + with contextlib.redirect_stdout(io.StringIO()): + md_observer = TrajectoryObserver(atoms, store_md_outputs=True) + + md_runner = md_func( + atoms=atoms, + timestep=self.time_step * units.fs, + **self.ase_md_kwargs, + ) + + md_runner.attach(md_observer, interval=self.traj_interval) + + def _callback(dyn: MolecularDynamics = md_runner) -> None: + if self.ensemble == "nve": + return + dyn.set_temperature(temperature_K=self.t_schedule[dyn.nsteps]) + if self.ensemble == "nvt": + return + dyn.set_stress(self.p_schedule[dyn.nsteps] * 1e3 * units.bar) + + md_runner.attach(_callback, interval=1) + md_runner.run(steps=self.n_steps) + + if self.traj_file is not None: + md_observer.save(filename=self.traj_file, fmt=self.traj_file_fmt) + + structure = AseAtomsAdaptor.get_structure(atoms) + + self.task_document_kwargs = self.task_document_kwargs or {} + + return ForceFieldTaskDocument.from_ase_compatible_result( + self.force_field_name, + ForcefieldResult( + final_structure=structure, + trajectory=md_observer.to_pymatgen_trajectory(None), + ), + relax_cell=(self.ensemble == "npt"), + steps=self.n_steps, + relax_kwargs=None, + optimizer_kwargs=None, + **self.task_document_kwargs, + ) + + def _calculator(self) -> Calculator: + """ASE calculator, can be overwritten by user.""" + return ase_calculator(self.force_field_name, **self.calculator_kwargs) + + +@dataclass +class MACEMDMaker(ForceFieldMDMaker): + """Perform an MD run with MACE.""" + + name: str = f"{MLFF.MACE} MD" + force_field_name: str = f"{MLFF.MACE}" + calculator_kwargs: dict = field( + default_factory=lambda: {"default_dtype": "float32"} + ) + + +@dataclass +class M3GNetMDMaker(ForceFieldMDMaker): + """Perform an MD run with M3GNet.""" + + name: str = f"{MLFF.M3GNet} MD" + force_field_name: str = f"{MLFF.M3GNet}" + + +@dataclass +class CHGNetMDMaker(ForceFieldMDMaker): + """Perform an MD run with CHGNet.""" + + name: str = f"{MLFF.CHGNet} MD" + force_field_name: str = f"{MLFF.CHGNet}" + + +@dataclass +class GAPMDMaker(ForceFieldMDMaker): + """Perform an MD run with GAP.""" + + name: str = f"{MLFF.GAP} MD" + force_field_name: str = f"{MLFF.GAP}" + calculator_kwargs: dict = field( + default_factory=lambda: {"args_str": "IP GAP", "param_filename": "gap.xml"} + ) + + +@dataclass +class NequipMDMaker(ForceFieldMDMaker): + """Perform an MD run with nequip.""" + + name: str = f"{MLFF.Nequip} MD" + force_field_name: str = f"{MLFF.Nequip}" diff --git a/src/atomate2/forcefields/schemas.py b/src/atomate2/forcefields/schemas.py index 98f3450995..c4b7063176 100644 --- a/src/atomate2/forcefields/schemas.py +++ b/src/atomate2/forcefields/schemas.py @@ -1,18 +1,39 @@ """Schema definitions for force field tasks.""" -from typing import Optional +from typing import Any, Optional from ase.stress import voigt_6_to_full_3x3_stress from ase.units import GPa from emmet.core.math import Matrix3D, Vector3D from emmet.core.structure import StructureMetadata +from emmet.core.utils import ValueEnum +from emmet.core.vasp.calculation import StoreTrajectoryOption from pydantic import BaseModel, Field from pymatgen.core.structure import Structure -from pymatgen.io.ase import AseAtomsAdaptor +from pymatgen.core.trajectory import Trajectory from atomate2.forcefields import MLFF +class ForcefieldResult(dict): + """Schema to store outputs in ForceFieldTaskDocument.""" + + def __init__(self, **kwargs: Any) -> None: + kwargs = { + "final_structure": None, + "trajectory": None, + "is_force_converged": None, + **kwargs, + } + super().__init__(**kwargs) + + +class ForcefieldObject(ValueEnum): + """Types of forcefield data objects.""" + + TRAJECTORY = "trajectory" + + class IonicStep(BaseModel, extra="allow"): # type: ignore[call-arg] """Document defining the information at each ionic step.""" @@ -21,7 +42,9 @@ class IonicStep(BaseModel, extra="allow"): # type: ignore[call-arg] None, description="The forces on each atom." ) stress: Optional[Matrix3D] = Field(None, description="The stress on the lattice.") - structure: Structure = Field(None, description="The structure at this step.") + structure: Optional[Structure] = Field( + None, description="The structure at this step." + ) class InputDoc(BaseModel): @@ -104,6 +127,21 @@ class ForceFieldTaskDocument(StructureMetadata): None, description="Directory where the force field calculations are performed." ) + included_objects: Optional[list[ForcefieldObject]] = Field( + None, description="list of forcefield objects included with this task document" + ) + forcefield_objects: Optional[dict[ForcefieldObject, Any]] = Field( + None, description="Forcefield objects associated with this task" + ) + + is_force_converged: Optional[bool] = Field( + None, + description=( + "Whether the calculation is converged with respect " + "to interatomic forces." + ), + ) + @classmethod def from_ase_compatible_result( cls, @@ -114,6 +152,7 @@ def from_ase_compatible_result( relax_kwargs: dict = None, optimizer_kwargs: dict = None, ionic_step_data: tuple = ("energy", "forces", "magmoms", "stress", "structure"), + store_trajectory: StoreTrajectoryOption = StoreTrajectoryOption.NO, ) -> "ForceFieldTaskDocument": """ Create a ForceFieldTaskDocument for a Task that has ASE-compatible outputs. @@ -135,23 +174,22 @@ def from_ase_compatible_result( ionic_step_data : tuple Which data to save from each ionic step. """ - trajectory = result["trajectory"].__dict__ + trajectory = result["trajectory"] + + n_steps = len(trajectory) # NOTE: convert stress units from eV/A³ to kBar (* -1 from standard output) # and to 3x3 matrix to comply with MP convention - for idx in range(len(trajectory["stresses"])): - trajectory["stresses"][idx] = voigt_6_to_full_3x3_stress( - trajectory["stresses"][idx] * -10 / GPa - ) - - species = AseAtomsAdaptor.get_structure(trajectory["atoms"]).species + for idx in range(n_steps): + if trajectory.frame_properties[idx].get("stress") is not None: + trajectory.frame_properties[idx]["stress"] = voigt_6_to_full_3x3_stress( + [ + val * -10 / GPa + for val in trajectory.frame_properties[idx]["stress"] + ] + ) - input_structure = Structure( - lattice=trajectory["cells"][0], - coords=trajectory["atom_positions"][0], - species=species, - coords_are_cartesian=True, - ) + input_structure = trajectory[0] input_doc = InputDoc( structure=input_structure, @@ -165,70 +203,56 @@ def from_ase_compatible_result( # number of steps for static calculations. if steps <= 1: steps = 1 - for key in trajectory: - trajectory[key] = [trajectory[key][0]] + n_steps = 1 + trajectory = Trajectory.from_structures( + structures=[trajectory[0]], + frame_properties=[trajectory.frame_properties[0]], + constant_lattice=False, + ) output_structure = input_structure else: output_structure = result["final_structure"] - final_energy = trajectory["energies"][-1] - final_energy_per_atom = trajectory["energies"][-1] / input_structure.num_sites - final_forces = trajectory["forces"][-1].tolist() - final_stress = trajectory["stresses"][-1].tolist() - - n_steps = len(trajectory["energies"]) + final_energy = trajectory.frame_properties[-1]["energy"] + final_energy_per_atom = final_energy / input_structure.num_sites + final_forces = trajectory.frame_properties[-1]["forces"] + final_stress = trajectory.frame_properties[-1]["stress"] ionic_steps = [] for idx in range(n_steps): - energy = ( - trajectory["energies"][idx] if "energy" in ionic_step_data else None - ) - forces = ( - trajectory["forces"][idx].tolist() - if "forces" in ionic_step_data + _ionic_step_data = { + key: trajectory.frame_properties[idx][key] + if key in ionic_step_data else None - ) - stress = ( - trajectory["stresses"][idx].tolist() - if "stress" in ionic_step_data - else None - ) + for key in ("energy", "forces", "stress") + } - if "structure" in ionic_step_data: - cur_structure = Structure( - lattice=trajectory["cells"][idx], - coords=trajectory["atom_positions"][idx], - species=species, - coords_are_cartesian=True, - ) - else: - cur_structure = None + cur_structure = trajectory[idx] if "structure" in ionic_step_data else None # include "magmoms" in :obj:`ionic_step` if the trajectory has "magmoms" - if "magmoms" in trajectory: - ionic_step = IonicStep( - energy=energy, - forces=forces, - magmoms=( - trajectory["magmoms"][idx].tolist() + if "magmoms" in trajectory.frame_properties[idx]: + _ionic_step_data.update( + { + "magmoms": trajectory.frame_properties[idx]["magmoms"] if "magmoms" in ionic_step_data else None - ), - stress=stress, - structure=cur_structure, + } ) - # otherwise do not include "magmoms" in :obj:`ionic_step` - elif "magmoms" not in trajectory: - ionic_step = IonicStep( - energy=energy, - forces=forces, - stress=stress, - structure=cur_structure, - ) + ionic_step = IonicStep( + structure=cur_structure, + **_ionic_step_data, + ) ionic_steps.append(ionic_step) + forcefield_objects: dict[ForcefieldObject, Any] = {} + if store_trajectory != StoreTrajectoryOption.NO: + # For VASP calculations, the PARTIAL trajectory option removes + # electronic step info. There is no equivalent for forcefields, + # so we just save the same info for FULL and PARTIAL options. + forcefield_objects[ForcefieldObject.TRAJECTORY] = trajectory # type: ignore[index] + output_doc = OutputDoc( structure=output_structure, energy=final_energy, @@ -240,11 +264,17 @@ def from_ase_compatible_result( ) # map force field name to its package name - pkg_name = { - MLFF.M3GNet: "matgl", - MLFF.CHGNet: "chgnet", - MLFF.MACE: "mace-torch", - }.get(forcefield_name) # type: ignore[call-overload] + pkg_names = { + str(k): v + for k, v in { + MLFF.M3GNet: "matgl", + MLFF.CHGNet: "chgnet", + MLFF.MACE: "mace-torch", + MLFF.GAP: "quippy-ase", + MLFF.Nequip: "nequip", + }.items() + } + pkg_name = pkg_names.get(forcefield_name) if pkg_name: import importlib.metadata @@ -258,4 +288,7 @@ def from_ase_compatible_result( output=output_doc, forcefield_name=forcefield_name, forcefield_version=version, + included_objects=list(forcefield_objects.keys()), + forcefield_objects=forcefield_objects, + is_force_converged=result.get("is_force_converged"), ) diff --git a/src/atomate2/forcefields/utils.py b/src/atomate2/forcefields/utils.py index c02fef56a7..4f6d1720fc 100644 --- a/src/atomate2/forcefields/utils.py +++ b/src/atomate2/forcefields/utils.py @@ -11,17 +11,27 @@ import contextlib import io -import pickle +import json import sys import warnings from contextlib import contextmanager from typing import TYPE_CHECKING +import numpy as np +from ase.calculators.calculator import PropertyNotImplementedError +from ase.calculators.singlepoint import SinglePointCalculator +from ase.io import Trajectory as AseTrajectory from ase.optimize import BFGS, FIRE, LBFGS, BFGSLineSearch, LBFGSLineSearch, MDMin from ase.optimize.sciopt import SciPyFminBFGS, SciPyFminCG +from monty.json import MontyDecoder +from monty.serialization import dumpfn from pymatgen.core.structure import Molecule, Structure +from pymatgen.core.trajectory import Trajectory as PmgTrajectory from pymatgen.io.ase import AseAtomsAdaptor +from atomate2.forcefields import MLFF +from atomate2.forcefields.schemas import ForcefieldResult + try: from ase.filters import FrechetCellFilter except ImportError: @@ -39,9 +49,8 @@ if TYPE_CHECKING: from collections.abc import Generator from os import PathLike - from typing import Any + from typing import Any, Literal - import numpy as np from ase import Atoms from ase.calculators.calculator import Calculator from ase.filters import Filter @@ -60,13 +69,52 @@ } +def _get_pymatgen_trajectory_from_observer( + trajectory_observer: Any, frame_property_keys: list[str] +) -> PmgTrajectory: + to_singluar = {"energies": "energy", "stresses": "stress"} + + if hasattr(trajectory_observer, "as_dict"): + traj = trajectory_observer.as_dict() + else: + traj = trajectory_observer.__dict__ + + n_md_steps = len(traj["cells"]) + species = AseAtomsAdaptor.get_structure(traj["atoms"]).species + + structures = [ + Structure( + lattice=traj["cells"][idx], + coords=traj["atom_positions"][idx], + species=species, + coords_are_cartesian=True, + ) + for idx in range(n_md_steps) + ] + + frame_properties = [ + { + to_singluar.get(key, key): traj[key][idx] + for key in frame_property_keys + if key in traj + } + for idx in range(n_md_steps) + ] + + return PmgTrajectory.from_structures( + structures, + frame_properties=frame_properties, + constant_lattice=False, + ) + + class TrajectoryObserver: """Trajectory observer. This is a hook in the relaxation process that saves the intermediate structures. """ - def __init__(self, atoms: Atoms) -> None: + def __init__(self, atoms: Atoms, store_md_outputs: bool = False) -> None: """ Initialize the Observer. @@ -82,18 +130,43 @@ def __init__(self, atoms: Atoms) -> None: self.energies: list[float] = [] self.forces: list[np.ndarray] = [] self.stresses: list[np.ndarray] = [] + + self._store_magmoms = True + self.magmoms: list[np.ndarray] = [] + self.atom_positions: list[np.ndarray] = [] self.cells: list[np.ndarray] = [] + self._store_md_outputs = store_md_outputs + # `self.{velocities,temperatures}` always initialized, + # but data is only stored / saved to trajectory for MD runs + self.velocities: list[np.ndarray] = [] + self.temperatures: list[float] = [] + def __call__(self) -> None: """Save the properties of an Atoms during the relaxation.""" - # TODO: maybe include magnetic moments self.energies.append(self.compute_energy()) self.forces.append(self.atoms.get_forces()) - self.stresses.append(self.atoms.get_stress()) + # MD needs kinetic energy parts of stress, relaxations do not + # When _store_md_outputs is True, ideal gas contribution to + # stress is included. + self.stresses.append( + self.atoms.get_stress(include_ideal_gas=self._store_md_outputs) + ) + + if self._store_magmoms: + try: + self.magmoms.append(self.atoms.get_magnetic_moments()) + except PropertyNotImplementedError: + self._store_magmoms = False + self.atom_positions.append(self.atoms.get_positions()) self.cells.append(self.atoms.get_cell()[:]) + if self._store_md_outputs: + self.velocities.append(self.atoms.get_velocities()) + self.temperatures.append(self.atoms.get_temperature()) + def compute_energy(self) -> float: """ Calculate the energy, here we just use the potential energy. @@ -104,9 +177,11 @@ def compute_energy(self) -> float: """ return self.atoms.get_potential_energy() - def save(self, filename: str | PathLike) -> None: + def save( + self, filename: str | PathLike | None, fmt: Literal["pmg", "ase"] = "ase" + ) -> None: """ - Save the trajectory file. + Save the trajectory file using monty.serialization. Parameters ---------- @@ -116,16 +191,95 @@ def save(self, filename: str | PathLike) -> None: ------- None """ + filename = str(filename) if filename is not None else None + if fmt == "pmg": + self.to_pymatgen_trajectory(filename=filename) + elif fmt == "ase": + self.to_ase_trajectory(filename=filename) + + def to_ase_trajectory(self, filename: str | None = "atoms.traj") -> AseTrajectory: + """ + Convert to an ASE .Trajectory. + + Parameters + ---------- + filename : str | None + Name of the file to write the ASE trajectory to. + If None, no file is written. + """ + for idx in range(len(self.cells)): + atoms = self.atoms.copy() + atoms.set_positions(self.atom_positions[idx]) + atoms.set_cell(self.cells[idx]) + + if self._store_md_outputs: + atoms.set_velocities(self.velocities[idx]) + + kwargs = { + "energy": self.energies[idx], + "forces": self.forces[idx], + "stress": self.stresses[idx], + } + if self._store_magmoms: + kwargs["magmom"] = self.magmoms[idx] + + atoms.calc = SinglePointCalculator(atoms=atoms, **kwargs) + with AseTrajectory(filename, "a" if idx > 0 else "w", atoms=atoms) as f: + f.write() + + return AseTrajectory(filename, "r") + + def to_pymatgen_trajectory( + self, filename: str | None = "trajectory.json.gz" + ) -> PmgTrajectory: + """ + Convert the trajectory to a pymatgen .Trajectory object. + + Parameters + ---------- + filename : str or None + Name of the file to write the pymatgen trajectory to. + If None, no file is written. + """ + frame_property_keys = ["energy", "forces", "stress"] + if self._store_magmoms: + frame_property_keys += ["magmoms"] + if self._store_md_outputs: + frame_property_keys += ["velocities", "temperature"] + + traj = _get_pymatgen_trajectory_from_observer( + self, frame_property_keys=frame_property_keys + ) + + if filename: + dumpfn(traj, filename) + + return traj + + def as_dict(self) -> dict: + """Make JSONable dict representation of the Trajectory.""" traj_dict = { "energy": self.energies, "forces": self.forces, - "stresses": self.stresses, + "stress": self.stresses, "atom_positions": self.atom_positions, - "cell": self.cells, + "cells": self.cells, + "atoms": self.atoms, "atomic_number": self.atoms.get_atomic_numbers(), } - with open(filename, "wb") as file: - pickle.dump(traj_dict, file) + + if self._store_magmoms: + traj_dict["magmoms"] = self.magmoms + + if self._store_md_outputs: + traj_dict.update(velocities=self.velocities, temperature=self.temperatures) + # sanitize dict + for key in traj_dict: + if all(isinstance(val, np.ndarray) for val in traj_dict[key]): + traj_dict[key] = [val.tolist() for val in traj_dict[key]] + elif isinstance(traj_dict[key], np.ndarray): + traj_dict[key] = traj_dict[key].tolist() + return traj_dict class Relaxer: @@ -169,7 +323,7 @@ def relax( verbose: bool = False, cell_filter: Filter = FrechetCellFilter, **kwargs, - ) -> dict[str, Any]: + ) -> ForcefieldResult: """ Relax the structure. @@ -212,7 +366,78 @@ def relax( atoms = atoms.atoms struct = self.ase_adaptor.get_structure(atoms) - return {"final_structure": struct, "trajectory": obs} + traj = obs.to_pymatgen_trajectory(None) + is_force_conv = all( + np.linalg.norm(traj.frame_properties[-1]["forces"][idx]) < abs(fmax) + for idx in range(len(struct)) + ) + return ForcefieldResult( + final_structure=struct, trajectory=traj, is_force_converged=is_force_conv + ) + + +def ase_calculator(calculator_meta: str | dict, **kwargs: Any) -> Calculator | None: + """ + Create an ASE calculator from a given set of metadata. + + Parameters + ---------- + calculator_meta : str or dict + If a str, should be one of `atomate2.forcefields.MLFF`. + If a dict, should be decodable by `monty.json.MontyDecoder`. + For example, one can also call the CHGNet calculator as follows + ``` + calculator_meta = { + "@module": "chgnet.model.dynamics", + "@callable": "CHGNetCalculator" + } + ``` + args : optional args to pass to a calculator + kwargs : optional kwargs to pass to a calculator + + Returns + ------- + ASE .Calculator + """ + calculator = None + + if isinstance(calculator_meta, str) and calculator_meta in [ + f"{name}" for name in MLFF + ]: + calculator_name = MLFF(calculator_meta.split("MLFF.")[-1]) + + if calculator_name == MLFF.CHGNet: + from chgnet.model.dynamics import CHGNetCalculator + + calculator = CHGNetCalculator(**kwargs) + + elif calculator_name == MLFF.M3GNet: + import matgl + from matgl.ext.ase import PESCalculator + + potential = matgl.load_model("M3GNet-MP-2021.2.8-PES") + calculator = PESCalculator(potential, **kwargs) + + elif calculator_name == MLFF.MACE: + from mace.calculators import mace_mp + + calculator = mace_mp(**kwargs) + + elif calculator_name == MLFF.GAP: + from quippy.potential import Potential + + calculator = Potential(**kwargs) + + elif calculator_name == MLFF.Nequip: + from nequip.ase import NequIPCalculator + + calculator = NequIPCalculator.from_deployed_model(**kwargs) + + elif isinstance(calculator_meta, dict): + _calculator = MontyDecoder().decode(json.dumps(calculator_meta)) + calculator = _calculator(**kwargs) + + return calculator @contextmanager diff --git a/tests/forcefields/flows/test_elastic.py b/tests/forcefields/flows/test_elastic.py index 1f41671fcb..dea097eb95 100644 --- a/tests/forcefields/flows/test_elastic.py +++ b/tests/forcefields/flows/test_elastic.py @@ -11,9 +11,8 @@ def test_elastic_wf_with_mace(clean_dir, si_structure, test_dir): si_prim = SpacegroupAnalyzer(si_structure).get_primitive_standard_structure() model_path = f"{test_dir}/forcefields/mace/MACE.model" common_kwds = dict( - model=model_path, + calculator_kwargs={"model": model_path, "default_dtype": "float64"}, relax_kwargs={"fmax": 0.00001}, - model_kwargs={"default_dtype": "float64"}, ) flow = ElasticMaker( diff --git a/tests/forcefields/test_jobs.py b/tests/forcefields/test_jobs.py index 95527bb16c..ebd3837bf4 100644 --- a/tests/forcefields/test_jobs.py +++ b/tests/forcefields/test_jobs.py @@ -1,3 +1,4 @@ +from importlib.metadata import version as get_imported_version from pathlib import Path import pytest @@ -36,14 +37,17 @@ def test_chgnet_static_maker(si_structure): assert output1.output.ionic_steps[-1].magmoms is None assert output1.output.n_steps == 1 + assert output1.forcefield_version == get_imported_version("chgnet") + @pytest.mark.parametrize("relax_cell", [True, False]) def test_chgnet_relax_maker(si_structure: Structure, relax_cell: bool): # translate one atom to ensure a small number of relaxation steps are taken si_structure.translate_sites(0, [0, 0, 0.1]) + max_step = 25 # generate job - job = CHGNetRelaxMaker(steps=25, relax_cell=relax_cell).make(si_structure) + job = CHGNetRelaxMaker(steps=max_step, relax_cell=relax_cell).make(si_structure) # run the flow or job and ensure that it finished running successfully responses = run_locally(job, ensure_success=True) @@ -51,11 +55,14 @@ def test_chgnet_relax_maker(si_structure: Structure, relax_cell: bool): # validate job outputs output1 = responses[job.uuid][1].output assert isinstance(output1, ForceFieldTaskDocument) - assert output1.output.n_steps >= 12 if relax_cell: + assert not output1.is_force_converged + assert output1.output.n_steps == max_step + 2 assert output1.output.energy == approx(-10.62461, abs=1e-2) assert output1.output.ionic_steps[-1].magmoms[0] == approx(0.00251674, rel=1e-1) else: + assert output1.is_force_converged + assert output1.output.n_steps == 13 assert output1.output.energy == approx(-10.6274, rel=1e-2) assert output1.output.ionic_steps[-1].magmoms[0] == approx(0.00303572, rel=1e-2) @@ -75,13 +82,16 @@ def test_m3gnet_static_maker(si_structure): assert output1.output.energy == approx(-10.8, abs=0.2) assert output1.output.n_steps == 1 + assert output1.forcefield_version == get_imported_version("matgl") + def test_m3gnet_relax_maker(si_structure): # translate one atom to ensure a small number of relaxation steps are taken si_structure.translate_sites(0, [0, 0, 0.1]) # generate job - job = M3GNetRelaxMaker(steps=25).make(si_structure) + max_step = 25 + job = M3GNetRelaxMaker(steps=max_step).make(si_structure) # run the flow or job and ensure that it finished running successfully responses = run_locally(job, ensure_success=True) @@ -89,8 +99,9 @@ def test_m3gnet_relax_maker(si_structure): # validate job outputs output1 = responses[job.uuid][1].output assert isinstance(output1, ForceFieldTaskDocument) + assert output1.is_force_converged assert output1.output.energy == approx(-10.8, abs=0.2) - assert output1.output.n_steps == 27 + assert output1.output.n_steps == 24 mace_paths = pytest.mark.parametrize( @@ -109,9 +120,9 @@ def test_mace_static_maker(si_structure: Structure, test_dir: Path, model): # generate job # NOTE the test model is not trained on Si, so the energy is not accurate - job = MACEStaticMaker(model=model, task_document_kwargs=task_doc_kwargs).make( - si_structure - ) + job = MACEStaticMaker( + calculator_kwargs={"model": model}, task_document_kwargs=task_doc_kwargs + ).make(si_structure) # run the flow or job and ensure that it finished running successfully responses = run_locally(job, ensure_success=True) @@ -121,6 +132,7 @@ def test_mace_static_maker(si_structure: Structure, test_dir: Path, model): assert isinstance(output1, ForceFieldTaskDocument) assert output1.output.energy == approx(-0.068231, rel=1e-4) assert output1.output.n_steps == 1 + assert output1.forcefield_version == get_imported_version("mace-torch") @pytest.mark.parametrize("relax_cell", [True, False]) @@ -134,7 +146,7 @@ def test_mace_relax_maker( # generate job # NOTE the test model is not trained on Si, so the energy is not accurate job = MACERelaxMaker( - model=model, + calculator_kwargs={"model": model}, steps=25, optimizer_kwargs={"optimizer": "BFGSLineSearch"}, relax_cell=relax_cell, @@ -145,6 +157,7 @@ def test_mace_relax_maker( # validating the outputs of the job output1 = responses[job.uuid][1].output + assert output1.is_force_converged assert isinstance(output1, ForceFieldTaskDocument) if relax_cell: assert output1.output.energy == approx(-0.0526856, rel=1e-1) @@ -162,8 +175,10 @@ def test_gap_static_maker(si_structure: Structure, test_dir): # generate job # Test files have been provided by Yuanbin Liu (University of Oxford) job = GAPStaticMaker( - potential_args_str="IP GAP", - potential_param_file_name=test_dir / "forcefields" / "gap" / "gap_file.xml", + calculator_kwargs={ + "args_str": "IP GAP", + "param_filename": str(test_dir / "forcefields" / "gap" / "gap_file.xml"), + }, task_document_kwargs=task_doc_kwargs, ).make(si_structure) @@ -175,16 +190,20 @@ def test_gap_static_maker(si_structure: Structure, test_dir): assert isinstance(output1, ForceFieldTaskDocument) assert output1.output.energy == approx(-10.8523, rel=1e-4) assert output1.output.n_steps == 1 + assert output1.forcefield_version == get_imported_version("quippy-ase") def test_nequip_static_maker(sr_ti_o3_structure: Structure, test_dir: Path): + importorskip("nequip") task_doc_kwargs = {"ionic_step_data": ("structure", "energy")} # generate job # NOTE the test model is not trained on Si, so the energy is not accurate job = NequipStaticMaker( task_document_kwargs=task_doc_kwargs, - model_path=test_dir / "forcefields" / "nequip" / "nequip_ff_sr_ti_o3.pth", + calculator_kwargs={ + "model_path": test_dir / "forcefields" / "nequip" / "nequip_ff_sr_ti_o3.pth" + }, ).make(sr_ti_o3_structure) # run the flow or job and ensure that it finished running successfully @@ -195,12 +214,14 @@ def test_nequip_static_maker(sr_ti_o3_structure: Structure, test_dir: Path): assert isinstance(output1, ForceFieldTaskDocument) assert output1.output.energy == approx(-44.40017, rel=1e-4) assert output1.output.n_steps == 1 + assert output1.forcefield_version == get_imported_version("nequip") @pytest.mark.parametrize("relax_cell", [True, False]) def test_nequip_relax_maker( sr_ti_o3_structure: Structure, test_dir: Path, relax_cell: bool ): + importorskip("nequip") # translate one atom to ensure a small number of relaxation steps are taken sr_ti_o3_structure.translate_sites(0, [0, 0, 0.2]) # generate job @@ -208,7 +229,9 @@ def test_nequip_relax_maker( steps=25, optimizer_kwargs={"optimizer": "BFGSLineSearch"}, relax_cell=relax_cell, - model_path=test_dir / "forcefields" / "nequip" / "nequip_ff_sr_ti_o3.pth", + calculator_kwargs={ + "model_path": test_dir / "forcefields" / "nequip" / "nequip_ff_sr_ti_o3.pth" + }, ).make(sr_ti_o3_structure) # run the flow or job and ensure that it finished running successfully @@ -234,9 +257,13 @@ def test_gap_relax_maker(si_structure: Structure, test_dir: Path, relax_cell: bo # generate job # Test files have been provided by Yuanbin Liu (University of Oxford) + max_step = 25 job = GAPRelaxMaker( - potential_param_file_name=test_dir / "forcefields" / "gap" / "gap_file.xml", - steps=25, + calculator_kwargs={ + "args_str": "IP GAP", + "param_filename": str(test_dir / "forcefields" / "gap" / "gap_file.xml"), + }, + steps=max_step, relax_cell=relax_cell, ).make(si_structure) @@ -247,8 +274,10 @@ def test_gap_relax_maker(si_structure: Structure, test_dir: Path, relax_cell: bo output1 = responses[job.uuid][1].output assert isinstance(output1, ForceFieldTaskDocument) if relax_cell: + assert not output1.is_force_converged assert output1.output.energy == approx(-13.08492, rel=1e-2) - assert output1.output.n_steps == 27 + assert output1.output.n_steps == max_step + 2 else: + assert output1.is_force_converged assert output1.output.energy == approx(-10.8523, rel=1e-4) assert output1.output.n_steps == 17 diff --git a/tests/forcefields/test_md.py b/tests/forcefields/test_md.py new file mode 100644 index 0000000000..d8d6a1cc97 --- /dev/null +++ b/tests/forcefields/test_md.py @@ -0,0 +1,262 @@ +"""Tests for forcefield MD flows.""" + +from pathlib import Path + +import numpy as np +import pytest +from ase import units +from ase.io import Trajectory +from ase.md.verlet import VelocityVerlet +from jobflow import run_locally +from monty.serialization import loadfn +from pymatgen.analysis.structure_matcher import StructureMatcher +from pymatgen.core import Structure + +from atomate2.forcefields.md import ( + CHGNetMDMaker, + GAPMDMaker, + M3GNetMDMaker, + MACEMDMaker, + NequipMDMaker, +) + +name_to_maker = { + "CHGNet": CHGNetMDMaker, + "M3GNet": M3GNetMDMaker, + "MACE": MACEMDMaker, + "GAP": GAPMDMaker, + "Nequip": NequipMDMaker, +} + + +@pytest.mark.parametrize("ff_name", ["CHGNet", "M3GNet", "MACE", "GAP", "Nequip"]) +def test_ml_ff_md_maker(ff_name, si_structure, sr_ti_o3_structure, test_dir, clean_dir): + n_steps = 5 + + ref_energies_per_atom = { + "CHGNet": -5.280157089233398, + "M3GNet": -5.387282371520996, + "MACE": -5.311369895935059, + "GAP": -5.391255755606209, + "Nequip": -8.84670181274414, + } + + # ASE can slightly change tolerances on structure positions + matcher = StructureMatcher() + + calculator_kwargs = {} + unit_cell_structure = si_structure.copy() + if ff_name == "GAP": + calculator_kwargs = { + "args_str": "IP GAP", + "param_filename": str(test_dir / "forcefields" / "gap" / "gap_file.xml"), + } + elif ff_name == "Nequip": + calculator_kwargs = { + "model_path": test_dir / "forcefields" / "nequip" / "nequip_ff_sr_ti_o3.pth" + } + unit_cell_structure = sr_ti_o3_structure.copy() + + structure = unit_cell_structure.to_conventional() * (2, 2, 2) + + job = name_to_maker[ff_name]( + n_steps=n_steps, + traj_file="md_traj.json.gz", + traj_file_fmt="pmg", + task_document_kwargs={"store_trajectory": "partial"}, + calculator_kwargs=calculator_kwargs, + ).make(structure) + response = run_locally(job, ensure_success=True) + task_doc = response[next(iter(response))][1].output + + # Check that energies are reasonably close to reference values + assert task_doc.output.energy / len(structure) == pytest.approx( + ref_energies_per_atom[ff_name], abs=0.1 + ) + + # Check that we have the right number of MD steps + # ASE logs the initial structure energy, and doesn't count this as an MD step + assert matcher.fit(task_doc.output.ionic_steps[0].structure, structure) + assert len(task_doc.output.ionic_steps) == n_steps + 1 + + # Check that the ionic steps have the expected physical properties + assert all( + key in step.model_dump() + for key in ("energy", "forces", "stress", "structure") + for step in task_doc.output.ionic_steps + ) + + # Check that the trajectory has expected physical properties + assert task_doc.included_objects == ["trajectory"] + assert len(task_doc.forcefield_objects["trajectory"]) == n_steps + 1 + assert all( + key in step + for key in ("energy", "forces", "stress", "velocities", "temperature") + for step in task_doc.forcefield_objects["trajectory"].frame_properties + ) + + +@pytest.mark.parametrize("traj_file", ["trajectory.json.gz", "atoms.traj"]) +def test_traj_file(traj_file, si_structure, clean_dir, ff_name="CHGNet"): + n_steps = 5 + + # Check that traj file written to disk is consistent with trajectory + # stored to the task document + + if ".json.gz" in traj_file: + traj_file_fmt = "pmg" + traj_file_loader = loadfn + else: + traj_file_fmt = "ase" + traj_file_loader = Trajectory + + structure = si_structure.to_conventional() * (2, 2, 2) + job = name_to_maker[ff_name]( + n_steps=n_steps, + traj_file=traj_file, + traj_file_fmt=traj_file_fmt, + ).make(structure) + response = run_locally(job, ensure_success=True) + task_doc = response[next(iter(response))][1].output + + traj_from_file = traj_file_loader(traj_file) + + assert len(traj_from_file) == n_steps + 1 + + if traj_file_fmt == "pmg": + assert all( + np.all( + traj_from_file.frame_properties[idx][key] + == task_doc.forcefield_objects["trajectory"] + .frame_properties[idx] + .get(key) + ) + for key in ("energy", "temperature", "forces", "velocities") + for idx in range(n_steps + 1) + ) + elif traj_file_fmt == "ase": + traj_as_dict = [ + { + "energy": traj_from_file[idx].get_potential_energy(), + "temperature": traj_from_file[idx].get_temperature(), + "forces": traj_from_file[idx].get_forces(), + "velocities": traj_from_file[idx].get_velocities(), + } + for idx in range(1, n_steps + 1) + ] + assert all( + np.all( + traj_as_dict[idx - 1][key] + == task_doc.forcefield_objects["trajectory"] + .frame_properties[idx] + .get(key) + ) + for key in ("energy", "temperature", "forces", "velocities") + for idx in range(1, n_steps + 1) + ) + + +def test_nve_and_dynamics_obj(si_structure: Structure, test_dir: Path): + # This test serves two purposes: + # 1. Test the NVE calculator + # 2. Test specifying the `dynamics` kwarg of the `MDMaker` as a str + # vs. as an ase.md.md.MolecularDyanmics object + + output = {} + for k in ["from_str", "from_dyn"]: + if k == "from_str": + dyn = "velocityverlet" + elif k == "from_dyn": + dyn = VelocityVerlet + + job = CHGNetMDMaker( + ensemble="nve", + dynamics=dyn, + n_steps=50, + traj_file=None, + ).make(si_structure) + + response = run_locally(job, ensure_success=True) + output[k] = response[next(iter(response))][1].output + + # check that energy and volume are constants + assert output["from_str"].output.energy == pytest.approx(-10.6, abs=0.1) + assert output["from_str"].output.structure.volume == pytest.approx( + output["from_str"].input.structure.volume + ) + assert all( + step.energy == pytest.approx(-10.6, abs=0.1) + for step in output["from_str"].output.ionic_steps + ) + + # ensure that output is consistent if molecular dynamics object is specified + # as str or as MolecularDynamics object + for attr in ("energy", "forces", "stress", "structure"): + vals = {k: getattr(output[k].output, attr) for k in ("from_str", "from_dyn")} + if isinstance(vals["from_str"], float): + assert vals["from_str"] == pytest.approx(vals["from_dyn"]) + elif isinstance(vals["from_str"], Structure): + assert vals["from_str"] == vals["from_dyn"] + else: + assert all( + vals["from_str"][i][j] == pytest.approx(vals["from_dyn"][i][j]) + for i in range(len(vals["from_str"])) + for j in range(len(vals["from_str"][i])) + ) + + +@pytest.mark.parametrize("ff_name", ["CHGNet"]) +def test_temp_schedule(ff_name, si_structure, clean_dir): + n_steps = 50 + temp_schedule = [300, 3000] + + structure = si_structure.to_conventional() * (2, 2, 2) + + job = name_to_maker[ff_name]( + n_steps=n_steps, + traj_file=None, + dynamics="nose-hoover", + temperature=temp_schedule, + ase_md_kwargs=dict(ttime=50.0 * units.fs, pfactor=None), + ).make(structure) + response = run_locally(job, ensure_success=True) + task_doc = response[next(iter(response))][1].output + + temp_history = [ + step["temperature"] + for step in task_doc.forcefield_objects["trajectory"].frame_properties + ] + + assert temp_history[-1] > temp_schedule[0] + + +@pytest.mark.parametrize("ff_name", ["CHGNet"]) +def test_press_schedule(ff_name, si_structure, clean_dir): + n_steps = 20 + press_schedule = [0, 10] # kBar + + structure = si_structure.to_conventional() * (3, 3, 3) + + job = name_to_maker[ff_name]( + ensemble="npt", + n_steps=n_steps, + traj_file="md_traj.json.gz", + traj_file_fmt="pmg", + dynamics="nose-hoover", + pressure=press_schedule, + ase_md_kwargs=dict( + ttime=50.0 * units.fs, + pfactor=(75.0 * units.fs) ** 2 * units.GPa, + ), + ).make(structure) + run_locally(job, ensure_success=True) + # task_doc = response[next(iter(response))][1].output + + traj_from_file = loadfn("md_traj.json.gz") + + stress_history = [ + sum(traj_from_file.frame_properties[idx]["stress"][:3]) / 3.0 + for idx in range(len(traj_from_file)) + ] + + assert stress_history[-1] < stress_history[0] diff --git a/tests/forcefields/test_utils.py b/tests/forcefields/test_utils.py index 4c16faa43e..55f113b926 100644 --- a/tests/forcefields/test_utils.py +++ b/tests/forcefields/test_utils.py @@ -6,7 +6,13 @@ from numpy.testing import assert_allclose from pymatgen.io.ase import AseAtomsAdaptor -from atomate2.forcefields.utils import FrechetCellFilter, Relaxer, TrajectoryObserver +from atomate2.forcefields import MLFF +from atomate2.forcefields.utils import ( + FrechetCellFilter, + Relaxer, + TrajectoryObserver, + ase_calculator, +) def test_safe_import(): @@ -41,14 +47,14 @@ def test_trajectory_observer(si_structure, test_dir, tmp_dir): ] assert_allclose(traj.stresses[0], expected_stresses, atol=1e-8) - save_file_name = "log_file.json.gz" + save_file_name = "log_file.traj" traj.save(save_file_name) assert os.path.isfile(save_file_name) @pytest.mark.parametrize( ("optimizer", "traj_file"), - [("BFGS", None), (None, None), (BFGS, "log_file.json.gz")], + [("BFGS", None), (None, None), (BFGS, "log_file.traj")], ) def test_relaxer(si_structure, test_dir, tmp_dir, optimizer, traj_file): if FrechetCellFilter: @@ -109,13 +115,28 @@ def test_relaxer(si_structure, test_dir, tmp_dir, optimizer, traj_file): for key in expected_lattice } == pytest.approx(expected_lattice) - assert relax_output["trajectory"].energies[-1] == pytest.approx(expected_energy) + assert relax_output["trajectory"].frame_properties[-1]["energy"] == pytest.approx( + expected_energy + ) - assert_allclose(relax_output["trajectory"].forces[-1], expected_forces, atol=1e-8) + assert_allclose( + relax_output["trajectory"].frame_properties[-1]["forces"], expected_forces + ) assert_allclose( - relax_output["trajectory"].stresses[-1], expected_stresses, atol=1e-8 + relax_output["trajectory"].frame_properties[-1]["stress"], expected_stresses ) if traj_file: assert os.path.isfile(traj_file) + + +def test_ext_load(): + forcefield_to_callable = { + "CHGNet": {"@module": "chgnet.model.dynamics", "@callable": "CHGNetCalculator"}, + "MACE": {"@module": "mace.calculators", "@callable": "mace_mp"}, + } + for forcefield in ["CHGNet", "MACE"]: + calc_from_decode = ase_calculator(forcefield_to_callable[forcefield]) + calc_from_preset = ase_calculator(f"{MLFF(forcefield)}") + assert isinstance(calc_from_decode, type(calc_from_preset))