KMR
test4.c
1 /* test4.c (2014-02-04) */
2 
3 /* Check master-worker mappers. */
4 
5 #include <mpi.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <unistd.h>
9 #include <fcntl.h>
10 #include <limits.h>
11 #include <sys/types.h>
12 #include <sys/stat.h>
13 #include <sys/time.h>
14 #include <assert.h>
15 #ifdef _OPENMP
16 #include <omp.h>
17 #endif
18 
19 #include "kmr.h"
20 #include "kmrimpl.h"
21 
22 static int
23 copykvfn(struct kmr_kv_box kv, const KMR_KVS *kvi,
24  KMR_KVS *kvo, void *p, const long ii)
25 {
26  printf("mapfn(k=%s, v=%s)\n", kv.k.p, kv.v.p);
27  fflush(0);
28  kmr_add_kv(kvo, kv);
29  return MPI_SUCCESS;
30 }
31 
32 static void
33 simple0(int nprocs, int rank)
34 {
35  int cc;
36 
37  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
38  mr->trace_map_ms = 1;
39 
40  if (1) {
41  MPI_Barrier(MPI_COMM_WORLD);
42  usleep(50 * 1000);
43  if (rank == 0) {
44  printf("\n");
45  printf("Checking kmr_map_ms...\n");
46  }
47  fflush(0);
48  usleep(50 * 1000);
49 
50  KMR_KVS *kvs00 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
51  if (mr->rank == 0) {
52  char k[256];
53  char v[256];
54  for (int i = 0; i < 20; i++) {
55  snprintf(k, sizeof(k), "key%d", i);
56  snprintf(v, sizeof(v), "value%d", i);
57  kmr_add_string(kvs00, k, v);
58  }
59  }
60  kmr_add_kv_done(kvs00);
61  KMR_KVS *kvs01 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
62  do {
63  struct kmr_option opt = kmr_noopt;
64  opt.nothreading = 1;
65  cc = kmr_map_ms(kvs00, kvs01, 0, opt, copykvfn);
66  } while (cc == MPI_ERR_ROOT);
67  /*kmr_dump_kvs(kvs01, 0);*/
68  if (mr->rank == 0) {
69  assert(kvs01->c.element_count == 20);
70  } else {
71  assert(kvs01->c.element_count == 0);
72  }
73  kmr_free_kvs(kvs01);
74  }
75 
76  kmr_free_context(mr);
77 }
78 
79 static void
80 simple1(int nprocs, int rank, _Bool forkexec)
81 {
82  int cc;
83 
84  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
85  mr->trace_map_ms = 1;
86  mr->map_ms_abort_on_signal = 1;
87  mr->map_ms_use_exec = forkexec;
88 
89  if (1) {
90  MPI_Barrier(MPI_COMM_WORLD);
91  usleep(50 * 1000);
92  if (rank == 0) {
93  if (forkexec) {
94  printf("\n");
95  printf("Checking kmr_map_ms_commands (by fork-exec)...\n");
96  } else {
97  printf("\n");
98  printf("Checking kmr_map_ms_commands (by system(3C)...\n");
99  }
100  }
101  fflush(0);
102  usleep(50 * 1000);
103 
104  KMR_KVS *kvs00 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
105  if (mr->rank == 0) {
106  for (int i = 0; i < 20; i++) {
107  char key[] = "key";
108  size_t klen = sizeof(key);
109  char val_s[] = "echo start a subprocess.; "
110  "/bin/sleep 3; "
111  "echo a process done.";
112  char val_e[] = "sh\0-c\0echo start a subprocess.; "
113  "/bin/sleep 3; "
114  "echo a process done.";
115  char *val = (forkexec ? val_e : val_s);
116  size_t vlen = (forkexec ? sizeof(val_e) : sizeof(val_s));
117  assert(klen <= INT_MAX && vlen <= INT_MAX);
118  struct kmr_kv_box kv;
119  kv.klen = (int)klen;
120  kv.k.p = key;
121  kv.vlen = (int)vlen;
122  kv.v.p = val;
123  int ccx = kmr_add_kv(kvs00, kv);
124  assert(ccx == MPI_SUCCESS);
125  }
126  }
127  kmr_add_kv_done(kvs00);
128  KMR_KVS *kvs01 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
129  do {
130  struct kmr_option mopt = kmr_noopt;
131  mopt.nothreading = 1;
132  struct kmr_spawn_option sopt = {.separator_space = 0};
133  cc = kmr_map_ms_commands(kvs00, kvs01, 0,
134  mopt, sopt, copykvfn);
135  } while (cc == MPI_ERR_ROOT);
136  /*kmr_dump_kvs(kvs01, 0);*/
137  if (mr->rank == 0) {
138  assert(kvs01->c.element_count == 20);
139  } else {
140  assert(kvs01->c.element_count == 0);
141  }
142  kmr_free_kvs(kvs01);
143  }
144 
145  kmr_free_context(mr);
146 }
147 
148 int
149 main(int argc, char *argv[])
150 {
151  if (argc == 1 || argc == 2) {
152  int nprocs, rank, lev;
153  MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &lev);
154  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
155  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
156 
157  kmr_init();
158 
159  if (rank == 0) {printf("CHECK MS MAPPER\n");}
160  fflush(0);
161 
162  simple0(nprocs, rank);
163  simple1(nprocs, rank, 0);
164  simple1(nprocs, rank, 1);
165 
166  MPI_Barrier(MPI_COMM_WORLD);
167  usleep(50 * 1000);
168  if (rank == 0) {printf("OK\n");}
169  fflush(0);
170 
171  kmr_fin();
172 
173  MPI_Finalize();
174  } else {
175  /* This part is not used now. The test invoked this program
176  in a subprocess. Now, the test invokes a shell script. */
177 
178  printf("a process runs: %s %s %s %s\n",
179  argv[0], argv[1], argv[2], argv[3]);
180  fflush(0);
181  sleep(3);
182  printf("a process done.\n");
183  fflush(0);
184  }
185  return 0;
186 }
Key-Value Stream (abstract).
Definition: kmr.h:632
Utilities Private Part (do not include from applications).
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:658
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:809
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
Definition: kmrbase.c:939
KMR Context.
Definition: kmr.h:247
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:679
Handy Copy of a Key-Value Field.
Definition: kmr.h:401
Options to Mapping by Spawns.
Definition: kmr.h:708
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
#define kmr_init()
Sets up the environment.
Definition: kmr.h:794
int kmr_map_ms_commands(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, struct kmr_spawn_option sopt, kmr_mapfn_t m)
Maps in the master-worker mode, specialized to run serial commands.
Definition: kmrmapms.c:2432
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:367
KMR Interface.
int kmr_map_ms(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps in master-worker mode.
Definition: kmrmapms.c:344
int kmr_add_string(KMR_KVS *kvs, const char *k, const char *v)
Adds a key-value pair of strings.
Definition: kmrbase.c:971
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:168