diff --git a/yt/frontends/ramses/data_structures.py b/yt/frontends/ramses/data_structures.py
index 3c44ecfa6d7..15042099f7f 100644
--- a/yt/frontends/ramses/data_structures.py
+++ b/yt/frontends/ramses/data_structures.py
@@ -18,6 +18,7 @@
 from yt.utilities.cython_fortran_utils import FortranFile as fpu
 from yt.utilities.lib.cosmology_time import friedman
 from yt.utilities.on_demand_imports import _f90nml as f90nml
+from yt.utilities.parallel_tools.parallel_analysis_interface import parallel_objects
 from yt.utilities.physical_constants import kb, mp
 from yt.utilities.physical_ratios import cm_per_mpc
@@ -548,19 +549,36 @@ def _initialize_oct_handler(self):
         if self.ds._bbox is not None:
             cpu_list = get_cpu_list(self.dataset, self.dataset._bbox)
         else:
-            cpu_list = range(self.dataset["ncpu"])
+            cpu_list = list(range(self.dataset["ncpu"]))
+
+        if len(cpu_list) < self.comm.size:
+            mylog.error(
+                "The number of MPI tasks is larger than the number of files to read "
+                "on disk. This is not supported. Try running with at most "
+                f"{len(cpu_list)} MPI tasks."
+            )
+            self.comm.comm.Abort(2)
+
+        # Important note: we need to use the method="sequential"
+        # so that the first MPI task gets domains 1, 2, ..., n
+        # and the second task gets domains n+1, n+2, ..., 2n, etc.
+        self.domains = [
+            RAMSESDomainFile(self.dataset, i + 1)
+            for i in parallel_objects(cpu_list, method="sequential")
+        ]
 
-        self.domains = [RAMSESDomainFile(self.dataset, i + 1) for i in cpu_list]
         total_octs = sum(
             dom.local_oct_count for dom in self.domains  # + dom.ngridbound.sum()
         )
+        max_level_loc = max(dom.max_level for dom in self.domains)
+
         force_max_level, convention = self.ds._force_max_level
         if convention == "yt":
             force_max_level += self.ds.min_level + 1
-        self.max_level = min(
-            force_max_level, max(dom.max_level for dom in self.domains)
-        )
-        self.num_grids = total_octs
+
+        max_level_loc = min(force_max_level, max_level_loc)
+        self.max_level = self.comm.mpi_allreduce(max_level_loc, op="max")
+        self.num_grids = self.comm.mpi_allreduce(total_octs, op="sum")
 
     def _detect_output_fields(self):
         dsl = set()
@@ -622,6 +640,7 @@ def _chunk_io(self, dobj, cache=True, local_only=False):
 
     def _initialize_level_stats(self):
         levels = sum(dom.level_count for dom in self.domains)
+        levels = self.comm.mpi_allreduce(levels, op="sum")
         desc = {"names": ["numcells", "level"], "formats": ["int64"] * 2}
         max_level = self.dataset.min_level + self.dataset.max_level + 2
         self.level_stats = blankRecordArray(desc, max_level)
@@ -644,6 +663,8 @@ def _get_particle_type_counts(self):
                 count = fh.local_particle_count
                 npart[fh.ptype] += count
 
+        npart = self.comm.par_combine_object(npart, op="sum")
+
         return npart
 
     def print_stats(self):
diff --git a/yt/frontends/ramses/field_handlers.py b/yt/frontends/ramses/field_handlers.py
index 10860b5afd5..edd0f612f7b 100644
--- a/yt/frontends/ramses/field_handlers.py
+++ b/yt/frontends/ramses/field_handlers.py
@@ -272,7 +272,7 @@ def offset(self):
                 self.domain.domain_id,
                 self.parameters["nvar"],
                 self.domain.amr_header,
-                skip_len=nvars * 8,
+                Nskip=nvars * 8,
             )
 
         self._offset = offset
diff --git a/yt/frontends/ramses/io.py b/yt/frontends/ramses/io.py
index a74a24cf1fa..f6e503dd793 100644
--- a/yt/frontends/ramses/io.py
+++ b/yt/frontends/ramses/io.py
@@ -176,11 +176,12 @@ def _read_fluid_selection(self, chunks, selector, fields, size):
 
         # Set of field types
         ftypes = {f[0] for f in fields}
+
         for chunk in chunks:
             # Gather fields by type to minimize i/o operations
             for ft in ftypes:
                 # Get all the fields of the same type
-                field_subs = list(filter(lambda f, ft=ft: f[0] == ft, fields))
+                field_subs = [field for field in fields if field[0] == ft]
 
                 # Loop over subsets
                 for subset in chunk.objs:
@@ -209,12 +210,17 @@ def _read_fluid_selection(self, chunks, selector, fields, size):
                             d.max(),
                             d.size,
                         )
-                        tr[(ft, f)].append(d)
+                        tr[ft, f].append(np.atleast_1d(d))
+
         d = {}
         for field in fields:
-            d[field] = np.concatenate(tr.pop(field))
+            tmp = tr.pop(field, None)
+            if tmp:
+                d[field] = np.concatenate(tmp)
+            else:
+                d[field] = np.empty(0, dtype="=f8")
 
-        return d
+        return self.ds.index.comm.par_combine_object(d, op="cat")
 
     def _read_particle_coords(self, chunks, ptf):
         pn = "particle_position_%s"
@@ -258,6 +264,7 @@ def _read_particle_fields(self, chunks, ptf, selector):
                             yield (ptype, field), data
         else:
+            tr = defaultdict(list)
             for chunk in chunks:
                 for subset in chunk.objs:
                     rv = self._read_particle_subset(subset, fields)
@@ -270,7 +277,19 @@ def _read_particle_fields(self, chunks, ptf, selector):
                         mask = []
                     for field in field_list:
                         data = np.asarray(rv.pop((ptype, field))[mask], "=f8")
-                        yield (ptype, field), data
+                        tr[ptype, field].append(np.atleast_1d(data))
+
+            d = {}
+            for ptype, field_list in sorted(ptf.items()):
+                for field in field_list:
+                    tmp = tr.pop((ptype, field), None)
+                    if tmp:
+                        d[ptype, field] = np.concatenate(tmp)
+                    else:
+                        d[ptype, field] = np.empty(0, dtype="=f8")
+
+            d = self.ds.index.comm.par_combine_object(d, op="cat")
+            yield from d.items()
 
     def _read_particle_subset(self, subset, fields):
         """Read the particle files."""
diff --git a/yt/frontends/ramses/io_utils.pyx b/yt/frontends/ramses/io_utils.pyx
index 7becee63486..46ed160b28d 100644
--- a/yt/frontends/ramses/io_utils.pyx
+++ b/yt/frontends/ramses/io_utils.pyx
@@ -14,6 +14,10 @@ ctypedef np.int32_t INT32_t
 ctypedef np.int64_t INT64_t
 ctypedef np.float64_t DOUBLE_t
 
+cdef int INT32_SIZE = sizeof(np.int32_t)
+cdef int INT64_SIZE = sizeof(np.int64_t)
+cdef int DOUBLE_SIZE = sizeof(np.float64_t)
+
 @cython.cpow(True)
 @cython.boundscheck(False)
 @cython.wraparound(False)
@@ -76,19 +80,28 @@ def read_amr(FortranFile f, dict headers,
                 f.skip(skip_len)
             # Note that we're adding *grids*, not individual cells.
             if ilevel >= min_level:
-                n = oct_handler.add(icpu + 1, ilevel - min_level, pos[:ng, :],
-                                    count_boundary = 1)
+                n = oct_handler.add(
+                    icpu + 1,
+                    ilevel - min_level,
+                    pos[:ng, :],
+                    skip_boundary = 1,
+                    count_boundary = 1,
+                )
                 if n > 0:
                     max_level = max(ilevel - min_level, max_level)
 
     return max_level
 
+
+cdef inline int skip_len(int Nskip, int record_len) noexcept nogil:
+    return Nskip * (record_len * DOUBLE_SIZE + INT64_SIZE)
+
 @cython.cpow(True)
 @cython.boundscheck(False)
 @cython.wraparound(False)
 @cython.cdivision(True)
 @cython.nonecheck(False)
-cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t nvar, dict headers, int skip_len):
+cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t nvar, dict headers, int Nskip):
 
     cdef np.ndarray[np.int64_t, ndim=2] offset, level_count
     cdef INT64_t ndim, twotondim, nlevelmax, n_levels, nboundary, ncpu, ncpu_and_bound
@@ -104,8 +117,8 @@ cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t n
     ncpu_and_bound = nboundary + ncpu
     twotondim = 2**ndim
 
-    if skip_len == -1:
-        skip_len = twotondim * nvar
+    if Nskip == -1:
+        Nskip = twotondim * nvar
 
     # It goes: level, CPU, 8-variable (1 oct)
     offset = np.full((ncpu_and_bound, n_levels), -1, dtype=np.int64)
@@ -130,7 +143,7 @@ cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t n
             if ilevel >= min_level:
                 offset_view[icpu, ilevel - min_level] = f.tell()
                 level_count_view[icpu, ilevel - min_level] = file_ncache
-            f.skip(skip_len)
+            f.seek(skip_len(Nskip, file_ncache), 1)
 
     return offset, level_count
 
@@ -154,21 +167,38 @@ def fill_hydro(FortranFile f,
     cdef dict tmp
     cdef str field
     cdef INT64_t twotondim
-    cdef int ilevel, icpu, ifield, nfields, nlevels, nc, ncpu_selected
-    cdef np.ndarray[np.uint8_t, ndim=1] mask
+    cdef int ilevel, icpu, nlevels, nc, ncpu_selected, nfields_selected
+    cdef int i, j, ii
 
     twotondim = 2**ndim
-    nfields = len(all_fields)
+    nfields_selected = len(fields)
 
     nlevels = offsets.shape[1]
     ncpu_selected = len(cpu_enumerator)
 
-    mask = np.array([(field in fields) for field in all_fields], dtype=np.uint8)
+    cdef np.int64_t[::1] cpu_list = np.asarray(cpu_enumerator, dtype=np.int64)
+
+    cdef np.int64_t[::1] jumps = np.zeros(nfields_selected + 1, dtype=np.int64)
+    cdef int jump_len
+    cdef np.ndarray[np.float64_t, ndim=3] buffer
 
+    jump_len = 0
+    j = 0
+    for i, field in enumerate(all_fields):
+        if field in fields:
+            jumps[j] = jump_len
+            j += 1
+            jump_len = 0
+        else:
+            jump_len += 1
+    jumps[j] = jump_len
+
+    cdef int nc_largest = 0
     # Loop over levels
     for ilevel in range(nlevels):
         # Loop over cpu domains
-        for icpu in cpu_enumerator:
+        for ii in range(ncpu_selected):
+            icpu = cpu_list[ii]
             nc = level_count[icpu, ilevel]
             if nc == 0:
                 continue
@@ -176,19 +206,35 @@ def fill_hydro(FortranFile f,
             if offset == -1:
                 continue
             f.seek(offset)
-            tmp = {}
             # Initialize temporary data container for io
             # note: we use Fortran ordering to reflect the in-file ordering
-            for field in all_fields:
-                tmp[field] = np.empty((nc, twotondim), dtype="float64", order='F')
+            if nc > nc_largest:
+                nc_largest = nc
+                buffer = np.empty((nc, twotondim, nfields_selected), dtype="float64", order='F')
 
+            jump_len = 0
             for i in range(twotondim):
                 # Read the selected fields
-                for ifield in range(nfields):
-                    if not mask[ifield]:
-                        f.skip()
-                    else:
-                        tmp[all_fields[ifield]][:, i] = f.read_vector('d') # i-th cell
+                for j in range(nfields_selected):
+                    jump_len += jumps[j]
+                    if jump_len > 0:
+                        f.seek(skip_len(jump_len, nc), 1)
+                        jump_len = 0
+                    f.read_vector_inplace('d', &buffer[0, i, j])
+
+            jump_len += jumps[nfields_selected]
+
+            # In principle, we may be left with some fields to skip
+            # but since we're doing an absolute seek at the beginning of
+            # the loop on CPUs, we can spare one seek here
+            ## if jump_len > 0:
+            ##     f.seek(skip_len(jump_len, nc), 1)
+
+            # Alias buffer into dictionary
+            tmp = {}
+            for i, field in enumerate(fields):
+                tmp[field] = buffer[:, :, i]
+
             if ncpu_selected > 1:
                 oct_handler.fill_level_with_domain(
                     ilevel, levels, cell_inds, file_inds, domains, tr, tmp, domain=icpu+1)
diff --git a/yt/geometry/oct_container.pxd b/yt/geometry/oct_container.pxd
index 5ac9f9318a7..8b1a7be4595 100644
--- a/yt/geometry/oct_container.pxd
+++ b/yt/geometry/oct_container.pxd
@@ -82,6 +82,41 @@ cdef class OctreeContainer:
     cdef public object fill_style
     cdef public int max_level
 
+    cpdef int add(
+        self,
+        const int curdom,
+        const int curlevel,
+        const np.float64_t[:, ::1] pos,
+        int skip_boundary = ?,
+        int count_boundary = ?,
+        np.uint64_t[::1] levels = ?,
+        np.int64_t[::1] file_inds = ?,
+        np.uint8_t[::1] domain_inds = ?,
+    )
+
+    cpdef void fill_level(
+        self,
+        const int level,
+        const np.uint8_t[:] levels,
+        const np.uint8_t[:] cell_inds,
+        const np.int64_t[:] file_inds,
+        dict dest_fields,
+        dict source_fields,
+        np.int64_t offset = ?
+    )
+    cpdef int fill_level_with_domain(
+        self,
+        const int level,
+        const np.uint8_t[:] levels,
+        const np.uint8_t[:] cell_inds,
+        const np.int64_t[:] file_inds,
+        const np.int32_t[:] domains,
+        dict dest_fields,
+        dict source_fields,
+        const np.int32_t domain,
+        np.int64_t offset = ?
+    )
+
 cdef class SparseOctreeContainer(OctreeContainer):
     cdef OctKey *root_nodes
     cdef void *tree_root
diff --git a/yt/geometry/oct_container.pyx b/yt/geometry/oct_container.pyx
index 7d3118cb252..6c7b8bea33d 100644
--- a/yt/geometry/oct_container.pyx
+++ b/yt/geometry/oct_container.pyx
@@ -575,12 +575,17 @@ cdef class OctreeContainer:
     @cython.boundscheck(False)
     @cython.wraparound(False)
     @cython.cdivision(True)
-    def add(self, int curdom, int curlevel,
-            np.ndarray[np.float64_t, ndim=2] pos,
-            int skip_boundary = 1,
-            int count_boundary = 0,
-            np.ndarray[np.uint64_t, ndim=1, cast=True] levels = None
-            ):
+    cpdef int add(
+        self,
+        const int curdom,
+        const int curlevel,
+        const np.float64_t[:, ::1] pos,
+        int skip_boundary = 1,
+        int count_boundary = 0,
+        np.uint64_t[::1] levels = None,
+        np.int64_t[::1] file_inds = None,
+        np.uint8_t[::1] domain_inds = None
+    ):
         # In this function, if we specify curlevel = -1, then we query the
         # (optional) levels array for the oct level.
         cdef int no, p, i
@@ -742,12 +747,16 @@ cdef class OctreeContainer:
     @cython.boundscheck(False)
     @cython.wraparound(False)
     @cython.cdivision(True)
-    def fill_level(self, int level,
-                   np.ndarray[np.uint8_t, ndim=1] levels,
-                   np.ndarray[np.uint8_t, ndim=1] cell_inds,
-                   np.ndarray[np.int64_t, ndim=1] file_inds,
-                   dest_fields, source_fields,
-                   np.int64_t offset = 0):
+    cpdef void fill_level(
+        self,
+        const int level,
+        const np.uint8_t[:] levels,
+        const np.uint8_t[:] cell_inds,
+        const np.int64_t[:] file_inds,
+        dict dest_fields,
+        dict source_fields,
+        np.int64_t offset = 0
+    ):
         cdef np.ndarray[np.float64_t, ndim=2] source
         cdef np.ndarray[np.float64_t, ndim=1] dest
         cdef int i, lvl
@@ -824,17 +833,18 @@ cdef class OctreeContainer:
     @cython.boundscheck(False)
     @cython.wraparound(False)
     @cython.cdivision(True)
-    def fill_level_with_domain(
-        self, int level,
-        np.uint8_t[:] levels,
-        np.uint8_t[:] cell_inds,
-        np.int64_t[:] file_inds,
-        np.int32_t[:] domains,
-        dict dest_fields,
-        dict source_fields,
-        np.int32_t domain,
-        np.int64_t offset = 0
-    ):
+    cpdef int fill_level_with_domain(
+        self,
+        const int level,
+        const np.uint8_t[:] levels,
+        const np.uint8_t[:] cell_inds,
+        const np.int64_t[:] file_inds,
+        const np.int32_t[:] domains,
+        dict dest_fields,
+        dict source_fields,
+        const np.int32_t domain,
+        np.int64_t offset = 0
+    ):
         """Similar to fill_level but accepts a domain argument.
 
         This is particularly useful for frontends that have buffer zones at CPU boundaries.
diff --git a/yt/utilities/cython_fortran_utils.pxd b/yt/utilities/cython_fortran_utils.pxd
index ab6c2a0dc59..71c35fe1210 100644
--- a/yt/utilities/cython_fortran_utils.pxd
+++ b/yt/utilities/cython_fortran_utils.pxd
@@ -13,6 +13,7 @@ cdef class FortranFile:
     cdef INT64_t get_size(self, str dtype)
     cpdef INT32_t read_int(self) except? -1
     cpdef np.ndarray read_vector(self, str dtype)
+    cdef int read_vector_inplace(self, str dtype, void *data)
     cpdef INT64_t tell(self) except -1
     cpdef INT64_t seek(self, INT64_t pos, INT64_t whence=*) except -1
     cpdef void close(self)
diff --git a/yt/utilities/cython_fortran_utils.pyx b/yt/utilities/cython_fortran_utils.pyx
index aa26add5c16..fc2008fc419 100644
--- a/yt/utilities/cython_fortran_utils.pyx
+++ b/yt/utilities/cython_fortran_utils.pyx
@@ -142,6 +142,38 @@ cdef class FortranFile:
 
         return data
 
+    cdef int read_vector_inplace(self, str dtype, void *data):
+        """Reads a record from the file.
+
+        Parameters
+        ----------
+        d : data type
+            This is the datatype (from the struct module) that we should read.
+        data : void*
+            The pointer where to store the data.
+            It should be preallocated and have the correct size.
+        """
+        cdef INT32_t s1, s2, size
+
+        if self._closed:
+            raise ValueError("I/O operation on closed file.")
+
+        size = self.get_size(dtype)
+
+        fread(&s1, INT32_SIZE, 1, self.cfile)
+
+        # Check record is compatible with data type
+        if s1 % size != 0:
+            raise ValueError('Size obtained (%s) does not match with the expected '
+                             'size (%s) of multi-item record' % (s1, size))
+
+        fread(data, size, s1 // size, self.cfile)
+        fread(&s2, INT32_SIZE, 1, self.cfile)
+
+        if s1 != s2:
+            raise IOError('Sizes do not agree in the header and footer for '
+                          'this record - check header dtype')
+
     cpdef INT32_t read_int(self) except? -1:
         """Reads a single int32 from the file and return it.
 
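
The `skip_len` helper and `FortranFile.read_vector_inplace` above both lean on the layout of sequential Fortran unformatted records: every record is bracketed by a leading and a trailing 4-byte length marker, so a record of `record_len` float64 values occupies `record_len * DOUBLE_SIZE + INT64_SIZE` bytes, and `Nskip` consecutive records can be skipped with a single relative seek instead of `Nskip` short reads. A minimal pure-Python sketch of that record layout (illustrative only, not part of the patch nor of yt's API; assumes native-endian markers):

```python
import struct

import numpy as np


def read_f64_record(f):
    # Leading 4-byte marker holds the payload size in bytes.
    (s1,) = struct.unpack("=i", f.read(4))
    data = np.frombuffer(f.read(s1), dtype="=f8")
    # Trailing marker must match the leading one (same check as read_vector_inplace).
    (s2,) = struct.unpack("=i", f.read(4))
    if s1 != s2:
        raise OSError("record markers disagree - check the record dtype")
    return data


def skip_f64_records(f, nskip, record_len):
    # One relative seek replaces nskip short reads: each record is
    # record_len * 8 payload bytes plus 2 * 4 bytes of markers.
    f.seek(nskip * (record_len * 8 + 2 * 4), 1)
```
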
diff --git a/yt/utilities/parallel_tools/parallel_analysis_interface.py b/yt/utilities/parallel_tools/parallel_analysis_interface.py
index bbaad9316da..4ded4c52d73 100644
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -5,6 +5,7 @@
 import traceback
 from functools import wraps
 from io import StringIO
+from typing import Any, Literal, Union
 
 import numpy as np
 from more_itertools import always_iterable
@@ -165,6 +166,15 @@ def get_mpi_type(dtype):
     return val
 
 
+def get_transport_dtype(dtype: type) -> Union[Any, str]:
+    mpi_type = get_mpi_type(dtype)
+    if mpi_type is None:
+        # Fallback to char
+        return MPI.CHAR, "c"
+    else:
+        return mpi_type, dtype.str
+
+
 class ObjectIterator:
     """
     This is a generalized class that accepts a list of objects and then
@@ -466,7 +476,14 @@ class ResultsStorage:
     result_id = None
 
 
-def parallel_objects(objects, njobs=0, storage=None, barrier=True, dynamic=False):
+def parallel_objects(
+    objects,
+    njobs=0,
+    storage=None,
+    barrier=True,
+    dynamic=False,
+    method: Union[Literal["strided"], Literal["sequential"]] = "strided",
+):
     r"""This function dispatches components of an iterable to different
     processors.
 
@@ -503,6 +520,11 @@ def parallel_objects(objects, njobs=0, storage=None, barrier=True, dynamic=False
         This requires one dedicated processor; if this is enabled with a set of
         128 processors available, only 127 will be available to iterate over
         objects as one will be load balancing the rest.
+    method : str
+        This governs how the objects are distributed to processors. The
+        default, "strided", will assign objects to processors in a round-robin
+        fashion. The other option, "sequential", will assign objects to
+        processors in a consecutive fashion.
 
     Examples
     --------
@@ -558,7 +580,20 @@ def parallel_objects(objects, njobs=0, storage=None, barrier=True, dynamic=False
     to_share = {}
     # If our objects object is slice-aware, like time series data objects are,
     # this will prevent intermediate objects from being created.
-    oiter = itertools.islice(enumerate(objects), my_new_id, None, njobs)
+    if method == "strided":
+        oiter = itertools.islice(enumerate(objects), my_new_id, None, njobs)
+    elif method == "sequential":
+        chunk_size = len(objects) // njobs
+        remainder = len(objects) % njobs
+
+        if my_new_id < remainder:
+            istart = my_new_id * (chunk_size + 1)
+            iend = istart + chunk_size + 1
+        else:
+            istart = (chunk_size + 1) * remainder + (my_new_id - remainder) * chunk_size
+            iend = istart + chunk_size
+
+        oiter = itertools.islice(enumerate(objects), istart, iend, 1)
     for result_id, obj in oiter:
         if storage is not None:
             rstore = ResultsStorage()
@@ -723,7 +758,6 @@ class Communicator:
     comm = None
     _grids = None
     _distributed = None
-    __tocast = "c"
 
     def __init__(self, comm=None):
         self.comm = comm
@@ -1101,10 +1135,12 @@ def merge_quadtree_buffers(self, qt, merge_style):
 
     def send_array(self, arr, dest, tag=0):
         if not isinstance(arr, np.ndarray):
-            self.comm.send((None, None), dest=dest, tag=tag)
+            self.comm.send((None, None, None), dest=dest, tag=tag)
             self.comm.send(arr, dest=dest, tag=tag)
             return
-        tmp = arr.view(self.__tocast)  # Cast to CHAR
+        mpi_type, transport_dtype = get_transport_dtype(arr.dtype)
+        tmp = arr.view(transport_dtype)
+
         # communicate type and shape and optionally units
         if isinstance(arr, YTArray):
             unit_metadata = (str(arr.units), arr.units.registry.lut)
@@ -1114,13 +1150,17 @@ def send_array(self, arr, dest, tag=0):
                 unit_metadata += ("YTArray",)
         else:
             unit_metadata = ()
-        self.comm.send((arr.dtype.str, arr.shape) + unit_metadata, dest=dest, tag=tag)
-        self.comm.Send([arr, MPI.CHAR], dest=dest, tag=tag)
+        self.comm.send(
+            (arr.dtype.str, arr.shape, transport_dtype) + unit_metadata,
+            dest=dest,
+            tag=tag,
+        )
+        self.comm.Send([arr, mpi_type], dest=dest, tag=tag)
         del tmp
 
     def recv_array(self, source, tag=0):
         metadata = self.comm.recv(source=source, tag=tag)
-        dt, ne = metadata[:2]
+        dt, ne, transport_dt = metadata[:3]
         if ne is None and dt is None:
             return self.comm.recv(source=source, tag=tag)
         arr = np.empty(ne, dtype=dt)
@@ -1130,8 +1170,9 @@ def recv_array(self, source, tag=0):
                 arr = ImageArray(arr, units=metadata[2], registry=registry)
             else:
                 arr = YTArray(arr, metadata[2], registry=registry)
-        tmp = arr.view(self.__tocast)
-        self.comm.Recv([tmp, MPI.CHAR], source=source, tag=tag)
+        mpi_type = get_mpi_type(transport_dt)
+        tmp = arr.view(transport_dt)
+        self.comm.Recv([tmp, mpi_type], source=source, tag=tag)
         return arr
 
     def alltoallv_array(self, send, total_size, offsets, sizes):
@@ -1144,7 +1185,8 @@ def alltoallv_array(self, send, total_size, offsets, sizes):
             recv = np.array(recv)
             return recv
         offset = offsets[self.comm.rank]
-        tmp_send = send.view(self.__tocast)
+        mpi_type, transport_dtype = get_transport_dtype(send.dtype)
+        tmp_send = send.view(transport_dtype)
         recv = np.empty(total_size, dtype=send.dtype)
         if isinstance(send, YTArray):
             # We assume send.units is consistent with the units
@@ -1157,9 +1199,9 @@ def alltoallv_array(self, send, total_size, offsets, sizes):
         dtr = send.dtype.itemsize / tmp_send.dtype.itemsize  # > 1
         roff = [off * dtr for off in offsets]
         rsize = [siz * dtr for siz in sizes]
-        tmp_recv = recv.view(self.__tocast)
+        tmp_recv = recv.view(transport_dtype)
         self.comm.Allgatherv(
-            (tmp_send, tmp_send.size, MPI.CHAR), (tmp_recv, (rsize, roff), MPI.CHAR)
+            (tmp_send, tmp_send.size, mpi_type), (tmp_recv, (rsize, roff), mpi_type)
         )
         return recv
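
For reference, the `method="sequential"` branch added to `parallel_objects` above is a plain block partition: `len(objects)` items are split into `njobs` consecutive chunks, and the first `len(objects) % njobs` ranks each receive one extra item. A standalone sketch of that arithmetic (plain Python, no MPI; the helper name is illustrative only, not part of the patch):

```python
def sequential_slice(n_objects: int, njobs: int, rank: int) -> range:
    # Block ("sequential") partition: each rank gets a contiguous run of
    # indices, so in the RAMSES case each MPI task reads consecutive domains.
    chunk_size, remainder = divmod(n_objects, njobs)
    if rank < remainder:
        istart = rank * (chunk_size + 1)
        iend = istart + chunk_size + 1
    else:
        istart = (chunk_size + 1) * remainder + (rank - remainder) * chunk_size
        iend = istart + chunk_size
    return range(istart, iend)


# Example: 10 domains over 3 ranks -> [0, 1, 2, 3], [4, 5, 6], [7, 8, 9];
# the default "strided" method would instead give [0, 3, 6, 9], [1, 4, 7], [2, 5, 8].
for rank in range(3):
    print(rank, list(sequential_slice(10, 3, rank)))
```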