KMR
kmrbase.c
Go to the documentation of this file.
1 /* kmrbase.c (2014-02-04) */
2 /* Copyright (C) 2012-2018 RIKEN R-CCS */
3 
4 /** \file kmrbase.c KMR Base Implementation (on-memory operations).
5  KMR aims at fast shuffling and scalability, and provides modest
6  utilities for programming with map-reduce. This part implements
7  on-memory operations. */
8 
9 /* NOTE: (1) KMR and KMR_KVS are handled collectively (allocated,
10  modified, and freed). */
11 
12 #include <mpi.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <unistd.h>
16 #include <limits.h>
17 #include <errno.h>
18 #include <assert.h>
19 #include <ctype.h>
20 #ifdef _OPENMP
21 #include <omp.h>
22 #endif
23 #include "../config.h"
24 #include "kmr.h"
25 #include "kmrimpl.h"
26 #include "kmrtrace.h"
27 
/* Global API identifier, initialized to zero.  NOTE(review): its
   users are not visible in this part of the file -- confirm how it
   is updated. */
28 int KMR_API_ID = 0;
/* Library version, initialized from KMR_H. */
29 const int kmr_version = KMR_H;
30 
/* Minimum and maximum of two values.  An argument may be evaluated
   twice, so arguments should be free of side effects. */
31 #define MIN(a,b) (((a)<(b))?(a):(b))
32 #define MAX(a,b) (((a)>(b))?(a):(b))
/* Constant false; it is used as assert(NEVERHERE) to mark a place
   that should not be reached. */
33 #define NEVERHERE 0
34 
35 /* Default unit of allocation of memory block for key-value pairs.
36  See preset_block_size in struct kmr_ctx. */
37 
38 #define BLOCK_SIZE (64 * 1024 * 1024)
39 
40 /* Default of the number of entries pooled before calling a
41  map-function. See mapper_park_size in struct kmr_ctx. */
42 
43 #define MAP_PARK_SIZE (1024)
44 
45 /* Default block size of a push-off key-value stream. See
46  pushoff_block_size in struct kmr_ctx. */
47 
48 #define PUSHOFF_SIZE (64 * 1024)
49 
50 /* Checks if an end-of-block marker is properly placed. It only
51  checks when a key-value stream consists of a single preallocated
52  block. */
53 
54 static inline void
55 kmr_assert_on_tail_marker(KMR_KVS *kvs)
56 {
57  if (kvs != 0 && kvs->c.block_count == 1) {
58  struct kmr_kvs_block *b = kvs->c.first_block;
59  size_t netsz = kvs->c.storage_netsize;
60  struct kmr_kvs_entry *e = kmr_kvs_entry_at(kvs, b, netsz);
61  assert((((intptr_t)e) & 3) == 0
62  && e->klen == -1 && e->vlen == -1);
63  }
64 }
65 
66 /* Sets up the environment (nothing currently). It checks the data
67  sizes in C meets the KMR assumptions. It also checks MPI_LONG has
68  8-byte size, that is assumed in all-to-all communication. It calls
69  an OMP function, to initialize the threads environment here. */
70 
71 int
72 kmr_init_2(int ignore)
73 {
74  int cc;
75  assert(sizeof(long) == sizeof(size_t)
76  && sizeof(long) == sizeof(ssize_t)
77  && sizeof(long) == sizeof(off_t)
78  && sizeof(long) == sizeof(uint64_t)
79  && sizeof(long) >= sizeof(intptr_t)
80  && sizeof(long) >= sizeof(void *));
81  assert(kmr_check_alignment(offsetof(struct kmr_kvs_entry, c)));
82  assert(kmr_check_alignment(offsetof(struct kmr_kvs_block, data)));
83  assert(kmr_check_alignment(offsetof(struct kmr_ntuple_entry, len)));
84  assert(sizeof(struct kmr_option) == sizeof(long)
85  && sizeof(struct kmr_file_option) == sizeof(long)
86  && sizeof(struct kmr_spawn_option) == sizeof(long));
87  MPI_Aint lb;
88  MPI_Aint extent;
89  cc = MPI_Type_get_extent(MPI_LONG, &lb, &extent);
90  assert(cc == MPI_SUCCESS);
91  assert(lb == 0 && extent == 8);
92 
93 #if 0
94  KMR_OMP_PARALLEL_
95  {
96  int tid = omp_get_thread_num();
97  assert(tid >= 0);
98  }
99 #endif
100 
101  return MPI_SUCCESS;
102 }
103 
104 /** Sets up the environment, and checks the constant definitions in C
105  and Fortran are consistent. */
106 
107 int
108 kmr_init_ff(int kf, struct kmr_option opt, struct kmr_file_option fopt)
109 {
110  union {struct kmr_option o; unsigned long i;} opt0 = {.o = opt};
111  union {struct kmr_file_option o; unsigned long i;} fopt0 = {.o = fopt};
112  opt0.o.rank_zero = 0;
113  fopt0.o.shuffle_names = 0;
114  assert(kf == KMR_KV_POINTER_UNMANAGED
115  && opt.rank_zero && fopt.shuffle_names
116  && opt0.i == 0 && fopt0.i == 0);
117  kmr_init_2(0);
118  return MPI_SUCCESS;
119 }
120 
121 /** Clears the environment. */
122 
123 int
124 kmr_fin(void)
125 {
126  return MPI_SUCCESS;
127 }
128 
129 /** Checks the initialization state of MPI, and initializes MPI when
130  not. It returns 1, or 0 when something is wrong. It is for the
131  Python binding. */
132 
133 int
134 kmr_initialize_mpi(int *refargc, char ***refargv)
135 {
136  int cc;
137  int ok;
138  cc = MPI_Initialized(&ok);
139  if (cc == MPI_SUCCESS && ok != 0) {
140  return 1;
141  } else if (cc == MPI_SUCCESS && ok == 0) {
142  int thlv;
143  cc = MPI_Init_thread(refargc, refargv, MPI_THREAD_SERIALIZED, &thlv);
144  return (cc == MPI_SUCCESS);
145  } else {
146  return 0;
147  }
148 }
149 
150 /** Makes a new KMR context (a context has type KMR). A KMR context
151  is a record of common information to all key-value streams. COMM
152  is a communicator for use inside. It dups the given communicator
153  inside, to avoid conflicts with other calls to MPI functions. MPI
154  should be initialized with a thread support level of either
155  MPI_THREAD_SERIALIZED or MPI_THREAD_MULTIPLE. CONF specifies
156  configuration options. It should be freed after a call. The
157  options can differ on each rank, (in this version). The
158  configuration options are first taken from a file with a name
159  specified by the environment variable "KMROPTION" on rank0, and
160  they are merged with the explicitly given ones. The KMROPTION
161  file has the file format of Java properties (but only in Latin
162  characters). Refer to JDK documents on "java.util.Properties" (on
163  "load" method) for the file format. The explicitly given ones
164  have precedence. IDENTIFYING_NAME is just recorded in the
165  context, and has no specific use. It may be null. */
166 
167 KMR *
168 kmr_create_context(const MPI_Comm comm, const MPI_Info conf,
169  const char *identifying_name)
170 {
171  int cc;
/* The record is filled with zeros only through KMR_DEBUGX
   (presumably effective in debug builds only -- confirm); the
   fields are assigned one by one below. */
172  KMR *mr = kmr_malloc(sizeof(struct kmr_ctx));
173  KMR_DEBUGX(memset(mr, 0, sizeof(struct kmr_ctx)));
174 
/* Record the size and the rank of COMM, and keep a private
   duplicate of COMM.  It aborts the job when the duplication
   fails. */
175  cc = MPI_Comm_size(comm, &mr->nprocs);
176  assert(cc == MPI_SUCCESS);
177  cc = MPI_Comm_rank(comm, &mr->rank);
178  assert(cc == MPI_SUCCESS);
179  cc = MPI_Comm_dup(comm, &mr->comm);
180  if (cc != MPI_SUCCESS) {
181  kmr_error_mpi(mr, "MPI_Comm_dup", cc);
182  MPI_Abort(MPI_COMM_WORLD, 1);
183  }
184 
/* Take the thread limit of OpenMP, or one without OpenMP. */
185 #ifdef _OPENMP
186  int omp_thrd = omp_get_thread_limit();
187 #else
188  int omp_thrd = 1;
189 #endif
190  assert(omp_thrd >= 1);
191 
/* Warn when OpenMP may use multiple threads but the thread support
   of MPI is MPI_THREAD_SINGLE or MPI_THREAD_FUNNELED. */
192  int mpi_thrd;
193  cc = MPI_Query_thread(&mpi_thrd);
194  assert(cc == MPI_SUCCESS);
195  assert(mpi_thrd == MPI_THREAD_SINGLE
196  || mpi_thrd == MPI_THREAD_FUNNELED
197  || mpi_thrd == MPI_THREAD_SERIALIZED
198  || mpi_thrd == MPI_THREAD_MULTIPLE);
199  if (mpi_thrd == MPI_THREAD_SINGLE
200  || mpi_thrd == MPI_THREAD_FUNNELED) {
201  if (omp_thrd > 1) {
202  char ee[80];
203  char *s = ((mpi_thrd == MPI_THREAD_SINGLE)
204  ? "MPI_THREAD_SINGLE"
205  : "MPI_THREAD_FUNNELED");
206  snprintf(ee, sizeof(ee), "Thread support of MPI is low: %s", s);
207  kmr_warning(mr, 1, ee);
208  }
209  }
210 
/* Default settings follow.  The list of key-value streams starts
   empty. */
211  mr->kvses.head = 0;
212  mr->kvses.tail = 0;
213 
214  mr->ckpt_kvs_id_counter = 0;
215  mr->ckpt_ctx = 0;
216  mr->ckpt_enable = 0;
217  mr->ckpt_selective = 0;
218  mr->ckpt_no_fsync = 0;
219 
220  mr->kvt_ctx = 0;
221 
222  mr->log_traces = 0;
223  mr->atwork = 0;
224 
225  mr->spawn_size = 0;
226  mr->spawn_comms = 0;
227 
228  mr->mapper_park_size = MAP_PARK_SIZE;
229  mr->preset_block_size = BLOCK_SIZE;
230  mr->malloc_overhead = (int)sizeof(void *);
231 
232  mr->atoa_threshold = 512;
233  mr->atoa_size_limit = 0;
234  mr->atoa_requests_limit = 0;
235 
236  mr->sort_trivial = 100000;
237  mr->sort_threshold = 100L;
238  mr->sort_sample_factor = 10000;
239  mr->sort_threads_depth = 5;
240 
241  mr->file_io_block_size = (1024 * 1024);
242 
243  mr->rlimit_nofile = -1;
244 
245  mr->pushoff_block_size = PUSHOFF_SIZE;
246  mr->pushoff_poll_rate = 0;
247 
/* The installation path is set only when KMRLIBDIR is defined at
   compilation. */
248 #if defined(KMRLIBDIR)
249  mr->kmr_installation_path = KMRLIBDIR;
250 #else
251  mr->kmr_installation_path = 0;
252 #endif
253  mr->spawn_watch_program = 0;
254  mr->spawn_watch_prefix = 0;
255  mr->spawn_watch_host_name = 0;
256  mr->spawn_max_processes = 0;
257  mr->spawn_watch_af = 4;
258  mr->spawn_watch_port_range[0] = 0;
259  mr->spawn_watch_port_range[1] = 0;
260  mr->spawn_gap_msec[0] = 1000;
261  mr->spawn_gap_msec[1] = 10000;
262  mr->spawn_watch_accept_onhold_msec = (60 * 1000);
263 
264  mr->spawn_self = MPI_COMM_NULL;
265  mr->spawn_retry_limit = 20;
266  mr->spawn_retry_gap_msec = (15 * 1000);
267 
268  mr->simple_workflow = 0;
269  mr->swf_spawner_so = 0;
270  mr->swf_spawner_library = 0;
271  mr->swf_args_size = 0;
272  mr->swf_exec_so = 0;
273  mr->swf_record_history = 1;
274  mr->swf_debug_master = 0;
275 
276  mr->verbosity = 5;
277 
278  mr->onk = 1;
279  mr->single_thread = 0;
280  mr->one_step_sort = 0;
281  mr->step_sync = 0;
282  mr->trace_sorting = 0;
283  mr->trace_file_io = 0;
284  mr->trace_map_ms = 0;
285  mr->trace_map_spawn = 0;
286  mr->trace_alltoall = 0;
287  mr->trace_kmrdp = 0;
288  mr->trace_iolb = 0;
289  mr->std_abort = 0;
290  mr->file_io_dummy_striping = 1;
291  mr->file_io_always_alltoallv = 0;
292  mr->map_ms_use_exec = 0;
293  mr->map_ms_abort_on_signal = 0;
294  mr->spawn_sync_at_startup = 0;
295  mr->spawn_watch_all = 0;
296  mr->spawn_disconnect_early = 0;
297  mr->spawn_disconnect_but_free = 0;
298  mr->spawn_pass_intercomm_in_argument = 0;
299  mr->keep_fds_at_fork = 0;
300 
/* It is true when the thread support of MPI is
   MPI_THREAD_SERIALIZED or MPI_THREAD_MULTIPLE. */
301  mr->mpi_thread_support = (mpi_thrd == MPI_THREAD_SERIALIZED
302  || mpi_thrd == MPI_THREAD_MULTIPLE);
303 
304  mr->stop_at_some_check_globally = 0;
305  mr->pushoff_hang_out = 0;
306  mr->pushoff_fast_notice = 0;
307  mr->pushoff_stat = 1;
308  memset(&mr->pushoff_statistics, 0, sizeof(mr->pushoff_statistics));
309 
310  mr->kmrviz_trace = 0;
311 
/* Copy the name, or store an empty name for null.  The length is
   asserted to be less than KMR_JOB_NAME_LEN, and the last byte is
   set to zero in any case. */
312  if (identifying_name != 0) {
313  size_t s = strlen(identifying_name);
314  assert(s < KMR_JOB_NAME_LEN);
315  strncpy(mr->identifying_name, identifying_name, KMR_JOB_NAME_LEN);
316  mr->identifying_name[KMR_JOB_NAME_LEN - 1] = 0;
317  } else {
318  mr->identifying_name[0] = 0;
319  }
320 
321  /* KMR is now usable (with default setting). */
322 
323  /* Load and merge MPI infos. */
324 
/* The preference is loaded first, and then CONF is copied over it,
   when CONF is given. */
325  cc = MPI_Info_create(&mr->conf);
326  assert(cc == MPI_SUCCESS);
327  cc = kmr_load_preference(mr, mr->conf);
328  assert(cc == MPI_SUCCESS);
329  if (conf != MPI_INFO_NULL) {
330  cc = kmr_copy_mpi_info(conf, mr->conf);
331  assert(cc == MPI_SUCCESS);
332  }
333 
334  kmr_check_options(mr, mr->conf);
335 
336  /* Initialize checkpoint context. */
337 
/* NOTE(review): the listing numbers skip 338 here.  A statement
   (presumably the initialization of the checkpoint context named by
   the comment above) seems to be dropped from this copy -- confirm
   against the original file. */
339 
340  /* Initialize KMRViz trace */
341 
/* NOTE(review): the listing numbers skip 342 here.  A statement
   (presumably the initialization of the KMRViz trace) seems to be
   dropped from this copy -- confirm against the original file. */
343 
344  return mr;
345 }
346 
347 KMR *
348 kmr_create_context_world()
349 {
350  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, "");
351  return mr;
352 }
353 
354 KMR *
355 kmr_create_context_ff(const int fcomm, const int finfo,
356  const char *identifying_name)
357 {
358  MPI_Comm comm = MPI_Comm_f2c(fcomm);
359  MPI_Info info = MPI_Info_f2c(finfo);
360  KMR *mr = kmr_create_context(comm, info, identifying_name);
361  return mr;
362 }
363 
364 /** Releases a context created with kmr_create_context(). */
365 
366 int
/* NOTE(review): the listing numbers skip 367 here.  The line of the
   function name and the parameters seems to be dropped from this
   copy; the body refers to a context as "mr" -- confirm against the
   original file. */
368 {
369  int cc;
/* Warn about key-value streams remaining in the list of the context.
   An on-core one is reported with the site of its allocation, when
   that is recorded. */
370  if (mr->kvses.head != 0 || mr->kvses.tail != 0) {
371  kmr_warning(mr, 1, "Some key-value streams remain unfreed");
372  for (KMR_KVS *p = mr->kvses.head; p != 0; p = p->c.link.next) {
373  if (!KMR_KVS_MAGIC_OK(p->c.magic)) {
374  kmr_warning(mr, 1, "- unfreed kvs in bad state");
375  } else if (p->c.magic == KMR_KVS_ONCORE) {
376  if (p->c.info_line0.file != 0) {
377  char ee[80];
378  snprintf(ee, 80, "- kvs allocated at %s:%d: %s",
379  p->c.info_line0.file, p->c.info_line0.line,
380  p->c.info_line0.func);
381  kmr_warning(mr, 1, ee);
382  }
383  } else {
384  kmr_warning(mr, 1, "- unfreed kvs in bad state");
385  }
386  }
387  }
388 
/* Free the communicator spawn_self, unless it is null or
   MPI_COMM_SELF. */
389  if (mr->spawn_self != MPI_COMM_NULL && mr->spawn_self != MPI_COMM_SELF) {
390  cc = MPI_Comm_free(&mr->spawn_self);
391  assert(cc == MPI_SUCCESS);
392  }
393 
394  /* Finalize KMRViz trace. */
395 
396  kmr_trace_finalize(mr);
397 
398  /* Free checkpoint context. */
399 
/* NOTE(review): the listing numbers skip 400 here.  A statement
   (presumably the release of the checkpoint context named by the
   comment above) seems to be dropped from this copy -- confirm
   against the original file. */
401 
/* Close the log file if it is open.  A failure is only warned. */
402  if (mr->log_traces != 0) {
403  cc = fclose(mr->log_traces);
404  if (cc == EOF) {
405  char ee[80];
406  char *m = strerror(errno);
407  snprintf(ee, sizeof(ee), "Closing log file failed: %s", m);
408  kmr_warning(mr, 1, ee);
409  }
410  mr->log_traces = 0;
411  }
412 
/* Free the communicator and the info held in the context. */
413  cc = MPI_Comm_free(&mr->comm);
414  assert(cc == MPI_SUCCESS);
415  if (mr->conf != MPI_INFO_NULL) {
416  cc = MPI_Info_free(&mr->conf);
417  assert(cc == MPI_SUCCESS);
418  }
419 
/* Free the strings held in the context.  The three fields in the
   comments below are left untouched here. */
420  if (mr->spawn_watch_program != 0) {
421  kmr_free_string(mr->spawn_watch_program);
422  mr->spawn_watch_program = 0;
423  }
424  assert(mr->spawn_comms == 0);
425  /*mr->kmr_installation_path;*/
426  /*mr->spawn_watch_prefix;*/
427  /*mr->spawn_watch_host_name;*/
428 
/* Finish the simple workflow, when it is set. */
429  if (mr->simple_workflow != 0) {
430  kmr_finish_swf(mr);
431  mr->simple_workflow = 0;
432  }
433  assert(mr->swf_spawner_so == 0);
434  if (mr->swf_spawner_library != 0) {
435  kmr_free_string(mr->swf_spawner_library);
436  mr->swf_spawner_library = 0;
437  }
438 
/* Release the record of the context itself. */
439  kmr_free(mr, sizeof(struct kmr_ctx));
440  return MPI_SUCCESS;
441 }
442 
443 KMR *
444 kmr_get_context_of_kvs(KMR_KVS const *kvs)
445 {
446  KMR *mr = kvs->c.mr;
447  return mr;
448 }
449 
450 /* Unlinks a key-value stream from a list on a context. */
451 
452 static inline void
453 kmr_unlink_kvs(KMR_KVS *kvs)
454 {
455  KMR *mr = kvs->c.mr;
456  KMR_KVS *prev = kvs->c.link.prev;
457  KMR_KVS *next = kvs->c.link.next;
458  if (prev != 0) {
459  prev->c.link.next = next;
460  } else {
461  assert(mr->kvses.head == kvs);
462  mr->kvses.head = next;
463  }
464  if (next != 0) {
465  next->c.link.prev = prev;
466  } else {
467  assert(mr->kvses.tail == kvs);
468  mr->kvses.tail = prev;
469  }
470 }
471 
472 /** Makes a new key-value stream (type KMR_KVS). It allocates by the
473  size of the union, which may be larger than the necessary. */
474 
475 static KMR_KVS *
476 kmr_create_raw_kvs(KMR *mr, const KMR_KVS *_similar)
477 {
478  xassert(mr != 0);
479  /*assert(similar->c.mr == mr);*/
480  KMR_KVS *kvs = kmr_malloc(sizeof(KMR_KVS));
481  KMR_DEBUGX(memset(kvs, 0, sizeof(KMR_KVS)));
482  kvs->c.magic = KMR_KVS_ONCORE;
483  kvs->c.mr = mr;
484  kmr_link_kvs(kvs);
485 
486  if (kmr_ckpt_enabled(mr)) {
487  mr->ckpt_kvs_id_counter++;
488  kvs->c.ckpt_kvs_id = mr->ckpt_kvs_id_counter;
489  kvs->c.ckpt_generated_op = 0;
490  kvs->c.ckpt_consumed_op = 0;
491  }
492 
493  kvs->c.key_data = KMR_KV_BAD;
494  kvs->c.value_data = KMR_KV_BAD;
495  kvs->c.element_count = 0;
496 
497  kvs->c.oncore = 1;
498  kvs->c.stowed = 0;
499  kvs->c.nogrow = 0;
500  kvs->c.sorted = 0;
501  kvs->c.shuffled_in_pushoff = 0;
502  kvs->c._uniformly_sized_ = 0;
503 
504  kvs->c.block_size = (mr->preset_block_size - mr->malloc_overhead);
505  kvs->c.element_size_limit = (kvs->c.block_size / 4);
506  kvs->c.storage_netsize = 0;
507  kvs->c.block_count = 0;
508  kvs->c.first_block = 0;
509  kvs->c.ms = 0;
510 
511  /* Transient fields: */
512 
513  kvs->c.under_threaded_operation = 0;
514  kvs->c.current_block = 0;
515  kvs->c.adding_point = 0;
516  kvs->c.temporary_data = 0;
517 
518  //KMR_OMP_INIT_LOCK(&kvs->c.mutex);
519 
520  return kvs;
521 }
522 
523 /* Clears the slots of the structure. It keeps the fields of the a
524  link and checkpointing. */
525 
526 void
527 kmr_init_kvs_oncore(KMR_KVS *kvs, KMR *mr)
528 {
529  assert(mr != 0);
530  kvs->c.magic = KMR_KVS_ONCORE;
531  kvs->c.mr = mr;
532  /*kmr_link_kvs(kvs);*/
533 
534  /*kvs->c.ckpt_kvs_id = 0;*/
535  /*kvs->c.ckpt_generated_op = 0;*/
536  /*kvs->c.ckpt_consumed_op = 0;*/
537 
538  kvs->c.key_data = KMR_KV_BAD;
539  kvs->c.value_data = KMR_KV_BAD;
540  kvs->c.element_count = 0;
541 
542  kvs->c.oncore = 1;
543  kvs->c.stowed = 0;
544  kvs->c.nogrow = 0;
545  kvs->c.sorted = 0;
546  kvs->c.shuffled_in_pushoff = 0;
547  kvs->c._uniformly_sized_ = 0;
548 
549  kvs->c.block_size = (mr->preset_block_size - mr->malloc_overhead);
550  kvs->c.element_size_limit = (kvs->c.block_size / 4);
551  kvs->c.storage_netsize = 0;
552  kvs->c.block_count = 0;
553  kvs->c.first_block = 0;
554  kvs->c.ms = 0;
555 
556  /* Transient fields: */
557 
558  kvs->c.under_threaded_operation = 0;
559  kvs->c.current_block = 0;
560  kvs->c.adding_point = 0;
561  kvs->c.temporary_data = 0;
562 }
563 
564 /** Makes a new key-value stream with the specified field
565  data-types. */
566 
567 KMR_KVS *
/* NOTE(review): the listing numbers skip 568 here.  The line of the
   function name and the leading parameters seems to be dropped from
   this copy; the body refers to "mr", "kf", and "vf" -- confirm
   against the original file. */
569  struct kmr_option opt,
570  const char *file, const int line, const char *func)
571 {
/* Make a raw key-value stream, and then set the field data-types
   and the site of the allocation (file, function, and line). */
572  KMR_KVS *kvs = kmr_create_raw_kvs(mr, 0);
573  kvs->c.key_data = kf;
574  kvs->c.value_data = vf;
575  kvs->c.info_line0.file = file;
576  kvs->c.info_line0.func = func;
577  kvs->c.info_line0.line = line;
578 
579  if (kmr_ckpt_enabled(mr)) {
/* NOTE(review): the listing numbers skip 580 here.  A statement for
   checkpointing seems to be dropped from this copy, leaving this
   body empty -- confirm against the original file. */
581  }
582 
583  return kvs;
584 }
585 
586 /** Moves the contents of the input KVI to the output KVO. It
587  consumes the input KVI. Calling kmr_map() with a null
588  map-function has the same effect. Effective-options: TAKE_CKPT.
589  See struct kmr_option. */
590 
591 int
592 kmr_move_kvs(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
593 {
594  assert(kvi != 0 && kvo != 0
595  && kvi->c.magic == KMR_KVS_ONCORE
596  && kvo->c.magic == KMR_KVS_ONCORE
597  && kvi->c.oncore && kvo->c.oncore);
598  assert(kvi->c.key_data == kvo->c.key_data
599  && kvi->c.value_data == kvo->c.value_data);
600  assert(kvi->c.stowed && !kvo->c.stowed);
601  // struct kmr_option kmr_supported = {.take_ckpt = 1};
602  // kmr_check_fn_options(kvo->c.mr, kmr_supported, opt, __func__);
603 
604  if (kmr_ckpt_enabled(kvo->c.mr)) {
605  if (kmr_ckpt_progress_init(kvi, kvo, opt)) {
606  kvi->c.first_block = 0;
607  kvi->c.ms = 0;
608  kmr_free_kvs(kvi);
609  return MPI_SUCCESS;
610  }
611  }
612 
613  /* Copy state. */
614 
615  kvo->c.stowed = kvi->c.stowed;
616  kvo->c.nogrow = kvi->c.nogrow;
617  kvo->c.sorted = kvi->c.sorted;
618  kvo->c.element_count = kvi->c.element_count;
619  kvo->c.storage_netsize = kvi->c.storage_netsize;
620  kvo->c.block_count = kvi->c.block_count;
621  kvo->c.first_block = kvi->c.first_block;
622  kvo->c.ms = kvi->c.ms;
623 
624  if (kmr_ckpt_enabled(kvo->c.mr)) {
625  kmr_ckpt_save_kvo_whole(kvo->c.mr, kvo);
626  }
627 
628  /* Dispose of input. */
629 
630  kvi->c.first_block = 0;
631  kvi->c.ms = 0;
632  int cc = kmr_free_kvs(kvi);
633  assert(cc == MPI_SUCCESS);
634 
635  if (kmr_ckpt_enabled(kvo->c.mr)) {
636  kmr_ckpt_progress_fin(kvo->c.mr);
637  }
638  return MPI_SUCCESS;
639 }
640 
/* Frees an on-core key-value stream: the chain of the storage
   blocks, the state record in the field "ms", the temporary data,
   and then the record of the key-value stream itself.  It returns
   MPI_SUCCESS. */
641 static inline int
642 kmr_free_kvs_oncore(KMR_KVS *kvs)
643 {
/* Free each block, taking the next pointer before freeing. */
644  struct kmr_kvs_block *b = kvs->c.first_block;
645  while (b != 0) {
646  struct kmr_kvs_block *bn = b->next;
647  kmr_free(b, b->size);
648  b = bn;
649  }
/* The size passed for "ms" is the size of the header plus one byte
   per element. */
650  if (kvs->c.ms != 0) {
651  long cnt = kvs->c.element_count;
652  size_t sz = (sizeof(struct kmr_map_ms_state)
653  + (sizeof(char) * (size_t)cnt));
654  kmr_free(kvs->c.ms, sz);
655  }
656  if (kvs->c.temporary_data != 0) {
657  kmr_free(kvs->c.temporary_data, 0);
658  }
/* Mark the record as bad before releasing it. */
659  kvs->c.magic = KMR_KVS_BAD;
660 
661  /* Delete checkpoint file if exists. */
662 
663  if (kmr_ckpt_enabled(kvs->c.mr)) {
/* NOTE(review): the listing numbers skip 664 here.  A statement
   (presumably the removal of the checkpoint file named by the
   comment above) seems to be dropped from this copy -- confirm
   against the original file. */
665  }
666 
667  kmr_free(kvs, sizeof(struct kmr_kvs_oncore));
668  return MPI_SUCCESS;
669 }
670 
671 /** Releases a key-value stream (type KMR_KVS). Normally,
672  mapper/shuffler/reducer consumes and frees the input key-value
673  stream, and explicit calls are unnecessary. Here,
674  mapper/shuffler/reducer includes kmr_map(),
675  kmr_map_on_rank_zero(), kmr_map_ms(), kmr_shuffle(),
676  kmr_replicate(), kmr_reduce(), and kmr_reduce_as_one(). */
677 
678 int
/* NOTE(review): the listing numbers skip 679 here.  The line of the
   function name and the parameter seems to be dropped from this
   copy; the body refers to a key-value stream as "kvs" -- confirm
   against the original file. */
680 {
/* Report an error on a bad magic value, which indicates the
   key-value stream is already freed or is corrupted. */
681  if (!KMR_KVS_MAGIC_OK(kvs->c.magic)) {
682  kmr_error(0, "kmr_free_kvs: kvs already freed or corrupted");
683  }
/* Remove it from the list in the context, first. */
684  kmr_unlink_kvs(kvs);
685 
/* Dispatch on the kind of the key-value stream. */
686  int cc;
687  if (kvs->c.magic == KMR_KVS_ONCORE) {
688  cc = kmr_free_kvs_oncore(kvs);
689  return cc;
690  } else if (kvs->c.magic == KMR_KVS_PUSHOFF) {
691  cc = kmr_free_kvs_pushoff(kvs, 1);
692  return cc;
693  } else {
/* No other kinds are expected.  It returns zero when the assertions
   are disabled. */
694  assert((kvs->c.magic == KMR_KVS_ONCORE)
695  || (kvs->c.magic == KMR_KVS_PUSHOFF));
696  assert(NEVERHERE);
697  return 0;
698  }
699 }
700 
701 /* ================================================================ */
702 
703 /* Allocates a new block of storage as a current-block. When the
704  SIZE=1, it allocates a block by the pre-specified block-size, and
705  allows it to glow incrementally. When the SIZE!=1, it allocates a
706  block by that size after increasing it for the spaces of a header
707  and an end-of-block marker. It sets the STORAGE_NETSIZE field, and
708  places an end-of-block marker, because all key-value pairs should
709  fit in the given size. It accepts zero as a legitimate size. */
710 
711 int
712 kmr_allocate_block(KMR_KVS *kvs, size_t size)
713 {
714  if (size != 1) {
715  assert(kvs->c.element_count == 0 && kvs->c.storage_netsize == 0
716  && kvs->c.block_count == 0 && kvs->c.first_block == 0
717  && kvs->c.current_block == 0 && kvs->c.adding_point == 0);
718  }
719  size_t netsz;
720  size_t sz;
721  if (size == 0) {
722  kvs->c.block_size = 0;
723  kvs->c.nogrow = 1;
724  return MPI_SUCCESS;
725  } else if (size == 1) {
726  netsz = 0;
727  sz = kvs->c.block_size;
728  assert(kvs->c.nogrow == 0);
729  } else {
730  assert(kvs->c.first_block == 0 && kvs->c.current_block == 0
731  && kvs->c.block_count == 0 && kvs->c.adding_point == 0);
732  assert(size >= kmr_kvs_entry_header);
733  netsz = size;
734  sz = (netsz + kmr_kvs_block_header + kmr_kvs_entry_header);
735  kvs->c.block_size = sz;
736  kvs->c.storage_netsize = netsz;
737  kvs->c.nogrow = 1;
738  }
739  struct kmr_kvs_block *b = kmr_malloc(sz);
740  kmr_kvs_reset_block(kvs, b, sz, netsz);
741  kmr_kvs_insert_block(kvs, b);
742  return MPI_SUCCESS;
743 }
744 
745 /* Adjusts the adding-point to the end for putting key-value pairs at
746  once. It is only called when the whole key-value stream size is
747  known in advance. */
748 
749 static inline void
750 kmr_kvs_adjust_adding_point(KMR_KVS *kvs)
751 {
752  if (kvs->c.block_count == 0) {
753  assert(kvs->c.current_block == 0 && kvs->c.adding_point == 0);
754  } else {
755  assert(kvs->c.current_block != 0 && kvs->c.adding_point != 0);
756  struct kmr_kvs_block *b = kvs->c.current_block;
757  assert(kmr_kvs_first_entry(kvs, b) == kvs->c.adding_point);
758  kvs->c.adding_point = kmr_kvs_last_entry_limit(kvs, b);
759  assert(kvs->c.adding_point == kmr_kvs_adding_point(b));
760  }
761 }
762 
763 /* Adds a key-value pair. It is a body of kmr_add_kv(), without a
764  mutex. It modifies kmr_kv_box XKV (when non-null) to return the
765  pointers to the opaque fields, when a key or a value is opaque. It
766  does not move actual data when RESERVE_SPACE_ONLY=1. */
767 
768 static inline int
769 kmr_add_kv_nomutex(KMR_KVS *kvs, const struct kmr_kv_box kv,
770  struct kmr_kv_box *xkv, _Bool reserve_space_only)
771 {
772  kmr_assert_kv_sizes(kvs, kv);
773  assert(!kvs->c.nogrow || kvs->c.storage_netsize != 0);
774  KMR *mr = kvs->c.mr;
775  int cc;
776  size_t sz = kmr_kvs_entry_size_of_box(kvs, kv);
777  if (sz > (kvs->c.element_size_limit)) {
778  char ee[80];
779  snprintf(ee, 80, "key-value too large (size=%zd)", sz);
780  kmr_error(mr, ee);
781  }
782  if (kvs->c.first_block == 0) {
783  assert(kvs->c.element_count == 0);
784  cc = kmr_allocate_block(kvs, 1);
785  assert(cc == MPI_SUCCESS);
786  }
787  if (!kmr_kvs_entry_fits_in_block(kvs, kvs->c.current_block, sz)) {
788  assert(!kvs->c.nogrow);
789  kmr_kvs_mark_entry_tail(kvs->c.adding_point);
790  cc = kmr_allocate_block(kvs, 1);
791  assert(cc == MPI_SUCCESS);
792  }
793  struct kmr_kvs_entry *e = kvs->c.adding_point;
794  kmr_poke_kv(e, kv, xkv, kvs, reserve_space_only);
795  if (!kvs->c.nogrow) {
796  kvs->c.storage_netsize += kmr_kvs_entry_netsize(e);
797  }
798  kvs->c.current_block->partial_element_count++;
799  kvs->c.current_block->fill_size += kmr_kvs_entry_size(kvs, e);
800  kvs->c.adding_point = kmr_kvs_next_entry(kvs, e);
801  kvs->c.element_count++;
802  return MPI_SUCCESS;
803 }
804 
805 /** Adds a key-value pair. (It is with serialization when a
806  map-function is threaded). */
807 
808 int
809 kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
810 {
811  kmr_assert_kvs_ok(0, kvs, 0, 1);
812  int cc;
813  if (kvs->c.magic == KMR_KVS_ONCORE) {
814  KMR_OMP_CRITICAL_
815  {
816  cc = kmr_add_kv_nomutex(kvs, kv, 0, 0);
817  }
818  return cc;
819  } else if (kvs->c.magic == KMR_KVS_PUSHOFF) {
820  KMR_OMP_CRITICAL_
821  {
822  cc = kmr_add_kv_pushoff(kvs, kv);
823  }
824  return cc;
825  } else {
826  assert((kvs->c.magic == KMR_KVS_ONCORE)
827  || (kvs->c.magic == KMR_KVS_PUSHOFF));
828  assert(NEVERHERE);
829  return 0;
830  }
831 }
832 
833 /** Adds a key-value pair as given directly by a pointer. An integer
834  or a double be passed by a pointer (thus like &v). */
835 
836 int
837 kmr_add_kv1(KMR_KVS *kvs, void *k, int klen, void *v, int vlen)
838 {
839  union kmr_unit_sized xk;
840  switch (kvs->c.key_data) {
841  case KMR_KV_BAD:
842  xassert(kvs->c.key_data != KMR_KV_BAD);
843  xk.i = 0;
844  break;
845  case KMR_KV_INTEGER:
846  xk.i = *(long *)k;
847  break;
848  case KMR_KV_FLOAT8:
849  xk.d = *(double *)k;
850  break;
851  case KMR_KV_OPAQUE:
852  case KMR_KV_CSTRING:
853  case KMR_KV_POINTER_OWNED:
854  case KMR_KV_POINTER_UNMANAGED:
855  xk.p = k;
856  break;
857  default:
858  xassert(NEVERHERE);
859  xk.i = 0;
860  break;
861  }
862 
863  union kmr_unit_sized xv;
864  switch (kvs->c.value_data) {
865  case KMR_KV_BAD:
866  xassert(kvs->c.value_data != KMR_KV_BAD);
867  xv.i = 0;
868  break;
869  case KMR_KV_INTEGER:
870  xv.i = *(long *)v;
871  break;
872  case KMR_KV_FLOAT8:
873  xv.d = *(double *)v;
874  break;
875  case KMR_KV_OPAQUE:
876  case KMR_KV_CSTRING:
877  case KMR_KV_POINTER_OWNED:
878  case KMR_KV_POINTER_UNMANAGED:
879  xv.p = v;
880  break;
881  default:
882  xassert(NEVERHERE);
883  xv.i = 0;
884  break;
885  }
886 
887  struct kmr_kv_box kv = {.klen = klen, .vlen = vlen, .k = xk, .v = xv};
888  int cc;
889  cc = kmr_add_kv(kvs, kv);
890  return cc;
891 }
892 
893 /** Adds a key-value pair, but only allocates a space and returns the
894  pointers to the key and the value parts. It may enable to create
895  a large key/value data directly in the space. It does not return
896  a proper value if a key/value field is not a pointer. (It cannot
897  be used with a "push-off" key-value stream, because its buffer
898  will be sent out and late fill-in the buffer causes a race). */
899 
900 int
901 kmr_add_kv_space(KMR_KVS *kvs, const struct kmr_kv_box kv,
902  void **keyp, void **valuep)
903 {
904  kmr_assert_kvs_ok(0, kvs, 0, 1);
905  assert(kvs->c.magic == KMR_KVS_ONCORE);
906  int cc;
907  struct kmr_kv_box xkv = {
908  .k.p = 0,
909  .v.p = 0
910  };
911  KMR_OMP_CRITICAL_
912  {
913  cc = kmr_add_kv_nomutex(kvs, kv, &xkv, 1);
914  }
915  if (keyp != 0) {
916  *keyp = (void *)xkv.k.p;
917  }
918  if (valuep != 0) {
919  *valuep = (void *)xkv.v.p;
920  }
921  return cc;
922 }
923 
924 int
925 kmr_add_kv_quick_(KMR_KVS *kvs, const struct kmr_kv_box kv)
926 {
927  int cc = kmr_add_kv_nomutex(kvs, kv, 0, 0);
928  return cc;
929 }
930 
931 /** Marks finished adding key-value pairs. Further addition will be
932  prohibited. Normally, mapper/shuffler/reducer finishes the output
933  key-value stream by itself, and explicit calls are unnecessary.
934  Here, mapper/shuffler/reducer includes kmr_map(),
935  kmr_map_on_rank_zero(), kmr_map_ms(), kmr_shuffle(),
936  kmr_replicate(), and kmr_reduce(). */
937 
938 int
/* NOTE(review): the listing numbers skip 939 here.  The line of the
   function name and the parameter seems to be dropped from this
   copy; the body refers to a key-value stream as "kvs" -- confirm
   against the original file. */
940 {
941  kmr_assert_kvs_ok(0, kvs, 0, 1);
942  if (kvs->c.magic == KMR_KVS_ONCORE) {
/* It is an error to finish a key-value stream twice. */
943  if (kvs->c.stowed) {
944  kmr_error(kvs->c.mr, "kmr_add_kv_done: may be called already");
945  }
/* Place an end marker at the adding-point, unless the key-value
   stream is empty. */
946  if (kvs->c.element_count == 0) {
947  assert(kvs->c.current_block == 0 && kvs->c.adding_point == 0);
948  } else {
949  assert(kvs->c.current_block != 0 && kvs->c.adding_point != 0);
950  kmr_kvs_mark_entry_tail(kvs->c.adding_point);
951  }
/* Mark it as stowed, and clear the transient fields for adding. */
952  kvs->c.stowed = 1;
953  kvs->c.current_block = 0;
954  kvs->c.adding_point = 0;
955  assert(kvs->c.block_count == 0 || kvs->c.first_block != 0);
956  } else if (kvs->c.magic == KMR_KVS_PUSHOFF) {
/* NOTE(review): the listing numbers skip 957 here.  A statement for
   the push-off case seems to be dropped from this copy, leaving this
   branch empty -- confirm against the original file. */
958  } else {
959  assert((kvs->c.magic == KMR_KVS_ONCORE)
960  || (kvs->c.magic == KMR_KVS_PUSHOFF));
961  assert(NEVERHERE);
962  return 0;
963  }
964  return MPI_SUCCESS;
965 }
966 
967 /** Adds a key-value pair of strings. The key and value fields should
968  be of opaque data. */
969 
970 int
971 kmr_add_string(KMR_KVS *kvs, const char *k, const char *v)
972 {
973  if (!((kvs->c.key_data == KMR_KV_OPAQUE
974  || kvs->c.key_data == KMR_KV_CSTRING)
975  && (kvs->c.value_data == KMR_KV_OPAQUE
976  || kvs->c.value_data == KMR_KV_CSTRING))) {
977  kmr_error(kvs->c.mr,
978  "key-value data-types need be opaque for strings");
979  }
980  size_t klen = (strlen(k) + 1);
981  size_t vlen = (strlen(v) + 1);
982  assert(klen <= INT_MAX && vlen <= INT_MAX);
983  struct kmr_kv_box kv;
984  kv.klen = (int)klen;
985  kv.k.p = k;
986  kv.vlen = (int)vlen;
987  kv.v.p = v;
988  int cc = kmr_add_kv(kvs, kv);
989  return cc;
990 }
991 
992 /** Adds a given key-value pair unmodified. It is a map-function. */
993 
994 int
/* NOTE(review): the listing numbers skip 995 here.  The line of the
   function name and the first parameter seems to be dropped from
   this copy; the body refers to a key-value pair as "kv" -- confirm
   against the original file. */
996  const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
997 {
/* It adds the pair to KVO as it is.  The return value of
   kmr_add_kv() is not checked, and it always returns MPI_SUCCESS. */
998  kmr_add_kv(kvo, kv);
999  return MPI_SUCCESS;
1000 }
1001 
1002 /* Packs fields as opaque. When key or value field is a pointer,
1003  fields need to be packed to make data-exchanges easy. */
1004 
1005 static int
1006 kmr_collapse_as_opaque(KMR_KVS *kvi, KMR_KVS *kvo, _Bool inspectp)
1007 {
1008  assert(kvi != 0 && kvo != 0);
1009  assert(kmr_fields_pointer_p(kvi) || kvi->c.block_count > 1);
1010  int cc;
1011  cc = kmr_allocate_block(kvo, kvi->c.storage_netsize);
1012  assert(cc == MPI_SUCCESS);
1013  struct kmr_option collapse = {.collapse = 1, .inspect = inspectp};
1014  cc = kmr_map(kvi, kvo, 0, collapse, kmr_add_identity_fn);
1015  assert(cc == MPI_SUCCESS);
1016  return MPI_SUCCESS;
1017 }
1018 
1019 /** Packs locally the contents of a key-value stream to a byte array.
1020  It is used to save or to send a key-value stream. It returns the
1021  allocated memory with its size, and it should be freed by the
1022  user. It may fail on allocating a buffer, and then it returns
1023  MPI_ERR_BUFFER. Its reverse is performed by kmr_restore_kvs(). */
1024 
1025 int
1026 kmr_save_kvs(KMR_KVS *kvs, void **dataq, size_t *szq,
1027  struct kmr_option opt)
1028 {
1029  if (kvs == 0) {
1030  kmr_error_at_site(0, "Null input kvs", 0);
1031  } else if (!KMR_KVS_MAGIC_OK(kvs->c.magic)) {
1032  kmr_error_at_site(0, "Bad input kvs (freed or corrupted)", 0);
1033  }
1034  assert(kvs->c.magic == KMR_KVS_ONCORE);
1035  kmr_check_fn_options(kvs->c.mr, kmr_noopt, opt, __func__);
1036  /*assert(kvs->c.current_block == 0 && kvs->c.adding_point == 0);*/
1037  if (kvs->c.ms != 0 || kvs->c.temporary_data != 0) {
1038  kmr_warning(kvs->c.mr, 5,
1039  "Some fields in KVS may be lost in saved image");
1040  }
1041  int cc;
1042  if (kmr_fields_pointer_p(kvs) || (kvs->c.block_count > 1)) {
1043  enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvs->c.key_data);
1044  enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvs->c.value_data);
1045  KMR_KVS *kvs1 = kmr_create_kvs(kvs->c.mr, keyf, valf);
1046  cc = kmr_collapse_as_opaque(kvs, kvs1, 0);
1047  assert(cc == MPI_SUCCESS);
1048  assert(!kmr_fields_pointer_p(kvs1) && kvs->c.block_count <= 1);
1049  cc = kmr_save_kvs(kvs1, dataq, szq, kmr_noopt);
1050  assert(cc == MPI_SUCCESS);
1051  kmr_free_kvs(kvs1);
1052  return MPI_SUCCESS;
1053  }
1054  assert(!kmr_fields_pointer_p(kvs));
1055  size_t netsz = kvs->c.storage_netsize;
1056  size_t blocksz = (netsz + kmr_kvs_block_header + kmr_kvs_entry_header);
1057  size_t sz = (sizeof(KMR_KVS) + blocksz);
1058  unsigned char *b = malloc(sz);
1059  if (b == 0) {
1060  return MPI_ERR_BUFFER;
1061  }
1062  KMR_KVS *h = (void *)b;
1063  struct kmr_kvs_block *s = (void *)(b + sizeof(KMR_KVS));
1064  memcpy(h, kvs, sizeof(KMR_KVS));
1065  h->c.magic = KMR_KVS_ONCORE_PACKED;
1066  h->c.mr = 0;
1067  h->c.link.next = 0;
1068  h->c.link.prev = 0;
1069  h->c.block_count = 1;
1070  h->c.first_block = 0;
1071  h->c.current_block = 0;
1072  h->c.adding_point = 0;
1073  h->c.ms = 0;
1074  h->c.temporary_data = 0;
1075  if (kvs->c.block_count == 0) {
1076  /*nothing*/
1077  } else if (kvs->c.block_count == 1) {
1078  memcpy(s, kvs->c.first_block, blocksz);
1079  s->size = blocksz;
1080  } else {
1081  xassert(NEVERHERE);
1082  }
1083  *dataq = b;
1084  *szq = sz;
1085  return MPI_SUCCESS;
1086 }
1087 
1088 /** Unpacks locally the contents of a key-value stream from a byte
1089  array. It is a reverse of kmr_save_kvs(). */
1090 
1091 int
1092 kmr_restore_kvs(KMR_KVS *kvo, void *data, size_t sz_,
1093  struct kmr_option opt)
1094 {
1095  assert(kvo != 0 && kvo->c.magic == KMR_KVS_ONCORE);
1096  kmr_check_fn_options(kvo->c.mr, kmr_noopt, opt, __func__);
1097  int cc;
1098  unsigned char *b = data;
1099  KMR_KVS *h = (void *)b;
1100  unsigned char *s = (b + sizeof(KMR_KVS));
1101  if (h->c.magic != KMR_KVS_ONCORE_PACKED) {
1102  kmr_warning(kvo->c.mr, 1, "Bad packed data, magic mismatch");
1103  return MPI_ERR_TYPE;
1104  }
1105  size_t netsz = h->c.storage_netsize;
1106  size_t blocksz = (netsz + kmr_kvs_block_header + kmr_kvs_entry_header);
1107  cc = kmr_allocate_block(kvo, netsz);
1108  assert(cc == MPI_SUCCESS);
1109  if (netsz != 0) {
1110  memcpy(kvo->c.first_block, s, blocksz);
1111  }
1112  kvo->c.key_data = h->c.key_data;
1113  kvo->c.value_data = h->c.value_data;
1114  assert(kvo->c.sorted == 0);
1115  kvo->c.element_count = h->c.element_count;
1116  kmr_kvs_adjust_adding_point(kvo);
1117  kmr_add_kv_done(kvo);
1118  return MPI_SUCCESS;
1119 }
1120 
1121 /* ================================================================ */
1122 
1123 /* Calls a map-function on entries in aggregate. EV holds EVCNT
1124  entries. MAPCOUNT is a counter of mapped entries from the
1125  beginning. */
1126 
1127 static inline int
1128 kmr_map_parked(struct kmr_kv_box *ev, long evcnt, long mapcount,
1129  _Bool k_reclaim, _Bool v_reclaim,
1130  KMR_KVS *kvi, KMR_KVS *kvo, kmr_mapfn_t m,
1131  void *arg, struct kmr_option opt)
1132 {
1133  int cc;
1134  KMR *mr = kvi->c.mr;
1135  long cnt = kvi->c.element_count;
1136  if (mr->single_thread || opt.nothreading) {
1137  for (long i = 0; i < evcnt; i++) {
1138  double t0 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
1139  cc = (*m)(ev[i], kvi, kvo, arg, (mapcount + i));
1140  double t1 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
1141  if (cc != MPI_SUCCESS) {
1142  char ee[80];
1143  snprintf(ee, sizeof(ee),
1144  "Map-fn returned with error cc=%d", cc);
1145  kmr_error(mr, ee);
1146  }
1147  if (mr->log_traces != 0) {
1148  kmr_log_map(mr, kvi, &ev[i], (mapcount + 1), cnt,
1149  m, (t1 - t0));
1150  }
1151  }
1152  } else {
1153  if (kvo != 0) {
1154  kvo->c.under_threaded_operation = 1;
1155  }
1156  KMR_OMP_PARALLEL_FOR_
1157  for (long i = 0; i < evcnt; i++) {
1158  double t0 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
1159  int ccx = (*m)(ev[i], kvi, kvo, arg, (mapcount + i));
1160  double t1 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
1161  if (ccx != MPI_SUCCESS) {
1162  char ee[80];
1163  snprintf(ee, sizeof(ee),
1164  "Map-fn returned with error cc=%d", ccx);
1165  kmr_error(mr, ee);
1166  }
1167  if (mr->log_traces != 0) {
1168  kmr_log_map(mr, kvi, &ev[i], (mapcount + 1), cnt,
1169  m, (t1 - t0));
1170  }
1171  }
1172  if (kvo != 0) {
1173  kvo->c.under_threaded_operation = 0;
1174  }
1175  }
1176  for (long i = 0; i < evcnt; i++) {
1177  if (k_reclaim) {
1178  kmr_free((void *)ev[i].k.p, (size_t)ev[i].klen);
1179  }
1180  if (v_reclaim) {
1181  kmr_free((void *)ev[i].v.p, (size_t)ev[i].vlen);
1182  }
1183  }
1184  return MPI_SUCCESS;
1185 }
1186 
1187 /** Maps by skipping the number of entries. It calls a map-function
1188  on entries from FROM, skipping by STRIDE, up to LIMIT
1189  non-inclusive. See kmr_map(). */
1190 
1191 int
1192 kmr_map_skipping(long from, long stride, long limit,
1193  _Bool stop_when_some_added,
1194  KMR_KVS *kvi, KMR_KVS *kvo,
1195  void *arg, struct kmr_option opt, kmr_mapfn_t m)
1196 {
1197  kmr_assert_kvs_ok(kvi, kvo, 1, 0);
1198  assert(from >= 0 && stride > 0 && limit >= 0);
1199  assert(kvi->c.current_block == 0);
1200  limit = ((limit != 0) ? limit : LONG_MAX);
1201  KMR *mr = kvi->c.mr;
1202 
1203  if (kmr_ckpt_enabled(mr)) {
1204  if (kmr_ckpt_progress_init(kvi, kvo, opt)){
1205  if (kvo != 0 && !opt.keep_open) {
1206  kmr_add_kv_done(kvo);
1207  }
1208  if (!opt.inspect) {
1209  kmr_free_kvs(kvi);
1210  }
1211  return MPI_SUCCESS;
1212  }
1213  from = kmr_ckpt_first_unprocessed_kv(mr);
1215  }
1216 
1217  int cc;
1218 
1219  if (mr->step_sync) {
1220  cc = MPI_Barrier(mr->comm);
1221  assert(MPI_SUCCESS);
1222  }
1223  if (kvo != 0 && opt.collapse) {
1224  assert(!kmr_fields_pointer_p(kvo));
1225  }
1226  _Bool k_reclaim = (!opt.inspect && (kmr_key_pointer_p(kvi)));
1227  _Bool v_reclaim = (!opt.inspect && (kmr_value_pointer_p(kvi)));
1228  long evsz = mr->mapper_park_size;
1229  struct kmr_kv_box *
1230  ev = kmr_malloc((sizeof(struct kmr_kv_box) * (size_t)evsz));
1231  long evcnt = 0;
1232  long mapcount = 0;
1233  long nextindex = from;
1234  long index = 0;
1235  kvi->c.current_block = kvi->c.first_block;
1236  while (index < kvi->c.element_count) {
1237  assert(kvi->c.current_block != 0);
1238  struct kmr_kvs_block *b = kvi->c.current_block;
1239  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvi, b);
1240  for (int i = 0; i < b->partial_element_count; i++) {
1241  assert(e != 0);
1242  if (index == nextindex && index < limit) {
1243  ev[evcnt++] = kmr_pick_kv(e, kvi);
1244  nextindex = (index + stride);
1245  if (k_reclaim) {
1246  union kmr_unit_sized *w = kmr_point_key(e);
1247  w->p = 0;
1248  }
1249  if (v_reclaim) {
1250  union kmr_unit_sized *w = kmr_point_value(e);
1251  w->p = 0;
1252  }
1253  } else {
1254  if (k_reclaim) {
1255  union kmr_unit_sized *w = kmr_point_key(e);
1256  kmr_free((void *)w->p, (size_t)e->klen);
1257  w->p = 0;
1258  }
1259  if (v_reclaim) {
1260  union kmr_unit_sized *w = kmr_point_value(e);
1261  kmr_free((void *)w->p, (size_t)e->vlen);
1262  w->p = 0;
1263  }
1264  }
1265  if (evcnt >= evsz) {
1266  cc = kmr_map_parked(ev, evcnt, mapcount, k_reclaim, v_reclaim,
1267  kvi, kvo, m, arg, opt);
1268  assert(cc == MPI_SUCCESS);
1269 
1270  if (kmr_ckpt_enabled(mr)) {
1271  kmr_ckpt_save_kvo_block_add(mr, kvo, evcnt);
1272  }
1273 
1274  mapcount += evcnt;
1275  evcnt = 0;
1276 
1277  if (stop_when_some_added) {
1278  _Bool done;
1279  if (mr->stop_at_some_check_globally) {
1280  done = 0;
1281  } else {
1282  done = (kvo->c.element_count != 0);
1283  }
1284  if (done) {
1285  /* Fake as if go to the end. */
1286  index = (kvi->c.element_count - 1);
1287  while (b->next != 0) {
1288  b = b->next;
1289  }
1290  }
1291  }
1292  }
1293  e = kmr_kvs_next(kvi, e, 1);
1294  index++;
1295  }
1296  kvi->c.current_block = b->next;
1297  }
1298  assert(kvi->c.current_block == 0);
1299  if (evcnt > 0) {
1300  cc = kmr_map_parked(ev, evcnt, mapcount, k_reclaim, v_reclaim,
1301  kvi, kvo, m, arg, opt);
1302  assert(cc == MPI_SUCCESS);
1303 
1304  if (kmr_ckpt_enabled(mr)) {
1305  kmr_ckpt_save_kvo_block_add(mr, kvo, evcnt);
1306  }
1307 
1308  mapcount += evcnt;
1309  evcnt = 0;
1310  }
1311  if (kvo != 0 && !opt.keep_open) {
1312  kmr_add_kv_done(kvo);
1313  }
1314 
1315  if (kmr_ckpt_enabled(mr)) {
1316  kmr_ckpt_save_kvo_block_fin(mr, kvo);
1317  }
1318 
1319  if (!opt.inspect) {
1320  kmr_free_kvs(kvi);
1321  }
1322  if (ev != 0) {
1323  kmr_free(ev, (sizeof(struct kmr_kv_box) * (size_t)evsz));
1324  }
1325 
1326  if (kmr_ckpt_enabled(mr)) {
1328  }
1329  return MPI_SUCCESS;
1330 }
1331 
1332 /** Maps simply. It consumes the input key-value stream KVI unless
1333  INSPECT option is marked. The output key-value stream KVO can be
1334  null, but in that case, a map-function cannot add key-value pairs.
1335  The pointer ARG is just passed to a map-function as a general
1336  argument, where accesses to it should be race-free, since a
1337  map-function is called by threads by default. M is the
1338  map-function. See the description on the type ::kmr_mapfn_t. It
1339  copeis the contents of the input KVI to the output KVO, when a
1340  map-function is null. During processing, it first makes an array
1341  pointing to the key-value entries in each data block, and works on
1342  it for ease threading/parallelization. Effective-options:
1343  NOTHREADING, INSPECT, KEEP_OPEN, COLLAPSE, TAKE_CKPT.
1344  See struct kmr_option. */
1345 
1346 int
1347 kmr_map9(_Bool stop_when_some_added,
1348  KMR_KVS *kvi, KMR_KVS *kvo,
1349  void *arg, struct kmr_option opt, kmr_mapfn_t m,
1350  const char *file, const int line, const char *func)
1351 {
1352  int cc;
1353  kmr_assert_kvs_ok(kvi, kvo, 1, 0);
1354  KMR *mr = kvi->c.mr;
1355  struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1,
1356  .keep_open = 1, .collapse = 1,
1357  .take_ckpt = 1};
1358  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
1359 
1360  kmr_trace_entry_t * kte_start = 0;
1361  if (mr->kmrviz_trace) {
1362  kte_start = kmr_trace_add_entry(mr, KMR_TRACE_EVENT_MAP, 0, kvi, kvo);
1363  }
1364  struct kmr_code_line info;
1365  if (mr->atwork == 0) {
1366  info.file = file;
1367  info.func = func;
1368  info.line = line;
1369  mr->atwork = &info;
1370  }
1371 
1372  if (m != 0) {
1373  cc = kmr_map_skipping(0, 1, 0, stop_when_some_added,
1374  kvi, kvo, arg, opt, m);
1375  } else {
1376  assert(!opt.inspect && !opt.keep_open);
1377  cc = kmr_move_kvs(kvi, kvo, opt);
1378  }
1379 
1380  if (mr->atwork == &info) {
1381  mr->atwork = 0;
1382  }
1383  if (mr->kmrviz_trace) {
1384  if (!opt.inspect) {
1385  kvi = 0;
1386  }
1387  kmr_trace_add_entry(mr, KMR_TRACE_EVENT_MAP, kte_start, kvi, kvo);
1388  }
1389 
1390  return cc;
1391 }
1392 
1393 /** Maps sequentially with rank by rank for debugging.  Ranks take
1394  turns in the rank order: each rank waits for an empty message from
 the previous rank, runs kmr_map(), flushes the output streams, and
 then sends an empty message to the next rank.  It returns
 MPI_SUCCESS.  See kmr_map. */
1395 
/* NOTE(review): the line carrying the function name and its leading
   parameters is absent from the text under review (the body uses KVI
   and KVO) -- restore it from the upstream source. */
1396 int
1398  void *arg, struct kmr_option opt, kmr_mapfn_t m)
1399 {
1400  KMR *mr = kvi->c.mr;
1401  int nprocs = mr->nprocs;
1402  int cc;
/* Every rank but the first waits for the token from rank-1. */
1403  if (mr->rank != 0) {
1404  cc = MPI_Recv(0, 0, MPI_INT, (mr->rank - 1),
1405  KMR_TAG_MAP_BY_RANK, mr->comm, MPI_STATUS_IGNORE);
1406  assert(cc == MPI_SUCCESS);
1407  }
1408  cc = kmr_map(kvi, kvo, arg, opt, m);
1409  assert(cc == MPI_SUCCESS);
/* Flush all output streams before letting the next rank go. */
1410  fflush(0);
/* Every rank but the last passes the token to rank+1, after a sleep
   of one millisecond. */
1411  if (mr->rank != (nprocs - 1)) {
1412  usleep(1 * 1000);
1413  cc = MPI_Send(0, 0, MPI_INT, (mr->rank + 1),
1414  KMR_TAG_MAP_BY_RANK, mr->comm);
1415  assert(cc == MPI_SUCCESS);
1416  }
1417  return MPI_SUCCESS;
1418 }
1419 
1420 /** Extracts a single key-value pair locally in the key-value stream
1421  KVI. It is an error when zero or more than one entries are in the
1422  KVI (it warns and then calls MPI_Abort()). It does not consume the
1423  input KVS (INSPECT IMPLIED). The returned key-value entry must be
1424  used before freeing the input KVS, when it points to an opaque
 data. It returns MPI_SUCCESS. */
1425 
/* NOTE(review): the line carrying the function name and parameters is
   absent from the text under review (the body uses KVI and stores the
   result through KV) -- restore it from the upstream source. */
1426 int
1428 {
1429  kmr_assert_kvs_ok(kvi, 0, 1, 0);
1430  assert(kvi->c.current_block == 0);
1431  KMR *mr = kvi->c.mr;
1432  kvi->c.current_block = kvi->c.first_block;
1433  if (kvi->c.element_count == 1) {
1434  assert(kvi->c.current_block != 0);
1435  struct kmr_kvs_block *b = kvi->c.current_block;
1436  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvi, b);
1437  assert(b->partial_element_count == 1);
1438  assert(e != 0);
/* Hand out the only entry, and reset the scanning position. */
1439  *kv = kmr_pick_kv(e, kvi);
1440  kvi->c.current_block = 0;
1441  return MPI_SUCCESS;
1442  } else {
1443  if (kvi->c.element_count == 0) {
1444  kmr_warning(mr, 1, "kmr_take_one for no entries");
1445  /*return MPI_ERR_COUNT;*/
1446  } else {
1447  kmr_warning(mr, 1, "kmr_take_one for multiple entries");
1448  /*return MPI_ERR_COUNT;*/
1449  }
/* Both error cases abort the whole job on MPI_COMM_WORLD.  The
   scanning position is not reset on this path. */
1450  MPI_Abort(MPI_COMM_WORLD, 1);
1451  return MPI_SUCCESS;
1452  }
1453 }
1454 
1455 /** Maps once. It calls a map-function once with a dummy key-value
1456  stream and a dummy key-value pair. See kmr_map().
1457  Effective-options: KEEP_OPEN, TAKE_CKPT. See struct kmr_option. */
1458 
1459 int
1460 kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt,
1461  _Bool rank_zero_only, kmr_mapfn_t m)
1462 {
1463  kmr_assert_kvs_ok(0, kvo, 0, 1);
1464  KMR *mr = kvo->c.mr;
1465  struct kmr_option kmr_supported = {.keep_open = 1, .take_ckpt = 1};
1466  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
1467  int rank = mr->rank;
1468  int cc;
1469 
1470  kmr_trace_entry_t * kte_start = 0;
1471  if (mr->kmrviz_trace) {
1472  kte_start = kmr_trace_add_entry(mr, KMR_TRACE_EVENT_MAP_ONCE, 0,
1473  0, kvo);
1474  }
1475  if (kmr_ckpt_enabled(mr)) {
1476  if (kmr_ckpt_progress_init(0, kvo, opt)) {
1477  kmr_add_kv_done(kvo);
1478  return MPI_SUCCESS;
1479  }
1480  }
1481 
1482  if (!rank_zero_only || rank == 0) {
1483  struct kmr_kv_box kv = {.klen = 0, .vlen = 0, .k.i = 0, .v.i = 0};
1484  cc = (*m)(kv, 0, kvo, arg, 0);
1485  if (cc != MPI_SUCCESS) {
1486  char ee[80];
1487  snprintf(ee, sizeof(ee),
1488  "Map-fn returned with error cc=%d", cc);
1489  kmr_error(mr, ee);
1490  }
1491  }
1492  if (!opt.keep_open) {
1493  cc = kmr_add_kv_done(kvo);
1494  assert(cc == MPI_SUCCESS);
1495  }
1496 
1497  if (kmr_ckpt_enabled(mr)) {
1498  kmr_ckpt_save_kvo_whole(mr, kvo);
1500  }
1501  if (mr->kmrviz_trace) {
1502  kmr_trace_add_entry(mr, KMR_TRACE_EVENT_MAP_ONCE, kte_start, 0, kvo);
1503  }
1504 
1505  return MPI_SUCCESS;
1506 }
1507 
1508 /** Maps on rank0 only. It calls a map-function once with a dummy
1509  key-value stream and a dummy key-value pair. It is used to avoid
1510  low-level conditionals like (myrank==0). See kmr_map().
1511  Effective-options: KEEP_OPEN, TAKE_CKPT. See struct kmr_option. */
1512 
1513 int
1514 kmr_map_on_rank_zero(KMR_KVS *kvo, void *arg, struct kmr_option opt,
1515  kmr_mapfn_t m)
1516 {
1517  int cc;
1518  cc = kmr_map_once(kvo, arg, opt, 1, m);
1519  assert(cc == MPI_SUCCESS);
1520  return MPI_SUCCESS;
1521 }
1522 
1523 /* ================================================================ */
1524 
#if 1

/* Hashes.  (MurmurHash3; http://code.google.com/p/smhasher).  It
   hashes N bytes at P.  The leading bytes are taken as words of
   unsigned long, and the remaining bytes (fewer than eight) are
   zero-padded to one more word.  The constants and rotations assume
   that unsigned long has 64 bits.  P need not be aligned: words are
   loaded with memcpy() instead of through a cast pointer. */

static inline unsigned long
kmr_hash_key_opaque(const unsigned char *p, int n)
{
#define ROT(X,N) ((X) << (N) | (X) >> (64-(N)))
#define KEY(V) (k = (V), k *= 0x87c37b91114253d5UL, \
                k = ROT(k, 31), k *= 0x4cf5ad432745937fUL)
#define MIX() (h ^= k, h = ROT(h, 31), h = h * 5 + 0xe6546b64)
#define FIN() (h ^= (h >> 33), h *= 0xff51afd7ed558ccdUL, h ^= (h >> 33), \
               h *= 0xc4ceb9fe1a85ec53UL, h ^= (h >> 33))
    unsigned long h = 0x85ebca6bUL; /*c2b2ae35*/
    unsigned long k;
    int n8 = (n / 8);
    int rn = (n - (8 * n8));
    const unsigned char *r = &p[(8 * n8)];
    for (int i = 0; i < n8; i++) {
        /* Load the i-th word bytewise to be safe on any alignment. */
        unsigned long w;
        memcpy(&w, &p[(sizeof(w) * (size_t)i)], sizeof(w));
        KEY(w);
        MIX();
    }
    /* The tail word is always mixed in, even when it is empty. */
    union {unsigned long i; unsigned char c[8];} u = {.i = 0UL};
    for (int i = 0; i < rn; i++) {
        u.c[i] = r[i];
    }
    KEY(u.i);
    MIX();
    FIN();
    return h;
#undef ROT
#undef KEY
#undef MIX
#undef FIN
}

#elif 0

/* (java 1.2 hash). */

static inline unsigned long
kmr_hash_key_opaque(const unsigned char *p, int n)
{
    unsigned long hash = 0;
    for (int i = 0 ; i < n ; i++) {
        hash *= 31;
        hash += p[i];
    }
    return hash;
}

#else

/* (java 1.0 hash). */

static inline unsigned long
kmr_hash_key_opaque(const unsigned char *p, int n)
{
    unsigned long hash = 0;
    int m, k, i;
    if (n <= 15) {
        /* Short keys: every byte counts. */
        m = 1;
        for (i = 0 ; i < n ; i++) {
            hash += (p[i] * m);
            m *= 37;
        }
    } else {
        /* Long keys: bytes are sampled at steps of n/8. */
        m = 1;
        k = n / 8;
        for (i = 0 ; i < n ; i += k) {
            hash += (p[i] * m);
            m *= 39;
        }
    }
    return hash;
}

#endif
1604 
1605 /* Makes an integer key for a key-value pair for shuffling. It
1606  returns the long bit representation for float. This hash is OK for
1607  the key_as_rank option. (kmr_hash_key() is for shuffling, and
1608  kmr_stable_key() is for sorting). */
1609 
1610 static inline signed long
1611 kmr_hash_key(const KMR_KVS *kvs, const struct kmr_kv_box kv)
1612 {
1613  switch (kvs->c.key_data) {
1614  case KMR_KV_BAD:
1615  xassert(kvs->c.key_data != KMR_KV_BAD);
1616  return -1;
1617  case KMR_KV_OPAQUE:
1618  case KMR_KV_CSTRING:
1619  return (signed long)kmr_hash_key_opaque((void *)kv.k.p, kv.klen);
1620  case KMR_KV_INTEGER:
1621  return kv.k.i;
1622  case KMR_KV_FLOAT8:
1623  return kv.k.i;
1624  case KMR_KV_POINTER_OWNED:
1625  case KMR_KV_POINTER_UNMANAGED:
1626  xassert(kvs->c.key_data != KMR_KV_POINTER_OWNED
1627  && kvs->c.key_data != KMR_KV_POINTER_UNMANAGED);
1628  return -1;
1629  default:
1630  xassert(NEVERHERE);
1631  return -1;
1632  }
1633 }
1634 
1635 /* Returns an integer of the first 8 bytes, shifted to the right by
1636  one-bit to make it positive. It is consistent with the ordering by
1637  memcmp(). */
1638 
1639 static inline long
1640 kmr_stable_key_opaque(const struct kmr_kv_box kv)
1641 {
1642  unsigned char *p = (unsigned char *)kv.k.p;
1643  int n = kv.klen;
1644  unsigned long hash = 0;
1645  for (int i = 0; i < (int)sizeof(long); i++) {
1646  unsigned char v = ((i < n) ? p[i] : 0);
1647  hash = ((hash << 8) + v);
1648  }
1649  return (long)(hash >> 1);
1650 }
1651 
1652 /* Makes an integer key for a key-value pair for sorting. It returns
1653  a signed value for comparing integers. It is consistent with the
1654  ordering by memcmp() for opaque keys. (kmr_hash_key() is for
1655  shuffling, and kmr_stable_key() is for sorting). */
1656 
1657 signed long
1658 kmr_stable_key(const struct kmr_kv_box kv, const KMR_KVS *kvs)
1659 {
1660  switch (kvs->c.key_data) {
1661  case KMR_KV_BAD:
1662  xassert(kvs->c.key_data != KMR_KV_BAD);
1663  return -1;
1664  case KMR_KV_OPAQUE:
1665  case KMR_KV_CSTRING:
1666  return kmr_stable_key_opaque(kv);
1667  case KMR_KV_INTEGER:
1668  return kv.k.i;
1669  case KMR_KV_FLOAT8:
1670  {
1671  long v0 = kv.k.i;
1672  long v1 = ((v0 >= 0L) ? v0 : ((-v0) | (1L << 63)));
1673  /*assert(v0 >= 0 || v1 < 0);*/
1674  return v1;
1675  }
1676  case KMR_KV_POINTER_OWNED:
1677  case KMR_KV_POINTER_UNMANAGED:
1678  xassert(kvs->c.key_data != KMR_KV_POINTER_OWNED
1679  && kvs->c.key_data != KMR_KV_POINTER_UNMANAGED);
1680  return -1;
1681  default:
1682  xassert(NEVERHERE);
1683  return -1;
1684  }
1685 }
1686 
1687 /* Determines a rank to which this key-value entry is directed. It is
1688  bases on the hashed keys. */
1689 
1690 int
1691 kmr_pitch_rank(const struct kmr_kv_box kv, KMR_KVS *kvs)
1692 {
1693  unsigned int nprocs = (unsigned int)kvs->c.mr->nprocs;
1694  unsigned long v = (unsigned long)kmr_hash_key(kvs, kv);
1695  unsigned int h = (((v >> 32) ^ v) & ((1L << 32) - 1));
1696  return (int)(h % nprocs);
1697 }
1698 
/* Three-way comparison of X and Y, yielding -1, 0, or 1.  It yields
   1 when neither less-than nor equal holds.  The arguments are
   evaluated more than once. */

#define KMR_CMP3(X, Y) (((X) < (Y)) ? -1 : (((X) == (Y)) ? 0 : 1))
1702 
/* Compares byte strings lexicographically.  The common leading part
   (the shorter of the lengths PLEN and QLEN) is compared by memcmp(),
   and when that part is equal, the difference of the lengths decides.
   This is compatible to memcmp() when the lengths are equal, for the
   terasort requirement. */

static inline int
kmr_compare_lexicographically(const unsigned char *p, const int plen,
                              const unsigned char *q, const int qlen)
{
    const int common = ((plen < qlen) ? plen : qlen);
    const int order = memcmp(p, q, (size_t)common);
    return ((order != 0) ? order : (plen - qlen));
}
1726 
1727 /* Compares keys lexicographically as byte strings. */
1728 
1729 static int
1730 kmr_compare_opaque(const struct kmr_kv_box *p,
1731  const struct kmr_kv_box *q)
1732 {
1733  return kmr_compare_lexicographically((unsigned char *)p->k.p, p->klen,
1734  (unsigned char *)q->k.p, q->klen);
1735 }
1736 
1737 static int
1738 kmr_compare_integer(const struct kmr_kv_box *p0,
1739  const struct kmr_kv_box *p1)
1740 {
1741  return KMR_CMP3(p0->k.i, p1->k.i);
1742 }
1743 
1744 static int
1745 kmr_compare_float8(const struct kmr_kv_box *p0,
1746  const struct kmr_kv_box *p1)
1747 {
1748  return KMR_CMP3(p0->k.d, p1->k.d);
1749 }
1750 
1751 /* Compares keys lexicographically as byte strings. */
1752 
1753 static int
1754 kmr_compare_record_opaque(const struct kmr_keyed_record *p0,
1755  const struct kmr_keyed_record *p1)
1756 {
1757  struct kmr_kv_box b0 = kmr_pick_kv2(p0->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1758  struct kmr_kv_box b1 = kmr_pick_kv2(p1->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1759  return kmr_compare_lexicographically((unsigned char *)b0.k.p, b0.klen,
1760  (unsigned char *)b1.k.p, b1.klen);
1761 }
1762 
1763 /* (UNUSED) Sorting uses the key part of the record and it does not
1764  peek in the kmr_kv_box. */
1765 
1766 static int
1767 kmr_compare_record_integer_(const struct kmr_keyed_record *p0,
1768  const struct kmr_keyed_record *p1)
1769 {
1770  assert(NEVERHERE);
1771  struct kmr_kv_box b0 = kmr_pick_kv2(p0->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1772  struct kmr_kv_box b1 = kmr_pick_kv2(p1->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1773  long v0 = b0.k.i;
1774  long v1 = b1.k.i;
1775  return KMR_CMP3(v0, v1);
1776 }
1777 
1778 /* (UNUSED) Sorting uses the key part of the record and it does not
1779  peek in the kmr_kv_box. */
1780 
1781 static int
1782 kmr_compare_record_float8_(const struct kmr_keyed_record *p0,
1783  const struct kmr_keyed_record *p1)
1784 {
1785  assert(NEVERHERE);
1786  struct kmr_kv_box b0 = kmr_pick_kv2(p0->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1787  struct kmr_kv_box b1 = kmr_pick_kv2(p1->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1788  double v0 = b0.k.d;
1789  double v1 = b1.k.d;
1790  return KMR_CMP3(v0, v1);
1791 }
1792 
1793 /* Returns an appropriate comparator for kmr_kv_box. */
1794 
1795 kmr_sorter_t
1796 kmr_choose_sorter(const KMR_KVS *kvs)
1797 {
1798  switch (kvs->c.key_data) {
1799  case KMR_KV_BAD:
1800  xassert(kvs->c.key_data != KMR_KV_BAD);
1801  return 0;
1802  case KMR_KV_INTEGER:
1803  return kmr_compare_integer;
1804  case KMR_KV_FLOAT8:
1805  return kmr_compare_float8;
1806  case KMR_KV_OPAQUE:
1807  case KMR_KV_CSTRING:
1808  case KMR_KV_POINTER_OWNED:
1809  case KMR_KV_POINTER_UNMANAGED:
1810  return kmr_compare_opaque;
1811  default:
1812  xassert(NEVERHERE);
1813  return 0;
1814  }
1815 }
1816 
1817 /* Returns an appropriate comparator for keyed-records. It is only
1818  called for KMR_KV_OPAQUE. */
1819 
1820 static kmr_record_sorter_t
1821 kmr_choose_record_sorter(const KMR_KVS *kvs)
1822 {
1823  switch (kvs->c.key_data) {
1824  case KMR_KV_BAD:
1825  xassert(kvs->c.key_data != KMR_KV_BAD);
1826  return 0;
1827  case KMR_KV_INTEGER:
1828  assert(NEVERHERE);
1829  return kmr_compare_record_integer_;
1830  case KMR_KV_FLOAT8:
1831  assert(NEVERHERE);
1832  return kmr_compare_record_float8_;
1833  case KMR_KV_OPAQUE:
1834  case KMR_KV_CSTRING:
1835  case KMR_KV_POINTER_OWNED:
1836  case KMR_KV_POINTER_UNMANAGED:
1837  return kmr_compare_record_opaque;
1838  default:
1839  xassert(NEVERHERE);
1840  return 0;
1841  }
1842 }
1843 
1844 #if 0
1845 
/* NOTE: This region is disabled by "#if 0".  Both functions are stubs
   with the signature of a map-function, and their bodies do nothing
   but return MPI_SUCCESS. */

1846 /* (Stub) Meant to copy the entries as keyed-records with hashed
1847  keys for shuffling. */
1848 
1849 static int
1850 kmr_copy_record_shuffle_fn(const struct kmr_kv_box kv,
1851  const KMR_KVS *kvi, KMR_KVS *kvo, void *p,
1852  const long i)
1853 {
1854  return MPI_SUCCESS;
1855 }
1856 
1857 /* (Stub) Meant to copy the entries as keyed-records with stable
1858  keys for sorting. */
1859 
1860 static int
1861 kmr_copy_record_sorting_fn(const struct kmr_kv_box kv,
1862  const KMR_KVS *kvi, KMR_KVS *kvo, void *p,
1863  const long i)
1864 {
1865  return MPI_SUCCESS;
1866 }
1867 
1868 #endif
1869 
1870 /** Compares the key field of keyed-records for qsort/bsearch. */
1871 
1872 static int
1873 kmr_icmp(const void *a0, const void *a1)
1874 {
1875  const struct kmr_keyed_record *p0 = a0;
1876  const struct kmr_keyed_record *p1 = a1;
1877  long v0 = p0->v;
1878  long v1 = p1->v;
1879  return KMR_CMP3(v0, v1);
1880 }
1881 
1882 static int
1883 kmr_sort_locally_lo(KMR_KVS *kvi, KMR_KVS *kvo, _Bool shuffling,
1884  _Bool ranking, struct kmr_option opt)
1885 {
1886  kmr_assert_kvs_ok(kvi, kvo, 1, 1);
1887  assert(kmr_shuffle_compatible_p(kvo, kvi));
1888  KMR *mr = kvi->c.mr;
1889 
1890  kmr_trace_entry_t * kte_start = 0;
1891  if (mr->kmrviz_trace) {
1892  kte_start = kmr_trace_add_entry(mr, KMR_TRACE_EVENT_SORT, 0, kvi, kvo);
1893  }
1894  if (kmr_ckpt_enabled(mr)) {
1895  if (kmr_ckpt_progress_init(kvi, kvo, opt)) {
1896  kmr_add_kv_done(kvo);
1897  kvo->c.sorted = 1;
1898  if (!opt.inspect) {
1899  kmr_free_kvs(kvi);
1900  }
1901  return MPI_SUCCESS;
1902  }
1903  }
1904  _Bool twostep = !mr->one_step_sort;
1905  _Bool primekey = ((kvi->c.key_data == KMR_KV_INTEGER)
1906  || (kvi->c.key_data == KMR_KV_FLOAT8));
1907  double timestamp[5];
1908  int cc;
1909  long cnt = kvi->c.element_count;
1910  timestamp[0] = MPI_Wtime();
1911  size_t evsz = (sizeof(struct kmr_keyed_record) * (size_t)cnt);
1912  struct kmr_keyed_record *ev = kmr_malloc(evsz);
1913 #if 0
1914  struct kmr_option inspect = {
1915  .inspect = 1,
1916  .nothreading = opt.nothreading
1917  };
1918  if (shuffling) {
1919  cc = kmr_map(kvi, 0, ev, inspect, kmr_copy_record_shuffle_fn);
1920  assert(cc == MPI_SUCCESS);
1921  } else {
1922  cc = kmr_map(kvi, 0, ev, inspect, kmr_copy_record_sorting_fn);
1923  assert(cc == MPI_SUCCESS);
1924  }
1925 #else
1926  cc = kmr_retrieve_keyed_records(kvi, ev, cnt, shuffling, ranking);
1927  assert(cc == MPI_SUCCESS);
1928 #endif
1929  timestamp[1] = MPI_Wtime();
1930  if (shuffling || twostep || primekey) {
1931  if (mr->single_thread || opt.nothreading) {
1932  qsort(ev, (size_t)cnt, sizeof(struct kmr_keyed_record), kmr_icmp);
1933  } else {
1934  kmr_isort(ev, (size_t)cnt, sizeof(struct kmr_keyed_record),
1935  mr->sort_threads_depth);
1936  }
1937  }
1938  timestamp[2] = MPI_Wtime();
1939  if (!shuffling && !primekey) {
1940  /* Sort the array sorted by hashed keys, again by true keys. */
1941  long *runs = kmr_malloc(sizeof(long) * (size_t)cnt);
1942  long nruns = 0;
1943  if (twostep) {
1944  long i = 0;
1945  while (i < cnt) {
1946  do {
1947  i++;
1948  if (i == cnt) {
1949  break;
1950  }
1951  cc = KMR_CMP3(ev[i - 1].v, ev[i].v);
1952  } while (cc == 0);
1953  assert(nruns < cnt);
1954  runs[nruns] = i;
1955  nruns++;
1956  }
1957  assert(i == cnt && (cnt == 0 || runs[nruns - 1] == cnt));
1958  } else {
1959  nruns = (cnt == 0 ? 0 : 1);
1960  runs[0] = cnt;
1961  }
1962  kmr_record_sorter_t cmp1 = kmr_choose_record_sorter(kvi);
1963  if (mr->single_thread || opt.nothreading) {
1964  for (long k = 0; k < nruns; k++) {
1965  long j = (k == 0 ? 0 : runs[k - 1]);
1966  long i = runs[k];
1967  assert(j < i);
1968  if ((i - j) > 1) {
1969  qsort(&ev[j], (size_t)(i - j),
1970  sizeof(struct kmr_keyed_record),
1971  (kmr_qsorter_t)cmp1);
1972  }
1973  }
1974  } else {
1975  KMR_OMP_PARALLEL_FOR_
1976  for (long k = 0; k < nruns; k++) {
1977  long j = (k == 0 ? 0 : runs[k - 1]);
1978  long i = runs[k];
1979  assert(j < i);
1980  if ((i - j) > 1) {
1981  qsort(&ev[j], (size_t)(i - j),
1982  sizeof(struct kmr_keyed_record),
1983  (kmr_qsorter_t)cmp1);
1984  }
1985  }
1986  }
1987  kmr_free(runs, (sizeof(long) * (size_t)cnt));
1988  }
1989  timestamp[3] = MPI_Wtime();
1990  size_t sz = kvi->c.storage_netsize;
1991  cc = kmr_allocate_block(kvo, sz);
1992  assert(cc == MPI_SUCCESS);
1993  /*NEED-THREADING*/
1994  for (long i = 0 ; i < cnt; i++) {
1995  struct kmr_kv_box kv = kmr_pick_kv(ev[i].e, kvi);
1996  kmr_add_kv_nomutex(kvo, kv, 0, 0);
1997  }
1998  timestamp[4] = MPI_Wtime();
1999  assert(sz == 0 || kmr_kvs_entry_tail_p(kvo->c.adding_point));
2000  assert(sz == 0 || kvo->c.block_count == 1);
2001  kmr_add_kv_done(kvo);
2002  kmr_assert_on_tail_marker(kvo);
2003  kvo->c.sorted = 1;
2004  kmr_free(ev, evsz);
2005  _Bool tracing = mr->trace_sorting;
2006  if (tracing && (5 <= mr->verbosity)) {
2007  fprintf(stderr, (";;KMR [%05d] kmr_sort_locally"
2008  " time=(%f %f %f %f) (msec)\n"),
2009  mr->rank,
2010  ((timestamp[1] - timestamp[0]) * 1e3),
2011  ((timestamp[2] - timestamp[1]) * 1e3),
2012  ((timestamp[3] - timestamp[2]) * 1e3),
2013  ((timestamp[4] - timestamp[3]) * 1e3));
2014  fflush(0);
2015  }
2016  if (kmr_ckpt_enabled(mr)) {
2017  kmr_ckpt_save_kvo_whole(mr, kvo);
2018  }
2019  if (!opt.inspect) {
2020  kmr_free_kvs(kvi);
2021  kvi = 0;
2022  }
2023  if (kmr_ckpt_enabled(mr)) {
2025  }
2026  if (mr->kmrviz_trace) {
2027  kmr_trace_add_entry(mr, KMR_TRACE_EVENT_SORT, kte_start, kvi, kvo);
2028  }
2029 
2030  return MPI_SUCCESS;
2031 }
2032 
2033 /** Reorders key-value pairs in a single rank. It sorts pairs when
2034  SHUFFLING is false, or gathers pairs with the same hashed keys
2035  adjacent when SHUFFLING is true. It only respects for not
2036  ordering but just equality when shuffling. The sort-keys for
2037  shuffling are destination ranks for shuffling (taking a modulo of
2038  the hashed key with nprocs). As a sorting, it is NOT-STABLE due
2039  to quick-sort used inside. It converts pointer keys and values to
2040  opaque ones for sending.\n
2041  Sorting on a key-value stream is by memcmp(), unless the keys are
2042  integer or floating-point numbers (ordering on integers and
2043  memcmp() are different). Sorting on non-numbers is performed in
2044  two steps: the first step sorts by the integer rankings, and the
2045  second by the specified comparator. And thus, the comparator is
2046  required to have a corresponding generator of integer rankings.
2047  It consumes the input key-value stream. Effective-options:
2048  NOTHREADING, INSPECT, KEY_AS_RANK. */
2049 
2050 int
2051 kmr_sort_locally(KMR_KVS *kvi, KMR_KVS *kvo, _Bool shuffling,
2052  struct kmr_option opt)
2053 {
2054  kmr_assert_kvs_ok(kvi, kvo, 1, 1);
2055  assert(kmr_shuffle_compatible_p(kvo, kvi));
2056  struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1,
2057  .key_as_rank = 1};
2058  kmr_check_fn_options(kvi->c.mr, kmr_supported, opt, __func__);
2059  _Bool ranking = opt.key_as_rank;
2060  kmr_sort_locally_lo(kvi, kvo, shuffling, ranking, opt);
2061  return MPI_SUCCESS;
2062 }
2063 
2064 /* ================================================================ */
2065 
2066 /* Counts the number of entries in the key-value stream. If
2067  BOUND_IN_BLOCK is true, it counts only ones in a single data
2068  block. */
2069 
2070 static inline long
2071 kmr_count_entries(KMR_KVS *kvs, _Bool bound_in_block)
2072 {
2073  kvs->c.current_block = kvs->c.first_block;
2074  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
2075  long cnt = 0;
2076  while (e != 0) {
2077  /*struct kmr_kv_box kv = kmr_pick_kv(e, kvs);*/
2078  /*printf("entry %d %d %s %s\n", kv.klen, kv.vlen, kv.k.p, kv.v.p);*/
2079  cnt++;
2080  e = kmr_kvs_next(kvs, e, bound_in_block);
2081  }
2082  return cnt;
2083 }
2084 
2085 /** Shuffles key-value pairs to the appropriate destination ranks. It
2086  first sorts pairs by the destination ranks of the keys, and then
2087  exchanges pairs with all-to-all communication. It converts
2088  pointer keys and values to opaque ones for sending during the
2089  sorting stage. Note that the key-value pairs are sorted by the
2090  hash-values prior to exchange. Effective-options: INSPECT,
2091  KEY_AS_RANK, TAKE_CKPT. See struct kmr_option. */
2092 
2093 int
2094 kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
2095 {
2096  kmr_assert_kvs_ok(kvi, kvo, 1, 1);
2097  assert(kmr_shuffle_compatible_p(kvo, kvi));
2098  KMR *mr = kvi->c.mr;
2099  struct kmr_option kmr_supported = {.inspect = 1, .key_as_rank = 1,
2100  .take_ckpt = 1};
2101  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
2102  _Bool ranking = opt.key_as_rank;
2103 
2104  kmr_trace_entry_t * kte_start = 0;
2105  if (mr->kmrviz_trace) {
2106  kte_start = kmr_trace_add_entry(mr, KMR_TRACE_EVENT_SHUFFLE, 0,
2107  kvi, kvo);
2108  }
2109 
2110  /* SKIP SHUFFLING IF MARKED AS SHUFFLED. */
2111 
2112  if (kvi->c.magic == KMR_KVS_PUSHOFF) {
2113  kmr_pushoff_make_stationary(kvi);
2114  }
2115  if (kvi->c.shuffled_in_pushoff) {
2116  assert(!mr->ckpt_enable);
2117  int cc = kmr_move_kvs(kvi, kvo, opt);
2118  return cc;
2119  }
2120 
2121  if (kmr_ckpt_enabled(mr)) {
2122  if (kmr_ckpt_progress_init(kvi, kvo, opt)) {
2123  kmr_add_kv_done(kvo);
2124  if (!opt.inspect) {
2125  kmr_free_kvs(kvi);
2126  }
2127  return MPI_SUCCESS;
2128  }
2129  }
2130  int kcdc = kmr_ckpt_disable_ckpt(mr);
2131 
2132  /* Sort for shuffling. */
2133 
2134  enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvi->c.key_data);
2135  enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvi->c.value_data);
2136  struct kmr_option n_opt = opt;
2137  n_opt.inspect = 1;
2138  KMR_KVS *kvs1 = kmr_create_kvs(mr, keyf, valf);
2139  kmr_sort_locally_lo(kvi, kvs1, 1, ranking, n_opt);
2140  assert(kvs1->c.stowed);
2141  /*kmr_dump_kvs(kvs1, 0);*/
2142  /*kmr_guess_communication_pattern_(kvs1, opt);*/
2143  assert(!kmr_fields_pointer_p(kvs1));
2144  assert(kvs1->c.block_count <= 1);
2145 
2146  int cc;
2147  int nprocs = mr->nprocs;
2148  long cnt = kvs1->c.element_count;
2149  long *ssz = kmr_malloc(sizeof(long) * (size_t)nprocs);
2150  long *sdp = kmr_malloc(sizeof(long) * (size_t)nprocs);
2151  long *rsz = kmr_malloc(sizeof(long) * (size_t)nprocs);
2152  long *rdp = kmr_malloc(sizeof(long) * (size_t)nprocs);
2153  for (int r = 0; r < nprocs; r++) {
2154  ssz[r] = 0;
2155  rsz[r] = 0;
2156  }
2157  int rank = 0;
2158  assert(kvs1->c.current_block == 0);
2159  kvs1->c.current_block = kvs1->c.first_block;
2160  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs1, kvs1->c.first_block);
2161  for (long i = 0; i < cnt; i++) {
2162  assert(e != 0);
2163  struct kmr_kv_box kv = kmr_pick_kv(e, kvs1);
2164  int r = (ranking ? (int)kv.k.i : kmr_pitch_rank(kv, kvs1));
2165  assert(0 <= r && r < nprocs);
2166  if (ranking && !(0 <= r && r < nprocs)) {
2167  kmr_error(mr, "key entries are not ranks");
2168  }
2169  if (r < rank) {
2170  kmr_error(mr, "key-value entries are not sorted (internal error)");
2171  }
2172  ssz[r] += (long)kmr_kvs_entry_netsize(e);
2173  rank = r;
2174  e = kmr_kvs_next(kvs1, e, 0);
2175  }
2176  /* Exchange send-receive counts. */
2177  cc = kmr_exchange_sizes(mr, ssz, rsz);
2178  assert(cc == MPI_SUCCESS);
2179  long sendsz = 0;
2180  long recvsz = 0;
2181  for (int r = 0; r < nprocs; r++) {
2182  sdp[r] = sendsz;
2183  sendsz += ssz[r];
2184  rdp[r] = recvsz;
2185  recvsz += rsz[r];
2186  }
2187  cc = kmr_allocate_block(kvo, (size_t)recvsz);
2188  assert(cc == MPI_SUCCESS);
2189  struct kmr_kvs_block *sb = kvs1->c.first_block;
2190  struct kmr_kvs_entry *sbuf = kmr_kvs_first_entry(kvs1, sb);
2191  struct kmr_kvs_block *rb = kvo->c.first_block;
2192  struct kmr_kvs_entry *rbuf = kmr_kvs_first_entry(kvo, rb);
2193  cc = kmr_alltoallv(mr, sbuf, ssz, sdp, rbuf, rsz, rdp);
2194  assert(cc == MPI_SUCCESS);
2195  long ocnt = kmr_count_entries(kvo, 1);
2196  assert(kvo->c.sorted == 0);
2197  kvo->c.element_count = ocnt;
2198  if (recvsz != 0) {
2199  assert(kvo->c.block_count == 1);
2200  rb->partial_element_count = ocnt;
2201  rb->fill_size = (size_t)recvsz;
2202  }
2203  kmr_kvs_adjust_adding_point(kvo);
2204  kmr_add_kv_done(kvo);
2205 
2206  kmr_ckpt_enable_ckpt(mr, kcdc);
2207  if (kmr_ckpt_enabled(mr)) {
2208  kmr_ckpt_save_kvo_whole(mr, kvo);
2209  }
2210 
2211  if (!opt.inspect) {
2212  kmr_free_kvs(kvi);
2213  kvi = 0;
2214  }
2215  assert(kvo->c.element_count == 0 || kvo->c.storage_netsize != 0);
2216  xassert(!kmr_fields_pointer_p(kvo));
2217  kmr_free_kvs(kvs1);
2218  kmr_free(ssz, (sizeof(long) * (size_t)nprocs));
2219  kmr_free(sdp, (sizeof(long) * (size_t)nprocs));
2220  kmr_free(rsz, (sizeof(long) * (size_t)nprocs));
2221  kmr_free(rdp, (sizeof(long) * (size_t)nprocs));
2222  if (kmr_ckpt_enabled(mr)) {
2224  }
2225  if (mr->kmrviz_trace) {
2226  kmr_trace_add_entry(mr, KMR_TRACE_EVENT_SHUFFLE, kte_start, kvi, kvo);
2227  }
2228 
2229  return MPI_SUCCESS;
2230 }
2231 
2232 /** Replicates key-value pairs to be visible on all ranks, that is, it
2233  has the effect of bcast or all-gather. It gathers pairs on rank0
2234  only by the option RANK_ZERO. It moves stably, keeping the
2235  ordering of ranks and the ordering of local key-value pairs.
2236  Effective-options: INSPECT, RANK_ZERO, TAKE_CKPT.
2237  See struct kmr_option. */
2238 
2239 int
2240 kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
2241 {
2242  kmr_assert_kvs_ok(kvi, kvo, 1, 1);
2243  KMR *mr = kvi->c.mr;
2244  struct kmr_option kmr_supported = {.inspect = 1, .rank_zero = 1,
2245  .take_ckpt = 1};
2246  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
2247  int nprocs = mr->nprocs;
2248  int rank = mr->rank;
2249  int cc;
2250  KMR_KVS *kvs1;
2251 
2252  kmr_trace_entry_t * kte_start = 0;
2253  if (mr->kmrviz_trace) {
2254  kte_start = kmr_trace_add_entry(mr, KMR_TRACE_EVENT_REPLICATE, 0,
2255  kvi, kvo);
2256  }
2257 
2258  if (kmr_fields_pointer_p(kvi) || kvi->c.block_count > 1) {
2259  enum kmr_kv_field keyf = kvi->c.key_data;
2260  enum kmr_kv_field valf = kvi->c.value_data;
2261  kvs1 = kmr_create_kvs(mr, keyf, valf);
2262 
2263  int kcdc = kmr_ckpt_disable_ckpt(mr);
2264  cc = kmr_collapse_as_opaque(kvi, kvs1, 1);
2265  assert(cc == MPI_SUCCESS);
2266  kmr_ckpt_enable_ckpt(mr, kcdc);
2267  } else {
2268  kvs1 = kvi;
2269  }
2270  kmr_assert_on_tail_marker(kvs1);
2271  assert(kvs1->c.block_count <= 1);
2272 
2273  if (kmr_ckpt_enabled(mr)) {
2274  if (kmr_ckpt_progress_init(kvi, kvo, opt)) {
2275  kmr_add_kv_done(kvo);
2276  if (kvs1 != kvi) {
2277  cc = kmr_free_kvs(kvs1);
2278  assert(cc == MPI_SUCCESS);
2279  }
2280  if (!opt.inspect) {
2281  cc = kmr_free_kvs(kvi);
2282  assert(cc == MPI_SUCCESS);
2283  }
2284  return MPI_SUCCESS;
2285  }
2286  }
2287 
2288  long *rsz = kmr_malloc(sizeof(long) * (size_t)nprocs);
2289  long *rdp = kmr_malloc(sizeof(long) * (size_t)nprocs);
2290  /* Exchange send-receive counts. */
2291  long ssz = (long)kvs1->c.storage_netsize;
2292  cc = kmr_gather_sizes(mr, ssz, rsz);
2293  assert(cc == MPI_SUCCESS);
2294  long recvsz = 0;
2295  if (!opt.rank_zero || rank == 0) {
2296  for (int r = 0; r < nprocs; r++) {
2297  rdp[r] = recvsz;
2298  recvsz += rsz[r];
2299  }
2300  }
2301  if (!(kvo->c.key_data == kvs1->c.key_data
2302  && kvo->c.value_data == kvs1->c.value_data)) {
2303  kmr_error(mr, "key-data or value-data types mismatch");
2304  }
2305  cc = kmr_allocate_block(kvo, (size_t)recvsz);
2306  assert(cc == MPI_SUCCESS);
2307  struct kmr_kvs_block *sb = kvs1->c.first_block;
2308  struct kmr_kvs_entry *sbuf = kmr_kvs_first_entry(kvs1, sb);
2309  struct kmr_kvs_block *rb = kvo->c.first_block;
2310  struct kmr_kvs_entry *rbuf = kmr_kvs_first_entry(kvo, rb);
2311  cc = kmr_allgatherv(mr, opt.rank_zero, sbuf, ssz, rbuf, rsz, rdp);
2312  assert(cc == MPI_SUCCESS);
2313  assert(kvo->c.element_count == 0);
2314  long ocnt = kmr_count_entries(kvo, 1);
2315  kvo->c.element_count = ocnt;
2316  if (recvsz != 0) {
2317  rb->partial_element_count = ocnt;
2318  rb->fill_size = (size_t)recvsz;
2319  }
2320  kmr_kvs_adjust_adding_point(kvo);
2321  cc = kmr_add_kv_done(kvo);
2322  assert(cc == MPI_SUCCESS);
2323  kmr_assert_on_tail_marker(kvo);
2324  assert(kvo->c.element_count == 0 || kvo->c.storage_netsize != 0);
2325 
2326  if (kmr_ckpt_enabled(mr)) {
2327  kmr_ckpt_save_kvo_whole(mr, kvo);
2328  }
2329 
2330  if (kvs1 != kvi) {
2331  cc = kmr_free_kvs(kvs1);
2332  assert(cc == MPI_SUCCESS);
2333  }
2334  if (!opt.inspect) {
2335  cc = kmr_free_kvs(kvi);
2336  assert(cc == MPI_SUCCESS);
2337  kvi = 0;
2338  }
2339  kmr_free(rsz, (sizeof(long) * (size_t)nprocs));
2340  kmr_free(rdp, (sizeof(long) * (size_t)nprocs));
2341 
2342  if (kmr_ckpt_enabled(mr)) {
2344  }
2345  if (mr->kmrviz_trace) {
2346  kmr_trace_add_entry(mr, KMR_TRACE_EVENT_REPLICATE, kte_start,
2347  kvi, kvo);
2348  }
2349  return MPI_SUCCESS;
2350 }
2351 
2352 /* ================================================================ */
2353 
2354 /* Copies the entires as the keyed-records. The keyed-records hould
2355  hashed keys for sorting. */
2356 
2357 static int
2358 kmr_copy_record_fn(const struct kmr_kv_box kv,
2359  const KMR_KVS *kvi, KMR_KVS *kvo, void *p,
2360  const long i)
2361 {
2362  struct kmr_kv_box *ev = p;
2363  ev[i] = kv;
2364  return MPI_SUCCESS;
2365 }
2366 
2367 /* Reduces key-value pairs. The version "nothreading" uses less
2368  memory for sort-records than the version "threading". */
2369 
2370 static int
2371 kmr_reduce_nothreading(KMR_KVS *kvi, KMR_KVS *kvo,
2372  void *arg, struct kmr_option opt, kmr_redfn_t r)
2373 {
2374  xassert(kvi->c.current_block == 0);
2375  kmr_sorter_t cmp = kmr_choose_sorter(kvi);
2376  KMR *mr = kvi->c.mr;
2377  long cnt = kvi->c.element_count;
2378  int cc;
2379  struct kmr_kv_box *ev = 0;
2380  long evsz = 0;
2381  kvi->c.current_block = kvi->c.first_block;
2382  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvi, kvi->c.first_block);
2383  long index = 0;
2384  long redcount = 0;
2385 
2386  if (kmr_ckpt_enabled(mr)) {
2387  if (kmr_ckpt_progress_init(kvi, kvo, opt)) {
2388  if (kvo != 0) {
2389  kmr_add_kv_done(kvo);
2390  }
2391  return MPI_SUCCESS;
2392  }
2393  long start_from = kmr_ckpt_first_unprocessed_kv(mr);
2394  while (index < start_from) {
2395  e = kmr_kvs_next(kvi, e, 1);
2396  index++;
2397  }
2399  }
2400 
2401  for (;;) {
2402  struct kmr_kvs_entry *ej = e;
2403  long n = 0;
2404  while (e != 0) {
2405  assert(index < cnt);
2406  e = kmr_kvs_next(kvi, e, 1);
2407  n++;
2408  index++;
2409  if (e == 0) {
2410  break;
2411  }
2412  //struct kmr_keyed_record s0 = {.v = 0, .e = kmr_pick_kv(ej, kvi)};
2413  //struct kmr_keyed_record s1 = {.v = 0, .e = kmr_pick_kv(e, kvi)};
2414  struct kmr_kv_box kv0 = kmr_pick_kv(ej, kvi);
2415  struct kmr_kv_box kv1 = kmr_pick_kv(e, kvi);
2416  cc = (*cmp)(&kv0, &kv1);
2417  if (cc != 0) {
2418  break;
2419  }
2420  }
2421  if (n == 0) {
2422  assert(ej == 0 && e == 0);
2423  break;
2424  }
2425  assert(n > 0);
2426  if (n > evsz) {
2427  evsz = n;
2428  ev = kmr_realloc(ev, (sizeof(struct kmr_kv_box) * (size_t)evsz));
2429  assert(ev != 0);
2430  }
2431  e = ej;
2432  for (long i = 0; i < n; i++) {
2433  assert(e != 0);
2434  ev[i] = kmr_pick_kv(e, kvi);
2435  e = kmr_kvs_next(kvi, e, 1);
2436  }
2437 
2438  double t0 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
2439  cc = (*r)(ev, n, kvi, kvo, arg);
2440  double t1 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
2441  if (cc != MPI_SUCCESS) {
2442  char ee[80];
2443  snprintf(ee, sizeof(ee),
2444  "Reduce-fn returned with error cc=%d", cc);
2445  kmr_error(mr, ee);
2446  }
2447  if (mr->log_traces != 0) {
2448  kmr_log_reduce(mr, kvi, ev, n, r, (t1 - t0));
2449  }
2450 
2451  if (kmr_ckpt_enabled(mr)) {
2452  kmr_ckpt_save_kvo_block_add(mr, kvo, n);
2453  }
2454 
2455  redcount += n;
2456  }
2457  assert(index == cnt);
2458  if (kmr_ckpt_enabled(mr)) {
2459  kmr_ckpt_save_kvo_block_fin(mr, kvo);
2460  }
2461  if (kvo != 0) {
2462  kmr_add_kv_done(kvo);
2463  }
2464  if (ev != 0) {
2465  kmr_free(ev, (sizeof(struct kmr_kv_box) * (size_t)evsz));
2466  }
2467  return MPI_SUCCESS;
2468 }
2469 
2470 static int
2471 kmr_reduce_threading(_Bool stop_when_some_added,
2472  KMR_KVS *kvi, KMR_KVS *kvo,
2473  void *arg, struct kmr_option opt, kmr_redfn_t r)
2474 {
2475  int cc;
2476  if (kmr_ckpt_enabled(kvi->c.mr)) {
2477  if (kmr_ckpt_progress_init(kvi, kvo, opt)) {
2478  if (kvo != 0) {
2479  kmr_add_kv_done(kvo);
2480  }
2481  return MPI_SUCCESS;
2482  }
2483  }
2484  struct kmr_option inspect = {
2485  .inspect = 1,
2486  .nothreading = opt.nothreading
2487  };
2488  assert(kvi->c.current_block == 0);
2489  long cnt = kvi->c.element_count;
2490  struct kmr_kv_box *
2491  ev = kmr_malloc(sizeof(struct kmr_kv_box) * (size_t)cnt);
2492  int kcdc = kmr_ckpt_disable_ckpt(kvi->c.mr);
2493  cc = kmr_map(kvi, 0, ev, inspect, kmr_copy_record_fn);
2494  assert(cc == MPI_SUCCESS);
2495  kmr_ckpt_enable_ckpt(kvi->c.mr, kcdc);
2496 
2497  kmr_sorter_t cmp = kmr_choose_sorter(kvi);
2498  long *runs = kmr_malloc(sizeof(long) * (size_t)cnt);
2499  long nruns = 0;
2500  {
2501  long i = 0;
2502 #if 0
2503  if (kmr_ckpt_enabled(kvi->c.mr)) {
2504  i = kmr_ckpt_first_unprocessed_kv(kvi->c.mr);
2505  kmr_ckpt_save_kvo_block_init(kvi->c.mr, kvo);
2506  }
2507 #endif
2508  while (i < cnt) {
2509  do {
2510  i++;
2511  if (i == cnt) {
2512  break;
2513  }
2514  cc = (*cmp)(&ev[i - 1], &ev[i]);
2515  assert(cc <= 0);
2516  } while (cc == 0);
2517  assert(nruns < cnt);
2518  runs[nruns] = i;
2519  nruns++;
2520  }
2521  assert(i == cnt && (cnt == 0 || runs[nruns - 1] == cnt));
2522  }
2523  {
2524  if (kvo != 0) {
2525  kvo->c.under_threaded_operation = 1;
2526  }
2527  KMR *mr = kvi->c.mr;
2528  _Bool skip = 0;
2529  KMR_OMP_PARALLEL_FOR_
2530  for (long k = 0; k < nruns; k++) {
2531  /* (Access to stop is sloppy). */
2532  if (!skip) {
2533  long j = (k == 0 ? 0 : runs[k - 1]);
2534  long i = runs[k];
2535  assert(j < i);
2536  double t0 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
2537  int ccx = (*r)(&ev[j], (i - j), kvi, kvo, arg);
2538  double t1 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
2539  if (ccx != MPI_SUCCESS) {
2540  char ee[80];
2541  snprintf(ee, sizeof(ee),
2542  "Reduce-fn returned with error cc=%d", ccx);
2543  kmr_error(mr, ee);
2544  }
2545  if (mr->log_traces != 0) {
2546  kmr_log_reduce(mr, kvi, ev, (i - j), r, (t1 - t0));
2547  }
2548 #if 0
2549  if (kmr_ckpt_enabled(mr)) {
2550  KMR_OMP_CRITICAL_
2551  {
2552  kmr_ckpt_save_kvo_block_add(mr, kvo, (i - j));
2553  }
2554  }
2555 #endif
2556  if (stop_when_some_added) {
2557  _Bool done;
2558  if (mr->stop_at_some_check_globally) {
2559  done = 0;
2560  } else {
2561  done = (kvo->c.element_count != 0);
2562  }
2563  if (done) {
2564  KMR_OMP_CRITICAL_
2565  {
2566  skip = 1;
2567  }
2568  }
2569  }
2570  }
2571  }
2572  if (kvo != 0) {
2573  kvo->c.under_threaded_operation = 0;
2574  }
2575  }
2576  if (kmr_ckpt_enabled(kvi->c.mr)) {
2577 #if 0
2578  kmr_ckpt_save_kvo_block_fin(mr, kvo);
2579 #endif
2580  kmr_ckpt_save_kvo_whole(kvi->c.mr, kvo);
2581  }
2582  if (kvo != 0) {
2583  kmr_add_kv_done(kvo);
2584  }
2585  kmr_free(runs, (sizeof(long) * (size_t)cnt));
2586  kmr_free(ev, (sizeof(struct kmr_kv_box) * (size_t)cnt));
2587  return MPI_SUCCESS;
2588 }
2589 
2590 /** Reduces key-value pairs. It does not include shuffling, and thus,
2591  it requires being preceded by shuffling. Or, it works on local
2592  data (as a local combiner), if it is not preceded by shuffling.
2593  It always consumes the input key-value stream KVI. An output
2594  key-value stream KVO can be null. It passes an array of key-value
2595  pairs to a reduce-function whose keys are all equal (equality is
2596  by bits). The pointer ARG is just passed to a reduce-function as
2597  a general argument, where accesses to it should be race-free,
2598  since a reduce-function is called by threads by default. R is a
2599  reduce-function. See the description on the type ::kmr_redfn_t.
2600  A reduce-function may see a different input key-value stream
2601  (internally created one) instead of the one given. During
2602  reduction, it first scans adjacent equal keys, then calls a given
2603  reduce-function. Effective-options: NOTHREADING, INSPECT, TAKE_CKPT.
2604  See struct kmr_option. */
2605 
2606 int
2607 kmr_reduce9(_Bool stop_when_some_added,
2608  KMR_KVS *kvi, KMR_KVS *kvo,
2609  void *arg, struct kmr_option opt, kmr_redfn_t r,
2610  const char *file, const int line, const char *func)
2611 {
2612  kmr_assert_kvs_ok(kvi, kvo, 1, 0);
2613  KMR *mr = kvi->c.mr;
2614  struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1,
2615  .take_ckpt = 1};
2616  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
2617  struct kmr_option i_opt = kmr_copy_options_i_part(opt);
2618  struct kmr_option o_opt = kmr_copy_options_o_part(opt);
2619 
2620  kmr_trace_entry_t * kte_start = 0;
2621  if (mr->kmrviz_trace) {
2622  kte_start = kmr_trace_add_entry(mr, KMR_TRACE_EVENT_REDUCE, 0,
2623  kvi, kvo);
2624  }
2625  if (kmr_ckpt_enabled(mr)) {
2626  kmr_ckpt_lock_start(mr);
2627  }
2628 
2629  /* Sort for reduction. */
2630 
2631  enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvi->c.key_data);
2632  enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvi->c.value_data);
2633  KMR_KVS *kvs1 = kmr_create_kvs(mr, keyf, valf);
2634 
2635  /* Make checkpoint for kvi and kvs1. */
2636 
2637  kmr_sort_locally_lo(kvi, kvs1, 0, 0, i_opt);
2638  KMR_DEBUGX(kmr_assert_sorted(kvs1, 1, 0, 0));
2639 
2640  /* Make checkpoint for kvs1 and kvo. */
2641 
2642  struct kmr_code_line info;
2643  if (mr->atwork == 0) {
2644  info.file = file;
2645  info.func = func;
2646  info.line = line;
2647  mr->atwork = &info;
2648  }
2649  int cc;
2650  if (mr->single_thread || opt.nothreading) {
2651  cc = kmr_reduce_nothreading(kvs1, kvo, arg, o_opt, r);
2652  } else {
2653  cc = kmr_reduce_threading(stop_when_some_added,
2654  kvs1, kvo, arg, o_opt, r);
2655  }
2656  if (mr->atwork == &info) {
2657  mr->atwork = 0;
2658  }
2659 
2660  //kmr_assert_on_tail_marker(kvo);
2661  kmr_assert_on_tail_marker(kvs1);
2662  kmr_free_kvs(kvs1);
2663 
2664  if (kmr_ckpt_enabled(mr)) {
2667  }
2668  if (mr->kmrviz_trace) {
2669  if (!opt.inspect) {
2670  kvi = 0;
2671  }
2672  kmr_trace_add_entry(mr, KMR_TRACE_EVENT_REDUCE, kte_start, kvi, kvo);
2673  }
2674 
2675  return cc;
2676 }
2677 
2678 /** Calls a reduce-function once as if all key-value pairs had the
2679  same key. See kmr_reduce(). Effective-options: INSPECT, TAKE_CKPT.
2680  See struct kmr_option. */
2681 
2682 int
2684  void *arg, struct kmr_option opt, kmr_redfn_t r)
2685 {
2686  kmr_assert_kvs_ok(kvi, kvo, 1, 0);
2687  KMR *mr = kvi->c.mr;
2688  assert(kvi->c.current_block == 0);
2689  struct kmr_option kmr_supported = {.inspect = 1, .take_ckpt = 1};
2690  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
2691 
2692  if (kmr_ckpt_enabled(mr)) {
2693  if (kmr_ckpt_progress_init(kvi, kvo, opt)) {
2694  if (kvo != 0) {
2695  kmr_add_kv_done(kvo);
2696  }
2697  if (!opt.inspect) {
2698  kmr_free_kvs(kvi);
2699  }
2700  return MPI_SUCCESS;
2701  }
2702  }
2703  int kcdc = kmr_ckpt_disable_ckpt(mr);
2704 
2705  long cnt = kvi->c.element_count;
2706  struct kmr_kv_box *
2707  ev = kmr_malloc(sizeof(struct kmr_kv_box) * (size_t)cnt);
2708 
2709  if (cnt > 0) {
2710  int cc;
2711 
2712  struct kmr_option inspect = {.inspect = 1};
2713  cc = kmr_map(kvi, 0, ev, inspect, kmr_copy_record_fn);
2714  assert(cc == MPI_SUCCESS);
2715 
2716  cc = (*r)(&ev[0], cnt, kvi, kvo, arg);
2717  if (cc != MPI_SUCCESS) {
2718  char ee[80];
2719  snprintf(ee, sizeof(ee),
2720  "Reduce-fn returned with error cc=%d", cc);
2721  kmr_error(mr, ee);
2722  }
2723  }
2724 
2725  kmr_ckpt_enable_ckpt(mr, kcdc);
2726  if (kmr_ckpt_enabled(mr)) {
2727  kmr_ckpt_save_kvo_whole(mr, kvo);
2728  }
2729 
2730  if (kvo != 0) {
2731  kmr_add_kv_done(kvo);
2732  }
2733  kmr_free(ev, (sizeof(struct kmr_kv_box) * (size_t)cnt));
2734  if (!opt.inspect) {
2735  kmr_free_kvs(kvi);
2736  }
2737 
2738  if (kmr_ckpt_enabled(mr)) {
2740  }
2741  return MPI_SUCCESS;
2742 }
2743 
2744 /* ================================================================ */
2745 
2746 /** Concatenates a number of KVSes to one. Inputs are consumed. (It
2747  is fast because the key-value data is stored internally as a list
2748  of data blocks, and this routine just links them). Note that
2749  concatenating KVS can in effect be performed by consecutive calls
2750  to kmr_map() with the KEEP_OPEN option using the same output KVS.
2751  Effective-options: none. */
2752 
2753 int
2754 kmr_concatenate_kvs(KMR_KVS *kvs[], int nkvs, KMR_KVS *kvo,
2755  struct kmr_option opt)
2756 {
2757  for (int i = 0; i < nkvs; i++) {
2758  kmr_assert_i_kvs_ok(kvs[i], 1);
2759  }
2760  kmr_assert_o_kvs_ok(kvo, 1);
2761  if (kvo->c.element_count > 0) {
2762  KMR *mr = kvo->c.mr;
2763  kmr_error(mr, "kmr_concatenate_kvs: Output kvs has entries");
2764  }
2765  kmr_check_fn_options(kvo->c.mr, kmr_noopt, opt, __func__);
2766 
2767  struct kmr_kvs_block *storage = 0;
2768  long elements = 0;
2769  size_t netsize = 0;
2770  long blocks = 0;
2771 
2772  struct kmr_kvs_block *p = 0;
2773  for (int i = 0; i < nkvs; i++) {
2774  elements += kvs[i]->c.element_count;
2775  netsize += kvs[i]->c.storage_netsize;
2776  blocks += kvs[i]->c.block_count;
2777 
2778  struct kmr_kvs_block *bb = kvs[i]->c.first_block;
2779  if (bb != 0) {
2780  kvs[i]->c.first_block = 0;
2781  if (p == 0) {
2782  assert(storage == 0);
2783  p = bb;
2784  storage = bb;
2785  } else {
2786  assert(blocks != 0 && p->next == 0);
2787  p->next = bb;
2788  }
2789  }
2790  if (p != 0) {
2791  while (p->next != 0) {
2792  p = p->next;
2793  }
2794  }
2795  kmr_free_kvs(kvs[i]);
2796  }
2797 
2798  kvo->c.first_block = storage;
2799  kvo->c.element_count = elements;
2800  kvo->c.storage_netsize = netsize;
2801  kvo->c.block_count = blocks;
2802 
2803  /*kmr_add_kv_done(kvo);*/
2804  kvo->c.stowed = 1;
2805  kvo->c.current_block = 0;
2806  kvo->c.adding_point = 0;
2807  assert(kvo->c.block_count == 0 || kvo->c.first_block != 0);
2808 
2809  return MPI_SUCCESS;
2810 }
2811 
2812 /** Finds the last entry of a key-value stream. It returns null when
2813  a key-value stream is empty. It sequentially scans all the
2814  entries and slow. */
2815 
2816 struct kmr_kvs_entry *
2818 {
2819  kmr_assert_kvs_ok(kvs, 0, 1, 0);
2820  assert(kvs->c.magic == KMR_KVS_ONCORE);
2821 #if 0
2822  long cnt = kvs->c.element_count;
2823  kvs->c.current_block = kvs->c.first_block;
2824  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
2825  struct kmr_kvs_entry *o = 0;
2826  for (long i = 0; i < cnt && e != 0; i++) {
2827  o = e;
2828  e = kmr_kvs_next(kvs, e, 0);
2829  }
2830  kvs->c.current_block = 0;
2831  return o;
2832 #else
2833  if (kvs->c.element_count == 0) {
2834  return 0;
2835  } else {
2836  struct kmr_kvs_block *b;
2837  for (b = kvs->c.first_block; b->next != 0; b = b->next);
2838  kvs->c.current_block = b;
2839  struct kmr_kvs_entry *e;
2840  struct kmr_kvs_entry *o;
2841  e = kmr_kvs_first_entry(kvs, b);
2842  o = 0;
2843  long cnt = b->partial_element_count;
2844  for (long i = 0; i < cnt && e != 0; i++) {
2845  o = e;
2846  e = kmr_kvs_next(kvs, e, 1);
2847  }
2848  kvs->c.current_block = 0;
2849  return o;
2850  }
2851 #endif
2852 }
2853 
2854 /** Fills local key-value entries in an array for inspection. The
2855  returned pointers point to the inside of the KVS. It returns up
2856  to N entries, and the array EV should be as large as N. It
2857  implies inspect. */
2858 
2859 int
2861 {
2862  kmr_assert_kvs_ok(kvs, 0, 1, 0);
2863  assert(kvs->c.magic == KMR_KVS_ONCORE);
2864  long cnt = MIN(n, kvs->c.element_count);
2865  kvs->c.current_block = kvs->c.first_block;
2866  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
2867  for (long i = 0; i < cnt && e != 0; i++) {
2868  ev[i] = e;
2869  e = kmr_kvs_next(kvs, e, 0);
2870  }
2871  kvs->c.current_block = 0;
2872  return MPI_SUCCESS;
2873 }
2874 
2875 /** Fills local key-value entries in an array of kmr_kv_box for
2876  inspection. While kmr_retrieve_kvs_entries() returns raw entries
2877  but kmr_retrieve_kv_box_entries() returns entries of kmr_kv_box.
2878  The returned pointers point to the inside of the KVS. It returns
2879  up to N entries, and the array EV should be as large as N. It
2880  implies inspect. */
2881 
2882 int
2884 {
2885  kmr_assert_kvs_ok(kvs, 0, 1, 0);
2886  assert(kvs->c.magic == KMR_KVS_ONCORE);
2887  long cnt = MIN(n, kvs->c.element_count);
2888  kvs->c.current_block = kvs->c.first_block;
2889  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
2890  for (long i = 0; i < cnt && e != 0; i++) {
2891  ev[i] = kmr_pick_kv(e, kvs);
2892  e = kmr_kvs_next(kvs, e, 0);
2893  }
2894  kvs->c.current_block = 0;
2895  return MPI_SUCCESS;
2896 }
2897 
2898 /** Fills keyed records in an array for sorting. The array EV should
2899  be as large as N. It implies inspect. */
2900 
2901 int
2903  long n, _Bool shuffling, _Bool ranking)
2904 {
2905  kmr_assert_kvs_ok(kvs, 0, 1, 0);
2906  assert(kvs->c.magic == KMR_KVS_ONCORE);
2907  long cnt = MIN(n, kvs->c.element_count);
2908  kvs->c.current_block = kvs->c.first_block;
2909  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
2910  for (long i = 0; i < cnt && e != 0; i++) {
2911  struct kmr_kv_box kv = kmr_pick_kv(e, kvs);
2912  if (shuffling) {
2913  ev[i].v = (ranking ? kv.k.i : kmr_pitch_rank(kv, kvs));
2914  ev[i].e = e;
2915  } else {
2916  ev[i].v = kmr_stable_key(kv, kvs);
2917  ev[i].e = e;
2918  }
2919  e = kmr_kvs_next(kvs, e, 0);
2920  }
2921  kvs->c.current_block = 0;
2922  return MPI_SUCCESS;
2923 }
2924 
2925 /** Returns a minimum byte size of the field: 8 for INTEGER and
2926  FLOAT8, 0 for others. */
2927 
2928 int
2930 {
2931  switch (f) {
2932  case KMR_KV_BAD:
2933  kmr_error(mr, "kmr_legal_minimum_field_size: Bad field");
2934  return 0;
2935  case KMR_KV_INTEGER:
2936  return sizeof(long);
2937  case KMR_KV_FLOAT8:
2938  return sizeof(double);
2939  case KMR_KV_OPAQUE:
2940  case KMR_KV_CSTRING:
2941  case KMR_KV_POINTER_OWNED:
2942  case KMR_KV_POINTER_UNMANAGED:
2943  return 0;
2944  default:
2945  kmr_error(mr, "kmr_legal_minimum_field_size: Bad field");
2946  return 0;
2947  }
2948 }
2949 
2950 /** Scans every key-value with a reduce-function locally
2951  (independently on each rank). It works in the order in the KVS.
2952  It ignores differences of the keys. It gets the start value from
2953  CARRYIN and puts the final value to CARRYOUT. The output has the
2954  same number of entries as the input. The carry-in and carry-out
2955  have one entry. The carry-out can be null. The reduce-function
2956  is called on each key-value pair as the right operand with the
2957  previous value as the left operand, and it should output a single
2958  value. The key part of the output is ignored and a pair is stored
2959  under the original key. */
2960 
2961 int
2963  KMR_KVS *kvo, KMR_KVS *carryout, kmr_redfn_t r)
2964 {
2965  int cc;
2966  KMR *mr = kvo->c.mr;
2967  enum kmr_kv_field keyf = kvi->c.key_data;
2968  enum kmr_kv_field valf = kvi->c.value_data;
2969 
2970  long cnt = kvi->c.element_count;
2971  size_t evsz = (sizeof(struct kmr_keyed_record) * (size_t)cnt);
2972  struct kmr_keyed_record *ev = kmr_malloc(evsz);
2973  cc = kmr_retrieve_keyed_records(kvi, ev, cnt, 0, 0);
2974  assert(cc == MPI_SUCCESS);
2975 
2976  KMR_KVS *lastvalue = carryin;
2977  for (long i = 0; i < cnt; i++) {
2978  struct kmr_kv_box kv;
2979  cc = kmr_take_one(lastvalue, &kv);
2980  assert(cc == MPI_SUCCESS);
2981  struct kmr_kv_box bx[2];
2982  bx[0] = kv;
2983  bx[1] = kmr_pick_kv(ev[i].e, kvi);
2984  KMR_KVS *xs = kmr_create_kvs(mr, keyf, valf);
2985  cc = (*r)(bx, 2, kvi, xs, 0);
2986  if (cc != MPI_SUCCESS) {
2987  char ee[80];
2988  snprintf(ee, sizeof(ee),
2989  "Reduce-fn returned with error cc=%d", cc);
2990  kmr_error(mr, ee);
2991  }
2992  cc = kmr_add_kv_done(xs);
2993  assert(cc == MPI_SUCCESS);
2994  /* Put the last value as it is a non-inclusive scan. */
2995  bx[0].klen = bx[1].klen;
2996  bx[0].k = bx[1].k;
2997  cc = kmr_add_kv(kvo, bx[0]);
2998  assert(cc == MPI_SUCCESS);
2999  kmr_free_kvs(lastvalue);
3000  lastvalue = xs;
3001  }
3002  cc = kmr_add_kv_done(kvo);
3003  assert(cc == MPI_SUCCESS);
3004 
3005  if (carryout != 0) {
3006  struct kmr_kv_box kv;
3007  cc = kmr_take_one(lastvalue, &kv);
3008  assert(cc == MPI_SUCCESS);
3009  cc = kmr_add_kv(carryout, kv);
3010  assert(cc == MPI_SUCCESS);
3011  cc = kmr_add_kv_done(carryout);
3012  assert(cc == MPI_SUCCESS);
3013  }
3014  kmr_free_kvs(lastvalue);
3015  kmr_free_kvs(kvi);
3016 
3017  if (ev != 0) {
3018  kmr_free(ev, evsz);
3019  }
3020 
3021  return MPI_SUCCESS;
3022 }
3023 
3024 /*
3025 Copyright (C) 2012-2018 RIKEN R-CCS
3026 This library is distributed WITHOUT ANY WARRANTY. This library can be
3027 redistributed and/or modified under the terms of the BSD 2-Clause License.
3028 */
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:679
int kmr_ckpt_progress_init(KMR_KVS *, KMR_KVS *, struct kmr_option)
It initializes a progress of MapReduce checkpointing.
Definition: kmrckpt.c:2754
Key-Value Stream (abstract).
Definition: kmr.h:632
int kmr_add_kv1(KMR_KVS *kvs, void *k, int klen, void *v, int vlen)
Adds a key-value pair as given directly by a pointer.
Definition: kmrbase.c:837
int kmr_save_kvs(KMR_KVS *kvs, void **dataq, size_t *szq, struct kmr_option opt)
Packs locally the contents of a key-value stream to a byte array.
Definition: kmrbase.c:1026
kmr_trace_entry_t * kmr_trace_add_entry(KMR *mr, kmr_trace_event_t ev, kmr_trace_entry_t *pre, KMR_KVS *kvi, KMR_KVS *kvo)
Add an entry to the trace.
Definition: kmrtrace.c:174
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2094
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *identifying_name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:168
Utilities Private Part (do not include from applications).
int kmr_add_identity_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
Adds a given key-value pair unmodified.
Definition: kmrbase.c:995
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:658
int kmr_finish_swf(KMR *mr)
Clears the lanes of simple workflow.
Definition: kmrwfmap.c:750
void kmr_ckpt_progress_fin(KMR *)
It finalizes the progress of MapReduce checkpointing.
Definition: kmrckpt.c:2846
static KMR_KVS * kmr_create_raw_kvs(KMR *mr, const KMR_KVS *_similar)
Makes a new key-value stream (type KMR_KVS).
Definition: kmrbase.c:476
void kmr_ckpt_restore_ckpt(KMR_KVS *)
It restores checkpoint data to kvs.
Definition: kmrckpt.c:2558
int kmr_map_skipping(long from, long stride, long limit, _Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps by skipping the number of entries.
Definition: kmrbase.c:1192
int kmr_add_string(KMR_KVS *kvs, const char *k, const char *v)
Adds a key-value pair of strings.
Definition: kmrbase.c:971
static int kmr_icmp(const void *a0, const void *a1)
Compares the key field of keyed-records for qsort/bsearch.
Definition: kmrbase.c:1873
#define kmr_malloc(Z)
Allocates memory, or aborts when failed.
Definition: kmrimpl.h:177
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_sort_locally(KMR_KVS *kvi, KMR_KVS *kvo, _Bool shuffling, struct kmr_option opt)
Reorders key-value pairs in a single rank.
Definition: kmrbase.c:2051
Key-Value Stream.
Definition: kmr.h:503
int kmr_map9(_Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m, const char *file, const int line, const char *func)
Maps simply.
Definition: kmrbase.c:1347
int kmr_ckpt_enabled(KMR *)
Check if checkpoint/restart is enabled.
Definition: kmrckpt.c:2479
static const size_t kmr_kvs_entry_header
Size of an Entry Header.
Definition: kmr.h:425
Keyed-Record for Sorting.
Definition: kmr.h:415
int kmr_add_kv_space(KMR_KVS *kvs, const struct kmr_kv_box kv, void **keyp, void **valuep)
Adds a key-value pair, but only allocates a space and returns the pointers to the key and the value p...
Definition: kmrbase.c:901
long kmr_ckpt_first_unprocessed_kv(KMR *)
It returns the index of the first unprocessed key-value in the input KVS.
Definition: kmrckpt.c:2536
int kmr_move_kvs(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Moves the contents of the input KVI to the output KVO.
Definition: kmrbase.c:592
Definition: kmr.h:391
int kmr_map_rank_by_rank(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps sequentially with rank by rank for debugging.
Definition: kmrbase.c:1397
KMR Context.
Definition: kmr.h:247
struct kmr_kvs_entry * kmr_find_kvs_last_entry(KMR_KVS *kvs)
Finds the last entry of a key-value stream.
Definition: kmrbase.c:2817
void kmr_ckpt_save_kvo_whole(KMR *, KMR_KVS *)
It saves all key-value pairs in the output KVS to a checkpoint data file.
Definition: kmrckpt.c:2639
Definition: kmrtrace.h:27
int kmr_initialize_mpi(int *refargc, char ***refargv)
Checks the initialization state of MPI, and initializes MPI when not.
Definition: kmrbase.c:134
int kmr_allgatherv(KMR *mr, _Bool rankzeroonly, void *sbuf, long scnt, void *rbuf, long *rcnts, long *rdsps)
All-gathers data, or gathers data when RANKZEROONLY.
Definition: kmratoa.c:74
void kmr_ckpt_save_kvo_block_add(KMR *, KMR_KVS *, long)
It adds a new block of key-value pairs of the output KVS to the checkpoint data file.
Definition: kmrckpt.c:2671
int kmr_map_on_rank_zero(KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on rank0 only.
Definition: kmrbase.c:1514
kmr_kv_field
Datatypes of Keys or Values.
Definition: kmr.h:368
int kmr_retrieve_kv_box_entries(KMR_KVS *kvs, struct kmr_kv_box *ev, long n)
Fills local key-value entries in an array of kmr_kv_box for inspection.
Definition: kmrbase.c:2883
void kmr_isort(void *a, size_t n, size_t es, int depth)
Sorts by comparator on long integers.
Definition: kmrisort.c:292
int kmr_take_one(KMR_KVS *kvi, struct kmr_kv_box *kv)
Extracts a single key-value pair locally in the key-value stream KVI.
Definition: kmrbase.c:1427
int kmr_concatenate_kvs(KMR_KVS *kvs[], int nkvs, KMR_KVS *kvo, struct kmr_option opt)
Concatenates a number of KVSes to one.
Definition: kmrbase.c:2754
void kmr_ckpt_create_context(KMR *)
Initialize checkpoint context.
Definition: kmrckpt.c:119
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:401
int kmr_reduce9(_Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r, const char *file, const int line, const char *func)
Reduces key-value pairs.
Definition: kmrbase.c:2607
Options to Mapping by Spawns.
Definition: kmr.h:708
void kmr_ckpt_save_kvo_block_fin(KMR *, KMR_KVS *)
It finalizes saving blocks of key-value pairs of the output KVS to the checkpoint data file...
Definition: kmrckpt.c:2686
State during kmr_map_ms().
Definition: kmr.h:453
void kmr_ckpt_free_context(KMR *)
Free checkpoint context.
Definition: kmrckpt.c:162
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
Definition: kmrbase.c:939
int kmr_ckpt_disable_ckpt(KMR *)
It temporarily disables checkpoint/restart.
Definition: kmrckpt.c:2495
int kmr_add_kv_pushoff(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmraltkvs.c:460
void kmr_trace_initialize(KMR *mr)
Initialize a trace.
Definition: kmrtrace.c:153
int kmr_assert_sorted(KMR_KVS *kvi, _Bool locally, _Bool shuffling, _Bool ranking)
Checks a key-value stream is sorted.
Definition: kmrutil.c:717
void kmr_trace_finalize(KMR *mr)
Finalize a trace.
Definition: kmrtrace.c:162
void kmr_ckpt_save_kvo_block_init(KMR *, KMR_KVS *)
It initializes saving blocks of key-value pairs of the output KVS to a checkpoint data file...
Definition: kmrckpt.c:2655
int kmr_retrieve_keyed_records(KMR_KVS *kvs, struct kmr_keyed_record *ev, long n, _Bool shuffling, _Bool ranking)
Fills keyed records in an array for sorting.
Definition: kmrbase.c:2902
#define xassert(X)
Asserts and aborts, but it cannot be disabled.
Definition: kmrdp.cpp:51
#define kmr_realloc(P, Z)
Allocates memory, or aborts when failed.
Definition: kmrimpl.h:181
int kmr_scan_locally(KMR_KVS *kvi, KMR_KVS *carryin, KMR_KVS *kvo, KMR_KVS *carryout, kmr_redfn_t r)
Scans every key-value with a reduce-function locally (independently on each rank).
Definition: kmrbase.c:2962
int kmr_retrieve_kvs_entries(KMR_KVS *kvs, struct kmr_kvs_entry **ev, long n)
Fills local key-value entries in an array for inspection.
Definition: kmrbase.c:2860
KMR Interface.
int kmr_add_kv_done_pushoff(KMR_KVS *kvs)
Marks finished adding key-value pairs, called from kmr_add_kv_done().
Definition: kmraltkvs.c:555
Options to Mapping on Files.
Definition: kmr.h:683
Unit-Sized Storage.
Definition: kmr.h:383
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gather...
Definition: kmrbase.c:2240
static struct kmr_kv_box kmr_pick_kv(struct kmr_kvs_entry *e, KMR_KVS *kvs)
Returns a handle to a key-value entry – a reverse of kmr_poke_kv().
Definition: kmrimpl.h:551
int(* kmr_redfn_t)(const struct kmr_kv_box kv[], const long n, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg)
Reduce-function Type.
Definition: kmr.h:747
int kmr_gather_sizes(KMR *mr, long siz, long *rbuf)
Calls all-gather for collecting one long-integer.
Definition: kmratoa.c:62
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once.
Definition: kmrbase.c:1460
int kmr_ckpt_enable_ckpt(KMR *, int)
It temporarily enables checkpoint/restart which has been disabled by calling kmr_ckpt_disable_ckpt().
Definition: kmrckpt.c:2516
void kmr_free_string(char *s)
Frees a string strduped.
Definition: kmrutil.c:611
N-Tuple Argument.
Definition: kmr.h:787
static void kmr_poke_kv(struct kmr_kvs_entry *e, const struct kmr_kv_box kv, struct kmr_kv_box *xkv, const KMR_KVS *kvs, _Bool reserve_space_only)
Stores a key-value pair at the entry E in the store – a reverse of kmr_pick_kv().
Definition: kmrimpl.h:599
Information of Source Code Line.
Definition: kmr.h:107
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type.
Definition: kmr.h:736
void kmr_ckpt_lock_start(KMR *)
Defines the start position of the code region that is referred to when restarting.
Definition: kmrckpt.c:1934
int kmr_copy_mpi_info(MPI_Info src, MPI_Info dst)
Copies contents of MPI_Info.
Definition: kmrutil.c:2408
KMR_KVS * kmr_create_kvs7(KMR *mr, enum kmr_kv_field kf, enum kmr_kv_field vf, struct kmr_option opt, const char *file, const int line, const char *func)
Makes a new key-value stream with the specified field data-types.
Definition: kmrbase.c:568
int kmr_alltoallv(KMR *mr, void *sbuf, long *scounts, long *sdsps, void *rbuf, long *rcounts, long *rdsps)
Does all-to-all-v, but it takes the arguments by long-integers.
Definition: kmratoa.c:124
int kmr_restore_kvs(KMR_KVS *kvo, void *data, size_t sz_, struct kmr_option opt)
Unpacks locally the contents of a key-value stream from a byte array.
Definition: kmrbase.c:1092
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:809
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:367
int kmr_reduce_as_one(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r)
Calls a reduce-function once as if all key-value pairs had the same key.
Definition: kmrbase.c:2683
void kmr_ckpt_remove_ckpt(KMR_KVS *)
It removes checkpoint data file.
Definition: kmrckpt.c:2613
void kmr_ckpt_lock_finish(KMR *)
Defines the end position of the code region that is referred to when restarting.
Definition: kmrckpt.c:1945
KMRViz tracing Support.
int kmr_exchange_sizes(KMR *mr, long *sbuf, long *rbuf)
Calls all-to-all to exchange one long-integer.
Definition: kmratoa.c:50
int kmr_legal_minimum_field_size(KMR *mr, enum kmr_kv_field f)
Returns a minimum byte size of the field: 8 for INTEGER and FLOAT8, 0 for others. ...
Definition: kmrbase.c:2929