Skip to content

Commit

Permalink
Allow decomp_copy.cpp to run with more than one process
Browse files Browse the repository at this point in the history
  • Loading branch information
wkliao committed Dec 21, 2023
1 parent 914287b commit 74a26fe
Showing 1 changed file with 84 additions and 25 deletions.
109 changes: 84 additions & 25 deletions utils/decomp_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,23 @@
#define MAX_NFILES 6
#define LINE_SIZE 4692802

static void set_start_count(int nprocs,
int rank,
MPI_Offset nelems,
MPI_Offset *start,
MPI_Offset *count)
{
*count = nelems / nprocs;
*start = *count * rank;
if (rank < nelems % nprocs) {
*start += rank;
(*count)++;
}
else {
*start += nelems % nprocs;
}
}

/*----< replay_decomp() >-------------------------------------------------*/
/* Read I/O decomposition file, cfg->decomp_path. The contents of the file
* are shown below, an example from map_f_case_16p.nc
Expand Down Expand Up @@ -118,6 +135,7 @@ int replay_decomp (e3sm_io_config *cfg, e3sm_io_decom *decom) {
int varoffi, varoffo;
int varroffi, varroffo;
int varleni, varleno;
int varfilli, varfillo;
std::vector<int> dims;
int max_nreqs, min_nreqs;
int max_len, min_len;
Expand All @@ -131,11 +149,12 @@ int replay_decomp (e3sm_io_config *cfg, e3sm_io_decom *decom) {
MPI_Offset total_nreqs; // Di total_nreqs
MPI_Offset total_raw_nreqs; // Di total_raw_nreqs
MPI_Offset att_len; // size of str_att
MPI_Offset start, count;
std::vector<char> str_att;
std::string name;

// Set up dummy config for the driver
cfg_in.io_comm = MPI_COMM_SELF;
cfg_in.io_comm = MPI_COMM_WORLD;
cfg_in.info = MPI_INFO_NULL;
cfg_in.num_iotasks = cfg_in.np;
cfg_in.num_subfiles = 0;
Expand Down Expand Up @@ -248,6 +267,16 @@ int replay_decomp (e3sm_io_config *cfg, e3sm_io_decom *decom) {
err = dout->put_att (fido, NC_GLOBAL, name, NC_INT, 1, &min_nreqs);
CHECK_ERR

// Di.fully_covered
name = "D" + std::to_string (i) + ".fully_covered";
err = din->inq_att (fidi, NC_GLOBAL, name, &att_len);
CHECK_ERR
str_att.resize (att_len + 1);
err = din->get_att(fidi, NC_GLOBAL, name, str_att.data());
CHECK_ERR
err = dout->put_att(fido, NC_GLOBAL, name, NC_CHAR, att_len, str_att.data ());
CHECK_ERR

// Di.nreqs
name = "D" + std::to_string (i) + ".nreqs";
err = din->inq_varid (fidi, name.c_str(), &varnri);
Expand Down Expand Up @@ -344,42 +373,81 @@ int replay_decomp (e3sm_io_config *cfg, e3sm_io_decom *decom) {
err = dout->put_att (fido, varleno, "min", NC_INT, 1, &min_len);
CHECK_ERR

/* Di.fill_starts */
name = "D" + std::to_string (i) + ".fill_starts";
err = din->inq_varid(fidi, name.c_str(), &varfilli);
CHECK_ERR
err = dout->def_var(fido, name, NC_INT, 1, &dimnpo, &varfillo);
CHECK_ERR

/* Di.fill_starts:description */
err = din->inq_att(fidi, varfilli, "description", &att_len);
CHECK_ERR
str_att.resize(att_len + 1);
err = din->get_att(fidi, varfilli, "description", str_att.data());
CHECK_ERR
err = dout->put_att(fido, varfillo, "description", NC_CHAR, att_len, str_att.data());
CHECK_ERR

/* exit define mode */
err = dout->enddef (fido);
CHECK_ERR

buf = (int *)malloc (std::max (total_nreqs, std::max (total_raw_nreqs, num_decomp)) *
sizeof (int));
MPI_Offset nelems = std::max(total_nreqs, std::max (total_raw_nreqs, decomp_nprocs));
nelems = nelems / cfg->np + 1;

buf = (int *)malloc (nelems * sizeof (int));
CHECK_PTR (buf)

err = din->get_vara (fidi, varnri, MPI_INT, NULL, NULL, buf, coll);
/* Di.nreqs */
nelems = decomp_nprocs;
set_start_count(cfg->np, cfg->rank, nelems, &start, &count);

err = din->get_vara (fidi, varnri, MPI_INT, &start, &count, buf, coll);
CHECK_ERR
err = dout->put_vara (fido, varnro, MPI_INT, &start, &count, buf, coll);
CHECK_ERR

/* Di.fill_starts */
err = din->get_vara (fidi, varfilli, MPI_INT, &start, &count, buf, coll);
CHECK_ERR
err = dout->put_vara (fido, varnro, MPI_INT, NULL, NULL, buf, coll);
err = dout->put_vara (fido, varfillo, MPI_INT, &start, &count, buf, coll);
CHECK_ERR

/* Di.raw_nreqs */
if (have_raw) {
err = din->get_vara (fidi, varnrri, MPI_INT, NULL, NULL, buf, coll);
err = din->get_vara (fidi, varnrri, MPI_INT, &start, &count, buf, coll);
CHECK_ERR
err = dout->put_vara (fido, varnrro, MPI_INT, NULL, NULL, buf, coll);
err = dout->put_vara (fido, varnrro, MPI_INT, &start, &count, buf, coll);
CHECK_ERR
}

err = din->get_vara (fidi, varoffi, MPI_INT, NULL, NULL, buf, coll);
/* Di.offsets */
nelems = total_nreqs;
set_start_count(cfg->np, cfg->rank, nelems, &start, &count);

err = din->get_vara (fidi, varoffi, MPI_INT, &start, &count, buf, coll);
CHECK_ERR
err = dout->put_vara (fido, varoffo, MPI_INT, NULL, NULL, buf, coll);
err = dout->put_vara (fido, varoffo, MPI_INT, &start, &count, buf, coll);
CHECK_ERR

/* Di.lengths */
err = din->get_vara (fidi, varleni, MPI_INT, &start, &count, buf, coll);
CHECK_ERR
err = dout->put_vara (fido, varleno, MPI_INT, &start, &count, buf, coll);
CHECK_ERR

/* Di.raw_offsets */
if (have_raw) {
err = din->get_vara (fidi, varroffi, MPI_INT, NULL, NULL, buf, coll);
nelems = total_raw_nreqs;
set_start_count(cfg->np, cfg->rank, nelems, &start, &count);

err = din->get_vara (fidi, varroffi, MPI_INT, &start, &count, buf, coll);
CHECK_ERR
err = dout->put_vara (fido, varroffo, MPI_INT, NULL, NULL, buf, coll);
err = dout->put_vara (fido, varroffo, MPI_INT, &start, &count, buf, coll);
CHECK_ERR
}

err = din->get_vara (fidi, varleni, MPI_INT, NULL, NULL, buf, coll);
CHECK_ERR
err = dout->put_vara (fido, varleno, MPI_INT, NULL, NULL, buf, coll);
CHECK_ERR

free (buf);
buf = NULL;

Expand Down Expand Up @@ -426,15 +494,6 @@ int main (int argc, char **argv) {
MPI_Comm_rank (MPI_COMM_WORLD, &rank);
MPI_Comm_size (MPI_COMM_WORLD, &nprocs);

if (nprocs > 1) {
nprocs = 1;
comm = MPI_COMM_SELF;
if (rank == 0)
printf ("Warning: %s is for sequential run. Run on 1 process now.\n", argv[0]);
else
goto err_out;
}

cmd_line[0] = '\0';
for (i = 0; i < argc; i++) {
strcat (cmd_line, argv[i]);
Expand Down

0 comments on commit 74a26fe

Please sign in to comment.