KMR
kmrwfmap.c
Go to the documentation of this file.
1 /* kmrwfmap.c (2016-07-14) -*-Coding: us-ascii;-*- */
2 /* Copyright (C) 2012-2018 RIKEN R-CCS */
3 
4 /** \file kmrwfmap.c Simple Workflow by Static-Spawning. It needs a
5  library for static-spawning "libkmrspawn.so" (it is only available
6  on K). It runs MPI executables under the control of a simple
7  master/worker scheduler. It groups ranks as lanes, where the
8  lanes are hierarchically split into maximum four levels of
9  sublanes. Each lane is associated with a subworld communicator.
10  In the following diagram, lane (0) is split into two lanes (0.0)
11  and (0.1), and they are split further. A work-item (or job/task)
12  is enqueued in a lane specified by a list of lane-numbers like
13  (0.1) or (0.1.1). A work-item entered in a lane runs using all
14  sublanes below it. Work-items in each lane are scheduled in the
15  FIFO order. The single and dedicated master rank keeps track of
16  running/idling lanes.
17 
18  ~~~~
19  Split of Lanes (three levels):
20  (0).-.(0.0).-.(0.0.0).-.a-number-of-ranks
21  ....-.......-.(0.0.1).-.a-number-of-ranks
22  ....-.......-.(0.0.2).-.a-number-of-ranks
23  ....-.(0.1).-.(0.1.0).-.a-number-of-ranks
24  ....-.......-.(0.1.1).-.a-number-of-ranks
25  ....-.......-.(0.1.2).-.a-number-of-ranks
26  (1).-.(1.0).-.(1.0.0).-.a-number-of-ranks
27  ....-.......-.(1.0.1).-.a-number-of-ranks
28  ....-.......-.(1.0.2).-.a-number-of-ranks
29  ~~~~
30 
31  The design of this workflow specifies explicitly the scheduling,
32  and thus the data-flow (dependency) is implicit. The scheduler at
33  each rank is almost stateless, where the state is stored in the
34  spawning library. The include file "kmrspawn.h" is an interface
35  to the spawning library. IMPLEMENTATION NOTE: The file
36  "kmrspawn.c" is copied from the spawning library to use it as a
37  dummy worker. IMPLEMENTATION NOTE: Creation of
38  inter-communicators for subworlds is serialized, and a use of a
39  single tag is sufficient. */
40 
41 #include <mpi.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <stddef.h>
45 #include <unistd.h>
46 #include <limits.h>
47 #include <string.h>
48 #include <dlfcn.h>
49 #include <errno.h>
50 #include <assert.h>
51 #ifdef _OPENMP
52 #include <omp.h>
53 #endif
54 #include "kmr.h"
55 #include "kmrimpl.h"
56 #include "kmrspawn.h"
57 
/* Larger of A and B.  Each argument appears twice in the expansion,
   so do not pass expressions with side effects. */
58 #define MAX(a,b) (((a)>(b))?(a):(b))
59 
60 /** Maximum Levels of Lanes.  It is the length of the index array in
    a lane-number and of the per-level communicator arrays. */
61 
62 #define KMR_LANE_LEVELS (4)
63 
/* Sentinel indices in a lane-number.  KMR_NO_LANE marks an
   unspecified level.  KMR_ANY_LANE is what the lane-string parser
   stores for a "*" component. */
64 static short const KMR_NO_LANE = -1;
65 static short const KMR_ANY_LANE = -2;
66 
67 /** Lane Number (at-all-ranks).  Lanes are a hierarchical
    partitioning of communicators, and their depth is limited by
    KMR_LANE_LEVELS.  Currently, a lane-number (structure
    kmr_lane_no) must fit in an unsigned-long, and the levels are
    limited to four.  Unspecified levels must be set to KMR_NO_LANE.
    If KMR_ANY_LANE appears at a level, it must also appear at all
    the subordinate levels that are specified. */
73 
74 struct kmr_lane_no {short v[KMR_LANE_LEVELS];};
75 
76 /*static const int KMR_ENQ_TAG = 610;*/
77 
78 /** Work Description (at-the-master).  The tail of this structure is
    truncated by the size of the argument string, which is why WORK
    is the last member.  It has only a proper format lane-number. */
81 
82 struct kmr_work_item {
83  enum kmr_spawn_req req;
/* NOTE(review): presumably REQUESTING_LANE is the lane as named by the
   caller (it may contain KMR_ANY_LANE) and ASSIGNED_LANE is the lane
   actually chosen -- confirm against the scheduler code. */
84  struct kmr_lane_no requesting_lane;
85  struct kmr_lane_no assigned_lane;
86  int level;
87  int sequence_no;
/* Must stay last; the allocation is cut short after the used part. */
88  struct kmr_spawn_work work;
89 };
90 
91 /** Work-Item Queue Entry (at-the-master).  Lists are linked on
    QUEUE_HEAD and QUEUE_INSERTION_TAIL of a lane structure.  A
    work-item is placed in a lane and also in its sublanes, to mark
    the sublanes for yielding.  A work-item to any-lane is placed in
    multiple lanes.  The entries in multiple lanes are removed at the
    first dequeuing. */
97 
98 struct kmr_work_list {
/* Next entry of the singly-linked list. */
99  struct kmr_work_list *next;
/* The work-item of this entry; entries in several lanes can refer to
   the same item (see above). */
100  struct kmr_work_item *item;
101 };
102 
103 /** Work-Item Queue of a Lane (at-the-master). TOTAL_SUBLANES is the
104  count of all descendent sublanes (includes itself), and thus, the
105  bottom lanes have one for it. SUBLANES holds the sublanes, and it
106  is null for the bottom lanes. WORKERS holds the ranks of workers
107  of the bottom lanes. Either SUBLANES or WORKERS is null.
108  CURRENT_WORK holds the running work-item. N_JOINED_RANKS and
109  RUNNING_SUBLANES is used only for assertions. N_JOINED_RANKS is
110  valid only on the top and the bottom lanes. */
111 
113  struct kmr_lane_no lane_id;
114  int level;
115  int leader_rank;
116  int total_sublanes;
117  int total_ranks;
118  struct kmr_lane_state *superlane;
119  struct kmr_lane_vector *sublanes;
120  struct kmr_rank_vector *workers;
121 
122  /* Transient states below. */
123 
124  MPI_Comm icomm;
125  struct kmr_work_list queue_head;
126  struct kmr_work_list *queue_insertion_tail;
127  struct kmr_work_item *current_work;
128  struct kmr_work_item *yielding_to_superlane;
129  int n_joined_ranks;
130  int n_running_sublanes;
131  _Bool *running_sublanes;
132 
133  /* (Temporary used in initialization). */
134 
135  struct kmr_lane_state *link;
136 };
137 
/** Vector of Lanes (at-the-master).  LANES is declared with one slot
    but is allocated with N slots; the allocation size is computed
    from the offset of LANES, not from the size of the structure. */

struct kmr_lane_vector {
    int n;
    struct kmr_lane_state *lanes[1] /*[n]*/;
};
144 
/** Vector of Ranks (at-the-master).  RANKS is declared with one slot
    but is allocated with N slots; the allocation size is computed
    from the offset of RANKS, not from the size of the structure. */

struct kmr_rank_vector {
    int n;
    int ranks[1] /*[n]*/;
};
151 
/* Pair of a size and a rank.  NOTE(review): the users of this
   structure are outside this part of the listing -- confirm what SIZE
   counts before relying on it. */
152 struct kmr_pair {int size; int rank;};
153 
154 /** Workflow State (at-all-ranks).  It is saved in the KMR context
    as (mr->simple_workflow).  The state on workers is only available
    during a setup.  BASE_COMM, NPROCS, and RANK are copies of the
    ones in the KMR context MR.  BASE_COMM is a communicator of all
    ranks.  LANE_COMMS holds the communicators associated with the
    lanes, which are passed by kmr_init_swf().  TOP_LANE is a
    superlane of all lanes.  LIST_OF_ALL_LANES holds all lanes used
    for initialization and finalization. */
162 
163 struct kmr_swf {
164  KMR *mr;
165  MPI_Comm base_comm;
166  int nprocs;
167  int rank;
168 
169  int master_rank;
170  struct kmr_lane_no lane_id_on_proc;
171  MPI_Comm lane_comms[KMR_LANE_LEVELS];
172 
/* Size of the argument area of a work message; it is taken from
   (mr->swf_args_size) or defaults to KMR_SPAWN_ARGS_SIZE. */
173  size_t args_size;
174 
175  /* Slots on the master. */
176 
177  struct {
178  int idle_ranks;
179  struct kmr_lane_state *top_lane;
180  struct kmr_lane_state **lane_of_workers /*[nprocs]*/;
181  struct kmr_lane_state *list_of_all_lanes;
/* Message buffer of RPC_SIZE bytes, allocated on the master only. */
182  union kmr_spawn_rpc *rpc_buffer;
183  size_t rpc_size;
/* History list of work-items; it is kept only when RECORD_HISTORY. */
184  struct kmr_work_list history_head;
185  struct kmr_work_list *history_insertion_tail;
186  _Bool record_history;
187  //_Bool force_free_communicators;
188  } master;
189 
190  /* Static-Spawning API for Workers.  The function pointers are
    filled either from the loaded spawn-library or with stand-ins. */
191 
192  struct kmr_spawn_hooks *hooks;
193  int (*kmr_spawn_hookup)(struct kmr_spawn_hooks *hooks);
194  int (*kmr_spawn_setup)(struct kmr_spawn_hooks *hooks,
195  MPI_Comm basecomm, int masterrank,
196  int (*execfn)(struct kmr_spawn_hooks *,
197  int, char **),
198  int nsubworlds,
199  MPI_Comm subworlds[], unsigned long colors[],
200  size_t argssize);
201  void (*kmr_spawn_service)(struct kmr_spawn_hooks *hooks, int status);
202  void (*kmr_spawn_set_verbosity)(struct kmr_spawn_hooks *hooks, int level);
203 };
204 
205 /* Save area of hooks. It is set by kmr_spawn_hookup(). This is used
206  as a fake of libkmrspawn.so. */
207 
208 #if 0 /*AHO*/
209 static struct kmr_spawn_hooks *kmr_fake_spawn_hooks = 0;
210 #endif
211 
212 /* ================================================================ */
213 
214 /* Initialization of Lanes of Workflow. */
215 
216 /** Compares lane-numbers up to the LEVEL. Note that comparison
217  includes the LEVEL. It returns true when (LEVEL=-1). */
218 
219 static inline _Bool
220 kmr_lane_eq(struct kmr_lane_no n0, struct kmr_lane_no n1, int level)
221 {
222  assert(-1 <= level && level < KMR_LANE_LEVELS);
223  for (int i = 0; i <= level; i++) {
224  if (n0.v[i] != n1.v[i]) {
225  return 0;
226  }
227  }
228  return 1;
229 }
230 
231 /** Clears the lane-number to a null-lane. */
232 
233 static inline void
235 {
236  for (int i = 0; i < KMR_LANE_LEVELS; i++) {
237  id->v[i] = KMR_NO_LANE;
238  }
239 }
240 
241 /** Parses a string as a lane-number. It signals an error for
242  improper format strings, but, it does not check the range of each
243  index. Examples are: "", "3", "3.3.1", "*", "3.3.*", "3.*.*" (""
244  is parsed but unusable). Illegal examples are: ".", "3.",
245  "3.*.3". */
246 
247 static struct kmr_lane_no
248 kmr_name_lane(KMR *mr, const char *s)
249 {
250  struct kmr_lane_no n;
251  for (int i = 0; i < KMR_LANE_LEVELS; i++) {
252  n.v[i] = KMR_NO_LANE;
253  }
254  const char *e = (s + strlen(s));
255  const char *p;
256  p = s;
257  for (int i = 0; i < KMR_LANE_LEVELS && p < e; i++) {
258  if (*p == '*') {
259  if ((p + 1) == e) {
260  /* Tail, OK. */
261  n.v[i] = KMR_ANY_LANE;
262  p++;
263  break;
264  } else if (*(p + 1) == '.' && (p + 2) == e) {
265  char ee[80];
266  snprintf(ee, sizeof(ee),
267  ("Bad lane string; dot at the tail (%s)"), s);
268  kmr_error(mr, ee);
269  abort();
270  break;
271  } else if (*(p + 1) == '.') {
272  /* "*.", OK. */
273  n.v[i] = KMR_ANY_LANE;
274  p += 2;
275  } else {
276  char ee[80];
277  snprintf(ee, sizeof(ee),
278  ("Bad lane string; * followed by something (%s)"), s);
279  kmr_error(mr, ee);
280  abort();
281  break;
282  }
283  } else {
284  int d;
285  char dot[8];
286  int cc = sscanf(p, "%d%c", &d, dot);
287  if (cc == 1 || (cc == 2 && dot[0] == '.')) {
288  while (p < e && *p != '.') {
289  assert('0' <= *p && *p <= '9');
290  p++;
291  }
292  if (p < e && *p == '.') {
293  p++;
294  }
295  _Bool sawany = (i > 0 && n.v[i - 1] == KMR_ANY_LANE);
296  if (sawany) {
297  char ee[80];
298  snprintf(ee, sizeof(ee),
299  ("Bad lane string; * at non-tail (%s)"), s);
300  kmr_error(mr, ee);
301  abort();
302  break;
303  } else if (cc == 2 && p == e) {
304  char ee[80];
305  snprintf(ee, sizeof(ee),
306  ("Bad lane string; dot at the tail (%s)"), s);
307  kmr_error(mr, ee);
308  abort();
309  break;
310  } else {
311  n.v[i] = (short)d;
312  }
313  } else if (cc == 0) {
314  char ee[80];
315  snprintf(ee, sizeof(ee),
316  ("Bad lane string; non-digit appears (%s)"), s);
317  kmr_error(mr, ee);
318  abort();
319  break;
320  } else {
321  char ee[80];
322  snprintf(ee, sizeof(ee),
323  ("Bad lane string; non-digit or bad dot (%s)"), s);
324  kmr_error(mr, ee);
325  abort();
326  break;
327  }
328  }
329  }
330  if (p != e) {
331  char ee[80];
332  snprintf(ee, sizeof(ee),
333  ("Bad lane string; garbage at the tail (%s)"), s);
334  kmr_error(mr, ee);
335  abort();
336  }
337  return n;
338 }
339 
340 /** (NO-THREAD-SAFE) Returns a string representation of a lane-number.
341  It returns "-" for a null-lane. */
342 
343 static char *
344 kmr_lane_string(struct kmr_lane_no n, _Bool print_all_levels)
345 {
346  static char buf[20];
347  snprintf(buf, sizeof(buf), "-");
348  char *e = (buf + sizeof(buf));
349  char *p;
350  p = buf;
351  for (int i = 0; i < KMR_LANE_LEVELS && p < e; i++) {
352  int q = n.v[i];
353  if (q == KMR_NO_LANE) {
354  if (print_all_levels) {
355  char *dot = (i == 0 ? "" : ".");
356  int cc = snprintf(p, (size_t)(e - p), "%s-", dot);
357  p += cc;
358  }
359  if (!print_all_levels) {
360  break;
361  }
362  } else if (q == KMR_ANY_LANE) {
363  char *dot = (i == 0 ? "" : ".");
364  int cc = snprintf(p, (size_t)(e - p), "%s*", dot);
365  p += cc;
366  if (!print_all_levels) {
367  break;
368  }
369  } else {
370  char *dot = (i == 0 ? "" : ".");
371  int cc = snprintf(p, (size_t)(e - p), "%s%d", dot, q);
372  p += cc;
373  }
374  }
375  buf[sizeof(buf) - 1] = 0;
376  return buf;
377 }
378 
379 /** Returns the maximum level of a given lane-number (zero to
380  KMR_LANE_LEVELS-1), or returns -1 for a null-lane. */
381 
382 static int
383 kmr_level_of_lane(struct kmr_lane_no n, _Bool admit_any)
384 {
385  int level;
386  level = (KMR_LANE_LEVELS - 1);
387  for (int i = 0; i < KMR_LANE_LEVELS; i++) {
388  assert(admit_any || n.v[i] != KMR_ANY_LANE);
389  assert(!(n.v[i] == KMR_NO_LANE && (i + 1) < KMR_LANE_LEVELS)
390  || n.v[i + 1] == KMR_NO_LANE);
391  assert(!(n.v[i] == KMR_ANY_LANE && (i + 1) < KMR_LANE_LEVELS)
392  || n.v[i + 1] == KMR_NO_LANE || n.v[i + 1] == KMR_ANY_LANE);
393  if (level == (KMR_LANE_LEVELS - 1) && n.v[i] == KMR_NO_LANE) {
394  level = (i - 1);
395  }
396  }
397  return level;
398 }
399 
400 /** Returns a lane-number as a single integer color. A color is used
401  to check the identity of a lane in assertions. A color is used in
402  place of a lane in the spawn-library, because it does not know the
403  lanes. */
404 
405 static inline unsigned long
407 {
408  union {struct kmr_lane_no id; unsigned long color;} u = {.id = id};
409  return u.color;
410 }
411 
412 /** Finds a worker index of a lane for a rank.  It searches the
    WORKERS vector of LANE from the start and returns the first
    position whose entry equals RANK.  WORKERS must be non-null and
    RANK must be in it; both conditions are checked by assertions
    only.  NOTE(review): the line carrying the function name and the
    leading parameters (listing line 415) is missing in this copy;
    restore it from the original file. */
413 
414 static inline int
416  int rank)
417 {
418  assert(lane->workers != 0);
419  struct kmr_rank_vector *u = lane->workers;
420  int s;
421  s = -1;
422  for (int i = 0; i < u->n; i++) {
423  if (u->ranks[i] == rank) {
424  s = i;
425  break;
426  }
427  }
/* The rank is expected to be a worker of this lane. */
428  assert(s != -1);
429  return s;
430 }
431 
432 /** Finds a sublane index of a superlane.  It returns the position
    of LANE in the SUBLANES vector of its superlane.  LANE must have
    a superlane, and the superlane must list LANE; both conditions
    are checked by assertions only.  NOTE(review): the line carrying
    the function name and the parameters (listing line 435) is
    missing in this copy; restore it from the original file. */
433 
434 static inline int
436 {
437  assert(lane->superlane != 0);
438  struct kmr_lane_state *sup = lane->superlane;
439  assert(sup->sublanes != 0);
440  struct kmr_lane_vector *v = sup->sublanes;
441  int s;
442  s = -1;
443  for (int i = 0; i < v->n; i++) {
/* Lanes are compared by address. */
444  if (v->lanes[i] == lane) {
445  s = i;
446  break;
447  }
448  }
449  assert(s != -1);
450  return s;
451 }
452 
453 /** Packs lanes in a vector. It returns an unfilled vector when a
454  null LANES is passed. */
455 
456 static struct kmr_lane_vector *
457 kmr_make_lane_vector(int n, struct kmr_lane_state *lanes[])
458 {
459  size_t vsz = (offsetof(struct kmr_lane_vector, lanes)
460  + sizeof(struct kmr_lane_state *) * (size_t)n);
461  struct kmr_lane_vector *v = kmr_malloc(vsz);
462  assert(v != 0);
463  memset(v, 0, vsz);
464  v->n = n;
465  if (lanes != 0) {
466  for (int q = 0; q < n; q++) {
467  v->lanes[q] = lanes[q];
468  }
469  }
470  return v;
471 }
472 
473 /** Allocates a rank vector, filling all entries with -1.  The
    vector has room for N ranks and is allocated by kmr_malloc().
    NOTE(review): the line carrying the function name and the
    parameter (listing line 476) is missing in this copy; restore it
    from the original file. */
474 
475 static struct kmr_rank_vector *
477 {
/* The size is counted from the offset of RANKS, because the array is
   declared with a single slot. */
478  size_t vsz = (offsetof(struct kmr_rank_vector, ranks)
479  + sizeof(int) * (size_t)n);
480  struct kmr_rank_vector *v = kmr_malloc(vsz);
481  assert(v != 0);
482  v->n = n;
483  for (int i = 0; i < n; i++) {
484  v->ranks[i] = -1;
485  }
486  return v;
487 }
488 
489 static void
490 kmr_err_when_swf_is_not_initialized(KMR *mr)
491 {
492  struct kmr_swf *wf = mr->simple_workflow;
493  if (wf == 0) {
494  kmr_error(mr, "Workflow-mapper is not initialized");
495  abort();
496  }
497 }
498 
499 /** Sets the verbosity of the spawn-library. LEVEL is 1 to 3, where 3
500  is the most verbose. It should be called after kmr_init_swf() and
501  before detaching by kmr_detach_swf_workers() to affect worker
502  ranks. */
503 
504 void
505 kmr_set_swf_verbosity(KMR *mr, int level)
506 {
507  kmr_err_when_swf_is_not_initialized(mr);
508  struct kmr_swf *wf = mr->simple_workflow;
509  if (wf->kmr_spawn_set_verbosity != 0) {
510  (*wf->kmr_spawn_set_verbosity)(wf->hooks, level);
511  }
512 }
513 
514 static int kmr_load_spawn_library(struct kmr_swf *wf,
515  _Bool test_with_fake_spawn);
516 static void kmr_resolve_lanes(struct kmr_swf *wf);
517 static void kmr_make_lanes(struct kmr_swf *wf);
518 static void kmr_free_lanes(struct kmr_swf *wf, struct kmr_lane_state *lane);
519 
520 /** Initializes the lanes of simple workflow. Lanes of workflow are
521  created corresponding to communicators in LANECOMMS[], one lane
522  for each communicator. Work-items in a lane at a level are
523  executed in the communicator LANECOMMS[level]. MASTER specifies
524  the master rank, which should not be included in any communicator,
525  because the master needs to be distinct from the workers. */
526 
527 int
528 kmr_init_swf(KMR *mr, MPI_Comm lanecomms[KMR_LANE_LEVELS], int master)
529 {
530  _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
531  _Bool test_with_fake_spawn = mr->swf_debug_master;
532 
533  int cc;
534 
535  /* Check if a lane-number can be stored in kmr_spawn_work.color,
536  which is used to check the consistent use of split
537  communicators. */
538 
539  assert(sizeof(struct kmr_lane_no) <= sizeof(unsigned long));
540 
541  if (mr->simple_workflow != 0) {
542  kmr_error(mr, "Workflow-mapper is already initialized");
543  abort();
544  return MPI_ERR_SPAWN;
545  }
546 
547  if (!(master < mr->nprocs)) {
548  char ee[80];
549  snprintf(ee, sizeof(ee),
550  "Bad master rank specified (rank=%d)", master);
551  kmr_error(mr, ee);
552  abort();
553  return MPI_ERR_SPAWN;
554  }
555 
556  struct kmr_swf *wf = kmr_malloc(sizeof(struct kmr_swf));
557  assert(wf != 0);
558  memset(wf, 0, sizeof(struct kmr_swf));
559  mr->simple_workflow = wf;
560  wf->mr = mr;
561  wf->base_comm = mr->comm;
562  wf->nprocs = mr->nprocs;
563  wf->rank = mr->rank;
564 
565  wf->master_rank = master;
566  for (int i = 0; i < KMR_LANE_LEVELS; i++) {
567  wf->lane_comms[i] = lanecomms[i];
568  }
569 
570  wf->args_size = ((mr->swf_args_size != 0)
571  ? mr->swf_args_size : KMR_SPAWN_ARGS_SIZE);
572 
573  wf->master.idle_ranks = 0;
574  wf->master.top_lane = 0;
575  wf->master.lane_of_workers = 0;
576  wf->master.list_of_all_lanes = 0;
577  wf->master.rpc_buffer = 0;
578  wf->master.rpc_size = 0;
579  wf->master.history_head.next = 0;
580  wf->master.history_head.item = 0;
581  wf->master.history_insertion_tail = 0;
582  wf->master.record_history = 0;
583  //wf->master.force_free_communicators = 0;
584 
585  /* Load the spawn-library. */
586 
587  cc = kmr_load_spawn_library(wf, test_with_fake_spawn);
588  /* IGNORE FAILURES IN LOADING A LIBRARY. */
589 
590  struct kmr_spawn_hooks *hooks;
591  if (wf->hooks == 0) {
592  hooks = kmr_malloc(sizeof(struct kmr_spawn_hooks));
593  assert(hooks != 0);
594  memset(hooks, 0, sizeof(struct kmr_spawn_hooks));
595  hooks->s.mr = wf->mr;
596  hooks->s.print_trace = tracing5;
597  wf->hooks = hooks;
598  } else {
599  hooks = wf->hooks;
600  }
601  assert(wf->kmr_spawn_hookup != 0 && wf->hooks != 0);
602  cc = (*wf->kmr_spawn_hookup)(wf->hooks);
603  assert(cc == MPI_SUCCESS);
604 
605  /* Make lanes on the master. */
606 
607  kmr_resolve_lanes(wf);
608  kmr_make_lanes(wf);
609 
610  /* Inform communicators to the spawn-library. */
611 
612  {
613  unsigned long colors[KMR_LANE_LEVELS];
614  for (int level = 0; level < KMR_LANE_LEVELS; level++) {
615  struct kmr_lane_no id;
616  id = wf->lane_id_on_proc;
617  for (int i = (level + 1); i < KMR_LANE_LEVELS; i++) {
618  id.v[i] = KMR_NO_LANE;
619  }
620  unsigned long u = kmr_color_of_lane(id);
621  colors[level] = u;
622  }
623 
624  int level = kmr_level_of_lane(wf->lane_id_on_proc, 0);
625  assert(wf->rank != master || level == -1);
626  if (wf->rank != master) {
627  assert(wf->kmr_spawn_setup != 0);
628  cc = (*wf->kmr_spawn_setup)(hooks, wf->base_comm,
629  master, /*fn*/ 0,
630  KMR_LANE_LEVELS, wf->lane_comms,
631  colors, wf->args_size);
632  assert(cc == MPI_SUCCESS);
633  }
634  }
635 
636  if (wf->rank == wf->master_rank) {
637  size_t msz = (offsetof(struct kmr_spawn_work, args) + wf->args_size);
638  wf->master.rpc_buffer = kmr_malloc(msz);
639  assert(wf->master.rpc_buffer != 0);
640  wf->master.rpc_size = msz;
641  }
642 
643  return MPI_SUCCESS;
644 }
645 
646 static int kmr_handle_worker_request(struct kmr_swf *wf, _Bool joining);
647 static int kmr_activate_workers(struct kmr_swf *wf, _Bool shutdown);
648 
649 /** Disengages the workers from main processing and puts them in the
650  service loop for spawning. Only the master rank returns from this
651  call and continues processing, but the worker ranks never return
652  as if they call exit(). It replaces the communicator in the KMR
653  context with a self-communicator after saving the old communicator
654  for workflow. Replacing the communicator makes the context
655  independent from the other ranks and safe to free it. It
656  finalizes the context of workers.*/
657 
658 int
660 {
661  kmr_err_when_swf_is_not_initialized(mr);
662  struct kmr_swf *wf = mr->simple_workflow;
663  _Bool tracing5 = (wf->mr->trace_map_spawn && (5 <= wf->mr->verbosity));
664 
665  int cc;
666 
667  if (tracing5 && (wf->rank == wf->master_rank)) {
668  fprintf(stderr, (";;KMR [%05d] Detach worker ranks"
669  " and wait for join messages.\n"), wf->rank);
670  fflush(0);
671  }
672 
673  mr->comm = MPI_COMM_NULL;
674  cc = MPI_Comm_dup(MPI_COMM_SELF, &mr->comm);
675  assert(cc == MPI_SUCCESS);
676 
677  int level = kmr_level_of_lane(wf->lane_id_on_proc, 0);
678  if (wf->rank == wf->master_rank) {
679  for (;;) {
680  cc = kmr_handle_worker_request(wf, 1);
681  if (cc == MPI_SUCCESS) {
682  break;
683  }
684  }
685  return MPI_SUCCESS;
686  } else {
687  /*AHO*/ //assert(wf->mr->swf_spawner_so == 0);
688 
689  if (level == -1) {
690  if (tracing5) {
691  fprintf(stderr, (";;KMR [%05d]"
692  " Rank %d is not involved in workflow.\n"),
693  wf->rank, wf->rank);
694  fflush(0);
695  }
696  }
697 
698  /* Keep the so and hooks from freeing in kmr_free_context(). */
699 
700  void *spawnerso = wf->mr->swf_spawner_so;
701  struct kmr_spawn_hooks *hooks = wf->hooks;
702  void (*service)(struct kmr_spawn_hooks *, int) = wf->kmr_spawn_service;
703  wf->mr->swf_spawner_so = 0;
704  wf->hooks = 0;
705 
706  cc = kmr_free_context(mr);
707  assert(cc == MPI_SUCCESS);
708  cc = kmr_fin();
709  assert(cc == MPI_SUCCESS);
710 
711  hooks->s.service_count = 0;
712 
713  if (spawnerso != 0) {
714  /*exit(551);*/
715  assert(service != 0 && hooks != 0);
716  (*service)(hooks, 0);
717  } else {
718  assert(service != 0 && hooks != 0);
719  (*service)(hooks, 0);
720  }
721  abort();
722  return MPI_SUCCESS;
723  }
724 }
725 
726 /** Finishes the workers of workflow.  It stops the service loop of
    the workers and lets them exit.  It should be called
    (immediately) before MPI_Finalize() at the master rank.  It
    signals an error when the workflow is not initialized, and
    returns MPI_SUCCESS otherwise.  NOTE(review): the line carrying
    the function name and the parameter (listing line 731) is missing
    in this copy; restore it from the original file. */
729 
730 int
732 {
733  kmr_err_when_swf_is_not_initialized(mr);
734  struct kmr_swf *wf = mr->simple_workflow;
735 
736  int cc;
737 
/* The second argument is the SHUTDOWN flag of kmr_activate_workers(). */
738  cc = kmr_activate_workers(wf, 1);
739  assert(cc == MPI_SUCCESS);
740 
741  return MPI_SUCCESS;
742 }
743 
744 static void kmr_free_work_list(struct kmr_swf *wf, struct kmr_work_list *h,
745  struct kmr_lane_state *lane, _Bool warn);
746 
747 /** Clears the lanes of simple workflow.  On the master, it frees
    the recorded history, the lanes, the table of lanes of workers,
    and the message buffer.  On all ranks, it frees the hooks, closes
    the spawn-library when it is loaded, and frees the workflow state
    itself, leaving (mr->simple_workflow) null.  It returns
    MPI_SUCCESS.  NOTE(review): the line carrying the function name
    and the parameter (listing line 750) is missing in this copy;
    restore it from the original file. */
748 
749 int
751 {
752  kmr_err_when_swf_is_not_initialized(mr);
753  struct kmr_swf *wf = mr->simple_workflow;
754 
755  int cc;
756 
757  if (wf->rank == wf->master_rank) {
/* A non-empty history implies the recording was enabled. */
758  if (wf->master.history_head.next != 0) {
759  assert(wf->master.record_history);
760  assert(wf->master.history_insertion_tail != 0);
761  kmr_free_work_list(wf, wf->master.history_head.next, 0, 0);
762  wf->master.history_head.next = 0;
763  wf->master.history_insertion_tail = 0;
764  }
765 
766  assert(wf->master.top_lane != 0);
767  kmr_free_lanes(wf, wf->master.top_lane);
768  wf->master.top_lane = 0;
769 
770  assert(wf->master.lane_of_workers != 0);
771  size_t qsz = (sizeof(struct kmr_lane_state *) * (size_t)wf->nprocs);
772  kmr_free(wf->master.lane_of_workers, qsz);
773  wf->master.lane_of_workers = 0;
774 
775  assert(wf->master.rpc_buffer != 0);
776  kmr_free(wf->master.rpc_buffer, wf->master.rpc_size);
777  wf->master.rpc_buffer = 0;
778  wf->master.rpc_size = 0;
779  } else {
780  /*(worker)*/
781  }
782 
783  if (wf->hooks != 0) {
784  kmr_free(wf->hooks, sizeof(struct kmr_spawn_hooks));
785  wf->hooks = 0;
786  }
787 
788  if (wf->mr->swf_spawner_so != 0) {
/* The name is used only in the warning message below. */
789  char *soname = (wf->mr->swf_spawner_library != 0
790  ? wf->mr->swf_spawner_library : "libkmrspawn.so");
791 
792  cc = dlclose(wf->mr->swf_spawner_so);
793  if (cc != 0) {
794  char ee[80];
795  snprintf(ee, sizeof(ee), "dlclose(%s): %s\n",
796  soname, dlerror());
797  kmr_warning(wf->mr, 5, ee);
798  }
799  wf->mr->swf_spawner_so = 0;
800 
/* The entry points are invalid after closing the library. */
801  wf->kmr_spawn_hookup = 0;
802  wf->kmr_spawn_setup = 0;
803  wf->kmr_spawn_service = 0;
804  wf->kmr_spawn_set_verbosity = 0;
805  }
806 
807  kmr_free(wf, sizeof(struct kmr_swf));
808  mr->simple_workflow = 0;
809 
810  return MPI_SUCCESS;
811 }
812 
813 static int kmr_start_worker(struct kmr_spawn_work *w, size_t msglen,
814  int rank, MPI_Comm basecomm);
815 
816 static int kmr_join_to_workers(struct kmr_swf *wf,
817  struct kmr_lane_state *lane);
818 
819 #if 0
820 static int kmr_spawn_hookup_fake(struct kmr_spawn_hooks *hooks);
821 #endif
822 
823 /** Loads the spawn-library "libkmrspawn.so". It implements
824  static-spawning, which is only available on K, FX10, and FX100. */
825 
826 static int
827 kmr_load_spawn_library(struct kmr_swf *wf, _Bool test_with_fake_spawn)
828 {
829  MPI_Comm basecomm = wf->base_comm;
830  int nprocs = wf->nprocs;
831  int rank = wf->rank;
832 
833  int cc;
834 
835  char *soname = (wf->mr->swf_spawner_library != 0
836  ? wf->mr->swf_spawner_library : "libkmrspawn.so");
837 
838  void *m = dlopen(soname, (RTLD_NOW|RTLD_GLOBAL));
839 
840  /* Check all ranks succeeded with dlopen(). */
841 
842  int ng = ((m != 0) ? nprocs : rank);
843  int ngmin;
844  cc = MPI_Allreduce(&ng, &ngmin, 1, MPI_INT, MPI_MIN, basecomm);
845  assert(cc == MPI_SUCCESS);
846  _Bool allok = (ngmin == nprocs);
847 
848  if (!allok) {
849  if (!test_with_fake_spawn) {
850  char ee[160];
851  if (m == 0) {
852  snprintf(ee, sizeof(ee), "dlopen(%s) failed: %s",
853  soname, dlerror());
854  kmr_error(wf->mr, ee);
855  abort();
856  } else {
857  kmr_error(wf->mr, "Some ranks failed to dleopn()");
858  abort();
859  }
860  } else {
861  if (rank == ngmin) {
862  assert(m == 0);
863  char ee[160];
864  snprintf(ee, sizeof(ee), "%s", dlerror());
865  kmr_warning(wf->mr, 1,
866  ("WORKFLOW-MAPPER IS UNUSABLE;"
867  " spawn-library unavailable"));
868  kmr_warning(wf->mr, 1, ee);
869  }
870 
871  if (m != 0) {
872  cc = dlclose(m);
873  if (cc != 0) {
874  char ee[160];
875  snprintf(ee, sizeof(ee), "%s", dlerror());
876  kmr_warning(wf->mr, 5, ee);
877  }
878  }
879 
880  wf->kmr_spawn_hookup = kmr_spawn_hookup_standin;
881  wf->kmr_spawn_setup = kmr_spawn_setup_standin;
882  wf->kmr_spawn_service = kmr_spawn_service_standin;
883  wf->kmr_spawn_set_verbosity = kmr_spawn_set_verbosity_standin;
884  }
885  assert(wf->mr->swf_spawner_so == 0);
886  return MPI_ERR_SPAWN;
887  } else {
888  char *fn[10] = {"kmr_spawn_hookup",
889  "kmr_spawn_setup",
890  "kmr_spawn_service",
891  "kmr_spawn_set_verbosity", 0};
892  intptr_t fp[10];
893 
894  wf->mr->swf_spawner_so = m;
895 
896  for (int i = 0; (i < 10 && fn[i] != 0); i++) {
897  fp[i] = (intptr_t)dlsym(m, fn[i]);
898  if (fp[i] == 0) {
899  char ee[80];
900  snprintf(ee, sizeof(ee), "dlsym(%s): %s\n", fn[i], dlerror());
901  kmr_warning(wf->mr, 5, ee);
902  }
903  }
904  wf->kmr_spawn_hookup = (int (*)())fp[0];
905  wf->kmr_spawn_setup = (int (*)())fp[1];
906  wf->kmr_spawn_service = (void (*)())fp[2];
907  wf->kmr_spawn_set_verbosity = (void (*)())fp[3];
908  return MPI_SUCCESS;
909  }
910 }
911 
912 static void kmr_dump_split_lanes(KMR *mr, struct kmr_lane_no id);
913 
914 /** Splits a communicator in a KMR context to ones to be used for
    kmr_init_swf().  This is a utility.  It is restricted to make
    lanes having the same depth of levels.  DESCRIPTION is an array
    of lists of positive integers, each terminated by a zero, and the
    array itself is terminated by a null, in the form
    D[0]={L0,L1,...,Lk-1,0}, D[1]={M0,M1,...,Mj-1,0},
    D[2]={N0,N1,...,Ni-1,0}, D[3]=0.  Here, it is the depth=3 case.
    N0 to Ni-1 specifies how to split ranks (the bottom level) to i
    groups.  They must satisfy (N0+...+Ni-1)<=(nprocs-1).  It needs
    at least one spare rank for the master.  M0 to Mj-1 specifies the
    2nd bottom level, and it must satisfy (M0+...+Mj-1)=i.  L0 to
    Lk-1 specifies the top level, and it must satisfy
    (L0+...+Lk-1)=j.  DESCRIPTION needs to be valid only on the rank
    ROOT (it is broadcasted inside this routine).  The ranks not a
    member of a lane have a null communicator at each level.  The
    results are stored in SPLITCOMMS, whose unused levels are null
    communicators.  DUMP lets it call kmr_dump_split_lanes().  It is
    collective on the communicator of MR and returns MPI_SUCCESS. */
928 
929 int
930 kmr_split_swf_lanes_a(KMR *mr, MPI_Comm splitcomms[KMR_LANE_LEVELS],
931  int root, int *description[], _Bool dump)
932 {
933  const MPI_Comm basecomm = mr->comm;
934  const int nprocs = mr->nprocs;
935  const int rank = mr->rank;
936  /*const int root = 0;*/
937 
938  int cc;
939 
940  struct kmr_lane_no illegallane;
941  kmr_clear_lane_id(&illegallane);
942 
943  for (int d = 0; d < KMR_LANE_LEVELS; d++) {
944  splitcomms[d] = MPI_COMM_NULL;
945  }
946 
947  /* Broadcast description arrays from the root rank. */
948 
949  int depth;
950  int len[KMR_LANE_LEVELS];
951  int *desc[KMR_LANE_LEVELS];
952 
953  {
954  if (mr->rank == root) {
/* The depth is the index of the terminating null; it is looked for in
   the first (KMR_LANE_LEVELS + 1) slots. */
955  depth = (KMR_LANE_LEVELS + 1);
956  for (int i = 0; i < (KMR_LANE_LEVELS + 1); i++) {
957  if (description[i] == 0) {
958  depth = i;
959  break;
960  }
961  }
962  if (depth > KMR_LANE_LEVELS) {
963  char ee[80];
964  snprintf(ee, sizeof(ee),
965  ("Bad lane description,"
966  " no terminating null"));
967  kmr_error(mr, ee);
968  abort();
969  }
970  }
971  cc = MPI_Bcast(&depth, 1, MPI_INT, root, basecomm);
972  assert(cc == MPI_SUCCESS);
973 
974  if (mr->rank == root) {
975  for (int d = 0; d < depth; d++) {
976  int *v = description[d];
977  int i;
/* Count the entries before the terminating zero. */
978  for (i = 0; v[i] != 0; i++);
979  len[d] = i;
980  }
981  }
982  cc = MPI_Bcast(len, depth, MPI_INT, root, basecomm);
983  assert(cc == MPI_SUCCESS);
984 
/* The root uses the caller's arrays; the others allocate receive
   buffers, which are freed at the end of this routine. */
985  for (int d = 0; d < depth; d++) {
986  if (mr->rank == root) {
987  desc[d] = description[d];
988  } else {
989  desc[d] = kmr_malloc(sizeof(int) * (size_t)(len[d] + 1));
990  assert(desc[d] != 0);
991  }
992  cc = MPI_Bcast(desc[d], (len[d] + 1), MPI_INT, root, basecomm);
993  assert(cc == MPI_SUCCESS);
994  }
995  }
996 
997  /* Color by descriptions from the bottom. */
998 
999  struct kmr_lane_no colors = illegallane;
1000 
1001  for (int d = (depth - 1); d >= 0; d--) {
1002  assert(KMR_NO_LANE == -1);
1003 
/* The thing to be grouped is the rank at the bottom level, and the
   color of the level below at the other levels. */
1004  int sublanecolor = ((d == (depth - 1)) ? mr->rank : (colors.v[d + 1]));
1005  int *v = desc[d];
1006 
/* The color is the index of the first group whose running sum exceeds
   SUBLANECOLOR; it remains -1 when no group covers it. */
1007  int color;
1008  color = -1;
1009  int sum;
1010  sum = 0;
1011  for (int i = 0; i < len[d]; i++) {
1012  sum += v[i];
1013  if (sublanecolor != -1 && sublanecolor < sum && color == -1) {
1014  color = i;
1015  }
1016  }
1017 
/* The bottom level must leave a rank unused (for the master); an upper
   level must cover exactly the groups of the level below.  Note the
   message below says "too large" also when the sum is too small. */
1018  _Bool ok = ((d == (depth - 1)) ? (sum < nprocs) : (sum == len[d + 1]));
1019  if (!ok) {
1020  char ee[80];
1021  snprintf(ee, sizeof(ee),
1022  ("Bad lane description,"
1023  " sum of ranks/lanes are too large"
1024  " (lanes=%d level=%d)"),
1025  sum, d);
1026  kmr_error(mr, ee);
1027  abort();
1028  }
1029 
1030  assert(d == (depth - 1) || color != -1 || sublanecolor == -1);
1031  colors.v[d] = (short)color;
1032  }
1033 
1034  /* Split the base communicator from the bottom. */
1035 
1036  for (int d = (depth - 1); d >= 0; d--) {
/* Ranks without a color at this level pass MPI_UNDEFINED and get a
   null communicator. */
1037  int color = ((colors.v[d] != -1) ? colors.v[d] : MPI_UNDEFINED);
1038  cc = MPI_Comm_split(basecomm, color, rank, &splitcomms[d]);
1039  assert(cc == MPI_SUCCESS);
1040  assert(color != MPI_UNDEFINED || splitcomms[d] == MPI_COMM_NULL);
1041  }
1042 
1043  if (dump) {
1044  kmr_dump_split_lanes(mr, colors);
1045  }
1046 
1047  for (int d = 0; d < depth; d++) {
1048  if (mr->rank != root) {
1049  kmr_free(desc[d], (sizeof(int) * (size_t)(len[d] + 1)));
1050  desc[d] = 0;
1051  }
1052  }
1053 
1054  return MPI_SUCCESS;
1055 }
1056 
1057 /** Splits a communicator in a KMR context to ones to be used for
1058  kmr_init_swf(). This is a utility. DESCRIPTION is a vector of
1059  strings terminated by a null-string. A line consists of a
1060  lane-number, a separator colon, and a number of ranks. Thus, each
1061  line looks like "3.3.3:4". It does not accept any whitespaces.
1062  Note that the descriptions are to distinguish lanes, and the
1063  lane-numbers can change, because they are once translated to
1064  communicators. */
1065 
1066 int
1067 kmr_split_swf_lanes(KMR *mr, MPI_Comm splitcomms[KMR_LANE_LEVELS],
1068  int root, char *description[], _Bool dump)
1069 {
1070  const MPI_Comm basecomm = mr->comm;
1071  const int nprocs = mr->nprocs;
1072  const int rank = mr->rank;
1073  /*const int root = 0;*/
1074  struct desc {struct kmr_lane_no colors; int ranks;};
1075 
1076  struct kmr_lane_no illegalcolor;
1077  kmr_clear_lane_id(&illegalcolor);
1078 
1079  int cc;
1080 
1081  for (int d = 0; d < KMR_LANE_LEVELS; d++) {
1082  splitcomms[d] = MPI_COMM_NULL;
1083  }
1084 
1085  /* Count description lines. */
1086 
1087  int nlines;
1088  if (rank == root) {
1089  nlines = -1;
1090  for (int i = 0; i < nprocs; i++) {
1091  if (description[i] == 0) {
1092  nlines = i;
1093  break;
1094  }
1095  }
1096  if (nlines == -1) {
1097  char ee[80];
1098  snprintf(ee, sizeof(ee),
1099  ("Bad lane description,"
1100  " no terminating null"));
1101  kmr_error(mr, ee);
1102  abort();
1103  }
1104  }
1105 
1106  struct desc *lines;
1107  if (rank == root) {
1108  lines = kmr_malloc(sizeof(struct desc) * (size_t)nlines);
1109  assert(lines != 0);
1110  } else {
1111  lines = 0;
1112  }
1113 
1114  /* Scan description lines. */
1115 
1116  if (rank == root) {
1117  for (int i = 0; i < nlines; i++) {
1118  char *s = description[i];
1119  char buf[80];
1120  size_t len = (strlen(s) + 1);
1121  if (len > sizeof(buf)) {
1122  char ee[80];
1123  snprintf(ee, sizeof(ee),
1124  ("Bad lane description,"
1125  " string too long (%s)"), s);
1126  kmr_error(mr, ee);
1127  abort();
1128  }
1129  memcpy(buf, s, len);
1130  char *p;
1131  p = buf;
1132  while (p[0] != 0 && p[0] != ':') {p++;}
1133  if (p[0] != ':') {
1134  char ee[80];
1135  snprintf(ee, sizeof(ee),
1136  ("Bad lane description,"
1137  " no separator colon (%s)"), s);
1138  kmr_error(mr, ee);
1139  abort();
1140  }
1141  p[0] = 0;
1142  struct kmr_lane_no id = kmr_name_lane(mr, buf);
1143 
1144  int count;
1145  char garbage;
1146  cc = sscanf((p + 1), "%d%c", &count, &garbage);
1147  if (cc != 1) {
1148  char ee[80];
1149  snprintf(ee, sizeof(ee),
1150  ("Bad lane description,"
1151  " bad number of ranks (%s)"), s);
1152  kmr_error(mr, ee);
1153  abort();
1154  }
1155 
1156  /* Check duplicate lane-numbers. */
1157 
1158  for (int j = 0; j < i; j++) {
1159  if (kmr_lane_eq(id, lines[i].colors, (KMR_LANE_LEVELS - 1))) {
1160  char ee[80];
1161  snprintf(ee, sizeof(ee),
1162  ("Bad lane description,"
1163  " duplicate lane-numbers (%s)"),
1164  kmr_lane_string(id, 0));
1165  kmr_error(mr, ee);
1166  abort();
1167  }
1168  }
1169 
1170  lines[i].colors = id;
1171  lines[i].ranks = count;
1172  }
1173  }
1174 
1175  /* Count the depth of description lines. */
1176 
1177  int depth;
1178  depth = 0;
1179  if (rank == root) {
1180  for (int i = 0; i < nlines; i++) {
1181  struct kmr_lane_no co = lines[i].colors;
1182  while (depth < KMR_LANE_LEVELS && co.v[depth] != KMR_NO_LANE) {
1183  depth++;
1184  }
1185  }
1186  }
1187 
1188  if (rank == root) {
1189 
1190  /* Check the total count of ranks. */
1191 
1192  {
1193  int rankcount;
1194  rankcount = 0;
1195  for (int i = 0; i < nlines; i++) {
1196  rankcount += lines[i].ranks;
1197  }
1198  if (rankcount > (nprocs - 1)) {
1199  char ee[80];
1200  snprintf(ee, sizeof(ee),
1201  ("Bad lane description,"
1202  " total rank count too large (%d)"),
1203  rankcount);
1204  kmr_error(mr, ee);
1205  abort();
1206  }
1207  }
1208 
1209  /* Check lanes have no large entry. */
1210 
1211  {
1212  for (int d = 0; d < depth; d++) {
1213  for (int i = 0; i < nlines; i++) {
1214  int q = lines[i].colors.v[d];
1215  if (q > (nprocs - 1)) {
1216  char ee[80];
1217  snprintf(ee, sizeof(ee),
1218  ("Bad lane description,"
1219  " lane number too large (%s)"),
1220  kmr_lane_string(lines[i].colors, 0));
1221  kmr_error(mr, ee);
1222  abort();
1223  }
1224  }
1225  }
1226  }
1227  }
1228 
1229  struct kmr_lane_no *allcolors;
1230  if (rank == root) {
1231  allcolors = kmr_malloc(sizeof(struct kmr_lane_no) * (size_t)nprocs);
1232  assert(allcolors != 0);
1233  } else {
1234  allcolors = 0;
1235  }
1236 
1237  if (rank == root) {
1238  /* Translate lane-numbers to distinct colors. */
1239 
1240  for (int d = 1; d < depth; d++) {
1241  int extent;
1242  extent = 0;
1243  for (int i = 0; i < nlines; i++) {
1244  int q = lines[i].colors.v[d];
1245  extent = MAX(extent, (q + 1));
1246  }
1247  for (int i = 0; i < nlines; i++) {
1248  int o = lines[i].colors.v[d - 1] * extent;
1249  lines[i].colors.v[d] = (short)(lines[i].colors.v[d] + o);
1250  }
1251  }
1252 
1253  /* Fill array of colors for ranks. */
1254 
1255  int rankcount;
1256  rankcount = 0;
1257  for (int i = 0; i < nlines; i++) {
1258  assert((rankcount + lines[i].ranks) < nprocs);
1259  for (int j = 0; j < lines[i].ranks; j++) {
1260  allcolors[rankcount + j] = lines[i].colors;
1261  }
1262  rankcount += lines[i].ranks;
1263  }
1264  assert(rankcount < nprocs);
1265  for (int j = rankcount; j < nprocs; j++) {
1266  allcolors[j] = illegalcolor;
1267  }
1268  }
1269  cc = MPI_Bcast(&depth, 1, MPI_INT, root, basecomm);
1270  assert(cc == MPI_SUCCESS);
1271 
1272  struct kmr_lane_no colors;
1273  int sz = (int)sizeof(struct kmr_lane_no);
1274  cc = MPI_Scatter(allcolors, sz, MPI_BYTE, &colors, sz, MPI_BYTE,
1275  root, basecomm);
1276  assert(cc == MPI_SUCCESS);
1277 
1278  /* Split the base communicator from the bottom. */
1279 
1280  for (int d = (depth - 1); d >= 0; d--) {
1281  int color = ((colors.v[d] != -1) ? colors.v[d] : MPI_UNDEFINED);
1282  cc = MPI_Comm_split(basecomm, color, rank, &splitcomms[d]);
1283  assert(cc == MPI_SUCCESS);
1284  assert(color != MPI_UNDEFINED || splitcomms[d] == MPI_COMM_NULL);
1285  }
1286 
1287  if (dump) {
1288  kmr_dump_split_lanes(mr, colors);
1289  }
1290 
1291  if (rank == root) {
1292  kmr_free(lines, (sizeof(struct desc) * (size_t)nlines));
1293  kmr_free(allcolors, (sizeof(struct kmr_lane_no) * (size_t)nprocs));
1294  } else {
1295  assert(lines == 0);
1296  assert(allcolors == 0);
1297  }
1298 
1299  return MPI_SUCCESS;
1300 }
1301 
1302 static void
1303 kmr_dump_split_lanes(KMR *mr, struct kmr_lane_no colors)
1304 {
1305  const MPI_Comm basecomm = mr->comm;
1306  const int nprocs = mr->nprocs;
1307  const int rank = mr->rank;
1308  const int root = 0;
1309  const int depth = KMR_LANE_LEVELS;
1310 
1311  int cc;
1312 
1313  struct kmr_lane_no *allcolors;
1314  if (rank == root) {
1315  allcolors = kmr_malloc(sizeof(struct kmr_lane_no) * (size_t)nprocs);
1316  assert(allcolors != 0);
1317  } else {
1318  allcolors = 0;
1319  }
1320  int sz = (int)sizeof(struct kmr_lane_no);
1321  cc = MPI_Gather(&colors, sz, MPI_BYTE, allcolors, sz, MPI_BYTE,
1322  root, basecomm);
1323  assert(cc == MPI_SUCCESS);
1324 
1325  if (rank == root) {
1326  printf("Split of lanes"
1327  " (displayed by distinct colors assigned to ranks):\n");
1328  for (int d = 0; d < depth; d++) {
1329  printf("color[level=%d]=", d);
1330  for (int i = 0; i < nprocs; i++) {
1331  char col[20];
1332  if (allcolors[i].v[d] != -1) {
1333  snprintf(col, sizeof(col), "%d", allcolors[i].v[d]);
1334  } else {
1335  snprintf(col, sizeof(col), "-");
1336  }
1337  if (i == 0) {
1338  printf("%s", col);
1339  } else {
1340  printf(",%s", col);
1341  }
1342  }
1343  printf("\n");
1344  }
1345  fflush(0);
1346  }
1347 
1348  if (rank == root) {
1349  kmr_free(allcolors, (sizeof(struct kmr_lane_no) * (size_t)nprocs));
1350  } else {
1351  assert(allcolors == 0);
1352  }
1353 }
1354 
1355 static int kmr_check_lane_id(struct kmr_swf *wf, struct kmr_lane_no id,
1356  _Bool admit_any);
1357 static int kmr_color_subcommunicator(struct kmr_swf *wf, MPI_Comm subcomm,
1358  MPI_Comm supercomm);
1359 static int kmr_check_partitioning(struct kmr_swf *wf, int supercolor,
1360  MPI_Comm subcomm);
1361 
1362 /** Assigns a lane-number to a rank (wf->lane_id_on_proc)
1363  (at-all-ranks). It calculates a lane-number from the set of split
1364  communicators. It assumes the master-rank (the last rank) is
1365  excluded from the lanes. */
1366 
1367 static void
1369 {
1370  assert(wf->base_comm != MPI_COMM_NULL);
1371 
1372  MPI_Comm basecomm = wf->base_comm;
1373  MPI_Comm *comms = wf->lane_comms;
1374 
1375  int cc;
1376 
1377  int colors[KMR_LANE_LEVELS];
1378  for (int level = 0; level < KMR_LANE_LEVELS; level++) {
1379  MPI_Comm supercomm = (level == 0 ? basecomm : comms[level - 1]);
1380  MPI_Comm subcomm = comms[level];
1381  if (supercomm == MPI_COMM_NULL) {
1382  colors[level] = -1;
1383  } else {
1384  int color = kmr_color_subcommunicator(wf, subcomm, supercomm);
1385  colors[level] = color;
1386  }
1387  }
1388 
1389  for (int level = 0; level < KMR_LANE_LEVELS; level++) {
1390  MPI_Comm subcomm = comms[level];
1391  if (level > 0 && subcomm != MPI_COMM_NULL) {
1392  int supercolor = colors[level - 1];
1393  cc = kmr_check_partitioning(wf, supercolor, subcomm);
1394  assert(cc == MPI_SUCCESS);
1395  }
1396  if (colors[level] == -1) {
1397  wf->lane_id_on_proc.v[level] = KMR_NO_LANE;
1398  } else {
1399  wf->lane_id_on_proc.v[level] = (short)colors[level];
1400  }
1401  }
1402 
1403  cc = kmr_check_lane_id(wf, wf->lane_id_on_proc, 0);
1404  assert(cc == MPI_SUCCESS);
1405 }
1406 
1407 /** Checks if a sub-communicator is a partitioning of a
1408  super-communicator (at-all-ranks). A SUPERCOLOR gives a distinct
1409  color to each super-communicator, which should be identical in the
1410  sub-communicator. */
1411 
1412 static int
1413 kmr_check_partitioning(struct kmr_swf *wf, int supercolor, MPI_Comm subcomm)
1414 {
1415  const int root = 0;
1416  int cc;
1417 
1418  int nprocs;
1419  int rank;
1420  cc = MPI_Comm_size(subcomm, &nprocs);
1421  assert(cc == MPI_SUCCESS);
1422  cc = MPI_Comm_rank(subcomm, &rank);
1423  assert(cc == MPI_SUCCESS);
1424 
1425  int *colors;
1426  if (rank == root) {
1427  colors = kmr_malloc(sizeof(int) * (size_t)nprocs);
1428  assert(colors != 0);
1429  } else {
1430  colors = 0;
1431  }
1432  cc = MPI_Gather(&supercolor, 1, MPI_INT, colors, 1, MPI_INT,
1433  root, subcomm);
1434  assert(cc == MPI_SUCCESS);
1435  if (rank == root) {
1436  for (int i = 0; i < nprocs; i++) {
1437  if (colors[i] != colors[0]) {
1438  kmr_error(wf->mr, ("Communicators are not partitioning"
1439  " of upper ones"));
1440  abort();
1441  return MPI_ERR_SPAWN;
1442  }
1443  }
1444  kmr_free(colors, (sizeof(int) * (size_t)nprocs));
1445  }
1446  return MPI_SUCCESS;
1447 }
1448 
1449 /** Checks well-formedness of a lane-number. Slots after an any-lane
1450  must be an any-lane or a no-lane. Slots after a no-lane must be a
1451  no-lane. */
1452 
1453 static int
1454 kmr_check_lane_id(struct kmr_swf *wf, struct kmr_lane_no id, _Bool admit_any)
1455 {
1456  KMR *mr = wf->mr;
1457  int state;
1458  int level;
1459  state = 0;
1460  level = -1;
1461  for (int i = 0; i < KMR_LANE_LEVELS; i++) {
1462  if (!admit_any && id.v[i] == KMR_ANY_LANE) {
1463  char ee[80];
1464  snprintf(ee, sizeof(ee), "Bad lane-number (%s): any-lane appear",
1465  kmr_lane_string(id, 1));
1466  kmr_error(mr, ee);
1467  abort();
1468  }
1469  int q = id.v[i];
1470  if (state == 0 && q == KMR_ANY_LANE) {
1471  state = 1;
1472  level = i;
1473  } else if (state == 0 && q == KMR_NO_LANE) {
1474  state = 2;
1475  assert(level == (i - 1));
1476  } else if (state == 0) {
1477  level = i;
1478  } else if (state == 1 && q == KMR_ANY_LANE) {
1479  level = i;
1480  } else if (state == 1 && q == KMR_NO_LANE) {
1481  state = 2;
1482  assert(level == (i - 1));
1483  } else if (state == 1) {
1484  char ee[80];
1485  snprintf(ee, sizeof(ee),
1486  "Bad lane-number (%s): some follow any-lane",
1487  kmr_lane_string(id, 1));
1488  kmr_error(mr, ee);
1489  abort();
1490  } else if (state == 2 && q == KMR_ANY_LANE) {
1491  char ee[80];
1492  snprintf(ee, sizeof(ee),
1493  "Bad lane-number (%s): some follow no-lane",
1494  kmr_lane_string(id, 1));
1495  kmr_error(mr, ee);
1496  abort();
1497  } else if (state == 2 && q == KMR_NO_LANE) {
1498  /* OK. */
1499  } else if (state == 2) {
1500  char ee[80];
1501  snprintf(ee, sizeof(ee),
1502  "Bad lane-number (%s): some follow no-lane",
1503  kmr_lane_string(id, 1));
1504  kmr_error(mr, ee);
1505  abort();
1506  } else {
1507  abort();
1508  }
1509  }
1510  return MPI_SUCCESS;
1511 }
1512 
1513 /** Colors sub-communicators distinctly in a super-communicator, and
1514  returns the color which names names a lane (at-all-ranks). It
1515  enumerates the rank0 processes of the sub-communicators. It
1516  returns -1 for the ranks with a null SUBCOMM. */
1517 
1518 static int
1519 kmr_color_subcommunicator(struct kmr_swf *wf, MPI_Comm subcomm,
1520  MPI_Comm supercomm)
1521 {
1522  assert(supercomm != MPI_COMM_NULL);
1523 
1524  const int root = 0;
1525  int cc;
1526 
1527  int nprocs;
1528  int rank;
1529  cc = MPI_Comm_size(supercomm, &nprocs);
1530  assert(cc == MPI_SUCCESS);
1531  cc = MPI_Comm_rank(supercomm, &rank);
1532  assert(cc == MPI_SUCCESS);
1533 
1534  int *subcommranks;
1535  int *colors;
1536  if (rank == root) {
1537  subcommranks = kmr_malloc(sizeof(int) * (size_t)nprocs);
1538  assert(subcommranks != 0);
1539  colors = kmr_malloc(sizeof(int) * (size_t)nprocs);
1540  assert(colors != 0);
1541  } else {
1542  subcommranks = 0;
1543  colors = 0;
1544  }
1545 
1546  int r;
1547  if (subcomm != MPI_COMM_NULL) {
1548  cc = MPI_Comm_rank(subcomm, &r);
1549  assert(cc == MPI_SUCCESS);
1550  } else {
1551  r = -1;
1552  }
1553  cc = MPI_Gather(&r, 1, MPI_INT, subcommranks, 1, MPI_INT, root, supercomm);
1554  assert(cc == MPI_SUCCESS);
1555 
1556  /* Determine the colors of rank=0 in the subcomm. */
1557 
1558  int ncolors;
1559  if (rank == root) {
1560  ncolors = 0;
1561  for (int i = 0; i < nprocs; i++) {
1562  if (subcommranks[i] == 0) {
1563  colors[i] = ncolors;
1564  ncolors++;
1565  } else {
1566  colors[i] = -1;
1567  }
1568  }
1569  } else {
1570  ncolors = 0;
1571  }
1572 
1573  /* Tell its color to all ranks. */
1574 
1575  int color;
1576  cc = MPI_Scatter(colors, 1, MPI_INT, &color, 1, MPI_INT, root, supercomm);
1577  assert(cc == MPI_SUCCESS);
1578  if (subcomm != MPI_COMM_NULL) {
1579  cc = MPI_Bcast(&color, 1, MPI_INT, 0, subcomm);
1580  assert(cc == MPI_SUCCESS);
1581  }
1582 
1583  if (rank == root) {
1584  kmr_free(subcommranks, (sizeof(int) * (size_t)nprocs));
1585  kmr_free(colors, (sizeof(int) * (size_t)nprocs));
1586  }
1587  return color;
1588 }
1589 
1590 static struct kmr_lane_vector *
1591 kmr_make_bottom_lanes(struct kmr_swf *wf, struct kmr_lane_no *laneids,
1592  struct kmr_pair laneranks[][KMR_LANE_LEVELS]);
1593 static void kmr_bond_all_lanes(struct kmr_swf *wf, struct kmr_lane_vector *v,
1594  struct kmr_pair laneranks[][KMR_LANE_LEVELS]);
1595 static void kmr_bond_sublanes(struct kmr_swf *wf, struct kmr_lane_state *sup,
1596  struct kmr_lane_state *lanes[], int nlanes);
1597 static int kmr_find_leader(struct kmr_swf *wf, struct kmr_lane_state *lane,
1598  int level,
1599  struct kmr_pair laneranks[][KMR_LANE_LEVELS]);
1600 static int kmr_count_bottom_level_lanes(struct kmr_lane_state *lane);
1601 
1602 /** Initializes the lanes at the master rank (at-all-ranks). It
1603  collects lane-numbers of all ranks and makes the lane structures
1604  from the bottom and upwards. */
1605 
1606 static void
1608 {
1609  const int master = wf->master_rank;
1610  const MPI_Comm basecomm = wf->base_comm;
1611 
1612  int cc;
1613 
1614  /* Collect lane-numbers of all ranks. */
1615 
1616  int sz0 = sizeof(struct kmr_lane_no);
1617  struct kmr_lane_no *laneids;
1618  if (wf->rank == master) {
1619  laneids = kmr_malloc((size_t)sz0 * (size_t)wf->nprocs);
1620  assert(laneids != 0);
1621  } else {
1622  laneids = 0;
1623  }
1624  cc = MPI_Gather(&wf->lane_id_on_proc, sz0, MPI_BYTE,
1625  laneids, sz0, MPI_BYTE, master, basecomm);
1626  assert(cc == MPI_SUCCESS);
1627 
1628  if (0) {
1629  if (wf->rank == master) {
1630  printf("Lanes of ranks:\n");
1631  for (int i = 0; i < wf->nprocs; i++) {
1632  char *s = kmr_lane_string(laneids[i], 1);
1633  printf("lane[%d]=%s\n", i, s);
1634  }
1635  fflush(0);
1636  }
1637  }
1638 
1639  /* Collect rank numbers of the lane communicators. */
1640 
1641  struct kmr_pair ranks[KMR_LANE_LEVELS];
1642  for (int i = 0; i < KMR_LANE_LEVELS; i++) {
1643  if (wf->lane_comms[i] != MPI_COMM_NULL) {
1644  int nprocs;
1645  int rank;
1646  cc = MPI_Comm_size(wf->lane_comms[i], &nprocs);
1647  assert(cc == MPI_SUCCESS);
1648  cc = MPI_Comm_rank(wf->lane_comms[i], &rank);
1649  assert(cc == MPI_SUCCESS);
1650  ranks[i].size= nprocs;
1651  ranks[i].rank = rank;
1652  } else {
1653  ranks[i].size = 0;
1654  ranks[i].rank = -1;
1655  }
1656  }
1657  int sz1 = sizeof(struct kmr_pair [KMR_LANE_LEVELS]);
1658  struct kmr_pair (*laneranks)[KMR_LANE_LEVELS];
1659  if (wf->rank == master) {
1660  laneranks = kmr_malloc((size_t)sz1 * (size_t)wf->nprocs);
1661  assert(laneranks != 0);
1662  } else {
1663  laneranks = 0;
1664  }
1665  cc = MPI_Gather(ranks, sz1, MPI_BYTE, laneranks, sz1, MPI_BYTE,
1666  master, basecomm);
1667  assert(cc == MPI_SUCCESS);
1668 
1669  if (wf->rank == master) {
1670  struct kmr_lane_vector *
1671  v = kmr_make_bottom_lanes(wf, laneids, laneranks);
1672  kmr_bond_all_lanes(wf, v, laneranks);
1673 
1674  /* Check simply lane hierarchy. */
1675 
1676  int nbottoms = v->n;
1677  int count0 = kmr_count_bottom_level_lanes(wf->master.top_lane);
1678  assert(count0 == nbottoms);
1679 
1680  kmr_free(laneids, ((size_t)sz0 * (size_t)wf->nprocs));
1681  kmr_free(laneranks, ((size_t)sz1 * (size_t)wf->nprocs));
1682  kmr_free(v, (offsetof(struct kmr_lane_vector, lanes)
1683  + sizeof(struct kmr_lane_state *) * (size_t)v->n));
1684 
1685  /* Check a list of lanes are OK. */
1686 
1687  struct kmr_lane_state *list = wf->master.list_of_all_lanes;
1688  int count1;
1689  count1 = 0;
1690  for (struct kmr_lane_state *p = list; p != 0; p = p->link) {
1691  count1++;
1692  }
1693  assert(count1 == wf->master.top_lane->total_sublanes);
1694  }
1695 }
1696 
1697 /** Makes a lane structure (at-the-master). It is a bottom lane if
1698  NPROCS is non-zero, or a superlane if NPROCS is zero. The
1699  lane-number argument is ignored when creating a top-lane
1700  (level=-1). */
1701 
1702 static struct kmr_lane_state *
1703 kmr_allocate_lane(int level, struct kmr_lane_no id, int nprocs)
1704 {
1705  assert(level != -1 || nprocs == 0);
1706  struct kmr_lane_state *lane = kmr_malloc(sizeof(struct kmr_lane_state));
1707  assert(lane != 0);
1708  memset(lane, 0, sizeof(struct kmr_lane_state));
1709  kmr_clear_lane_id(&(lane->lane_id));
1710  if (level != -1) {
1711  lane->lane_id = id;
1712  }
1713  lane->level = level;
1714  lane->leader_rank = -1;
1715  lane->total_sublanes = 1;
1716  lane->total_ranks = nprocs;
1717  lane->superlane = 0;
1718  lane->sublanes = 0;
1719  if (nprocs != 0) {
1720  lane->workers = kmr_make_rank_vector(nprocs);
1721  assert(lane->workers != 0);
1722  } else {
1723  lane->workers = 0;
1724  }
1725 
1726  lane->icomm = MPI_COMM_NULL;
1727  lane->queue_head.next = 0;
1728  lane->queue_head.item = 0;
1729  lane->queue_insertion_tail = 0;
1730  lane->current_work = 0;
1731  lane->yielding_to_superlane = 0;
1732  lane->n_joined_ranks = 0;
1733  lane->n_running_sublanes = 0;
1734  if (nprocs != 0) {
1735  lane->running_sublanes = kmr_malloc(sizeof(_Bool) * (size_t)nprocs);
1736  assert(lane->running_sublanes != 0);
1737  } else {
1738  lane->running_sublanes = 0;
1739  }
1740 
1741  return lane;
1742 }
1743 
1744 /** Makes lanes at the bottom levels (at-the-master). The depths may
1745  be not eqaul. The created bottom lanes are bonded to a superlane
1746  in kmr_bond_all_lanes(). */
1747 
1748 static struct kmr_lane_vector *
1749 kmr_make_bottom_lanes(struct kmr_swf *wf, struct kmr_lane_no *laneids,
1750  struct kmr_pair laneranks[][KMR_LANE_LEVELS])
1751 {
1752  assert(wf->master.lane_of_workers == 0);
1753  size_t qsz = (sizeof(struct kmr_lane_state *) * (size_t)wf->nprocs);
1754  wf->master.lane_of_workers = kmr_malloc(qsz);
1755  assert(wf->master.lane_of_workers != 0);
1756  memset(wf->master.lane_of_workers, 0, qsz);
1757 
1758  struct kmr_lane_state **lanes = kmr_malloc(qsz);
1759  assert(lanes != 0);
1760  memset(lanes, 0, qsz);
1761  int nlanes;
1762  nlanes = 0;
1763  for (int r = 0; r < wf->nprocs; r++) {
1764  assert(wf->master.lane_of_workers[r] == 0);
1765  struct kmr_lane_no id = laneids[r];
1766  int level = kmr_level_of_lane(id, 0);
1767  if (level == -1) {
1768  /* Skip a null-lane. */
1769  } else {
1770  int nprocsinlane = laneranks[r][level].size;
1771  int rankinlane = laneranks[r][level].rank;
1772  assert(nprocsinlane > 0 && rankinlane != -1);
1773  assert(rankinlane < nprocsinlane);
1774 
1775  /* Find a lane when it was already created. */
1776 
1777  struct kmr_lane_state *lane;
1778  {
1779  int q;
1780  for (q = 0; q < nlanes; q++) {
1781  assert(lanes[q] != 0);
1782  if (kmr_lane_eq(id, lanes[q]->lane_id, level)) {
1783  int l0 = kmr_level_of_lane(lanes[q]->lane_id, 0);
1784  assert(l0 == level);
1785  break;
1786  }
1787  }
1788  if (q < nlanes) {
1789  lane = lanes[q];
1790  } else {
1791  assert(q == nlanes);
1792  lane = kmr_allocate_lane(level, id, nprocsinlane);
1793  assert(lane != 0);
1794  lanes[nlanes] = lane;
1795  nlanes++;
1796  }
1797  }
1798  assert(lane != 0);
1799 
1800  wf->master.lane_of_workers[r] = lane;
1801  assert(lane->workers->ranks[rankinlane] == -1);
1802  lane->workers->ranks[rankinlane] = r;
1803  if (rankinlane == 0) {
1804  assert(lane->leader_rank == -1);
1805  lane->leader_rank = r;
1806  }
1807  }
1808  }
1809 
1810  struct kmr_lane_vector *v = kmr_make_lane_vector(nlanes, lanes);
1811 
1812  /* Check lane->workers->ranks are all set. */
1813 
1814  for (int q = 0; q < v->n; q++) {
1815  struct kmr_lane_state *lane = v->lanes[q];
1816  struct kmr_rank_vector *workers = lane->workers;
1817  for (int i = 0; i < workers->n; i++) {
1818  assert(workers->ranks[i] != -1);
1819  }
1820  }
1821 
1822  return v;
1823 }
1824 
1825 /** Collects lanes to make a superlane, which build up to a single
1826  top-lane (at-the-master). It destructively modifies the passed
1827  vector V. (SLOW). */
1828 
1829 static void
1831  struct kmr_pair laneranks[][KMR_LANE_LEVELS])
1832 {
1833  struct kmr_lane_no nulllane = {.v = {0}};
1834  struct kmr_lane_state *top = kmr_allocate_lane(-1, nulllane, 0);
1835  assert(top != 0);
1836  assert(wf->master.top_lane == 0);
1837  wf->master.top_lane = top;
1838 
1839  int nlanes = v->n;
1840  struct kmr_lane_state **lanes = v->lanes;
1841 
1842  for (int level = (KMR_LANE_LEVELS - 1); level >= 0; level--) {
1843  for (int q = 0; q < nlanes; q++) {
1844  struct kmr_lane_state *lane = lanes[q];
1845  assert(lane == 0 || lane->superlane == 0);
1846  if (lane == 0) {
1847  /* Skip already handled entry. */
1848  } else if (lane->level == level) {
1849  struct kmr_lane_state *sup;
1850  if (level == 0) {
1851  sup = top;
1852  } else {
1853  struct kmr_lane_no id;
1854  id = lane->lane_id;
1855  id.v[level] = KMR_NO_LANE;
1856  sup = kmr_allocate_lane((level - 1), id, 0);
1857  }
1858  assert(sup != 0);
1859  kmr_bond_sublanes(wf, sup, lanes, nlanes);
1860  int leader = kmr_find_leader(wf, sup, sup->level, laneranks);
1861  assert((sup != top) == (leader != -1));
1862  assert(sup->leader_rank == -1);
1863  sup->leader_rank = leader;
1864 
1865  /* Replace a lane with its superlane. */
1866 
1867  assert(lanes[q] == 0);
1868  lanes[q] = sup;
1869  }
1870  }
1871  }
1872 
1873  /* Add the top lane to the list of the all lanes. */
1874 
1875  top->link = wf->master.list_of_all_lanes;
1876  wf->master.list_of_all_lanes = top;
1877 }
1878 
1879 /** Builds a vector of sublanes for the superlane SUP (at-the-master).
1880  It destructively clears LANES[i] to null, when it is merged to a
1881  superlane. */
1882 
1883 static void
1884 kmr_bond_sublanes(struct kmr_swf *wf, struct kmr_lane_state *sup,
1885  struct kmr_lane_state *lanes[], int nlanes)
1886 {
1887  assert(sup->sublanes == 0);
1888 
1889  /* Collect sublanes. */
1890 
1891  struct kmr_lane_state *suptail;
1892  suptail = sup;
1893  int count;
1894  count = 0;
1895  for (int q = 0; q < nlanes; q++) {
1896  struct kmr_lane_state *lane = lanes[q];
1897  if (lane != 0
1898  && (lane->level - 1) == sup->level
1899  && kmr_lane_eq(lane->lane_id, sup->lane_id, sup->level)) {
1900  lanes[q] = 0;
1901  lane->superlane = sup;
1902  assert(lane->link == 0);
1903  suptail->link = lane;
1904  suptail = lane;
1905  sup->total_sublanes += lane->total_sublanes;
1906  sup->total_ranks += lane->total_ranks;
1907  count++;
1908  }
1909  }
1910  assert(count > 0);
1911 
1912  /* Make a sublanes vector. */
1913 
1914  struct kmr_lane_vector *v = kmr_make_lane_vector(count, 0);
1915  int i;
1916  i = 0;
1917  suptail = sup;
1918  while (suptail->link != 0) {
1919  v->lanes[i] = suptail->link;
1920  i++;
1921  suptail = suptail->link;
1922  }
1923  assert(i == count);
1924 
1925  assert(sup->running_sublanes == 0);
1926  sup->running_sublanes = kmr_malloc(sizeof(_Bool) * (size_t)count);
1927  assert(sup->running_sublanes != 0);
1928 
1929  /* List all lanes as a single list. */
1930 
1931  suptail->link = wf->master.list_of_all_lanes;
1932  wf->master.list_of_all_lanes = sup->link;
1933 
1934  sup->link = 0;
1935  sup->sublanes = v;
1936 }
1937 
1938 /** Searches a leader (rank=0) in a LANE at a LEVEL (at-the-master).
1939  It returns a rank in the base communicator or -1 if not found. It
1940  returns -1 for the top lane, because the top lane is never used
1941  for work. (SLOW). */
1942 
1943 static int
1944 kmr_find_leader(struct kmr_swf *wf, struct kmr_lane_state *lane,
1945  int level, struct kmr_pair laneranks[][KMR_LANE_LEVELS])
1946 {
1947  if (lane->level == -1) {
1948  return -1;
1949  } if (lane->sublanes == 0) {
1950  for (int i = 0; i < lane->workers->n; i++) {
1951  int r = lane->workers->ranks[i];
1952 #if 0 /*AHO*/
1953  if (laneranks[r][level].rank == -1) {
1954  printf("AHO lane=%s i=%d r=%d level=%d w=%d\n",
1955  kmr_lane_string(lane->lane_id, 1), i, r, level,
1956  lane->workers->n);
1957  for (int j = 0; j < lane->workers->n; j++) {
1958  printf("AHO ranks=%d\n", lane->workers->ranks[j]);
1959  }
1960  fflush(0);
1961  abort();
1962  }
1963 #endif
1964  assert(laneranks[r][level].rank != -1);
1965  if (laneranks[r][level].rank == 0) {
1966  return r;
1967  }
1968  }
1969  return -1;
1970  } else {
1971  struct kmr_lane_vector *v = lane->sublanes;
1972  for (int i = 0; i < v->n; i++) {
1973  int r = kmr_find_leader(wf, v->lanes[i], level, laneranks);
1974  if (r != -1) {
1975  return r;
1976  }
1977  }
1978  return -1;
1979  }
1980 }
1981 
1982 /** Counts the number of the bottom level lanes (at-the-master). */
1983 
1984 static int
1986 {
1987  if (lane->sublanes == 0) {
1988  return 1;
1989  } else {
1990  int count;
1991  count = 0;
1992  for (int i = 0; i < lane->sublanes->n; i++) {
1993  count += kmr_count_bottom_level_lanes(lane->sublanes->lanes[i]);
1994  }
1995  return count;
1996  }
1997 }
1998 
1999 static int kmr_dequeue_scattered_work(struct kmr_swf *wf,
2000  struct kmr_lane_state *lane,
2001  struct kmr_work_item *x);
2002 
2003 /** Frees a lane and its sublanes, recursively (at-the-master). */
2004 
2005 static void
2006 kmr_free_lanes(struct kmr_swf *wf, struct kmr_lane_state *lane)
2007 {
2008 
2009  /* Assert if some work-items remain. */
2010 
2011  assert(lane->icomm == MPI_COMM_NULL);
2012  if (lane->sublanes != 0) {
2013  struct kmr_lane_vector *v = lane->sublanes;
2014  for (int i = 0; i < v->n; i++) {
2015  assert(v->lanes[i]->queue_head.next == 0);
2016  }
2017  }
2018 
2019  assert(lane->current_work == 0);
2020  assert(lane->yielding_to_superlane == 0);
2021  assert(lane->queue_insertion_tail == 0);
2022 
2023  int nsubs = ((lane->sublanes != 0) ? lane->sublanes->n : lane->workers->n);
2024 
2025  assert(lane->running_sublanes != 0);
2026  kmr_free(lane->running_sublanes, (sizeof(_Bool) * (size_t)nsubs));
2027  lane->running_sublanes = 0;
2028 
2029  if (lane->sublanes != 0) {
2030  struct kmr_lane_vector *v = lane->sublanes;
2031  for (int i = 0; i < v->n; i++) {
2032  kmr_free_lanes(wf, v->lanes[i]);
2033  v->lanes[i] = 0;
2034  }
2035  kmr_free(v, (offsetof(struct kmr_lane_vector, lanes)
2036  + (sizeof(struct kmr_lane_state *) * (size_t)v->n)));
2037  lane->sublanes = 0;
2038  }
2039  if (lane->workers != 0) {
2040  struct kmr_rank_vector *u = lane->workers;
2041  kmr_free(u, (offsetof(struct kmr_rank_vector, ranks)
2042  + (sizeof(int) * (size_t)u->n)));
2043  lane->workers = 0;
2044  }
2045 
2046  kmr_free(lane, sizeof(struct kmr_lane_state));
2047 }
2048 
2049 static void kmr_dump_sublanes(struct kmr_swf *wf, struct kmr_lane_state *lane);
2050 
2051 /** Dumps lanes created by kmr_init_swf(). */
2052 
2053 void
2055 {
2056  kmr_err_when_swf_is_not_initialized(mr);
2057  struct kmr_swf *wf = mr->simple_workflow;
2058  if (wf->rank == wf->master_rank) {
2059  kmr_dump_sublanes(wf, wf->master.top_lane);
2060  }
2061 }
2062 
2063 static void
2064 kmr_dump_sublanes(struct kmr_swf *wf, struct kmr_lane_state *lane)
2065 {
2066  const int master = wf->master_rank;
2067  if (wf->rank == master) {
2068  if (lane->workers != 0) {
2069  struct kmr_rank_vector *u = lane->workers;
2070  printf("lane %s : ranks[%d]=(",
2071  kmr_lane_string(lane->lane_id, 0), u->n);
2072  for (int i = 0; i < u->n; i++) {
2073  char *separator = (i == 0 ? "" : ",");
2074  printf("%s%d", separator, u->ranks[i]);
2075  }
2076  printf(")\n");
2077  }
2078  if (lane->sublanes != 0) {
2079  struct kmr_lane_vector *v = lane->sublanes;
2080  for (int i = 0; i < v->n; i++) {
2081  kmr_dump_sublanes(wf, v->lanes[i]);
2082  }
2083  }
2084  }
2085 }
2086 
2087 /* ================================================================ */
2088 
2089 /* Scheduler of Workflow */
2090 
2091 static struct kmr_work_item *
2092 kmr_make_work_item(struct kmr_swf *wf, struct kmr_lane_no id,
2093  const char *args, size_t argssize,
2094  int seq, _Bool separatorspace);
2095 static int kmr_enqueue_work(struct kmr_swf *wf, struct kmr_lane_state *lane,
2096  struct kmr_work_item *x, _Bool multiple_any);
2097 static int kmr_link_work(struct kmr_swf *wf, struct kmr_lane_state *lane,
2098  struct kmr_work_item *x);
2099 static void kmr_preset_lane_state(struct kmr_swf *wf, _Bool queuing);
2100 static void kmr_check_work_queues_empty(struct kmr_swf *wf);
2101 
2102 /** Maps with a simple workflow. The ranks are configured as lanes,
2103  which should be initialized by kmr_init_swf() in advance. The key
2104  part specifies the lane like "3.3.3", and the value part specifies
2105  the command-line arguments. The work-items in a lane run in the
2106  FIFO order. The lane specification can be an any-lane using a
2107  wildcard like "3.3.*". The higher level lane blocks the sublanes,
2108  thus, for example, an entry with the lane "3.3" blocks the
2109  following entries with the lanes "3.3.*". */
2110 
2111 int
2112 kmr_map_swf(KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
2113  struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
2114 {
2115  kmr_assert_kvs_ok(kvi, kvo, 1, 0);
2116  assert(kvi->c.key_data == KMR_KV_OPAQUE
2117  || kvi->c.key_data == KMR_KV_CSTRING);
2118  assert(kvi->c.value_data == KMR_KV_OPAQUE
2119  || kvi->c.value_data == KMR_KV_CSTRING);
2120  assert(kvi->c.element_count <= INT_MAX);
2121 
2122  KMR * const mr = kvi->c.mr;
2123  kmr_err_when_swf_is_not_initialized(mr);
2124  struct kmr_swf *wf = mr->simple_workflow;
2125  _Bool tracing5 = (wf->mr->trace_map_spawn && (5 <= wf->mr->verbosity));
2126 
2127  if (wf->rank != wf->master_rank) {
2128  kmr_error(mr, "Non-master rank calls kmr_map_swf()");
2129  abort();
2130  }
2131 
2132  int cc;
2133 
2134  /* Clear the old history if it remains. */
2135 
2136  if (wf->master.history_head.next != 0) {
2137  assert(wf->master.record_history);
2138  assert(wf->master.history_insertion_tail != 0);
2139  kmr_free_work_list(wf, wf->master.history_head.next, 0, 0);
2140  wf->master.history_head.next = 0;
2141  wf->master.history_insertion_tail = 0;
2142  }
2143 
2144  wf->master.record_history = mr->swf_record_history;
2145  if (wf->master.record_history) {
2146  wf->master.history_insertion_tail = &wf->master.history_head;
2147  }
2148 
2149  /* Enqueue work-items. */
2150 
2151  if (tracing5) {
2152  fprintf(stderr, ";;KMR [%05d] kmr_map_swf:"
2153  " Queue work-items.\n", wf->rank);
2154  fflush(0);
2155  }
2156 
2157  kmr_preset_lane_state(wf, 1);
2158 
2159  {
2160 
2161  int count = (int)kvi->c.element_count;
2162 
2163  /* Scan key-value pairs and put them in the queues. */
2164 
2165  kvi->c.current_block = kvi->c.first_block;
2166  struct kmr_kvs_entry *e;
2167  e = kmr_kvs_first_entry(kvi, kvi->c.first_block);
2168  for (int i = 0; i < count; i++) {
2169  struct kmr_kv_box kv = kmr_pick_kv(e, kvi);
2170 
2171  if (kv.vlen >= (int)wf->args_size) {
2172  char ee[80];
2173  snprintf(ee, sizeof(ee),
2174  "Command string (length=%d) too long (%s)",
2175  kv.vlen, kv.v.p);
2176  kmr_error(mr, ee);
2177  abort();
2178  return MPI_ERR_SPAWN;
2179  }
2180 
2181  _Bool separatorspace = opt.separator_space;
2182  struct kmr_lane_no id = kmr_name_lane(wf->mr, kv.k.p);
2183  struct kmr_work_item *
2184  x = kmr_make_work_item(wf, id, kv.v.p, (size_t)kv.vlen,
2185  i, separatorspace);
2186  assert(x != 0);
2187 
2188  cc = kmr_enqueue_work(wf, wf->master.top_lane, x, 1);
2189  assert(cc == MPI_SUCCESS);
2190 
2191  e = kmr_kvs_next(kvi, e, 0);
2192  }
2193  }
2194 
2195  /* Initialize lanes for start. */
2196 
2197  kmr_preset_lane_state(wf, 0);
2198 
2199  /* Start workers. */
2200 
2201  if (tracing5) {
2202  fprintf(stderr, ";;KMR [%05d] kmr_map_swf:"
2203  " Request workers to start.\n", wf->rank);
2204  fflush(0);
2205  }
2206 
2207  cc = kmr_activate_workers(wf, 0);
2208  assert(cc == MPI_SUCCESS);
2209 
2210  assert(wf->rank == wf->master_rank);
2211  for (;;) {
2212  cc = kmr_handle_worker_request(wf, 0);
2213  if (cc == MPI_SUCCESS) {
2214  break;
2215  }
2216  }
2217 
2218  kmr_check_work_queues_empty(wf);
2219  kmr_free_kvs(kvi);
2220 
2221  if (tracing5) {
2222  fprintf(stderr, ";;KMR [%05d] Master finished"
2223  " (Workers will be in undefined state).\n", wf->rank);
2224  fflush(0);
2225  }
2226 
2227  return MPI_SUCCESS;
2228 }
2229 
2230 /* Allocates and fills a work-item. It scans the arguments for
2231  "maxprocs=", and converts the separator of the arguments to a null
2232  character. */
2233 
2234 static struct kmr_work_item *
2235 kmr_make_work_item(struct kmr_swf *wf, struct kmr_lane_no id,
2236  const char *args, size_t argssize,
2237  int seq, _Bool separatorspace)
2238 {
2239  char *name = "kmr_map_swf";
2240  struct kmr_lane_no nulllane;
2241  kmr_clear_lane_id(&nulllane);
2242 
2243  int cc;
2244 
2245  char *argsbuf = kmr_malloc(argssize);
2246  memcpy(argsbuf, args, argssize);
2247 
2248  int maxargc;
2249  cc = kmr_scan_argv_strings(wf->mr, argsbuf, argssize, 0,
2250  &maxargc, 0, separatorspace, name);
2251  assert(cc == MPI_SUCCESS);
2252 
2253  char **argv0 = kmr_malloc(sizeof(char *) * (size_t)(maxargc + 1));
2254  memset(argv0, 0, (sizeof(char *) * (size_t)(maxargc + 1)));
2255 
2256  int argc;
2257  cc = kmr_scan_argv_strings(wf->mr, argsbuf, argssize, (maxargc + 1),
2258  &argc, argv0, separatorspace, name);
2259  assert(cc == MPI_SUCCESS);
2260  assert(maxargc == argc);
2261 
2262  char **argv;
2263  argv = argv0;
2264 
2265  /* Check if the "MAXPROCS=" string in the arguments. */
2266 
2267  int nprocs;
2268  if (argc > 0 && strncmp("maxprocs=", argv[0], 9) == 0) {
2269  int v;
2270  cc = kmr_parse_int(&argv[0][9], &v);
2271  if (cc == 1 || v >= 0) {
2272  nprocs = v;
2273  argc--;
2274  argv++;
2275  } else {
2276  char ee[80];
2277  snprintf(ee, sizeof(ee), "Bad maxprocs string (%s)", argv[0]);
2278  kmr_error(wf->mr, ee);
2279  abort();
2280  }
2281  } else {
2282  nprocs = 0;
2283  }
2284 
2285  size_t asz;
2286  {
2287  asz = 0;
2288  for (int i = 0; i < argc; i++) {
2289  size_t sz = (strlen(argv[i]) + 1);
2290  asz += sz;
2291  }
2292  /* Add the last empty string. */
2293  asz++;
2294  assert(asz <= (argssize + 1));
2295  }
2296 
2297  size_t msz = (offsetof(struct kmr_spawn_work, args) + asz);
2298  size_t xsz = (offsetof(struct kmr_work_item, work) + msz);
2299  struct kmr_work_item *x = kmr_malloc(xsz);
2300  assert(x != 0);
2301 
2302  /* Fill slots of a work-item. */
2303 
2304  x->req = KMR_SPAWN_WORK;
2305  x->requesting_lane = id;
2306  x->assigned_lane = nulllane;
2307  x->level = kmr_level_of_lane(id, 1);
2308  x->sequence_no = seq;
2309 
2310  x->work.req = KMR_SPAWN_WORK;
2311  x->work.protocol_version = KMR_SPAWN_MAGIC;
2312  x->work.message_size = (int)msz;
2313  x->work.subworld = x->level;
2314  x->work.color = kmr_color_of_lane(id);
2315  x->work.nprocs = nprocs;
2316  x->work.print_trace = 0;
2317 
2318  {
2319  char *p;
2320  p = x->work.args;
2321  for (int i = 0; i < argc; i++) {
2322  size_t sz = (strlen(argv[i]) + 1);
2323  memcpy(p, argv[i], sz);
2324  p += sz;
2325  }
2326  *p = 0;
2327  p++;
2328  assert((size_t)(p - x->work.args) == asz);
2329  }
2330 
2331  if (0) {
2332  fprintf(stderr,
2333  (";;KMR [%05d] kmr_map_swf:"
2334  " argc=%d siz=%d\n"), wf->rank, argc, (int)asz);
2335  for (int i = 0; i < argc; i++) {
2336  fprintf(stderr, "%s\n", argv[i]);
2337  }
2338  fflush(0);
2339  }
2340 
2341  kmr_free(argv0, (sizeof(char *) * (size_t)(maxargc + 1)));
2342  kmr_free(argsbuf, argssize);
2343 
2344  return x;
2345 }
2346 
2347 /** Enqueues a work-item in some sublane of a LANE (at-the-master).
2348  Note it also puts the work-item in the sublanes below, which will
2349  block the lanes for yielding them for a superlane. */
2350 
2351 static int
2352 kmr_enqueue_work(struct kmr_swf *wf, struct kmr_lane_state *lane,
2353  struct kmr_work_item *x, _Bool multipleany)
2354 {
2355  int cc;
2356 
2357  int queuing = (lane->level + 1);
2358  if (lane->sublanes == 0) {
2359  char ee[80];
2360  snprintf(ee, sizeof(ee),
2361  ("Bad lane specified; nonexisting level"
2362  " (lane=%s at level=%d)"),
2363  kmr_lane_string(x->requesting_lane, 0), queuing);
2364  kmr_error(wf->mr, ee);
2365  abort();
2366  return MPI_ERR_SPAWN;
2367  } else {
2368  assert(x->level >= queuing);
2369  struct kmr_lane_vector *v = lane->sublanes;
2370  int q = x->requesting_lane.v[queuing];
2371  assert(q != KMR_NO_LANE);
2372  if (0 <= q && q < v->n && (x->level > queuing)) {
2373  cc = kmr_enqueue_work(wf, v->lanes[q], x, multipleany);
2374  return cc;
2375  } else if (0 <= q && q < v->n) {
2376  assert(x->level == queuing);
2377  cc = kmr_link_work(wf, v->lanes[q], x);
2378  return cc;
2379  } else if (q == KMR_ANY_LANE && (x->level == queuing || multipleany)) {
2380  for (int i = 0; i < v->n; i++) {
2381  cc = kmr_link_work(wf, v->lanes[i], x);
2382  if (cc != MPI_SUCCESS) {
2383  break;
2384  }
2385  }
2386  return cc;
2387  } else if (q == KMR_ANY_LANE) {
2388  assert(x->level > queuing && !multipleany);
2389  char ee[80];
2390  snprintf(ee, sizeof(ee),
2391  ("Bad lane specified; multiple/non-tail any-lane"
2392  " (lane=%s at level=%d)"),
2393  kmr_lane_string(x->requesting_lane, 0), queuing);
2394  kmr_error(wf->mr, ee);
2395  abort();
2396  return MPI_ERR_SPAWN;
2397  } else {
2398  char ee[80];
2399  snprintf(ee, sizeof(ee),
2400  ("Bad lane specified; index exceeds size"
2401  " (lane=%s at level=%d)"),
2402  kmr_lane_string(x->requesting_lane, 0), queuing);
2403  kmr_error(wf->mr, ee);
2404  abort();
2405  return MPI_ERR_SPAWN;
2406  }
2407  }
2408 }
2409 
2410 static int
2411 kmr_link_work(struct kmr_swf *wf, struct kmr_lane_state *lane,
2412  struct kmr_work_item *x)
2413 {
2414  int cc;
2415  struct kmr_work_list *p = kmr_malloc(sizeof(struct kmr_work_list));
2416  assert(p != 0);
2417  p->next = 0;
2418  p->item = x;
2419  assert(lane->queue_insertion_tail != 0);
2420  assert(lane->queue_insertion_tail->next == 0);
2421  lane->queue_insertion_tail->next = p;
2422  lane->queue_insertion_tail = p;
2423 
2424  struct kmr_lane_vector *v = lane->sublanes;
2425  if (v != 0) {
2426  for (int i = 0; i < v->n; i++) {
2427  cc = kmr_link_work(wf, v->lanes[i], x);
2428  assert(cc == MPI_SUCCESS);
2429  }
2430  }
2431  return MPI_SUCCESS;
2432 }
2433 
2434 /* Sets the lanes as all working, so as to receive for an initial
2435  message which acts as a done message from the workers. */
2436 
2437 static void
2438 kmr_preset_lane_state(struct kmr_swf *wf, _Bool queuing)
2439 {
2440  struct kmr_lane_state *h = wf->master.list_of_all_lanes;
2441  for (struct kmr_lane_state *lane = h; lane != 0; lane = lane->link) {
2442  assert((lane->sublanes != 0) || (lane->workers != 0));
2443 
2444  if (queuing) {
2445  /* Open the work-item queue. */
2446  assert(lane->queue_insertion_tail == 0);
2447  lane->queue_insertion_tail = &lane->queue_head;
2448  } else {
2449  /* Close the work-item queue, it will not be added. */
2450  lane->queue_insertion_tail = 0;
2451 
2452  /* Mark a lane as running and add a dummy work. */
2453 
2454  assert(lane->running_sublanes != 0);
2455  int nsubs = ((lane->sublanes != 0)
2456  ? lane->sublanes->n : lane->workers->n);
2457  lane->n_running_sublanes = nsubs;
2458  for (int i = 0; i < nsubs; i++) {
2459  lane->running_sublanes[i] = 1;
2460  }
2461 
2462  if (lane->workers != 0) {
2463  struct kmr_lane_no id = lane->lane_id;
2464 
2465  size_t msz = (offsetof(struct kmr_spawn_work, args) + 0);
2466  size_t xsz = (offsetof(struct kmr_work_item, work) + msz);
2467  struct kmr_work_item *x = kmr_malloc(xsz);
2468  assert(x != 0);
2469  memset(x, 0, xsz);
2470 
2471  /* (Mark as a dummy with KMR_SPAWN_NONE). */
2472  x->req = KMR_SPAWN_NONE;
2473  x->requesting_lane = id;
2474  x->assigned_lane = id;
2475  x->level = kmr_level_of_lane(id, 1);
2476  x->work.req = KMR_SPAWN_NONE;
2477  x->work.protocol_version = KMR_SPAWN_MAGIC;
2478  x->work.message_size = (int)msz;
2479  x->work.subworld = x->level;
2480  x->work.color = kmr_color_of_lane(id);
2481  x->work.nprocs = lane->workers->n;
2482  x->work.print_trace = 0;
2483 
2484  assert(lane->current_work == 0);
2485  lane->current_work = x;
2486  }
2487  }
2488  }
2489 }
2490 
/* Forward declarations of the scheduling helpers that are used by
   kmr_yield_for_lane() and kmr_schedule_lanes() below. */
2491 static int kmr_schedule_lanes(struct kmr_swf *wf, struct kmr_lane_state *lane);
2492 static int kmr_start_lanes(struct kmr_swf *wf, struct kmr_lane_state *lane,
2493  struct kmr_work_item *x);
2494 static int kmr_finish_current_work(struct kmr_swf *wf,
2495  struct kmr_lane_state *lane);
2496 static struct kmr_work_item *kmr_dequeue_work(struct kmr_lane_state *lane);
2497 static int kmr_ckeck_sublanes_empty(struct kmr_swf *wf,
2498  struct kmr_lane_state *lane);
2499 static void kmr_record_in_history(struct kmr_swf *wf, struct kmr_work_item *x);
2500 
2501 /** Schedules a next work-item when a worker or a sublane finishes
2502  (at-the-master). It returns MPI_SUCCESS when the all of its
2503  workers and sublanes finish, or MPI_ERR_PENDING when some
2504  work-items are running. Scheduling of the lanes works in a
2505  bottom-up mannar, and is implemented by two functions
2506  kmr_yield_for_lane() and kmr_schedule_lanes().
2507  kmr_yield_for_lane() propagates the state upwards, and
2508  kmr_schedule_lanes() starts a work-item on the lane.
2509  kmr_yield_for_lane() is called when one of the workers or the
2510  sublanes finishes. kmr_schedule_lanes() is called when the lane
2511  is free (no running workers nor running sublanes) to start the
2512  lane. A call to kmr_yield_for_lane() first enters in a
2513  bottom-level lane and ascends its superlanes upwards. Note that
2514  the current work-item can be nothing. */
2515 
2516 static int
2517 kmr_yield_for_lane(struct kmr_swf *wf, struct kmr_lane_state *lane,
2518  int sublaneindex)
2519 {
2520  int cc;
2521 
2522  assert(lane->n_running_sublanes > 0);
2523  assert(lane->sublanes == 0 || (sublaneindex < lane->sublanes->n));
2524  assert(lane->workers == 0 || (sublaneindex < lane->workers->n));
2525  assert(lane->running_sublanes[sublaneindex] != 0);
2526  lane->running_sublanes[sublaneindex] = 0;
2527 
2528  lane->n_running_sublanes--;
2529  if (lane->n_running_sublanes != 0) {
2530  return MPI_ERR_PENDING;
2531  } else {
2532  cc = kmr_finish_current_work(wf, lane);
2533  assert(cc == MPI_SUCCESS);
2534  if (lane->superlane == 0) {
2535  /* ALL DONE FOR THE TOP-LANE. */
2536  return MPI_SUCCESS;
2537  } else {
2538  cc = kmr_schedule_lanes(wf, lane);
2539  if (cc == MPI_ERR_PENDING) {
2540  return MPI_ERR_PENDING;
2541  } else {
2542  /* Propagate the free-state upwards. */
2543  assert(cc == MPI_SUCCESS);
2544  int s = kmr_find_sublane_index(wf, lane);
2545  cc = kmr_yield_for_lane(wf, lane->superlane, s);
2546  return cc;
2547  }
2548  }
2549  }
2550 }
2551 
2552 /** Scehdules a lane for a next work-item (at-the-master). There is
2553  no work-items currently running in the lane and its sublanes. It
2554  is called by kmr_yield_for_lane(). */
2555 
2556 static int
2557 kmr_schedule_lanes(struct kmr_swf *wf, struct kmr_lane_state *lane)
2558 {
2559  _Bool tracing5 = (wf->mr->trace_map_spawn && (5 <= wf->mr->verbosity));
2560 
2561  int cc;
2562 
2563  assert(lane->superlane != 0);
2564  assert(lane->n_running_sublanes == 0);
2565  assert(lane->current_work == 0);
2566  assert(lane->yielding_to_superlane == 0);
2567 
2568  struct kmr_work_item *x = kmr_dequeue_work(lane);
2569  if (x != 0) {
2570  assert(x->level <= lane->level);
2571 
2572  /* Confirm all sublanes have yielded. */
2573 
2574  struct kmr_lane_vector *v = lane->sublanes;
2575  if (v != 0) {
2576  for (int i = 0; i < v->n; i++) {
2577  assert(x == v->lanes[i]->yielding_to_superlane);
2578  v->lanes[i]->yielding_to_superlane = 0;
2579  }
2580  }
2581 
2582  if (x->level < lane->level) {
2583  /* Yield this lane for the superlane. */
2584  assert(lane->superlane != 0);
2585  assert(lane->yielding_to_superlane == 0);
2586  lane->yielding_to_superlane = x;
2587  int s = kmr_find_sublane_index(wf, lane);
2588  cc = kmr_yield_for_lane(wf, lane->superlane, s);
2589  return cc;
2590  } else {
2591  /* Start workers. */
2592  if (tracing5) {
2593  fprintf(stderr, ";;KMR [%05d] Start a work (lane=%s).\n",
2594  wf->rank, kmr_lane_string(lane->lane_id, 0));
2595  fflush(0);
2596  }
2597  assert(x->level == lane->level);
2598  x->assigned_lane = lane->lane_id;
2599  x->work.color = kmr_color_of_lane(lane->lane_id);
2600  x->work.print_trace = (tracing5 != 0);
2601  cc = kmr_dequeue_scattered_work(wf, lane, x);
2602  assert(cc == MPI_SUCCESS);
2603  cc = kmr_start_lanes(wf, lane, x);
2604  assert(cc == MPI_SUCCESS);
2605  kmr_record_in_history(wf, x);
2606  return MPI_ERR_PENDING;
2607  }
2608  } else {
2609  cc = kmr_ckeck_sublanes_empty(wf, lane);
2610  assert(cc == MPI_SUCCESS);
2611 
2612 #if 0
2613  /* Schedule sublanes. */
2614  struct kmr_lane_vector *v = lane->sublanes;
2615  if (v != 0) {
2616  for (int i = 0; i < v->n; i++) {
2617  cc = kmr_schedule_lanes(wf, v->lanes[i]);
2618  if (cc == MPI_ERR_PENDING) {
2619  lane->n_running_sublanes++;
2620  }
2621  }
2622  }
2623  if (lane->n_running_sublanes != 0) {
2624  return MPI_ERR_PENDING;
2625  }
2626 #endif
2627 
2628  if (tracing5) {
2629  fprintf(stderr,
2630  ";;KMR [%05d] workflow lane done (lane=%s).\n",
2631  wf->rank, kmr_lane_string(lane->lane_id, 0));
2632  fflush(0);
2633  }
2634  assert(lane->superlane != 0);
2635  return MPI_SUCCESS;
2636  }
2637 }
2638 
2639 static int
2640 kmr_ckeck_sublanes_empty(struct kmr_swf *wf, struct kmr_lane_state *lane)
2641 {
2642  int cc;
2643 
2644  struct kmr_work_list *h = lane->queue_head.next;
2645  if (h != 0) {
2646  return MPI_ERR_PENDING;
2647  } else if (lane->sublanes != 0) {
2648  struct kmr_lane_vector *v = lane->sublanes;
2649  for (int i = 0; i < v->n; i++) {
2650  cc = kmr_ckeck_sublanes_empty(wf, v->lanes[i]);
2651  if (cc == MPI_ERR_PENDING) {
2652  return MPI_ERR_PENDING;
2653  }
2654  }
2655  }
2656  return MPI_SUCCESS;
2657 }
2658 
2659 static struct kmr_work_item *
2660 kmr_dequeue_work(struct kmr_lane_state *lane)
2661 {
2662  struct kmr_work_list *h = lane->queue_head.next;
2663  if (h == 0) {
2664  return 0;
2665  } else {
2666  lane->queue_head.next = h->next;
2667  struct kmr_work_item *x = h->item;
2668  kmr_free(h, sizeof(struct kmr_work_list));
2669  return x;
2670  }
2671 }
2672 
/* Forward declaration; the function is defined after its first use
   in the following function. */
2673 static int kmr_remove_work(struct kmr_swf *wf, struct kmr_lane_state *lane,
2674  struct kmr_work_item *x);
2675 
2676 /** Removes all occurrences of a work-item (which may be scattered for
2677  an any-lane) from the all queues. */
2678 
2679 static int
2681  struct kmr_work_item *x)
2682 {
2683  struct kmr_lane_no id = x->requesting_lane;
2684  int cc;
2685  struct kmr_lane_state *sup;
2686  sup = lane;
2687  while (sup->level >= 0 && id.v[sup->level] == KMR_ANY_LANE) {
2688  sup = lane->superlane;
2689  }
2690  cc = kmr_remove_work(wf, sup, x);
2691  assert(cc == MPI_SUCCESS);
2692  return MPI_SUCCESS;
2693 }
2694 
2695 static int
2696 kmr_remove_work(struct kmr_swf *wf, struct kmr_lane_state *lane,
2697  struct kmr_work_item *x)
2698 {
2699  struct kmr_lane_vector *v = lane->sublanes;
2700  if (v != 0) {
2701  for (int i = 0; i < v->n; i++) {
2702  kmr_remove_work(wf, v->lanes[i], x);
2703  }
2704  }
2705  struct kmr_work_list *h = &(lane->queue_head);
2706  for (struct kmr_work_list *q = h; (q != 0 && q->next != 0); q = q->next) {
2707  struct kmr_work_list *p = q->next;
2708  if (p->item == x) {
2709  q->next = p->next;
2710  kmr_free(p, sizeof(struct kmr_work_list));
2711  }
2712  }
2713  return MPI_SUCCESS;
2714 }
2715 
2716 /** Requests workers to start a work for a lane and its sublanes, and
2717  then connects to workers (at-the-master). */
2718 
2719 static int
2720 kmr_start_lanes(struct kmr_swf *wf, struct kmr_lane_state *lane,
2721  struct kmr_work_item *x)
2722 {
2723  assert(x->req == KMR_SPAWN_WORK);
2724  MPI_Comm basecomm = wf->base_comm;
2725 
2726  int cc;
2727 
2728  assert(lane->current_work == 0);
2729  lane->current_work = x;
2730 
2731  if (lane->sublanes != 0) {
2732  struct kmr_lane_vector *v = lane->sublanes;
2733  assert(lane->n_running_sublanes == 0);
2734  lane->n_running_sublanes = v->n;
2735  for (int i = 0; i < v->n; i++) {
2736  cc = kmr_start_lanes(wf, v->lanes[i], x);
2737  assert(cc == MPI_SUCCESS);
2738  assert(lane->running_sublanes[i] == 0);
2739  lane->running_sublanes[i] = 1;
2740  }
2741  } else {
2742  struct kmr_rank_vector *u = lane->workers;
2743  assert(lane->n_running_sublanes == 0);
2744  lane->n_running_sublanes = u->n;
2745  for (int i = 0; i < u->n; i++) {
2746  cc = kmr_start_worker(&x->work, (size_t)x->work.message_size,
2747  u->ranks[i], basecomm);
2748  assert(cc == MPI_SUCCESS);
2749  assert(lane->running_sublanes[i] == 0);
2750  lane->running_sublanes[i] = 1;
2751  }
2752  assert(cc == MPI_SUCCESS);
2753  }
2754 
2755  _Bool mainlane = (lane->level == x->level);
2756  if (mainlane) {
2757  assert(lane->icomm == MPI_COMM_NULL);
2758  cc = kmr_join_to_workers(wf, lane);
2759  assert(cc == MPI_SUCCESS);
2760  }
2761  return MPI_SUCCESS;
2762 }
2763 
2764 /* Finishes the current work-item. It is called when the all workers
2765  or the all sublanes finish. Note that the current work-item can be
2766  nothing, when its sublanes are just scheduled. */
2767 
2768 static int
2769 kmr_finish_current_work(struct kmr_swf *wf, struct kmr_lane_state *lane)
2770 {
2771  int cc;
2772 
2773  if (lane->current_work != 0) {
2774  struct kmr_work_item *x = lane->current_work;
2775  lane->current_work = 0;
2776 
2777  _Bool mainlane = (lane->level == x->level);
2778  if (mainlane) {
2779  _Bool dummy_initial_work_item = (x->req == KMR_SPAWN_NONE);
2780  if (dummy_initial_work_item) {
2781  size_t xsz = (offsetof(struct kmr_work_item, work)
2782  + (size_t)x->work.message_size);
2783  kmr_free(x, xsz);
2784  } else {
2785  assert(lane->icomm != MPI_COMM_NULL);
2786  cc = MPI_Comm_free(&lane->icomm);
2787  assert(cc == MPI_SUCCESS);
2788 
2789  /* A work-item cannot be freed, when it may be linked
2790  from the history. */
2791 
2792  if (wf->master.history_insertion_tail == 0) {
2793  assert(!wf->master.record_history);
2794  size_t xsz = (offsetof(struct kmr_work_item, work)
2795  + (size_t)x->work.message_size);
2796  kmr_free(x, xsz);
2797  }
2798  }
2799  }
2800  }
2801  return MPI_SUCCESS;
2802 }
2803 
2804 static void
2805 kmr_free_work_list(struct kmr_swf *wf, struct kmr_work_list *h,
2806  struct kmr_lane_state *lane, _Bool warn)
2807 {
2808  assert(!warn || lane != 0);
2809 
2810  struct kmr_work_list *p;
2811  p = h;
2812  while (p != 0) {
2813  struct kmr_work_list *q = p;
2814  struct kmr_work_item *x = q->item;
2815  p = p->next;
2816 
2817  if (warn) {
2818  char ee[80];
2819  snprintf(ee, sizeof(ee),
2820  "Some work-items remain queued (%s)",
2821  x->work.args);
2822  kmr_warning(wf->mr, 1, ee);
2823  }
2824 
2825  if (lane != 0) {
2826  kmr_dequeue_scattered_work(wf, lane, x);
2827  }
2828  size_t xsz = (offsetof(struct kmr_work_item, work)
2829  + (size_t)x->work.message_size);
2830  kmr_free(x, xsz);
2831  kmr_free(q, sizeof(struct kmr_work_list));
2832  }
2833 }
2834 
2835 static void
2836 kmr_check_work_queues_empty(struct kmr_swf *wf)
2837 {
2838  _Bool somenonempty;
2839  somenonempty = 0;
2840  struct kmr_lane_state *h = wf->master.list_of_all_lanes;
2841  for (struct kmr_lane_state *lane = h; lane != 0; lane = lane->link) {
2842  if (lane->queue_head.next != 0) {
2843  somenonempty |= 1;
2844  kmr_free_work_list(wf, lane->queue_head.next, lane, 1);
2845  lane->queue_head.next = 0;
2846  }
2847  }
2848  if (somenonempty) {
2849  kmr_error(wf->mr, "Some work-items remain queued");
2850  abort();
2851  }
2852 }
2853 
2854 static void
2855 kmr_record_in_history(struct kmr_swf *wf, struct kmr_work_item *x)
2856 {
2857  if (wf->master.record_history) {
2858  assert(wf->master.history_insertion_tail != 0);
2859  struct kmr_work_list *p = kmr_malloc(sizeof(struct kmr_work_list));
2860  assert(p != 0);
2861  p->next = 0;
2862  p->item = x;
2863  assert(wf->master.history_insertion_tail->next == 0);
2864  wf->master.history_insertion_tail->next = p;
2865  wf->master.history_insertion_tail = p;
2866  }
2867 }
2868 
2869 /** Prints the history of kmr_map_swf(), which is the start ordering
2870  the work-items. The work-items are given sequence numbers from
2871  zero in the order in the KVS. */
2872 
2873 void
2875 {
2876  kmr_err_when_swf_is_not_initialized(mr);
2877  struct kmr_swf *wf = mr->simple_workflow;
2878  if (wf->master.history_head.next == 0) {
2879  kmr_warning(wf->mr, 1, "Workflow history not recorded.");
2880  } else {
2881  struct kmr_work_list *p;
2882  p = wf->master.history_head.next;
2883  while (p != 0) {
2884  struct kmr_work_list *q = p;
2885  struct kmr_work_item *x = q->item;
2886  p = p->next;
2887  /* (Note kmr_lane_string() uses static buffer). */
2888  printf("work=%d for lane=%s",
2889  x->sequence_no, kmr_lane_string(x->requesting_lane, 0));
2890  printf(" run in lane=%s\n",
2891  kmr_lane_string(x->assigned_lane, 0));
2892  }
2893  }
2894  fflush(0);
2895 }
2896 
2897 /** Returns a list of start ordering of the work-items. The
2898  work-items are given sequence numbers from zero in the order in
2899  the KVS, and the HISTORY vector is filled by them in the order of
2900  the starts of the work-items. The COUNT specifies the allocated
2901  length of the history vector. */
2902 
2903 void
2904 kmr_dump_swf_order_history(KMR *mr, int *history, size_t count)
2905 {
2906  kmr_err_when_swf_is_not_initialized(mr);
2907  struct kmr_swf *wf = mr->simple_workflow;
2908  if (wf->master.history_head.next == 0) {
2909  kmr_warning(wf->mr, 1, "Workflow history not recorded.");
2910  } else {
2911  int icount = (int)count;
2912  for (int i = 0; i < icount; i++) {
2913  history[i] = -1;
2914  }
2915  int i;
2916  i = 0;
2917  struct kmr_work_list *p;
2918  p = wf->master.history_head.next;
2919  while (p != 0 && i < icount) {
2920  struct kmr_work_list *q = p;
2921  struct kmr_work_item *x = q->item;
2922  p = p->next;
2923  history[i] = x->sequence_no;
2924  i++;
2925  }
2926  }
2927 }
2928 
2929 /** Clears the history recorded in kmr_map_swf(). The history is also
2930  automatically cleared when a next call to kmr_map_swf(). */
2931 
2932 void
2934 {
2935  kmr_err_when_swf_is_not_initialized(mr);
2936  struct kmr_swf *wf = mr->simple_workflow;
2937  if (wf->master.history_head.next == 0) {
2938  kmr_warning(wf->mr, 1, "Workflow history not recorded.");
2939  } else {
2940  assert(wf->master.record_history);
2941  assert(wf->master.history_insertion_tail != 0);
2942  kmr_free_work_list(wf, wf->master.history_head.next, 0, 0);
2943  wf->master.history_head.next = 0;
2944  wf->master.history_insertion_tail = 0;
2945  }
2946 }
2947 
2948 /** (spawn-library-protocol) Handles requests from workers. It
2949  returns MPI_ERR_PENDING when some workers not finish, or
2950  MPI_SUCCESS. It blocks in receiving a new request. */
2951 
2952 static int
2953 kmr_handle_worker_request(struct kmr_swf *wf, _Bool joining)
2954 {
2955  MPI_Comm comm = wf->base_comm;
2956  _Bool tracing5 = (wf->mr->trace_map_spawn && (5 <= wf->mr->verbosity));
2957 
2958  int cc;
2959 
2960  assert(wf->master.rpc_buffer != 0 && wf->master.rpc_size > 0);
2961  union kmr_spawn_rpc *mbuf = wf->master.rpc_buffer;
2962  int msz = (int)wf->master.rpc_size;
2963  mbuf->req = KMR_SPAWN_NONE;
2964  MPI_Status st;
2965  int len;
2966  cc = MPI_Recv(mbuf, msz, MPI_BYTE, MPI_ANY_SOURCE,
2967  KMR_SPAWN_RPC_TAG, comm, &st);
2968  assert(cc == MPI_SUCCESS);
2969  cc = MPI_Get_count(&st, MPI_BYTE, &len);
2970  assert(cc == MPI_SUCCESS);
2971  size_t msglen = (size_t)len;
2972  int rank = st.MPI_SOURCE;
2973  assert(rank != wf->master_rank);
2974 
2975 #if 0 /*AHOAHO*/
2976  struct ss {
2977  int MPI_SOURCE;
2978  int MPI_TAG;
2979  int MPI_ERROR;
2980  int _count;
2981  int _cancelled;
2982  } *xs = (struct ss *)&st;
2983  fprintf(stderr, "MPI_SOURCE=%d MPI_TAG=%d MPI_ERROR=%d _count=%d _cancelled=%d\n", xs->MPI_SOURCE, xs->MPI_TAG, xs->MPI_ERROR, xs->_count, xs->_cancelled); fflush(0);
2984  fprintf(stderr, "msz=%d ty=%lx\n", msz, MPI_BYTE); fflush(0);
2985 #endif
2986 
2987  assert(MPI_SUCCESS != -1 && MPI_ERR_SPAWN != -1);
2988  cc = -1;
2989  switch (mbuf->req) {
2990  case KMR_SPAWN_NEXT: {
2991  assert(msglen == sizeof(struct kmr_spawn_next));
2992  struct kmr_spawn_next *w = &(mbuf->m0);
2993  struct kmr_lane_state *top = wf->master.top_lane;
2994  struct kmr_lane_state *lane = wf->master.lane_of_workers[rank];
2995 
2996  assert(lane == 0 || (lane->sublanes == 0 && lane->workers != 0));
2997  if (w->initial_message != 0) {
2998  if (!joining) {
2999  char ee[80];
3000  snprintf(ee, sizeof(ee),
3001  ("Unexpectedly receive a worker joining message"
3002  " (from rank=%d)"), rank);
3003  kmr_error(wf->mr, ee);
3004  abort();
3005  }
3006  if (lane != 0) {
3007  lane->n_joined_ranks++;
3008  top->n_joined_ranks++;
3009  assert(lane->n_joined_ranks <= lane->workers->n);
3010  assert(top->n_joined_ranks <= top->total_ranks);
3011  } else {
3012  wf->master.idle_ranks++;
3013  }
3014  }
3015 
3016  if (tracing5) {
3017  if (w->initial_message != 0) {
3018  fprintf(stderr,
3019  (";;KMR [%05d] rank=%d joined"
3020  " (workers=%d/%d; idle=%d).\n"),
3021  wf->rank, rank,
3022  top->n_joined_ranks, top->total_ranks,
3023  wf->master.idle_ranks);
3024  fflush(0);
3025  } else {
3026  fprintf(stderr, ";;KMR [%05d] rank=%d requesting a work.\n",
3027  wf->rank, rank);
3028  fflush(0);
3029  }
3030  }
3031 
3032  if (joining) {
3033  if (top->n_joined_ranks < top->total_ranks) {
3034  return MPI_ERR_PENDING;
3035  } else {
3036  return MPI_SUCCESS;
3037  }
3038  } else if (lane != 0) {
3039  int s = kmr_find_worker_index(wf, lane, rank);
3040  assert(s != -1);
3041  cc = kmr_yield_for_lane(wf, lane, s);
3042  if (cc == MPI_SUCCESS) {
3043  if (tracing5) {
3044  fprintf(stderr, ";;KMR [%05d] Workflow finished.\n",
3045  wf->rank);
3046  fflush(0);
3047  }
3048  }
3049  return cc;
3050  } else {
3051  return MPI_ERR_PENDING;
3052  }
3053  //break;
3054  }
3055 
3056  default: {
3057  cc = MPI_ERR_SPAWN;
3058  char ee[80];
3059  snprintf(ee, sizeof(ee),
3060  "Bad RPC message request=0x%x length=%zd from rank=%d",
3061  mbuf->req, msglen, rank);
3062  kmr_error(wf->mr, ee);
3063  abort();
3064  return cc;
3065  }
3066  }
3067 }
3068 
3069 /* (spawn-library-protocol) Starts processing on workers which are
3070  idle in the service loop. With SHUTDOWN=true, it makes all
3071  workers leave the service loop. */
3072 
3073 static int
3074 kmr_activate_workers(struct kmr_swf *wf, _Bool shutdown)
3075 {
3076  MPI_Comm comm = wf->base_comm;
3077 
3078  int cc;
3079 
3080  if (!shutdown) {
3081  struct kmr_spawn_work mm;
3082  struct kmr_spawn_work *mbuf = &mm;
3083  size_t msz = (offsetof(struct kmr_spawn_work, args) + 0);
3084  memset(mbuf, 0, msz);
3085  mbuf->req = KMR_SPAWN_WORK;
3086  mbuf->protocol_version = KMR_SPAWN_MAGIC;
3087  mbuf->message_size = (int)msz;
3088  mbuf->subworld = 0;
3089  mbuf->color = 0;
3090  mbuf->nprocs = 0;
3091  for (int i = 0; i < wf->nprocs; i++) {
3092  if (i != wf->master_rank) {
3093  cc = MPI_Send(mbuf, (int)msz, MPI_BYTE, i,
3094  KMR_SPAWN_RPC_TAG, comm);
3095  assert(cc == MPI_SUCCESS);
3096  }
3097  }
3098  } else {
3099  struct kmr_spawn_none mm;
3100  struct kmr_spawn_none *mbuf = &mm;
3101  size_t msz = sizeof(struct kmr_spawn_none);
3102  memset(mbuf, 0, msz);
3103  mbuf->req = KMR_SPAWN_NONE;
3104  mbuf->protocol_version = KMR_SPAWN_MAGIC;
3105  for (int i = 0; i < wf->nprocs; i++) {
3106  if (i != wf->master_rank) {
3107  cc = MPI_Send(mbuf, (int)msz, MPI_BYTE, i,
3108  KMR_SPAWN_RPC_TAG, comm);
3109  assert(cc == MPI_SUCCESS);
3110  }
3111  }
3112  }
3113  return MPI_SUCCESS;
3114 }
3115 
3116 /* (spawn-library-protocol). Requests a worker to start a work. A
3117  caller is responsible for ensuring that the all workers are idle.
3118  The started workers must be connected by kmr_spawn_connect(). */
3119 
3120 static int
3121 kmr_start_worker(struct kmr_spawn_work *w, size_t msglen,
3122  int rank, MPI_Comm basecomm)
3123 {
3124  int cc;
3125 
3126  int len = (int)msglen;
3127  cc = MPI_Send(w, len, MPI_BYTE, rank, KMR_SPAWN_RPC_TAG, basecomm);
3128  assert(cc == MPI_SUCCESS);
3129  return MPI_SUCCESS;
3130 }
3131 
3132 /* (spawn-library-protocol). Connects to workers corresponding to
3133  kmr_spawn_join_to_master(). */
3134 
3135 static int
3136 kmr_join_to_workers(struct kmr_swf *wf, struct kmr_lane_state *lane)
3137 {
3138  MPI_Comm basecomm = wf->base_comm;
3139  _Bool tracing5 = (wf->mr->trace_map_spawn && (5 <= wf->mr->verbosity));
3140 
3141  int cc;
3142 
3143  if (tracing5) {
3144  fprintf(stderr, ";;KMR [%05d] Connect to workers (lane=%s).\n",
3145  wf->rank, kmr_lane_string(lane->lane_id, 0));
3146  fflush(0);
3147  }
3148 
3149  assert(lane->icomm == MPI_COMM_NULL);
3150  int leader = lane->leader_rank;
3151  int nranks = lane->total_ranks;
3152  cc = MPI_Intercomm_create(MPI_COMM_SELF, 0, basecomm, leader,
3153  KMR_SPAWN_ICOMM_TAG, &lane->icomm);
3154  assert(cc == MPI_SUCCESS);
3155 
3156  int nprocs;
3157  cc = MPI_Comm_remote_size(lane->icomm, &nprocs);
3158  assert(cc == MPI_SUCCESS);
3159  if (nranks != nprocs) {
3160  char ee[80];
3161  snprintf(ee, sizeof(ee),
3162  "Bad inter-communicator size (%d!=%d)", nprocs, nranks);
3163  kmr_error(wf->mr, ee);
3164  abort();
3165  }
3166 
3167  return MPI_SUCCESS;
3168 }
3169 
3170 /* (FAKE OF KMRSPAWN LIBRARY). */
3171 
/* (Disabled code).  Replaces the hooks stored in kmr_fake_spawn_hooks
   by HOOKS.  The previously stored hooks are freed when they differ
   from the new ones.  It returns MPI_SUCCESS. */
3172 #if 0 /*AHO*/
3173 int
3174 kmr_spawn_hookup_fake(struct kmr_spawn_hooks *hooks)
3175 {
3176  if (kmr_fake_spawn_hooks != 0 && kmr_fake_spawn_hooks != hooks) {
3177  kmr_free(kmr_fake_spawn_hooks, sizeof(struct kmr_spawn_hooks));
3178  }
3179  kmr_fake_spawn_hooks = hooks;
3180  return MPI_SUCCESS;
3181 }
3182 #endif
3183 
3184 /* (FAKE OF KMRSPAWN LIBRARY). */
3185 
/* (Disabled code).  Fakes running a command of a work W whose message
   length is MSGLEN.  It splits the argument strings of the work into
   an argument vector, and only prints it when tracing is on; it
   executes nothing.  It returns MPI_SUCCESS.  NOTE(review): the
   local variable "hooks" below is declared with the same name as
   the parameter, which looks like it would not compile when this
   code is enabled. */
3186 #if 0
3187 static int
3188 kmr_run_command(struct kmr_spawn_hooks *hooks,
3189  struct kmr_spawn_work *w, size_t msglen)
3190 {
 /* The size of the argument strings part of the message. */
3191  size_t asz = (msglen - offsetof(struct kmr_spawn_work, args));
3192 
3193  int argc;
3194  argc = 0;
3195 
 /* The first pass counts the null-terminated strings; the scan stops
    at an empty string or near the end of the message. */
3196  {
3197  char *e = &(w->args[asz]);
3198  char *p;
3199  p = w->args;
3200  while (p[0] != 0 && p < (e - 1)) {
3201  argc++;
3202  while (p[0] != 0 && p < (e - 1)) {
3203  p++;
3204  }
3205  if (p < (e - 1)) {
3206  assert(p[0] == 0);
3207  p++;
3208  }
3209  }
3210  assert(p == (e - 1) || p[0] == 0);
3211  }
3212 
3213  char *argv[argc + 1];
3214 
 /* The second pass repeats the scan and records the start of each
    string in the argument vector, which is terminated by a null. */
3215  {
3216  int i;
3217  i = 0;
3218  char *e = &(w->args[asz]);
3219  char *p;
3220  p = w->args;
3221  while (p[0] != 0 && p < (e - 1)) {
3222  assert(i < argc);
3223  argv[i] = p;
3224  i++;
3225  while (p[0] != 0 && p < (e - 1)) {
3226  p++;
3227  }
3228  if (p < (e - 1)) {
3229  assert(p[0] == 0);
3230  p++;
3231  }
3232  }
3233  assert(p == (e - 1) || p[0] == 0);
3234  assert(i == argc);
3235  argv[argc] = 0;
3236  }
3237 
 /* Mark the work as running in the hooks state. */
3238  struct kmr_spawn_hooks *hooks = kmr_spawn_hooks;
3239  hooks->s.running_work = w;
3240  hooks->s.mpi_initialized = 1;
3241 
3242  if (hooks->s.print_trace) {
3243  char aa[80];
3244  kmr_make_printable_argv_string(aa, 45, argv);
3245  printf(";;KMR [%05d] EXEC: %s\n",
3246  hooks->s.base_rank, aa);
3247  fflush(0);
3248  }
3249 
 /* Reset the state; nothing is run in between. */
3250  hooks->s.running_work = 0;
3251  hooks->s.mpi_initialized = 0;
3252 
3253  return MPI_SUCCESS;
3254 }
3255 #endif
3256 
3257 /*
3258 Copyright (C) 2012-2018 RIKEN R-CCS
3259 This library is distributed WITHOUT ANY WARRANTY. This library can be
3260 redistributed and/or modified under the terms of the BSD 2-Clause License.
3261 */
Lane Number (at-all-ranks).
Definition: kmrwfmap.c:74
static struct kmr_lane_vector * kmr_make_bottom_lanes(struct kmr_swf *wf, struct kmr_lane_no *laneids, struct kmr_pair laneranks[][KMR_LANE_LEVELS])
Makes lanes at the bottom levels (at-the-master).
Definition: kmrwfmap.c:1749
Key-Value Stream (abstract).
Definition: kmr.h:632
static int kmr_yield_for_lane(struct kmr_swf *wf, struct kmr_lane_state *lane, int sublaneindex)
Schedules a next work-item when a worker or a sublane finishes (at-the-master).
Definition: kmrwfmap.c:2517
Utilities Private Part (do not include from applications).
static void kmr_free_lanes(struct kmr_swf *wf, struct kmr_lane_state *lane)
Frees a lane and its sublanes, recursively (at-the-master).
Definition: kmrwfmap.c:2006
static int kmr_find_leader(struct kmr_swf *wf, struct kmr_lane_state *lane, int level, struct kmr_pair laneranks[][KMR_LANE_LEVELS])
Searches a leader (rank=0) in a LANE at a LEVEL (at-the-master).
Definition: kmrwfmap.c:1944
static int kmr_enqueue_work(struct kmr_swf *wf, struct kmr_lane_state *lane, struct kmr_work_item *x, _Bool multiple_any)
Enqueues a work-item in some sublane of a LANE (at-the-master).
Definition: kmrwfmap.c:2352
Work-Item Queue Entry (at-the-master).
Definition: kmrwfmap.c:98
static int kmr_start_lanes(struct kmr_swf *wf, struct kmr_lane_state *lane, struct kmr_work_item *x)
Requests workers to start a work for a lane and its sublanes, and then connects to workers (at-the-ma...
Definition: kmrwfmap.c:2720
void kmr_free_swf_history(KMR *mr)
Clears the history recorded in kmr_map_swf().
Definition: kmrwfmap.c:2933
static int kmr_check_partitioning(struct kmr_swf *wf, int supercolor, MPI_Comm subcomm)
Checks if a sub-communicator is a partitioning of a super-communicator (at-all-ranks).
Definition: kmrwfmap.c:1413
int kmr_init_swf(KMR *mr, MPI_Comm lanecomms[KMR_LANE_LEVELS], int master)
Initializes the lanes of simple workflow.
Definition: kmrwfmap.c:528
Vector of Ranks (at-the-master).
Definition: kmrwfmap.c:147
static void kmr_resolve_lanes(struct kmr_swf *wf)
Assigns a lane-number to a rank (wf->lane_id_on_proc) (at-all-ranks).
Definition: kmrwfmap.c:1368
#define kmr_malloc(Z)
Allocates memory, or aborts when failed.
Definition: kmrimpl.h:177
Work Description (at-the-master).
Definition: kmrwfmap.c:82
static int kmr_load_spawn_library(struct kmr_swf *wf, _Bool test_with_fake_spawn)
Loads the spawn-library "libkmrspawn.so".
Definition: kmrwfmap.c:827
Definition: kmr.h:391
KMR Context.
Definition: kmr.h:247
static struct kmr_lane_no kmr_name_lane(KMR *mr, const char *s)
Parses a string as a lane-number.
Definition: kmrwfmap.c:248
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:679
static int kmr_find_sublane_index(struct kmr_swf *wf, struct kmr_lane_state *lane)
Finds a sublane index of a superlane.
Definition: kmrwfmap.c:435
static int kmr_schedule_lanes(struct kmr_swf *wf, struct kmr_lane_state *lane)
Schedules a lane for a next work-item (at-the-master).
Definition: kmrwfmap.c:2557
int kmr_split_swf_lanes_a(KMR *mr, MPI_Comm splitcomms[KMR_LANE_LEVELS], int root, int *description[], _Bool dump)
Splits a communicator in a KMR context to ones to be used for kmr_init_swf().
Definition: kmrwfmap.c:930
static char * kmr_lane_string(struct kmr_lane_no n, _Bool print_all_levels)
(NO-THREAD-SAFE) Returns a string representation of a lane-number.
Definition: kmrwfmap.c:344
void kmr_set_swf_verbosity(KMR *mr, int level)
Sets the verbosity of the spawn-library.
Definition: kmrwfmap.c:505
static int kmr_check_lane_id(struct kmr_swf *wf, struct kmr_lane_no id, _Bool admit_any)
Checks well-formedness of a lane-number.
Definition: kmrwfmap.c:1454
Work-Item Queue of a Lane (at-the-master).
Definition: kmrwfmap.c:112
static struct kmr_rank_vector * kmr_make_rank_vector(int n)
Allocates a rank vector, filling all entries with -1.
Definition: kmrwfmap.c:476
Handy Copy of a Key-Value Field.
Definition: kmr.h:401
Workflow State (at-all-ranks).
Definition: kmrwfmap.c:163
Options to Mapping by Spawns.
Definition: kmr.h:708
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
static struct kmr_lane_state * kmr_allocate_lane(int level, struct kmr_lane_no id, int nprocs)
Makes a lane structure (at-the-master).
Definition: kmrwfmap.c:1703
int kmr_split_swf_lanes(KMR *mr, MPI_Comm splitcomms[KMR_LANE_LEVELS], int root, char *description[], _Bool dump)
Splits a communicator in a KMR context to ones to be used for kmr_init_swf().
Definition: kmrwfmap.c:1067
Vector of Lanes (at-the-master).
Definition: kmrwfmap.c:140
static struct kmr_lane_vector * kmr_make_lane_vector(int n, struct kmr_lane_state *lanes[])
Packs lanes in a vector.
Definition: kmrwfmap.c:457
static int kmr_dequeue_scattered_work(struct kmr_swf *wf, struct kmr_lane_state *lane, struct kmr_work_item *x)
Removes all occurrences of a work-item (which may be scattered for an any-lane) from all the queues...
Definition: kmrwfmap.c:2680
#define KMR_LANE_LEVELS
Maximum Levels of Lanes.
Definition: kmrwfmap.c:62
void kmr_dump_swf_order_history(KMR *mr, int *history, size_t count)
Returns a list of start ordering of the work-items.
Definition: kmrwfmap.c:2904
static int kmr_handle_worker_request(struct kmr_swf *wf, _Bool joining)
(spawn-library-protocol) Handles requests from workers.
Definition: kmrwfmap.c:2953
static int kmr_count_bottom_level_lanes(struct kmr_lane_state *lane)
Counts the number of the bottom level lanes (at-the-master).
Definition: kmrwfmap.c:1985
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:367
KMR Interface.
void kmr_dump_swf_history(KMR *mr)
Prints the history of kmr_map_swf(), which is the start ordering of the work-items.
Definition: kmrwfmap.c:2874
int kmr_map_swf(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps with a simple workflow.
Definition: kmrwfmap.c:2112
int kmr_finish_swf(KMR *mr)
Clears the lanes of simple workflow.
Definition: kmrwfmap.c:750
static void kmr_make_lanes(struct kmr_swf *wf)
Initializes the lanes at the master rank (at-all-ranks).
Definition: kmrwfmap.c:1607
static void kmr_clear_lane_id(struct kmr_lane_no *id)
Clears the lane-number to a null-lane.
Definition: kmrwfmap.c:234
Static-Spawning API.
static int kmr_color_subcommunicator(struct kmr_swf *wf, MPI_Comm subcomm, MPI_Comm supercomm)
Colors sub-communicators distinctly in a super-communicator, and returns the color which names ...
Definition: kmrwfmap.c:1519
static struct kmr_kv_box kmr_pick_kv(struct kmr_kvs_entry *e, KMR_KVS *kvs)
Returns a handle to a key-value entry – a reverse of kmr_poke_kv().
Definition: kmrimpl.h:551
int kmr_make_printable_argv_string(char *s, size_t sz, char **argv)
Fills the string buffer with the argv strings for printing.
Definition: kmrutil.c:1859
int kmr_detach_swf_workers(KMR *mr)
Disengages the workers from main processing and puts them in the service loop for spawning...
Definition: kmrwfmap.c:659
int kmr_stop_swf_workers(KMR *mr)
Finishes the workers of workflow.
Definition: kmrwfmap.c:731
void kmr_dump_swf_lanes(KMR *mr)
Dumps lanes created by kmr_init_swf().
Definition: kmrwfmap.c:2054
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type.
Definition: kmr.h:736
static int kmr_find_worker_index(struct kmr_swf *wf, struct kmr_lane_state *lane, int rank)
Finds a worker index of a lane for a rank.
Definition: kmrwfmap.c:415
static void kmr_bond_sublanes(struct kmr_swf *wf, struct kmr_lane_state *sup, struct kmr_lane_state *lanes[], int nlanes)
Builds a vector of sublanes for the superlane SUP (at-the-master).
Definition: kmrwfmap.c:1884
static unsigned long kmr_color_of_lane(struct kmr_lane_no id)
Returns a lane-number as a single integer color.
Definition: kmrwfmap.c:406
static void kmr_bond_all_lanes(struct kmr_swf *wf, struct kmr_lane_vector *v, struct kmr_pair laneranks[][KMR_LANE_LEVELS])
Collects lanes to make a superlane, which build up to a single top-lane (at-the-master).
Definition: kmrwfmap.c:1830
static _Bool kmr_lane_eq(struct kmr_lane_no n0, struct kmr_lane_no n1, int level)
Compares lane-numbers up to the LEVEL.
Definition: kmrwfmap.c:220
static int kmr_level_of_lane(struct kmr_lane_no n, _Bool admit_any)
Returns the maximum level of a given lane-number (zero to KMR_LANE_LEVELS-1), or returns -1 for a nul...
Definition: kmrwfmap.c:383