/* Yields the larger of A and B.  This is a plain macro, so either
   argument may be evaluated twice; do not pass expressions that have
   side effects. */

#define MAX(a,b) (((a)>(b))?(a):(b))

/* Number of levels in a lane number.  NOTE(review): presumably the
   length of the v[] array in struct kmr_lane_no -- confirm against
   the struct definition. */

#define KMR_LANE_LEVELS (4)

/* Marker stored in a lane-number slot that holds no lane.  It is
   printed as "-" and must stay -1: the lane-splitting code asserts
   that value and compares colors against -1 directly. */

static short const KMR_NO_LANE = -1;

/* Marker stored in a lane-number slot that is a wildcard.  It is
   written as "*" in lane strings. */

static short const KMR_ANY_LANE = -2;
    83     enum kmr_spawn_req req;
   130     int n_running_sublanes;
   131     _Bool *running_sublanes;
   186         _Bool record_history;
   195                            MPI_Comm basecomm, 
int masterrank,
   199                            MPI_Comm subworlds[], 
unsigned long colors[],
   202     void (*kmr_spawn_set_verbosity)(
struct kmr_spawn_hooks *hooks, 
int level);
   223     for (
int i = 0; i <= level; i++) {
   224         if (n0.v[i] != n1.v[i]) {
   237         id->v[i] = KMR_NO_LANE;
   252         n.v[i] = KMR_NO_LANE;
   254     const char *e = (s + strlen(s));
   261                 n.v[i] = KMR_ANY_LANE;
   264             } 
else if (*(p + 1) == 
'.' && (p + 2) == e) {
   266                 snprintf(ee, 
sizeof(ee),
   267                          (
"Bad lane string; dot at the tail (%s)"), s);
   271             } 
else if (*(p + 1) == 
'.') {
   273                 n.v[i] = KMR_ANY_LANE;
   277                 snprintf(ee, 
sizeof(ee),
   278                          (
"Bad lane string; * followed by something (%s)"), s);
   286             int cc = sscanf(p, 
"%d%c", &d, dot);
   287             if (cc == 1 || (cc == 2 && dot[0] == 
'.')) {
   288                 while (p < e && *p != 
'.') {
   289                     assert(
'0' <= *p && *p <= 
'9');
   292                 if (p < e && *p == 
'.') {
   295                 _Bool sawany = (i > 0 && n.v[i - 1] == KMR_ANY_LANE);
   298                     snprintf(ee, 
sizeof(ee),
   299                              (
"Bad lane string; * at non-tail (%s)"), s);
   303                 } 
else if (cc == 2 && p == e) {
   305                     snprintf(ee, 
sizeof(ee),
   306                              (
"Bad lane string; dot at the tail (%s)"), s);
   313             } 
else if (cc == 0) {
   315                 snprintf(ee, 
sizeof(ee),
   316                          (
"Bad lane string; non-digit appears (%s)"), s);
   322                 snprintf(ee, 
sizeof(ee),
   323                          (
"Bad lane string; non-digit or bad dot (%s)"), s);
   332         snprintf(ee, 
sizeof(ee),
   333                  (
"Bad lane string; garbage at the tail (%s)"), s);
   347     snprintf(buf, 
sizeof(buf), 
"-");
   348     char *e = (buf + 
sizeof(buf));
   353         if (q == KMR_NO_LANE) {
   354             if (print_all_levels) {
   355                 char *dot = (i == 0 ? 
"" : 
".");
   356                 int cc = snprintf(p, (
size_t)(e - p), 
"%s-", dot);
   359             if (!print_all_levels) {
   362         } 
else if (q == KMR_ANY_LANE) {
   363             char *dot = (i == 0 ? 
"" : 
".");
   364             int cc = snprintf(p, (
size_t)(e - p), 
"%s*", dot);
   366             if (!print_all_levels) {
   370             char *dot = (i == 0 ? 
"" : 
".");
   371             int cc = snprintf(p, (
size_t)(e - p), 
"%s%d", dot, q);
   375     buf[
sizeof(buf) - 1] = 0;
   388         assert(admit_any || n.v[i] != KMR_ANY_LANE);
   390                || n.v[i + 1] == KMR_NO_LANE);
   392                || n.v[i + 1] == KMR_NO_LANE || n.v[i + 1] == KMR_ANY_LANE);
   405 static inline unsigned long   408     union {
struct kmr_lane_no id; 
unsigned long color;} u = {.id = 
id};
   418     assert(lane->workers != 0);
   422     for (
int i = 0; i < u->n; i++) {
   423         if (u->ranks[i] == rank) {
   437     assert(lane->superlane != 0);
   439     assert(sup->sublanes != 0);
   443     for (
int i = 0; i < v->n; i++) {
   444         if (v->lanes[i] == lane) {
   466         for (
int q = 0; q < n; q++) {
   467             v->lanes[q] = lanes[q];
   479                   + 
sizeof(int) * (
size_t)n);
   483     for (
int i = 0; i < n; i++) {
   490 kmr_err_when_swf_is_not_initialized(
KMR *mr)
   492     struct kmr_swf *wf = mr->simple_workflow;
   494         kmr_error(mr, 
"Workflow-mapper is not initialized");
   507     kmr_err_when_swf_is_not_initialized(mr);
   508     struct kmr_swf *wf = mr->simple_workflow;
   509     if (wf->kmr_spawn_set_verbosity != 0) {
   510         (*wf->kmr_spawn_set_verbosity)(wf->hooks, level);
   515                                   _Bool test_with_fake_spawn);
   530     _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
   531     _Bool test_with_fake_spawn = mr->swf_debug_master;
   539     assert(
sizeof(
struct kmr_lane_no) <= 
sizeof(
unsigned long));
   541     if (mr->simple_workflow != 0) {
   542         kmr_error(mr, 
"Workflow-mapper is already initialized");
   544         return MPI_ERR_SPAWN;
   547     if (!(master < mr->nprocs)) {
   549         snprintf(ee, 
sizeof(ee),
   550                  "Bad master rank specified (rank=%d)", master);
   553         return MPI_ERR_SPAWN;
   558     memset(wf, 0, 
sizeof(
struct kmr_swf));
   559     mr->simple_workflow = wf;
   561     wf->base_comm = mr->comm;
   562     wf->nprocs = mr->nprocs;
   565     wf->master_rank = master;
   567         wf->lane_comms[i] = lanecomms[i];
   570     wf->args_size = ((mr->swf_args_size != 0)
   571                      ? mr->swf_args_size : KMR_SPAWN_ARGS_SIZE);
   573     wf->master.idle_ranks = 0;
   574     wf->master.top_lane = 0;
   575     wf->master.lane_of_workers = 0;
   576     wf->master.list_of_all_lanes = 0;
   577     wf->master.rpc_buffer = 0;
   578     wf->master.rpc_size = 0;
   579     wf->master.history_head.next = 0;
   580     wf->master.history_head.item = 0;
   581     wf->master.history_insertion_tail = 0;
   582     wf->master.record_history = 0;
   591     if (wf->hooks == 0) {
   595         hooks->s.mr = wf->mr;
   596         hooks->s.print_trace = tracing5;
   601     assert(wf->kmr_spawn_hookup != 0 && wf->hooks != 0);
   602     cc = (*wf->kmr_spawn_hookup)(wf->hooks);
   603     assert(cc == MPI_SUCCESS);
   616             id = wf->lane_id_on_proc;
   618                 id.v[i] = KMR_NO_LANE;
   625         assert(wf->rank != master || level == -1);
   626         if (wf->rank != master) {
   627             assert(wf->kmr_spawn_setup != 0);
   628             cc = (*wf->kmr_spawn_setup)(hooks, wf->base_comm,
   631                                         colors, wf->args_size);
   632             assert(cc == MPI_SUCCESS);
   636     if (wf->rank == wf->master_rank) {
   637         size_t msz = (offsetof(
struct kmr_spawn_work, args) + wf->args_size);
   639         assert(wf->master.rpc_buffer != 0);
   640         wf->master.rpc_size = msz;
   /* Forward declaration of kmr_activate_workers(); the definition is
      not in this part of the file.  The callers visible here pass
      SHUTDOWN=1 from one entry point and SHUTDOWN=0 at the end of
      work-item queuing, and both assert the result is MPI_SUCCESS. */
   647 static int kmr_activate_workers(
struct kmr_swf *wf, _Bool shutdown);
   661     kmr_err_when_swf_is_not_initialized(mr);
   662     struct kmr_swf *wf = mr->simple_workflow;
   663     _Bool tracing5 = (wf->mr->trace_map_spawn && (5 <= wf->mr->verbosity));
   667     if (tracing5 && (wf->rank == wf->master_rank)) {
   668         fprintf(stderr, (
";;KMR [%05d] Detach worker ranks"   669                          " and wait for join messages.\n"), wf->rank);
   673     mr->comm = MPI_COMM_NULL;
   674     cc = MPI_Comm_dup(MPI_COMM_SELF, &mr->comm);
   675     assert(cc == MPI_SUCCESS);
   678     if (wf->rank == wf->master_rank) {
   681             if (cc == MPI_SUCCESS) {
   691                 fprintf(stderr, (
";;KMR [%05d]"   692                                  " Rank %d is not involved in workflow.\n"),
   700         void *spawnerso = wf->mr->swf_spawner_so;
   702         void (*service)(
struct kmr_spawn_hooks *, int) = wf->kmr_spawn_service;
   703         wf->mr->swf_spawner_so = 0;
   707         assert(cc == MPI_SUCCESS);
   709         assert(cc == MPI_SUCCESS);
   711         hooks->s.service_count = 0;
   713         if (spawnerso != 0) {
   715             assert(service != 0 && hooks != 0);
   716             (*service)(hooks, 0);
   718             assert(service != 0 && hooks != 0);
   719             (*service)(hooks, 0);
   733     kmr_err_when_swf_is_not_initialized(mr);
   734     struct kmr_swf *wf = mr->simple_workflow;
   738     cc = kmr_activate_workers(wf, 1);
   739     assert(cc == MPI_SUCCESS);
   752     kmr_err_when_swf_is_not_initialized(mr);
   753     struct kmr_swf *wf = mr->simple_workflow;
   757     if (wf->rank == wf->master_rank) {
   758         if (wf->master.history_head.next != 0) {
   759             assert(wf->master.record_history);
   760             assert(wf->master.history_insertion_tail != 0);
   761             kmr_free_work_list(wf, wf->master.history_head.next, 0, 0);
   762             wf->master.history_head.next = 0;
   763             wf->master.history_insertion_tail = 0;
   766         assert(wf->master.top_lane != 0);
   768         wf->master.top_lane = 0;
   770         assert(wf->master.lane_of_workers != 0);
   771         size_t qsz = (
sizeof(
struct kmr_lane_state *) * (
size_t)wf->nprocs);
   772         kmr_free(wf->master.lane_of_workers, qsz);
   773         wf->master.lane_of_workers = 0;
   775         assert(wf->master.rpc_buffer != 0);
   776         kmr_free(wf->master.rpc_buffer, wf->master.rpc_size);
   777         wf->master.rpc_buffer = 0;
   778         wf->master.rpc_size = 0;
   783     if (wf->hooks != 0) {
   788     if (wf->mr->swf_spawner_so != 0) {
   789         char *soname = (wf->mr->swf_spawner_library != 0
   790                         ? wf->mr->swf_spawner_library : 
"libkmrspawn.so");
   792         cc = dlclose(wf->mr->swf_spawner_so);
   795             snprintf(ee, 
sizeof(ee), 
"dlclose(%s): %s\n",
   797             kmr_warning(wf->mr, 5, ee);
   799         wf->mr->swf_spawner_so = 0;
   801         wf->kmr_spawn_hookup = 0;
   802         wf->kmr_spawn_setup = 0;
   803         wf->kmr_spawn_service = 0;
   804         wf->kmr_spawn_set_verbosity = 0;
   807     kmr_free(wf, 
sizeof(
struct kmr_swf));
   808     mr->simple_workflow = 0;
   /* Forward declaration of kmr_start_worker(); the definition is not
      in this part of the file.  It takes a spawn-work record W, a size
      MSGLEN, a rank, and a communicator BASECOMM, and returns an int.
      NOTE(review): MSGLEN is presumably the byte length of W as sent
      to the worker -- confirm against the definition. */
   813 static int kmr_start_worker(
struct kmr_spawn_work *w, 
size_t msglen,
   814                             int rank, MPI_Comm basecomm);
   816 static int kmr_join_to_workers(
struct kmr_swf *wf,
   829     MPI_Comm basecomm = wf->base_comm;
   830     int nprocs = wf->nprocs;
   835     char *soname = (wf->mr->swf_spawner_library != 0
   836                     ? wf->mr->swf_spawner_library : 
"libkmrspawn.so");
   838     void *m = dlopen(soname, (RTLD_NOW|RTLD_GLOBAL));
   842     int ng = ((m != 0) ? nprocs : rank);
   844     cc = MPI_Allreduce(&ng, &ngmin, 1, MPI_INT, MPI_MIN, basecomm);
   845     assert(cc == MPI_SUCCESS);
   846     _Bool allok = (ngmin == nprocs);
   849         if (!test_with_fake_spawn) {
   852                 snprintf(ee, 
sizeof(ee), 
"dlopen(%s) failed: %s",
   854                 kmr_error(wf->mr, ee);
   857                 kmr_error(wf->mr, 
"Some ranks failed to dleopn()");
   864                 snprintf(ee, 
sizeof(ee), 
"%s", dlerror());
   865                 kmr_warning(wf->mr, 1,
   866                             (
"WORKFLOW-MAPPER IS UNUSABLE;"   867                              " spawn-library unavailable"));
   868                 kmr_warning(wf->mr, 1, ee);
   875                     snprintf(ee, 
sizeof(ee), 
"%s", dlerror());
   876                     kmr_warning(wf->mr, 5, ee);
   880             wf->kmr_spawn_hookup = kmr_spawn_hookup_standin;
   881             wf->kmr_spawn_setup = kmr_spawn_setup_standin;
   882             wf->kmr_spawn_service = kmr_spawn_service_standin;
   883             wf->kmr_spawn_set_verbosity = kmr_spawn_set_verbosity_standin;
   885         assert(wf->mr->swf_spawner_so == 0);
   886         return MPI_ERR_SPAWN;
   888         char *fn[10] = {
"kmr_spawn_hookup",
   891                         "kmr_spawn_set_verbosity", 0};
   894         wf->mr->swf_spawner_so = m;
   896         for (
int i = 0; (i < 10 && fn[i] != 0); i++) {
   897             fp[i] = (intptr_t)dlsym(m, fn[i]);
   900                 snprintf(ee, 
sizeof(ee), 
"dlsym(%s): %s\n", fn[i], dlerror());
   901                 kmr_warning(wf->mr, 5, ee);
   904         wf->kmr_spawn_hookup = (int (*)())fp[0];
   905         wf->kmr_spawn_setup = (int (*)())fp[1];
   906         wf->kmr_spawn_service = (void (*)())fp[2];
   907         wf->kmr_spawn_set_verbosity = (void (*)())fp[3];
   931                       int root, 
int *description[], _Bool dump)
   933     const MPI_Comm basecomm = mr->comm;
   934     const int nprocs = mr->nprocs;
   935     const int rank = mr->rank;
   944         splitcomms[d] = MPI_COMM_NULL;
   954         if (mr->rank == root) {
   957                 if (description[i] == 0) {
   964                 snprintf(ee, 
sizeof(ee),
   965                          (
"Bad lane description,"   966                           " no terminating null"));
   971         cc = MPI_Bcast(&depth, 1, MPI_INT, root, basecomm);
   972         assert(cc == MPI_SUCCESS);
   974         if (mr->rank == root) {
   975             for (
int d = 0; d < depth; d++) {
   976                 int *v = description[d];
   978                 for (i = 0; v[i] != 0; i++);
   982         cc = MPI_Bcast(len, depth, MPI_INT, root, basecomm);
   983         assert(cc == MPI_SUCCESS);
   985         for (
int d = 0; d < depth; d++) {
   986             if (mr->rank == root) {
   987                 desc[d] = description[d];
   989                 desc[d] = 
kmr_malloc(
sizeof(
int) * (
size_t)(len[d] + 1));
   990                 assert(desc[d] != 0);
   992             cc = MPI_Bcast(desc[d], (len[d] + 1), MPI_INT, root, basecomm);
   993             assert(cc == MPI_SUCCESS);
  1001     for (
int d = (depth - 1); d >= 0; d--) {
  1002         assert(KMR_NO_LANE == -1);
  1004         int sublanecolor = ((d == (depth - 1)) ? mr->rank : (colors.v[d + 1]));
  1011         for (
int i = 0; i < len[d]; i++) {
  1013             if (sublanecolor != -1 && sublanecolor < sum && color == -1) {
  1018         _Bool ok = ((d == (depth - 1)) ? (sum < nprocs) : (sum == len[d + 1]));
  1021             snprintf(ee, 
sizeof(ee),
  1022                      (
"Bad lane description,"  1023                       " sum of ranks/lanes are too large"  1024                       " (lanes=%d level=%d)"),
  1030         assert(d == (depth - 1) || color != -1 || sublanecolor == -1);
  1031         colors.v[d] = (short)color;
  1036     for (
int d = (depth - 1); d >= 0; d--) {
  1037         int color = ((colors.v[d] != -1) ? colors.v[d] : MPI_UNDEFINED);
  1038         cc = MPI_Comm_split(basecomm, color, rank, &splitcomms[d]);
  1039         assert(cc == MPI_SUCCESS);
  1040         assert(color != MPI_UNDEFINED || splitcomms[d] == MPI_COMM_NULL);
  1044         kmr_dump_split_lanes(mr, colors);
  1047     for (
int d = 0; d < depth; d++) {
  1048         if (mr->rank != root) {
  1049             kmr_free(desc[d], (
sizeof(
int) * (
size_t)(len[d] + 1)));
  1068                     int root, 
char *description[], _Bool dump)
  1070     const MPI_Comm basecomm = mr->comm;
  1071     const int nprocs = mr->nprocs;
  1072     const int rank = mr->rank;
  1074     struct desc {
struct kmr_lane_no colors; 
int ranks;};
  1082         splitcomms[d] = MPI_COMM_NULL;
  1090         for (
int i = 0; i < nprocs; i++) {
  1091             if (description[i] == 0) {
  1098             snprintf(ee, 
sizeof(ee),
  1099                      (
"Bad lane description,"  1100                       " no terminating null"));
  1108         lines = 
kmr_malloc(
sizeof(
struct desc) * (
size_t)nlines);
  1117         for (
int i = 0; i < nlines; i++) {
  1118             char *s = description[i];
  1120             size_t len = (strlen(s) + 1);
  1121             if (len > 
sizeof(buf)) {
  1123                 snprintf(ee, 
sizeof(ee),
  1124                          (
"Bad lane description,"  1125                           " string too long (%s)"), s);
  1129             memcpy(buf, s, len);
  1132             while (p[0] != 0 && p[0] != 
':') {p++;}
  1135                 snprintf(ee, 
sizeof(ee),
  1136                          (
"Bad lane description,"  1137                           " no separator colon (%s)"), s);
  1146             cc  = sscanf((p + 1), 
"%d%c", &count, &garbage);
  1149                 snprintf(ee, 
sizeof(ee),
  1150                          (
"Bad lane description,"  1151                           " bad number of ranks (%s)"), s);
  1158             for (
int j = 0; j < i; j++) {
  1161                     snprintf(ee, 
sizeof(ee),
  1162                              (
"Bad lane description,"  1163                               " duplicate lane-numbers (%s)"),
  1170             lines[i].colors = id;
  1171             lines[i].ranks = count;
  1180         for (
int i = 0; i < nlines; i++) {
  1195             for (
int i = 0; i < nlines; i++) {
  1196                 rankcount += lines[i].ranks;
  1198             if (rankcount > (nprocs - 1)) {
  1200                 snprintf(ee, 
sizeof(ee),
  1201                          (
"Bad lane description,"  1202                           " total rank count too large (%d)"),
  1212             for (
int d = 0; d < depth; d++) {
  1213                 for (
int i = 0; i < nlines; i++) {
  1214                     int q = lines[i].colors.v[d];
  1215                     if (q > (nprocs - 1)) {
  1217                         snprintf(ee, 
sizeof(ee),
  1218                                  (
"Bad lane description,"  1219                                   " lane number too large (%s)"),
  1232         assert(allcolors != 0);
  1240         for (
int d = 1; d < depth; d++) {
  1243             for (
int i = 0; i < nlines; i++) {
  1244                 int q = lines[i].colors.v[d];
  1245                 extent = MAX(extent, (q + 1));
  1247             for (
int i = 0; i < nlines; i++) {
  1248                 int o = lines[i].colors.v[d - 1] * extent;
  1249                 lines[i].colors.v[d] = (short)(lines[i].colors.v[d] + o);
  1257         for (
int i = 0; i < nlines; i++) {
  1258             assert((rankcount + lines[i].ranks) < nprocs);
  1259             for (
int j = 0; j < lines[i].ranks; j++) {
  1260                 allcolors[rankcount + j] = lines[i].colors;
  1262             rankcount += lines[i].ranks;
  1264         assert(rankcount < nprocs);
  1265         for (
int j = rankcount; j < nprocs; j++) {
  1266             allcolors[j] = illegalcolor;
  1269     cc = MPI_Bcast(&depth, 1, MPI_INT, root, basecomm);
  1270     assert(cc == MPI_SUCCESS);
  1274     cc = MPI_Scatter(allcolors, sz, MPI_BYTE, &colors, sz, MPI_BYTE,
  1276     assert(cc == MPI_SUCCESS);
  1280     for (
int d = (depth - 1); d >= 0; d--) {
  1281         int color = ((colors.v[d] != -1) ? colors.v[d] : MPI_UNDEFINED);
  1282         cc = MPI_Comm_split(basecomm, color, rank, &splitcomms[d]);
  1283         assert(cc == MPI_SUCCESS);
  1284         assert(color != MPI_UNDEFINED || splitcomms[d] == MPI_COMM_NULL);
  1288         kmr_dump_split_lanes(mr, colors);
  1292         kmr_free(lines, (
sizeof(
struct desc) * (
size_t)nlines));
  1293         kmr_free(allcolors, (
sizeof(
struct kmr_lane_no) * (
size_t)nprocs));
  1296         assert(allcolors == 0);
  1305     const MPI_Comm basecomm = mr->comm;
  1306     const int nprocs = mr->nprocs;
  1307     const int rank = mr->rank;
  1316         assert(allcolors != 0);
  1321     cc = MPI_Gather(&colors, sz, MPI_BYTE, allcolors, sz, MPI_BYTE,
  1323     assert(cc == MPI_SUCCESS);
  1326         printf(
"Split of lanes"  1327                " (displayed by distinct colors assigned to ranks):\n");
  1328         for (
int d = 0; d < depth; d++) {
  1329             printf(
"color[level=%d]=", d);
  1330             for (
int i = 0; i < nprocs; i++) {
  1332                 if (allcolors[i].v[d] != -1) {
  1333                     snprintf(col, 
sizeof(col), 
"%d", allcolors[i].v[d]);
  1335                     snprintf(col, 
sizeof(col), 
"-");
  1349         kmr_free(allcolors, (
sizeof(
struct kmr_lane_no) * (
size_t)nprocs));
  1351         assert(allcolors == 0);
  1358                                      MPI_Comm supercomm);
  1370     assert(wf->base_comm != MPI_COMM_NULL);
  1372     MPI_Comm basecomm = wf->base_comm;
  1373     MPI_Comm *comms = wf->lane_comms;
  1379         MPI_Comm supercomm = (level == 0 ? basecomm : comms[level - 1]);
  1380         MPI_Comm subcomm = comms[level];
  1381         if (supercomm == MPI_COMM_NULL) {
  1385             colors[level] = color;
  1390         MPI_Comm subcomm = comms[level];
  1391         if (level > 0 && subcomm != MPI_COMM_NULL) {
  1392             int supercolor = colors[level - 1];
  1394             assert(cc == MPI_SUCCESS);
  1396         if (colors[level] == -1) {
  1397             wf->lane_id_on_proc.v[level] = KMR_NO_LANE;
  1399             wf->lane_id_on_proc.v[level] = (short)colors[level];
  1404     assert(cc == MPI_SUCCESS);
  1420     cc = MPI_Comm_size(subcomm, &nprocs);
  1421     assert(cc == MPI_SUCCESS);
  1422     cc = MPI_Comm_rank(subcomm, &rank);
  1423     assert(cc == MPI_SUCCESS);
  1427         colors = 
kmr_malloc(
sizeof(
int) * (
size_t)nprocs);
  1428         assert(colors != 0);
  1432     cc = MPI_Gather(&supercolor, 1, MPI_INT, colors, 1, MPI_INT,
  1434     assert(cc == MPI_SUCCESS);
  1436         for (
int i = 0; i < nprocs; i++) {
  1437             if (colors[i] != colors[0]) {
  1438                 kmr_error(wf->mr, (
"Communicators are not partitioning"  1441                 return MPI_ERR_SPAWN;
  1444         kmr_free(colors, (
sizeof(
int) * (
size_t)nprocs));
  1462         if (!admit_any && 
id.v[i] == KMR_ANY_LANE) {
  1464             snprintf(ee, 
sizeof(ee), 
"Bad lane-number (%s): any-lane appear",
  1470         if (state == 0 && q == KMR_ANY_LANE) {
  1473         } 
else if (state == 0 && q == KMR_NO_LANE) {
  1475             assert(level == (i - 1));
  1476         } 
else if (state == 0) {
  1478         } 
else if (state == 1 && q == KMR_ANY_LANE) {
  1480         } 
else if (state == 1 && q == KMR_NO_LANE) {
  1482             assert(level == (i - 1));
  1483         } 
else if (state == 1) {
  1485             snprintf(ee, 
sizeof(ee),
  1486                      "Bad lane-number (%s): some follow any-lane",
  1490         } 
else if (state == 2 && q == KMR_ANY_LANE) {
  1492             snprintf(ee, 
sizeof(ee),
  1493                      "Bad lane-number (%s): some follow no-lane",
  1497         } 
else if (state == 2 && q == KMR_NO_LANE) {
  1499         } 
else if (state == 2) {
  1501             snprintf(ee, 
sizeof(ee),
  1502                      "Bad lane-number (%s): some follow no-lane",
  1522     assert(supercomm != MPI_COMM_NULL);
  1529     cc = MPI_Comm_size(supercomm, &nprocs);
  1530     assert(cc == MPI_SUCCESS);
  1531     cc = MPI_Comm_rank(supercomm, &rank);
  1532     assert(cc == MPI_SUCCESS);
  1537         subcommranks = 
kmr_malloc(
sizeof(
int) * (
size_t)nprocs);
  1538         assert(subcommranks != 0);
  1539         colors = 
kmr_malloc(
sizeof(
int) * (
size_t)nprocs);
  1540         assert(colors != 0);
  1547     if (subcomm != MPI_COMM_NULL) {
  1548         cc = MPI_Comm_rank(subcomm, &r);
  1549         assert(cc == MPI_SUCCESS);
  1553     cc = MPI_Gather(&r, 1, MPI_INT, subcommranks, 1, MPI_INT, root, supercomm);
  1554     assert(cc == MPI_SUCCESS);
  1561         for (
int i = 0; i < nprocs; i++) {
  1562             if (subcommranks[i] == 0) {
  1563                 colors[i] = ncolors;
  1576     cc = MPI_Scatter(colors, 1, MPI_INT, &color, 1, MPI_INT, root, supercomm);
  1577     assert(cc == MPI_SUCCESS);
  1578     if (subcomm != MPI_COMM_NULL) {
  1579         cc = MPI_Bcast(&color, 1, MPI_INT, 0, subcomm);
  1580         assert(cc == MPI_SUCCESS);
  1584         kmr_free(subcommranks, (
sizeof(
int) * (
size_t)nprocs));
  1585         kmr_free(colors, (
sizeof(
int) * (
size_t)nprocs));
  1609     const int master = wf->master_rank;
  1610     const MPI_Comm basecomm = wf->base_comm;
  1618     if (wf->rank == master) {
  1619         laneids = 
kmr_malloc((
size_t)sz0 * (
size_t)wf->nprocs);
  1620         assert(laneids != 0);
  1624     cc = MPI_Gather(&wf->lane_id_on_proc, sz0, MPI_BYTE,
  1625                     laneids, sz0, MPI_BYTE, master, basecomm);
  1626     assert(cc == MPI_SUCCESS);
  1629         if (wf->rank == master) {
  1630             printf(
"Lanes of ranks:\n");
  1631             for (
int i = 0; i < wf->nprocs; i++) {
  1633                 printf(
"lane[%d]=%s\n", i, s);
  1643         if (wf->lane_comms[i] != MPI_COMM_NULL) {
  1646             cc = MPI_Comm_size(wf->lane_comms[i], &nprocs);
  1647             assert(cc == MPI_SUCCESS);
  1648             cc = MPI_Comm_rank(wf->lane_comms[i], &rank);
  1649             assert(cc == MPI_SUCCESS);
  1650             ranks[i].size= nprocs;
  1651             ranks[i].rank = rank;
  1659     if (wf->rank == master) {
  1660         laneranks = 
kmr_malloc((
size_t)sz1 * (
size_t)wf->nprocs);
  1661         assert(laneranks != 0);
  1665     cc = MPI_Gather(ranks, sz1, MPI_BYTE, laneranks, sz1, MPI_BYTE,
  1667     assert(cc == MPI_SUCCESS);
  1669     if (wf->rank == master) {
  1676         int nbottoms = v->n;
  1678         assert(count0 == nbottoms);
  1680         kmr_free(laneids, ((
size_t)sz0 * (
size_t)wf->nprocs));
  1681         kmr_free(laneranks, ((
size_t)sz1 * (
size_t)wf->nprocs));
  1693         assert(count1 == wf->master.top_lane->total_sublanes);
  1705     assert(level != -1 || nprocs == 0);
  1713     lane->level = level;
  1714     lane->leader_rank = -1;
  1715     lane->total_sublanes = 1;
  1716     lane->total_ranks = nprocs;
  1717     lane->superlane = 0;
  1721         assert(lane->workers != 0);
  1726     lane->icomm = MPI_COMM_NULL;
  1727     lane->queue_head.next = 0;
  1728     lane->queue_head.item = 0;
  1729     lane->queue_insertion_tail = 0;
  1730     lane->current_work = 0;
  1731     lane->yielding_to_superlane = 0;
  1732     lane->n_joined_ranks = 0;
  1733     lane->n_running_sublanes = 0;
  1735         lane->running_sublanes = 
kmr_malloc(
sizeof(_Bool) * (
size_t)nprocs);
  1736         assert(lane->running_sublanes != 0);
  1738         lane->running_sublanes = 0;
  1752     assert(wf->master.lane_of_workers == 0);
  1753     size_t qsz = (
sizeof(
struct kmr_lane_state *) * (
size_t)wf->nprocs);
  1754     wf->master.lane_of_workers = 
kmr_malloc(qsz);
  1755     assert(wf->master.lane_of_workers != 0);
  1756     memset(wf->master.lane_of_workers, 0, qsz);
  1760     memset(lanes, 0, qsz);
  1763     for (
int r = 0; r < wf->nprocs; r++) {
  1764         assert(wf->master.lane_of_workers[r] == 0);
  1770             int nprocsinlane = laneranks[r][level].size;
  1771             int rankinlane = laneranks[r][level].rank;
  1772             assert(nprocsinlane > 0 && rankinlane != -1);
  1773             assert(rankinlane < nprocsinlane);
  1780                 for (q = 0; q < nlanes; q++) {
  1781                     assert(lanes[q] != 0);
  1784                         assert(l0 == level);
  1791                     assert(q == nlanes);
  1794                     lanes[nlanes] = lane;
  1800             wf->master.lane_of_workers[r] = lane;
  1801             assert(lane->workers->ranks[rankinlane] == -1);
  1802             lane->workers->ranks[rankinlane] = r;
  1803             if (rankinlane == 0) {
  1804                 assert(lane->leader_rank == -1);
  1805                 lane->leader_rank = r;
  1814     for (
int q = 0; q < v->n; q++) {
  1817         for (
int i = 0; i < workers->n; i++) {
  1818             assert(workers->ranks[i] != -1);
  1836     assert(wf->master.top_lane == 0);
  1837     wf->master.top_lane = top;
  1843         for (
int q = 0; q < nlanes; q++) {
  1845             assert(lane == 0 || lane->superlane == 0);
  1848             } 
else if (lane->level == level) {
  1855                     id.v[level] = KMR_NO_LANE;
  1861                 assert((sup != top) == (leader != -1));
  1862                 assert(sup->leader_rank == -1);
  1863                 sup->leader_rank = leader;
  1867                 assert(lanes[q] == 0);
  1875     top->link = wf->master.list_of_all_lanes;
  1876     wf->master.list_of_all_lanes = top;
  1887     assert(sup->sublanes == 0);
  1895     for (
int q = 0; q < nlanes; q++) {
  1898             && (lane->level - 1) == sup->level
  1899             && 
kmr_lane_eq(lane->lane_id, sup->lane_id, sup->level)) {
  1901             lane->superlane = sup;
  1902             assert(lane->link == 0);
  1903             suptail->link = lane;
  1905             sup->total_sublanes += lane->total_sublanes;
  1906             sup->total_ranks += lane->total_ranks;
  1918     while (suptail->link != 0) {
  1919         v->lanes[i] = suptail->link;
  1921         suptail = suptail->link;
  1925     assert(sup->running_sublanes == 0);
  1926     sup->running_sublanes = 
kmr_malloc(
sizeof(_Bool) * (
size_t)count);
  1927     assert(sup->running_sublanes != 0);
  1931     suptail->link = wf->master.list_of_all_lanes;
  1932     wf->master.list_of_all_lanes = sup->link;
  1947     if (lane->level == -1) {
  1949     } 
if (lane->sublanes == 0) {
  1950         for (
int i = 0; i < lane->workers->n; i++) {
  1951             int r = lane->workers->ranks[i];
  1953             if (laneranks[r][level].rank == -1) {
  1954                 printf(
"AHO lane=%s i=%d r=%d level=%d w=%d\n",
  1957                 for (
int j = 0; j < lane->workers->n; j++) {
  1958                     printf(
"AHO ranks=%d\n", lane->workers->ranks[j]);
  1964             assert(laneranks[r][level].rank != -1);
  1965             if (laneranks[r][level].rank == 0) {
  1972         for (
int i = 0; i < v->n; i++) {
  1987     if (lane->sublanes == 0) {
  1992         for (
int i = 0; i < lane->sublanes->n; i++) {
  2011     assert(lane->icomm == MPI_COMM_NULL);
  2012     if (lane->sublanes != 0) {
  2014         for (
int i = 0; i < v->n; i++) {
  2015             assert(v->lanes[i]->queue_head.next == 0);
  2019     assert(lane->current_work == 0);
  2020     assert(lane->yielding_to_superlane == 0);
  2021     assert(lane->queue_insertion_tail == 0);
  2023     int nsubs = ((lane->sublanes != 0) ? lane->sublanes->n : lane->workers->n);
  2025     assert(lane->running_sublanes != 0);
  2026     kmr_free(lane->running_sublanes, (
sizeof(_Bool) * (
size_t)nsubs));
  2027     lane->running_sublanes = 0;
  2029     if (lane->sublanes != 0) {
  2031         for (
int i = 0; i < v->n; i++) {
  2039     if (lane->workers != 0) {
  2042                      + (
sizeof(
int) * (
size_t)u->n)));
  2056     kmr_err_when_swf_is_not_initialized(mr);
  2057     struct kmr_swf *wf = mr->simple_workflow;
  2058     if (wf->rank == wf->master_rank) {
  2059         kmr_dump_sublanes(wf, wf->master.top_lane);
  2066     const int master = wf->master_rank;
  2067     if (wf->rank == master) {
  2068         if (lane->workers != 0) {
  2070             printf(
"lane %s : ranks[%d]=(",
  2072             for (
int i = 0; i < u->n; i++) {
  2073                 char *separator = (i == 0 ? 
"" : 
",");
  2074                 printf(
"%s%d", separator, u->ranks[i]);
  2078         if (lane->sublanes != 0) {
  2080             for (
int i = 0; i < v->n; i++) {
  2081                 kmr_dump_sublanes(wf, v->lanes[i]);
  2093                    const char *args, 
size_t argssize,
  2094                    int seq, _Bool separatorspace);
  /* Forward declarations; neither definition is in this part of the
     file.  kmr_preset_lane_state() is called with QUEUING=1 before the
     work-items are queued and with QUEUING=0 after queuing, just
     before the workers are activated.  No caller of
     kmr_check_work_queues_empty() is visible here. */
  2099 static void kmr_preset_lane_state(
struct kmr_swf *wf, _Bool queuing);
  2100 static void kmr_check_work_queues_empty(
struct kmr_swf *wf);
  2115     kmr_assert_kvs_ok(kvi, kvo, 1, 0);
  2116     assert(kvi->c.key_data == KMR_KV_OPAQUE
  2117            || kvi->c.key_data == KMR_KV_CSTRING);
  2118     assert(kvi->c.value_data == KMR_KV_OPAQUE
  2119            || kvi->c.value_data == KMR_KV_CSTRING);
  2120     assert(kvi->c.element_count <= INT_MAX);
  2122     KMR * 
const mr = kvi->c.mr;
  2123     kmr_err_when_swf_is_not_initialized(mr);
  2124     struct kmr_swf *wf = mr->simple_workflow;
  2125     _Bool tracing5 = (wf->mr->trace_map_spawn && (5 <= wf->mr->verbosity));
  2127     if (wf->rank != wf->master_rank) {
  2128         kmr_error(mr, 
"Non-master rank calls kmr_map_swf()");
  2136     if (wf->master.history_head.next != 0) {
  2137         assert(wf->master.record_history);
  2138         assert(wf->master.history_insertion_tail != 0);
  2139         kmr_free_work_list(wf, wf->master.history_head.next, 0, 0);
  2140         wf->master.history_head.next = 0;
  2141         wf->master.history_insertion_tail = 0;
  2144     wf->master.record_history = mr->swf_record_history;
  2145     if (wf->master.record_history) {
  2146         wf->master.history_insertion_tail = &wf->master.history_head;
  2152         fprintf(stderr, 
";;KMR [%05d] kmr_map_swf:"  2153                 " Queue work-items.\n", wf->rank);
  2157     kmr_preset_lane_state(wf, 1);
  2161         int count = (int)kvi->c.element_count;
  2165         kvi->c.current_block = kvi->c.first_block;
  2167         e = kmr_kvs_first_entry(kvi, kvi->c.first_block);
  2168         for (
int i = 0; i < count; i++) {
  2171             if (kv.vlen >= (
int)wf->args_size) {
  2173                 snprintf(ee, 
sizeof(ee),
  2174                          "Command string (length=%d) too long (%s)",
  2178                 return MPI_ERR_SPAWN;
  2181             _Bool separatorspace = opt.separator_space;
  2184                 x = kmr_make_work_item(wf, 
id, kv.v.p, (
size_t)kv.vlen,
  2189             assert(cc == MPI_SUCCESS);
  2191             e = kmr_kvs_next(kvi, e, 0);
  2197     kmr_preset_lane_state(wf, 0);
  2202         fprintf(stderr, 
";;KMR [%05d] kmr_map_swf:"  2203                 " Request workers to start.\n", wf->rank);
  2207     cc = kmr_activate_workers(wf, 0);
  2208     assert(cc == MPI_SUCCESS);
  2210     assert(wf->rank == wf->master_rank);
  2213         if (cc == MPI_SUCCESS) {
  2218     kmr_check_work_queues_empty(wf);
  2222         fprintf(stderr, 
";;KMR [%05d] Master finished"  2223                 " (Workers will be in undefined state).\n", wf->rank);
  2236                    const char *args, 
size_t argssize,
  2237                    int seq, _Bool separatorspace)
  2239     char *name = 
"kmr_map_swf";
  2246     memcpy(argsbuf, args, argssize);
  2249     cc = kmr_scan_argv_strings(wf->mr, argsbuf, argssize, 0,
  2250                                &maxargc, 0, separatorspace, name);
  2251     assert(cc == MPI_SUCCESS);
  2253     char **argv0 = 
kmr_malloc(
sizeof(
char *) * (
size_t)(maxargc + 1));
  2254     memset(argv0, 0, (
sizeof(
char *) * (
size_t)(maxargc + 1)));
  2257     cc = kmr_scan_argv_strings(wf->mr, argsbuf, argssize, (maxargc + 1),
  2258                                &argc, argv0, separatorspace, name);
  2259     assert(cc == MPI_SUCCESS);
  2260     assert(maxargc == argc);
  2268     if (argc > 0 && strncmp(
"maxprocs=", argv[0], 9) == 0) {
  2270         cc = kmr_parse_int(&argv[0][9], &v);
  2271         if (cc == 1 || v >= 0) {
  2277             snprintf(ee, 
sizeof(ee), 
"Bad maxprocs string (%s)", argv[0]);
  2278             kmr_error(wf->mr, ee);
  2288         for (
int i = 0; i < argc; i++) {
  2289             size_t sz = (strlen(argv[i]) + 1);
  2294         assert(asz <= (argssize + 1));
  2304     x->req = KMR_SPAWN_WORK;
  2305     x->requesting_lane = id;
  2306     x->assigned_lane = nulllane;
  2308     x->sequence_no = seq;
  2310     x->work.req = KMR_SPAWN_WORK;
  2311     x->work.protocol_version = KMR_SPAWN_MAGIC;
  2312     x->work.message_size = (int)msz;
  2313     x->work.subworld = x->level;
  2315     x->work.nprocs = nprocs;
  2316     x->work.print_trace = 0;
  2321         for (
int i = 0; i < argc; i++) {
  2322             size_t sz = (strlen(argv[i]) + 1);
  2323             memcpy(p, argv[i], sz);
  2328         assert((
size_t)(p - x->work.args) == asz);
  2333                 (
";;KMR [%05d] kmr_map_swf:"  2334                  " argc=%d siz=%d\n"), wf->rank, argc, (
int)asz);
  2335         for (
int i = 0; i < argc; i++) {
  2336             fprintf(stderr, 
"%s\n", argv[i]);
  2341     kmr_free(argv0, (
sizeof(
char *) * (
size_t)(maxargc + 1)));
  2342     kmr_free(argsbuf, argssize);
  2357     int queuing = (lane->level + 1);
  2358     if (lane->sublanes == 0) {
  2360         snprintf(ee, 
sizeof(ee),
  2361                  (
"Bad lane specified; nonexisting level"  2362                   " (lane=%s at level=%d)"),
  2364         kmr_error(wf->mr, ee);
  2366         return MPI_ERR_SPAWN;
  2368         assert(x->level >= queuing);
  2370         int q = x->requesting_lane.v[queuing];
  2371         assert(q != KMR_NO_LANE);
  2372         if (0 <= q && q < v->n && (x->level > queuing)) {
  2375         } 
else if (0 <= q && q < v->n) {
  2376             assert(x->level == queuing);
  2377             cc = kmr_link_work(wf, v->lanes[q], x);
  2379         } 
else if (q == KMR_ANY_LANE && (x->level == queuing || multipleany)) {
  2380             for (
int i = 0; i < v->n; i++) {
  2381                 cc = kmr_link_work(wf, v->lanes[i], x);
  2382                 if (cc != MPI_SUCCESS) {
  2387         } 
else if (q == KMR_ANY_LANE) {
  2388             assert(x->level > queuing && !multipleany);
  2390             snprintf(ee, 
sizeof(ee),
  2391                      (
"Bad lane specified; multiple/non-tail any-lane"  2392                       " (lane=%s at level=%d)"),
  2394             kmr_error(wf->mr, ee);
  2396             return MPI_ERR_SPAWN;
  2399             snprintf(ee, 
sizeof(ee),
  2400                      (
"Bad lane specified; index exceeds size"  2401                       " (lane=%s at level=%d)"),
  2403             kmr_error(wf->mr, ee);
  2405             return MPI_ERR_SPAWN;
  2419     assert(lane->queue_insertion_tail != 0);
  2420     assert(lane->queue_insertion_tail->next == 0);
  2421     lane->queue_insertion_tail->next = p;
  2422     lane->queue_insertion_tail = p;
  2426         for (
int i = 0; i < v->n; i++) {
  2427             cc = kmr_link_work(wf, v->lanes[i], x);
  2428             assert(cc == MPI_SUCCESS);
  2438 kmr_preset_lane_state(
struct kmr_swf *wf, _Bool queuing)
  2441     for (
struct kmr_lane_state *lane = h; lane != 0; lane = lane->link) {
  2442         assert((lane->sublanes != 0) || (lane->workers != 0));
  2446             assert(lane->queue_insertion_tail == 0);
  2447             lane->queue_insertion_tail = &lane->queue_head;
  2450             lane->queue_insertion_tail = 0;
  2454             assert(lane->running_sublanes != 0);
  2455             int nsubs = ((lane->sublanes != 0)
  2456                          ? lane->sublanes->n : lane->workers->n);
  2457             lane->n_running_sublanes = nsubs;
  2458             for (
int i = 0; i < nsubs; i++) {
  2459                 lane->running_sublanes[i] = 1;
  2462             if (lane->workers != 0) {
  2472                 x->req = KMR_SPAWN_NONE;
  2473                 x->requesting_lane = id;
  2474                 x->assigned_lane = id;
  2476                 x->work.req = KMR_SPAWN_NONE;
  2477                 x->work.protocol_version = KMR_SPAWN_MAGIC;
  2478                 x->work.message_size = (int)msz;
  2479                 x->work.subworld = x->level;
  2481                 x->work.nprocs = lane->workers->n;
  2482                 x->work.print_trace = 0;
  2484                 assert(lane->current_work == 0);
  2485                 lane->current_work = x;
  2494 static int kmr_finish_current_work(
struct kmr_swf *wf,
  2497 static int kmr_ckeck_sublanes_empty(
struct kmr_swf *wf,
  2522     assert(lane->n_running_sublanes > 0);
  2523     assert(lane->sublanes == 0 || (sublaneindex < lane->sublanes->n));
  2524     assert(lane->workers == 0 || (sublaneindex < lane->workers->n));
  2525     assert(lane->running_sublanes[sublaneindex] != 0);
  2526     lane->running_sublanes[sublaneindex] = 0;
  2528     lane->n_running_sublanes--;
  2529     if (lane->n_running_sublanes != 0) {
  2530         return MPI_ERR_PENDING;
  2532         cc = kmr_finish_current_work(wf, lane);
  2533         assert(cc == MPI_SUCCESS);
  2534         if (lane->superlane == 0) {
  2539             if (cc == MPI_ERR_PENDING) {
  2540                 return MPI_ERR_PENDING;
  2543                 assert(cc == MPI_SUCCESS);
  2559     _Bool tracing5 = (wf->mr->trace_map_spawn && (5 <= wf->mr->verbosity));
  2563     assert(lane->superlane != 0);
  2564     assert(lane->n_running_sublanes == 0);
  2565     assert(lane->current_work == 0);
  2566     assert(lane->yielding_to_superlane == 0);
  2570         assert(x->level <= lane->level);
  2576             for (
int i = 0; i < v->n; i++) {
  2577                 assert(x == v->lanes[i]->yielding_to_superlane);
  2578                 v->lanes[i]->yielding_to_superlane = 0;
  2582         if (x->level < lane->level) {
  2584             assert(lane->superlane != 0);
  2585             assert(lane->yielding_to_superlane == 0);
  2586             lane->yielding_to_superlane = x;
  2593                 fprintf(stderr, 
";;KMR [%05d] Start a work (lane=%s).\n",
  2597             assert(x->level == lane->level);
  2598             x->assigned_lane = lane->lane_id;
  2600             x->work.print_trace = (tracing5 != 0);
  2602             assert(cc == MPI_SUCCESS);
  2604             assert(cc == MPI_SUCCESS);
  2605             kmr_record_in_history(wf, x);
  2606             return MPI_ERR_PENDING;
  2609         cc = kmr_ckeck_sublanes_empty(wf, lane);
  2610         assert(cc == MPI_SUCCESS);
  2616             for (
int i = 0; i < v->n; i++) {
  2618                 if (cc == MPI_ERR_PENDING) {
  2619                     lane->n_running_sublanes++;
  2623         if (lane->n_running_sublanes != 0) {
  2624             return MPI_ERR_PENDING;
  2630                     ";;KMR [%05d] workflow lane done (lane=%s).\n",
  2634         assert(lane->superlane != 0);
  2646         return MPI_ERR_PENDING;
  2647     } 
else if (lane->sublanes != 0) {
  2649         for (
int i = 0; i < v->n; i++) {
  2650             cc = kmr_ckeck_sublanes_empty(wf, v->lanes[i]);
  2651             if (cc == MPI_ERR_PENDING) {
  2652                 return MPI_ERR_PENDING;
  2666         lane->queue_head.next = h->next;
  2687     while (sup->level >= 0 && 
id.v[sup->level] == KMR_ANY_LANE) {
  2688         sup = lane->superlane;
  2690     cc = kmr_remove_work(wf, sup, x);
  2691     assert(cc == MPI_SUCCESS);
  2701         for (
int i = 0; i < v->n; i++) {
  2702             kmr_remove_work(wf, v->lanes[i], x);
  2706     for (
struct kmr_work_list *q = h; (q != 0 && q->next != 0); q = q->next) {
  2723     assert(x->req == KMR_SPAWN_WORK);
  2724     MPI_Comm basecomm = wf->base_comm;
  2728     assert(lane->current_work == 0);
  2729     lane->current_work = x;
  2731     if (lane->sublanes != 0) {
  2733         assert(lane->n_running_sublanes == 0);
  2734         lane->n_running_sublanes = v->n;
  2735         for (
int i = 0; i < v->n; i++) {
  2737             assert(cc == MPI_SUCCESS);
  2738             assert(lane->running_sublanes[i] == 0);
  2739             lane->running_sublanes[i] = 1;
  2743         assert(lane->n_running_sublanes == 0);
  2744         lane->n_running_sublanes = u->n;
  2745         for (
int i = 0; i < u->n; i++) {
  2746             cc = kmr_start_worker(&x->work, (
size_t)x->work.message_size,
  2747                                   u->ranks[i], basecomm);
  2748             assert(cc == MPI_SUCCESS);
  2749             assert(lane->running_sublanes[i] == 0);
  2750             lane->running_sublanes[i] = 1;
  2752         assert(cc == MPI_SUCCESS);
  2755     _Bool mainlane = (lane->level == x->level);
  2757         assert(lane->icomm == MPI_COMM_NULL);
  2758         cc = kmr_join_to_workers(wf, lane);
  2759         assert(cc == MPI_SUCCESS);
  2773     if (lane->current_work != 0) {
  2775         lane->current_work = 0;
  2777         _Bool mainlane = (lane->level == x->level);
  2779             _Bool dummy_initial_work_item = (x->req == KMR_SPAWN_NONE);
  2780             if (dummy_initial_work_item) {
  2782                               + (size_t)x->work.message_size);
  2785                 assert(lane->icomm != MPI_COMM_NULL);
  2786                 cc = MPI_Comm_free(&lane->icomm);
  2787                 assert(cc == MPI_SUCCESS);
  2792                 if (wf->master.history_insertion_tail == 0) {
  2793                     assert(!wf->master.record_history);
  2795                                   + (size_t)x->work.message_size);
  2808     assert(!warn || lane != 0);
  2819             snprintf(ee, 
sizeof(ee),
  2820                      "Some work-items remain queued (%s)",
  2822             kmr_warning(wf->mr, 1, ee);
  2829                       + (size_t)x->work.message_size);
  2836 kmr_check_work_queues_empty(
struct kmr_swf *wf)
  2841     for (
struct kmr_lane_state *lane = h; lane != 0; lane = lane->link) {
  2842         if (lane->queue_head.next != 0) {
  2844             kmr_free_work_list(wf, lane->queue_head.next, lane, 1);
  2845             lane->queue_head.next = 0;
  2849         kmr_error(wf->mr, 
"Some work-items remain queued");
  2857     if (wf->master.record_history) {
  2858         assert(wf->master.history_insertion_tail != 0);
  2863         assert(wf->master.history_insertion_tail->next == 0);
  2864         wf->master.history_insertion_tail->next = p;
  2865         wf->master.history_insertion_tail = p;
  2876     kmr_err_when_swf_is_not_initialized(mr);
  2877     struct kmr_swf *wf = mr->simple_workflow;
  2878     if (wf->master.history_head.next == 0) {
  2879         kmr_warning(wf->mr, 1, 
"Workflow history not recorded.");
  2882         p = wf->master.history_head.next;
  2888             printf(
"work=%d for lane=%s",
  2890             printf(
" run in lane=%s\n",
  2906     kmr_err_when_swf_is_not_initialized(mr);
  2907     struct kmr_swf *wf = mr->simple_workflow;
  2908     if (wf->master.history_head.next == 0) {
  2909         kmr_warning(wf->mr, 1, 
"Workflow history not recorded.");
  2911         int icount = (int)count;
  2912         for (
int i = 0; i < icount; i++) {
  2918         p = wf->master.history_head.next;
  2919         while (p != 0 && i < icount) {
  2923             history[i] = x->sequence_no;
  2935     kmr_err_when_swf_is_not_initialized(mr);
  2936     struct kmr_swf *wf = mr->simple_workflow;
  2937     if (wf->master.history_head.next == 0) {
  2938         kmr_warning(wf->mr, 1, 
"Workflow history not recorded.");
  2940         assert(wf->master.record_history);
  2941         assert(wf->master.history_insertion_tail != 0);
  2942         kmr_free_work_list(wf, wf->master.history_head.next, 0, 0);
  2943         wf->master.history_head.next = 0;
  2944         wf->master.history_insertion_tail = 0;
  2955     MPI_Comm comm = wf->base_comm;
  2956     _Bool tracing5 = (wf->mr->trace_map_spawn && (5 <= wf->mr->verbosity));
  2960     assert(wf->master.rpc_buffer != 0 && wf->master.rpc_size > 0);
  2962     int msz = (int)wf->master.rpc_size;
  2963     mbuf->req = KMR_SPAWN_NONE;
  2966     cc = MPI_Recv(mbuf, msz, MPI_BYTE, MPI_ANY_SOURCE,
  2967                   KMR_SPAWN_RPC_TAG, comm, &st);
  2968     assert(cc == MPI_SUCCESS);
  2969     cc = MPI_Get_count(&st, MPI_BYTE, &len);
  2970     assert(cc == MPI_SUCCESS);
  2971     size_t msglen = (size_t)len;
  2972     int rank = st.MPI_SOURCE;
  2973     assert(rank != wf->master_rank);
  2982     } *xs = (
struct ss *)&st;
  2983     fprintf(stderr, 
"MPI_SOURCE=%d MPI_TAG=%d MPI_ERROR=%d _count=%d _cancelled=%d\n", xs->MPI_SOURCE, xs->MPI_TAG, xs->MPI_ERROR, xs->_count, xs->_cancelled); fflush(0);
  2984     fprintf(stderr, 
"msz=%d ty=%lx\n", msz, MPI_BYTE); fflush(0);
  2987     assert(MPI_SUCCESS != -1 && MPI_ERR_SPAWN != -1);
  2989     switch (mbuf->req) {
  2990     case KMR_SPAWN_NEXT: {
  2996         assert(lane == 0 || (lane->sublanes == 0 && lane->workers != 0));
  2997         if (w->initial_message != 0) {
  3000                 snprintf(ee, 
sizeof(ee),
  3001                          (
"Unexpectedly receive a worker joining message"  3002                           " (from rank=%d)"), rank);
  3003                 kmr_error(wf->mr, ee);
  3007                 lane->n_joined_ranks++;
  3008                 top->n_joined_ranks++;
  3009                 assert(lane->n_joined_ranks <= lane->workers->n);
  3010                 assert(top->n_joined_ranks <= top->total_ranks);
  3012                 wf->master.idle_ranks++;
  3017             if (w->initial_message != 0) {
  3019                         (
";;KMR [%05d] rank=%d joined"  3020                          " (workers=%d/%d; idle=%d).\n"),
  3022                         top->n_joined_ranks, top->total_ranks,
  3023                         wf->master.idle_ranks);
  3026                 fprintf(stderr, 
";;KMR [%05d] rank=%d requesting a work.\n",
  3033             if (top->n_joined_ranks < top->total_ranks) {
  3034                 return MPI_ERR_PENDING;
  3038         } 
else if (lane != 0) {
  3042             if (cc == MPI_SUCCESS) {
  3044                     fprintf(stderr, 
";;KMR [%05d] Workflow finished.\n",
  3051             return MPI_ERR_PENDING;
  3059         snprintf(ee, 
sizeof(ee),
  3060                  "Bad RPC message request=0x%x length=%zd from rank=%d",
  3061                  mbuf->req, msglen, rank);
  3062         kmr_error(wf->mr, ee);
  3074 kmr_activate_workers(
struct kmr_swf *wf, _Bool shutdown)
  3076     MPI_Comm comm = wf->base_comm;
  3084         memset(mbuf, 0, msz);
  3085         mbuf->req = KMR_SPAWN_WORK;
  3086         mbuf->protocol_version = KMR_SPAWN_MAGIC;
  3087         mbuf->message_size = (int)msz;
  3091         for (
int i = 0; i < wf->nprocs; i++) {
  3092             if (i != wf->master_rank) {
  3093                 cc = MPI_Send(mbuf, (
int)msz, MPI_BYTE, i,
  3094                               KMR_SPAWN_RPC_TAG, comm);
  3095                 assert(cc == MPI_SUCCESS);
  3102         memset(mbuf, 0, msz);
  3103         mbuf->req = KMR_SPAWN_NONE;
  3104         mbuf->protocol_version = KMR_SPAWN_MAGIC;
  3105         for (
int i = 0; i < wf->nprocs; i++) {
  3106             if (i != wf->master_rank) {
  3107                 cc = MPI_Send(mbuf, (
int)msz, MPI_BYTE, i,
  3108                               KMR_SPAWN_RPC_TAG, comm);
  3109                 assert(cc == MPI_SUCCESS);
  3122                  int rank, MPI_Comm basecomm)
  3126     int len = (int)msglen;
  3127     cc = MPI_Send(w, len, MPI_BYTE, rank, KMR_SPAWN_RPC_TAG, basecomm);
  3128     assert(cc == MPI_SUCCESS);
  3138     MPI_Comm basecomm = wf->base_comm;
  3139     _Bool tracing5 = (wf->mr->trace_map_spawn && (5 <= wf->mr->verbosity));
  3144         fprintf(stderr, 
";;KMR [%05d] Connect to workers (lane=%s).\n",
  3149     assert(lane->icomm == MPI_COMM_NULL);
  3150     int leader = lane->leader_rank;
  3151     int nranks = lane->total_ranks;
  3152     cc = MPI_Intercomm_create(MPI_COMM_SELF, 0, basecomm, leader,
  3153                               KMR_SPAWN_ICOMM_TAG, &lane->icomm);
  3154     assert(cc == MPI_SUCCESS);
  3157     cc = MPI_Comm_remote_size(lane->icomm, &nprocs);
  3158     assert(cc == MPI_SUCCESS);
  3159     if (nranks != nprocs) {
  3161         snprintf(ee, 
sizeof(ee),
  3162                  "Bad inter-communicator size (%d!=%d)", nprocs, nranks);
  3163         kmr_error(wf->mr, ee);
  3176     if (kmr_fake_spawn_hooks != 0 && kmr_fake_spawn_hooks != hooks) {
  3179     kmr_fake_spawn_hooks = hooks;
  3197         char *e = &(w->args[asz]);
  3200         while (p[0] != 0 && p < (e - 1)) {
  3202             while (p[0] != 0 && p < (e - 1)) {
  3210         assert(p == (e - 1) || p[0] == 0);
  3213     char *argv[argc + 1];
  3218         char *e = &(w->args[asz]);
  3221         while (p[0] != 0 && p < (e - 1)) {
  3225             while (p[0] != 0 && p < (e - 1)) {
  3233         assert(p == (e - 1) || p[0] == 0);
  3239     hooks->s.running_work = w;
  3240     hooks->s.mpi_initialized = 1;
  3242     if (hooks->s.print_trace) {
  3245         printf(
";;KMR [%05d] EXEC: %s\n",
  3246                hooks->s.base_rank, aa);
  3250     hooks->s.running_work = 0;
  3251     hooks->s.mpi_initialized = 0;
 Lane Number (at-all-ranks). 
static struct kmr_lane_vector * kmr_make_bottom_lanes(struct kmr_swf *wf, struct kmr_lane_no *laneids, struct kmr_pair laneranks[][KMR_LANE_LEVELS])
Makes lanes at the bottom levels (at-the-master). 
Key-Value Stream (abstract). 
static int kmr_yield_for_lane(struct kmr_swf *wf, struct kmr_lane_state *lane, int sublaneindex)
Schedules a next work-item when a worker or a sublane finishes (at-the-master). 
Utilities Private Part (do not include from applications). 
static void kmr_free_lanes(struct kmr_swf *wf, struct kmr_lane_state *lane)
Frees a lane and its sublanes, recursively (at-the-master). 
static int kmr_find_leader(struct kmr_swf *wf, struct kmr_lane_state *lane, int level, struct kmr_pair laneranks[][KMR_LANE_LEVELS])
Searches a leader (rank=0) in a LANE at a LEVEL (at-the-master). 
static int kmr_enqueue_work(struct kmr_swf *wf, struct kmr_lane_state *lane, struct kmr_work_item *x, _Bool multiple_any)
Enqueues a work-item in some sublane of a LANE (at-the-master). 
Work-Item Queue Entry (at-the-master). 
static int kmr_start_lanes(struct kmr_swf *wf, struct kmr_lane_state *lane, struct kmr_work_item *x)
Requests workers to start a work for a lane and its sublanes, and then connects to workers (at-the-ma...
void kmr_free_swf_history(KMR *mr)
Clears the history recorded in kmr_map_swf(). 
static int kmr_check_partitioning(struct kmr_swf *wf, int supercolor, MPI_Comm subcomm)
Checks if a sub-communicator is a partitioning of a super-communicator (at-all-ranks). 
int kmr_init_swf(KMR *mr, MPI_Comm lanecomms[KMR_LANE_LEVELS], int master)
Initializes the lanes of simple workflow. 
Vector of Ranks (at-the-master). 
static void kmr_resolve_lanes(struct kmr_swf *wf)
Assigns a lane-number to a rank (wf->lane_id_on_proc) (at-all-ranks). 
#define kmr_malloc(Z)
Allocates memory, or aborts when failed. 
Work Description (at-the-master). 
static int kmr_load_spawn_library(struct kmr_swf *wf, _Bool test_with_fake_spawn)
Loads the spawn-library "libkmrspawn.so". 
static struct kmr_lane_no kmr_name_lane(KMR *mr, const char *s)
Parses a string as a lane-number. 
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS). 
static int kmr_find_sublane_index(struct kmr_swf *wf, struct kmr_lane_state *lane)
Finds a sublane index of a superlane. 
static int kmr_schedule_lanes(struct kmr_swf *wf, struct kmr_lane_state *lane)
Schedules a lane for a next work-item (at-the-master). 
int kmr_split_swf_lanes_a(KMR *mr, MPI_Comm splitcomms[KMR_LANE_LEVELS], int root, int *description[], _Bool dump)
Splits a communicator in a KMR context to ones to be used for kmr_init_swf(). 
static char * kmr_lane_string(struct kmr_lane_no n, _Bool print_all_levels)
(NOT-THREAD-SAFE) Returns a string representation of a lane-number. 
void kmr_set_swf_verbosity(KMR *mr, int level)
Sets the verbosity of the spawn-library. 
static int kmr_check_lane_id(struct kmr_swf *wf, struct kmr_lane_no id, _Bool admit_any)
Checks well-formedness of a lane-number. 
Work-Item Queue of a Lane (at-the-master). 
static struct kmr_rank_vector * kmr_make_rank_vector(int n)
Allocates a rank vector, filling all entries with -1. 
Handy Copy of a Key-Value Field. 
Workflow State (at-all-ranks). 
Options to Mapping by Spawns. 
int kmr_fin(void)
Clears the environment. 
static struct kmr_lane_state * kmr_allocate_lane(int level, struct kmr_lane_no id, int nprocs)
Makes a lane structure (at-the-master). 
int kmr_split_swf_lanes(KMR *mr, MPI_Comm splitcomms[KMR_LANE_LEVELS], int root, char *description[], _Bool dump)
Splits a communicator in a KMR context to ones to be used for kmr_init_swf(). 
Vector of Lanes (at-the-master). 
static struct kmr_lane_vector * kmr_make_lane_vector(int n, struct kmr_lane_state *lanes[])
Packs lanes in a vector. 
static int kmr_dequeue_scattered_work(struct kmr_swf *wf, struct kmr_lane_state *lane, struct kmr_work_item *x)
Removes all occurrences of a work-item (which may be scattered for an any-lane) from the all queues...
#define KMR_LANE_LEVELS
Maximum Levels of Lanes. 
void kmr_dump_swf_order_history(KMR *mr, int *history, size_t count)
Returns a list of the start ordering of the work-items. 
static int kmr_handle_worker_request(struct kmr_swf *wf, _Bool joining)
(spawn-library-protocol) Handles requests from workers. 
static int kmr_count_bottom_level_lanes(struct kmr_lane_state *lane)
Counts the number of the bottom level lanes (at-the-master). 
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context(). 
void kmr_dump_swf_history(KMR *mr)
Prints the history of kmr_map_swf(), which is the start ordering of the work-items. 
int kmr_map_swf(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps with a simple workflow. 
int kmr_finish_swf(KMR *mr)
Clears the lanes of simple workflow. 
static void kmr_make_lanes(struct kmr_swf *wf)
Initializes the lanes at the master rank (at-all-ranks). 
static void kmr_clear_lane_id(struct kmr_lane_no *id)
Clears the lane-number to a null-lane. 
static int kmr_color_subcommunicator(struct kmr_swf *wf, MPI_Comm subcomm, MPI_Comm supercomm)
Colors sub-communicators distinctly in a super-communicator, and returns the color which names ...
static struct kmr_kv_box kmr_pick_kv(struct kmr_kvs_entry *e, KMR_KVS *kvs)
Returns a handle to a key-value entry – a reverse of kmr_poke_kv(). 
int kmr_make_printable_argv_string(char *s, size_t sz, char **argv)
Fills the string buffer with the argv strings for printing. 
int kmr_detach_swf_workers(KMR *mr)
Disengages the workers from main processing and puts them in the service loop for spawning...
int kmr_stop_swf_workers(KMR *mr)
Finishes the workers of workflow. 
void kmr_dump_swf_lanes(KMR *mr)
Dumps lanes created by kmr_init_swf(). 
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type. 
static int kmr_find_worker_index(struct kmr_swf *wf, struct kmr_lane_state *lane, int rank)
Finds a worker index of a lane for a rank. 
static void kmr_bond_sublanes(struct kmr_swf *wf, struct kmr_lane_state *sup, struct kmr_lane_state *lanes[], int nlanes)
Builds a vector of sublanes for the superlane SUP (at-the-master). 
static unsigned long kmr_color_of_lane(struct kmr_lane_no id)
Returns a lane-number as a single integer color. 
static void kmr_bond_all_lanes(struct kmr_swf *wf, struct kmr_lane_vector *v, struct kmr_pair laneranks[][KMR_LANE_LEVELS])
Collects lanes to make a superlane, which build up to a single top-lane (at-the-master). 
static _Bool kmr_lane_eq(struct kmr_lane_no n0, struct kmr_lane_no n1, int level)
Compares lane-numbers up to the LEVEL. 
static int kmr_level_of_lane(struct kmr_lane_no n, _Bool admit_any)
Returns the maximum level of a given lane-number (zero to KMR_LANE_LEVELS-1), or returns -1 for a nul...