Skip to content

Commit

Permalink
Merge pull request #1911 from jedwards4b/subset_rearr_fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jedwards4b authored Mar 7, 2022
2 parents fa73d27 + fbd8a0a commit f49656d
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 56 deletions.
28 changes: 18 additions & 10 deletions src/clib/pio_darray_int.c
Original file line number Diff line number Diff line change
Expand Up @@ -653,8 +653,10 @@ write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *vari
}

/* Write, in non-blocking fashion, a list of subarrays. */
PLOG((3, "about to call ncmpi_iput_varn() varids[%d] = %d rrcnt = %d, llen = %d",
nv, varids[nv], rrcnt, llen));
// PLOG((3, "about to call ncmpi_iput_varn() varids[%d] = %d rrcnt = %d, llen = %d",
// nv, varids[nv], rrcnt, llen));
// for(int i=0;i < llen; i++)
// PLOG((3, "bufptr[%d] = %d",i,((int *)bufptr)[i]));
ierr = ncmpi_iput_varn(file->fh, varids[nv], rrcnt, startlist, countlist,
bufptr, llen, iodesc->mpitype, &vdesc->request[vdesc->nreqs]);

Expand Down Expand Up @@ -945,21 +947,25 @@ recv_and_write_data(file_desc_t *file, const int *varids, const int *frame,
for (int regioncnt = 0; regioncnt < rregions; regioncnt++)
{
PLOG((3, "writing data for region with regioncnt = %d", regioncnt));
bool needtowrite = true;

if ((ierr = get_var_desc(varids[0], &file->varlist, &vdesc)))
return pio_err(NULL, file, ierr, __FILE__, __LINE__);

/* Get the start/count arrays for this region. */
for (int i = 0; i < fndims; i++)
{
start[i] = tmp_start[i + regioncnt * fndims];
count[i] = tmp_count[i + regioncnt * fndims];
PLOG((3, "start[%d] = %d count[%d] = %d", i, start[i], i, count[i]));
PLOG((3, "needtowrite %d count[%d] %d\n",needtowrite, i, count[i]));
if(i>0 || vdesc->record <0)
needtowrite = (count[i] > 0 && needtowrite);
}

/* Process each variable in the buffer. */
for (int nv = 0; nv < nvars; nv++)
{
PLOG((3, "writing buffer var %d", nv));
if ((ierr = get_var_desc(varids[0], &file->varlist, &vdesc)))
return pio_err(NULL, file, ierr, __FILE__, __LINE__);

/* Get a pointer to the correct part of the buffer. */
bufptr = (void *)((char *)iobuf + iodesc->mpitype_size * (nv * rlen + loffset));
Expand All @@ -981,13 +987,15 @@ recv_and_write_data(file_desc_t *file, const int *varids, const int *frame,
}

#ifdef LOGGING
for (int i = 1; i < fndims; i++)
PLOG((3, "start[%d] %d count[%d] %d", i, start[i], i, count[i]));
if(needtowrite)
for (int i = 1; i < fndims; i++)
PLOG((3, "(serial) start[%d] %d count[%d] %d needtowrite %d", i, start[i], i, count[i], needtowrite));
#endif /* LOGGING */

/* Call the netCDF functions to write the data. */
if ((ierr = nc_put_vara(file->fh, varids[nv], start, count, bufptr)))
return check_netcdf2(ios, NULL, ierr, __FILE__, __LINE__);
if (needtowrite)
if ((ierr = nc_put_vara(file->fh, varids[nv], start, count, bufptr)))
return check_netcdf2(ios, NULL, ierr, __FILE__, __LINE__);

} /* next var */

Expand Down Expand Up @@ -1350,7 +1358,7 @@ pio_read_darray_nc(file_desc_t *file, io_desc_t *iodesc, int vid, void *iobuf)
/* Release the start and count arrays. */
for (int i = 0; i < rrlen; i++)
{
PLOG((3,"startlist %d %d countlist %d %d",startlist[i][0],startlist[i][1],countlist[i][0],countlist[i][1]));
// PLOG((3,"startlist %d %d countlist %d %d",startlist[i][0],startlist[i][1],countlist[i][0],countlist[i][1]));
free(startlist[i]);
free(countlist[i]);
}
Expand Down
94 changes: 61 additions & 33 deletions src/clib/pio_rearrange.c
Original file line number Diff line number Diff line change
Expand Up @@ -297,16 +297,16 @@ create_mpi_datatypes(MPI_Datatype mpitype, int msgcnt,

PIO_Offset bsizeT[msgcnt];

PLOG((1, "create_mpi_datatypes mpitype = %d msgcnt = %d", mpitype,
PLOG((2, "create_mpi_datatypes mpitype = %d msgcnt = %d", mpitype,
msgcnt));
PLOG((2, "MPI_BYTE = %d MPI_CHAR = %d MPI_SHORT = %d MPI_INT = %d "
"MPI_FLOAT = %d MPI_DOUBLE = %d", MPI_BYTE, MPI_CHAR, MPI_SHORT,
MPI_INT, MPI_FLOAT, MPI_DOUBLE));
// PLOG((2, "MPI_BYTE = %d MPI_CHAR = %d MPI_SHORT = %d MPI_INT = %d "
// "MPI_FLOAT = %d MPI_DOUBLE = %d", MPI_BYTE, MPI_CHAR, MPI_SHORT,
// MPI_INT, MPI_FLOAT, MPI_DOUBLE));

/* How many indices in the array? */
for (int j = 0; j < msgcnt; j++)
numinds += mcount[j];
PLOG((2, "numinds = %d", numinds));
// PLOG((2, "numinds = %d", numinds));

if (mindex)
{
Expand All @@ -325,7 +325,6 @@ create_mpi_datatypes(MPI_Datatype mpitype, int msgcnt,
* rearrangers. (If mfrom is NULL, this is the box rearranger.) */
if (mfrom == NULL)
{
PLOG((3, "mfrom is NULL"));
for (int i = 0; i < msgcnt; i++)
{
if (mcount[i] > 0)
Expand All @@ -344,7 +343,7 @@ create_mpi_datatypes(MPI_Datatype mpitype, int msgcnt,
{
blocksize = 1;
}
PLOG((3, "blocksize = %d", blocksize));
PLOG((2, "blocksize = %d", blocksize));

/* pos is an index to the start of each message block. */
pos = 0;
Expand All @@ -358,7 +357,7 @@ create_mpi_datatypes(MPI_Datatype mpitype, int msgcnt,
if (!(displace = malloc(sizeof(int) * len)))
EXIT1(PIO_ENOMEM);

PLOG((2, "blocksize = %d i = %d mcount[%d] = %d len = %d", blocksize, i, i,
PLOG((3, "blocksize = %d i = %d mcount[%d] = %d len = %d", blocksize, i, i,
mcount[i], len));
if (blocksize == 1)
{
Expand Down Expand Up @@ -388,11 +387,14 @@ create_mpi_datatypes(MPI_Datatype mpitype, int msgcnt,
}

#if PIO_ENABLE_LOGGING
for (int j = 0; j < len; j++)
PLOG((3, "displace[%d] = %d", j, displace[j]));
int cnt=0;
for (int j = 1; j < len; j++){
PLOG((4, "displace[%d] = %d", j, displace[j]));
}

#endif /* PIO_ENABLE_LOGGING */

PLOG((3, "calling MPI_Type_create_indexed_block len = %d blocksize = %d "
PLOG((2, "calling MPI_Type_create_indexed_block len = %d blocksize = %d "
"mpitype = %d", len, blocksize, mpitype));
/* Create an indexed datatype with constant-sized blocks. */
mpierr = MPI_Type_create_indexed_block(len, blocksize, displace,
Expand All @@ -410,6 +412,14 @@ create_mpi_datatypes(MPI_Datatype mpitype, int msgcnt,
if ((mpierr = MPI_Type_commit(&mtype[i])))
return check_mpi(NULL, NULL, mpierr, __FILE__, __LINE__);
pos += mcount[i];

// MPI_Aint ext, lb;
// int tsize;
// MPI_Type_get_extent(mtype[i], &lb, &ext);
// MPI_Type_size(mtype[i], &tsize);
// printf("%d lb %d extent %d tsize %d\n",__LINE__, lb, ext, tsize);


}
}

Expand Down Expand Up @@ -477,6 +487,7 @@ define_iodesc_datatypes(iosystem_desc_t *ios, io_desc_t *iodesc)
int *mfrom = iodesc->rearranger == PIO_REARR_SUBSET ? iodesc->rfrom : NULL;

/* Create the MPI datatypes. */
PLOG((2, "Calling create_mpi_datatypes at line %d",__LINE__));
if ((ret = create_mpi_datatypes(iodesc->mpitype, iodesc->nrecvs, iodesc->rindex,
iodesc->rcount, mfrom, iodesc->rtype)))
return pio_err(ios, NULL, ret, __FILE__, __LINE__);
Expand Down Expand Up @@ -510,7 +521,7 @@ define_iodesc_datatypes(iosystem_desc_t *ios, io_desc_t *iodesc)
iodesc->num_stypes = ntypes;

/* Create the MPI data types. */
PLOG((3, "about to call create_mpi_datatypes for computation MPI types"));
PLOG((2, "Calling create_mpi_datatypes at line %d",__LINE__));
if ((ret = create_mpi_datatypes(iodesc->mpitype, ntypes, iodesc->sindex,
iodesc->scount, NULL, iodesc->stype)))
return pio_err(ios, NULL, ret, __FILE__, __LINE__);
Expand Down Expand Up @@ -874,17 +885,18 @@ rearrange_comp2io(iosystem_desc_t *ios, io_desc_t *iodesc, void *sbuf,
recvtypes[i] = PIO_DATATYPE_NULL;
sendtypes[i] = PIO_DATATYPE_NULL;
}
PLOG((3, "ntasks = %d iodesc->mpitype_size = %d niotasks = %d", ntasks,
iodesc->mpitype_size, niotasks));
// PLOG((3, "ntasks = %d iodesc->mpitype_size = %d niotasks = %d", ntasks,
// iodesc->mpitype_size, niotasks));

/* If it has not already been done, define the MPI data types that
* will be used for this io_desc_t. */
// PLOG((2, "Calling define_iodesc_datatypes at line %d sindex[20] = %d",__LINE__,iodesc->sindex[20]));
if ((ret = define_iodesc_datatypes(ios, iodesc)))
return pio_err(ios, NULL, ret, __FILE__, __LINE__);

/* If this is an IO proc, we need to exchange data with compute
* tasks. Create an MPI datatype for that exchange. */
PLOG((2, "ios->ioproc %d iodesc->nrecvs = %d", ios->ioproc, iodesc->nrecvs));
// PLOG((2, "ios->ioproc %d iodesc->nrecvs = %d", ios->ioproc, iodesc->nrecvs));
if (ios->ioproc && iodesc->nrecvs > 0)
{
for (int i = 0; i < iodesc->nrecvs; i++)
Expand Down Expand Up @@ -941,7 +953,7 @@ rearrange_comp2io(iosystem_desc_t *ios, io_desc_t *iodesc, void *sbuf,
for (int i = 0; i < niotasks; i++)
{
int io_comprank = ios->ioranks[i];
PLOG((3, "ios->ioranks[%d] = %d", i, ios->ioranks[i]));
// PLOG((3, "ios->ioranks[%d] = %d", i, ios->ioranks[i]));
if (iodesc->rearranger == PIO_REARR_SUBSET)
io_comprank = 0;

Expand All @@ -966,7 +978,7 @@ rearrange_comp2io(iosystem_desc_t *ios, io_desc_t *iodesc, void *sbuf,
}

/* Data in sbuf on the compute nodes is sent to rbuf on the ionodes */
PLOG((2, "about to call pio_swapm for sbuf"));
// PLOG((2, "about to call pio_swapm for sbuf"));
if ((ret = pio_swapm(sbuf, sendcounts, sdispls, sendtypes,
rbuf, recvcounts, rdispls, recvtypes, mycomm,
&iodesc->rearr_opts.comp2io)))
Expand Down Expand Up @@ -1040,10 +1052,11 @@ rearrange_io2comp(iosystem_desc_t *ios, io_desc_t *iodesc, void *sbuf,
/* Get the size of this communicator. */
if ((mpierr = MPI_Comm_size(mycomm, &ntasks)))
return check_mpi(ios, NULL, mpierr, __FILE__, __LINE__);
PLOG((3, "niotasks %d ntasks %d", niotasks, ntasks));
// PLOG((3, "niotasks %d ntasks %d", niotasks, ntasks));

/* Define the MPI data types that will be used for this
* io_desc_t. */
// PLOG((2, "Calling define_iodesc_datatypes at line %d",__LINE__));
if ((ret = define_iodesc_datatypes(ios, iodesc)))
return pio_err(ios, NULL, ret, __FILE__, __LINE__);

Expand Down Expand Up @@ -2091,14 +2104,21 @@ subset_rearrange_create(iosystem_desc_t *ios, int maplen, PIO_Offset *compmap,
/* Determine scount[0], the number of data elements in the
* computation task that are to be written, by looking at
* compmap. */
// int compmax = -1;
// int compmin = 5000;
for (i = 0; i < iodesc->ndof; i++)
{
// This is allowed in some cases
// pioassert(compmap[i]>=-1 && compmap[i]<=totalgridsize, "Compmap value out of bounds",
// __FILE__,__LINE__);
if (compmap[i] > 0)
(iodesc->scount[0])++;
// if (compmap[i] > compmax)
// compmax = compmap[i];
// if (compmap[i] > 0 && compmap[i]<compmin)
// compmin = compmap[i];
}
// printf("%d compmin=%d compmax=%d maplen=%d\n",__LINE__,compmin, compmax, maplen);

/* Allocate an array for indices on the computation tasks (the
* send side when writing). */
Expand All @@ -2107,10 +2127,13 @@ subset_rearrange_create(iosystem_desc_t *ios, int maplen, PIO_Offset *compmap,
return pio_err(ios, NULL, PIO_ENOMEM, __FILE__, __LINE__);

j = 0;
for (i = 0; i < iodesc->ndof; i++)
if (compmap[i] > 0)
for (i = 0; i < iodesc->ndof; i++){
if (compmap[i] > 0){
iodesc->sindex[j++] = i;
}
}

PLOG((2,"At line %d scount[0]=%d",__LINE__,iodesc->scount[0]));
/* Pass the reduced maplen (without holes) from each compute task
* to its associated IO task. */
if ((mpierr = MPI_Gather(iodesc->scount, 1, MPI_INT, iodesc->rcount, rcnt,
Expand Down Expand Up @@ -2149,7 +2172,7 @@ subset_rearrange_create(iosystem_desc_t *ios, int maplen, PIO_Offset *compmap,
rdispls[i] = 0;
}
}

// PLOG((2,"At line %d rdispls[%d]=%d rcount=%d",__LINE__,1,rdispls[0], iodesc->rcount[0]));
/* Determine whether fill values will be needed. */
if ((ret = determine_fill(ios, iodesc, gdimlen, compmap)))
return pio_err(ios, NULL, ret, __FILE__, __LINE__);
Expand All @@ -2160,6 +2183,9 @@ subset_rearrange_create(iosystem_desc_t *ios, int maplen, PIO_Offset *compmap,
iodesc->subset_comm)))
return check_mpi(NULL, NULL, mpierr, __FILE__, __LINE__);

// for(int i=0;i<recvcounts[0];i++)
// PLOG((2, "At line %d srcindex[%d] = %d",__LINE__,i,srcindex[i]));

/* On IO tasks which need it, allocate memory for the map and the
* iomap. */
if (ios->ioproc && iodesc->llen > 0)
Expand Down Expand Up @@ -2194,6 +2220,7 @@ subset_rearrange_create(iosystem_desc_t *ios, int maplen, PIO_Offset *compmap,
rdispls, PIO_OFFSET, 0, iodesc->subset_comm)))
return check_mpi(NULL, NULL, mpierr, __FILE__, __LINE__);

// PLOG((2,"At line %d rdispls[%d]=%d",__LINE__,0,rdispls[0]));
if (shrtmap != compmap)
free(shrtmap);

Expand Down Expand Up @@ -2238,26 +2265,26 @@ subset_rearrange_create(iosystem_desc_t *ios, int maplen, PIO_Offset *compmap,
PIO_Offset soffset;
/* We only want a single copy of each source point in the iobuffer, but it may be sent to multiple destinations
in a read operation. */
int k=0;
PIO_Offset previomap[ntasks];
for (i = 0; i < ntasks; i++)
previomap[i] = -1;
for (i = 0, rllen=0; i < iodesc->llen; i++)
{
mapsort *mptr = &map[i];
iodesc->rfrom[i] = mptr->rfrom;
if(i==0)
if(mptr->iomap > previomap[mptr->rfrom])
{
iodesc->rindex[i] = i;
iomap[0] = mptr->iomap;
iomap[rllen] = mptr->iomap;
soffset = mptr->soffset;
}
else if(mptr->iomap > iomap[rllen])
{
iomap[++rllen] = mptr->iomap;
soffset = mptr->soffset;
}
iodesc->rindex[i] = rllen;
iodesc->rllen = rllen+1;

previomap[mptr->rfrom]=iomap[rllen];
srcindex[(cnt[mptr->rfrom])++] = soffset;
iodesc->rindex[i] = rllen++;
iodesc->rllen = rllen;
}


/* Handle fill values if needed. */
PLOG((3, "ios->ioproc %d iodesc->needsfill %d iodesc->rllen %d", ios->ioproc, iodesc->needsfill, iodesc->rllen));
if (ios->ioproc && iodesc->needsfill)
Expand Down Expand Up @@ -2327,7 +2354,7 @@ subset_rearrange_create(iosystem_desc_t *ios, int maplen, PIO_Offset *compmap,
}

/* Allocate and initialize a grid to fill in missing values. ??? */
PLOG((2, "thisgridsize[ios->io_rank] %d", thisgridsize[ios->io_rank]));
// PLOG((2, "thisgridsize[ios->io_rank] %d", thisgridsize[ios->io_rank]));
PIO_Offset *grid;
if (!(grid = calloc(thisgridsize[ios->io_rank], sizeof(PIO_Offset))))
return pio_err(ios, NULL, PIO_ENOMEM, __FILE__, __LINE__);
Expand Down Expand Up @@ -2441,6 +2468,7 @@ subset_rearrange_create(iosystem_desc_t *ios, int maplen, PIO_Offset *compmap,

iodesc->nrecvs = ntasks;
}
// PLOG((2, "At line %d sindex[20] = %d",__LINE__,iodesc->sindex[20]));

return PIO_NOERR;
}
Expand Down
Loading

0 comments on commit f49656d

Please sign in to comment.