KMR
test5.c
1 /* test5.c (2014-02-04) */
2 
3 /* Check spawning mappers. */
4 
5 /* Run it with four or more dynamic processes. */
6 
7 #include <mpi.h>
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <unistd.h>
11 #include <fcntl.h>
12 #include <limits.h>
13 #include <sys/types.h>
14 #include <sys/stat.h>
15 #include <sys/time.h>
16 #include <assert.h>
17 #ifdef _OPENMP
18 #include <omp.h>
19 #endif
20 
21 #include "kmr.h"
22 #include "kmrimpl.h"
23 
24 _Bool skipmpiwork = 0;
25 
26 static int
27 empty_map_fn_seq(const struct kmr_kv_box kv, const KMR_KVS *kvi,
28  KMR_KVS *kvo, void *p, long ii)
29 {
30  fflush(0);
31  usleep(50 * 1000);
32  printf("test5:empty-map-fn[%d]: called.\n", (int)ii);
33  fflush(0);
34  sleep(3);
35  return MPI_SUCCESS;
36 }
37 
38 static int
39 empty_map_fn_mpi_noreply(const struct kmr_kv_box kv, const KMR_KVS *kvi,
40  KMR_KVS *kvo, void *p, long ii)
41 {
42  KMR *mr = kmr_get_context_of_kvs(kvi);
43  MPI_Comm *pp = kmr_get_spawner_communicator(mr, ii);
44  MPI_Comm ic = *pp;
45  fflush(0);
46  usleep(50 * 1000);
47  if (sizeof(int) != sizeof(void *) && sizeof(ic) == sizeof(void *)) {
48  printf("test5:empty-map-fn[%d]: sleeping 12 sec (icomm=%p)...\n",
49  (int)ii, (void *)ic);
50  } else {
51  printf("test5:empty-map-fn[%d]: sleeping 12 sec (icomm=%d)...\n",
52  (int)ii, (int)ic);
53  }
54  fflush(0);
55  sleep(12);
56  return MPI_SUCCESS;
57 }
58 
59 static int
60 empty_map_fn_mpi_with_reply(const struct kmr_kv_box kv, const KMR_KVS *kvi,
61  KMR_KVS *kvo, void *p, long ii)
62 {
63  KMR *mr = kmr_get_context_of_kvs(kvi);
64  MPI_Comm *pp = kmr_get_spawner_communicator(mr, ii);
65  MPI_Comm ic = *pp;
66  fflush(0);
67  usleep(50 * 1000);
68  if (sizeof(int) != sizeof(void *) && sizeof(ic) == sizeof(void *)) {
69  printf("test5:empty-map-fn[%d]: called (icomm=%p).\n",
70  (int)ii, (void *)ic);
71  } else {
72  printf("test5:empty-map-fn[%d]: called (icomm=%d).\n",
73  (int)ii, (int)ic);
74  }
75  fflush(0);
76  return MPI_SUCCESS;
77 }
78 
79 static void
80 simple0(int nprocs, int rank)
81 {
82  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
83  mr->trace_map_spawn = 1;
84  mr->spawn_max_processes = 4;
85 
86  MPI_Barrier(MPI_COMM_WORLD);
87  usleep(50 * 1000);
88 
89  if (1) {
90  MPI_Barrier(MPI_COMM_WORLD);
91  usleep(50 * 1000);
92  if (rank == 0) {
93  printf("\n");
94  printf("** CHECK kmr_map_via_spawn WITH RETURNING KVS.\n");
95  printf("** Spawn 2-rank work 4 times"
96  " using %d dynamic processes.\n",
97  mr->spawn_max_processes);
98  }
99  fflush(0);
100  usleep(50 * 1000);
101 
102  KMR_KVS *kvs00 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
103  if (mr->rank == 0) {
104  char kbuf[256];
105  char vbuf[256];
106  snprintf(kbuf, sizeof(kbuf), "key");
107  snprintf(vbuf, sizeof(vbuf),
108  "maxprocs=2 %s ./a.out mpi returnkvs a0 a1 a2",
109  "info0=value0 info1=value1 info2=value2");
110  struct kmr_kv_box nkv = {
111  .klen = (int)(strlen(kbuf) + 1),
112  .vlen = (int)(strlen(vbuf) + 1),
113  .k.p = kbuf,
114  .v.p = vbuf};
115  for (int i = 0; i < 4; i++) {
116  kmr_add_kv(kvs00, nkv);
117  }
118  }
119  kmr_add_kv_done(kvs00);
120  KMR_KVS *kvs01 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
121  struct kmr_spawn_option opt = {.reply_each = 1,
122  .separator_space = 1};
123  kmr_map_via_spawn(kvs00, kvs01, 0,
124  MPI_INFO_NULL, opt, kmr_receive_kvs_from_spawned_fn);
125  /*kmr_dump_kvs(kvs01, 0);*/
126  if (mr->rank == 0) {
127  assert(kvs01->c.element_count == 32);
128  } else {
129  assert(kvs01->c.element_count == 0);
130  }
131  kmr_free_kvs(kvs01);
132  }
133 
134  kmr_free_context(mr);
135 }
136 
137 static void
138 simple1(int nprocs, int rank)
139 {
140  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
141  mr->trace_map_spawn = 1;
142  mr->spawn_max_processes = 4;
143 
144  MPI_Barrier(MPI_COMM_WORLD);
145  usleep(50 * 1000);
146 
147  if (1) {
148  MPI_Barrier(MPI_COMM_WORLD);
149  usleep(50 * 1000);
150  if (rank == 0) {
151  printf("\n");
152  printf("** CHECK kmr_map_via_spawn WITH WAITING IN MAP-FN.\n");
153  printf("** Spawn 2-rank work 4 times"
154  " using %d dynamic processes.\n",
155  mr->spawn_max_processes);
156  }
157  fflush(0);
158  usleep(50 * 1000);
159 
160  KMR_KVS *kvs00 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
161  if (mr->rank == 0) {
162  char kbuf[256];
163  char vbuf[256];
164  snprintf(kbuf, sizeof(kbuf), "key");
165  snprintf(vbuf, sizeof(vbuf),
166  "maxprocs=2 ./a.out mpi noreply a0 a1 a2");
167  struct kmr_kv_box nkv = {
168  .klen = (int)(strlen(kbuf) + 1),
169  .vlen = (int)(strlen(vbuf) + 1),
170  .k.p = kbuf,
171  .v.p = vbuf};
172  for (int i = 0; i < 4; i++) {
173  kmr_add_kv(kvs00, nkv);
174  }
175  }
176  kmr_add_kv_done(kvs00);
177  KMR_KVS *kvs01 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
178  struct kmr_spawn_option opt = {.separator_space = 1};
179  kmr_map_via_spawn(kvs00, kvs01, 0,
180  MPI_INFO_NULL, opt, empty_map_fn_mpi_noreply);
181  kmr_free_kvs(kvs01);
182  }
183 
184  if (1) {
185  MPI_Barrier(MPI_COMM_WORLD);
186  usleep(50 * 1000);
187  if (rank == 0) {
188  printf("\n");
189  printf("** CHECK kmr_map_via_spawn WITH REPLY (EACH).\n");
190  printf("** Spawn 2-rank work 4 times"
191  " using %d dynamic processes.\n",
192  mr->spawn_max_processes);
193  }
194  fflush(0);
195  usleep(50 * 1000);
196 
197  KMR_KVS *kvs10 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
198  if (mr->rank == 0) {
199  char kbuf[256];
200  char vbuf[256];
201  snprintf(kbuf, sizeof(kbuf), "key");
202  snprintf(vbuf, sizeof(vbuf),
203  "maxprocs=2 ./a.out mpi eachreply a0 a1 a2");
204  struct kmr_kv_box nkv = {
205  .klen = (int)(strlen(kbuf) + 1),
206  .vlen = (int)(strlen(vbuf) + 1),
207  .k.p = kbuf,
208  .v.p = vbuf};
209  for (int i = 0; i < 4; i++) {
210  kmr_add_kv(kvs10, nkv);
211  }
212  }
213  kmr_add_kv_done(kvs10);
214  KMR_KVS *kvs11 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
215  struct kmr_spawn_option opt = {.reply_each = 1,
216  .separator_space = 1};
217  kmr_map_via_spawn(kvs10, kvs11, 0,
218  MPI_INFO_NULL, opt, empty_map_fn_mpi_with_reply);
219  kmr_free_kvs(kvs11);
220  }
221 
222  if (1) {
223  MPI_Barrier(MPI_COMM_WORLD);
224  usleep(50 * 1000);
225  if (rank == 0) {
226  printf("\n");
227  printf("** CHECK kmr_map_via_spawn WITH REPLY (ROOT).\n");
228  printf("Spawn 2-rank work 4 times"
229  " using %d dynamic processes.\n",
230  mr->spawn_max_processes);
231  }
232  fflush(0);
233  usleep(50 * 1000);
234 
235  KMR_KVS *kvs20 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
236  if (mr->rank == 0) {
237  char kbuf[256];
238  char vbuf[256];
239  snprintf(kbuf, sizeof(kbuf), "key");
240  snprintf(vbuf, sizeof(vbuf),
241  "maxprocs=2 ./a.out mpi rootreply a0 a1 a2");
242  struct kmr_kv_box nkv = {
243  .klen = (int)(strlen(kbuf) + 1),
244  .vlen = (int)(strlen(vbuf) + 1),
245  .k.p = kbuf,
246  .v.p = vbuf};
247  for (int i = 0; i < 4; i++) {
248  kmr_add_kv(kvs20, nkv);
249  }
250  }
251  kmr_add_kv_done(kvs20);
252  KMR_KVS *kvs21 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
253  struct kmr_spawn_option opt = {.reply_root = 1,
254  .separator_space = 1};
255  kmr_map_via_spawn(kvs20, kvs21, 0,
256  MPI_INFO_NULL, opt, empty_map_fn_mpi_with_reply);
257  kmr_free_kvs(kvs21);
258  }
259 
260  kmr_free_context(mr);
261 }
262 
263 static void
264 simple2(int nprocs, int rank)
265 {
266  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
267  mr->trace_map_spawn = 1;
268  mr->spawn_max_processes = 4;
269 
270  MPI_Barrier(MPI_COMM_WORLD);
271  usleep(50 * 1000);
272 
273  if (1) {
274  MPI_Barrier(MPI_COMM_WORLD);
275  usleep(50 * 1000);
276  if (rank == 0) {
277  printf("\n");
278  printf("** CHECK kmr_map_processes (SERIAL)...\n");
279  printf("** Spawn 2 serial processes 4 times.\n");
280  }
281  fflush(0);
282  usleep(50 * 1000);
283 
284  KMR_KVS *kvs10 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
285  if (mr->rank == 0) {
286  char kbuf[256];
287  char vbuf[256];
288  snprintf(kbuf, sizeof(kbuf), "key");
289  snprintf(vbuf, sizeof(vbuf),
290  "maxprocs=2 ./a.out seq noreply a0 a1 a2");
291  struct kmr_kv_box nkv = {
292  .klen = (int)(strlen(kbuf) + 1),
293  .vlen = (int)(strlen(vbuf) + 1),
294  .k.p = kbuf,
295  .v.p = vbuf};
296  for (int i = 0; i < 8; i++) {
297  kmr_add_kv(kvs10, nkv);
298  }
299  }
300  kmr_add_kv_done(kvs10);
301  KMR_KVS *kvs11 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
302  struct kmr_spawn_option opt = {.separator_space = 1};
303  kmr_map_processes(1, kvs10, kvs11, 0, MPI_INFO_NULL, opt,
304  empty_map_fn_seq);
305  kmr_free_kvs(kvs11);
306  }
307 
308  if (!skipmpiwork) {
309  MPI_Barrier(MPI_COMM_WORLD);
310  usleep(50 * 1000);
311  if (rank == 0) {
312  printf("\n");
313  printf("** CHECK kmr_map_processes (MPI)...\n");
314  printf("** Spawn 2-rank work 4 times"
315  " using %d dynamic processes.\n",
316  mr->spawn_max_processes);
317  printf("** THIS TEST MAY BLOCK INDEFINITELY"
318  " ON SOME IMPLEMENTATIONS OF MPI.\n");
319  printf("** THEN, RUN THIS TEST WITH a.out 0"
320  " TO SKIP THIS PART.\n");
321  }
322  fflush(0);
323  usleep(50 * 1000);
324 
325  KMR_KVS *kvs00 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
326  if (mr->rank == 0) {
327  char kbuf[256];
328  char vbuf[256];
329  snprintf(kbuf, sizeof(kbuf), "key");
330  snprintf(vbuf, sizeof(vbuf),
331  "maxprocs=2 ./a.out mpi noreply a0 a1 a2");
332  struct kmr_kv_box nkv = {
333  .klen = (int)(strlen(kbuf) + 1),
334  .vlen = (int)(strlen(vbuf) + 1),
335  .k.p = kbuf,
336  .v.p = vbuf};
337  for (int i = 0; i < 4; i++) {
338  kmr_add_kv(kvs00, nkv);
339  }
340  }
341  kmr_add_kv_done(kvs00);
342  KMR_KVS *kvs01 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
343  struct kmr_spawn_option opt = {.separator_space = 1};
344  kmr_map_processes(0, kvs00, kvs01, 0, MPI_INFO_NULL, opt,
345  empty_map_fn_seq);
346  kmr_free_kvs(kvs01);
347  }
348 
349  kmr_free_context(mr);
350 }
351 
352 static int
353 spawned(int argc, char *argv[])
354 {
355  /* SPAWNED CHILD */
356 
357  assert(strcmp(argv[1], "seq") == 0 || strcmp(argv[1], "mpi") == 0);
358  if (strcmp(argv[1], "seq") == 0) {
359  printf("test5:spawned(serial): started (%s,%s,%s).\n",
360  argv[0], argv[1], argv[3]);
361  printf("test5:spawned(serial): sleeping 3 sec...\n");
362  fflush(0);
363  sleep(3);
364  printf("test5:spawned(serial): exits.\n");
365  fflush(0);
366  } else if (strcmp(argv[1], "mpi") == 0) {
367  int nprocs, rank, lev;
368  MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &lev);
369  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
370  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
371 
372  MPI_Comm parent;
373  MPI_Comm_get_parent(&parent);
374  assert(parent != MPI_COMM_NULL);
375 
376  int peer_nprocs;
377  MPI_Comm_remote_size(parent, &peer_nprocs);
378  assert(peer_nprocs == 1);
379  printf("test5:spawned(mpi;rank=%d/%d): started (%s,%s,%s).\n",
380  rank, nprocs, argv[0], argv[1], argv[2]);
381  printf("test5:spawned(mpi;rank=%d/%d): sleeping 3 sec...\n",
382  rank, nprocs);
383  fflush(0);
384  sleep(3);
385 
386  assert((strcmp(argv[2], "noreply") == 0)
387  || (strcmp(argv[2], "eachreply") == 0)
388  || (strcmp(argv[2], "rootreply") == 0)
389  || (strcmp(argv[2], "returnkvs") == 0));
390  if (strcmp(argv[2], "noreply") == 0) {
391  /* NO REPLY */
392  printf("test5:spawned(mpi;rank=%d/%d):"
393  " no reply.\n",
394  rank, nprocs);
395  } else if (strcmp(argv[2], "eachreply") == 0) {
396  /* EACH REPLY */
397  printf("test5:spawned(mpi;rank=%d/%d):"
398  " sending a reply.\n",
399  rank, nprocs);
400  int peer = 0;
401  MPI_Send(0, 0, MPI_BYTE, peer,
402  KMR_TAG_SPAWN_REPLY, parent);
403  } else if (strcmp(argv[2], "rootreply") == 0) {
404  /* ROOT REPLY */
405  if (rank == 0) {
406  printf("test5:spawned(mpi;rank=%d/%d):"
407  " sending a root reply.\n",
408  rank, nprocs);
409  int peer = 0;
410  MPI_Send(0, 0, MPI_BYTE, peer,
411  KMR_TAG_SPAWN_REPLY, parent);
412  }
413  } else if (strcmp(argv[2], "returnkvs") == 0) {
414  printf("test5:spawned(mpi;rank=%d/%d):"
415  " sending a kvs.\n",
416  rank, nprocs);
417  KMR *mr = kmr_create_dummy_context();
419  KMR_KVS *kvs00 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
420  for (int i = 0; i < 4; i++) {
421  char k[40];
422  char v[40];
423  snprintf(k, sizeof(k), "k%d", i);
424  snprintf(v, sizeof(v), "v%d", i);
425  kmr_add_string(kvs00, k, v);
426  }
427  kmr_add_kv_done(kvs00);
428  kmr_send_kvs_to_spawner(mr, kvs00);
429  } else {
430  /* NO REPLY */
431  assert(0);
432  }
433 
434  printf("test5:spawned(mpi;rank=%d/%d):"
435  " call MPI_Comm_free (could block)...\n",
436  rank, nprocs);
437  fflush(0);
438  MPI_Comm_free(&parent);
439  printf("test5:spawned(mpi;rank=%d/%d):"
440  " MPI_Comm_free done.\n",
441  rank, nprocs);
442  fflush(0);
443  printf("test5:spawned(mpi;rank=%d/%d):"
444  " call MPI_Finalize...\n",
445  rank, nprocs);
446  MPI_Finalize();
447  printf("test5:spawned(mpi;rank=%d/%d):"
448  " MPI_Finalize done; exits.\n",
449  rank, nprocs);
450  fflush(0);
451  }
452 
453  return 0;
454 }
455 
456 int
457 main(int argc, char *argv[])
458 {
459  if (argc == 1 || argc == 2) {
460  /* SPAWNER */
461 
462  int nprocs, rank, lev;
463  MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &lev);
464  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
465  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
466 
467  kmr_init();
468 
469  skipmpiwork = (argc == 2 && argv[1][0] == '0');
470 
471  if (rank == 0) {
472  printf("Check spawning mapper.\n");
473  printf("Running this test needs 4 or more"
474  " dynamic processes.\n");
475  }
476  fflush(0);
477 
478  simple0(nprocs, rank);
479  simple1(nprocs, rank);
480  simple2(nprocs, rank);
481 
482  MPI_Barrier(MPI_COMM_WORLD);
483  usleep(50 * 1000);
484  if (rank == 0) {printf("OK\n");}
485  fflush(0);
486 
487  kmr_fin();
488 
489  MPI_Finalize();
490  } else {
491  spawned(argc, argv);
492  }
493  return 0;
494 }
Key-Value Stream (abstract).
Definition: kmr.h:632
Utilities Private Part (do not include from applications).
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:809
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
MPI_Comm * kmr_get_spawner_communicator(KMR *mr, long index)
Obtains (a reference to) a parent inter-communicator of a spawned process.
Definition: kmrmapms.c:1916
int kmr_send_kvs_to_spawner(KMR *mr, KMR_KVS *kvs)
Sends the KVS from a spawned process to the map-function of the spawner.
Definition: kmrmapms.c:2127
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
Definition: kmrbase.c:939
int kmr_reply_to_spawner(KMR *mr)
Sends a reply message in the spawned process, which tells it is ready to finish and may have some dat...
Definition: kmrmapms.c:1893
KMR Context.
Definition: kmr.h:247
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:679
Handy Copy of a Key-Value Field.
Definition: kmr.h:401
Options to Mapping by Spawns.
Definition: kmr.h:708
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
#define kmr_init()
Sets up the environment.
Definition: kmr.h:794
int kmr_map_via_spawn(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn().
Definition: kmrmapms.c:1992
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:367
KMR Interface.
int kmr_add_string(KMR_KVS *kvs, const char *k, const char *v)
Adds a key-value pair of strings.
Definition: kmrbase.c:971
int kmr_receive_kvs_from_spawned_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Collects key-value pairs generated by spawned processes.
Definition: kmrmapms.c:2161
int kmr_map_processes(_Bool nonmpi, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn() to run independent processes.
Definition: kmrmapms.c:2087
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:168