Sample: Using Dask with ESPResSo (#4781)
This was produced as a side project of learning Dask, but might be useful for others.
kodiakhq[bot] authored Oct 31, 2023
2 parents c31f701 + a8d4806 commit b70d9e8
Showing 8 changed files with 430 additions and 0 deletions.
10 changes: 10 additions & 0 deletions doc/bibliography.bib
@@ -981,6 +981,16 @@ @Article{reed92a
publisher={AIP Publishing}
}

@InProceedings{rocklin15a,
author = {Rocklin, Matthew},
title = {{D}ask: Parallel Computation with Blocked algorithms and Task Scheduling},
booktitle = {Proceedings of the 14\textsuperscript{th} {P}ython in {S}cience {C}onference},
year = {2015},
editor = {Huff, Kathryn and Bergstra, James},
pages = {126--132},
doi = {10.25080/Majora-7b98e3ed-013},
}

@Book{rubinstein03a,
title = {Polymer Physics},
publisher = {Oxford University Press},
1 change: 1 addition & 0 deletions doc/sphinx/samples.py
@@ -36,6 +36,7 @@ def get_docstring(filenames):
# extract docstrings
samples = [x for x in os.listdir(samples_dir) if x.endswith('.py')]
samples += ['immersed_boundary/sampleImmersedBoundary.py',
            'high_throughput_with_dask/run_pv.py',
            'object_in_fluid/motivation.py']
docstrings = get_docstring(samples)

79 changes: 79 additions & 0 deletions samples/high_throughput_with_dask/Readme.md
@@ -0,0 +1,79 @@
# Introduction

This sample illustrates how to run a large number of short ESPResSo simulations
with Dask. Dask is a parallel computing library in Python that enables efficient
handling of large datasets and computation tasks.
Note that this sample is not meant to produce meaningful physics results.
The sample consists of the following parts:

- `dask_espresso.py`: contains helper functions that handle running ESPResSo
  within Dask and communicating data between Dask and ESPResSo
- `lj_pressure.py`: simulation script which obtains the average pressure
  for a Lennard-Jones liquid at a given volume fraction
- `run_pv.py`: uses Dask to run the simulation script at various volume
  fractions and obtain a pressure vs. volume fraction curve
- `test_dask_espresso.py`: corresponding unit tests, to be run with `pytest`
- `echo.py`: mocks an ESPResSo simulation for the unit tests

## How to Use

Note: It is not possible to use ESPResSo with `dask.distributed.LocalCluster`.
Instead, follow the procedure described below:

1. Move to the sample directory
```bash
cd samples/high_throughput_with_dask
```
1. Open `run_pv.py` in an editor and adapt the `PYPRESSO` variable
to the correct path to `pypresso`
1. Set the `PYTHONPATH` environment variable such that it includes
the directory in which `dask_espresso.py` resides:
```bash
export PYTHONPATH="${PYTHONPATH:+$PYTHONPATH:}$(realpath .)"
```
1. Start the Dask scheduler
```bash
dask scheduler &
```
1. Note the address of the scheduler (e.g., `tcp://127.0.0.1:8786`)
1. Launch a few workers using the correct scheduler address:
```bash
for i in {1..5}; do dask worker SCHEDULER_ADDRESS & done
```
1. Run `python3 run_pv.py SCHEDULER_ADDRESS`, again inserting the scheduler address from above
1. Use `fg` and Ctrl-C to shut down the Dask workers and scheduler,
   or use `pkill "dask"` if you don't have any other Dask processes
   running in the background.
Note that Dask can also be used on compute clusters with HTCondor and Slurm.

## Technical Notes

- Since currently only one ESPResSo instance can be used in a Python script,
ESPResSo is run as a separate process. This is accomplished by the
`dask_espresso_task` function in `dask_espresso.py`.
- Also, the data transfer between Dask and ESPResSo has to be handled such that
it is safe for inter-process communication. This is achieved via the `pickle`
and `base64` Python modules. Encoding and decoding functions can be found in
`dask_espresso.py`.
- The communication happens via the standard input and output of the simulation
script. Therefore, it is essential not to use simple `print()` calls in the
simulation script. Instead, use the `logging` module for status messages.
These will go to the standard error stream.
- To use this sample for your own simulations:
- Use `dask_espresso.py` as is.
- Adapt `run_pv.py` to run simulations with the parameters you need.
The keyword arguments passed to `dask_espresso_task()` will be passed
as a dictionary to the simulation.
- Use `data = dask_espresso.get_data_from_stdin()` to get the parameters
at the beginning of the simulation script.
- Use `print(dask_espresso.encode_transport_data(result))` at the end
of your simulation to pass the result to Dask.
- The simulation parameters and results can be any Python object that
can be safely pickled and do not require additional context. Basic data
types (int, float, string, list, dict) as well as numpy arrays work,
whereas objects that require additional context to be valid do not
(e.g. file objects and ESPResSo particles).
- To test your simulation script, including the transfer of parameters
  and results, outside of Dask, you can also use
  the `dask_espresso.dask_espresso_task()` function.
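The transport format described above is plain `pickle` wrapped in `base64`, so the round trip can be sketched without ESPResSo. The helper functions below mirror those in `dask_espresso.py`; the parameter values are made-up examples:

```python
import base64
import pickle


def encode_transport_data(data):
    # mirrors dask_espresso.encode_transport_data()
    return base64.b64encode(pickle.dumps(data)).decode("utf-8")


def decode_transport_data(encoded_data):
    # mirrors dask_espresso.decode_transport_data()
    return pickle.loads(base64.b64decode(encoded_data))


# What Dask writes to the simulation script's standard input ...
params = {"n_particles": 100, "volume_fraction": 0.3, "n_steps": 10}
wire = encode_transport_data(params)

# ... and what get_data_from_stdin() recovers on the other side.
assert decode_transport_data(wire) == params
```

Because the payload travels as a plain ASCII string, it survives the stdin/stdout pipe between the Dask worker and the `pypresso` subprocess unchanged.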
77 changes: 77 additions & 0 deletions samples/high_throughput_with_dask/dask_espresso.py
@@ -0,0 +1,77 @@
#
# Copyright (C) 2023 The ESPResSo project
#
# This file is part of ESPResSo.
#
# ESPResSo is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ESPResSo is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

"""Helper functions to use ESPResSo with Dask."""

import pickle
import base64
import sys
import subprocess
import logging
import dask


def encode_transport_data(data):
    """
    Use ``pickle`` and ``base64`` to convert the provided data to a string
    which can be passed safely between the Dask scheduler, worker and ESPResSo.
    """
    return base64.b64encode(pickle.dumps(data)).decode("utf-8")


def decode_transport_data(encoded_data):
    """
    Convert the transport data back to a Python object via ``base64``
    and ``pickle``.
    """
    pickle_data = base64.b64decode(encoded_data)
    return pickle.loads(pickle_data)


def get_data_from_stdin():
    """Decode the transport data read from the standard input stream."""
    return decode_transport_data(sys.stdin.read())


@dask.delayed
def dask_espresso_task(pypresso, script, **kwargs):
    """
    Run ESPResSo asynchronously as a Dask task.

    pypresso: :obj:`str`
        Path to ``pypresso``
    script: :obj:`str`
        Simulation script to run with ``pypresso``
    kwargs:
        The keyword arguments are encoded and sent to
        the standard input of the simulation script.
        Use ``data = get_data_from_stdin()`` to obtain them.
    """
    logger = logging.getLogger(__name__)
    encoded_data = encode_transport_data(kwargs)
    espresso = subprocess.Popen([pypresso, script],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                text=True)
    espresso.stdin.write(encoded_data)
    out, err = espresso.communicate()
    if err != "":
        logger.warning("STDERR output from ESPResSo:\n%s", err)
    return decode_transport_data(out)
29 changes: 29 additions & 0 deletions samples/high_throughput_with_dask/echo.py
@@ -0,0 +1,29 @@
#
# Copyright (C) 2023 The ESPResSo project
#
# This file is part of ESPResSo.
#
# ESPResSo is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ESPResSo is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

"""
This is part of the unit tests. It reads encoded simulation data from stdin,
decodes it, adds ``processed=True`` and outputs the encoded result to stdout.
"""

import dask_espresso as de
data = de.get_data_from_stdin()
data.update(processed=True)

print(de.encode_transport_data(data))
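The mechanics that `echo.py` exercises can be sketched self-contained: a child Python process stands in for the simulation, reading an encoded payload from stdin and writing the encoded result to stdout, exactly as `dask_espresso_task()` does with `pypresso`. The inline child program below is a stand-in written for this sketch, not the actual `echo.py`:

```python
import base64
import pickle
import subprocess
import sys


def encode(data):
    return base64.b64encode(pickle.dumps(data)).decode("utf-8")


def decode(encoded):
    return pickle.loads(base64.b64decode(encoded))


# Inline stand-in for echo.py: decode stdin, add processed=True,
# print the encoded result to stdout.
CHILD = ("import sys, base64, pickle; "
         "data = pickle.loads(base64.b64decode(sys.stdin.read())); "
         "data['processed'] = True; "
         "print(base64.b64encode(pickle.dumps(data)).decode('utf-8'))")

proc = subprocess.Popen([sys.executable, "-c", CHILD],
                        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                        text=True)
out, _ = proc.communicate(encode({"value": 42}))
result = decode(out)
assert result == {"value": 42, "processed": True}
```

Note that the trailing newline added by `print()` is harmless: `base64.b64decode()` discards characters outside the base64 alphabet by default.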
120 changes: 120 additions & 0 deletions samples/high_throughput_with_dask/lj_pressure.py
@@ -0,0 +1,120 @@
#
# Copyright (C) 2013-2023 The ESPResSo project
#
# This file is part of ESPResSo.
#
# ESPResSo is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ESPResSo is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

"""
Obtain the average pressure of a Lennard-Jones liquid.
For use with ``dask_espresso``.
Adapted from :file:`samples/lj_liquid.py`.
"""

import espressomd
import numpy as np

import dask_espresso as de


# Note: Avoid print() in this script, as the standard output is used
# for data transfer to Dask. Use the logging module for status messages.
import logging
logger = logging.getLogger(__name__)


# Get parameters from Dask via the standard input stream
params = de.get_data_from_stdin()

logger.info("Parameters: %s", params)
n_particles = int(params["n_particles"])
volume_fraction = float(params["volume_fraction"])
n_steps = int(params["n_steps"])

required_features = ["LENNARD_JONES"]
espressomd.assert_features(required_features)

rng = np.random.default_rng()

# System
#############################################################

# Interaction parameters (Lennard-Jones)
#############################################################

lj_eps = 1.0 # LJ epsilon
lj_sig = 1.0 # particle diameter
lj_cut = lj_sig * 2**(1. / 6.) # cutoff distance

# System parameters
#############################################################
# volume of N spheres with radius r: N * (4/3*pi*r^3)
box_l = (n_particles * 4. / 3. * np.pi * (lj_sig / 2.)**3
         / volume_fraction)**(1. / 3.)

# System
#############################################################
system = espressomd.System(box_l=3 * [box_l])

# Integration parameters
#############################################################
system.time_step = 0.01
system.cell_system.skin = 0.4

# Interaction setup
#############################################################
system.non_bonded_inter[0, 0].lennard_jones.set_params(
    epsilon=lj_eps, sigma=lj_sig, cutoff=lj_cut, shift="auto")

# Particle setup
#############################################################

system.part.add(pos=rng.random((n_particles, 3)) * system.box_l)

# Warmup Integration
#############################################################

# warmup
logger.info("Warmup")
system.integrator.set_steepest_descent(
    f_max=0, max_displacement=0.01, gamma=1E-3)
system.integrator.run(1)
while np.any(np.abs(system.part.all().f) * system.time_step > .1):
    system.integrator.run(10)

system.integrator.set_vv()
system.thermostat.set_langevin(kT=1.0, gamma=1.0, seed=42)

system.integrator.run(1000)
min_skin = 0.2
max_skin = 1.0
# tuning and equilibration
logger.info("Tune skin: {:.3f}".format(system.cell_system.tune_skin(
    min_skin=min_skin, max_skin=max_skin, tol=0.05, int_steps=100)))
system.integrator.run(1000)

logger.info("Measuring")

pressures = np.zeros(n_steps)
for i in range(n_steps):
    system.integrator.run(10)
    pressures[i] = system.analysis.pressure()["total"]

# Put the simulation results into a dictionary
result = {"pressure": np.mean(pressures),
          "pressure_std_dev": np.std(pressures)}

# Output the results for Dask via the standard output stream
print(de.encode_transport_data(result))
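The driver `run_pv.py` is not expanded in this diff. Based on the Readme's description, its core pattern is one delayed task per volume fraction, evaluated in a single `dask.compute()` call. The sketch below uses a stub task that fakes the pressure instead of launching `pypresso`, so the names `espresso_task_stub` and its fake result are illustrative, not the sample's actual code:

```python
import dask

# Stand-in for dask_espresso.dask_espresso_task(): in run_pv.py the task
# launches pypresso on lj_pressure.py; here a stub fakes the result.
@dask.delayed
def espresso_task_stub(**params):
    return {"pressure": 2.5 * params["volume_fraction"]}  # fake pressure

volume_fractions = [0.1, 0.2, 0.3]
futures = [espresso_task_stub(n_particles=100,
                              volume_fraction=phi,
                              n_steps=10)
           for phi in volume_fractions]

# dask.compute() evaluates the whole task graph; when a
# dask.distributed.Client is connected to the scheduler address given on
# the command line, the tasks run on the remote workers instead of locally.
results = dask.compute(*futures)
pressures = [r["pressure"] for r in results]
```

Because `dask_espresso_task()` is decorated with `@dask.delayed`, calling it only builds the task graph; no ESPResSo process starts until `dask.compute()` is invoked.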
