KMR
kmrspawn.c
Go to the documentation of this file.
1 /* kmrspawn.c (2016-07-09) -*-Coding: us-ascii;-*- */
2 /* Copyright (C) 2012-2018 RIKEN R-CCS */
3 
4 /** \file kmrspawn.c Static-Spawning Interface. This is a part of the
5  spawning library (https://github.com/riken-rccs/mpispawner).
6  See also the source code of the KMR map-reduce library for the use
7  of spawning (https://github.com/riken-rccs/kmr). It is the
8  worker-side of the master-worker protocol, and the master-side is
9  in "kmrwfmap.c" in KMR. Note that this code is also included in
10  KMR to implement a dummy spawning for testing. */
11 
#include <mpi.h>
#include <assert.h>
#include <dlfcn.h>
#include <errno.h>
#include <limits.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "kmrspawn.h"
21 
22 /* KMR_MAIN_LIBRARY switches compiling for the KMR main library or
23  compiling for the static-spawning library. The purpose of
24  compiling for the main library is for testing the workflow
25  scheduler. The interface functions are renamed by suffixing with
26  "_standin" for the main library. */
27 
28 #define KMR_NO_STATIC_SPAWNING
29 
30 #ifdef KMR_NO_STATIC_SPAWNING
31 #define KMR_MAIN_LIBRARY (1)
32 #else
33 #define KMR_MAIN_LIBRARY (0)
34 #endif
35 
36 #if KMR_MAIN_LIBRARY
37 #include "kmr.h"
38 #include "kmrimpl.h"
39 #else
40 #include "kmrld.h"
41 #endif
42 
43 #if KMR_MAIN_LIBRARY
44 /*NOTHING*/
45 #else
46 #define kmr_free(P,S) free((P))
47 #define kmr_error(MR,S) ((*kmr_ld_err)(DIE,(S)))
48 #define kmr_malloc(S) malloc((S))
49 #endif
50 
51 #if KMR_MAIN_LIBRARY
52 #define kmr_mpi_byte MPI_BYTE
53 #define kmr_mpi_comm_null MPI_COMM_NULL
54 #else
55 #define kmr_mpi_byte (hooks->h.mpi_byte)
56 #define kmr_mpi_comm_null (hooks->h.mpi_comm_null)
57 #endif
58 
59 #if KMR_MAIN_LIBRARY
60 #define kmr_spawn_true_exit exit
61 #define kmr_spawn_true_mpi_finalize MPI_Finalize
62 #define kmr_spawn_mpi_comm_size MPI_Comm_size
63 #define kmr_spawn_mpi_comm_rank MPI_Comm_rank
64 #define kmr_spawn_mpi_send MPI_Send
65 #define kmr_spawn_mpi_recv MPI_Recv
66 #define kmr_spawn_mpi_get_count MPI_Get_count
67 #define kmr_spawn_mpi_intercomm_create MPI_Intercomm_create
68 #define kmr_spawn_mpi_comm_dup MPI_Comm_dup
69 #define kmr_spawn_mpi_comm_free MPI_Comm_free
70 #define kmr_spawn_mpi_comm_set_name MPI_Comm_set_name
71 #else
72 /*NOTHING*/
73 #endif
74 
75 #if KMR_MAIN_LIBRARY
76 #define KMR_LIBAPI(FN) FN ## _standin
77 #else
78 #define KMR_LIBAPI(FN) FN
79 #endif
80 
/* Save area of the hooks, set by kmr_spawn_hookup_standin().  It
   stands in for libkmrspawn.so when this file is built into the KMR
   main library; it does not exist in the static-spawning build. */

#if KMR_MAIN_LIBRARY
static struct kmr_spawn_hooks *kmr_fake_spawn_hooks = 0;

/* Registers HOOKS as the current hooks.  A previously registered
   record that differs from HOOKS is released with kmr_free(), so a
   registered record is treated as owned by this module.  Always
   returns MPI_SUCCESS. */

int
kmr_spawn_hookup_standin(struct kmr_spawn_hooks *hooks)
{
    struct kmr_spawn_hooks *previous = kmr_fake_spawn_hooks;
    if (previous != hooks && previous != 0) {
        kmr_free(previous, sizeof(struct kmr_spawn_hooks));
    }
    kmr_fake_spawn_hooks = hooks;
    return MPI_SUCCESS;
}
#endif /*KMR_MAIN_LIBRARY*/
103 
104 static int kmr_spawn_exec_command(struct kmr_spawn_hooks *hooks,
105  int argc, char **argv);
106 
107 /* Records the parameters to spawning. It registers an exec-function,
108  subworld communicators, and their colors. EXECFN is called at
109  starting a work-item. A communicator in SUBWORLDS will be selected
110  as a world for spawning. A color in COLORS is passed to check the
111  correspondence of the subworld at spawning. ARGSSIZE is the limit
112  of the size of the argv strings in RPC messages. It needs to set
113  the MR field in the HOOKS before calling this. */
114 
115 int
116 KMR_LIBAPI(kmr_spawn_setup) (struct kmr_spawn_hooks *hooks,
117  MPI_Comm basecomm, int masterrank,
118  int (*execfn)(struct kmr_spawn_hooks *hooks,
119  int argc, char **argv),
120  int nsubworlds, MPI_Comm subworlds[],
121  unsigned long colors[], size_t argssize)
122 {
123  int cc;
124 
125  int nprocs;
126  int rank;
127  cc = kmr_spawn_mpi_comm_size(basecomm, &nprocs);
128  assert(cc == MPI_SUCCESS);
129  cc = kmr_spawn_mpi_comm_rank(basecomm, &rank);
130  assert(cc == MPI_SUCCESS);
131 
132  if (!(0 <= masterrank && masterrank < nprocs)) {
133  char ee[80];
134  snprintf(ee, sizeof(ee),
135  ("Bad master rank to kmr_spawn_setup() (master=%d).\n"),
136  masterrank);
137  kmr_error(hooks->s.mr, ee);
138  abort();
139  }
140  if (rank == masterrank) {
141  char ee[80];
142  snprintf(ee, sizeof(ee),
143  ("Bad master rank to kmr_spawn_setup();"
144  " master in workers.\n"));
145  kmr_error(hooks->s.mr, ee);
146  abort();
147  }
148  if (nsubworlds > KMR_SPAWN_SUBWORLDS) {
149  char ee[80];
150  snprintf(ee, sizeof(ee),
151  ("Too many subworlds to kmr_spawn_setup()"
152  " (n=%d, limit=%d).\n"),
153  nsubworlds, KMR_SPAWN_SUBWORLDS);
154  kmr_error(hooks->s.mr, ee);
155  abort();
156  }
157 
158  hooks->s.master_rank = masterrank;
159  hooks->s.base_rank = rank;
160  hooks->s.base_comm = basecomm;
161  for (int i = 0; i < KMR_SPAWN_SUBWORLDS; i++) {
162  if (i < nsubworlds) {
163  hooks->s.subworlds[i].comm = subworlds[i];
164  hooks->s.subworlds[i].color = colors[i];
165  } else {
166  hooks->s.subworlds[i].comm = kmr_mpi_comm_null;
167  hooks->s.subworlds[i].color = 0;
168  }
169  }
170 
171  if (execfn != 0) {
172  hooks->s.exec_fn = execfn;
173  } else {
174  hooks->s.exec_fn = kmr_spawn_exec_command;
175  }
176 
177  if (hooks->s.rpc_buffer != 0) {
178  kmr_free(hooks->s.rpc_buffer, hooks->s.rpc_size);
179  hooks->s.rpc_buffer = 0;
180  }
181  size_t msz = (offsetof(struct kmr_spawn_work, args) + argssize);
182  hooks->s.rpc_buffer = kmr_malloc(msz);
183  hooks->s.rpc_size = msz;
184 
185  hooks->s.spawn_world = kmr_mpi_comm_null;
186  hooks->s.spawn_parent = kmr_mpi_comm_null;
187  hooks->s.running_work = 0;
188  hooks->s.mpi_initialized = 0;
189  hooks->s.abort_when_mpi_abort = 0;
190 
191  return MPI_SUCCESS;
192 }
193 
194 void
195 KMR_LIBAPI(kmr_spawn_set_verbosity) (struct kmr_spawn_hooks *hooks, int level)
196 {
197  hooks->s.print_trace = (level >= 2);
198 #if KMR_MAIN_LIBRARY
199 /*NOTHING*/
200 #else
201  kmr_ld_set_error_printer(level, 0);
202 #endif
203 }
204 
205 static int kmr_spawn_join_to_master(struct kmr_spawn_hooks *hooks,
206  struct kmr_spawn_work *w, size_t msglen);
207 static int kmr_spawn_clean_process(struct kmr_spawn_hooks *hooks);
208 static int kmr_spawn_start_work(struct kmr_spawn_hooks *hooks,
209  struct kmr_spawn_work *w, size_t msglen);
210 
211 /* Waits for a work request from the master and handles it. It idles
212  forever by receiving a message. Furthermore, it never returns,
213  when it starts a new executable. Thus, it is normally entered to
214  resume the service instead of looping inside. It should be called
215  with (hooks->s.service_count=0) at the very first time, which
216  causes to send a joining message to the master. */
217 
218 void
219 KMR_LIBAPI(kmr_spawn_service) (struct kmr_spawn_hooks *hooks, int status)
220 {
221  _Bool tracing5 = (hooks->s.print_trace);
222  MPI_Comm basecomm = hooks->s.base_comm;
223  const int master = hooks->s.master_rank;
224 
225  int cc;
226 
227  assert(hooks->s.base_rank != hooks->s.master_rank);
228  assert(hooks->s.rpc_buffer != 0 && hooks->s.rpc_size > 0);
229 
230  /* Reset the previous state. */
231 
232 #if 0
233  cc = kmr_spawn_clean_process(hooks);
234  assert(cc == MPI_SUCCESS);
235 #endif
236 
237  int exitstatus;
238  exitstatus = status;
239 
240  for (;;) {
241  if (tracing5) {
242  fprintf(stderr, ";;KMR [%05d] Entering service loop.\n",
243  hooks->s.base_rank);
244  fflush(0);
245  }
246 
247  {
248  struct kmr_spawn_next mm2;
249  struct kmr_spawn_next *mbuf = &mm2;
250  mbuf->req = KMR_SPAWN_NEXT;
251  mbuf->protocol_version = KMR_SPAWN_MAGIC;
252  mbuf->initial_message = (hooks->s.service_count == 0);
253  mbuf->status = exitstatus;
254  int msz = (int)sizeof(struct kmr_spawn_next);
255  cc = kmr_spawn_mpi_send(mbuf, msz, kmr_mpi_byte, master,
256  KMR_SPAWN_RPC_TAG, basecomm);
257  assert(cc == MPI_SUCCESS);
258  }
259 
260  union kmr_spawn_rpc *mbuf = hooks->s.rpc_buffer;
261  int msz = (int)hooks->s.rpc_size;
262  mbuf->req = KMR_SPAWN_NONE;
263  MPI_Status st;
264  int len;
265  cc = kmr_spawn_mpi_recv(mbuf, msz, kmr_mpi_byte, master,
266  KMR_SPAWN_RPC_TAG, basecomm, &st);
267  assert(cc == MPI_SUCCESS);
268  cc = kmr_spawn_mpi_get_count(&st, kmr_mpi_byte, &len);
269  assert(cc == MPI_SUCCESS);
270  size_t msglen = (size_t)len;
271  int rank = st.MPI_SOURCE;
272 
273  hooks->s.service_count++;
274  assert(hooks->s.service_count != 0);
275 
276  switch (mbuf->req) {
277  case KMR_SPAWN_NONE: {
278  assert(msglen == sizeof(struct kmr_spawn_none));
279  struct kmr_spawn_none *w = &(mbuf->m2);
280  assert(w->protocol_version == KMR_SPAWN_MAGIC);
281  /* Exit for-loop. */
282  break;
283  }
284 
285  case KMR_SPAWN_WORK: {
286  struct kmr_spawn_work *w0 = &(mbuf->m1);
287  if (msglen != (size_t)w0->message_size) {
288  kmr_error(hooks->s.mr,
289  "Bad RPC message size");
290  abort();
291  }
292  assert(w0->protocol_version == KMR_SPAWN_MAGIC);
293  size_t asz = (msglen - offsetof(struct kmr_spawn_work, args));
294 
295  if (0) {
296  if (asz == 0) {
297  fprintf(stderr,
298  ";;KMR [%05d] Receive an activate message.\n",
299  hooks->s.base_rank);
300  fflush(0);
301  }
302  }
303 
304  if (asz == 0) {
305  /* Skip an activation message. */
306  continue;
307  } else {
308  cc = kmr_spawn_clean_process(hooks);
309  assert(cc == MPI_SUCCESS);
310 
311  struct kmr_spawn_work *w = kmr_malloc(msglen);
312  assert(w != 0);
313  memcpy(w, w0, msglen);
314  cc = kmr_spawn_join_to_master(hooks, w, msglen);
315  assert(cc == MPI_SUCCESS);
316  exitstatus = kmr_spawn_start_work(hooks, w, msglen);
317 
318  /* (NEVER RETURNS WHEN EXECED). */
319 
320  exit(exitstatus);
321 
322  cc = kmr_spawn_clean_process(hooks);
323  assert(cc == MPI_SUCCESS);
324  continue;
325  }
326  }
327 
328  default: {
329  char ee[80];
330  snprintf(ee, sizeof(ee),
331  "Bad RPC message request=0x%x length=%zd from rank=%d.\n",
332  mbuf->req, msglen, rank);
333  kmr_error(hooks->s.mr, ee);
334  abort();
335  break;
336  }
337  }
338  /* Exit for-loop. */
339  break;
340  }
341 
342  if (hooks->s.rpc_buffer != 0) {
343  kmr_free(hooks->s.rpc_buffer, hooks->s.rpc_size);
344  hooks->s.rpc_buffer = 0;
345  hooks->s.rpc_size = 0;
346  }
347 
348  kmr_spawn_true_mpi_finalize();
349  kmr_spawn_true_exit(0);
350  abort();
351 }
352 
353 static int kmr_spawn_make_argv_printable(char *s, size_t sz, char **argv);
354 
355 /* Runs the command. */
356 
357 static int
358 kmr_spawn_start_work(struct kmr_spawn_hooks *hooks,
359  struct kmr_spawn_work *w, size_t msglen)
360 {
361  _Bool tracing5 = (hooks->s.print_trace || w->print_trace != 0);
362  size_t asz = (msglen - offsetof(struct kmr_spawn_work, args));
363 
364  if (!(0 <= w->subworld && w->subworld < KMR_SPAWN_SUBWORLDS)
365  || hooks->s.subworlds[w->subworld].comm == kmr_mpi_comm_null) {
366  char ee[80];
367  snprintf(ee, sizeof(ee),
368  ("Bad subworld index for spawning (index=%d).\n"),
369  w->subworld);
370  kmr_error(hooks->s.mr, ee);
371  abort();
372  }
373 
374  int argc;
375  argc = 0;
376 
377  {
378  char *e = &(w->args[asz]);
379  char *p;
380  p = w->args;
381  while (p[0] != 0 && p < (e - 1)) {
382  argc++;
383  while (p[0] != 0 && p < (e - 1)) {
384  p++;
385  }
386  if (p < (e - 1)) {
387  assert(p[0] == 0);
388  p++;
389  }
390  }
391  assert(p == (e - 1) || p[0] == 0);
392  }
393 
394  char *argv[argc + 1];
395 
396  {
397  int i;
398  i = 0;
399  char *e = &(w->args[asz]);
400  char *p;
401  p = w->args;
402  while (p[0] != 0 && p < (e - 1)) {
403  assert(i < argc);
404  argv[i] = p;
405  i++;
406  while (p[0] != 0 && p < (e - 1)) {
407  p++;
408  }
409  if (p < (e - 1)) {
410  assert(p[0] == 0);
411  p++;
412  }
413  }
414  assert(p == (e - 1) || p[0] == 0);
415  assert(i == argc);
416  argv[argc] = 0;
417  }
418 
419  hooks->s.running_work = w;
420  hooks->s.mpi_initialized = 0;
421 
422  if (tracing5) {
423  char aa[80];
424  kmr_spawn_make_argv_printable(aa, 45, argv);
425  printf(";;KMR [%05d] EXEC: %s\n",
426  hooks->s.base_rank, aa);
427  fflush(0);
428  }
429 
430  assert(hooks->s.exec_fn != 0);
431  int exitstatus = (*hooks->s.exec_fn)(hooks, argc, argv);
432  /* (NEVER RETURNS WHEN EXECED). */
433 
434  hooks->s.running_work = 0;
435  hooks->s.mpi_initialized = 0;
436 
437  return exitstatus;
438 }
439 
440 static int
441 kmr_spawn_exec_command(struct kmr_spawn_hooks *hooks, int argc, char **argv)
442 {
443 #if KMR_MAIN_LIBRARY
444  return MPI_SUCCESS;
445 #else
446  kmr_ld_usoexec(argv, 0, hooks->d.initial_argv, hooks->d.options_flags,
447  hooks->d.options_heap_bottom);
448  return MPI_SUCCESS;
449 #endif
450 }
451 
/* Formats the null-terminated vector ARGV into S as a comma-separated
   list, using at most SZ bytes including the terminating NUL.  When
   the list does not fit, its tail is replaced by "...".  SZ must be
   larger than 4.  Always returns 0. */

static int
kmr_spawn_make_argv_printable(char *s, size_t sz, char **argv)
{
    assert(sz > 4);
    *s = 0;
    size_t cnt = 0;
    for (int i = 0; argv[i] != 0; i++) {
        int cc = snprintf(&s[cnt], (sz - cnt), "%s%s",
                          (i == 0 ? "" : ","), argv[i]);
        if (cc < 0) {
            /* Output error.  Casting a negative value to size_t would
               wrap CNT backwards, so mark the cut where output
               stopped instead. */
            size_t at = (cnt < (sz - 4) ? cnt : (sz - 4));
            snprintf(&s[at], 4, "...");
            return 0;
        }
        if ((size_t)cc >= (sz - cnt)) {
            /* Truncated; overwrite the tail with an ellipsis. */
            snprintf(&s[sz - 4], 4, "...");
            return 0;
        }
        cnt += (size_t)cc;
    }
    return 0;
}
471 
472 /* Connects to the master by MPI_Intercomm_create(). */
473 
474 static int
475 kmr_spawn_join_to_master(struct kmr_spawn_hooks *hooks,
476  struct kmr_spawn_work *w, size_t msglen)
477 {
478  assert(w->subworld < KMR_SPAWN_SUBWORLDS);
479  assert(hooks->s.subworlds[w->subworld].comm != kmr_mpi_comm_null);
480  MPI_Comm basecomm = hooks->s.base_comm;
481  //const int master = hooks->s.master_rank;
482  MPI_Comm subworld = hooks->s.subworlds[w->subworld].comm;
483  unsigned long color = hooks->s.subworlds[w->subworld].color;
484  _Bool tracing5 = hooks->s.print_trace;
485 
486  int cc;
487 
488  int nprocs;
489  cc = kmr_spawn_mpi_comm_size(subworld, &nprocs);
490  assert(cc == MPI_SUCCESS);
491  if (w->nprocs > nprocs) {
492  char ee[80];
493  snprintf(ee, sizeof(ee),
494  ("Bad spawn call; number of procs mismatch"
495  " (requested=%d size=%d)"),
496  w->nprocs, nprocs);
497  kmr_error(hooks->s.mr, ee);
498  abort();
499  }
500  if (w->color != color) {
501  char ee[80];
502  snprintf(ee, sizeof(ee),
503  ("Bad spawn call; color mismatch (%lu,%lu).\n"),
504  w->color, color);
505  kmr_error(hooks->s.mr, ee);
506  abort();
507  }
508 
509  assert(hooks->s.spawn_world == kmr_mpi_comm_null);
510  assert(hooks->s.spawn_parent == kmr_mpi_comm_null);
511 
512  cc = kmr_spawn_mpi_comm_dup(subworld, &hooks->s.spawn_world);
513  assert(cc == MPI_SUCCESS);
514  cc = kmr_spawn_mpi_comm_set_name(hooks->s.spawn_world,
515  hooks->h.world_name);
516  assert(cc == MPI_SUCCESS);
517 
518  if (tracing5) {
519  fprintf(stderr, ";;KMR [%05d] Connect to master.\n",
520  hooks->s.base_rank);
521  fflush(0);
522  }
523 
524  cc = kmr_spawn_mpi_intercomm_create(hooks->s.spawn_world, 0,
525  basecomm,
526  hooks->s.master_rank,
527  KMR_SPAWN_ICOMM_TAG,
528  &hooks->s.spawn_parent);
529  assert(cc == MPI_SUCCESS);
530  return MPI_SUCCESS;
531 }
532 
533 /* Cleans the resources of the worker state. It frees the current
534  work-item. */
535 
536 static int
537 kmr_spawn_clean_process(struct kmr_spawn_hooks *hooks)
538 {
539  int cc;
540 
541  if (hooks->s.running_work != 0) {
542  kmr_free(hooks->s.running_work,
543  (size_t)hooks->s.running_work->message_size);
544  hooks->s.running_work = 0;
545  }
546 
547  hooks->s.mpi_initialized = 0;
548 
549  if (hooks->s.spawn_world != kmr_mpi_comm_null) {
550  cc = kmr_spawn_mpi_comm_free(&hooks->s.spawn_world);
551  assert(cc == MPI_SUCCESS);
552  }
553  if (hooks->s.spawn_parent != kmr_mpi_comm_null) {
554  cc = kmr_spawn_mpi_comm_free(&hooks->s.spawn_parent);
555  assert(cc == MPI_SUCCESS);
556  }
557 
558  /* MORE NEEDED */
559 
560  return MPI_SUCCESS;
561 }
562 
/* Disabled: would park the calling process forever, sleeping an hour
   at a time.  HOOKS is unused. */
#if 0
__attribute__ ((noreturn)) void
kmr_spawn_idle(struct kmr_spawn_hooks *hooks)
{
    for (;;) {
        sleep(3600);
    }
}
#endif
570 
571 /*
572 Copyright (C) 2012-2018 RIKEN R-CCS
573 This library is distributed WITHOUT ANY WARRANTY. This library can be
574 redistributed and/or modified under the terms of the BSD 2-Clause License.
575 */
Utilities Private Part (do not include from applications).
#define kmr_malloc(Z)
Allocates memory, or aborts when failed.
Definition: kmrimpl.h:177
KMR Interface.
Static-Spawning API.