KMR
test1.c
1 /* test1.c (2014-02-04) */
2 
3 /* Check KMR basic operations. It includes similar tests as test0.c,
4  but with many key-value pairs. */
5 
6 /* Run it with "mpirun -np n a.out". */
7 
8 #include <mpi.h>
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <unistd.h>
12 #include <fcntl.h>
13 #include <limits.h>
14 #include <sys/types.h>
15 #include <sys/stat.h>
16 #include <sys/time.h>
17 #include <assert.h>
18 #ifdef _OPENMP
19 #include <omp.h>
20 #endif
21 
22 #include "kmr.h"
23 #include "kmrimpl.h"
24 
25 static double
26 wtime()
27 {
28  static struct timeval tv0 = {.tv_sec = 0};
29  struct timeval tv;
30  int cc;
31  cc = gettimeofday(&tv, 0);
32  assert(cc == 0);
33  if (tv0.tv_sec == 0) {
34  tv0 = tv;
35  assert(tv0.tv_sec != 0);
36  }
37  double dt = ((double)(tv.tv_sec - tv0.tv_sec)
38  + ((double)(tv.tv_usec - tv0.tv_usec) * 1e-6));
39  return dt;
40 }
41 
42 /* Puts many key-value pairs to output KVO. */
43 
44 static int
45 addsomekeys0(const struct kmr_kv_box kv0,
46  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i_)
47 {
48  assert(kvs0 == 0 && kv0.klen == 0 && kv0.vlen == 0 && kvo != 0);
49  int N = *((int *)p);
50  char k[80];
51  char v[80];
52  int cc;
53  for (int i = 0; i < N; i++) {
54  snprintf(k, 80, "key%d", i);
55  snprintf(v, 80, "value%d", i);
56  struct kmr_kv_box kv = {
57  .klen = (int)(strlen(k) + 1), .vlen = (int)(strlen(v) + 1),
58  .k.p = k, .v.p = v};
59  cc = kmr_add_kv(kvo, kv);
60  assert(cc == MPI_SUCCESS);
61  }
62  return MPI_SUCCESS;
63 }
64 
65 static int
66 replacevalues0(const struct kmr_kv_box kv0,
67  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i)
68 {
69  char buf[1024];
70  assert(kvs0 != 0 && kvo != 0);
71  int cc, x;
72  char gomi;
73  cc = sscanf((&((char *)kv0.k.p)[3]), "%d%c", &x, &gomi);
74  assert(cc == 1);
75  snprintf(buf, sizeof(buf), "newvalue%d", x);
76  int vlen = (int)(strlen(buf) + 1);
77  struct kmr_kv_box kv = {.klen = kv0.klen,
78  .vlen = vlen,
79  .k.p = kv0.k.p,
80  .v.p = buf};
81  cc = kmr_add_kv(kvo, kv);
82  assert(cc == MPI_SUCCESS);
83  return MPI_SUCCESS;
84 }
85 
86 /* Aggregates reduction pairs. It asserts key equality for
87  checking. */
88 
89 static int
90 aggregatevalues0(const struct kmr_kv_box kv[], const long n,
91  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
92 {
93  int nprocs, rank;
94  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
95  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
96  assert(n == nprocs);
97  for (int i = 0; i < n; i++) {
98  if (i > 0) {
99  struct kmr_kv_box b0 = kv[0];
100  switch (kvs->c.key_data) {
101  case KMR_KV_BAD:
102  assert(kvs->c.key_data != KMR_KV_BAD);
103  break;
104  case KMR_KV_INTEGER:
105  assert(kv[i].klen == b0.klen
106  && kv[i].k.i == b0.k.i);
107  break;
108  case KMR_KV_FLOAT8:
109  assert(kv[i].klen == b0.klen
110  && kv[i].k.d == b0.k.d);
111  break;
112  case KMR_KV_OPAQUE:
113  case KMR_KV_CSTRING:
114  case KMR_KV_POINTER_OWNED:
115  case KMR_KV_POINTER_UNMANAGED:
116  assert(kv[i].klen == b0.klen
117  && memcmp(kv[i].k.p, b0.k.p, (size_t)b0.klen) == 0);
118  break;
119  default:
120  assert(0);
121  break;
122  }
123  }
124  }
125  char buf[48];
126  int cc;
127  snprintf(buf, 48, "%ld_*_%s", n, kv[0].v.p);
128  int len = (int)(strlen(buf) + 1);
129  struct kmr_kv_box akv = {.klen = kv[0].klen,
130  .vlen = len,
131  .k.p = kv[0].k.p,
132  .v.p = buf};
133  cc = kmr_add_kv(kvo, akv);
134  assert(cc == MPI_SUCCESS);
135  return MPI_SUCCESS;
136 }
137 
138 static int
139 checkkeyvalues0(const struct kmr_kv_box kv0,
140  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i)
141 {
142  KMR *mr = kvs0->c.mr;
143  char buf[1024];
144  int cc;
145  int x;
146  char gomi;
147  cc = sscanf((&((char *)kv0.k.p)[3]), "%d%c", &x, &gomi);
148  assert(cc == 1);
149  snprintf(buf, sizeof(buf), "%d_*_newvalue%d", mr->nprocs, x);
150  assert(strlen(kv0.v.p) == strlen(buf));
151  size_t len = strlen(buf);
152  assert(strncmp((kv0.v.p), buf, len) == 0);
153  return MPI_SUCCESS;
154 }
155 
156 /* Tests simplest. It shrinks the block-size (preset_block_size), to
157  test growing buffers. */
158 
159 static void
160 simple0(int nprocs, int rank, _Bool pushoff)
161 {
162  MPI_Barrier(MPI_COMM_WORLD);
163  usleep(50 * 1000);
164  if (!pushoff) {
165  if (rank == 0) {printf("CHECK SIMPLE OPERATIONS...\n");}
166  } else {
167  if (rank == 0) {printf("CHECK PUSH-OFF KVS...\n");}
168  }
169  fflush(0);
170  usleep(50 * 1000);
171 
172  int cc;
173 
174  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
175  assert(mr != 0);
176  mr->pushoff_stat = 1;
177  mr->preset_block_size = 750;
178  //mr->pushoff_block_size = 1024;
179 
180  int N = 1000000;
181 
182  /* Put pairs. */
183 
184  MPI_Barrier(mr->comm);
185  usleep(50 * 1000);
186  if (rank == 0) {printf("ADD (%d elements)\n", N);}
187  fflush(0);
188  usleep(50 * 1000);
189 
190  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
191  cc = kmr_map_on_rank_zero(kvs0, &N, kmr_noopt, addsomekeys0);
192  assert(cc == MPI_SUCCESS);
193 
194  long cnt0;
195  cc = kmr_get_element_count(kvs0, &cnt0);
196  assert(cc == MPI_SUCCESS);
197  assert(cnt0 == N);
198 
199  /* Replicate pairs to all ranks. */
200 
201  MPI_Barrier(mr->comm);
202  usleep(50 * 1000);
203  if (rank == 0) {printf("REPLICATE\n");}
204  fflush(0);
205  usleep(50 * 1000);
206 
207  KMR_KVS *kvs1 = kmr_create_kvs(mr, kvs0->c.key_data, kvs0->c.value_data);
208  cc = kmr_replicate(kvs0, kvs1, kmr_noopt);
209  assert(cc == MPI_SUCCESS);
210 
211  long cnt1;
212  cc = kmr_get_element_count(kvs1, &cnt1);
213  assert(cc == MPI_SUCCESS);
214  assert(cnt1 == (N * nprocs));
215 
216  /* Pack and Unpack. */
217 
218  MPI_Barrier(mr->comm);
219  usleep(50 * 1000);
220  if (rank == 0) {printf("PACK+UNPACK\n");}
221  fflush(0);
222  usleep(50 * 1000);
223 
224  void *data = 0;
225  size_t sz = 0;
226  cc = kmr_save_kvs(kvs1, &data, &sz, kmr_noopt);
227  assert(cc == MPI_SUCCESS && data != 0 && sz != 0);
228  cc = kmr_free_kvs(kvs1);
229  assert(cc == MPI_SUCCESS);
230 
231  KMR_KVS *kvs1r = kmr_create_kvs(mr, KMR_KV_BAD, KMR_KV_BAD);
232  cc = kmr_restore_kvs(kvs1r, data, sz, kmr_noopt);
233  assert(cc == MPI_SUCCESS);
234  free(data);
235  assert((kvs1r->c.key_data == KMR_KV_OPAQUE
236  || kvs1r->c.key_data == KMR_KV_CSTRING)
237  && (kvs1r->c.value_data == KMR_KV_OPAQUE
238  || kvs1r->c.value_data == KMR_KV_CSTRING));
239 
240  /* Map pairs. */
241 
242  MPI_Barrier(mr->comm);
243  usleep(50 * 1000);
244  if (rank == 0) {printf("MAP\n");}
245  fflush(0);
246  usleep(50 * 1000);
247 
248  KMR_KVS *kvs2;
249  if (!pushoff) {
250  kvs2 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
251  } else {
252  kvs2 = kmr_create_pushoff_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE,
253  kmr_noopt,
254  __FILE__, __LINE__, __func__);
255  }
256  cc = kmr_map(kvs1r, kvs2, 0, kmr_noopt, replacevalues0);
257  assert(cc == MPI_SUCCESS);
258 
259  long cnt2;
260  cc = kmr_get_element_count(kvs2, &cnt2);
261  assert(cc == MPI_SUCCESS);
262  assert(cnt2 == (N * nprocs));
263 
264  /* Collect pairs by theirs keys. */
265 
266  MPI_Barrier(mr->comm);
267  usleep(50 * 1000);
268  if (rank == 0) {printf("SHUFFLE\n");}
269  fflush(0);
270  usleep(50 * 1000);
271 
272  KMR_KVS *kvs3 = kmr_create_kvs(mr, kvs2->c.key_data, kvs2->c.value_data);
273  cc = kmr_shuffle(kvs2, kvs3, kmr_noopt);
274  assert(cc == MPI_SUCCESS);
275 
276 #if 0
277  if (!pushoff) {
278  kvs3 = kmr_create_kvs(mr, kvs2->c.key_data, kvs2->c.value_data);
279  cc = kmr_shuffle(kvs2, kvs3, kmr_noopt);
280  } else {
281  kvs3 = kvs2;
282  kvs2 = 0;
283  cc = MPI_SUCCESS;
284  }
285  assert(cc == MPI_SUCCESS);
286 #endif
287 
288  long cnt3;
289  cc = kmr_get_element_count(kvs3, &cnt3);
290  assert(cc == MPI_SUCCESS);
291  assert(cnt3 == (N * nprocs));
292 
293  /* Reduce collected pairs. */
294 
295  MPI_Barrier(mr->comm);
296  usleep(50 * 1000);
297  if (rank == 0) {printf("REDUCE\n");}
298  fflush(0);
299  usleep(50 * 1000);
300 
301  KMR_KVS *kvs4 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
302  cc = kmr_reduce(kvs3, kvs4, 0, kmr_noopt, aggregatevalues0);
303  assert(cc == MPI_SUCCESS);
304 
305  //kmr_dump_kvs(kvs4, 0);
306 
307  long cnt4;
308  cc = kmr_get_element_count(kvs4, &cnt4);
309  assert(cc == MPI_SUCCESS);
310  assert(cnt4 == N);
311 
312  /* Gather pairs to rank0. */
313 
314  MPI_Barrier(mr->comm);
315  usleep(50 * 1000);
316  if (rank == 0) {printf("GATHER\n");}
317  fflush(0);
318  usleep(50 * 1000);
319 
320  KMR_KVS *kvs5 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
321  struct kmr_option opt = {.rank_zero = 1};
322  cc = kmr_replicate(kvs4, kvs5, opt);
323  assert(cc == MPI_SUCCESS);
324 
325  long cnt5;
326  cc = kmr_get_element_count(kvs5, &cnt5);
327  assert(cc == MPI_SUCCESS);
328  assert(cnt5 == N);
329 
330  /* Check key-value pairs (pairs on rank0 only). */
331 
332  cc = kmr_map(kvs5, 0, 0, kmr_noopt, checkkeyvalues0);
333  assert(cc == MPI_SUCCESS);
334 
335  if (pushoff && mr->pushoff_stat) {
336  char *s = "STATISTICS on push-off kvs:\n";
337  kmr_print_statistics_on_pushoff(mr, s);
338  }
339 
340  cc = kmr_free_context(mr);
341  assert(cc == MPI_SUCCESS);
342 }
343 
344 static int
345 addkeys1(const struct kmr_kv_box kv0,
346  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i_)
347 {
348  assert(kvs0 == 0);
349  KMR *mr = kvo->c.mr;
350  assert(mr->rank == 0);
351  long *arg = p;
352  int NN = (int)*arg;
353  for (int i = 0; i < NN; i++) {
354  struct kmr_kv_box kv = {
355  .klen = sizeof(long),
356  .vlen = sizeof(long),
357  .k.i = i,
358  .v.i = i
359  };
360  kmr_add_kv(kvo, kv);
361  }
362  return MPI_SUCCESS;
363 }
364 
365 static int
366 checksorted1(const struct kmr_kv_box kv0,
367  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i)
368 {
369  assert(kv0.k.i == i && kv0.v.i == i);
370  return MPI_SUCCESS;
371 }
372 
373 /* Tests distribute(). */
374 
375 static void
376 simple1(int nprocs, int rank)
377 {
378  MPI_Barrier(MPI_COMM_WORLD);
379  if (rank == 0) {printf("DISTIBUTE\n");}
380  fflush(0);
381  usleep(50 * 1000);
382 
383  /* Check with 10 keys for each rank. */
384 
385  const long MM = 10;
386  const long NN = MM * nprocs;
387 
388  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
389  assert(mr != 0);
390 
391  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
392  kmr_map_on_rank_zero(kvs0, (void *)&NN, kmr_noopt, addkeys1);
393 
394  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
395  kmr_distribute(kvs0, kvs1, 1, kmr_noopt);
396  //kmr_dump_kvs(kvs1, 0);
397  assert(kvs1->c.element_count == MM);
398 
399  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
400  kmr_sort_small(kvs1, kvs2, kmr_noopt);
401  //kmr_dump_kvs(kvs2, 0);
402 
403  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
404  kmr_distribute(kvs2, kvs3, 0, kmr_noopt);
405  //kmr_dump_kvs(kvs3, 0);
406  assert(kvs3->c.element_count == MM);
407 
408  /* Check key-value pairs (after collecting to rank0). */
409 
410  KMR_KVS *kvs4 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
411  struct kmr_option opt = {.rank_zero = 1};
412  kmr_replicate(kvs3, kvs4, opt);
413 
414  KMR_KVS *kvs5 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
415  kmr_sort_locally(kvs4, kvs5, 0, kmr_noopt);
416 
417  kmr_map(kvs5, 0, 0, kmr_noopt, checksorted1);
418 
419  kmr_free_context(mr);
420 }
421 
422 /* Tests on empty KVS. */
423 
424 static void
425 simple2(int nprocs, int rank)
426 {
427  MPI_Barrier(MPI_COMM_WORLD);
428  usleep(50 * 1000);
429  if (rank == 0) {printf("CHECK OPERATIONS WITH EMPTY KVS...\n");}
430  fflush(0);
431  usleep(50 * 1000);
432 
433  int cc;
434 
435  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
436  assert(mr != 0);
437 
438  /* Make empty KVS. */
439 
440  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
441  cc = kmr_add_kv_done(kvs0);
442  assert(cc == MPI_SUCCESS);
443 
444  long cnt0;
445  cc = kmr_get_element_count(kvs0, &cnt0);
446  assert(cc == MPI_SUCCESS);
447  assert(cnt0 == 0);
448 
449  /* Replicate. */
450 
451  KMR_KVS *kvs1 = kmr_create_kvs(mr, kvs0->c.key_data, kvs0->c.value_data);
452  cc = kmr_replicate(kvs0, kvs1, kmr_noopt);
453  assert(cc == MPI_SUCCESS);
454 
455  /* Map. */
456 
457  KMR_KVS *kvs2 = kmr_create_kvs(mr, kvs1->c.key_data, kvs1->c.value_data);
458  cc = kmr_map(kvs1, kvs2, 0, kmr_noopt, 0);
459  assert(cc == MPI_SUCCESS);
460 
461  /* Shuffle. */
462 
463  KMR_KVS *kvs3 = kmr_create_kvs(mr, kvs2->c.key_data, kvs2->c.value_data);
464  cc = kmr_shuffle(kvs2, kvs3, kmr_noopt);
465  assert(cc == MPI_SUCCESS);
466 
467  /* Reduce. */
468 
469  KMR_KVS *kvs4 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
470  cc = kmr_reduce(kvs3, kvs4, 0, kmr_noopt, 0);
471  assert(cc == MPI_SUCCESS);
472 
473  /* Sort. */
474 
475  KMR_KVS *kvs5 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
476  cc = kmr_sort(kvs4, kvs5, kmr_noopt);
477  assert(cc == MPI_SUCCESS);
478  KMR_KVS *kvs6 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
479  cc = kmr_sort_locally(kvs5, kvs6, 0, kmr_noopt);
480  assert(cc == MPI_SUCCESS);
481  cc = kmr_free_kvs(kvs6);
482  assert(cc == MPI_SUCCESS);
483 
484  /* Concat. */
485 
486  KMR_KVS *kvs70 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
487  cc = kmr_add_kv_done(kvs70);
488  assert(cc == MPI_SUCCESS);
489  KMR_KVS *kvs71 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
490  cc = kmr_add_kv_done(kvs71);
491  assert(cc == MPI_SUCCESS);
492  KMR_KVS *kvs72 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
493  KMR_KVS *vec[] = {kvs70, kvs71};
494  kmr_concatenate_kvs(vec, 2, kvs72, kmr_noopt);
495  assert(cc == MPI_SUCCESS);
496  cc = kmr_free_kvs(kvs72);
497  assert(cc == MPI_SUCCESS);
498 
499  cc = kmr_free_context(mr);
500  assert(cc == MPI_SUCCESS);
501 }
502 
503 /* Tests on option (property list) loader. */
504 
505 static void
506 simple3(int nprocs, int rank)
507 {
508  MPI_Barrier(MPI_COMM_WORLD);
509  usleep(50 * 1000);
510  if (rank == 0) {printf("CHECK LOADING OPTIONS (PROPERTY LIST)...\n");}
511  fflush(0);
512  usleep(50 * 1000);
513 
514  static char props[] =
515  "# Test file for property loader.\n"
516  "key0=value0\n"
517  "key1=\n"
518  "=value2\n"
519  "key3 value3\n"
520  "key4\\\n" " " ":value4\n"
521  "ke\\\n" " " "y5=val\\\n" " " "ue5\n"
522  "ke\\\n" " " "\\\n" " " "y6\\\n"
523  "=val\\\n" " " "\\\n" " " "ue6\n"
524  "key\\u0037=\\u0076alue7\n"
525  "key#is!anything=value#is!much=more\n";
526 
527  static char *pairs[][2] =
528  {{"key0", "value0"},
529  {"key1", ""},
530  {"", "value2"},
531  {"key3", "value3"},
532  {"key4", "value4"},
533  {"key5", "value5"},
534  {"key6", "value6"},
535  {"key7", "value7"},
536  {"key#is!anything", "value#is!much=more"}};
537 
538  int npairs = (sizeof(pairs) / (sizeof(char *) * 2));
539 
540  int cc;
541 
542  if (rank == 0) {
543  if (1) {
544  system("rm -f properties");
545  FILE *f = fopen("properties", "w");
546  assert(f != 0);
547  size_t cx = fwrite(props, (sizeof(props) - 1), 1, f);
548  assert(cx == 1);
549  cc = fclose(f);
550  assert(cc == 0);
551  }
552 
553  MPI_Info info;
554  MPI_Info_create(&info);
555 
556  cc = kmr_load_properties(info, "properties");
557  assert(cc == MPI_SUCCESS);
558 
559  //kmr_dump_mpi_info("", info);
560 
561  /* Decrease count by one, because an empty key is ignore. */
562 
563  {
564  int nkeys;
565  cc = MPI_Info_get_nkeys(info, &nkeys);
566  assert(cc == MPI_SUCCESS);
567  /* (SKIP EMPTY KEY AND EMPTY VALUE IN TEST DATA). */
568  assert((npairs - 2) == nkeys);
569  for (int i = 0; i < npairs; i++) {
570  if (*pairs[i][0] == 0) {
571  continue;
572  }
573  if (*pairs[i][1] == 0) {
574  continue;
575  }
576  char *key = pairs[i][0];
577  char value[MPI_MAX_INFO_VAL + 1];
578  int vlen;
579  int flag;
580  cc = MPI_Info_get_valuelen(info, key, &vlen, &flag);
581  assert(cc == MPI_SUCCESS && flag != 0);
582  assert(vlen <= MPI_MAX_INFO_VAL);
583  cc = MPI_Info_get(info, key, MPI_MAX_INFO_VAL, value, &flag);
584  assert(cc == MPI_SUCCESS && flag != 0);
585  assert(strcmp(pairs[i][1], value) == 0);
586  }
587  }
588 
589  MPI_Info_free(&info);
590 
591  if (1) {
592  system("rm -f properties");
593  }
594  }
595 }
596 
597 /* Tests on sorting for small number of entries. */
598 
599 static void
600 simple4(int nprocs, int rank)
601 {
602 #define KLEN 10
603 #define VLEN 90
604 
605  MPI_Barrier(MPI_COMM_WORLD);
606  usleep(50 * 1000);
607  if (rank == 0) {printf("CHECK SORTER (SMALL DATA)...\n");}
608  fflush(0);
609  usleep(50 * 1000);
610 
611  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
612  assert(mr != 0);
613 
614  int cc;
615  char k[KLEN+1];
616  char v[VLEN+1];
617 
618  struct kmr_option inspect = kmr_noopt;
619  inspect.inspect = 1;
620 
621  /* Local sort on opaques. */
622 
623  {
624  int N = 1000 * 1000;
625 
626  if (rank == 0) {
627  printf("Checking local sort on opaques N=%d\n", N);
628  printf("Generating random strings...\n");
629  fflush(0);
630  }
631 
632  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
633  for (int i = 0; i < N; i++) {
634  const int len = 100;
635  char bb[128];
636  for (int j = 0; j < (len - 1); j++) {
637  long vv = lrand48();
638  assert(vv >= 0);
639  int c0 = (int)(vv % 26);
640  int c1 = ('A' + (c0 - 0));
641  bb[j] = (char)c1;
642  }
643  bb[(len - 1)] = 0;
644  struct kmr_kv_box kv = {
645  .klen = len,
646  .vlen = len,
647  .k.p = bb,
648  .v.p = bb
649  };
650  cc = kmr_add_kv(kvs0, kv);
651  assert(cc == MPI_SUCCESS);
652  }
653  cc = kmr_add_kv_done(kvs0);
654  assert(cc == MPI_SUCCESS);
655 
656  //kmr_dump_kvs(kvs0, 0); fflush(0);
657 
658  if (rank == 0) {printf("Sorting (normal)...\n"); fflush(0);}
659 
660  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
661  cc = kmr_sort_locally(kvs0, kvs1, 0, kmr_noopt);
662  assert(cc == MPI_SUCCESS);
663  if (rank == 0) {printf("Checking...\n"); fflush(0);}
664  kmr_assert_sorted(kvs1, 1, 0, 0);
665 
666  if (rank == 0) {printf("Sorting (shuffle)...\n"); fflush(0);}
667 
668  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
669  cc = kmr_sort_locally(kvs1, kvs2, 1, kmr_noopt);
670  assert(cc == MPI_SUCCESS);
671  if (rank == 0) {printf("Check...\n"); fflush(0);}
672  kmr_assert_sorted(kvs2, 1, 1, 0);
673 
674  cc = kmr_free_kvs(kvs2);
675  assert(cc == MPI_SUCCESS);
676  }
677 
678  /* Local sort on integers. */
679 
680  {
681  int N = 1000 * 1000;
682 
683  if (rank == 0) {
684  printf("Checking local sort on integers N=%d\n", N);
685  printf("Generating random numbers...\n");
686  fflush(0);
687  }
688 
689  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
690  for (int i = 0; i < N; i++) {
691  long vv = ((mrand48() << 32) ^ mrand48());
692  struct kmr_kv_box kv = {
693  .klen = sizeof(long),
694  .vlen = sizeof(long),
695  .k.i = vv,
696  .v.i = vv
697  };
698  cc = kmr_add_kv(kvs0, kv);
699  assert(cc == MPI_SUCCESS);
700  }
701  cc = kmr_add_kv_done(kvs0);
702  assert(cc == MPI_SUCCESS);
703 
704  if (rank == 0) {printf("Sorting (normal)...\n"); fflush(0);}
705 
706  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
707  cc = kmr_sort_locally(kvs0, kvs1, 0, kmr_noopt);
708  assert(cc == MPI_SUCCESS);
709  //kmr_dump_kvs(kvs1, 0); fflush(0);
710  if (rank == 0) {printf("Checking...\n"); fflush(0);}
711  kmr_assert_sorted(kvs1, 1, 0, 0);
712 
713  if (rank == 0) {printf("Sorting (shuffle)...\n"); fflush(0);}
714 
715  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
716  cc = kmr_sort_locally(kvs1, kvs2, 1, kmr_noopt);
717  assert(cc == MPI_SUCCESS);
718  if (rank == 0) {printf("Checking...\n"); fflush(0);}
719  kmr_assert_sorted(kvs2, 1, 1, 0);
720 
721  cc = kmr_free_kvs(kvs2);
722  assert(cc == MPI_SUCCESS);
723  }
724 
725  /* Local sort on doubles. */
726 
727  {
728  int N = 1000 * 1000;
729 
730  if (rank == 0) {
731  printf("Checking local sort on doubles N=%d\n", N);
732  printf("Generating random numbers...\n");
733  fflush(0);
734  }
735 
736  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_FLOAT8, KMR_KV_FLOAT8);
737  for (int i = 0; i < N; i++) {
738  double vv = (drand48() - 0.5) * 10000.0;
739  struct kmr_kv_box kv = {
740  .klen = sizeof(double),
741  .vlen = sizeof(double),
742  .k.d = vv,
743  .v.d = vv
744  };
745  cc = kmr_add_kv(kvs0, kv);
746  assert(cc == MPI_SUCCESS);
747  }
748  cc = kmr_add_kv_done(kvs0);
749  assert(cc == MPI_SUCCESS);
750 
751  if (rank == 0) {printf("Sorting (normal)...\n"); fflush(0);}
752 
753  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_FLOAT8, KMR_KV_FLOAT8);
754  cc = kmr_sort_locally(kvs0, kvs1, 0, kmr_noopt);
755  assert(cc == MPI_SUCCESS);
756  //kmr_dump_kvs(kvs1, 0); fflush(0);
757  if (rank == 0) {printf("Checking...\n"); fflush(0);}
758  kmr_assert_sorted(kvs1, 1, 0, 0);
759 
760  if (rank == 0) {printf("Sorting (shuffle)...\n"); fflush(0);}
761 
762  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_FLOAT8, KMR_KV_FLOAT8);
763  cc = kmr_sort_locally(kvs1, kvs2, 1, kmr_noopt);
764  assert(cc == MPI_SUCCESS);
765  if (rank == 0) {printf("Checking...\n"); fflush(0);}
766  kmr_assert_sorted(kvs2, 1, 1, 0);
767 
768  cc = kmr_free_kvs(kvs2);
769  assert(cc == MPI_SUCCESS);
770  }
771 
772  /* Sort short opaque key-values. */
773 
774  {
775  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
776  int N = 1000;
777  for (int i = 0; i < N; i++) {
778  memset(k, 0, sizeof(k));
779  memset(v, 0, sizeof(v));
780  snprintf(k, (KLEN+1), "%05d%05d", rank, (N - 1 - i));
781  snprintf(v, (VLEN+1), "value=%d/%d", rank, i);
782  struct kmr_kv_box kv = {
783  .klen = KLEN, .vlen = VLEN, .k.p = k, .v.p = v};
784  cc = kmr_add_kv(kvs0, kv);
785  assert(cc == MPI_SUCCESS);
786  }
787  cc = kmr_add_kv_done(kvs0);
788  assert(cc == MPI_SUCCESS);
789 
790  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
791  cc = kmr_sort_locally(kvs0, kvs1, 0, inspect);
792  assert(cc == MPI_SUCCESS);
793  //kmr_dump_kvs(kvs1, 0);
794  kmr_assert_sorted(kvs1, 1, 0, 0);
795 
796  cc = kmr_free_kvs(kvs0);
797  assert(cc == MPI_SUCCESS);
798  cc = kmr_free_kvs(kvs1);
799  assert(cc == MPI_SUCCESS);
800  }
801 
802  /* (nprocs x nprocs) entries. */
803 
804  {
805  MPI_Barrier(MPI_COMM_WORLD);
806  usleep(50 * 1000);
807  if (rank == 0) {printf("SORT (int) nprocs x nprocs data...\n");}
808  fflush(0);
809  usleep(50 * 1000);
810 
811  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
812  int N = nprocs;
813  for (int i = 0; i < N; i++) {
814  snprintf(v, 80, "value=%d/%d", rank, i);
815  struct kmr_kv_box kv = {
816  .klen = (int)sizeof(long),
817  .vlen = (int)(strlen(v) + 1),
818  .k.i = (100 * i),
819  .v.p = v};
820  cc = kmr_add_kv(kvs0, kv);
821  assert(cc == MPI_SUCCESS);
822  }
823  cc = kmr_add_kv_done(kvs0);
824  assert(cc == MPI_SUCCESS);
825 
826  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
827  cc = kmr_sort_small(kvs0, kvs1, inspect);
828  assert(cc == MPI_SUCCESS);
829  //kmr_dump_kvs(kvs1, 0);
830  kmr_assert_sorted(kvs1, 0, 0, 0);
831 
832  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
833  cc = kmr_sort_large(kvs0, kvs2, inspect);
834  assert(cc == MPI_SUCCESS);
835  //kmr_dump_kvs(kvs2, 0);
836  kmr_assert_sorted(kvs2, 0, 0, 0);
837 
838  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
839  cc = kmr_sort_by_one(kvs0, kvs3, inspect);
840  assert(cc == MPI_SUCCESS);
841  kmr_assert_sorted(kvs3, 0, 0, 0);
842 
843  cc = kmr_free_kvs(kvs0);
844  assert(cc == MPI_SUCCESS);
845  }
846 
847  /* nprocs entries (on a sole rank). */
848 
849  {
850  MPI_Barrier(MPI_COMM_WORLD);
851  usleep(50 * 1000);
852  if (rank == 0) {printf("SORT (int) nprocs data...\n");}
853  fflush(0);
854  usleep(50 * 1000);
855 
856  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
857  int N = nprocs;
858  if (rank == (nprocs - 1)) {
859  for (int i = 0; i < N; i++) {
860  snprintf(v, 80, "value=%d/%d", rank, i);
861  struct kmr_kv_box kv = {
862  .klen = (int)sizeof(long),
863  .vlen = (int)(strlen(v) + 1),
864  .k.i = (100 * i),
865  .v.p = v};
866  cc = kmr_add_kv(kvs0, kv);
867  assert(cc == MPI_SUCCESS);
868  }
869  }
870  cc = kmr_add_kv_done(kvs0);
871  assert(cc == MPI_SUCCESS);
872 
873  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
874  cc = kmr_sort_small(kvs0, kvs1, inspect);
875  assert(cc == MPI_SUCCESS);
876  //kmr_dump_kvs(kvs1, 0);
877  kmr_assert_sorted(kvs1, 0, 0, 0);
878 
879  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
880  cc = kmr_sort_large(kvs0, kvs2, inspect);
881  assert(cc == MPI_SUCCESS);
882  //kmr_dump_kvs(kvs2, 0);
883  kmr_assert_sorted(kvs2, 0, 0, 0);
884 
885  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
886  cc = kmr_sort_by_one(kvs0, kvs3, inspect);
887  assert(cc == MPI_SUCCESS);
888  kmr_assert_sorted(kvs3, 0, 0, 0);
889 
890  cc = kmr_free_kvs(kvs0);
891  assert(cc == MPI_SUCCESS);
892  }
893 
894  /* (1/2 x N x nprocs x nprocs) entries. */
895 
896  {
897  MPI_Barrier(MPI_COMM_WORLD);
898  usleep(50 * 1000);
899  if (rank == 0) {printf("SORT (int) N x nprocs x nprocs data...\n");}
900  fflush(0);
901  usleep(50 * 1000);
902 
903  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
904  int N = (100 * nprocs);
905  for (int i = 0; i < N; i++) {
906  snprintf(v, 80, "value=%d/%d", rank, i);
907  struct kmr_kv_box kv = {
908  .klen = (int)sizeof(long),
909  .vlen = (int)(strlen(v) + 1),
910  .k.i = (100 * i),
911  .v.p = v};
912  cc = kmr_add_kv(kvs0, kv);
913  assert(cc == MPI_SUCCESS);
914  }
915  cc = kmr_add_kv_done(kvs0);
916  assert(cc == MPI_SUCCESS);
917 
918  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
919  cc = kmr_sort_small(kvs0, kvs1, inspect);
920  assert(cc == MPI_SUCCESS);
921  //kmr_dump_kvs(kvs1, 0);
922  kmr_assert_sorted(kvs1, 0, 0, 0);
923 
924  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
925  cc = kmr_sort_large(kvs0, kvs2, inspect);
926  assert(cc == MPI_SUCCESS);
927  //kmr_dump_kvs(kvs2, 0);
928  kmr_assert_sorted(kvs2, 0, 0, 0);
929 
930  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
931  cc = kmr_sort_by_one(kvs0, kvs3, inspect);
932  assert(cc == MPI_SUCCESS);
933  kmr_assert_sorted(kvs3, 0, 0, 0);
934 
935  cc = kmr_free_kvs(kvs0);
936  assert(cc == MPI_SUCCESS);
937  }
938 
939  /* (nprocs x nprocs) entries. */
940 
941  {
942  MPI_Barrier(MPI_COMM_WORLD);
943  usleep(50 * 1000);
944  if (rank == 0) {printf("SORT (byte array) nprocs x nprocs data...\n");}
945  fflush(0);
946  usleep(50 * 1000);
947 
948  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
949  int N = nprocs;
950  for (int i = 0; i < N; i++) {
951  memset(k, 0, sizeof(k));
952  memset(v, 0, sizeof(v));
953  snprintf(k, (KLEN+1), "%05d%05d", rank, (N - 1 - i));
954  snprintf(v, (VLEN+1), "value=%d/%d", rank, i);
955  struct kmr_kv_box kv = {
956  .klen = KLEN, .vlen = VLEN, .k.p = k, .v.p = v};
957  cc = kmr_add_kv(kvs0, kv);
958  assert(cc == MPI_SUCCESS);
959  }
960  cc = kmr_add_kv_done(kvs0);
961  assert(cc == MPI_SUCCESS);
962 
963  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
964  cc = kmr_sort_small(kvs0, kvs1, inspect);
965  assert(cc == MPI_SUCCESS);
966  //kmr_dump_kvs(kvs1, 0);
967  kmr_assert_sorted(kvs1, 0, 0, 0);
968 
969  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
970  cc = kmr_sort_large(kvs0, kvs2, inspect);
971  assert(cc == MPI_SUCCESS);
972  //kmr_dump_kvs(kvs2, 0);
973  kmr_assert_sorted(kvs2, 0, 0, 0);
974 
975  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
976  cc = kmr_sort_by_one(kvs0, kvs3, inspect);
977  assert(cc == MPI_SUCCESS);
978  kmr_assert_sorted(kvs3, 0, 0, 0);
979 
980  cc = kmr_free_kvs(kvs0);
981  assert(cc == MPI_SUCCESS);
982  }
983 
984  cc = kmr_free_context(mr);
985  assert(cc == MPI_SUCCESS);
986 }
987 
988 static int
989 kmr_icmp(const void *a0, const void *a1)
990 {
991  const long *p0 = a0;
992  const long *p1 = a1;
993  long d = (*p0 - *p1);
994  return ((d == 0) ? 0 : ((d < 0) ? -1 : 1));
995 }
996 
997 /* Check bsearch utility. */
998 
999 static void
1000 simple5(int nprocs, int rank)
1001 {
1002  MPI_Barrier(MPI_COMM_WORLD);
1003  usleep(50 * 1000);
1004  if (rank == 0) {printf("CHECK BSEARCH UTILITY...\n");}
1005  fflush(0);
1006  usleep(50 * 1000);
1007 
1008  /* Check with length 20-40. */
1009 
1010 #define NN 41
1011 #define A0(I) (10 + 10 * (I))
1012 
1013  long a0[NN];
1014  for (int i = 0; i < NN; i++) {
1015  a0[i] = A0(i);
1016  }
1017 
1018  for (int N = 20; N < NN; N++) {
1019  long *p0;
1020  long k;
1021 
1022  k = A0(-1);
1023  p0 = kmr_bsearch(&k, a0, (size_t)N, sizeof(long), kmr_icmp);
1024  assert(p0 == &a0[0]);
1025 
1026  for (int i = 0; i < N; i++) {
1027  k = A0(i);
1028  p0 = kmr_bsearch(&k, a0, (size_t)N, sizeof(long), kmr_icmp);
1029  assert(p0 == &a0[i]);
1030  }
1031 
1032  k = A0(N);
1033  p0 = kmr_bsearch(&k, a0, (size_t)N, sizeof(long), kmr_icmp);
1034  assert(p0 == &a0[N]);
1035  }
1036 
1037  MPI_Barrier(MPI_COMM_WORLD);
1038  usleep(50 * 1000);
1039 
1040 #undef NN
1041 #undef A0
1042 }
1043 
1044 /* Test kmr_map_for_some(). */
1045 
1046 static int
1047 addkeys6(const struct kmr_kv_box kv0,
1048  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i_)
1049 {
1050  long *arg = p;
1051  int NN = (int)*arg;
1052  for (int i = 0; i < NN; i++) {
1053  struct kmr_kv_box kv = {
1054  .klen = sizeof(long),
1055  .vlen = sizeof(long),
1056  .k.i = i,
1057  .v.i = i
1058  };
1059  kmr_add_kv(kvo, kv);
1060  }
1061  return MPI_SUCCESS;
1062 }
1063 
1064 static int
1065 addeach6(const struct kmr_kv_box kv,
1066  const KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
1067  const long index)
1068 {
1069  long *counter = arg;
1070  _Pragma("omp critical")
1071  {
1072  (*counter)++;
1073  }
1074  kmr_add_kv(kvo, kv);
1075  return MPI_SUCCESS;
1076 }
1077 
1078 static int
1079 addnone6(const struct kmr_kv_box kv,
1080  const KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
1081  const long index)
1082 {
1083  long *counter = arg;
1084  _Pragma("omp critical")
1085  {
1086  (*counter)++;
1087  }
1088  return MPI_SUCCESS;
1089 }
1090 
1091 static void
1092 simple6(int nprocs, int rank)
1093 {
1094  MPI_Barrier(MPI_COMM_WORLD);
1095  usleep(50 * 1000);
1096  if (rank == 0) {printf("CHECK KMR_MAP_FOR_SOME()...\n");}
1097  fflush(0);
1098  usleep(50 * 1000);
1099 
1100  struct kmr_option inspect = {.inspect = 1};
1101 
1102  /* Check with 10000 keys. */
1103 
1104  const long NN = 10000;
1105 
1106  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
1107  assert(mr != 0);
1108 
1109  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
1110  kmr_map_once(kvs0, (void *)&NN, kmr_noopt, 0, addkeys6);
1111 
1112  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
1113  kmr_distribute(kvs0, kvs1, 1, kmr_noopt);
1114  assert(kvs1->c.element_count == NN);
1115 
1116  long c1 = 0;
1117  kmr_get_element_count(kvs1, &c1);
1118 
1119  long calls2 = 0;
1120  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
1121  kmr_map_for_some(kvs1, kvs2, &calls2, inspect, addeach6);
1122  long c2 = 0;
1123  kmr_get_element_count(kvs2, &c2);
1124  kmr_free_kvs(kvs2);
1125 
1126  if (rank == 0) {
1127  printf("ADD ALL count(kvs2)=%ld for %ld (calls=%ld)\n",
1128  c2, c1, calls2);
1129  fflush(0);
1130  }
1131  assert(c2 > 0 && c2 <= c1);
1132 
1133  long calls3 = 0;
1134  long e1 = kvs1->c.element_count;
1135  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
1136  kmr_map_for_some(kvs1, kvs3, &calls3, inspect, addnone6);
1137  long c3 = 0;
1138  kmr_get_element_count(kvs3, &c3);
1139  kmr_free_kvs(kvs3);
1140 
1141  if (rank == 0) {
1142  printf("ADD NONE count(kvs3)=%ld for %ld (calls=%ld for %ld)\n",
1143  c3, c1, calls3, e1);
1144  fflush(0);
1145  }
1146  assert(c3 == 0 && e1 == calls3);
1147 
1148  kmr_free_kvs(kvs1);
1149 
1150  /* Check using KVS after inspecting by kmr_take_one(). */
1151 
1152  MPI_Barrier(MPI_COMM_WORLD);
1153  usleep(50 * 1000);
1154  if (rank == 0) {printf("CHECK KMR_TAKE_ONE()...\n");}
1155  fflush(0);
1156  usleep(50 * 1000);
1157 
1158  const long ONE = 1;
1159 
1160  KMR_KVS *kvs4 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
1161  kmr_map_once(kvs4, (void *)&ONE, kmr_noopt, 0, addkeys6);
1162 
1163  struct kmr_kv_box kv;
1164  kmr_take_one(kvs4, &kv);
1165  assert(kv.k.i == 0 && kv.v.i == 0);
1166 
1167  long calls4 = 0;
1168  KMR_KVS *kvs5 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
1169  kmr_map(kvs4, kvs5, &calls4, kmr_noopt, addeach6);
1170 
1171  kmr_free_kvs(kvs5);
1172 
1173  kmr_free_context(mr);
1174 }
1175 
1176 extern void kmr_isort(void *a, size_t n, size_t es, int depth);
1177 
1178 /* Check isort() utility. */
1179 
1180 static void
1181 simple7(int nprocs, int rank)
1182 {
1183  MPI_Barrier(MPI_COMM_WORLD);
1184  usleep(50 * 1000);
1185  if (rank == 0) {printf("CHECK ISORT UTILITY (local sort)...\n");}
1186  fflush(0);
1187  usleep(50 * 1000);
1188 
1189  int setntheads = 1;
1190 
1191 #ifdef __K
1192  char *fastomp = getenv("FLIB_FASTOMP");
1193  if (!(fastomp != 0 && strcmp(fastomp, "FALSE") == 0)) {
1194  if (rank == 0) {printf("Set environment variable FLIB_FASTOMP=FALSE"
1195  " and run again. Otherwise,"
1196  " omp_set_num_threads() is ignored.\n");}
1197  fflush(0);
1198  setntheads = 0;
1199  }
1200 #endif
1201 
1202  /* Check with length 20. */
1203 
1204  if (1) {
1205  long a0[20] = {19, 18, 17, 16, 15, 14, 13, 12, 11, 10,
1206  9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
1207  size_t n0 = (sizeof(a0) / sizeof(long));
1208  for (int i = 0; i < (int)n0; i++) {
1209  a0[i] += 1000000000000;
1210  }
1211 
1212  if (rank == 0) {printf("Sorting data N=%zd\n", n0);}
1213  fflush(0);
1214 
1215  kmr_isort(a0, n0, sizeof(long), 0);
1216 
1217  for (int i = 0; i < (int)n0; i++) {
1218  assert(a0[i] == (i + 1000000000000));
1219  }
1220 
1221  //if (rank == 0) {printf("OK\n");}
1222  //fflush(0);
1223  }
1224 
1225  /* Check with length 3M. */
1226 
1227  if (1) {
1228  //long n1 = 100000000; /*100M*/
1229  long n1 = 3000000; /*3M*/
1230  long *a1 = malloc(sizeof(long) * (size_t)n1);
1231  if (a1 == 0) {
1232  perror("malloc");
1233  MPI_Abort(MPI_COMM_WORLD, 1);
1234  }
1235 
1236  if (rank == 0) {printf("Sorting data N=%zd\n", n1);}
1237  fflush(0);
1238 
1239  if (rank == 0) {printf("Generating random numbers...\n");}
1240  fflush(0);
1241 
1242  for (long i = 0; i < n1; i++) {
1243  a1[i] = ((((long)rand()) << 31) ^ ((long)rand()));
1244  }
1245 
1246  if (0) {
1247  printf("Problem...\n");
1248  for (long i = 0; i < 10; i++) {
1249  printf("%ld\n", a1[i]);
1250  }
1251  printf("\n");
1252  }
1253 
1254  if (rank == 0) {printf("Sorting (threads=1)...\n");}
1255  fflush(0);
1256 
1257  double t0 = wtime();
1258  kmr_isort(a1, (size_t)n1, sizeof(long), 0);
1259  double t1 = wtime();
1260  if (rank == 0) {printf("time=%f\n", (t1 - t0));}
1261  fflush(0);
1262 
1263  if (0) {
1264  printf("Result...\n");
1265  for (long i = 0; i < 10; i++) {
1266  printf("%ld\n", a1[i]);
1267  }
1268  printf("\n");
1269  }
1270 
1271  if (rank == 0) {printf("Checking...\n");}
1272  long lb = LONG_MIN;
1273  for (long i = 0; i < n1; i++) {
1274  assert(a1[i] >= lb);
1275  if (a1[i] > lb) {
1276  lb = a1[i];
1277  }
1278  }
1279 
1280  //if (rank == 0) {printf("OK\n");}
1281  //fflush(0);
1282  }
1283 
1284  if (1) {
1285 #ifdef _OPENMP
1286  //long n5 = 100000000; /*100M*/
1287  long n5 = 3000000; /*3M*/
1288  long *a5 = malloc(sizeof(long) * (size_t)n5);
1289  if (a5 == 0) {
1290  perror("malloc");
1291  MPI_Abort(MPI_COMM_WORLD, 1);
1292  }
1293 
1294  if (rank == 0) {printf("Sorting data N=%zd\n", n5);}
1295 
1296  for (int threads = 1; threads <= 8; threads *= 2) {
1297  if (setntheads) {
1298  omp_set_num_threads(threads);
1299  }
1300 
1301  int threadsused = 0;
1302 #pragma omp parallel
1303  {
1304  threadsused = omp_get_num_threads();
1305  }
1306 
1307  if (rank == 0) {printf("Generating random numbers...\n");}
1308  fflush(0);
1309 
1310  srand(20140204);
1311  for (long i = 0; i < n5; i++) {
1312  a5[i] = ((((long)rand()) << 31) ^ ((long)rand()));
1313  }
1314 
1315  if (rank == 0) {printf("Sorting (threads=%d)...\n", threadsused);}
1316  fflush(0);
1317 
1318  double t0 = wtime();
1319  kmr_isort(a5, (size_t)n5, sizeof(long), 5);
1320  double t1 = wtime();
1321  if (rank == 0) {printf("time=%f\n", (t1 - t0));}
1322  fflush(0);
1323 
1324  if (rank == 0) {printf("Checking...\n");}
1325  fflush(0);
1326 
1327  long lb5 = LONG_MIN;
1328  for (long i = 0; i < n5; i++) {
1329  assert(a5[i] >= lb5);
1330  if (a5[i] > lb5) {
1331  lb5 = a5[i];
1332  }
1333  }
1334 
1335  //if (rank == 0) {printf("OK\n");}
1336  //fflush(0);
1337  }
1338 #else
1339  printf("NOT OMP\n");
1340  fflush(0);
1341 #endif
1342  }
1343 }
1344 
1345 /* Tests kmr_shuffle_leveling_pair_count(). makemanyintegerkeys8()
1346  generate random pairs. copynegate8() makes a copy negating the
1347  value in the KVS for later checking. */
1348 
1349 static int
1350 makemanyintegerkeys8(const struct kmr_kv_box kv0,
1351  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i_)
1352 {
1353  /* (N: Same keys are generated upto N times). */
1354  int N = 4;
1355  long MM = *(long *)p;
1356  KMR *mr = kvo->c.mr;
1357  int rank = mr->rank;
1358  int nprocs = mr->nprocs;
1359  long cnt = (MM * (nprocs - rank) / nprocs);
1360  for (long i = 0; i < cnt; i++) {
1361  long j = (i / N);
1362  struct kmr_kv_box kv = {
1363  .klen = sizeof(long),
1364  .vlen = sizeof(long),
1365  .k.i = (rank + (j * nprocs)),
1366  .v.i = (i + 1)
1367  };
1368  kmr_add_kv(kvo, kv);
1369  }
1370  return MPI_SUCCESS;
1371 }
1372 
1373 static int
1374 makemanystringkeys8(const struct kmr_kv_box kv0,
1375  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i_)
1376 {
1377  /* (N: Same keys are generated upto N times). */
1378  int N = 4;
1379  long MM = *(long *)p;
1380  KMR *mr = kvo->c.mr;
1381  int rank = mr->rank;
1382  int nprocs = mr->nprocs;
1383  long cnt = (MM * (nprocs - rank) / nprocs);
1384  for (long i = 0; i < cnt; i++) {
1385  long j = (i / N);
1386  char k[80];
1387  snprintf(k, 80, "key%ld", (rank + (j * nprocs)));
1388  struct kmr_kv_box kv = {
1389  .klen = (int)(strlen(k) + 1),
1390  .vlen = sizeof(long),
1391  .k.p = k,
1392  .v.i = (i + 1)
1393  };
1394  kmr_add_kv(kvo, kv);
1395  }
1396  return MPI_SUCCESS;
1397 }
1398 
1399 static int
1400 copynegate8(const struct kmr_kv_box kv0,
1401  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i_)
1402 {
1403  assert(kv0.v.i > 0);
1404  struct kmr_kv_box kv = {
1405  .klen = kv0.klen,
1406  .vlen = kv0.vlen,
1407  .k = kv0.k,
1408  .v.i = (- kv0.v.i),
1409  };
1410  kmr_add_kv(kvo, kv);
1411  return MPI_SUCCESS;
1412 }
1413 
1414 static int
1415 comparebycanceling8(const struct kmr_kv_box kv[], const long n,
1416  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
1417 {
1418  long values[n];
1419  for (long i = 0; i < n; i++) {
1420  assert(kv[i].v.i != 0);
1421  values[i] = kv[i].v.i;
1422  }
1423  for (long i = 0; i < n; i++) {
1424  if (values[i] == 0) {
1425  continue;
1426  } else {
1427  assert(i < (n - 1));
1428  for (long j = (i + 1); j < n; j++) {
1429  if (values[i] == (- values[j])) {
1430  values[i] = 0;
1431  values[j] = 0;
1432  break;
1433  }
1434  assert(j != (n - 1));
1435  }
1436  }
1437  }
1438  for (long i = 0; i < n; i++) {
1439  assert(values[i] == 0);
1440  }
1441  return MPI_SUCCESS;
1442 }
1443 
1444 static void
1445 simple8(int nprocs, int rank)
1446 {
1447  MPI_Barrier(MPI_COMM_WORLD);
1448  if (rank == 0) {printf("PSEUDO-SCAN\n");}
1449  fflush(0);
1450  usleep(50 * 1000);
1451 
1452  /* Check with (MM * (nprocs - rank) / nprocs) keys on ranks. */
1453 
1454  const long MM = 100;
1455 
1456  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
1457  assert(mr != 0);
1458 
1459  /* DO ONCE ON INTEGER KEYS AND ONCE ON STRING KEYS. */
1460 
1461  for (int i = 0; i < 2; i++) {
1462  assert(i == 0 || i == 1);
1463  kmr_mapfn_t makedatafn = ((i == 0)
1464  ? makemanyintegerkeys8
1465  : makemanystringkeys8);
1466  enum kmr_kv_field keyf = ((i == 0)
1467  ? KMR_KV_INTEGER
1468  : KMR_KV_OPAQUE);
1469 
1470  KMR_KVS *kvs0 = kmr_create_kvs(mr, keyf, KMR_KV_INTEGER);
1471  kmr_map_once(kvs0, (void *)&MM, kmr_noopt, 0, makedatafn);
1472 
1473  KMR_KVS *kvs1 = kmr_create_kvs(mr, keyf, KMR_KV_INTEGER);
1474  struct kmr_option inspect = {.inspect = 1};
1475  kmr_map(kvs0, kvs1, 0, inspect, copynegate8);
1476 
1477  KMR_KVS *kvs2 = kmr_create_kvs(mr, keyf, KMR_KV_INTEGER);
1478  kmr_shuffle_leveling_pair_count(kvs0, kvs2);
1479 
1480  long counts[nprocs];
1481  double stat[4];
1482  kmr_histogram_count_by_ranks(kvs2, counts, stat, 1);
1483 
1484  if (rank == 0) {
1485  printf("number of pairs on ranks:\n");
1486  for (int r = 0; r < nprocs; r++) {
1487  printf("%ld", counts[r]);
1488  if (r == (nprocs - 1)) {
1489  printf("\n");
1490  } else if (r == 10) {
1491  printf("\n");
1492  } else {
1493  printf(",");
1494  }
1495  }
1496  fflush(0);
1497  }
1498  //kmr_dump_kvs(kvs2, 0);
1499 
1500  /* Check the shuffled KVS is a rearrangement of the original KVS. */
1501 
1502  KMR_KVS *kvs3 = kmr_create_kvs(mr, keyf, KMR_KV_INTEGER);
1503  KMR_KVS *kvsvec[2] = {kvs1, kvs2};
1504  kmr_concatenate_kvs(kvsvec, 2, kvs3, kmr_noopt);
1505 
1506  KMR_KVS *kvs4 = kmr_create_kvs(mr, keyf, KMR_KV_INTEGER);
1507  kmr_shuffle(kvs3, kvs4, kmr_noopt);
1508 
1509  kmr_reduce(kvs4, 0, 0, kmr_noopt, comparebycanceling8);
1510  }
1511 
1512  kmr_free_context(mr);
1513 }
1514 
1515 int
1516 main(int argc, char *argv[])
1517 {
1518  int nprocs, rank, thlv;
1519  /*MPI_Init(&argc, &argv);*/
1520  MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
1521  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
1522  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1523 
1524  kmr_init();
1525 
1526  if (rank == 0) {
1527 #ifdef NDEBUG
1528  _Bool assertion = 0;
1529 #else
1530  _Bool assertion = 1;
1531 #endif
1532  if (!assertion) {
1533  kmr_error(0, "Assertion disabled; recompile this test");
1534  return 1;
1535  }
1536  }
1537 
1538  simple0(nprocs, rank, 0);
1539  simple0(nprocs, rank, 1);
1540  simple1(nprocs, rank);
1541  simple2(nprocs, rank);
1542  simple3(nprocs, rank);
1543  simple4(nprocs, rank);
1544  simple5(nprocs, rank);
1545  simple6(nprocs, rank);
1546  simple7(nprocs, rank);
1547  simple8(nprocs, rank);
1548 
1549  MPI_Barrier(MPI_COMM_WORLD);
1550  usleep(50 * 1000);
1551  if (rank == 0) {printf("OK\n");}
1552  fflush(0);
1553 
1554  kmr_fin();
1555 
1556  MPI_Finalize();
1557 
1558  return 0;
1559 }
Key-Value Stream (abstract).
Definition: kmr.h:632
Utilities Private Part (do not include from applications).
int kmr_map_for_some(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps until some key-value are added.
Definition: kmrmoreops.c:1170
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Definition: kmr.h:88
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:658
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:809
KMR_KVS * kmr_create_pushoff_kvs(KMR *mr, enum kmr_kv_field kf, enum kmr_kv_field vf, struct kmr_option opt, const char *, const int, const char *)
Makes a new key-value stream with the specified field data-types.
Definition: kmraltkvs.c:85
static int kmr_icmp(const void *a0, const void *a1)
Compares the key field of keyed-records for qsort/bsearch.
Definition: kmrbase.c:1873
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once.
Definition: kmrbase.c:1460
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_save_kvs(KMR_KVS *kvi, void **dataq, size_t *szq, struct kmr_option opt)
Packs locally the contents of a key-value stream to a byte array.
Definition: kmrbase.c:1026
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2094
int kmr_distribute(KMR_KVS *kvi, KMR_KVS *kvo, _Bool cyclic, struct kmr_option opt)
Distributes key-values so that each rank has approximately the same number of pairs.
Definition: kmrmoreops.c:835
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
Definition: kmrbase.c:939
int kmr_sort_by_one(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sort by rank0, a degenerated case for small number of keys.
Definition: kmrmoreops.c:544
KMR Context.
Definition: kmr.h:247
int kmr_concatenate_kvs(KMR_KVS *kvs[], int nkvs, KMR_KVS *kvo, struct kmr_option opt)
Concatenates a number of KVSes to one.
Definition: kmrbase.c:2754
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:679
kmr_kv_field
Datatypes of Keys or Values.
Definition: kmr.h:368
void kmr_isort(void *a, size_t n, size_t es, int depth)
Sorts by comparator on long integers.
Definition: kmrisort.c:292
int kmr_take_one(KMR_KVS *kvi, struct kmr_kv_box *kv)
Extracts a single key-value pair locally in the key-value stream KVI.
Definition: kmrbase.c:1427
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:401
int kmr_sort_large(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream by the regular or the random sampling-sort.
Definition: kmrmoreops.c:469
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
int kmr_get_element_count(KMR_KVS *kvs, long *v)
Gets the total number of key-value pairs.
Definition: kmrmoreops.c:114
#define kmr_init()
Sets up the environment.
Definition: kmr.h:794
void * kmr_bsearch(const void *key, const void *base, size_t nel, size_t size, int(*compar)(const void *, const void *))
Searches a key entry like bsearch(3C), but returns a next greater entry instead of null on no match...
Definition: kmrutil.c:569
int kmr_sort(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream globally.
Definition: kmrmoreops.c:575
int kmr_assert_sorted(KMR_KVS *kvi, _Bool locally, _Bool shuffling, _Bool ranking)
Checks a key-value stream is sorted.
Definition: kmrutil.c:717
int kmr_shuffle_leveling_pair_count(KMR_KVS *kvi, KMR_KVS *kvo)
Shuffles key-values so that each rank has approximately the same number of pairs. ...
Definition: kmrmoreops.c:1074
int kmr_histogram_count_by_ranks(KMR_KVS *kvs, long *frq, double *var, _Bool rankzeroonly)
Fills an integer array FRQ[i] with the count of the elements of each rank.
Definition: kmrmoreops.c:1569
int kmr_restore_kvs(KMR_KVS *kvo, void *data, size_t sz, struct kmr_option opt)
Unpacks locally the contents of a key-value stream from a byte array.
Definition: kmrbase.c:1092
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:367
KMR Interface.
int kmr_load_properties(MPI_Info conf, char *filename)
Loads properties into MPI_Info (in Latin1 characters).
Definition: kmrutil.c:2020
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
Definition: kmrbase.c:2240
int kmr_sort_small(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream, by partitioning to equal ranges.
Definition: kmrmoreops.c:388
int kmr_sort_locally(KMR_KVS *kvi, KMR_KVS *kvo, _Bool shuffling, struct kmr_option opt)
Reorders key-value pairs in a single rank.
Definition: kmrbase.c:2051
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type.
Definition: kmr.h:736
int kmr_map_on_rank_zero(KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on rank0 only.
Definition: kmrbase.c:1514
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:168