58 #define MAX(a,b) (((a)>(b))?(a):(b)) 62 #define KMR_LANE_LEVELS (4) 64 static short const KMR_NO_LANE = -1;
65 static short const KMR_ANY_LANE = -2;
83 enum kmr_spawn_req req;
130 int n_running_sublanes;
131 _Bool *running_sublanes;
186 _Bool record_history;
195 MPI_Comm basecomm,
int masterrank,
199 MPI_Comm subworlds[],
unsigned long colors[],
202 void (*kmr_spawn_set_verbosity)(
struct kmr_spawn_hooks *hooks,
int level);
223 for (
int i = 0; i <= level; i++) {
224 if (n0.v[i] != n1.v[i]) {
237 id->v[i] = KMR_NO_LANE;
252 n.v[i] = KMR_NO_LANE;
254 const char *e = (s + strlen(s));
261 n.v[i] = KMR_ANY_LANE;
264 }
else if (*(p + 1) ==
'.' && (p + 2) == e) {
266 snprintf(ee,
sizeof(ee),
267 (
"Bad lane string; dot at the tail (%s)"), s);
271 }
else if (*(p + 1) ==
'.') {
273 n.v[i] = KMR_ANY_LANE;
277 snprintf(ee,
sizeof(ee),
278 (
"Bad lane string; * followed by something (%s)"), s);
286 int cc = sscanf(p,
"%d%c", &d, dot);
287 if (cc == 1 || (cc == 2 && dot[0] ==
'.')) {
288 while (p < e && *p !=
'.') {
289 assert(
'0' <= *p && *p <=
'9');
292 if (p < e && *p ==
'.') {
295 _Bool sawany = (i > 0 && n.v[i - 1] == KMR_ANY_LANE);
298 snprintf(ee,
sizeof(ee),
299 (
"Bad lane string; * at non-tail (%s)"), s);
303 }
else if (cc == 2 && p == e) {
305 snprintf(ee,
sizeof(ee),
306 (
"Bad lane string; dot at the tail (%s)"), s);
313 }
else if (cc == 0) {
315 snprintf(ee,
sizeof(ee),
316 (
"Bad lane string; non-digit appears (%s)"), s);
322 snprintf(ee,
sizeof(ee),
323 (
"Bad lane string; non-digit or bad dot (%s)"), s);
332 snprintf(ee,
sizeof(ee),
333 (
"Bad lane string; garbage at the tail (%s)"), s);
347 snprintf(buf,
sizeof(buf),
"-");
348 char *e = (buf +
sizeof(buf));
353 if (q == KMR_NO_LANE) {
354 if (print_all_levels) {
355 char *dot = (i == 0 ?
"" :
".");
356 int cc = snprintf(p, (
size_t)(e - p),
"%s-", dot);
359 if (!print_all_levels) {
362 }
else if (q == KMR_ANY_LANE) {
363 char *dot = (i == 0 ?
"" :
".");
364 int cc = snprintf(p, (
size_t)(e - p),
"%s*", dot);
366 if (!print_all_levels) {
370 char *dot = (i == 0 ?
"" :
".");
371 int cc = snprintf(p, (
size_t)(e - p),
"%s%d", dot, q);
375 buf[
sizeof(buf) - 1] = 0;
388 assert(admit_any || n.v[i] != KMR_ANY_LANE);
390 || n.v[i + 1] == KMR_NO_LANE);
392 || n.v[i + 1] == KMR_NO_LANE || n.v[i + 1] == KMR_ANY_LANE);
405 static inline unsigned long 408 union {
struct kmr_lane_no id;
unsigned long color;} u = {.id =
id};
418 assert(lane->workers != 0);
422 for (
int i = 0; i < u->n; i++) {
423 if (u->ranks[i] == rank) {
437 assert(lane->superlane != 0);
439 assert(sup->sublanes != 0);
443 for (
int i = 0; i < v->n; i++) {
444 if (v->lanes[i] == lane) {
466 for (
int q = 0; q < n; q++) {
467 v->lanes[q] = lanes[q];
479 +
sizeof(int) * (
size_t)n);
483 for (
int i = 0; i < n; i++) {
490 kmr_err_when_swf_is_not_initialized(
KMR *mr)
492 struct kmr_swf *wf = mr->simple_workflow;
494 kmr_error(mr,
"Workflow-mapper is not initialized");
507 kmr_err_when_swf_is_not_initialized(mr);
508 struct kmr_swf *wf = mr->simple_workflow;
509 if (wf->kmr_spawn_set_verbosity != 0) {
510 (*wf->kmr_spawn_set_verbosity)(wf->hooks, level);
515 _Bool test_with_fake_spawn);
530 _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
531 _Bool test_with_fake_spawn = mr->swf_debug_master;
539 assert(
sizeof(
struct kmr_lane_no) <=
sizeof(
unsigned long));
541 if (mr->simple_workflow != 0) {
542 kmr_error(mr,
"Workflow-mapper is already initialized");
544 return MPI_ERR_SPAWN;
547 if (!(master < mr->nprocs)) {
549 snprintf(ee,
sizeof(ee),
550 "Bad master rank specified (rank=%d)", master);
553 return MPI_ERR_SPAWN;
558 memset(wf, 0,
sizeof(
struct kmr_swf));
559 mr->simple_workflow = wf;
561 wf->base_comm = mr->comm;
562 wf->nprocs = mr->nprocs;
565 wf->master_rank = master;
567 wf->lane_comms[i] = lanecomms[i];
570 wf->args_size = ((mr->swf_args_size != 0)
571 ? mr->swf_args_size : KMR_SPAWN_ARGS_SIZE);
573 wf->master.idle_ranks = 0;
574 wf->master.top_lane = 0;
575 wf->master.lane_of_workers = 0;
576 wf->master.list_of_all_lanes = 0;
577 wf->master.rpc_buffer = 0;
578 wf->master.rpc_size = 0;
579 wf->master.history_head.next = 0;
580 wf->master.history_head.item = 0;
581 wf->master.history_insertion_tail = 0;
582 wf->master.record_history = 0;
591 if (wf->hooks == 0) {
595 hooks->s.mr = wf->mr;
596 hooks->s.print_trace = tracing5;
601 assert(wf->kmr_spawn_hookup != 0 && wf->hooks != 0);
602 cc = (*wf->kmr_spawn_hookup)(wf->hooks);
603 assert(cc == MPI_SUCCESS);
616 id = wf->lane_id_on_proc;
618 id.v[i] = KMR_NO_LANE;
625 assert(wf->rank != master || level == -1);
626 if (wf->rank != master) {
627 assert(wf->kmr_spawn_setup != 0);
628 cc = (*wf->kmr_spawn_setup)(hooks, wf->base_comm,
631 colors, wf->args_size);
632 assert(cc == MPI_SUCCESS);
636 if (wf->rank == wf->master_rank) {
637 size_t msz = (offsetof(
struct kmr_spawn_work, args) + wf->args_size);
639 assert(wf->master.rpc_buffer != 0);
640 wf->master.rpc_size = msz;
647 static int kmr_activate_workers(
struct kmr_swf *wf, _Bool shutdown);
661 kmr_err_when_swf_is_not_initialized(mr);
662 struct kmr_swf *wf = mr->simple_workflow;
663 _Bool tracing5 = (wf->mr->trace_map_spawn && (5 <= wf->mr->verbosity));
667 if (tracing5 && (wf->rank == wf->master_rank)) {
668 fprintf(stderr, (
";;KMR [%05d] Detach worker ranks" 669 " and wait for join messages.\n"), wf->rank);
673 mr->comm = MPI_COMM_NULL;
674 cc = MPI_Comm_dup(MPI_COMM_SELF, &mr->comm);
675 assert(cc == MPI_SUCCESS);
678 if (wf->rank == wf->master_rank) {
681 if (cc == MPI_SUCCESS) {
691 fprintf(stderr, (
";;KMR [%05d]" 692 " Rank %d is not involved in workflow.\n"),
700 void *spawnerso = wf->mr->swf_spawner_so;
702 void (*service)(
struct kmr_spawn_hooks *, int) = wf->kmr_spawn_service;
703 wf->mr->swf_spawner_so = 0;
707 assert(cc == MPI_SUCCESS);
709 assert(cc == MPI_SUCCESS);
711 hooks->s.service_count = 0;
713 if (spawnerso != 0) {
715 assert(service != 0 && hooks != 0);
716 (*service)(hooks, 0);
718 assert(service != 0 && hooks != 0);
719 (*service)(hooks, 0);
733 kmr_err_when_swf_is_not_initialized(mr);
734 struct kmr_swf *wf = mr->simple_workflow;
738 cc = kmr_activate_workers(wf, 1);
739 assert(cc == MPI_SUCCESS);
752 kmr_err_when_swf_is_not_initialized(mr);
753 struct kmr_swf *wf = mr->simple_workflow;
757 if (wf->rank == wf->master_rank) {
758 if (wf->master.history_head.next != 0) {
759 assert(wf->master.record_history);
760 assert(wf->master.history_insertion_tail != 0);
761 kmr_free_work_list(wf, wf->master.history_head.next, 0, 0);
762 wf->master.history_head.next = 0;
763 wf->master.history_insertion_tail = 0;
766 assert(wf->master.top_lane != 0);
768 wf->master.top_lane = 0;
770 assert(wf->master.lane_of_workers != 0);
771 size_t qsz = (
sizeof(
struct kmr_lane_state *) * (
size_t)wf->nprocs);
772 kmr_free(wf->master.lane_of_workers, qsz);
773 wf->master.lane_of_workers = 0;
775 assert(wf->master.rpc_buffer != 0);
776 kmr_free(wf->master.rpc_buffer, wf->master.rpc_size);
777 wf->master.rpc_buffer = 0;
778 wf->master.rpc_size = 0;
783 if (wf->hooks != 0) {
788 if (wf->mr->swf_spawner_so != 0) {
789 char *soname = (wf->mr->swf_spawner_library != 0
790 ? wf->mr->swf_spawner_library :
"libkmrspawn.so");
792 cc = dlclose(wf->mr->swf_spawner_so);
795 snprintf(ee,
sizeof(ee),
"dlclose(%s): %s\n",
797 kmr_warning(wf->mr, 5, ee);
799 wf->mr->swf_spawner_so = 0;
801 wf->kmr_spawn_hookup = 0;
802 wf->kmr_spawn_setup = 0;
803 wf->kmr_spawn_service = 0;
804 wf->kmr_spawn_set_verbosity = 0;
807 kmr_free(wf,
sizeof(
struct kmr_swf));
808 mr->simple_workflow = 0;
813 static int kmr_start_worker(
struct kmr_spawn_work *w,
size_t msglen,
814 int rank, MPI_Comm basecomm);
816 static int kmr_join_to_workers(
struct kmr_swf *wf,
829 MPI_Comm basecomm = wf->base_comm;
830 int nprocs = wf->nprocs;
835 char *soname = (wf->mr->swf_spawner_library != 0
836 ? wf->mr->swf_spawner_library :
"libkmrspawn.so");
838 void *m = dlopen(soname, (RTLD_NOW|RTLD_GLOBAL));
842 int ng = ((m != 0) ? nprocs : rank);
844 cc = MPI_Allreduce(&ng, &ngmin, 1, MPI_INT, MPI_MIN, basecomm);
845 assert(cc == MPI_SUCCESS);
846 _Bool allok = (ngmin == nprocs);
849 if (!test_with_fake_spawn) {
852 snprintf(ee,
sizeof(ee),
"dlopen(%s) failed: %s",
854 kmr_error(wf->mr, ee);
/* Report that dlopen() did not succeed on every rank. */
857 kmr_error(wf->mr,
"Some ranks failed to dlopen()");
864 snprintf(ee,
sizeof(ee),
"%s", dlerror());
865 kmr_warning(wf->mr, 1,
866 (
"WORKFLOW-MAPPER IS UNUSABLE;" 867 " spawn-library unavailable"));
868 kmr_warning(wf->mr, 1, ee);
875 snprintf(ee,
sizeof(ee),
"%s", dlerror());
876 kmr_warning(wf->mr, 5, ee);
880 wf->kmr_spawn_hookup = kmr_spawn_hookup_standin;
881 wf->kmr_spawn_setup = kmr_spawn_setup_standin;
882 wf->kmr_spawn_service = kmr_spawn_service_standin;
883 wf->kmr_spawn_set_verbosity = kmr_spawn_set_verbosity_standin;
885 assert(wf->mr->swf_spawner_so == 0);
886 return MPI_ERR_SPAWN;
888 char *fn[10] = {
"kmr_spawn_hookup",
891 "kmr_spawn_set_verbosity", 0};
894 wf->mr->swf_spawner_so = m;
896 for (
int i = 0; (i < 10 && fn[i] != 0); i++) {
897 fp[i] = (intptr_t)dlsym(m, fn[i]);
900 snprintf(ee,
sizeof(ee),
"dlsym(%s): %s\n", fn[i], dlerror());
901 kmr_warning(wf->mr, 5, ee);
904 wf->kmr_spawn_hookup = (int (*)())fp[0];
905 wf->kmr_spawn_setup = (int (*)())fp[1];
906 wf->kmr_spawn_service = (void (*)())fp[2];
907 wf->kmr_spawn_set_verbosity = (void (*)())fp[3];
931 int root,
int *description[], _Bool dump)
933 const MPI_Comm basecomm = mr->comm;
934 const int nprocs = mr->nprocs;
935 const int rank = mr->rank;
944 splitcomms[d] = MPI_COMM_NULL;
954 if (mr->rank == root) {
957 if (description[i] == 0) {
964 snprintf(ee,
sizeof(ee),
965 (
"Bad lane description," 966 " no terminating null"));
971 cc = MPI_Bcast(&depth, 1, MPI_INT, root, basecomm);
972 assert(cc == MPI_SUCCESS);
974 if (mr->rank == root) {
975 for (
int d = 0; d < depth; d++) {
976 int *v = description[d];
978 for (i = 0; v[i] != 0; i++);
982 cc = MPI_Bcast(len, depth, MPI_INT, root, basecomm);
983 assert(cc == MPI_SUCCESS);
985 for (
int d = 0; d < depth; d++) {
986 if (mr->rank == root) {
987 desc[d] = description[d];
989 desc[d] =
kmr_malloc(
sizeof(
int) * (
size_t)(len[d] + 1));
990 assert(desc[d] != 0);
992 cc = MPI_Bcast(desc[d], (len[d] + 1), MPI_INT, root, basecomm);
993 assert(cc == MPI_SUCCESS);
1001 for (
int d = (depth - 1); d >= 0; d--) {
1002 assert(KMR_NO_LANE == -1);
1004 int sublanecolor = ((d == (depth - 1)) ? mr->rank : (colors.v[d + 1]));
1011 for (
int i = 0; i < len[d]; i++) {
1013 if (sublanecolor != -1 && sublanecolor < sum && color == -1) {
1018 _Bool ok = ((d == (depth - 1)) ? (sum < nprocs) : (sum == len[d + 1]));
1021 snprintf(ee,
sizeof(ee),
1022 (
"Bad lane description," 1023 " sum of ranks/lanes are too large" 1024 " (lanes=%d level=%d)"),
1030 assert(d == (depth - 1) || color != -1 || sublanecolor == -1);
1031 colors.v[d] = (short)color;
1036 for (
int d = (depth - 1); d >= 0; d--) {
1037 int color = ((colors.v[d] != -1) ? colors.v[d] : MPI_UNDEFINED);
1038 cc = MPI_Comm_split(basecomm, color, rank, &splitcomms[d]);
1039 assert(cc == MPI_SUCCESS);
1040 assert(color != MPI_UNDEFINED || splitcomms[d] == MPI_COMM_NULL);
1044 kmr_dump_split_lanes(mr, colors);
1047 for (
int d = 0; d < depth; d++) {
1048 if (mr->rank != root) {
1049 kmr_free(desc[d], (
sizeof(
int) * (
size_t)(len[d] + 1)));
1068 int root,
char *description[], _Bool dump)
1070 const MPI_Comm basecomm = mr->comm;
1071 const int nprocs = mr->nprocs;
1072 const int rank = mr->rank;
1074 struct desc {
struct kmr_lane_no colors;
int ranks;};
1082 splitcomms[d] = MPI_COMM_NULL;
1090 for (
int i = 0; i < nprocs; i++) {
1091 if (description[i] == 0) {
1098 snprintf(ee,
sizeof(ee),
1099 (
"Bad lane description," 1100 " no terminating null"));
1108 lines =
kmr_malloc(
sizeof(
struct desc) * (
size_t)nlines);
1117 for (
int i = 0; i < nlines; i++) {
1118 char *s = description[i];
1120 size_t len = (strlen(s) + 1);
1121 if (len >
sizeof(buf)) {
1123 snprintf(ee,
sizeof(ee),
1124 (
"Bad lane description," 1125 " string too long (%s)"), s);
1129 memcpy(buf, s, len);
1132 while (p[0] != 0 && p[0] !=
':') {p++;}
1135 snprintf(ee,
sizeof(ee),
1136 (
"Bad lane description," 1137 " no separator colon (%s)"), s);
1146 cc = sscanf((p + 1),
"%d%c", &count, &garbage);
1149 snprintf(ee,
sizeof(ee),
1150 (
"Bad lane description," 1151 " bad number of ranks (%s)"), s);
1158 for (
int j = 0; j < i; j++) {
1161 snprintf(ee,
sizeof(ee),
1162 (
"Bad lane description," 1163 " duplicate lane-numbers (%s)"),
1170 lines[i].colors = id;
1171 lines[i].ranks = count;
1180 for (
int i = 0; i < nlines; i++) {
1195 for (
int i = 0; i < nlines; i++) {
1196 rankcount += lines[i].ranks;
1198 if (rankcount > (nprocs - 1)) {
1200 snprintf(ee,
sizeof(ee),
1201 (
"Bad lane description," 1202 " total rank count too large (%d)"),
1212 for (
int d = 0; d < depth; d++) {
1213 for (
int i = 0; i < nlines; i++) {
1214 int q = lines[i].colors.v[d];
1215 if (q > (nprocs - 1)) {
1217 snprintf(ee,
sizeof(ee),
1218 (
"Bad lane description," 1219 " lane number too large (%s)"),
1232 assert(allcolors != 0);
1240 for (
int d = 1; d < depth; d++) {
1243 for (
int i = 0; i < nlines; i++) {
1244 int q = lines[i].colors.v[d];
1245 extent = MAX(extent, (q + 1));
1247 for (
int i = 0; i < nlines; i++) {
1248 int o = lines[i].colors.v[d - 1] * extent;
1249 lines[i].colors.v[d] = (short)(lines[i].colors.v[d] + o);
1257 for (
int i = 0; i < nlines; i++) {
1258 assert((rankcount + lines[i].ranks) < nprocs);
1259 for (
int j = 0; j < lines[i].ranks; j++) {
1260 allcolors[rankcount + j] = lines[i].colors;
1262 rankcount += lines[i].ranks;
1264 assert(rankcount < nprocs);
1265 for (
int j = rankcount; j < nprocs; j++) {
1266 allcolors[j] = illegalcolor;
1269 cc = MPI_Bcast(&depth, 1, MPI_INT, root, basecomm);
1270 assert(cc == MPI_SUCCESS);
1274 cc = MPI_Scatter(allcolors, sz, MPI_BYTE, &colors, sz, MPI_BYTE,
1276 assert(cc == MPI_SUCCESS);
1280 for (
int d = (depth - 1); d >= 0; d--) {
1281 int color = ((colors.v[d] != -1) ? colors.v[d] : MPI_UNDEFINED);
1282 cc = MPI_Comm_split(basecomm, color, rank, &splitcomms[d]);
1283 assert(cc == MPI_SUCCESS);
1284 assert(color != MPI_UNDEFINED || splitcomms[d] == MPI_COMM_NULL);
1288 kmr_dump_split_lanes(mr, colors);
1292 kmr_free(lines, (
sizeof(
struct desc) * (
size_t)nlines));
1293 kmr_free(allcolors, (
sizeof(
struct kmr_lane_no) * (
size_t)nprocs));
1296 assert(allcolors == 0);
1305 const MPI_Comm basecomm = mr->comm;
1306 const int nprocs = mr->nprocs;
1307 const int rank = mr->rank;
1316 assert(allcolors != 0);
1321 cc = MPI_Gather(&colors, sz, MPI_BYTE, allcolors, sz, MPI_BYTE,
1323 assert(cc == MPI_SUCCESS);
1326 printf(
"Split of lanes" 1327 " (displayed by distinct colors assigned to ranks):\n");
1328 for (
int d = 0; d < depth; d++) {
1329 printf(
"color[level=%d]=", d);
1330 for (
int i = 0; i < nprocs; i++) {
1332 if (allcolors[i].v[d] != -1) {
1333 snprintf(col,
sizeof(col),
"%d", allcolors[i].v[d]);
1335 snprintf(col,
sizeof(col),
"-");
1349 kmr_free(allcolors, (
sizeof(
struct kmr_lane_no) * (
size_t)nprocs));
1351 assert(allcolors == 0);
1358 MPI_Comm supercomm);
1370 assert(wf->base_comm != MPI_COMM_NULL);
1372 MPI_Comm basecomm = wf->base_comm;
1373 MPI_Comm *comms = wf->lane_comms;
1379 MPI_Comm supercomm = (level == 0 ? basecomm : comms[level - 1]);
1380 MPI_Comm subcomm = comms[level];
1381 if (supercomm == MPI_COMM_NULL) {
1385 colors[level] = color;
1390 MPI_Comm subcomm = comms[level];
1391 if (level > 0 && subcomm != MPI_COMM_NULL) {
1392 int supercolor = colors[level - 1];
1394 assert(cc == MPI_SUCCESS);
1396 if (colors[level] == -1) {
1397 wf->lane_id_on_proc.v[level] = KMR_NO_LANE;
1399 wf->lane_id_on_proc.v[level] = (short)colors[level];
1404 assert(cc == MPI_SUCCESS);
1420 cc = MPI_Comm_size(subcomm, &nprocs);
1421 assert(cc == MPI_SUCCESS);
1422 cc = MPI_Comm_rank(subcomm, &rank);
1423 assert(cc == MPI_SUCCESS);
1427 colors =
kmr_malloc(
sizeof(
int) * (
size_t)nprocs);
1428 assert(colors != 0);
1432 cc = MPI_Gather(&supercolor, 1, MPI_INT, colors, 1, MPI_INT,
1434 assert(cc == MPI_SUCCESS);
1436 for (
int i = 0; i < nprocs; i++) {
1437 if (colors[i] != colors[0]) {
1438 kmr_error(wf->mr, (
"Communicators are not partitioning" 1441 return MPI_ERR_SPAWN;
1444 kmr_free(colors, (
sizeof(
int) * (
size_t)nprocs));
1462 if (!admit_any &&
id.v[i] == KMR_ANY_LANE) {
1464 snprintf(ee,
sizeof(ee),
"Bad lane-number (%s): any-lane appear",
1470 if (state == 0 && q == KMR_ANY_LANE) {
1473 }
else if (state == 0 && q == KMR_NO_LANE) {
1475 assert(level == (i - 1));
1476 }
else if (state == 0) {
1478 }
else if (state == 1 && q == KMR_ANY_LANE) {
1480 }
else if (state == 1 && q == KMR_NO_LANE) {
1482 assert(level == (i - 1));
1483 }
else if (state == 1) {
1485 snprintf(ee,
sizeof(ee),
1486 "Bad lane-number (%s): some follow any-lane",
1490 }
else if (state == 2 && q == KMR_ANY_LANE) {
1492 snprintf(ee,
sizeof(ee),
1493 "Bad lane-number (%s): some follow no-lane",
1497 }
else if (state == 2 && q == KMR_NO_LANE) {
1499 }
else if (state == 2) {
1501 snprintf(ee,
sizeof(ee),
1502 "Bad lane-number (%s): some follow no-lane",
1522 assert(supercomm != MPI_COMM_NULL);
1529 cc = MPI_Comm_size(supercomm, &nprocs);
1530 assert(cc == MPI_SUCCESS);
1531 cc = MPI_Comm_rank(supercomm, &rank);
1532 assert(cc == MPI_SUCCESS);
1537 subcommranks =
kmr_malloc(
sizeof(
int) * (
size_t)nprocs);
1538 assert(subcommranks != 0);
1539 colors =
kmr_malloc(
sizeof(
int) * (
size_t)nprocs);
1540 assert(colors != 0);
1547 if (subcomm != MPI_COMM_NULL) {
1548 cc = MPI_Comm_rank(subcomm, &r);
1549 assert(cc == MPI_SUCCESS);
1553 cc = MPI_Gather(&r, 1, MPI_INT, subcommranks, 1, MPI_INT, root, supercomm);
1554 assert(cc == MPI_SUCCESS);
1561 for (
int i = 0; i < nprocs; i++) {
1562 if (subcommranks[i] == 0) {
1563 colors[i] = ncolors;
1576 cc = MPI_Scatter(colors, 1, MPI_INT, &color, 1, MPI_INT, root, supercomm);
1577 assert(cc == MPI_SUCCESS);
1578 if (subcomm != MPI_COMM_NULL) {
1579 cc = MPI_Bcast(&color, 1, MPI_INT, 0, subcomm);
1580 assert(cc == MPI_SUCCESS);
1584 kmr_free(subcommranks, (
sizeof(
int) * (
size_t)nprocs));
1585 kmr_free(colors, (
sizeof(
int) * (
size_t)nprocs));
1609 const int master = wf->master_rank;
1610 const MPI_Comm basecomm = wf->base_comm;
1618 if (wf->rank == master) {
1619 laneids =
kmr_malloc((
size_t)sz0 * (
size_t)wf->nprocs);
1620 assert(laneids != 0);
1624 cc = MPI_Gather(&wf->lane_id_on_proc, sz0, MPI_BYTE,
1625 laneids, sz0, MPI_BYTE, master, basecomm);
1626 assert(cc == MPI_SUCCESS);
1629 if (wf->rank == master) {
1630 printf(
"Lanes of ranks:\n");
1631 for (
int i = 0; i < wf->nprocs; i++) {
1633 printf(
"lane[%d]=%s\n", i, s);
1643 if (wf->lane_comms[i] != MPI_COMM_NULL) {
1646 cc = MPI_Comm_size(wf->lane_comms[i], &nprocs);
1647 assert(cc == MPI_SUCCESS);
1648 cc = MPI_Comm_rank(wf->lane_comms[i], &rank);
1649 assert(cc == MPI_SUCCESS);
1650 ranks[i].size= nprocs;
1651 ranks[i].rank = rank;
1659 if (wf->rank == master) {
1660 laneranks =
kmr_malloc((
size_t)sz1 * (
size_t)wf->nprocs);
1661 assert(laneranks != 0);
1665 cc = MPI_Gather(ranks, sz1, MPI_BYTE, laneranks, sz1, MPI_BYTE,
1667 assert(cc == MPI_SUCCESS);
1669 if (wf->rank == master) {
1676 int nbottoms = v->n;
1678 assert(count0 == nbottoms);
1680 kmr_free(laneids, ((
size_t)sz0 * (
size_t)wf->nprocs));
1681 kmr_free(laneranks, ((
size_t)sz1 * (
size_t)wf->nprocs));
1693 assert(count1 == wf->master.top_lane->total_sublanes);
1705 assert(level != -1 || nprocs == 0);
1713 lane->level = level;
1714 lane->leader_rank = -1;
1715 lane->total_sublanes = 1;
1716 lane->total_ranks = nprocs;
1717 lane->superlane = 0;
1721 assert(lane->workers != 0);
1726 lane->icomm = MPI_COMM_NULL;
1727 lane->queue_head.next = 0;
1728 lane->queue_head.item = 0;
1729 lane->queue_insertion_tail = 0;
1730 lane->current_work = 0;
1731 lane->yielding_to_superlane = 0;
1732 lane->n_joined_ranks = 0;
1733 lane->n_running_sublanes = 0;
1735 lane->running_sublanes =
kmr_malloc(
sizeof(_Bool) * (
size_t)nprocs);
1736 assert(lane->running_sublanes != 0);
1738 lane->running_sublanes = 0;
1752 assert(wf->master.lane_of_workers == 0);
1753 size_t qsz = (
sizeof(
struct kmr_lane_state *) * (
size_t)wf->nprocs);
1754 wf->master.lane_of_workers =
kmr_malloc(qsz);
1755 assert(wf->master.lane_of_workers != 0);
1756 memset(wf->master.lane_of_workers, 0, qsz);
1760 memset(lanes, 0, qsz);
1763 for (
int r = 0; r < wf->nprocs; r++) {
1764 assert(wf->master.lane_of_workers[r] == 0);
1770 int nprocsinlane = laneranks[r][level].size;
1771 int rankinlane = laneranks[r][level].rank;
1772 assert(nprocsinlane > 0 && rankinlane != -1);
1773 assert(rankinlane < nprocsinlane);
1780 for (q = 0; q < nlanes; q++) {
1781 assert(lanes[q] != 0);
1784 assert(l0 == level);
1791 assert(q == nlanes);
1794 lanes[nlanes] = lane;
1800 wf->master.lane_of_workers[r] = lane;
1801 assert(lane->workers->ranks[rankinlane] == -1);
1802 lane->workers->ranks[rankinlane] = r;
1803 if (rankinlane == 0) {
1804 assert(lane->leader_rank == -1);
1805 lane->leader_rank = r;
1814 for (
int q = 0; q < v->n; q++) {
1817 for (
int i = 0; i < workers->n; i++) {
1818 assert(workers->ranks[i] != -1);
1836 assert(wf->master.top_lane == 0);
1837 wf->master.top_lane = top;
1843 for (
int q = 0; q < nlanes; q++) {
1845 assert(lane == 0 || lane->superlane == 0);
1848 }
else if (lane->level == level) {
1855 id.v[level] = KMR_NO_LANE;
1861 assert((sup != top) == (leader != -1));
1862 assert(sup->leader_rank == -1);
1863 sup->leader_rank = leader;
1867 assert(lanes[q] == 0);
1875 top->link = wf->master.list_of_all_lanes;
1876 wf->master.list_of_all_lanes = top;
1887 assert(sup->sublanes == 0);
1895 for (
int q = 0; q < nlanes; q++) {
1898 && (lane->level - 1) == sup->level
1899 &&
kmr_lane_eq(lane->lane_id, sup->lane_id, sup->level)) {
1901 lane->superlane = sup;
1902 assert(lane->link == 0);
1903 suptail->link = lane;
1905 sup->total_sublanes += lane->total_sublanes;
1906 sup->total_ranks += lane->total_ranks;
1918 while (suptail->link != 0) {
1919 v->lanes[i] = suptail->link;
1921 suptail = suptail->link;
1925 assert(sup->running_sublanes == 0);
1926 sup->running_sublanes =
kmr_malloc(
sizeof(_Bool) * (
size_t)count);
1927 assert(sup->running_sublanes != 0);
1931 suptail->link = wf->master.list_of_all_lanes;
1932 wf->master.list_of_all_lanes = sup->link;
1947 if (lane->level == -1) {
1949 }
if (lane->sublanes == 0) {
1950 for (
int i = 0; i < lane->workers->n; i++) {
1951 int r = lane->workers->ranks[i];
1953 if (laneranks[r][level].rank == -1) {
1954 printf(
"AHO lane=%s i=%d r=%d level=%d w=%d\n",
1957 for (
int j = 0; j < lane->workers->n; j++) {
1958 printf(
"AHO ranks=%d\n", lane->workers->ranks[j]);
1964 assert(laneranks[r][level].rank != -1);
1965 if (laneranks[r][level].rank == 0) {
1972 for (
int i = 0; i < v->n; i++) {
1987 if (lane->sublanes == 0) {
1992 for (
int i = 0; i < lane->sublanes->n; i++) {
2011 assert(lane->icomm == MPI_COMM_NULL);
2012 if (lane->sublanes != 0) {
2014 for (
int i = 0; i < v->n; i++) {
2015 assert(v->lanes[i]->queue_head.next == 0);
2019 assert(lane->current_work == 0);
2020 assert(lane->yielding_to_superlane == 0);
2021 assert(lane->queue_insertion_tail == 0);
2023 int nsubs = ((lane->sublanes != 0) ? lane->sublanes->n : lane->workers->n);
2025 assert(lane->running_sublanes != 0);
2026 kmr_free(lane->running_sublanes, (
sizeof(_Bool) * (
size_t)nsubs));
2027 lane->running_sublanes = 0;
2029 if (lane->sublanes != 0) {
2031 for (
int i = 0; i < v->n; i++) {
2039 if (lane->workers != 0) {
2042 + (
sizeof(
int) * (
size_t)u->n)));
2056 kmr_err_when_swf_is_not_initialized(mr);
2057 struct kmr_swf *wf = mr->simple_workflow;
2058 if (wf->rank == wf->master_rank) {
2059 kmr_dump_sublanes(wf, wf->master.top_lane);
2066 const int master = wf->master_rank;
2067 if (wf->rank == master) {
2068 if (lane->workers != 0) {
2070 printf(
"lane %s : ranks[%d]=(",
2072 for (
int i = 0; i < u->n; i++) {
2073 char *separator = (i == 0 ?
"" :
",");
2074 printf(
"%s%d", separator, u->ranks[i]);
2078 if (lane->sublanes != 0) {
2080 for (
int i = 0; i < v->n; i++) {
2081 kmr_dump_sublanes(wf, v->lanes[i]);
2093 const char *args,
size_t argssize,
2094 int seq, _Bool separatorspace);
2099 static void kmr_preset_lane_state(
struct kmr_swf *wf, _Bool queuing);
2100 static void kmr_check_work_queues_empty(
struct kmr_swf *wf);
2115 kmr_assert_kvs_ok(kvi, kvo, 1, 0);
2116 assert(kvi->c.key_data == KMR_KV_OPAQUE
2117 || kvi->c.key_data == KMR_KV_CSTRING);
2118 assert(kvi->c.value_data == KMR_KV_OPAQUE
2119 || kvi->c.value_data == KMR_KV_CSTRING);
2120 assert(kvi->c.element_count <= INT_MAX);
2122 KMR *
const mr = kvi->c.mr;
2123 kmr_err_when_swf_is_not_initialized(mr);
2124 struct kmr_swf *wf = mr->simple_workflow;
2125 _Bool tracing5 = (wf->mr->trace_map_spawn && (5 <= wf->mr->verbosity));
2127 if (wf->rank != wf->master_rank) {
2128 kmr_error(mr,
"Non-master rank calls kmr_map_swf()");
2136 if (wf->master.history_head.next != 0) {
2137 assert(wf->master.record_history);
2138 assert(wf->master.history_insertion_tail != 0);
2139 kmr_free_work_list(wf, wf->master.history_head.next, 0, 0);
2140 wf->master.history_head.next = 0;
2141 wf->master.history_insertion_tail = 0;
2144 wf->master.record_history = mr->swf_record_history;
2145 if (wf->master.record_history) {
2146 wf->master.history_insertion_tail = &wf->master.history_head;
2152 fprintf(stderr,
";;KMR [%05d] kmr_map_swf:" 2153 " Queue work-items.\n", wf->rank);
2157 kmr_preset_lane_state(wf, 1);
2161 int count = (int)kvi->c.element_count;
2165 kvi->c.current_block = kvi->c.first_block;
2167 e = kmr_kvs_first_entry(kvi, kvi->c.first_block);
2168 for (
int i = 0; i < count; i++) {
2171 if (kv.vlen >= (
int)wf->args_size) {
2173 snprintf(ee,
sizeof(ee),
2174 "Command string (length=%d) too long (%s)",
2178 return MPI_ERR_SPAWN;
2181 _Bool separatorspace = opt.separator_space;
2184 x = kmr_make_work_item(wf,
id, kv.v.p, (
size_t)kv.vlen,
2189 assert(cc == MPI_SUCCESS);
2191 e = kmr_kvs_next(kvi, e, 0);
2197 kmr_preset_lane_state(wf, 0);
2202 fprintf(stderr,
";;KMR [%05d] kmr_map_swf:" 2203 " Request workers to start.\n", wf->rank);
2207 cc = kmr_activate_workers(wf, 0);
2208 assert(cc == MPI_SUCCESS);
2210 assert(wf->rank == wf->master_rank);
2213 if (cc == MPI_SUCCESS) {
2218 kmr_check_work_queues_empty(wf);
2222 fprintf(stderr,
";;KMR [%05d] Master finished" 2223 " (Workers will be in undefined state).\n", wf->rank);
2236 const char *args,
size_t argssize,
2237 int seq, _Bool separatorspace)
2239 char *name =
"kmr_map_swf";
2246 memcpy(argsbuf, args, argssize);
2249 cc = kmr_scan_argv_strings(wf->mr, argsbuf, argssize, 0,
2250 &maxargc, 0, separatorspace, name);
2251 assert(cc == MPI_SUCCESS);
2253 char **argv0 =
kmr_malloc(
sizeof(
char *) * (
size_t)(maxargc + 1));
2254 memset(argv0, 0, (
sizeof(
char *) * (
size_t)(maxargc + 1)));
2257 cc = kmr_scan_argv_strings(wf->mr, argsbuf, argssize, (maxargc + 1),
2258 &argc, argv0, separatorspace, name);
2259 assert(cc == MPI_SUCCESS);
2260 assert(maxargc == argc);
2268 if (argc > 0 && strncmp(
"maxprocs=", argv[0], 9) == 0) {
2270 cc = kmr_parse_int(&argv[0][9], &v);
2271 if (cc == 1 || v >= 0) {
2277 snprintf(ee,
sizeof(ee),
"Bad maxprocs string (%s)", argv[0]);
2278 kmr_error(wf->mr, ee);
2288 for (
int i = 0; i < argc; i++) {
2289 size_t sz = (strlen(argv[i]) + 1);
2294 assert(asz <= (argssize + 1));
2304 x->req = KMR_SPAWN_WORK;
2305 x->requesting_lane = id;
2306 x->assigned_lane = nulllane;
2308 x->sequence_no = seq;
2310 x->work.req = KMR_SPAWN_WORK;
2311 x->work.protocol_version = KMR_SPAWN_MAGIC;
2312 x->work.message_size = (int)msz;
2313 x->work.subworld = x->level;
2315 x->work.nprocs = nprocs;
2316 x->work.print_trace = 0;
2321 for (
int i = 0; i < argc; i++) {
2322 size_t sz = (strlen(argv[i]) + 1);
2323 memcpy(p, argv[i], sz);
2328 assert((
size_t)(p - x->work.args) == asz);
2333 (
";;KMR [%05d] kmr_map_swf:" 2334 " argc=%d siz=%d\n"), wf->rank, argc, (
int)asz);
2335 for (
int i = 0; i < argc; i++) {
2336 fprintf(stderr,
"%s\n", argv[i]);
2341 kmr_free(argv0, (
sizeof(
char *) * (
size_t)(maxargc + 1)));
2342 kmr_free(argsbuf, argssize);
2357 int queuing = (lane->level + 1);
2358 if (lane->sublanes == 0) {
2360 snprintf(ee,
sizeof(ee),
2361 (
"Bad lane specified; nonexisting level" 2362 " (lane=%s at level=%d)"),
2364 kmr_error(wf->mr, ee);
2366 return MPI_ERR_SPAWN;
2368 assert(x->level >= queuing);
2370 int q = x->requesting_lane.v[queuing];
2371 assert(q != KMR_NO_LANE);
2372 if (0 <= q && q < v->n && (x->level > queuing)) {
2375 }
else if (0 <= q && q < v->n) {
2376 assert(x->level == queuing);
2377 cc = kmr_link_work(wf, v->lanes[q], x);
2379 }
else if (q == KMR_ANY_LANE && (x->level == queuing || multipleany)) {
2380 for (
int i = 0; i < v->n; i++) {
2381 cc = kmr_link_work(wf, v->lanes[i], x);
2382 if (cc != MPI_SUCCESS) {
2387 }
else if (q == KMR_ANY_LANE) {
2388 assert(x->level > queuing && !multipleany);
2390 snprintf(ee,
sizeof(ee),
2391 (
"Bad lane specified; multiple/non-tail any-lane" 2392 " (lane=%s at level=%d)"),
2394 kmr_error(wf->mr, ee);
2396 return MPI_ERR_SPAWN;
2399 snprintf(ee,
sizeof(ee),
2400 (
"Bad lane specified; index exceeds size" 2401 " (lane=%s at level=%d)"),
2403 kmr_error(wf->mr, ee);
2405 return MPI_ERR_SPAWN;
2419 assert(lane->queue_insertion_tail != 0);
2420 assert(lane->queue_insertion_tail->next == 0);
2421 lane->queue_insertion_tail->next = p;
2422 lane->queue_insertion_tail = p;
2426 for (
int i = 0; i < v->n; i++) {
2427 cc = kmr_link_work(wf, v->lanes[i], x);
2428 assert(cc == MPI_SUCCESS);
2438 kmr_preset_lane_state(
struct kmr_swf *wf, _Bool queuing)
2441 for (
struct kmr_lane_state *lane = h; lane != 0; lane = lane->link) {
2442 assert((lane->sublanes != 0) || (lane->workers != 0));
2446 assert(lane->queue_insertion_tail == 0);
2447 lane->queue_insertion_tail = &lane->queue_head;
2450 lane->queue_insertion_tail = 0;
2454 assert(lane->running_sublanes != 0);
2455 int nsubs = ((lane->sublanes != 0)
2456 ? lane->sublanes->n : lane->workers->n);
2457 lane->n_running_sublanes = nsubs;
2458 for (
int i = 0; i < nsubs; i++) {
2459 lane->running_sublanes[i] = 1;
2462 if (lane->workers != 0) {
2472 x->req = KMR_SPAWN_NONE;
2473 x->requesting_lane = id;
2474 x->assigned_lane = id;
2476 x->work.req = KMR_SPAWN_NONE;
2477 x->work.protocol_version = KMR_SPAWN_MAGIC;
2478 x->work.message_size = (int)msz;
2479 x->work.subworld = x->level;
2481 x->work.nprocs = lane->workers->n;
2482 x->work.print_trace = 0;
2484 assert(lane->current_work == 0);
2485 lane->current_work = x;
2494 static int kmr_finish_current_work(
struct kmr_swf *wf,
2497 static int kmr_ckeck_sublanes_empty(
struct kmr_swf *wf,
2522 assert(lane->n_running_sublanes > 0);
2523 assert(lane->sublanes == 0 || (sublaneindex < lane->sublanes->n));
2524 assert(lane->workers == 0 || (sublaneindex < lane->workers->n));
2525 assert(lane->running_sublanes[sublaneindex] != 0);
2526 lane->running_sublanes[sublaneindex] = 0;
2528 lane->n_running_sublanes--;
2529 if (lane->n_running_sublanes != 0) {
2530 return MPI_ERR_PENDING;
2532 cc = kmr_finish_current_work(wf, lane);
2533 assert(cc == MPI_SUCCESS);
2534 if (lane->superlane == 0) {
2539 if (cc == MPI_ERR_PENDING) {
2540 return MPI_ERR_PENDING;
2543 assert(cc == MPI_SUCCESS);
2559 _Bool tracing5 = (wf->mr->trace_map_spawn && (5 <= wf->mr->verbosity));
2563 assert(lane->superlane != 0);
2564 assert(lane->n_running_sublanes == 0);
2565 assert(lane->current_work == 0);
2566 assert(lane->yielding_to_superlane == 0);
2570 assert(x->level <= lane->level);
2576 for (
int i = 0; i < v->n; i++) {
2577 assert(x == v->lanes[i]->yielding_to_superlane);
2578 v->lanes[i]->yielding_to_superlane = 0;
2582 if (x->level < lane->level) {
2584 assert(lane->superlane != 0);
2585 assert(lane->yielding_to_superlane == 0);
2586 lane->yielding_to_superlane = x;
2593 fprintf(stderr,
";;KMR [%05d] Start a work (lane=%s).\n",
2597 assert(x->level == lane->level);
2598 x->assigned_lane = lane->lane_id;
2600 x->work.print_trace = (tracing5 != 0);
2602 assert(cc == MPI_SUCCESS);
2604 assert(cc == MPI_SUCCESS);
2605 kmr_record_in_history(wf, x);
2606 return MPI_ERR_PENDING;
2609 cc = kmr_ckeck_sublanes_empty(wf, lane);
2610 assert(cc == MPI_SUCCESS);
2616 for (
int i = 0; i < v->n; i++) {
2618 if (cc == MPI_ERR_PENDING) {
2619 lane->n_running_sublanes++;
2623 if (lane->n_running_sublanes != 0) {
2624 return MPI_ERR_PENDING;
2630 ";;KMR [%05d] workflow lane done (lane=%s).\n",
2634 assert(lane->superlane != 0);
2646 return MPI_ERR_PENDING;
2647 }
else if (lane->sublanes != 0) {
2649 for (
int i = 0; i < v->n; i++) {
2650 cc = kmr_ckeck_sublanes_empty(wf, v->lanes[i]);
2651 if (cc == MPI_ERR_PENDING) {
2652 return MPI_ERR_PENDING;
2666 lane->queue_head.next = h->next;
2687 while (sup->level >= 0 &&
id.v[sup->level] == KMR_ANY_LANE) {
2688 sup = lane->superlane;
2690 cc = kmr_remove_work(wf, sup, x);
2691 assert(cc == MPI_SUCCESS);
2701 for (
int i = 0; i < v->n; i++) {
2702 kmr_remove_work(wf, v->lanes[i], x);
2706 for (
struct kmr_work_list *q = h; (q != 0 && q->next != 0); q = q->next) {
2723 assert(x->req == KMR_SPAWN_WORK);
2724 MPI_Comm basecomm = wf->base_comm;
2728 assert(lane->current_work == 0);
2729 lane->current_work = x;
2731 if (lane->sublanes != 0) {
2733 assert(lane->n_running_sublanes == 0);
2734 lane->n_running_sublanes = v->n;
2735 for (
int i = 0; i < v->n; i++) {
2737 assert(cc == MPI_SUCCESS);
2738 assert(lane->running_sublanes[i] == 0);
2739 lane->running_sublanes[i] = 1;
2743 assert(lane->n_running_sublanes == 0);
2744 lane->n_running_sublanes = u->n;
2745 for (
int i = 0; i < u->n; i++) {
2746 cc = kmr_start_worker(&x->work, (
size_t)x->work.message_size,
2747 u->ranks[i], basecomm);
2748 assert(cc == MPI_SUCCESS);
2749 assert(lane->running_sublanes[i] == 0);
2750 lane->running_sublanes[i] = 1;
2752 assert(cc == MPI_SUCCESS);
2755 _Bool mainlane = (lane->level == x->level);
2757 assert(lane->icomm == MPI_COMM_NULL);
2758 cc = kmr_join_to_workers(wf, lane);
2759 assert(cc == MPI_SUCCESS);
2773 if (lane->current_work != 0) {
2775 lane->current_work = 0;
2777 _Bool mainlane = (lane->level == x->level);
2779 _Bool dummy_initial_work_item = (x->req == KMR_SPAWN_NONE);
2780 if (dummy_initial_work_item) {
2782 + (size_t)x->work.message_size);
2785 assert(lane->icomm != MPI_COMM_NULL);
2786 cc = MPI_Comm_free(&lane->icomm);
2787 assert(cc == MPI_SUCCESS);
2792 if (wf->master.history_insertion_tail == 0) {
2793 assert(!wf->master.record_history);
2795 + (size_t)x->work.message_size);
2808 assert(!warn || lane != 0);
2819 snprintf(ee,
sizeof(ee),
2820 "Some work-items remain queued (%s)",
2822 kmr_warning(wf->mr, 1, ee);
2829 + (size_t)x->work.message_size);
2836 kmr_check_work_queues_empty(
struct kmr_swf *wf)
2841 for (
struct kmr_lane_state *lane = h; lane != 0; lane = lane->link) {
2842 if (lane->queue_head.next != 0) {
2844 kmr_free_work_list(wf, lane->queue_head.next, lane, 1);
2845 lane->queue_head.next = 0;
2849 kmr_error(wf->mr,
"Some work-items remain queued");
2857 if (wf->master.record_history) {
2858 assert(wf->master.history_insertion_tail != 0);
2863 assert(wf->master.history_insertion_tail->next == 0);
2864 wf->master.history_insertion_tail->next = p;
2865 wf->master.history_insertion_tail = p;
2876 kmr_err_when_swf_is_not_initialized(mr);
2877 struct kmr_swf *wf = mr->simple_workflow;
2878 if (wf->master.history_head.next == 0) {
2879 kmr_warning(wf->mr, 1,
"Workflow history not recorded.");
2882 p = wf->master.history_head.next;
2888 printf(
"work=%d for lane=%s",
2890 printf(
" run in lane=%s\n",
2906 kmr_err_when_swf_is_not_initialized(mr);
2907 struct kmr_swf *wf = mr->simple_workflow;
2908 if (wf->master.history_head.next == 0) {
2909 kmr_warning(wf->mr, 1,
"Workflow history not recorded.");
2911 int icount = (int)count;
2912 for (
int i = 0; i < icount; i++) {
2918 p = wf->master.history_head.next;
2919 while (p != 0 && i < icount) {
2923 history[i] = x->sequence_no;
2935 kmr_err_when_swf_is_not_initialized(mr);
2936 struct kmr_swf *wf = mr->simple_workflow;
2937 if (wf->master.history_head.next == 0) {
2938 kmr_warning(wf->mr, 1,
"Workflow history not recorded.");
2940 assert(wf->master.record_history);
2941 assert(wf->master.history_insertion_tail != 0);
2942 kmr_free_work_list(wf, wf->master.history_head.next, 0, 0);
2943 wf->master.history_head.next = 0;
2944 wf->master.history_insertion_tail = 0;
2955 MPI_Comm comm = wf->base_comm;
2956 _Bool tracing5 = (wf->mr->trace_map_spawn && (5 <= wf->mr->verbosity));
2960 assert(wf->master.rpc_buffer != 0 && wf->master.rpc_size > 0);
2962 int msz = (int)wf->master.rpc_size;
2963 mbuf->req = KMR_SPAWN_NONE;
2966 cc = MPI_Recv(mbuf, msz, MPI_BYTE, MPI_ANY_SOURCE,
2967 KMR_SPAWN_RPC_TAG, comm, &st);
2968 assert(cc == MPI_SUCCESS);
2969 cc = MPI_Get_count(&st, MPI_BYTE, &len);
2970 assert(cc == MPI_SUCCESS);
2971 size_t msglen = (size_t)len;
2972 int rank = st.MPI_SOURCE;
2973 assert(rank != wf->master_rank);
2982 } *xs = (
struct ss *)&st;
2983 fprintf(stderr,
"MPI_SOURCE=%d MPI_TAG=%d MPI_ERROR=%d _count=%d _cancelled=%d\n", xs->MPI_SOURCE, xs->MPI_TAG, xs->MPI_ERROR, xs->_count, xs->_cancelled); fflush(0);
2984 fprintf(stderr,
"msz=%d ty=%lx\n", msz, MPI_BYTE); fflush(0);
2987 assert(MPI_SUCCESS != -1 && MPI_ERR_SPAWN != -1);
2989 switch (mbuf->req) {
2990 case KMR_SPAWN_NEXT: {
2996 assert(lane == 0 || (lane->sublanes == 0 && lane->workers != 0));
2997 if (w->initial_message != 0) {
3000 snprintf(ee,
sizeof(ee),
3001 (
"Unexpectedly receive a worker joining message" 3002 " (from rank=%d)"), rank);
3003 kmr_error(wf->mr, ee);
3007 lane->n_joined_ranks++;
3008 top->n_joined_ranks++;
3009 assert(lane->n_joined_ranks <= lane->workers->n);
3010 assert(top->n_joined_ranks <= top->total_ranks);
3012 wf->master.idle_ranks++;
3017 if (w->initial_message != 0) {
3019 (
";;KMR [%05d] rank=%d joined" 3020 " (workers=%d/%d; idle=%d).\n"),
3022 top->n_joined_ranks, top->total_ranks,
3023 wf->master.idle_ranks);
3026 fprintf(stderr,
";;KMR [%05d] rank=%d requesting a work.\n",
3033 if (top->n_joined_ranks < top->total_ranks) {
3034 return MPI_ERR_PENDING;
3038 }
else if (lane != 0) {
3042 if (cc == MPI_SUCCESS) {
3044 fprintf(stderr,
";;KMR [%05d] Workflow finished.\n",
3051 return MPI_ERR_PENDING;
3059 snprintf(ee,
sizeof(ee),
3060 "Bad RPC message request=0x%x length=%zd from rank=%d",
3061 mbuf->req, msglen, rank);
3062 kmr_error(wf->mr, ee);
3074 kmr_activate_workers(
struct kmr_swf *wf, _Bool shutdown)
3076 MPI_Comm comm = wf->base_comm;
3084 memset(mbuf, 0, msz);
3085 mbuf->req = KMR_SPAWN_WORK;
3086 mbuf->protocol_version = KMR_SPAWN_MAGIC;
3087 mbuf->message_size = (int)msz;
3091 for (
int i = 0; i < wf->nprocs; i++) {
3092 if (i != wf->master_rank) {
3093 cc = MPI_Send(mbuf, (
int)msz, MPI_BYTE, i,
3094 KMR_SPAWN_RPC_TAG, comm);
3095 assert(cc == MPI_SUCCESS);
3102 memset(mbuf, 0, msz);
3103 mbuf->req = KMR_SPAWN_NONE;
3104 mbuf->protocol_version = KMR_SPAWN_MAGIC;
3105 for (
int i = 0; i < wf->nprocs; i++) {
3106 if (i != wf->master_rank) {
3107 cc = MPI_Send(mbuf, (
int)msz, MPI_BYTE, i,
3108 KMR_SPAWN_RPC_TAG, comm);
3109 assert(cc == MPI_SUCCESS);
3122 int rank, MPI_Comm basecomm)
3126 int len = (int)msglen;
3127 cc = MPI_Send(w, len, MPI_BYTE, rank, KMR_SPAWN_RPC_TAG, basecomm);
3128 assert(cc == MPI_SUCCESS);
3138 MPI_Comm basecomm = wf->base_comm;
3139 _Bool tracing5 = (wf->mr->trace_map_spawn && (5 <= wf->mr->verbosity));
3144 fprintf(stderr,
";;KMR [%05d] Connect to workers (lane=%s).\n",
3149 assert(lane->icomm == MPI_COMM_NULL);
3150 int leader = lane->leader_rank;
3151 int nranks = lane->total_ranks;
3152 cc = MPI_Intercomm_create(MPI_COMM_SELF, 0, basecomm, leader,
3153 KMR_SPAWN_ICOMM_TAG, &lane->icomm);
3154 assert(cc == MPI_SUCCESS);
3157 cc = MPI_Comm_remote_size(lane->icomm, &nprocs);
3158 assert(cc == MPI_SUCCESS);
3159 if (nranks != nprocs) {
3161 snprintf(ee,
sizeof(ee),
3162 "Bad inter-communicator size (%d!=%d)", nprocs, nranks);
3163 kmr_error(wf->mr, ee);
3176 if (kmr_fake_spawn_hooks != 0 && kmr_fake_spawn_hooks != hooks) {
3179 kmr_fake_spawn_hooks = hooks;
3197 char *e = &(w->args[asz]);
3200 while (p[0] != 0 && p < (e - 1)) {
3202 while (p[0] != 0 && p < (e - 1)) {
3210 assert(p == (e - 1) || p[0] == 0);
3213 char *argv[argc + 1];
3218 char *e = &(w->args[asz]);
3221 while (p[0] != 0 && p < (e - 1)) {
3225 while (p[0] != 0 && p < (e - 1)) {
3233 assert(p == (e - 1) || p[0] == 0);
3239 hooks->s.running_work = w;
3240 hooks->s.mpi_initialized = 1;
3242 if (hooks->s.print_trace) {
3245 printf(
";;KMR [%05d] EXEC: %s\n",
3246 hooks->s.base_rank, aa);
3250 hooks->s.running_work = 0;
3251 hooks->s.mpi_initialized = 0;
Lane Number (at-all-ranks).
static struct kmr_lane_vector * kmr_make_bottom_lanes(struct kmr_swf *wf, struct kmr_lane_no *laneids, struct kmr_pair laneranks[][KMR_LANE_LEVELS])
Makes lanes at the bottom levels (at-the-master).
Key-Value Stream (abstract).
static int kmr_yield_for_lane(struct kmr_swf *wf, struct kmr_lane_state *lane, int sublaneindex)
Schedules a next work-item when a worker or a sublane finishes (at-the-master).
Utilities Private Part (do not include from applications).
static void kmr_free_lanes(struct kmr_swf *wf, struct kmr_lane_state *lane)
Frees a lane and its sublanes, recursively (at-the-master).
static int kmr_find_leader(struct kmr_swf *wf, struct kmr_lane_state *lane, int level, struct kmr_pair laneranks[][KMR_LANE_LEVELS])
Searches a leader (rank=0) in a LANE at a LEVEL (at-the-master).
static int kmr_enqueue_work(struct kmr_swf *wf, struct kmr_lane_state *lane, struct kmr_work_item *x, _Bool multiple_any)
Enqueues a work-item in some sublane of a LANE (at-the-master).
Work-Item Queue Entry (at-the-master).
static int kmr_start_lanes(struct kmr_swf *wf, struct kmr_lane_state *lane, struct kmr_work_item *x)
Requests workers to start a work for a lane and its sublanes, and then connects to workers (at-the-ma...
void kmr_free_swf_history(KMR *mr)
Clears the history recorded in kmr_map_swf().
static int kmr_check_partitioning(struct kmr_swf *wf, int supercolor, MPI_Comm subcomm)
Checks if a sub-communicator is a partitioning of a super-communicator (at-all-ranks).
int kmr_init_swf(KMR *mr, MPI_Comm lanecomms[KMR_LANE_LEVELS], int master)
Initializes the lanes of simple workflow.
Vector of Ranks (at-the-master).
static void kmr_resolve_lanes(struct kmr_swf *wf)
Assigns a lane-number to a rank (wf->lane_id_on_proc) (at-all-ranks).
#define kmr_malloc(Z)
Allocates memory, or aborts when failed.
Work Description (at-the-master).
static int kmr_load_spawn_library(struct kmr_swf *wf, _Bool test_with_fake_spawn)
Loads the spawn-library "libkmrspawn.so".
static struct kmr_lane_no kmr_name_lane(KMR *mr, const char *s)
Parses a string as a lane-number.
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
static int kmr_find_sublane_index(struct kmr_swf *wf, struct kmr_lane_state *lane)
Finds a sublane index of a superlane.
static int kmr_schedule_lanes(struct kmr_swf *wf, struct kmr_lane_state *lane)
Schedules a lane for a next work-item (at-the-master).
int kmr_split_swf_lanes_a(KMR *mr, MPI_Comm splitcomms[KMR_LANE_LEVELS], int root, int *description[], _Bool dump)
Splits a communicator in a KMR context to ones to be used for kmr_init_swf().
static char * kmr_lane_string(struct kmr_lane_no n, _Bool print_all_levels)
(NOT-THREAD-SAFE) Returns a string representation of a lane-number.
void kmr_set_swf_verbosity(KMR *mr, int level)
Sets the verbosity of the spawn-library.
static int kmr_check_lane_id(struct kmr_swf *wf, struct kmr_lane_no id, _Bool admit_any)
Checks well-formedness of a lane-number.
Work-Item Queue of a Lane (at-the-master).
static struct kmr_rank_vector * kmr_make_rank_vector(int n)
Allocates a rank vector, filling all entries with -1.
Handy Copy of a Key-Value Field.
Workflow State (at-all-ranks).
Options to Mapping by Spawns.
int kmr_fin(void)
Clears the environment.
static struct kmr_lane_state * kmr_allocate_lane(int level, struct kmr_lane_no id, int nprocs)
Makes a lane structure (at-the-master).
int kmr_split_swf_lanes(KMR *mr, MPI_Comm splitcomms[KMR_LANE_LEVELS], int root, char *description[], _Bool dump)
Splits a communicator in a KMR context to ones to be used for kmr_init_swf().
Vector of Lanes (at-the-master).
static struct kmr_lane_vector * kmr_make_lane_vector(int n, struct kmr_lane_state *lanes[])
Packs lanes in a vector.
static int kmr_dequeue_scattered_work(struct kmr_swf *wf, struct kmr_lane_state *lane, struct kmr_work_item *x)
Removes all occurrences of a work-item (which may be scattered for an any-lane) from all the queues...
#define KMR_LANE_LEVELS
Maximum Levels of Lanes.
void kmr_dump_swf_order_history(KMR *mr, int *history, size_t count)
Returns a list of the start ordering of the work-items.
static int kmr_handle_worker_request(struct kmr_swf *wf, _Bool joining)
(spawn-library-protocol) Handles requests from workers.
static int kmr_count_bottom_level_lanes(struct kmr_lane_state *lane)
Counts the number of the bottom level lanes (at-the-master).
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
void kmr_dump_swf_history(KMR *mr)
Prints the history of kmr_map_swf(), which is the start ordering of the work-items.
int kmr_map_swf(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps with a simple workflow.
int kmr_finish_swf(KMR *mr)
Clears the lanes of simple workflow.
static void kmr_make_lanes(struct kmr_swf *wf)
Initializes the lanes at the master rank (at-all-ranks).
static void kmr_clear_lane_id(struct kmr_lane_no *id)
Clears the lane-number to a null-lane.
static int kmr_color_subcommunicator(struct kmr_swf *wf, MPI_Comm subcomm, MPI_Comm supercomm)
Colors sub-communicators distinctly in a super-communicator, and returns the color which names ...
static struct kmr_kv_box kmr_pick_kv(struct kmr_kvs_entry *e, KMR_KVS *kvs)
Returns a handle to a key-value entry – a reverse of kmr_poke_kv().
int kmr_make_printable_argv_string(char *s, size_t sz, char **argv)
Fills the string buffer with the argv strings for printing.
int kmr_detach_swf_workers(KMR *mr)
Disengages the workers from main processing and puts them in the service loop for spawning...
int kmr_stop_swf_workers(KMR *mr)
Finishes the workers of workflow.
void kmr_dump_swf_lanes(KMR *mr)
Dumps lanes created by kmr_init_swf().
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type.
static int kmr_find_worker_index(struct kmr_swf *wf, struct kmr_lane_state *lane, int rank)
Finds a worker index of a lane for a rank.
static void kmr_bond_sublanes(struct kmr_swf *wf, struct kmr_lane_state *sup, struct kmr_lane_state *lanes[], int nlanes)
Builds a vector of sublanes for the superlane SUP (at-the-master).
static unsigned long kmr_color_of_lane(struct kmr_lane_no id)
Returns a lane-number as a single integer color.
static void kmr_bond_all_lanes(struct kmr_swf *wf, struct kmr_lane_vector *v, struct kmr_pair laneranks[][KMR_LANE_LEVELS])
Collects lanes to make a superlane, which build up to a single top-lane (at-the-master).
static _Bool kmr_lane_eq(struct kmr_lane_no n0, struct kmr_lane_no n1, int level)
Compares lane-numbers up to the LEVEL.
static int kmr_level_of_lane(struct kmr_lane_no n, _Bool admit_any)
Returns the maximum level of a given lane-number (zero to KMR_LANE_LEVELS-1), or returns -1 for a nul...