KMR
kmrmapms.c
1 /* kmrmapms.c (2014-02-04) */
2 /* Copyright (C) 2012-2018 RIKEN R-CCS */
3 
4 /** \file kmrmapms.c Master-Worker Mapping on Key-Value Stream. */
5 
6 #include <mpi.h>
7 #include <stddef.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <limits.h>
11 #include <poll.h>
12 #include <netdb.h>
13 #include <errno.h>
14 #include <fcntl.h>
15 #include <sys/types.h>
16 #include <sys/wait.h>
17 #include <sys/stat.h>
18 #include <sys/param.h>
19 #include <arpa/inet.h>
20 #include <sys/mman.h>
21 #include <assert.h>
22 #ifdef _OPENMP
23 #include <omp.h>
24 #endif
25 #include "kmr.h"
26 #include "kmrimpl.h"
27 
28 #define MAX(a,b) (((a)>(b))?(a):(b))
29 #define MIN(a,b) (((a)<(b))?(a):(b))
30 
31 static const int kmr_kv_buffer_slack_size = 1024;
32 
33 /* State of each task of kmr_map_ms(). */
34 
35 enum {
36  KMR_RPC_NONE, KMR_RPC_GOON, KMR_RPC_DONE
37 };
38 
39 static inline void
40 kmr_assert_peer_tag(int tag)
41 {
42  assert(KMR_TAG_PEER_0 <= tag && tag < KMR_TAG_PEER_END);
43 }
44 
45 /* Special values of task ID. Task IDs are non-negative. A task ID
46  is included in an RPC request, which is used both for returning a
47  result and for requesting a new task. KMR_RPC_ID_NONE marks that no
48  result is returned in the first request from a worker thread.
49  KMR_RPC_ID_FIN marks that a node has finished with all its worker
50  threads. */
51 
52 #define KMR_RPC_ID_NONE -1
53 #define KMR_RPC_ID_FIN -2
54 
55 /** Delivers key-value pairs as requested. It returns MPI_SUCCESS if
56  all is done, or MPI_ERR_ROOT otherwise. It finishes the tasks when
57  all nodes have contacted it and all worker threads are done.
58  Protocol: (1) Receive an RPC request (KMR_TAG_REQ). A request
59  consists of a triple of integers (peer-tag, task-ID, result-size)
60  ("int req[3]"). The task-ID encodes some special values. (2)
61  Receive a result if a worker has one. (3) Return a new task if
62  available. A reply consists of a pair of integers (task-ID,
63  argument-size) ("int ack[2]"). (4) Or, return a "no-tasks"
64  indicator by ID=KMR_RPC_ID_NONE. (5) Count "done" messages by
65  ID=KMR_RPC_ID_FIN, which indicates the worker node has finished
66  with all its worker threads. The task-ID in an RPC request is
67  KMR_RPC_ID_NONE for the first request (meaning that the request
68  has no result). Peer-tags are used in subsequent messages to
69  direct reply messages to the requesting thread. */
70 
71 static int
72 kmr_map_master(KMR_KVS *kvi, KMR_KVS *kvo,
73  void *arg, struct kmr_option opt, kmr_mapfn_t m)
74 {
75  KMR *mr = kvi->c.mr;
76  if (kmr_fields_pointer_p(kvi)) {
77  kmr_error(mr, "kmr_map_ms: cannot handle pointer field types");
78  }
79  assert(kvo->c.key_data == kvi->c.key_data
80  && kvo->c.value_data == kvi->c.value_data);
81  MPI_Comm comm = mr->comm;
82  int nprocs = mr->nprocs;
83  _Bool tracing5 = (mr->trace_map_ms && (5 <= mr->verbosity));
84  long longcount = kvi->c.element_count;
85  assert(INT_MIN <= longcount && longcount <= INT_MAX);
86  int cnt = (int)longcount;
87  struct kmr_map_ms_state *ms = kvi->c.ms;
88  char *msstates;
89  if (ms != 0) {
90  msstates = &(ms->states[0]);
91  } else {
92  /* First time. */
93  size_t hdsz = offsetof(struct kmr_map_ms_state, states);
94  ms = kmr_malloc((hdsz + sizeof(char) * (size_t)cnt));
95  kvi->c.ms = ms;
96  ms->idles = 0;
97  ms->kicks = 0;
98  ms->dones = 0;
99  msstates = &(ms->states[0]);
100  for (int i = 0; i < cnt; i++) {
101  msstates[i] = KMR_RPC_NONE;
102  }
103  if (tracing5) {
104  char *name = "kmr_map_ms";
105  fprintf(stderr,
106  ";;KMR [%05d] %s: key-count=%d\n",
107  mr->rank, name, cnt);
108  fflush(0);
109  }
110  }
111  /* Make/remake array of key-value pointers. */
112  struct kmr_kvs_entry **ev = kvi->c.temporary_data;
113  if (ev == 0) {
114  ev = kmr_malloc(sizeof(struct kmr_kvs_entry *) * (size_t)cnt);
115  kvi->c.temporary_data = ev;
116  kvi->c.current_block = kvi->c.first_block;
117  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvi, kvi->c.first_block);
118  for (int i = 0; i < cnt; i++) {
119  assert(e != 0);
120  ev[i] = e;
121  e = kmr_kvs_next(kvi, e, 0);
122  }
123  }
124  /* Finish the task when all done. */
125  if (ms->dones == cnt && ms->idles == (nprocs - 1)) {
126  /* Let ranks leave. */
127  for (int peer = 1; peer < nprocs; peer++) {
128  int cc;
129  int req[3] = {0, KMR_RPC_ID_FIN, 0};
130  cc = MPI_Send(req, 3, MPI_INT, peer, KMR_TAG_REQ, comm);
131  assert(cc == MPI_SUCCESS);
132  }
133  if (kvi->c.temporary_data != 0) {
134  kmr_free(kvi->c.temporary_data,
135  (sizeof(struct kmr_kvs_entry *) * (size_t)cnt));
136  kvi->c.temporary_data = 0;
137  }
138  return MPI_SUCCESS;
139  }
140  /* Wait for one request and process it. */
141  assert(ms->dones <= ms->kicks);
142  MPI_Status st;
143  int cc;
144  int req[3];
145  cc = MPI_Recv(req, 3, MPI_INT, MPI_ANY_SOURCE, KMR_TAG_REQ, comm, &st);
146  assert(cc == MPI_SUCCESS);
147  int peer_tag = req[0];
148  int peer = st.MPI_SOURCE;
149  assert(peer != 0);
150  {
151  int id = req[1];
152  int sz = req[2];
153  if (id == KMR_RPC_ID_NONE) {
154  /* Got the first request from a peer, no task results. */
155  } else if (id == KMR_RPC_ID_FIN) {
156  /* Got the finishing request from a peer. */
157  ms->idles++;
158  assert(ms->idles <= (nprocs - 1));
159  } else {
160  /* Receive a task result. */
161  assert(id >= 0);
162  kmr_assert_peer_tag(peer_tag);
163  void *packed = kmr_malloc((size_t)sz);
164  cc = MPI_Recv(packed, sz, MPI_BYTE, peer, peer_tag, comm, &st);
165  assert(cc == MPI_SUCCESS);
166  KMR_KVS *kvx = kmr_create_kvs(mr, KMR_KV_BAD, KMR_KV_BAD);
167  cc = kmr_restore_kvs(kvx, packed, (size_t)sz, kmr_noopt);
168  assert(cc == MPI_SUCCESS);
169  struct kmr_option keepopen = {.keep_open = 1};
170  cc = kmr_map(kvx, kvo, 0, keepopen, kmr_add_identity_fn);
171  assert(cc == MPI_SUCCESS);
172  kmr_free(packed, (size_t)sz);
173  assert(msstates[id] == KMR_RPC_GOON);
174  msstates[id] = KMR_RPC_DONE;
175  ms->dones++;
176  }
177  }
178  if (ms->kicks < cnt) {
179  /* Send a new task (cnt is count in integer). */
180  int id;
181  for (id = 0; id < cnt; id++) {
182  if (msstates[id] == KMR_RPC_NONE) {
183  break;
184  }
185  }
186  assert(id != KMR_RPC_ID_NONE && id != cnt);
187  struct kmr_kvs_entry *e = ev[id];
188  int sz = (int)kmr_kvs_entry_netsize(e);
189  assert(sz > 0);
190  int ack[2] = {id, sz};
191  cc = MPI_Send(ack, 2, MPI_INT, peer, peer_tag, comm);
192  assert(cc == MPI_SUCCESS);
193  cc = MPI_Send(e, sz, MPI_BYTE, peer, peer_tag, comm);
194  assert(cc == MPI_SUCCESS);
195  assert(msstates[id] == KMR_RPC_NONE);
196  msstates[id] = KMR_RPC_GOON;
197  ms->kicks++;
198  if (tracing5) {
199  char *name = "kmr_map_ms";
200  fprintf(stderr,
201  ";;KMR [%05d] %s: work=%d to rank=%d\n",
202  mr->rank, name, id, peer);
203  fflush(0);
204  }
205  } else {
206  /* Finish the worker thread. */
207  int ack[2] = {KMR_RPC_ID_NONE, 0};
208  cc = MPI_Send(ack, 2, MPI_INT, peer, peer_tag, comm);
209  assert(cc == MPI_SUCCESS);
210  }
211  /* Have more entries. */
212  return MPI_ERR_ROOT;
213 }
214 
215 /** Asks the master for a task, then calls a map-function. With
216  threading, each thread works independently, asking the master for a
217  task. It simply protects the MPI send/recv calls with OMP critical
218  sections, although their grain is too coarse for OMP critical
219  sections to be used efficiently. */
220 
221 static int
222 kmr_map_worker(KMR_KVS *kvi, KMR_KVS *kvo,
223  void *arg, struct kmr_option opt, kmr_mapfn_t m)
224 {
225  assert(!kmr_fields_pointer_p(kvi)
226  && kvo->c.key_data == kvi->c.key_data
227  && kvo->c.value_data == kvi->c.value_data);
228  KMR * const mr = kvi->c.mr;
229  const MPI_Comm comm = mr->comm;
230  const int rank = mr->rank;
231  const enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvo->c.key_data);
232  const enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvo->c.value_data);
233  assert(rank != 0);
234  assert(kvi->c.element_count == 0);
235 #ifdef _OPENMP
236  const _Bool threading = !(mr->single_thread || opt.nothreading);
237 #endif
238  KMR_OMP_PARALLEL_IF_(threading)
239  {
240  int cc;
241  int thr = KMR_OMP_GET_THREAD_NUM();
242  MPI_Status st;
243  struct kmr_kvs_entry *e = 0;
244  int maxsz = 0;
245  int peer_tag = KMR_TAG_PEER(thr);
246  kmr_assert_peer_tag(peer_tag);
247  {
248  /* Make the first request. */
249  int req[3] = {peer_tag, KMR_RPC_ID_NONE, 0};
250  KMR_OMP_CRITICAL_
251  cc = MPI_Send(req, 3, MPI_INT, 0, KMR_TAG_REQ, comm);
252  assert(cc == MPI_SUCCESS);
253  }
254  for (;;) {
255  int ack[2];
256  KMR_OMP_CRITICAL_
257  cc = MPI_Recv(ack, 2, MPI_INT, 0, peer_tag, comm, &st);
258  assert(cc == MPI_SUCCESS);
259  int id = ack[0];
260  int sz = ack[1];
261  if (id == KMR_RPC_ID_NONE) {
262  assert(sz == 0);
263  break;
264  }
265 
266  assert(id >= 0 && sz > 0);
267  if (sz > maxsz) {
268  maxsz = (sz + kmr_kv_buffer_slack_size);
269  e = kmr_realloc(e, (size_t)maxsz);
270  assert(e != 0);
271  }
272  KMR_OMP_CRITICAL_
273  cc = MPI_Recv(e, sz, MPI_BYTE, 0, peer_tag, comm, &st);
274  assert(cc == MPI_SUCCESS);
275  /* Invoke a mapper. */
276  KMR_KVS *kvx;
277  KMR_OMP_CRITICAL_
278  kvx = kmr_create_kvs(mr, keyf, valf);
279  struct kmr_kv_box kv = kmr_pick_kv(e, kvi);
280  cc = (*m)(kv, kvi, kvx, arg, id);
281  if (cc != MPI_SUCCESS) {
282  char ee[80];
283  snprintf(ee, sizeof(ee),
284  "Map-fn returned with error cc=%d", cc);
285  kmr_error(mr, ee);
286  }
287  kmr_add_kv_done(kvx);
288  void *packed = 0;
289  size_t packsz = 0;
290  cc = kmr_save_kvs(kvx, &packed, &packsz, kmr_noopt);
291  assert(cc == MPI_SUCCESS && packed != 0);
292  /* Send a task result. */
293  assert(packsz <= (size_t)INT_MAX);
294  sz = (int)packsz;
295  int req[3] = {peer_tag, id, sz};
296  KMR_OMP_CRITICAL_
297  cc = MPI_Send(req, 3, MPI_INT, 0, KMR_TAG_REQ, comm);
298  assert(cc == MPI_SUCCESS);
299  KMR_OMP_CRITICAL_
300  cc = MPI_Send(packed, sz, MPI_BYTE, 0, peer_tag, comm);
301  assert(cc == MPI_SUCCESS);
302  /* Cleanup. */
303  KMR_OMP_CRITICAL_
304  cc = kmr_free_kvs(kvx);
305  assert(cc == MPI_SUCCESS);
306  kmr_free(packed, packsz);
307  }
308  if (e != 0) {
309  kmr_free(e, (size_t)maxsz);
310  }
311  }
312  /* (Threads join). */
313  {
314  /* Make the finishing request. */
315  int cc;
316  int req[3] = {0, KMR_RPC_ID_FIN, 0};
317  cc = MPI_Send(req, 3, MPI_INT, 0, KMR_TAG_REQ, comm);
318  assert(cc == MPI_SUCCESS);
319  /* Wait for the master to finish. */
320  MPI_Status st;
321  cc = MPI_Recv(req, 3, MPI_INT, 0, KMR_TAG_REQ, comm, &st);
322  assert(cc == MPI_SUCCESS);
323  assert(req[0] == 0 && req[1] == KMR_RPC_ID_FIN && req[2] == 0);
324  }
325  return MPI_SUCCESS;
326 }
327 
328 /** Maps in master-worker mode. The input key-value stream should be
329  empty except on rank0, where the master runs (the contents on
330  the worker ranks are ignored). It consumes the input key-value
331  stream. The master does delivery only. The master returns
332  frequently to give a chance for check-pointing, etc. The master
333  returns early each time one pair is delivered, and those
334  returns are marked by MPI_ERR_ROOT, indicating more tasks remain.
335  In contrast, workers return only after all tasks are done. All the
336  state that must be kept during kmr_map_ms() for check-pointing is in
337  the key-value streams KVI and KVO on the master. Note that this
338  totally diverges from bulk-synchronous execution. It does not
339  accept key-value field types KMR_KV_POINTER_OWNED or
340  KMR_KV_POINTER_UNMANAGED. Effective-options: NOTHREADING,
341  KEEP_OPEN. See struct kmr_option. */
342 
343 int
344 kmr_map_ms(KMR_KVS *kvi, KMR_KVS *kvo,
345  void *arg, struct kmr_option opt, kmr_mapfn_t m)
346 {
347  kmr_assert_kvs_ok(kvi, kvo, 1, 0);
348  KMR *mr = kvi->c.mr;
349  struct kmr_option kmr_supported = {.nothreading = 1, .keep_open = 1};
350  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
351  int kcdc = kmr_ckpt_disable_ckpt(mr);
352  int rank = mr->rank;
353  long cnt = kvi->c.element_count;
354  assert(INT_MIN <= cnt && cnt <= INT_MAX);
355  int ccr;
356  int cc;
357  if (rank == 0) {
358  ccr = kmr_map_master(kvi, kvo, arg, opt, m);
359  if (ccr == MPI_SUCCESS) {
360  cc = kmr_add_kv_done(kvo);
361  assert(cc == MPI_SUCCESS);
362  cc = kmr_free_kvs(kvi);
363  assert(cc == MPI_SUCCESS);
364  }
365  } else {
366  ccr = kmr_map_worker(kvi, kvo, arg, opt, m);
367  cc = kmr_add_kv_done(kvo);
368  assert(cc == MPI_SUCCESS);
369  cc = kmr_free_kvs(kvi);
370  assert(cc == MPI_SUCCESS);
371  }
372  kmr_ckpt_enable_ckpt(mr, kcdc);
373  return ccr;
374 }
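A minimal usage sketch of kmr_map_ms() follows, assuming a KMR context mr and a user-defined map-function task_fn (both hypothetical names). Only rank0 adds task entries; every rank then calls kmr_map_ms() in a loop, because the master keeps returning MPI_ERR_ROOT while tasks remain, whereas workers return MPI_SUCCESS once everything is done.

    /* Sketch only: mr and task_fn are assumed to exist. */
    KMR_KVS *kvi = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
    if (mr->rank == 0) {
        for (long i = 0; i < 100; i++) {
            struct kmr_kv_box kv = {
                .klen = (int)sizeof(long), .vlen = (int)sizeof(long),
                .k.i = i, .v.i = i
            };
            kmr_add_kv(kvi, kv);
        }
    }
    kmr_add_kv_done(kvi);
    KMR_KVS *kvo = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
    int cc;
    do {
        cc = kmr_map_ms(kvi, kvo, 0, kmr_noopt, task_fn);
    } while (cc == MPI_ERR_ROOT);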
375 
376 /* ================================================================ */
377 
378 /* Mode of Spawning. KMR_SPAWN_INTERACT indicates spawned processes
379  interact with a map-function. KMR_SPAWN_SERIAL and
380  KMR_SPAWN_PARALLEL indicate spawned processes do not interact with
381  the parent. KMR_SPAWN_SERIAL is for sequential programs, for which
382  a watch-program "kmrwatch0" replies in place of the spawned processes.
383  KMR_SPAWN_PARALLEL is for independent MPI programs, which do not
384  interact with the parent. Since independent MPI programs run
385  freely, a socket connection made by the watch-program "kmrwatch0"
386  is used to detect their termination. */
387 
388 enum kmr_spawn_mode {
389  KMR_SPAWN_INTERACT, KMR_SPAWN_SERIAL, KMR_SPAWN_PARALLEL
390 };
391 
392 /** State of each Spawning. The array of this structure is stored in
393  the kmr_spawning structure. RUNNING indicates the spawned
394  processes are running. N_PROCS is the number of processes to be
395  spawned (it equals the COUNT below). INDEX is the number of
396  processes spawned so far, and COUNT is the number of processes of
397  the current spawn. The range starting at INDEX of length COUNT
398  enumerates the spawned processes, and is used to index the array of
399  MPI requests. ARGC and ARGV are the argument list. ABUF (a byte
400  array of size ALEN) and ARGV0 (a pointer array of ARGC0 entries)
401  are buffers for making command-line arguments. ICOMM is the
402  inter-communicator. WATCH_PORT holds an IP port number for the
403  watch-programs. */
404 
405 struct kmr_spawn_state {
406  _Bool running;
407  int n_procs;
408  int index;
409  int count;
410  int argc;
411  char **argv;
412  int argc0;
413  char **argv0;
414  size_t alen;
415  char *abuf;
416  MPI_Comm icomm;
417  int watch_port;
418  double timestamp[6];
419 };
420 
421 /** State of the Spawner. REPLIES holds receive requests for the spawned
422  processes. WATCHES holds sockets used to detect the end of the
423  spawned processes. N_STARTEDS is the count of processes started, and
424  N_RUNNINGS is the count of processes that have not yet finished. */
425 
426 struct kmr_spawning {
427  char *fn;
428  enum kmr_spawn_mode mode;
429  int n_spawns;
430  int n_spawners;
431  int n_processes;
432  int usize;
433  int spawn_limit;
434 
435  int n_starteds;
436  int n_runnings;
437 
438  struct kmr_spawn_state *spawned;
439  struct kmr_kv_box *ev;
440  MPI_Request *replies;
441  int *watches;
442 
443  int watch_listener;
444  char watch_host[MAXHOSTNAMELEN + 10];
445 };
446 
447 /* Sums integers among all ranks. It is used to check the number of
448  ranks which call spawn. */
449 
450 static int
451 kmr_sum_on_all_ranks(KMR *mr, int v, int *sum)
452 {
453  assert(sum != 0);
454  int cc;
455  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
456  struct kmr_kv_box nkv = {
457  .klen = (int)sizeof(long),
458  .vlen = (int)sizeof(long),
459  .k.i = 0,
460  .v.i = v
461  };
462  cc = kmr_add_kv(kvs0, nkv);
463  assert(cc == MPI_SUCCESS);
464  kmr_add_kv_done(kvs0);
465  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
466  cc = kmr_replicate(kvs0, kvs1, kmr_noopt);
467  assert(cc == MPI_SUCCESS);
468  long count;
469  cc = kmr_reduce_as_one(kvs1, 0, &count, kmr_noopt, kmr_isum_one_fn);
470  assert(cc == MPI_SUCCESS);
471  *sum = (int)count;
472  return MPI_SUCCESS;
473 }
474 
475 int
476 kmr_map_via_spawn_ff(KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
477  int finfo, struct kmr_spawn_option opt,
478  kmr_mapfn_t mapfn)
479 {
480  MPI_Info info = MPI_Info_f2c(finfo);
481  int cc = kmr_map_via_spawn(kvi, kvo, arg, info, opt, mapfn);
482  return cc;
483 }
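A minimal usage sketch of kmr_map_via_spawn() follows, assuming a KMR context mr and a map-function gather_fn; the command "./a.out" and its argument are placeholders. The value of each entry is a null-separated command line, optionally led by a "maxprocs=n" string; with the REPLY_EACH option each spawned process is expected to call kmr_reply_to_spawner() before it exits.

    /* Sketch only: mr, gather_fn, and the command line are assumptions. */
    char cmd[] = "maxprocs=2\0./a.out\0input.dat";
    KMR_KVS *kvi = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
    if (mr->rank == 0) {
        struct kmr_kv_box kv = {
            .klen = (int)sizeof(long), .vlen = (int)sizeof(cmd),
            .k.i = 0, .v.p = cmd
        };
        kmr_add_kv(kvi, kv);
    }
    kmr_add_kv_done(kvi);
    KMR_KVS *kvo = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
    struct kmr_spawn_option sopt = {.reply_each = 1};
    kmr_map_via_spawn(kvi, kvo, 0, MPI_INFO_NULL, sopt, gather_fn);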
484 
485 static inline void
486 kmr_spawn_info_put(struct kmr_spawn_info *info,
487  struct kmr_spawn_state *s,
488  struct kmr_spawn_option opt, void *arg)
489 {
490  info->maparg = arg;
491  info->u.icomm = s->icomm;
492  info->icomm_ff = MPI_Comm_c2f(s->icomm);
493  info->reply_root = opt.reply_root;
494 }
495 
496 static inline void
497 kmr_spawn_info_get(struct kmr_spawn_info *info,
498  struct kmr_spawn_state *s)
499 {
500  if (info->u.icomm != s->icomm) {
501  s->icomm = info->u.icomm;
502  } else if (info->icomm_ff != MPI_Comm_c2f(s->icomm)) {
503  s->icomm = MPI_Comm_f2c(info->icomm_ff);
504  }
505 }
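When mr->spawn_pass_intercomm_in_argument is set (and the mode is KMR_SPAWN_INTERACT), the ARG passed to the map-function points to a struct kmr_spawn_info filled by kmr_spawn_info_put() above. A sketch of a map-function that reads it follows; the function name is hypothetical and the work over the inter-communicator is elided.

    /* Hypothetical map-function; only shows access to the spawn info. */
    static int
    interact_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi,
                KMR_KVS *kvo, void *arg, const long index)
    {
        struct kmr_spawn_info *si = arg;
        MPI_Comm icomm = si->u.icomm;   /* inter-communicator to the spawned */
        void *userarg = si->maparg;     /* the ARG given to the mapper */
        /* ... exchange messages with the spawned processes over icomm,
           then optionally add results to KVO ... */
        (void)kv; (void)kvi; (void)kvo; (void)index; (void)icomm; (void)userarg;
        return MPI_SUCCESS;
    }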
506 
507 /* Makes a list of processes to spawn from the KVS entries. */
508 
509 static int
510 kmr_list_spawns(struct kmr_spawning *spw, KMR_KVS *kvi, MPI_Info info,
511  struct kmr_spawn_option opt)
512 {
513  assert(spw->n_spawns == (int)kvi->c.element_count);
514  KMR * const mr = kvi->c.mr;
515  _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
516  int cc;
517 
518  /* Scan key-value pairs and put them in EV. */
519 
520  kvi->c.current_block = kvi->c.first_block;
521  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvi, kvi->c.first_block);
522  for (int w = 0; w < spw->n_spawns; w++) {
523  spw->ev[w] = kmr_pick_kv(e, kvi);
524  e = kmr_kvs_next(kvi, e, 0);
525  }
526 
527  /* Share the universe evenly by all ranks which call spawn. */
528 
529  int nranks;
530  cc = kmr_sum_on_all_ranks(mr, ((spw->n_spawns > 0) ? 1 : 0), &nranks);
531  assert(cc == MPI_SUCCESS && nranks <= mr->nprocs);
532  spw->n_spawners = nranks;
533  int *usizep;
534  int uflag;
535  cc = MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_UNIVERSE_SIZE, &usizep, &uflag);
536  if (cc != MPI_SUCCESS || uflag == 0) {
537  char ee[80];
538  snprintf(ee, sizeof(ee), "%s: MPI lacks universe size", spw->fn);
539  kmr_error(mr, ee);
540  }
541  spw->usize = *usizep;
542  if (spw->usize <= mr->nprocs) {
543  char ee[80];
544  snprintf(ee, sizeof(ee), "%s: no dynamic processes in universe",
545  spw->fn);
546  kmr_error(mr, ee);
547  }
548  int m = spw->usize - mr->nprocs;
549  if (spw->n_spawners != 0) {
550  m /= spw->n_spawners;
551  }
552  spw->spawn_limit = ((mr->spawn_max_processes != 0)
553  ? MIN(mr->spawn_max_processes, m)
554  : m);
555  if (tracing5) {
556  if (spw->n_spawns > 0) {
557  fprintf(stderr,
558  ";;KMR [%05d] %s: universe-size=%d spawn-limit=%d\n",
559  mr->rank, spw->fn, spw->usize, spw->spawn_limit);
560  fflush(0);
561  }
562  }
563 
564  /* Take MAXPROCS from info if defined. */
565 
566  int maxprocs = -1;
567  {
568  char *infoval = kmr_malloc((size_t)(MPI_MAX_INFO_VAL + 1));
569  int iflag;
570  if (info != MPI_INFO_NULL) {
571  cc = MPI_Info_get(info, "maxprocs", MPI_MAX_INFO_VAL,
572  infoval, &iflag);
573  assert(cc == MPI_SUCCESS);
574  } else {
575  iflag = 0;
576  }
577  if (iflag != 0) {
578  int v;
579  cc = kmr_parse_int(infoval, &v);
580  if (cc == 0 || v < 0) {
581  char ee[80];
582  snprintf(ee, sizeof(ee), "%s: bad value in info maxprocs=%s",
583  spw->fn, infoval);
584  kmr_error(mr, ee);
585  maxprocs = -1;
586  } else {
587  maxprocs = v;
588  }
589  } else {
590  maxprocs = -1;
591  }
592  kmr_free(infoval, (size_t)(MPI_MAX_INFO_VAL + 1));
593  }
594 
595  /* Make the arguments to spawn. */
596 
597  spw->n_processes = 0;
598  for (int w = 0; w < spw->n_spawns; w++) {
599  struct kmr_kv_box kv = spw->ev[w];
600  struct kmr_spawn_state *s = &(spw->spawned[w]);
601  s->running = 0;
602  s->n_procs = -1;
603  s->index = 0;
604  s->count = 0;
605  s->argc = 0;
606  s->argv = 0;
607  s->argc0 = 0;
608  s->argv0 = 0;
609  s->alen = 0;
610  s->abuf = 0;
611  s->icomm = MPI_COMM_NULL;
612  s->watch_port = -1;
613 
614  s->alen = (size_t)kv.vlen;
615  s->abuf = kmr_malloc(s->alen);
616  memcpy(s->abuf, kv.v.p, (size_t)kv.vlen);
617  int maxargc;
618  cc = kmr_scan_argv_strings(mr, s->abuf, s->alen,
619  0, &maxargc, 0,
620  opt.separator_space, spw->fn);
621  assert(cc == MPI_SUCCESS);
622  s->argv0 = kmr_malloc(sizeof(char *) * (size_t)(maxargc + 1));
623  memset(s->argv0, 0, (sizeof(char *) * (size_t)(maxargc + 1)));
624  cc = kmr_scan_argv_strings(mr, s->abuf, s->alen,
625  (maxargc + 1), &s->argc0, s->argv0,
626  opt.separator_space, spw->fn);
627  assert(cc == MPI_SUCCESS);
628  assert(maxargc == s->argc0);
629 
630  /* Check if the "MAXPROCS=" string is in the arguments. */
631 
632  if (s->argc0 > 0 && strncmp("maxprocs=", s->argv0[0], 9) == 0) {
633  int v;
634  cc = kmr_parse_int(&s->argv0[0][9], &v);
635  if (cc == 0 || v < 0) {
636  char ee[80];
637  snprintf(ee, sizeof(ee), "%s: bad maxprocs=%s",
638  spw->fn, s->argv0[0]);
639  kmr_error(mr, ee);
640  }
641  s->n_procs = v;
642  s->argc = (s->argc0 - 1);
643  s->argv = (s->argv0 + 1);
644  } else {
645  s->n_procs = maxprocs;
646  s->argc = s->argc0;
647  s->argv = s->argv0;
648  }
649 
650  if (s->argc <= 0) {
651  char ee[80];
652  snprintf(ee, sizeof(ee), "%s: no arguments", spw->fn);
653  kmr_error(mr, ee);
654  }
655  if (s->n_procs <= 0) {
656  char ee[80];
657  snprintf(ee, sizeof(ee), "%s: maxprocs not specified",
658  spw->fn);
659  kmr_error(mr, ee);
660  }
661  if (s->n_procs > spw->spawn_limit) {
662  char ee[80];
663  snprintf(ee, sizeof(ee),
664  "%s: maxprocs too large, (maxprocs=%d limit=%d)",
665  spw->fn, s->n_procs, spw->spawn_limit);
666  kmr_error(mr, ee);
667  }
668 
669  spw->n_processes += s->n_procs;
670  }
671 
672  return MPI_SUCCESS;
673 }
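As parsed above, the process count of each spawn comes either from a leading "maxprocs=n" string in the value (which takes precedence for that entry) or from an MPI_Info entry "maxprocs" that applies to all spawns. A short sketch of the info-based form:

    MPI_Info info;
    MPI_Info_create(&info);
    MPI_Info_set(info, "maxprocs", "4");   /* common to all spawn entries */
    /* ... pass info to the spawning mapper, e.g. kmr_map_via_spawn() ... */
    MPI_Info_free(&info);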
674 
675 static int
676 kmr_free_comm_with_tracing(KMR *mr, struct kmr_spawning *spw,
677  struct kmr_spawn_state *s)
678 {
679  int cc;
680  _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
681  if (s->icomm != MPI_COMM_NULL) {
682  if (tracing5) {
683  ptrdiff_t done = (s - spw->spawned);
684  fprintf(stderr, (";;KMR [%05d] %s [%ld]:"
685  " MPI_Comm_free (could block)...\n"),
686  mr->rank, spw->fn, done);
687  fflush(0);
688  }
689 
690  if (!mr->spawn_disconnect_but_free) {
691  cc = MPI_Comm_free(&(s->icomm));
692  } else {
693  cc = MPI_Comm_disconnect(&(s->icomm));
694  }
695  assert(cc == MPI_SUCCESS);
696 
697  if (tracing5) {
698  ptrdiff_t done = (s - spw->spawned);
699  fprintf(stderr, (";;KMR [%05d] %s [%ld]:"
700  " MPI_Comm_free done\n"),
701  mr->rank, spw->fn, done);
702  fflush(0);
703  }
704  }
705  return MPI_SUCCESS;
706 }
707 
708 /* Makes a listening socket for a watch-program. If successful, it
709  fills the WATCH_LISTENER (an fd) and WATCH_HOST fields of the
710  spawning state and the WATCH_PORT of the indexed spawn. */
711 
712 static int
713 kmr_listen_to_watch(KMR *mr, struct kmr_spawning *spw, int index)
714 {
715  assert(sizeof(spw->watch_host) >= 46);
716  int cc;
717  union {
718  struct sockaddr sa;
719  struct sockaddr_in sa4;
720  struct sockaddr_in6 sa6;
721  struct sockaddr_storage ss;
722  } sa;
723  char hostname[MAXHOSTNAMELEN];
724  char address[INET6_ADDRSTRLEN];
725 
726  /* Not use AF_UNSPEC. */
727 
728  int af = mr->spawn_watch_af;
729  assert(af == 0 || af == 4 || af == 6);
730  char *family = (af == 4 ? "AF_INET" : "AF_INET6");
731 
732  assert(spw->watch_listener == -1);
733  const int *ports = mr->spawn_watch_port_range;
734  assert(ports[0] != 0 || ports[1] == 0);
735  for (int port = ports[0]; port <= ports[1]; port++) {
736  if (af == 4) {
737  sa.sa.sa_family = AF_INET;
738  } else if (af == 0 || af == 6) {
739  sa.sa.sa_family = AF_INET6;
740  } else {
741  assert(0);
742  }
743  int fd = socket(sa.sa.sa_family, SOCK_STREAM, 0);
744  if (fd < 0) {
745  char ee[80];
746  char *m = strerror(errno);
747  snprintf(ee, sizeof(ee), "%s: socket(%s) failed: %s",
748  spw->fn, family, m);
749  kmr_error(mr, ee);
750  }
751  int one = 1;
752  cc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
753  if (cc != 0) {
754  char ee[80];
755  char *m = strerror(errno);
756  snprintf(ee, sizeof(ee), "%s: setsockopt(SO_REUSEADDR): %s",
757  spw->fn, m);
758  kmr_warning(mr, 1, ee);
759  }
760 
761  socklen_t salen;
762  if (af == 4) {
763  memset(&sa, 0, sizeof(sa));
764  sa.sa4.sin_family = AF_INET;
765  sa.sa4.sin_addr.s_addr = htonl(INADDR_ANY);
766  sa.sa4.sin_port = htons((uint16_t)port);
767  salen = sizeof(sa.sa4);
768  } else if (af == 0 || af == 6) {
769  memset(&sa, 0, sizeof(sa));
770  sa.sa6.sin6_family = AF_INET6;
771  sa.sa6.sin6_addr = in6addr_any;
772  sa.sa6.sin6_port = htons((uint16_t)port);
773  salen = sizeof(sa.sa6);
774  } else {
775  salen = 0;
776  assert(0);
777  }
778 
779  /* NOTE: Linux returns EINVAL for EADDRINUSE in bind. */
780 
781  cc = bind(fd, &sa.sa, salen);
782  if (cc != 0) {
783  if (errno == EADDRINUSE || errno == EINVAL) {
784  cc = close(fd);
785  assert(cc == 0);
786  continue;
787  } else {
788  char ee[80];
789  char *m = strerror(errno);
790  snprintf(ee, sizeof(ee), "%s: bind(%s, port=%d) failed: %s",
791  spw->fn, family, port, m);
792  kmr_error(mr, ee);
793  }
794  }
795 
796  /* NOTE: Linux may return EADDRINUSE in listen, too. */
797 
798  int backlog = spw->spawn_limit;
799  cc = listen(fd, backlog);
800  if (cc != 0) {
801  if (errno == EADDRINUSE || errno == EINVAL) {
802  cc = close(fd);
803  assert(cc == 0);
804  continue;
805  } else {
806  char ee[80];
807  char *m = strerror(errno);
808  snprintf(ee, sizeof(ee), "%s: listen(%s, port=%d) failed: %s",
809  spw->fn, family, port, m);
810  kmr_error(mr, ee);
811  }
812  }
813  assert(fd != -1);
814  spw->watch_listener = fd;
815  break;
816  }
817 
818  int fd = spw->watch_listener;
819  if (fd == -1) {
820  char ee[80];
821  snprintf(ee, sizeof(ee), "%s: no ports to listen to watch-programs",
822  spw->fn);
823  kmr_error(mr, ee);
824  }
825 
826  /* Get address and port number from the socket. */
827 
828  memset(&sa, 0, sizeof(sa));
829  socklen_t salen = sizeof(sa);
830  cc = getsockname(fd, &sa.sa, &salen);
831  if (cc != 0) {
832  char ee[80];
833  char *m = strerror(errno);
834  snprintf(ee, sizeof(ee), "%s: getsockname() failed: %s",
835  spw->fn, m);
836  kmr_error(mr, ee);
837  }
838 
839  int port = 0;
840  if (sa.sa.sa_family == AF_INET) {
841  port = ntohs(sa.sa4.sin_port);
842  } else if (sa.sa.sa_family == AF_INET6) {
843  port = ntohs(sa.sa6.sin6_port);
844  } else {
845  char ee[80];
846  snprintf(ee, sizeof(ee), "%s: getsockname(): unknown ip family=%d",
847  spw->fn, sa.sa.sa_family);
848  kmr_error(mr, ee);
849  }
850  assert(port != 0);
851 
852  if (mr->spawn_watch_host_name != 0) {
853  cc = snprintf(hostname, sizeof(hostname),
854  "%s", mr->spawn_watch_host_name);
855  assert(cc < (int)sizeof(hostname));
856  } else {
857  cc = gethostname(hostname, sizeof(hostname));
858  if (cc != 0) {
859  char ee[80];
860  char *m = strerror(errno);
861  snprintf(ee, sizeof(ee), "%s: gethostname() failed: %s",
862  spw->fn, m);
863  kmr_error(mr, ee);
864  }
865  }
866 
867  struct addrinfo hints;
868  memset(&hints, 0, sizeof(hints));
869  hints.ai_flags = AI_ADDRCONFIG;
870  hints.ai_socktype = SOCK_STREAM;
871  hints.ai_protocol = IPPROTO_TCP;
872  if (af == 4) {
873  hints.ai_family = AF_INET;
874  } else if (af == 6) {
875  hints.ai_family = AF_INET6;
876  } else if (af == 0) {
877  hints.ai_family = (AF_INET6 | AI_V4MAPPED);
878  } else {
879  assert(0);
880  }
881  struct addrinfo *addrs = 0;
882  cc = getaddrinfo(hostname, 0, &hints, &addrs);
883  if (cc != 0) {
884  char ee[80];
885  const char *m = gai_strerror(cc);
886  snprintf(ee, sizeof(ee), "%s: getaddrinfo(%s) failed: %s",
887  spw->fn, hostname, m);
888  kmr_error(mr, ee);
889  }
890  struct addrinfo *p;
891  for (p = addrs; p != 0; p = p->ai_next) {
892  if (!(p->ai_family == AF_INET || p->ai_family == AF_INET6)) {
893  continue;
894  }
895  if (af == 4 && p->ai_family != AF_INET) {
896  continue;
897  }
898  if (af == 6 && p->ai_family != AF_INET6) {
899  continue;
900  }
901  break;
902  }
903  if (p == 0) {
904  char ee[80];
905  snprintf(ee, sizeof(ee), "%s: getaddrinfo(%s): no address for host",
906  spw->fn, hostname);
907  kmr_error(mr, ee);
908  }
909 
910  if (p->ai_family == AF_INET) {
911  void *inaddr = &(((struct sockaddr_in *)p->ai_addr)->sin_addr);
912  inet_ntop(p->ai_family, inaddr, address, sizeof(address));
913  } else if (p->ai_family == AF_INET6) {
914  void *inaddr = &(((struct sockaddr_in6 *)p->ai_addr)->sin6_addr);
915  inet_ntop(p->ai_family, inaddr, address, sizeof(address));
916  } else {
917  char ee[80];
918  snprintf(ee, sizeof(ee), "%s: getaddrinfo(%s): unknown ip family=%d",
919  spw->fn, hostname, p->ai_family);
920  kmr_error(mr, ee);
921  }
922  freeaddrinfo(addrs);
923 
924  assert(0 <= index && index < spw->n_spawns);
925  struct kmr_spawn_state *s = &(spw->spawned[index]);
926  s->watch_port = port;
927 
928  cc = snprintf(spw->watch_host, sizeof(spw->watch_host),
929  "%s", address);
930  assert(cc < (int)sizeof(spw->watch_host));
931 
932  return MPI_SUCCESS;
933 }
934 
935 /* Waits for connections from the watch-programs of all the spawned
936  processes. It handles one spawning at a time. */
937 
938 static int
939 kmr_accept_on_watch(KMR *mr, struct kmr_spawning *spw, int index)
940 {
941  _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
942  assert(0 <= index && index < spw->n_spawns);
943  struct kmr_spawn_state *s = &(spw->spawned[index]);
944  assert(s->n_procs > 0);
945  union {
946  struct sockaddr sa;
947  struct sockaddr_in sa4;
948  struct sockaddr_in6 sa6;
949  struct sockaddr_storage ss;
950  } sa;
951  int cc;
952 
953  assert(spw->watch_listener != -1);
954  int fd0 = spw->watch_listener;
955  for (int count = 0; count < s->n_procs; count++) {
956  for (;;) {
957  nfds_t nfds = 1;
958  struct pollfd fds0, *fds = &fds0;
959  memset(fds, 0, (sizeof(struct pollfd) * nfds));
960  fds[0].fd = fd0;
961  fds[0].events = (POLLIN|POLLPRI);
962  fds[0].revents = 0;
963 
964  assert(mr->spawn_watch_accept_onhold_msec >= (60 * 1000));
965  int msec = mr->spawn_watch_accept_onhold_msec;
966  int nn = poll(fds, nfds, msec);
967  if (nn == 0) {
968  char ee[80];
969  snprintf(ee, sizeof(ee),
970  "%s: accepting watch-programs timed out"
971  " (msec=%d)", spw->fn, msec);
972  kmr_error(mr, ee);
973  } else if (nn < 0 && (errno == EAGAIN || errno == EINTR)) {
974  char ee[80];
975  char *m = strerror(errno);
976  snprintf(ee, sizeof(ee),
977  "%s: poll (for watch-programs) returned: %s",
978  spw->fn, m);
979  kmr_warning(mr, 1, ee);
980  continue;
981  } else if (nn < 0){
982  char ee[80];
983  char *m = strerror(errno);
984  snprintf(ee, sizeof(ee),
985  "%s: poll (for watch-programs) failed: %s",
986  spw->fn, m);
987  kmr_error(mr, ee);
988  }
989  break;
990  }
991 
992  memset(&sa, 0, sizeof(sa));
993  socklen_t salen = sizeof(sa);
994  int fd = accept(fd0, &sa.sa, &salen);
995  if (fd == -1) {
996  char ee[80];
997  char *m = strerror(errno);
998  snprintf(ee, sizeof(ee),
999  "%s: accept (for watch-programs) failed: %s",
1000  spw->fn, m);
1001  kmr_error(mr, ee);
1002  }
1003 
1004  /*setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));*/
1005 
1006  if (tracing5) {
1007  char address[INET6_ADDRSTRLEN];
1008  //int port = 0;
1009  if (sa.sa.sa_family == AF_INET) {
1010  void *inaddr = &sa.sa4.sin_addr;
1011  inet_ntop(sa.sa.sa_family, inaddr, address, sizeof(address));
1012  //port = ntohs(sa.sa4.sin_port);
1013  } else if (sa.sa.sa_family == AF_INET6) {
1014  void *inaddr = &sa.sa6.sin6_addr;
1015  inet_ntop(sa.sa.sa_family, inaddr, address, sizeof(address));
1016  //port = ntohs(sa.sa6.sin6_port);
1017  } else {
1018  char ee[80];
1019  snprintf(ee, sizeof(ee), "%s: accept(): unknown ip family=%d",
1020  spw->fn, sa.sa.sa_family);
1021  kmr_error(mr, ee);
1022  }
1023 
1024  fprintf(stderr, (";;KMR [%05d] %s [%d]:"
1025  " accepting a connection of watch-programs"
1026  " on port=%d from %s (%d/%d)\n"),
1027  mr->rank, spw->fn, index,
1028  s->watch_port, address, (count + 1), s->n_procs);
1029  fflush(0);
1030  }
1031 
1032  int val;
1033  if (count == 0 || mr->spawn_watch_all) {
1034  assert((s->index + count) <= spw->n_processes);
1035  spw->watches[s->index + count] = fd;
1036  /* send 1 when connection is accepted */
1037  val = 1;
1038  } else {
1039  val = 0;
1040  }
1041  ssize_t wsize = write(fd, &val, sizeof(int));
1042  if (wsize < 0) {
1043  char ee[80];
1044  char *m = strerror(errno);
1045  snprintf(ee, sizeof(ee),
1046  "%s: write (for watch-programs) failed: %s",
1047  spw->fn, m);
1048  kmr_error(mr, ee);
1049  }
1050  assert(wsize == sizeof(int));
1051 
1052  int rval;
1053  ssize_t rsize = read(fd, &rval, sizeof(int));
1054  if (rsize < 0) {
1055  char ee[80];
1056  char *m = strerror(errno);
1057  snprintf(ee, sizeof(ee),
1058  "%s: read (for watch-programs) failed: %s",
1059  spw->fn, m);
1060  kmr_error(mr, ee);
1061  }
1062  assert(rsize == sizeof(int));
1063  assert(val == rval);
1064 
1065  if (!(count == 0 || mr->spawn_watch_all)) {
1066  cc = close(fd);
1067  assert(cc == 0);
1068  }
1069  }
1070 
1071  cc = close(spw->watch_listener);
1072  assert(cc == 0);
1073  spw->watch_listener = -1;
1074 
1075  return MPI_SUCCESS;
1076 }
1077 
1078 static int
1079 kmr_receive_for_reply(KMR *mr, struct kmr_spawning *spw,
1080  int w, _Bool replyeach, _Bool replyroot)
1081 {
1082  assert(0 <= w && w < spw->n_spawns);
1083  int cc;
1084  enum kmr_spawn_mode mode = spw->mode;
1085  struct kmr_spawn_state *s = &(spw->spawned[w]);
1086  MPI_Request *reqs = spw->replies;
1087  if (mode == KMR_SPAWN_INTERACT) {
1088  if (replyeach) {
1089  assert(s->index + s->count <= spw->n_processes);
1090  for (int rank = 0; rank < s->count; rank++) {
1091  assert(reqs[s->index + rank] == MPI_REQUEST_NULL);
1092  cc = MPI_Irecv(0, 0, MPI_BYTE,
1093  rank, KMR_TAG_SPAWN_REPLY,
1094  s->icomm, &reqs[s->index + rank]);
1095  assert(cc == MPI_SUCCESS);
1096  assert(reqs[s->index + rank] != MPI_REQUEST_NULL);
1097  }
1098  } else if (replyroot) {
1099  assert(w <= spw->n_processes);
1100  int rank = 0;
1101  assert(reqs[w] == MPI_REQUEST_NULL);
1102  cc = MPI_Irecv(0, 0, MPI_BYTE,
1103  rank, KMR_TAG_SPAWN_REPLY,
1104  s->icomm, &reqs[w]);
1105  assert(cc == MPI_SUCCESS);
1106  assert(reqs[w] != MPI_REQUEST_NULL);
1107  } else {
1108  /*nothing*/
1109  }
1110  } else if (mode == KMR_SPAWN_SERIAL) {
1111  assert(replyeach);
1112  {
1113  assert(s->index + s->count <= spw->n_processes);
1114  for (int rank = 0; rank < s->count; rank++) {
1115  assert(reqs[s->index + rank] == MPI_REQUEST_NULL);
1116  cc = MPI_Irecv(0, 0, MPI_BYTE,
1117  rank, KMR_TAG_SPAWN_REPLY,
1118  s->icomm, &reqs[s->index + rank]);
1119  assert(cc == MPI_SUCCESS);
1120  assert(reqs[s->index + rank] != MPI_REQUEST_NULL);
1121  }
1122  }
1123  } else {
1124  assert(mode == KMR_SPAWN_INTERACT || mode == KMR_SPAWN_SERIAL);
1125  }
1126  return MPI_SUCCESS;
1127 }
1128 
1129 /* Waits for a single reply and checks a finished spawn from the
1130  request index. It returns an index of the n-th spawning or -1 if
1131  nothing has finished. Or, when no replies are expected, it
1132  immediately returns a minimum index of the spawns still running.
1133  The return value is an index of the array SPAWNED. */
1134 
1135 static int
1136 kmr_wait_for_reply(KMR *mr, struct kmr_spawning *spw,
1137  struct kmr_spawn_option opt)
1138 {
1139  _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
1140  int cc;
1141  MPI_Request *reqs = spw->replies;
1142  if (opt.reply_each) {
1143  MPI_Status st;
1144  int index;
1145  int nwait = spw->n_processes;
1146  cc = MPI_Waitany(nwait, reqs, &index, &st);
1147  assert(cc == MPI_SUCCESS && index != MPI_UNDEFINED);
1148  assert(index < spw->n_processes);
1149  assert(reqs[index] == MPI_REQUEST_NULL);
1150  /* Find spawned state from the request index. */
1151  int done = -1;
1152  for (int w = 0; w < spw->n_spawns; w++) {
1153  struct kmr_spawn_state *s = &spw->spawned[w];
1154  if (index < (s->index + s->count)) {
1155  assert(s->index <= index);
1156  done = w;
1157  break;
1158  }
1159  }
1160  assert(done != -1);
1161  struct kmr_spawn_state *s = &spw->spawned[done];
1162  assert(s->running);
1163  int count = (opt.reply_each ? s->count : 1);
1164  int nreplies = 0;
1165  assert((s->index + count) <= spw->n_processes);
1166  for (int j = 0; j < count; j++) {
1167  if (reqs[s->index + j] == MPI_REQUEST_NULL) {
1168  nreplies++;
1169  }
1170  }
1171 
1172  if (tracing5) {
1173  fprintf(stderr, (";;KMR [%05d] %s [%d]: got a reply (%d/%d)\n"),
1174  mr->rank, spw->fn, done, nreplies, count);
1175  fflush(0);
1176  }
1177 
1178  _Bool fin = (nreplies == count);
1179  return (fin ? done : -1);
1180  } else if (opt.reply_root) {
1181  MPI_Status st;
1182  int index;
1183  int nwait = spw->n_spawns;
1184  cc = MPI_Waitany(nwait, reqs, &index, &st);
1185  assert(cc == MPI_SUCCESS && index != MPI_UNDEFINED);
1186  assert(index <= spw->n_spawns);
1187  assert(reqs[index] == MPI_REQUEST_NULL);
1188  int done = index;
1189  assert(0 <= done && done < spw->n_spawns);
1190  struct kmr_spawn_state *s = &spw->spawned[done];
1191  assert(s->running);
1192  assert(reqs[done] == MPI_REQUEST_NULL);
1193 
1194  if (tracing5) {
1195  fprintf(stderr, (";;KMR [%05d] %s [%d]: got a root reply\n"),
1196  mr->rank, spw->fn, done);
1197  fflush(0);
1198  }
1199 
1200  return done;
1201  } else {
1202  int done = -1;
1203  for (int w = 0; w < spw->n_spawns; w++) {
1204  struct kmr_spawn_state *s = &spw->spawned[w];
1205  if (s->running) {
1206  done = w;
1207  break;
1208  }
1209  }
1210 
1211  if (tracing5) {
1212  fprintf(stderr, (";;KMR [%05d] %s [%d]: (no checks of replies)\n"),
1213  mr->rank, spw->fn, done);
1214  fflush(0);
1215  }
1216 
1217  assert(done != -1);
1218  return done;
1219  }
1220 }
1221 
1222 /* Waits for the end of some spawned process. It detects the end by
1223  the closure of a socket to the watch-program. It returns the index of
1224  the n-th spawning, or -1 if nothing has finished. It waits for one
1225  to finish, but it may return -1 with nothing finished. (It
1226  avoids using MPI_STATUS_IGNORE in MPI_Testany() because of a bug in
1227  some versions of Open MPI (around 1.6.3)). */
1228 
1229 static int
1230 kmr_wait_for_watch(KMR *mr, struct kmr_spawning *spw,
1231  struct kmr_spawn_option _)
1232 {
1233  _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
1234  int cc;
1235  char garbage[4096];
1236 
1237  int nruns = 0;
1238  for (int w = 0; w < spw->n_spawns; w++) {
1239  struct kmr_spawn_state *s = &(spw->spawned[w]);
1240  if (s->running) {
1241  nruns += s->count;
1242  }
1243  }
1244  assert(nruns != 0 && spw->n_runnings == nruns);
1245 
1246  nfds_t nfds = 0;
1247  for (int i = 0; i < spw->n_processes; i++) {
1248  if (spw->watches[i] != -1) {
1249  nfds++;
1250  }
1251  }
1252  assert(nfds != 0);
1253 
1254  struct pollfd *fds = kmr_malloc(sizeof(struct pollfd) * (size_t)nfds);
1255 
1256  int done = -1;
1257  for (;;) {
1258  memset(fds, 0, (sizeof(struct pollfd) * nfds));
1259  nfds_t fdix = 0;
1260  for (int i = 0; i < spw->n_processes; i++) {
1261  if (spw->watches[i] != -1) {
1262  assert(fdix < nfds);
1263  fds[fdix].fd = spw->watches[i];
1264  fds[fdix].events = (POLLIN|POLLPRI);
1265  fds[fdix].revents = 0;
1266  fdix++;
1267  }
1268  }
1269  assert(fdix == nfds);
1270 
1271  if (tracing5) {
1272  fprintf(stderr, (";;KMR [%05d] %s:"
1273  " waiting for some watch-programs finish\n"),
1274  mr->rank, spw->fn);
1275  fflush(0);
1276  }
1277 
1278  for (;;) {
1279  int msec = 1;
1280  int nn = poll(fds, nfds, msec);
1281  if (nn == 0) {
1282  int index;
1283  int ok;
1284  MPI_Status st;
1285  MPI_Testany(0, 0, &index, &ok, &st);
1286  /*kmr_warning(mr, 1,
1287  "poll (for watch-programs)"
1288  " timed out badly; continuing");*/
1289  continue;
1290  } else if (nn < 0 && (errno == EAGAIN || errno == EINTR)) {
1291  char ee[80];
1292  char *m = strerror(errno);
1293  snprintf(ee, sizeof(ee),
1294  ("poll (for watch-programs) interrupted;"
1295  " continuing: %s"), m);
1296  kmr_warning(mr, 1, ee);
1297  continue;
1298  } else if (nn < 0){
1299  char ee[80];
1300  char *m = strerror(errno);
1301  snprintf(ee, sizeof(ee),
1302  "%s: poll (for watch-programs) failed: %s",
1303  spw->fn, m);
1304  kmr_error(mr, ee);
1305  } else {
1306  break;
1307  }
1308  }
1309 
1310  int fd = -1;
1311  for (nfds_t k = 0; k < nfds; k++) {
1312  if (fds[k].fd != -1 && fds[k].revents != 0) {
1313  fd = fds[k].fd;
1314  break;
1315  }
1316  }
1317  if (fd == -1) {
1318  char ee[80];
1319  snprintf(ee, sizeof(ee), "poll (for watch-programs) no FD found");
1320  kmr_warning(mr, 1, ee);
1321  continue;
1322  }
1323 
1324  int index = -1;
1325  for (int w = 0; w < spw->n_spawns; w++) {
1326  struct kmr_spawn_state *s = &(spw->spawned[w]);
1327  assert((s->index + s->count) <= spw->n_processes);
1328  for (int j = 0; j < s->count; j++) {
1329  if (fd == spw->watches[s->index + j]) {
1330  index = (s->index + j);
1331  done = w;
1332  break;
1333  }
1334  }
1335  }
1336  assert(fd != -1 && index != -1 && done != -1);
1337  assert(0 <= index && index < spw->n_processes);
1338  assert(0 <= done && done < spw->n_spawns);
1339  //struct kmr_spawn_state *s = &(spw->spawned[done]);
1340 
1341  ssize_t rr = read(fd, garbage, sizeof(garbage));
1342  if (rr == 0) {
1343  /* Got EOF. */
1344  assert(fd == spw->watches[index]);
1345  cc = close(fd);
1346  assert(cc == 0);
1347  spw->watches[index] = -1;
1348  break;
1349  } else if (rr > 0) {
1350  /* Read out garbage data. */
1351  continue;
1352  } else if (rr == -1 && (errno == EAGAIN || errno == EINTR)) {
1353  char ee[80];
1354  char *m = strerror(errno);
1355  snprintf(ee, sizeof(ee),
1356  "read (for watch-programs) returned: %s", m);
1357  kmr_warning(mr, 1, ee);
1358  continue;
1359  } else if (rr == -1) {
1360  char ee[80];
1361  char *m = strerror(errno);
1362  snprintf(ee, sizeof(ee),
1363  "%s: read (for watch-programs) failed: %s",
1364  spw->fn, m);
1365  kmr_error(mr, ee);
1366  }
1367  }
1368  assert(done != -1);
1369 
1370  assert(0 <= done && done < spw->n_spawns);
1371  struct kmr_spawn_state *s = &(spw->spawned[done]);
1372  int count = ((mr->spawn_watch_all) ? s->count : 1);
1373  int nreplies = 0;
1374  assert((s->index + count) <= spw->n_processes);
1375  for (int j = 0; j < count; j++) {
1376  if (spw->watches[s->index + j] == -1) {
1377  nreplies++;
1378  }
1379  }
1380 
1381  if (tracing5) {
1382  fprintf(stderr, (";;KMR [%05d] %s [%d]:"
1383  " detected a watch done (%d/%d)\n"),
1384  mr->rank, spw->fn, done, nreplies, count);
1385  fflush(0);
1386  }
1387 
1388  _Bool fin = (nreplies == count);
1389  if (fin) {
1390  if (s->icomm != MPI_COMM_NULL) {
1391  cc = kmr_free_comm_with_tracing(mr, spw, s);
1392  assert(cc == MPI_SUCCESS);
1393  }
1394  }
1395 
1396  kmr_free(fds, (sizeof(struct pollfd) * (size_t)nfds));
1397  return (fin ? done : -1);
1398 }
1399 
1400 static int
1401 kmr_wait_then_map(KMR *mr, struct kmr_spawning *spw,
1402  KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
1403  struct kmr_spawn_option opt, kmr_mapfn_t m)
1404 {
1405  int cc;
1406  enum kmr_spawn_mode mode = spw->mode;
1407  int done;
1408  if (mode == KMR_SPAWN_INTERACT) {
1409  done = kmr_wait_for_reply(mr, spw, opt);
1410  } else if (mode == KMR_SPAWN_SERIAL) {
1411  done = kmr_wait_for_reply(mr, spw, opt);
1412  } else if (mode == KMR_SPAWN_PARALLEL) {
1413  done = kmr_wait_for_watch(mr, spw, opt);
1414  } else {
1415  assert(0);
1416  done = -1;
1417  }
1418  if (done != -1) {
1419  assert(0 <= done && done < spw->n_spawns);
1420  struct kmr_spawn_state *s = &(spw->spawned[done]);
1421  s->timestamp[3] = MPI_Wtime();
1422  if (m != 0) {
1423  if (mr->spawn_pass_intercomm_in_argument
1424  && mode == KMR_SPAWN_INTERACT) {
1425  assert(mr->spawn_comms != 0);
1426  assert(mr->spawn_comms[done] == &(s->icomm));
1427  struct kmr_spawn_info si;
1428  kmr_spawn_info_put(&si, s, opt, arg);
1429  cc = (*m)(spw->ev[done], kvi, kvo, &si, done);
1430  if (cc != MPI_SUCCESS) {
1431  char ee[80];
1432  snprintf(ee, sizeof(ee),
1433  "Map-fn returned with error cc=%d", cc);
1434  kmr_error(mr, ee);
1435  }
1436  kmr_spawn_info_get(&si, s);
1437  } else {
1438  cc = (*m)(spw->ev[done], kvi, kvo, arg, done);
1439  if (cc != MPI_SUCCESS) {
1440  char ee[80];
1441  snprintf(ee, sizeof(ee),
1442  "Map-fn returned with error cc=%d", cc);
1443  kmr_error(mr, ee);
1444  }
1445  }
1446  }
1447  s->timestamp[4] = MPI_Wtime();
1448  if (s->icomm != MPI_COMM_NULL) {
1449  cc = kmr_free_comm_with_tracing(mr, spw, s);
1450  assert(cc == MPI_SUCCESS);
1451  }
1452  s->timestamp[5] = MPI_Wtime();
1453  assert(s->running);
1454  s->running = 0;
1455  spw->n_runnings -= s->count;
1456  if (kmr_ckpt_enabled(mr)) {
1457  kmr_ckpt_save_kvo_each_add(mr, kvo, done);
1458  }
1459  }
1460  return MPI_SUCCESS;
1461 }
1462 
1463 /* NOTE: MPI_Comm_spawn() may fail due to a race between issuing a
1464  spawn and job scheduling. Although KMR adds a sleep between
1465  spawnings, the users can still call spawning-mappers without a gap.
1466  It retries regardless of the cause of an error (e.g., timing or bad
1467  arguments), because the cause cannot be told from the return code of
1468  MPI_Comm_spawn() (which is always MPI_ERR_SPAWN). */
1469 
1470 static int
1471 kmr_map_spawned_processes(enum kmr_spawn_mode mode, char *name,
1472  KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
1473  MPI_Info info,
1474  struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
1475 {
1476  kmr_assert_kvs_ok(kvi, kvo, 1, 0);
1477  assert(kvi->c.value_data == KMR_KV_OPAQUE
1478  || kvi->c.value_data == KMR_KV_CSTRING);
1479  assert(kvi->c.element_count <= INT_MAX);
1480  _Bool use_reply = (mode == KMR_SPAWN_INTERACT || mode == KMR_SPAWN_SERIAL);
1481  _Bool use_watch = (mode != KMR_SPAWN_INTERACT);
1482  KMR * const mr = kvi->c.mr;
1483  _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
1484  struct kmr_spawning spawning0;
1485  struct kmr_spawning *spw = &spawning0;
1486  char magic[20];
1487  char hostport[MAXHOSTNAMELEN + 10];
1488  int from = 0;
1489  int cc;
1490 
1491  if (use_watch) {
1492  int kcdc = kmr_ckpt_disable_ckpt(mr);
1493  cc = kmr_install_watch_program(mr, name);
1494  assert(cc == MPI_SUCCESS);
1495  kmr_ckpt_enable_ckpt(mr, kcdc);
1496  }
1497 
1498  if (mr->spawn_self == MPI_COMM_NULL) {
1499  if (mr->spawn_retry_limit > 0) {
1500  cc = MPI_Comm_dup(MPI_COMM_SELF, &mr->spawn_self);
1501  if (cc != MPI_SUCCESS) {
1502  kmr_error_mpi(mr, "MPI_Comm_dup(MPI_COMM_SELF)", cc);
1503  MPI_Abort(MPI_COMM_WORLD, 1);
1504  }
1505 #if (MPI_VERSION == 3)
1506  cc = MPI_Comm_set_errhandler(mr->spawn_self, MPI_ERRORS_RETURN);
1507 #else
1508  cc = MPI_Errhandler_set(mr->spawn_self, MPI_ERRORS_RETURN);
1509 #endif
1510  if (cc != MPI_SUCCESS) {
1511  kmr_error_mpi(mr, "MPI_Errhandler_set()", cc);
1512  MPI_Abort(MPI_COMM_WORLD, 1);
1513  }
1514  } else {
1515  mr->spawn_self = MPI_COMM_SELF;
1516  }
1517  }
1518 
1519  int cnt = (int)kvi->c.element_count;
1520  memset(spw, 0, sizeof(struct kmr_spawning));
1521  spw->fn = name;
1522  spw->mode = mode;
1523  spw->n_spawns = cnt;
1524  spw->n_starteds = 0;
1525  spw->n_runnings = 0;
1526  spw->spawned = kmr_malloc(sizeof(struct kmr_spawn_state) * (size_t)spw->n_spawns);
1527  spw->ev = kmr_malloc(sizeof(struct kmr_kv_box) * (size_t)spw->n_spawns);
1528  spw->watch_listener = -1;
1529  spw->watch_host[0] = 0;
1530 
1531  int kcdc = kmr_ckpt_disable_ckpt(mr);
1532  cc = kmr_list_spawns(spw, kvi, info, opt);
1533  assert(cc == MPI_SUCCESS);
1534  kmr_ckpt_enable_ckpt(mr, kcdc);
1535 
1536  if (kmr_ckpt_enabled(mr)) {
1537  struct kmr_option kopt = kmr_noopt;
1538  if (opt.take_ckpt) {
1539  kopt.take_ckpt = 1;
1540  }
1541  if (kmr_ckpt_progress_init(kvi, kvo, kopt)) {
1542  if (kvo != 0) {
1543  /* No "keep_open" option (!opt.keep_open). */
1544  kmr_add_kv_done(kvo);
1545  }
1546  if (1) {
1547  /* No "inspect" option (!opt.inspect). */
1548  kmr_free_kvs(kvi);
1549  }
1550  return MPI_SUCCESS;
1551  }
1552  from = (int)kmr_ckpt_first_unprocessed_kv(mr);
1553  kmr_ckpt_save_kvo_each_init(mr, kvo);
1554  }
1555 
1556  if (use_reply) {
1557  assert(spw->replies == 0);
1558  spw->replies = kmr_malloc(sizeof(MPI_Request) * (size_t)spw->n_processes);
1559  for (int i = 0; i < spw->n_processes; i++) {
1560  spw->replies[i] = MPI_REQUEST_NULL;
1561  }
1562  }
1563  if (mode == KMR_SPAWN_PARALLEL) {
1564  assert(spw->watches == 0);
1565  spw->watches = kmr_malloc(sizeof(int) * (size_t)spw->n_processes);
1566  for (int i = 0; i < spw->n_processes; i++) {
1567  spw->watches[i] = -1;
1568  }
1569  }
1570  assert(mr->spawn_comms == 0);
1571  mr->spawn_size = spw->n_spawns;
1572  mr->spawn_comms = kmr_malloc(sizeof(MPI_Comm *) * (size_t)spw->n_spawns);
1573  for (int w = 0; w < spw->n_spawns; w++) {
1574  mr->spawn_comms[w] = &(spw->spawned[w].icomm);
1575  }
1576 
1577  int gap;
1578  if (mr->spawn_gap_msec[0] == 0) {
1579  gap = 0;
1580  } else {
1581  int usz = 0;
1582  unsigned int v = (unsigned int)spw->usize;
1583  while (v > 0) {
1584  v = (v >> 1);
1585  usz++;
1586  }
1587  gap = (int)((((long)mr->spawn_gap_msec[1] * usz) / 10)
1588  + mr->spawn_gap_msec[0]);
1589  }
1590 
1591  /* Spawn by each entry. */
1592 
1593  for (int w = from; w < spw->n_spawns; w++) {
1594  struct kmr_spawn_state *s = &(spw->spawned[w]);
1595 
1596  /* Wait while no more processes are available. */
1597 
1598  if ((spw->n_runnings + s->n_procs) > spw->spawn_limit) {
1599  while ((spw->n_runnings + s->n_procs) > spw->spawn_limit) {
1600  cc = kmr_wait_then_map(mr, spw,
1601  kvi, kvo, arg, opt, mapfn);
1602  assert(cc == MPI_SUCCESS);
1603  }
1604  if (gap != 0) {
1605  if (tracing5) {
1606  fprintf(stderr,
1607  ";;KMR [%05d] %s: sleeping for spawn gap"
1608  " (%d msec)\n",
1609  mr->rank, spw->fn, gap);
1610  fflush(0);
1611  }
1612  kmr_msleep(gap, 1);
1613  }
1614  }
1615 
1616  if (mode == KMR_SPAWN_PARALLEL) {
1617  cc = kmr_listen_to_watch(mr, spw, w);
1618  assert(cc == MPI_SUCCESS);
1619  } else {
1620  cc = snprintf(spw->watch_host, sizeof(spw->watch_host), "0");
1621  assert(cc < (int)sizeof(spw->watch_host));
1622  }
1623 
1624  /* Modify the command line by moving "key=value" entries to
1625  the info. An empty key or value is regarded as an end of
1626  the entries. */
1627 
1628  MPI_Info infox;
1629  char **argv1;
1630  argv1 = s->argv;
1631  if (!opt.no_set_infos && strchr(argv1[0], '=') != 0) {
1632  if (info == MPI_INFO_NULL) {
1633  cc = MPI_Info_create(&infox);
1634  assert(cc == MPI_SUCCESS);
1635  } else {
1636  cc = MPI_Info_dup(info, &infox);
1637  assert(cc == MPI_SUCCESS);
1638  }
1639 
1640  char *sep;
1641  while ((sep = strchr(argv1[0], '=')) != 0) {
1642  char *k = argv1[0];
1643  char *v = (sep + 1);
1644  if (k != sep && v[0] != 0) {
1645  /* (It ignores an empty entry). */
1646  assert(*sep == '=');
1647  *sep = '\0';
1648  cc = MPI_Info_set(infox, k, v);
1649  assert(cc == MPI_SUCCESS);
1650  *sep = '=';
1651  }
1652  argv1++;
1653  if (!(k != sep && v[0] != 0)) {
1654  break;
1655  }
1656  }
1657  } else {
1658  infox = info;
1659  }
1660 
1661  /* Modify the command line to use the watch program. */
1662 
1663  char **argv2;
1664  int argc2;
1665  if (!use_watch) {
1666  argv2 = argv1;
1667  } else {
1668  int argc1;
1669  for (argc1 = 0; argv1[argc1] != 0; argc1++);
1670 
1671  argc2 = (argc1 + 5);
1672  argv2 = kmr_malloc(sizeof(char *) * (size_t)(argc2 + 1));
1673 
1674  cc = snprintf(hostport, sizeof(hostport),
1675  "%s/%d", spw->watch_host, s->watch_port);
1676  assert(cc < (int)sizeof(hostport));
1677 
1678  unsigned int vv = (unsigned int)random();
1679  cc = snprintf(magic, sizeof(magic), "%08xN%dV0%s",
1680  vv, w, ((mr->trace_map_spawn) ? "T1" : ""));
1681  assert(cc < (int)sizeof(magic));
1682 
1683  assert(mr->spawn_watch_program != 0);
1684  argv2[0] = mr->spawn_watch_program;
1685  argv2[1] = ((mode == KMR_SPAWN_SERIAL) ? "seq" : "mpi");
1686  argv2[2] = hostport;
1687  argv2[3] = magic;
1688  argv2[4] = "--";
1689  for (int i = 0; i < argc1; i++) {
1690  argv2[5 + i] = argv1[i];
1691  }
1692  argv2[(argc1 + 5)] = 0;
1693  }
1694 
1695  {
1696  if (tracing5) {
1697  char ee0[160];
1698  char ee1[160];
1699  kmr_make_printable_argv_string(ee0, sizeof(ee0), argv2);
1700  kmr_make_printable_info_string(ee1, sizeof(ee1), infox);
1701  fprintf(stderr, (";;KMR [%05d] %s [%d]: MPI_Comm_spawn"
1702  " (maxprocs=%d;%s;info:%s)\n"),
1703  mr->rank, spw->fn, w, s->n_procs, ee0, ee1);
1704  fflush(0);
1705  }
1706 
1707  s->timestamp[0] = MPI_Wtime();
1708 
1709  int nspawns;
1710  assert(s->icomm == MPI_COMM_NULL);
1711  int *ec = kmr_malloc(sizeof(int) * (size_t)s->n_procs);
1712  const int root = 0;
1713 
1714  /* RETRY WHEN SPAWNING FAILS. */
1715 
1716  int trys;
1717  trys = (mr->spawn_retry_limit + 1);
1718  assert(trys > 0);
1719  for (;;) {
1720  cc = MPI_Comm_spawn(argv2[0], &(argv2[1]), s->n_procs, infox,
1721  root, mr->spawn_self, &s->icomm, ec);
1722  if (cc != MPI_SUCCESS) {
1723  int xcc;
1724  xcc = MPI_SUCCESS;
1725  MPI_Error_class(cc, &xcc);
1726  assert(xcc == MPI_SUCCESS || xcc == MPI_ERR_SPAWN);
1727  }
1728  trys--;
1729  if (cc == MPI_SUCCESS || trys == 0) {
1730  break;
1731  }
1732  if (tracing5) {
1733  fprintf(stderr,
1734  ";;KMR [%05d] %s: sleeping after spawn failure"
1735  " (%d msec)\n",
1736  mr->rank, spw->fn, mr->spawn_retry_gap_msec);
1737  fflush(0);
1738  }
1739  kmr_msleep(mr->spawn_retry_gap_msec, 1);
1740  }
1741 
1742  /* NOT handle SOFT-ERROR case at all. */
1743 
1744  if (cc != MPI_SUCCESS) {
1745  nspawns = 0;
1746  kmr_error_mpi(mr, "MPI_Comm_spawn()", cc);
1747  MPI_Abort(MPI_COMM_WORLD, 1);
1748  } else {
1749  nspawns = s->n_procs;
1750  }
1751  assert(nspawns > 0);
1752 
1753  s->timestamp[1] = MPI_Wtime();
1754 
1755  kmr_free(ec, (sizeof(int) * (size_t)s->n_procs));
1756  if (argv2 != argv1) {
1757  kmr_free(argv2, (sizeof(char *) * (size_t)(argc2 + 1)));
1758  }
1759  argv2 = 0;
1760 
1761  s->running = 1;
1762  s->index = spw->n_starteds;
1763  s->count = nspawns;
1764  spw->n_starteds += nspawns;
1765  spw->n_runnings += nspawns;
1766 
1767  if (infox != info) {
1768  cc = MPI_Info_free(&infox);
1769  assert(cc == MPI_SUCCESS);
1770  }
1771  }
1772 
1773  if (mode == KMR_SPAWN_PARALLEL) {
1774  cc = kmr_accept_on_watch(mr, spw, w);
1775  assert(cc == MPI_SUCCESS);
1776  }
1777 
1778  if (mr->spawn_disconnect_early && mode == KMR_SPAWN_PARALLEL) {
1779  if (s->icomm != MPI_COMM_NULL) {
1780  cc = kmr_free_comm_with_tracing(mr, spw, s);
1781  assert(cc == MPI_SUCCESS);
1782  }
1783  }
1784 
1785  if (mr->spawn_sync_at_startup && s->icomm != MPI_COMM_NULL) {
1786  int flag;
1787  cc = MPI_Comm_test_inter(s->icomm, &flag);
1788  assert(cc == MPI_SUCCESS && flag != 0);
1789  int peernprocs;
1790  cc = MPI_Comm_remote_size(s->icomm, &peernprocs);
1791  assert(cc == MPI_SUCCESS && peernprocs == s->count);
1792  }
1793 
1794  s->timestamp[2] = MPI_Wtime();
1795 
1796  if (use_reply) {
1797  cc = kmr_receive_for_reply(mr, spw, w,
1798  opt.reply_each, opt.reply_root);
1799  assert(cc == MPI_SUCCESS);
1800  }
1801  }
1802 
1803  while (spw->n_runnings > 0) {
1804  cc = kmr_wait_then_map(mr, spw,
1805  kvi, kvo, arg, opt, mapfn);
1806  assert(cc == MPI_SUCCESS);
1807  }
1808 
1809  if (tracing5) {
1810  for (int w = 0; w < spw->n_spawns; w++) {
1811  struct kmr_spawn_state *s = &(spw->spawned[w]);
1812  fprintf(stderr, (";;KMR [%05d] %s [%d/%d]"
1813  " timing:"
1814  " spawn=%f setup=%f run=%f mapfn=%f clean=%f"
1815  " (msec)\n"),
1816  mr->rank, spw->fn, w, spw->n_spawns,
1817  ((s->timestamp[1] - s->timestamp[0]) * 1e3),
1818  ((s->timestamp[2] - s->timestamp[1]) * 1e3),
1819  ((s->timestamp[3] - s->timestamp[2]) * 1e3),
1820  ((s->timestamp[4] - s->timestamp[3]) * 1e3),
1821  ((s->timestamp[5] - s->timestamp[4]) * 1e3));
1822  fflush(0);
1823  }
1824  }
1825 
1826  assert(mr->spawn_comms != 0);
1827  mr->spawn_size = 0;
1828  kmr_free(mr->spawn_comms, (sizeof(MPI_Comm *) * (size_t)spw->n_spawns));
1829  mr->spawn_comms = 0;
1830 
1831  for (int w = 0; w < spw->n_spawns; w++) {
1832  struct kmr_spawn_state *s = &(spw->spawned[w]);
1833  assert(s->icomm == MPI_COMM_NULL);
1834  assert(s->abuf != 0);
1835  kmr_free(s->abuf, s->alen);
1836  s->abuf = 0;
1837  assert(s->argv0 != 0);
1838  kmr_free(s->argv0, (sizeof(char *) * (size_t)(s->argc0 + 1)));
1839  s->argv0 = 0;
1840  }
1841 
1842  assert(spw->ev != 0);
1843  kmr_free(spw->ev, (sizeof(struct kmr_kv_box) * (size_t)spw->n_spawns));
1844  spw->ev = 0;
1845  assert(spw->spawned != 0);
1846  kmr_free(spw->spawned, (sizeof(struct kmr_spawn_state) * (size_t)spw->n_spawns));
1847  spw->spawned = 0;
1848 
1849  if (use_reply) {
1850  assert(spw->replies != 0);
1851  for (int i = 0; i < spw->n_processes; i++) {
1852  assert(spw->replies[i] == MPI_REQUEST_NULL);
1853  }
1854  kmr_free(spw->replies, (sizeof(MPI_Request) * (size_t)spw->n_processes));
1855  spw->replies = 0;
1856  }
1857  if (mode == KMR_SPAWN_PARALLEL) {
1858  assert(spw->watches != 0);
1859  for (int i = 0; i < spw->n_processes; i++) {
1860  assert(spw->watches[i] == -1);
1861  }
1862  kmr_free(spw->watches, (sizeof(int) * (size_t)spw->n_processes));
1863  spw->watches = 0;
1864  }
1865 
1866  assert(spw->watch_listener == -1);
1867 
1868  if (kmr_ckpt_enabled(mr)) {
1869  kmr_ckpt_save_kvo_each_fin(mr, kvo);
1870  }
1871 
1872  if (kvo != 0) {
1873  /* No "keep_open" option (!opt.keep_open). */
1874  kmr_add_kv_done(kvo);
1875  }
1876  if (1) {
1877  /* No "inspect" option (!opt.inspect). */
1878  kmr_free_kvs(kvi);
1879  }
1880 
1881  if (kmr_ckpt_enabled(mr)) {
1882  kmr_ckpt_progress_fin(mr);
1883  }
1884 
1885  return MPI_SUCCESS;
1886 }
1887 
1888 /** Sends a reply message from the spawned process, which tells the
1889  spawner that it is ready to finish and may have some data to send
1890  to the spawner in kmr_map_via_spawn(). */
1891 
1892 int
1893 kmr_reply_to_spawner(KMR *mr)
1894 {
1895  int cc;
1896  MPI_Comm ic = MPI_COMM_NULL;
1897  cc = MPI_Comm_get_parent(&ic);
1898  assert(cc == MPI_SUCCESS);
1899  if (ic == MPI_COMM_NULL) {
1900  kmr_error(mr, ("kmr_reply_to_spawner:"
1901  " may be called in a not-spawned process"));
1902  }
1903  int peer = 0;
1904  cc = MPI_Send(0, 0, MPI_BYTE, peer, KMR_TAG_SPAWN_REPLY, ic);
1905  assert(cc == MPI_SUCCESS);
1906  return MPI_SUCCESS;
1907 }
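The following is a minimal sketch (not part of kmrmapms.c) of the spawned-program side, assuming it is an MPI program started by kmr_map_via_spawn() with the REPLY_EACH option and linked against KMR; the real work is only indicated by a comment.

/* Sketch of a spawned worker: it does its work and then notifies
   the spawner that it is ready to finish. */
#include <mpi.h>
#include "kmr.h"

int
main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    kmr_init();
    KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
    /* ... do the real work of this spawned program here ... */
    kmr_reply_to_spawner(mr);
    kmr_free_context(mr);
    kmr_fin();
    MPI_Finalize();
    return 0;
}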
1908 
1909 /** Obtains (a reference to) the spawner-side inter-communicator
1910  to a spawned process. It is used inside a map-function of
1911  kmr_map_via_spawn(); pass INDEX the same index argument given to
1912  the map-function. It returns a reference (a pointer) so that the
1913  map-function can free the communicator as a side-effect. */
1914 
1915 MPI_Comm *
1916 kmr_get_spawner_communicator(KMR *mr, long index)
1917 {
1918  if (mr->spawn_comms == 0) {
1919  kmr_error(mr, ("kmr_get_spawner_communicator() may be called"
1920  " outside of kmr_map_via_spawn()"));
1921  }
1922  if (index >= mr->spawn_size) {
1923  kmr_error(mr, ("kmr_get_spawner_communicator() may be called"
1924  " with index out of range"));
1925  }
1926  MPI_Comm *comm = mr->spawn_comms[index];
1927  return comm;
1928 }
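As a sketch of the side-effect mentioned above (not part of kmrmapms.c; the name free_icomm_fn is hypothetical), a map-function may free the spawner-side inter-communicator through the returned reference; the surrounding spawn code checks for MPI_COMM_NULL before touching the communicator again.

/* Sketch of a map-function that disconnects from the spawned
   processes by freeing the inter-communicator it obtained from
   kmr_get_spawner_communicator(). */
static int
free_icomm_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi,
              KMR_KVS *kvo, void *arg, const long index)
{
    KMR *mr = kvi->c.mr;
    MPI_Comm *icommp = kmr_get_spawner_communicator(mr, index);
    if (*icommp != MPI_COMM_NULL) {
        /* MPI_Comm_free() resets *icommp to MPI_COMM_NULL. */
        MPI_Comm_free(icommp);
    }
    return MPI_SUCCESS;
}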
1929 
1930 int
1931 kmr_get_spawner_communicator_ff(KMR *mr, long ii, int *comm)
1932 {
1933  MPI_Comm *c = kmr_get_spawner_communicator(mr, ii);
1934  *comm = MPI_Comm_c2f(*c);
1935  return MPI_SUCCESS;
1936 }
1937 
1938 /** Maps on processes started by MPI_Comm_spawn(). It is intended to
1939  run custom MPI programs which will return a reply as MPI messages.
1940  Consider other variations to run independent processes, when the
1941  spawned processes will not interact with the parent:
1942  kmr_map_serial_processes(), kmr_map_parallel_processes(), or
1943  kmr_map_ms_commands().\n A spawner (parent) spawns processes
1944  specified by key-value pairs. The key part is ignored, and the
1945  value part is a list of null-separated strings which constitutes a
1946  command and arguments. The option SEPARATOR_SPACE changes the
1947  separator character to whitespaces. If the first string is
1948  "maxprocs=n", then the number of processes is taken from this
1949  string. Or, an MPI_Info entry "maxprocs" in INFO is used, in
1950  which case "maxprocs" is common to all spawns. It is an error if
1951  neither is specified. A spawner tries to limit the number of
1952  simultaneously running processes to the number of processes in
1953  the universe. When multiple spawners are active (more than one
1954  rank has entries to spawn), they divide the universe evenly
1955  among them.\n The option REPLY_EACH or REPLY_ROOT
1956  lets a spawner wait for reply messages from the spawned processes,
1957  and then the spawner calls a map-function. A reply message is of
1958  the tag KMR_TAG_SPAWN_REPLY=500 and length zero, and
1959  kmr_reply_to_spawner() can be used to send this reply. When
1960  neither REPLY_EACH nor REPLY_ROOT is specified, the spawner
1961  immediately calls a map-function one-by-one in FIFO order (before
1962  the spawned processes finish). In that case, no load balancing is
1963  performed. Thus, the map-function should wait for the spawned
1964  processes to finish; otherwise, the spawner keeps starting new
1965  spawns and runs out of processes, which causes the MPI runtime
1966  to signal an error.\n Communication between the spawned
1967  processes and a map-function of a spawner is through an
1968  inter-communicator. The parent inter-communicator of the spawned
1969  processes can be taken by MPI_Comm_get_parent() as usual. The
1970  inter-communicator at the spawner side can be obtained by calling
1971  kmr_get_spawner_communicator() inside a map-function.\n The INFO
1972  argument is passed to MPI_Comm_spawn() after inserting the entries
1973  which appear in the command line, when the command line has
1974  prefixes of the form "key=value". Insertion of the prefixes can
1975  be terminated by an empty entry "=". Use of info is discouraged,
1976  because it is not portable and may contradict the implicit
1977  assumptions of the KMR implementation.\n NOTE: There is no way to
1978  check the availability of processes for spawning in the MPI
1979  specification and MPI implementations, and the MPI runtime
1980  signals errors when it runs out of processes. Thus, KMR puts a
1981  sleep (1 sec) between MPI_Comm_spawn() calls to allow clean-ups
1982  in the MPI runtime and to avoid timing issues.\n INTERFACE CHANGE:
1983  Set mr->spawn_pass_intercomm_in_argument=1 to enable the old
1984  interface, where the map-function MAPFN is called with the
1985  kmr_spawn_state structure as the general argument. The argument
1986  ARG passed to the mapper is stored in the MAPARG slot in the
1987  kmr_spawn_state structure. When TAKE_CKPT option is specified, a
1988  checkpoint data file of the output key-value stream is saved if
1989  both CKPT_ENABLE and CKPT_SELECTIVE global options are set. */
1990 
1991 int
1992 kmr_map_via_spawn(KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
1993  MPI_Info info, struct kmr_spawn_option opt,
1994  kmr_mapfn_t mapfn)
1995 {
1996  int cc = kmr_map_spawned_processes(KMR_SPAWN_INTERACT,
1997  "kmr_map_via_spawn",
1998  kvi, kvo, arg, info, opt, mapfn);
1999  return cc;
2000 }
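A minimal spawner-side usage sketch follows (not part of kmrmapms.c); the command "./a.out", the key "cmd", and the names notice_done and spawn_one_job are placeholders, and the REPLY_EACH option assumes the spawned program replies with kmr_reply_to_spawner().

/* Sketch: spawn "./a.out" on two processes and map after its reply. */
static int
notice_done(const struct kmr_kv_box kv, const KMR_KVS *kvi,
            KMR_KVS *kvo, void *arg, const long index)
{
    KMR *mr = kvi->c.mr;
    /* The spawned processes have already replied at this point; the
       inter-communicator still connects to them. */
    MPI_Comm *icomm = kmr_get_spawner_communicator(mr, index);
    return MPI_SUCCESS;
}

static void
spawn_one_job(KMR *mr)
{
    KMR_KVS *kvi = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
    char v[] = "maxprocs=2\0./a.out";    /* null-separated command */
    struct kmr_kv_box kv = {
        .klen = (int)sizeof("cmd"), .k.p = "cmd",
        .vlen = (int)sizeof(v), .v.p = v
    };
    kmr_add_kv(kvi, kv);
    kmr_add_kv_done(kvi);
    struct kmr_spawn_option sopt = {.reply_each = 1};
    kmr_map_via_spawn(kvi, 0, 0, MPI_INFO_NULL, sopt, notice_done);
    /* KVI is consumed (freed) by kmr_map_via_spawn(). */
}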
2001 
2002 /** Maps on processes started by MPI_Comm_spawn() to run independent
2003  MPI processes, which will not communicate with the parent. The
2004  spawned programs must be MPI programs. It is a variation of
2005  kmr_map_via_spawn(), and refer to the comments on it for the basic
2006  usage. Since the spawned program does not know the parent, there
2007  is no way to communicate from the spawner. The map-function is
2008  called after the processes have exited, so that the map-function
2009  can check the result files created by the spawned processes.\n
2010  This function detects the end of spawned processes using a
2011  watch-program "kmrwatch0", by checking a closure of a socket to
2012  which "kmrwatch0" connected.\n NOTE THAT THIS OPERATION MAY BLOCK
2013  INDEFINITELY AND FAIL, DEPENDING ON THE BEHAVIOR OF THE MPI
2014  IMPLEMENTATION. It has been checked to work with Open MPI (1.6) and
2015  MPICH2 (1.5), but not with Intel MPI (4.1) and YAMPI2 (GridMPI
2016  2.1). It depends on the behavior that MPI_Comm_free() on the
2017  parent and MPI_Finalize() on the child do not synchronize. The
2018  standard (MPI 2.x) says of MPI_Comm_free: "Though collective, it
2019  is anticipated that this operation will normally be implemented
2020  to be local, ..." The blocking situation can be
2021  checked by enabling tracing around calls to MPI_Comm_free() by
2022  (mr->trace_map_spawn=1).\n NOTE (on MPI spawn implementations):
2023  Open MPI (1.6) allows spawning non-MPI processes by passing a
2024  special MPI_Info. MPICH2 (1.5) does not allow spawning non-MPI
2025  processes, because MPI_Comm_spawn() of the parent and MPI_Init()
2026  of the child synchronize. In Intel MPI (4.1) and YAMPI2
2027  (GridMPI), the calls of MPI_Comm_free() on the parent and
2028  MPI_Finalize() or MPI_Comm_free() on the child synchronize, and
2029  thus, they require calling MPI_Comm_free() on the parent at an
2030  appropriate time.\n Options REPLY_ROOT and REPLY_EACH have no
2031  effect.
2032  When TAKE_CKPT option is specified, a checkpoint data file of the
2033  output key-value stream is saved if both CKPT_ENABLE and
2034  CKPT_SELECTIVE global options are set. */
2035 
2036 int
2037 kmr_map_parallel_processes(KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
2038  MPI_Info info, struct kmr_spawn_option opt,
2039  kmr_mapfn_t mapfn)
2040 {
2041  struct kmr_spawn_option ssopt = opt;
2042  ssopt.reply_root = 0;
2043  ssopt.reply_each = 1;
2044  int cc = kmr_map_spawned_processes(KMR_SPAWN_PARALLEL,
2045  "kmr_map_parallel_processes",
2046  kvi, kvo, arg, info, ssopt, mapfn);
2047  return cc;
2048 }
2049 
2050 /** Maps on processes started by MPI_Comm_spawn() to run serial
2051  processes. This should NOT be used; use kmr_map_ms_commands()
2052  instead. Fork-execing in kmr_map_ms_commands() is simpler than
2053  spawning. See also the comments on kmr_map_via_spawn() and
2054  kmr_map_parallel_processes(). The map-function is called after
2055  the processes have exited; thus, there is no way to communicate
2056  from the map-function. Instead, the map-function can check the
2057  result files created by the spawned processes.\n This function
2058  detects the end of spawned processes using a watch-program
2059  "kmrwatch0" which sends a reply to the parent in place of the
2060  serial program. Options REPLY_ROOT and REPLY_EACH have no
2061  effect.
2062  When TAKE_CKPT option is specified, a checkpoint data file of the
2063  output key-value stream is saved if both CKPT_ENABLE and
2064  CKPT_SELECTIVE global options are set. */
2065 
2066 int
2067 kmr_map_serial_processes(KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
2068  MPI_Info info, struct kmr_spawn_option opt,
2069  kmr_mapfn_t mapfn)
2070 {
2071  struct kmr_spawn_option ssopt = opt;
2072  ssopt.reply_root = 0;
2073  ssopt.reply_each = 1;
2074  int cc = kmr_map_spawned_processes(KMR_SPAWN_SERIAL,
2075  "kmr_map_serial_processes",
2076  kvi, kvo, arg, info, ssopt, mapfn);
2077  return cc;
2078 }
2079 
2080 /** Maps on processes started by MPI_Comm_spawn() to run independent
2081  processes. It either calls kmr_map_parallel_processes() or
2082  kmr_map_serial_processes() with regard to the NONMPI argument.
2083  See the comments of kmr_map_parallel_processes() and
2084  kmr_map_serial_processes(). */
2085 
2086 int
2087 kmr_map_processes(_Bool nonmpi, KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
2088  MPI_Info info, struct kmr_spawn_option opt,
2089  kmr_mapfn_t mapfn)
2090 {
2091  KMR *mr = kvi->c.mr;
2092  if (opt.reply_root || opt.reply_each) {
2093  kmr_error(mr, "kmr_map_processes:"
2094  " options REPLY_ROOT/REPLY_EACH not allowed");
2095  }
2096 
2097  struct kmr_spawn_option ssopt = opt;
2098  ssopt.reply_root = 0;
2099  ssopt.reply_each = 1;
2100  if (nonmpi) {
2101  int cc = kmr_map_spawned_processes(KMR_SPAWN_SERIAL,
2102  "kmr_map_processes",
2103  kvi, kvo, arg, info, ssopt, mapfn);
2104  return cc;
2105  } else {
2106  int cc = kmr_map_spawned_processes(KMR_SPAWN_PARALLEL,
2107  "kmr_map_processes",
2108  kvi, kvo, arg, info, ssopt, mapfn);
2109  return cc;
2110  }
2111 }
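A sketch of running one non-MPI program through kmr_map_processes() (not part of kmrmapms.c; "./serialjob", the key "job", and the names check_output_fn and run_serial_job are placeholders); the map-function runs only after the watched process has exited, so it can inspect result files.

/* Sketch: run a serial (non-MPI) program and check its results in
   the map-function afterwards. */
static int
check_output_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi,
                KMR_KVS *kvo, void *arg, const long index)
{
    /* The spawned process has exited; examine its output files here. */
    return MPI_SUCCESS;
}

static void
run_serial_job(KMR *mr)
{
    KMR_KVS *kvi = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
    char v[] = "maxprocs=1\0./serialjob";
    struct kmr_kv_box kv = {
        .klen = (int)sizeof("job"), .k.p = "job",
        .vlen = (int)sizeof(v), .v.p = v
    };
    kmr_add_kv(kvi, kv);
    kmr_add_kv_done(kvi);
    /* REPLY_ROOT/REPLY_EACH must not be set for kmr_map_processes(). */
    kmr_map_processes(1, kvi, 0, 0, MPI_INFO_NULL, kmr_snoopt,
                      check_output_fn);
}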
2112 
2113 /* Creates a dummy context in spawned processes. It is only used to
2114  make a KVS for adding elements. */
2115 
2116 KMR *
2117 kmr_create_dummy_context(void)
2118 {
2119  KMR *mr = kmr_create_context(MPI_COMM_SELF, MPI_INFO_NULL, 0);
2120  return mr;
2121 }
2122 
2123 /** Sends the KVS from a spawned process to the map-function of the
2124  spawner. It is paired with kmr_receive_kvs_from_spawned_fn(). */
2125 
2126 int
2127 kmr_send_kvs_to_spawner(KMR *mr, KMR_KVS *kvs)
2128 {
2129  int cc;
2130  MPI_Comm ic = MPI_COMM_NULL;
2131  cc = MPI_Comm_get_parent(&ic);
2132  assert(cc == MPI_SUCCESS);
2133  if (ic == MPI_COMM_NULL) {
2134  kmr_error(mr, ("kmr_send_kvs_to_spawner:"
2135  " may be called in a not-spawned process"));
2136  }
2137  void *data = 0;
2138  size_t sz = 0;
2139  cc = kmr_save_kvs(kvs, &data, &sz, kmr_noopt);
2140  assert(cc == MPI_SUCCESS && data != 0 && sz != 0);
2141  int siz = (int)sz;
2142  int peer = 0;
2143  cc = MPI_Send(&siz, 1, MPI_INT, peer, KMR_TAG_SPAWN_REPLY1, ic);
2144  assert(cc == MPI_SUCCESS);
2145  cc = MPI_Send(data, (int)sz, MPI_BYTE, peer, KMR_TAG_SPAWN_REPLY1, ic);
2146  assert(cc == MPI_SUCCESS);
2147  free(data);
2148  return MPI_SUCCESS;
2149 }
2150 
2151 /** Collects key-value pairs generated by spawned processes. It is a
2152  map-function to be used with kmr_map_via_spawn() with the
2153  REPLY_EACH option. The spawned processes call
2154  kmr_send_kvs_to_spawner() to send generated key-value pairs, and
2155  this function receives and puts them into KVO. PROTOCOL: The
2156  reply consists of one or two messages with the tag
2157  KMR_TAG_SPAWN_REPLY1=501. The first is the data size, which is
2158  followed by a marshaled key-value stream when the size is non-zero. */
2159 
2160 int
2161 kmr_receive_kvs_from_spawned_fn(const struct kmr_kv_box kv,
2162  const KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
2163  const long index)
2164 {
2165  _Bool replyeach = 1;
2166  assert(kvi != 0);
2167  KMR * const mr = kvi->c.mr;
2168  MPI_Comm *icommr = kmr_get_spawner_communicator(mr, index);
2169  assert(icommr != 0);
2170  MPI_Comm icomm = *icommr;
2171  int cc;
2172  int peernprocs;
2173  cc = MPI_Comm_remote_size(icomm, &peernprocs);
2174  assert(cc == MPI_SUCCESS);
2175  int npeers = (replyeach ? peernprocs : 1);
2176  for (int peerrank = 0; peerrank < npeers; peerrank++) {
2177  assert(kvo != 0);
2178  MPI_Status st;
2179  int sz;
2180  cc = MPI_Recv(&sz, 1, MPI_INT,
2181  peerrank, KMR_TAG_SPAWN_REPLY1,
2182  icomm, &st);
2183  assert(cc == MPI_SUCCESS);
2184  if (sz == 0) {
2185  continue;
2186  }
2187  void *data = kmr_malloc((size_t)sz);
2188  cc = MPI_Recv(data, sz, MPI_BYTE,
2189  peerrank, KMR_TAG_SPAWN_REPLY1,
2190  icomm, &st);
2191  assert(cc == MPI_SUCCESS);
2192  KMR_KVS *kvx = kmr_create_kvs(mr, KMR_KV_BAD, KMR_KV_BAD);
2193  cc = kmr_restore_kvs(kvx, data, (size_t)sz, kmr_noopt);
2194  assert(cc == MPI_SUCCESS);
2195  struct kmr_option keepopen = {.keep_open = 1};
2196  cc = kmr_map(kvx, kvo, 0, keepopen, kmr_add_identity_fn);
2197  assert(cc == MPI_SUCCESS);
2198  kmr_free(data, (size_t)sz);
2199  }
2200  return MPI_SUCCESS;
2201 }
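A sketch of the pairing (not part of kmrmapms.c; the key/value strings and the names send_results and collect_results are placeholders): the spawned program packs its results with kmr_send_kvs_to_spawner(), and the spawner passes kmr_receive_kvs_from_spawned_fn() as the map-function of kmr_map_via_spawn() with REPLY_EACH; cleanup of the dummy context is omitted.

/* Spawned-program side: send result key-value pairs to the parent. */
static void
send_results(void)
{
    KMR *mr = kmr_create_dummy_context();
    KMR_KVS *kvs = kmr_create_kvs(mr, KMR_KV_CSTRING, KMR_KV_CSTRING);
    char k[] = "output";
    char v[] = "/tmp/result.dat";
    struct kmr_kv_box kv = {
        .klen = (int)sizeof(k), .k.p = k,
        .vlen = (int)sizeof(v), .v.p = v
    };
    kmr_add_kv(kvs, kv);
    kmr_add_kv_done(kvs);
    kmr_send_kvs_to_spawner(mr, kvs);
}

/* Spawner side: collect the pairs from all spawned ranks into KVO. */
static void
collect_results(KMR_KVS *kvi, KMR_KVS *kvo)
{
    struct kmr_spawn_option sopt = {.reply_each = 1};
    kmr_map_via_spawn(kvi, kvo, 0, MPI_INFO_NULL, sopt,
                      kmr_receive_kvs_from_spawned_fn);
}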
2202 
2203 struct kmr_map_ms_commands_argument {
2204  struct kmr_spawn_option opt;
2205  void *arg;
2206  kmr_mapfn_t fn;
2207 };
2208 
2209 /* Runs commands in kmr_map_ms_commands(). */
2210 
2211 static int
2212 kmr_exec_command_e(_Bool use_exec, const struct kmr_kv_box kv,
2213  const KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
2214  const long index)
2215 {
2216  char *name = "kmr_map_ms_commands";
2217  KMR *mr = kvi->c.mr;
2218  _Bool tracing5 = (mr->trace_map_ms && (5 <= mr->verbosity));
2219  struct kmr_map_ms_commands_argument *xarg = arg;
2220  struct kmr_spawn_option opt = xarg->opt;
2221  const int maxargc = 128;
2222  int cc;
2223  int argc;
2224  char prefix[80];
2225  char argvstring[160];
2226 
2227  char *starter = (use_exec ? "fork-exec" : "system");
2228  snprintf(prefix, sizeof(prefix), "%s: %s", name, starter);
2229 
2230  char *abuf = kmr_malloc((size_t)kv.vlen + 1);
2231  char **argv = kmr_malloc(sizeof(char *) * (size_t)maxargc);
2232  memcpy(abuf, kv.v.p, (size_t)kv.vlen);
2233  abuf[kv.vlen] = 0;
2234 
2235  if (! use_exec) {
2236  argc = 1;
2237  argv[0] = abuf;
2238  argv[1] = 0;
2239  } else {
2240  cc = kmr_scan_argv_strings(mr, abuf, (size_t)kv.vlen, maxargc,
2241  &argc, argv,
2242  opt.separator_space, name);
2243  assert(cc == MPI_SUCCESS);
2244  argv[argc] = 0;
2245  }
2246  kmr_make_printable_argv_string(argvstring, sizeof(argvstring), argv);
2247 
2248  if (tracing5) {
2249  fprintf(stderr,
2250  ";;KMR [%05d] %s(%s)\n",
2251  mr->rank, prefix, argvstring);
2252  fflush(0);
2253  }
2254 
2255  int waitstatus = 0;
2256  if (! use_exec) {
2257  waitstatus = system(abuf);
2258  if (waitstatus == -1) {
2259  char ee[240];
2260  char *m = strerror(errno);
2261  snprintf(ee, sizeof(ee), "%s() failed: %s for %s(%s)",
2262  prefix, m, starter, argvstring);
2263  kmr_error(mr, ee);
2264  }
2265  } else {
2266  int closefds;
2267  if (mr->keep_fds_at_fork) {
2268  closefds = 0;
2269  } else if (mr->rlimit_nofile == -1) {
2270  mr->rlimit_nofile = kmr_getdtablesize(mr);
2271  assert(mr->rlimit_nofile > 0);
2272  closefds = mr->rlimit_nofile;
2273  } else {
2274  closefds = mr->rlimit_nofile;
2275  }
2276 
2277  int pid = fork();
2278  if (pid == -1) {
2279  char ee[80];
2280  char *m = strerror(errno);
2281  snprintf(ee, sizeof(ee), "%s: fork() failed: %s",
2282  name, m);
2283  kmr_error(mr, ee);
2284  } else {
2285  if (pid == 0) {
2286  for (int fd = 3; fd < closefds; fd++) {
2287  close(fd);
2288  }
2289 
2290  if (0) {
2291  unsetenv("LD_PRELOAD");
2292  setenv("XOS_MMM_L_HPAGE_TYPE", "none", 1);
2293  }
2294 
2295  cc = execvp(argv[0], argv);
2296  if (cc == -1) {
2297  char ee[240];
2298  char *m = strerror(errno);
2299  snprintf(ee, sizeof(ee),
2300  "%s: execvp failed: %s for execvp(%s)",
2301  name, m, argvstring);
2302  kmr_error(mr, ee);
2303  } else {
2304  char ee[240];
2305  snprintf(ee, sizeof(ee),
2306  "%s: execvp returned with=%d for execvp(%s)",
2307  name, cc, argvstring);
2308  kmr_error(mr, ee);
2309  }
2310  } else {
2311  for (;;) {
2312  cc = waitpid(pid, &waitstatus, 0);
2313  if (cc == -1) {
2314  if (errno == EINTR) {
2315  char ee[80];
2316  snprintf(ee, sizeof(ee),
2317  "%s: waitpid() interrupted",
2318  name);
2319  kmr_warning(mr, 1, ee);
2320  continue;
2321  } else {
2322  char ee[80];
2323  char *m = strerror(errno);
2324  snprintf(ee, sizeof(ee),
2325  "%s: waitpid() failed: %s",
2326  name, m);
2327  kmr_warning(mr, 1, ee);
2328  break;
2329  }
2330  } else {
2331  break;
2332  }
2333  }
2334  }
2335  }
2336  }
2337 
2338  if (tracing5 || WIFSIGNALED(waitstatus) || WIFSTOPPED(waitstatus)) {
2339  if (WIFEXITED(waitstatus)) {
2340  int n = WEXITSTATUS(waitstatus);
2341  fprintf(stderr,
2342  ";;KMR [%05d] %s() done (%d) for %s(%s)\n",
2343  mr->rank, prefix, n, starter, argvstring);
2344  } else if (WIFSIGNALED(waitstatus)) {
2345  char ee[240];
2346  int n = WTERMSIG(waitstatus);
2347  snprintf(ee, sizeof(ee),
2348  "%s() signaled=%d in %s(%s)\n",
2349  prefix, n, starter, argvstring);
2350  if (mr->map_ms_abort_on_signal) {
2351  kmr_error(mr, ee);
2352  } else {
2353  kmr_warning(mr, 1, ee);
2354  }
2355  } else if (WIFSTOPPED(waitstatus)) {
2356  /* (never happens). */
2357  char ee[240];
2358  int n = WSTOPSIG(waitstatus);
2359  snprintf(ee, sizeof(ee),
2360  "%s() stopped=%d in %s(%s)\n",
2361  prefix, n, starter, argvstring);
2362  if (mr->map_ms_abort_on_signal) {
2363  kmr_error(mr, ee);
2364  } else {
2365  kmr_warning(mr, 1, ee);
2366  }
2367  } else {
2368  /* (never happens). */
2369  char ee[240];
2370  snprintf(ee, sizeof(ee),
2371  "%s() bad return (?): %s(%s)\n",
2372  prefix, starter, argvstring);
2373  if (mr->map_ms_abort_on_signal) {
2374  kmr_error(mr, ee);
2375  } else {
2376  kmr_warning(mr, 1, ee);
2377  }
2378  }
2379  fflush(0);
2380  }
2381 
2382  cc = (*xarg->fn)(kv, kvi, kvo, xarg->arg, index);
2383  assert(cc == MPI_SUCCESS);
2384 
2385  kmr_free(abuf, ((size_t)kv.vlen + 1));
2386  kmr_free(argv, sizeof(char *) * (size_t)maxargc);
2387  return MPI_SUCCESS;
2388 }
2389 
2390 /** Runs commands in kmr_map_ms_commands(). It has system(3C) and
2391  fork-exec variants. */
2392 
2393 static int
2394 kmr_exec_command(const struct kmr_kv_box kv,
2395  const KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
2396  const long index)
2397 {
2398  KMR *mr = kvi->c.mr;
2399  struct kmr_map_ms_commands_argument *xarg = arg;
2400  struct kmr_spawn_option opt = xarg->opt;
2401  int cc;
2402 
2403  _Bool contains_separator;
2404  if (opt.separator_space) {
2405  contains_separator = 1;
2406  } else {
2407  contains_separator = 0;
2408  for (int i = 0; i < (kv.vlen - 1); i++) {
2409  if (kv.v.p[i] == '\0') {
2410  contains_separator = 1;
2411  break;
2412  }
2413  }
2414  }
2415 
2416  _Bool use_exec = (mr->map_ms_use_exec || contains_separator);
2417  cc = kmr_exec_command_e(use_exec, kv, kvi, kvo, arg, index);
2418  return cc;
2419 }
2420 
2421 /** Maps in the master-worker mode, specialized to run serial
2422  commands. It executes a command specified by a key-value pair,
2423  then calls a map-function when the command finishes. It takes the
2424  commands in the same way as kmr_map_via_spawn(). The commands
2425  must never be MPI programs. It uses system(3C) or fork-exec,
2426  switching to fork-exec when the SEPARATOR_SPACE option is
2427  specified, when a command string includes null characters, or when
2428  the MAP_MS_USE_EXEC option to KMR is specified. It is implemented
2429  with kmr_map_ms(); see the comments on kmr_map_ms(). */
2430 
2431 int
2432 kmr_map_ms_commands(KMR_KVS *kvi, KMR_KVS *kvo,
2433  void *arg, struct kmr_option opt,
2434  struct kmr_spawn_option sopt, kmr_mapfn_t m)
2435 {
2436  int cc;
2437  struct kmr_map_ms_commands_argument xarg = {
2438  .arg = arg,
2439  .opt = sopt,
2440  .fn = m
2441  };
2442  cc = kmr_map_ms(kvi, kvo, &xarg, opt, kmr_exec_command);
2443  return cc;
2444 }
2445 
2446 int kmr_check_exec__(KMR *mr);
2447 
2448 int
2449 kmr_check_exec__(KMR *mr)
2450 {
2451  struct kmr_spawn_option opt = kmr_snoopt;
2452  opt.separator_space = 0;
2453  struct kmr_map_ms_commands_argument arg = {
2454  .opt = opt,
2455  .arg = 0,
2456  .fn = kmr_add_identity_fn
2457  };
2458  long index = 0;
2459 
2460  char key[] = "key0";
2461  char val[] = "echo start a subprocess.; sleep 3;"
2462  " echo a process done.";
2463  const int klen = sizeof(key);
2464  int vlen = sizeof(val);
2465  assert(klen == 5);
2466  struct kmr_kv_box kv = {
2467  .klen = (int)klen,
2468  .vlen = (int)vlen,
2469  .k.p = key,
2470  .v.p = val
2471  };
2472 
2473  KMR_KVS *kvi = kmr_create_kvs(mr, KMR_KV_CSTRING, KMR_KV_CSTRING);
2474  kmr_add_kv_done(kvi);
2475  KMR_KVS *kvo = kmr_create_kvs(mr, KMR_KV_CSTRING, KMR_KV_CSTRING);
2476  int cc = kmr_exec_command(kv, kvi, kvo, &arg, index);
2477  kmr_free_kvs(kvi);
2478  kmr_free_kvs(kvo);
2479  return cc;
2480 }
2481 
2482 /*
2483 Copyright (C) 2012-2018 RIKEN R-CCS
2484 This library is distributed WITHOUT ANY WARRANTY. This library can be
2485 redistributed and/or modified under the terms of the BSD 2-Clause License.
2486 */