23 #include "../config.h"    29 const int kmr_version = KMR_H;
    31 #define MIN(a,b) (((a)<(b))?(a):(b))    32 #define MAX(a,b) (((a)>(b))?(a):(b))    38 #define BLOCK_SIZE (64 * 1024 * 1024)    43 #define MAP_PARK_SIZE (1024)    48 #define PUSHOFF_SIZE (64 * 1024)    55 kmr_assert_on_tail_marker(
KMR_KVS *kvs)
    57     if (kvs != 0 && kvs->c.block_count == 1) {
    59         size_t netsz = kvs->c.storage_netsize;
    61         assert((((intptr_t)e) & 3) == 0
    62                && e->klen == -1 && e->vlen == -1);
    72 kmr_init_2(
int ignore)
    75     assert(
sizeof(
long) == 
sizeof(
size_t)
    76            && 
sizeof(
long) == 
sizeof(ssize_t)
    77            && 
sizeof(
long) == 
sizeof(off_t)
    78            && 
sizeof(
long) == 
sizeof(uint64_t)
    79            && 
sizeof(
long) >= 
sizeof(intptr_t)
    80            && 
sizeof(
long) >= 
sizeof(
void *));
    81     assert(kmr_check_alignment(offsetof(
struct kmr_kvs_entry, c)));
    82     assert(kmr_check_alignment(offsetof(
struct kmr_kvs_block, data)));
    84     assert(
sizeof(
struct kmr_option) == 
sizeof(
long)
    89     cc = MPI_Type_get_extent(MPI_LONG, &lb, &extent);
    90     assert(cc == MPI_SUCCESS);
    91     assert(lb == 0 && extent == 8);
    96         int tid = omp_get_thread_num();
   110     union {
struct kmr_option o; 
unsigned long i;} opt0 = {.o = opt};
   111     union {
struct kmr_file_option o; 
unsigned long i;} fopt0 = {.o = fopt};
   112     opt0.o.rank_zero = 0;
   113     fopt0.o.shuffle_names = 0;
   114     assert(kf == KMR_KV_POINTER_UNMANAGED
   115            && opt.rank_zero && fopt.shuffle_names
   116            && opt0.i == 0 && fopt0.i == 0);
   138     cc = MPI_Initialized(&ok);
   139     if (cc == MPI_SUCCESS && ok != 0) {
   141     } 
else if (cc == MPI_SUCCESS && ok == 0) {
   143         cc = MPI_Init_thread(refargc, refargv, MPI_THREAD_SERIALIZED, &thlv);
   144         return (cc == MPI_SUCCESS);
   169                    const char *identifying_name)
   173     KMR_DEBUGX(memset(mr, 0, 
sizeof(
struct kmr_ctx)));
   175     cc = MPI_Comm_size(comm, &mr->nprocs);
   176     assert(cc == MPI_SUCCESS);
   177     cc = MPI_Comm_rank(comm, &mr->rank);
   178     assert(cc == MPI_SUCCESS);
   179     cc = MPI_Comm_dup(comm, &mr->comm);
   180     if (cc != MPI_SUCCESS) {
   181         kmr_error_mpi(mr, 
"MPI_Comm_dup", cc);
   182         MPI_Abort(MPI_COMM_WORLD, 1);
   186     int omp_thrd = omp_get_thread_limit();
   190     assert(omp_thrd >= 1);
   193     cc = MPI_Query_thread(&mpi_thrd);
   194     assert(cc == MPI_SUCCESS);
   195     assert(mpi_thrd == MPI_THREAD_SINGLE
   196            || mpi_thrd == MPI_THREAD_FUNNELED
   197            || mpi_thrd == MPI_THREAD_SERIALIZED
   198            || mpi_thrd == MPI_THREAD_MULTIPLE);
   199     if (mpi_thrd == MPI_THREAD_SINGLE
   200         || mpi_thrd == MPI_THREAD_FUNNELED) {
   203             char *s = ((mpi_thrd == MPI_THREAD_SINGLE)
   204                        ? 
"MPI_THREAD_SINGLE"   205                        : 
"MPI_THREAD_FUNNELED");
   206             snprintf(ee, 
sizeof(ee), 
"Thread support of MPI is low: %s", s);
   207             kmr_warning(mr, 1, ee);
   214     mr->ckpt_kvs_id_counter = 0;
   217     mr->ckpt_selective = 0;
   218     mr->ckpt_no_fsync = 0;
   228     mr->mapper_park_size = MAP_PARK_SIZE;
   229     mr->preset_block_size = BLOCK_SIZE;
   230     mr->malloc_overhead = (int)
sizeof(
void *);
   232     mr->atoa_threshold = 512;
   233     mr->atoa_size_limit = 0;
   234     mr->atoa_requests_limit = 0;
   236     mr->sort_trivial = 100000;
   237     mr->sort_threshold = 100L;
   238     mr->sort_sample_factor = 10000;
   239     mr->sort_threads_depth = 5;
   241     mr->file_io_block_size = (1024 * 1024);
   243     mr->rlimit_nofile = -1;
   245     mr->pushoff_block_size = PUSHOFF_SIZE;
   246     mr->pushoff_poll_rate = 0;
   248 #if defined(KMRLIBDIR)   249     mr->kmr_installation_path = KMRLIBDIR;
   251     mr->kmr_installation_path = 0;
   253     mr->spawn_watch_program = 0;
   254     mr->spawn_watch_prefix = 0;
   255     mr->spawn_watch_host_name = 0;
   256     mr->spawn_max_processes = 0;
   257     mr->spawn_watch_af = 4;
   258     mr->spawn_watch_port_range[0] = 0;
   259     mr->spawn_watch_port_range[1] = 0;
   260     mr->spawn_gap_msec[0] = 1000;
   261     mr->spawn_gap_msec[1] = 10000;
   262     mr->spawn_watch_accept_onhold_msec = (60 * 1000);
   264     mr->spawn_self = MPI_COMM_NULL;
   265     mr->spawn_retry_limit = 20;
   266     mr->spawn_retry_gap_msec = (15 * 1000);
   268     mr->simple_workflow = 0;
   269     mr->swf_spawner_so = 0;
   270     mr->swf_spawner_library = 0;
   271     mr->swf_args_size = 0;
   273     mr->swf_record_history = 1;
   274     mr->swf_debug_master = 0;
   279     mr->single_thread = 0;
   280     mr->one_step_sort = 0;
   282     mr->trace_sorting = 0;
   283     mr->trace_file_io = 0;
   284     mr->trace_map_ms = 0;
   285     mr->trace_map_spawn = 0;
   286     mr->trace_alltoall = 0;
   290     mr->file_io_dummy_striping = 1;
   291     mr->file_io_always_alltoallv = 0;
   292     mr->map_ms_use_exec = 0;
   293     mr->map_ms_abort_on_signal = 0;
   294     mr->spawn_sync_at_startup = 0;
   295     mr->spawn_watch_all = 0;
   296     mr->spawn_disconnect_early = 0;
   297     mr->spawn_disconnect_but_free = 0;
   298     mr->spawn_pass_intercomm_in_argument = 0;
   299     mr->keep_fds_at_fork = 0;
   301     mr->mpi_thread_support = (mpi_thrd == MPI_THREAD_SERIALIZED
   302                               || mpi_thrd == MPI_THREAD_MULTIPLE);
   304     mr->stop_at_some_check_globally = 0;
   305     mr->pushoff_hang_out = 0;
   306     mr->pushoff_fast_notice = 0;
   307     mr->pushoff_stat = 1;
   308     memset(&mr->pushoff_statistics, 0, 
sizeof(mr->pushoff_statistics));
   310     mr->kmrviz_trace = 0;
   312     if (identifying_name != 0) {
   313         size_t s = strlen(identifying_name);
   314         assert(s < KMR_JOB_NAME_LEN);
   315         strncpy(mr->identifying_name, identifying_name, KMR_JOB_NAME_LEN);
   316         mr->identifying_name[KMR_JOB_NAME_LEN - 1] = 0;
   318         mr->identifying_name[0] = 0;
   325     cc = MPI_Info_create(&mr->conf);
   326     assert(cc == MPI_SUCCESS);
   327     cc = kmr_load_preference(mr, mr->conf);
   328     assert(cc == MPI_SUCCESS);
   329     if (conf != MPI_INFO_NULL) {
   331         assert(cc == MPI_SUCCESS);
   334     kmr_check_options(mr, mr->conf);
   348 kmr_create_context_world()
   355 kmr_create_context_ff(
const int fcomm, 
const int finfo,
   356                       const char *identifying_name)
   358     MPI_Comm comm = MPI_Comm_f2c(fcomm);
   359     MPI_Info info = MPI_Info_f2c(finfo);
   370     if (mr->kvses.head != 0 || mr->kvses.tail != 0) {
   371         kmr_warning(mr, 1, 
"Some key-value streams remain unfreed");
   372         for (
KMR_KVS *p = mr->kvses.head; p != 0; p = p->c.link.next) {
   373             if (!KMR_KVS_MAGIC_OK(p->c.magic)) {
   374                 kmr_warning(mr, 1, 
"- unfreed kvs in bad state");
   375             } 
else if (p->c.magic == KMR_KVS_ONCORE) {
   376                 if (p->c.info_line0.file != 0) {
   378                     snprintf(ee, 80, 
"- kvs allocated at %s:%d: %s",
   379                              p->c.info_line0.file, p->c.info_line0.line,
   380                              p->c.info_line0.func);
   381                     kmr_warning(mr, 1, ee);
   384                 kmr_warning(mr, 1, 
"- unfreed kvs in bad state");
   389     if (mr->spawn_self != MPI_COMM_NULL && mr->spawn_self != MPI_COMM_SELF) {
   390         cc = MPI_Comm_free(&mr->spawn_self);
   391         assert(cc == MPI_SUCCESS);
   402     if (mr->log_traces != 0) {
   403         cc = fclose(mr->log_traces);
   406             char *m = strerror(errno);
   407             snprintf(ee, 
sizeof(ee), 
"Closing log file failed: %s", m);
   408             kmr_warning(mr, 1, ee);
   413     cc = MPI_Comm_free(&mr->comm);
   414     assert(cc == MPI_SUCCESS);
   415     if (mr->conf != MPI_INFO_NULL) {
   416         cc = MPI_Info_free(&mr->conf);
   417         assert(cc == MPI_SUCCESS);
   420     if (mr->spawn_watch_program != 0) {
   422         mr->spawn_watch_program = 0;
   424     assert(mr->spawn_comms == 0);
   429     if (mr->simple_workflow != 0) {
   431         mr->simple_workflow = 0;
   433     assert(mr->swf_spawner_so == 0);
   434     if (mr->swf_spawner_library != 0) {
   436         mr->swf_spawner_library = 0;
   439     kmr_free(mr, 
sizeof(
struct kmr_ctx));
   444 kmr_get_context_of_kvs(
KMR_KVS const *kvs)
   456     KMR_KVS *prev = kvs->c.link.prev;
   457     KMR_KVS *next = kvs->c.link.next;
   459         prev->c.link.next = next;
   461         assert(mr->kvses.head == kvs);
   462         mr->kvses.head = next;
   465         next->c.link.prev = prev;
   467         assert(mr->kvses.tail == kvs);
   468         mr->kvses.tail = prev;
   481     KMR_DEBUGX(memset(kvs, 0, 
sizeof(
KMR_KVS)));
   482     kvs->c.magic = KMR_KVS_ONCORE;
   487         mr->ckpt_kvs_id_counter++;
   488         kvs->c.ckpt_kvs_id = mr->ckpt_kvs_id_counter;
   489         kvs->c.ckpt_generated_op = 0;
   490         kvs->c.ckpt_consumed_op = 0;
   493     kvs->c.key_data = KMR_KV_BAD;
   494     kvs->c.value_data = KMR_KV_BAD;
   495     kvs->c.element_count = 0;
   501     kvs->c.shuffled_in_pushoff = 0;
   502     kvs->c._uniformly_sized_ = 0;
   504     kvs->c.block_size = (mr->preset_block_size - mr->malloc_overhead);
   505     kvs->c.element_size_limit = (kvs->c.block_size / 4);
   506     kvs->c.storage_netsize = 0;
   507     kvs->c.block_count = 0;
   508     kvs->c.first_block = 0;
   513     kvs->c.under_threaded_operation = 0;
   514     kvs->c.current_block = 0;
   515     kvs->c.adding_point = 0;
   516     kvs->c.temporary_data = 0;
   530     kvs->c.magic = KMR_KVS_ONCORE;
   538     kvs->c.key_data = KMR_KV_BAD;
   539     kvs->c.value_data = KMR_KV_BAD;
   540     kvs->c.element_count = 0;
   546     kvs->c.shuffled_in_pushoff = 0;
   547     kvs->c._uniformly_sized_ = 0;
   549     kvs->c.block_size = (mr->preset_block_size - mr->malloc_overhead);
   550     kvs->c.element_size_limit = (kvs->c.block_size / 4);
   551     kvs->c.storage_netsize = 0;
   552     kvs->c.block_count = 0;
   553     kvs->c.first_block = 0;
   558     kvs->c.under_threaded_operation = 0;
   559     kvs->c.current_block = 0;
   560     kvs->c.adding_point = 0;
   561     kvs->c.temporary_data = 0;
   570                 const char *file, 
const int line, 
const char *func)
   573     kvs->c.key_data = kf;
   574     kvs->c.value_data = vf;
   575     kvs->c.info_line0.file = file;
   576     kvs->c.info_line0.func = func;
   577     kvs->c.info_line0.line = line;
   594     assert(kvi != 0 && kvo != 0
   595            && kvi->c.magic == KMR_KVS_ONCORE
   596            && kvo->c.magic == KMR_KVS_ONCORE
   597            && kvi->c.oncore && kvo->c.oncore);
   598     assert(kvi->c.key_data == kvo->c.key_data
   599            && kvi->c.value_data == kvo->c.value_data);
   600     assert(kvi->c.stowed && !kvo->c.stowed);
   606             kvi->c.first_block = 0;
   615     kvo->c.stowed = kvi->c.stowed;
   616     kvo->c.nogrow = kvi->c.nogrow;
   617     kvo->c.sorted = kvi->c.sorted;
   618     kvo->c.element_count = kvi->c.element_count;
   619     kvo->c.storage_netsize = kvi->c.storage_netsize;
   620     kvo->c.block_count = kvi->c.block_count;
   621     kvo->c.first_block = kvi->c.first_block;
   622     kvo->c.ms = kvi->c.ms;
   630     kvi->c.first_block = 0;
   633     assert(cc == MPI_SUCCESS);
   /* Releases the storage attached to a key-value stream KVS.  The steps
      visible here free a block, free the c.ms array when it is set, free
      c.temporary_data when it is set, and finally overwrite the magic
      with KMR_KVS_BAD. */
   642 kmr_free_kvs_oncore(
KMR_KVS *kvs)
   /* A block B is released with the size recorded in the block itself. */
   647         kmr_free(b, b->size);
   650     if (kvs->c.ms != 0) {
   651         long cnt = kvs->c.element_count;
   /* SZ is declared on a line not shown here; its visible term grows
      with the element count CNT. */
   653                      + (sizeof(char) * (size_t)cnt));
   654         kmr_free(kvs->c.ms, sz);
   656     if (kvs->c.temporary_data != 0) {
   /* The size passed for the temporary data is zero. */
   657         kmr_free(kvs->c.temporary_data, 0);
   /* NOTE(review): presumably the bad magic lets a later free attempt be
      reported as "already freed or corrupted" -- confirm against the
      magic check in the public free routine. */
   659     kvs->c.magic = KMR_KVS_BAD;
   681     if (!KMR_KVS_MAGIC_OK(kvs->c.magic)) {
   682         kmr_error(0, 
"kmr_free_kvs: kvs already freed or corrupted");
   687     if (kvs->c.magic == KMR_KVS_ONCORE) {
   688         cc = kmr_free_kvs_oncore(kvs);
   690     } 
else if (kvs->c.magic == KMR_KVS_PUSHOFF) {
   691         cc = kmr_free_kvs_pushoff(kvs, 1);
   694         assert((kvs->c.magic == KMR_KVS_ONCORE)
   695                || (kvs->c.magic == KMR_KVS_PUSHOFF));
   712 kmr_allocate_block(
KMR_KVS *kvs, 
size_t size)
   715         assert(kvs->c.element_count == 0 && kvs->c.storage_netsize == 0
   716                && kvs->c.block_count == 0 && kvs->c.first_block == 0
   717                && kvs->c.current_block == 0 && kvs->c.adding_point == 0);
   722         kvs->c.block_size = 0;
   725     } 
else if (size == 1) {
   727         sz = kvs->c.block_size;
   728         assert(kvs->c.nogrow == 0);
   730         assert(kvs->c.first_block == 0 && kvs->c.current_block == 0
   731                && kvs->c.block_count == 0 && kvs->c.adding_point == 0);
   735         kvs->c.block_size = sz;
   736         kvs->c.storage_netsize = netsz;
   740     kmr_kvs_reset_block(kvs, b, sz, netsz);
   741     kmr_kvs_insert_block(kvs, b);
   /* Moves the adding point of KVS to the limit of the last entry of a
      block B.  B is assigned on a line not shown here.  The assertions
      state the expected state before and after the move. */
   750 kmr_kvs_adjust_adding_point(
KMR_KVS *kvs)
   752     if (kvs->c.block_count == 0) {
   /* With no blocks, there must be neither a current block nor an
      adding point. */
   753         assert(kvs->c.current_block == 0 && kvs->c.adding_point == 0);
   755         assert(kvs->c.current_block != 0 && kvs->c.adding_point != 0);
   /* Before the move, the adding point is expected to sit at the first
      entry of the block. */
   757         assert(kmr_kvs_first_entry(kvs, b) == kvs->c.adding_point);
   758         kvs->c.adding_point = kmr_kvs_last_entry_limit(kvs, b);
   /* The new position must agree with the adding point derived from the
      block alone. */
   759         assert(kvs->c.adding_point == kmr_kvs_adding_point(b));
   770                    struct kmr_kv_box *xkv, _Bool reserve_space_only)
   772     kmr_assert_kv_sizes(kvs, kv);
   773     assert(!kvs->c.nogrow || kvs->c.storage_netsize != 0);
   776     size_t sz = kmr_kvs_entry_size_of_box(kvs, kv);
   /* Rejects an entry whose size SZ exceeds the per-element limit of the
      KVS.  SZ is a size_t, so the message uses the %zu conversion. */
   777     if (sz > (kvs->c.element_size_limit)) {
   779         snprintf(ee, 80, 
"key-value too large (size=%zu)", sz);
   782     if (kvs->c.first_block == 0) {
   783         assert(kvs->c.element_count == 0);
   784         cc = kmr_allocate_block(kvs, 1);
   785         assert(cc == MPI_SUCCESS);
   787     if (!kmr_kvs_entry_fits_in_block(kvs, kvs->c.current_block, sz)) {
   788         assert(!kvs->c.nogrow);
   789         kmr_kvs_mark_entry_tail(kvs->c.adding_point);
   790         cc = kmr_allocate_block(kvs, 1);
   791         assert(cc == MPI_SUCCESS);
   795     if (!kvs->c.nogrow) {
   796         kvs->c.storage_netsize += kmr_kvs_entry_netsize(e);
   798     kvs->c.current_block->partial_element_count++;
   799     kvs->c.current_block->fill_size += kmr_kvs_entry_size(kvs, e);
   800     kvs->c.adding_point = kmr_kvs_next_entry(kvs, e);
   801     kvs->c.element_count++;
   811     kmr_assert_kvs_ok(0, kvs, 0, 1);
   813     if (kvs->c.magic == KMR_KVS_ONCORE) {
   816             cc = kmr_add_kv_nomutex(kvs, kv, 0, 0);
   819     } 
else if (kvs->c.magic == KMR_KVS_PUSHOFF) {
   826         assert((kvs->c.magic == KMR_KVS_ONCORE)
   827                || (kvs->c.magic == KMR_KVS_PUSHOFF));
   840     switch (kvs->c.key_data) {
   842         xassert(kvs->c.key_data != KMR_KV_BAD);
   853     case KMR_KV_POINTER_OWNED:
   854     case KMR_KV_POINTER_UNMANAGED:
   864     switch (kvs->c.value_data) {
   866         xassert(kvs->c.value_data != KMR_KV_BAD);
   877     case KMR_KV_POINTER_OWNED:
   878     case KMR_KV_POINTER_UNMANAGED:
   887     struct kmr_kv_box kv = {.klen = klen, .vlen = vlen, .k = xk, .v = xv};
   902                  void **keyp, 
void **valuep)
   904     kmr_assert_kvs_ok(0, kvs, 0, 1);
   905     assert(kvs->c.magic == KMR_KVS_ONCORE);
   913         cc = kmr_add_kv_nomutex(kvs, kv, &xkv, 1);
   916         *keyp = (
void *)xkv.k.p;
   919         *valuep = (
void *)xkv.v.p;
   927     int cc = kmr_add_kv_nomutex(kvs, kv, 0, 0);
   941     kmr_assert_kvs_ok(0, kvs, 0, 1);
   942     if (kvs->c.magic == KMR_KVS_ONCORE) {
   944             kmr_error(kvs->c.mr, 
"kmr_add_kv_done: may be called already");
   946         if (kvs->c.element_count == 0) {
   947             assert(kvs->c.current_block == 0 && kvs->c.adding_point == 0);
   949             assert(kvs->c.current_block != 0 && kvs->c.adding_point != 0);
   950             kmr_kvs_mark_entry_tail(kvs->c.adding_point);
   953         kvs->c.current_block = 0;
   954         kvs->c.adding_point = 0;
   955         assert(kvs->c.block_count == 0 || kvs->c.first_block != 0);
   956     } 
else if (kvs->c.magic == KMR_KVS_PUSHOFF) {
   959         assert((kvs->c.magic == KMR_KVS_ONCORE)
   960                || (kvs->c.magic == KMR_KVS_PUSHOFF));
   973     if (!((kvs->c.key_data == KMR_KV_OPAQUE
   974            || kvs->c.key_data == KMR_KV_CSTRING)
   975           && (kvs->c.value_data == KMR_KV_OPAQUE
   976               || kvs->c.value_data == KMR_KV_CSTRING))) {
   978                   "key-value data-types need be opaque for strings");
   980     size_t klen = (strlen(k) + 1);
   981     size_t vlen = (strlen(v) + 1);
   982     assert(klen <= INT_MAX && vlen <= INT_MAX);
  1006 kmr_collapse_as_opaque(
KMR_KVS *kvi, 
KMR_KVS *kvo, _Bool inspectp)
  1008     assert(kvi != 0 && kvo != 0);
  1009     assert(kmr_fields_pointer_p(kvi) || kvi->c.block_count > 1);
  1011     cc = kmr_allocate_block(kvo, kvi->c.storage_netsize);
  1012     assert(cc == MPI_SUCCESS);
  1013     struct kmr_option collapse = {.collapse = 1, .inspect = inspectp};
  1015     assert(cc == MPI_SUCCESS);
  1030         kmr_error_at_site(0, 
"Null input kvs", 0);
  1031     } 
else if (!KMR_KVS_MAGIC_OK(kvs->c.magic)) {
  1032         kmr_error_at_site(0, 
"Bad input kvs (freed or corrupted)", 0);
  1034     assert(kvs->c.magic == KMR_KVS_ONCORE);
  1035     kmr_check_fn_options(kvs->c.mr, kmr_noopt, opt, __func__);
  1037     if (kvs->c.ms != 0 || kvs->c.temporary_data != 0) {
  1038         kmr_warning(kvs->c.mr, 5,
  1039                     "Some fields in KVS may be lost in saved image");
  1042     if (kmr_fields_pointer_p(kvs) || (kvs->c.block_count > 1)) {
  1043         enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvs->c.key_data);
  1044         enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvs->c.value_data);
  1046         cc = kmr_collapse_as_opaque(kvs, kvs1, 0);
  1047         assert(cc == MPI_SUCCESS);
  1048         assert(!kmr_fields_pointer_p(kvs1) && kvs->c.block_count <= 1);
  1050         assert(cc == MPI_SUCCESS);
  1054     assert(!kmr_fields_pointer_p(kvs));
  1055     size_t netsz = kvs->c.storage_netsize;
  1057     size_t sz = (
sizeof(
KMR_KVS) + blocksz);
  1058     unsigned char *b = malloc(sz);
  1060         return MPI_ERR_BUFFER;
  1064     memcpy(h, kvs, 
sizeof(
KMR_KVS));
  1065     h->c.magic = KMR_KVS_ONCORE_PACKED;
  1069     h->c.block_count = 1;
  1070     h->c.first_block = 0;
  1071     h->c.current_block = 0;
  1072     h->c.adding_point = 0;
  1074     h->c.temporary_data = 0;
  1075     if (kvs->c.block_count == 0) {
  1077     } 
else if (kvs->c.block_count == 1) {
  1078         memcpy(s, kvs->c.first_block, blocksz);
  1095     assert(kvo != 0 && kvo->c.magic == KMR_KVS_ONCORE);
  1096     kmr_check_fn_options(kvo->c.mr, kmr_noopt, opt, __func__);
  1098     unsigned char *b = data;
  1100     unsigned char *s = (b + 
sizeof(
KMR_KVS));
  1101     if (h->c.magic != KMR_KVS_ONCORE_PACKED) {
  1102         kmr_warning(kvo->c.mr, 1, 
"Bad packed data, magic mismatch");
  1103         return MPI_ERR_TYPE;
  1105     size_t netsz = h->c.storage_netsize;
  1107     cc = kmr_allocate_block(kvo, netsz);
  1108     assert(cc == MPI_SUCCESS);
  1110         memcpy(kvo->c.first_block, s, blocksz);
  1112     kvo->c.key_data = h->c.key_data;
  1113     kvo->c.value_data = h->c.value_data;
  1114     assert(kvo->c.sorted == 0);
  1115     kvo->c.element_count = h->c.element_count;
  1116     kmr_kvs_adjust_adding_point(kvo);
  /* Calls the map-function M on each of the EVCNT key-value boxes in EV.
     The I-th call receives (MAPCOUNT + I) as its last argument.  When
     mr->single_thread or opt.nothreading is set, the calls run in a
     plain loop; otherwise the loop is placed under
     KMR_OMP_PARALLEL_FOR_.  The final loop releases key and value
     pointers of the boxes with kmr_free. */
  1128 kmr_map_parked(
struct kmr_kv_box *ev, 
long evcnt, 
long mapcount,
  1129                _Bool k_reclaim, _Bool v_reclaim,
  1134     KMR *mr = kvi->c.mr;
  1135     long cnt = kvi->c.element_count;
  1136     if (mr->single_thread || opt.nothreading) {
  1137         for (
long i = 0; i < evcnt; i++) {
  /* Timestamps are taken only when a trace log is open; otherwise both
     are 0.0. */
  1138             double t0 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
  1139             cc = (*m)(ev[i], kvi, kvo, arg, (mapcount + i));
  1140             double t1 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
  1141             if (cc != MPI_SUCCESS) {
  1143                 snprintf(ee, 
sizeof(ee),
  1144                          "Map-fn returned with error cc=%d", cc);
  /* NOTE(review): the map-function is called with (mapcount + i) but the
     log call passes (mapcount + 1) -- confirm which index the log
     routine expects. */
  1147             if (mr->log_traces != 0) {
  1148                 kmr_log_map(mr, kvi, &ev[i], (mapcount + 1), cnt,
  /* The output KVS is flagged as being under a threaded operation
     before the parallel loop and the flag is cleared after it. */
  1154             kvo->c.under_threaded_operation = 1;
  1156         KMR_OMP_PARALLEL_FOR_
  1157             for (
long i = 0; i < evcnt; i++) {
  1158                 double t0 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
  /* The status is kept in a loop-local variable CCX in this loop. */
  1159                 int ccx = (*m)(ev[i], kvi, kvo, arg, (mapcount + i));
  1160                 double t1 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
  1161                 if (ccx != MPI_SUCCESS) {
  1163                     snprintf(ee, 
sizeof(ee),
  1164                              "Map-fn returned with error cc=%d", ccx);
  /* NOTE(review): same (mapcount + 1) argument as in the serial loop
     above -- confirm. */
  1167                 if (mr->log_traces != 0) {
  1168                     kmr_log_map(mr, kvi, &ev[i], (mapcount + 1), cnt,
  1173             kvo->c.under_threaded_operation = 0;
  /* Frees the key and value pointers of each box.  The conditions
     guarding the two calls are not shown here; presumably they are
     K_RECLAIM and V_RECLAIM -- confirm. */
  1176     for (
long i = 0; i < evcnt; i++) {
  1178             kmr_free((
void *)ev[i].k.p, (
size_t)ev[i].klen);
  1181             kmr_free((
void *)ev[i].v.p, (
size_t)ev[i].vlen);
  1193                  _Bool stop_when_some_added,
  1197     kmr_assert_kvs_ok(kvi, kvo, 1, 0);
  1198     assert(from >= 0 && stride > 0 && limit >= 0);
  1199     assert(kvi->c.current_block == 0);
  1200     limit = ((limit != 0) ? limit : LONG_MAX);
  1201     KMR *mr = kvi->c.mr;
  1205             if (kvo != 0 && !opt.keep_open) {
  /* Optional step synchronization: waits on a barrier over the
     communicator of the context.  The returned status CC is compared
     with MPI_SUCCESS; asserting the bare constant MPI_SUCCESS (which is
     zero) would fail on every call and would ignore the status. */
  1219     if (mr->step_sync) {
  1220         cc = MPI_Barrier(mr->comm);
  1221         assert(cc == MPI_SUCCESS);
  1223     if (kvo != 0 && opt.collapse) {
  1224         assert(!kmr_fields_pointer_p(kvo));
  1226     _Bool k_reclaim = (!opt.inspect && (kmr_key_pointer_p(kvi)));
  1227     _Bool v_reclaim = (!opt.inspect && (kmr_value_pointer_p(kvi)));
  1228     long evsz = mr->mapper_park_size;
  1233     long nextindex = from;
  1235     kvi->c.current_block = kvi->c.first_block;
  1236     while (index < kvi->c.element_count) {
  1237         assert(kvi->c.current_block != 0);
  1240         for (
int i = 0; i < b->partial_element_count; i++) {
  1242             if (index == nextindex && index < limit) {
  1244                 nextindex = (index + stride);
  1256                     kmr_free((
void *)w->p, (
size_t)e->klen);
  1261                     kmr_free((
void *)w->p, (
size_t)e->vlen);
  1265             if (evcnt >= evsz) {
  1266                 cc = kmr_map_parked(ev, evcnt, mapcount, k_reclaim, v_reclaim,
  1267                                     kvi, kvo, m, arg, opt);
  1268                 assert(cc == MPI_SUCCESS);
  1277                 if (stop_when_some_added) {
  1279                     if (mr->stop_at_some_check_globally) {
  1282                         done = (kvo->c.element_count != 0);
  1286                         index = (kvi->c.element_count - 1);
  1287                         while (b->next != 0) {
  1293             e = kmr_kvs_next(kvi, e, 1);
  1296         kvi->c.current_block = b->next;
  1298     assert(kvi->c.current_block == 0);
  1300         cc = kmr_map_parked(ev, evcnt, mapcount, k_reclaim, v_reclaim,
  1301                             kvi, kvo, m, arg, opt);
  1302         assert(cc == MPI_SUCCESS);
  1311     if (kvo != 0 && !opt.keep_open) {
  1323         kmr_free(ev, (
sizeof(
struct kmr_kv_box) * (
size_t)evsz));
  1350          const char *file, 
const int line, 
const char *func)
  1353     kmr_assert_kvs_ok(kvi, kvo, 1, 0);
  1354     KMR *mr = kvi->c.mr;
  1355     struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1,
  1356                                        .keep_open = 1, .collapse = 1,
  1358     kmr_check_fn_options(mr, kmr_supported, opt, __func__);
  1361     if (mr->kmrviz_trace) {
  1365     if (mr->atwork == 0) {
  1374                               kvi, kvo, arg, opt, m);
  1376         assert(!opt.inspect && !opt.keep_open);
  1380     if (mr->atwork == &info) {
  1383     if (mr->kmrviz_trace) {
  1400     KMR *mr = kvi->c.mr;
  1401     int nprocs = mr->nprocs;
  1403     if (mr->rank != 0) {
  1404         cc = MPI_Recv(0, 0, MPI_INT, (mr->rank - 1),
  1405                       KMR_TAG_MAP_BY_RANK, mr->comm, MPI_STATUS_IGNORE);
  1406         assert(cc == MPI_SUCCESS);
  1408     cc = 
kmr_map(kvi, kvo, arg, opt, m);
  1409     assert(cc == MPI_SUCCESS);
  1411     if (mr->rank != (nprocs - 1)) {
  1413         cc = MPI_Send(0, 0, MPI_INT, (mr->rank + 1),
  1414                       KMR_TAG_MAP_BY_RANK, mr->comm);
  1415         assert(cc == MPI_SUCCESS);
  1429     kmr_assert_kvs_ok(kvi, 0, 1, 0);
  1430     assert(kvi->c.current_block == 0);
  1431     KMR *mr = kvi->c.mr;
  1432     kvi->c.current_block = kvi->c.first_block;
  1433     if (kvi->c.element_count == 1) {
  1434         assert(kvi->c.current_block != 0);
  1437         assert(b->partial_element_count == 1);
  1440         kvi->c.current_block = 0;
  1443         if (kvi->c.element_count == 0) {
  1444             kmr_warning(mr, 1, 
"kmr_take_one for no entries");
  1447             kmr_warning(mr, 1, 
"kmr_take_one for multiple entries");
  1450         MPI_Abort(MPI_COMM_WORLD, 1);
  1463     kmr_assert_kvs_ok(0, kvo, 0, 1);
  1464     KMR *mr = kvo->c.mr;
  1465     struct kmr_option kmr_supported = {.keep_open = 1, .take_ckpt = 1};
  1466     kmr_check_fn_options(mr, kmr_supported, opt, __func__);
  1467     int rank = mr->rank;
  1471     if (mr->kmrviz_trace) {
  1482     if (!rank_zero_only || rank == 0) {
  1483         struct kmr_kv_box kv = {.klen = 0, .vlen = 0, .k.i = 0, .v.i = 0};
  1484         cc = (*m)(kv, 0, kvo, arg, 0);
  1485         if (cc != MPI_SUCCESS) {
  1487             snprintf(ee, 
sizeof(ee),
  1488                      "Map-fn returned with error cc=%d", cc);
  1492     if (!opt.keep_open) {
  1494         assert(cc == MPI_SUCCESS);
  1501     if (mr->kmrviz_trace) {
  1519     assert(cc == MPI_SUCCESS);
  1529 static inline unsigned long  1530 kmr_hash_key_opaque(
const unsigned char *p, 
int n)
  1532 #define ROT(X,N) ((X) << (N) | (X) >> (64-(N)))  1533 #define KEY(V) (k = (V), k *= 0x87c37b91114253d5UL, \  1534                 k = ROT(k, 31), k *= 0x4cf5ad432745937fUL)  1535 #define MIX() (h ^= k, h = ROT(h, 31), h = h * 5 + 0xe6546b64)  1536 #define FIN() (h ^= (h >> 33), h *= 0xff51afd7ed558ccdUL, h ^= (h >> 33), \  1537                h *= 0xc4ceb9fe1a85ec53UL, h ^= (h >> 33))  1538     unsigned long h = 0x85ebca6bUL; 
  1540     const unsigned long *v = (
void *)p;
  1542     int rn = (n - (8 * n8));
  1543     const unsigned char *r = &p[(8 * n8)];
  1544     for (
int i = 0; i < n8; i++) {
  1548     union {
unsigned long i; 
unsigned char c[8];} u = {.i = 0UL};
  1549     for (
int i = 0; i < rn; i++) {
  1566 static inline unsigned long  1567 kmr_hash_key_opaque(
const unsigned char *p, 
int n)
  1569     unsigned long hash = 0;
  1570     for (i = 0 ; i < n ; i++) {
  1581 static inline unsigned long  1582 kmr_hash_key_opaque(
const unsigned char *p, 
int n)
  1584     unsigned long hash = 0;
  1588         for (i = 0 ; i < n ; i++) {
  1595         for (i = 0 ; i < n ; i += k) {
  1610 static inline signed long  1613     switch (kvs->c.key_data) {
  1615         xassert(kvs->c.key_data != KMR_KV_BAD);
  1618     case KMR_KV_CSTRING:
  1619         return (
signed long)kmr_hash_key_opaque((
void *)kv.k.p, kv.klen);
  1620     case KMR_KV_INTEGER:
  1624     case KMR_KV_POINTER_OWNED:
  1625     case KMR_KV_POINTER_UNMANAGED:
  1626         xassert(kvs->c.key_data != KMR_KV_POINTER_OWNED
  1627                 && kvs->c.key_data != KMR_KV_POINTER_UNMANAGED);
  /* Derives a long value from the leading bytes of the key of KV.  The
     loop visits sizeof(long) byte positions, shifting the accumulated
     value left by eight bits and adding the byte at each position, so
     earlier bytes land in higher-order positions.  Positions at or
     beyond N contribute zero.  N is defined on a line not shown here;
     presumably it is the key length -- confirm. */
  1640 kmr_stable_key_opaque(
const struct kmr_kv_box kv)
  1642     unsigned char *p = (
unsigned char *)kv.k.p;
  1644     unsigned long hash = 0;
  1645     for (
int i = 0; i < (int)
sizeof(
long); i++) {
  1646         unsigned char v = ((i < n) ? p[i] : 0);
  1647         hash = ((hash << 8) + v);
  /* The accumulated value is shifted right by one bit before it is
     converted to long; presumably this keeps the result non-negative
     -- confirm. */
  1649     return (
long)(hash >> 1);
  1660     switch (kvs->c.key_data) {
  1662         xassert(kvs->c.key_data != KMR_KV_BAD);
  1665     case KMR_KV_CSTRING:
  1666         return kmr_stable_key_opaque(kv);
  1667     case KMR_KV_INTEGER:
  /* A non-negative key V0 is used as is; a negative one is negated and
     gets bit 63 set.  The bit is built with unsigned arithmetic because
     shifting a signed 1L into bit 63 is undefined behavior in C. */
  1672         long v1 = ((v0 >= 0L) ? v0 : (long)((unsigned long)(-v0) | (1UL << 63)));
  1676     case KMR_KV_POINTER_OWNED:
  1677     case KMR_KV_POINTER_UNMANAGED:
  1678         xassert(kvs->c.key_data != KMR_KV_POINTER_OWNED
  1679                 && kvs->c.key_data != KMR_KV_POINTER_UNMANAGED);
  1693     unsigned int nprocs = (
unsigned int)kvs->c.mr->nprocs;
  1694     unsigned long v = (
unsigned long)kmr_hash_key(kvs, kv);
  1695     unsigned int h = (((v >> 32) ^ v) & ((1L << 32) - 1));
  1696     return (
int)(h % nprocs);
  /* KMR_CMP3 yields 0 when X equals Y, -1 when X is less than Y, and 1
     otherwise.  kmr_compare_lexicographically compares the byte strings
     P (length PLEN) and Q (length QLEN) over their common prefix of
     MIN(PLEN, QLEN) bytes; a return of (PLEN - QLEN) is also visible.
     NOTE(review): both a byte-by-byte loop and a memcmp call appear
     here, and the lines that select between them are not shown --
     presumably they are alternative implementations; confirm. */
  1701 #define KMR_CMP3(X, Y) (((X) == (Y)) ? 0 : ((X) < (Y)) ? -1 : 1)  1708 kmr_compare_lexicographically(
const unsigned char *p, 
const int plen,
  1709                               const unsigned char *q, 
const int qlen)
  1711     int s = MIN(plen, qlen);
  1713     for (
int i = 0; i < s; i++) {
  /* Returns the difference of the bytes at position I; the condition
     guarding this return is not shown here. */
  1715             return (p[i] - q[i]);
  1719     int cc = memcmp(p, q, (
size_t)s);
  /* Returns the difference of the lengths; the condition guarding this
     return is not shown here. */
  1723         return (plen - qlen);
  1730 kmr_compare_opaque(
const struct kmr_kv_box *p,
  1733     return kmr_compare_lexicographically((
unsigned char *)p->k.p, p->klen,
  1734                                          (
unsigned char *)q->k.p, q->klen);
  1738 kmr_compare_integer(
const struct kmr_kv_box *p0,
  1741     return KMR_CMP3(p0->k.i, p1->k.i);
  1745 kmr_compare_float8(
const struct kmr_kv_box *p0,
  1748     return KMR_CMP3(p0->k.d, p1->k.d);
  1757     struct kmr_kv_box b0 = kmr_pick_kv2(p0->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
  1758     struct kmr_kv_box b1 = kmr_pick_kv2(p1->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
  1759     return kmr_compare_lexicographically((
unsigned char *)b0.k.p, b0.klen,
  1760                                          (
unsigned char *)b1.k.p, b1.klen);
  1771     struct kmr_kv_box b0 = kmr_pick_kv2(p0->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
  1772     struct kmr_kv_box b1 = kmr_pick_kv2(p1->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
  1775     return KMR_CMP3(v0, v1);
  1786     struct kmr_kv_box b0 = kmr_pick_kv2(p0->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
  1787     struct kmr_kv_box b1 = kmr_pick_kv2(p1->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
  1790     return KMR_CMP3(v0, v1);
  /* Returns a comparison function chosen by the key field type of KVS.
     Integer keys get kmr_compare_integer.  C-string and both pointer
     kinds get kmr_compare_opaque.  kmr_compare_float8 is returned under
     a case label not shown here (presumably the float8 key type --
     confirm). */
  1796 kmr_choose_sorter(
const KMR_KVS *kvs)
  1798     switch (kvs->c.key_data) {
  /* The case label for this assertion is not shown; the assertion
     rejects a key type of KMR_KV_BAD. */
  1800         xassert(kvs->c.key_data != KMR_KV_BAD);
  1802     case KMR_KV_INTEGER:
  1803         return kmr_compare_integer;
  1805         return kmr_compare_float8;
  1807     case KMR_KV_CSTRING:
  1808     case KMR_KV_POINTER_OWNED:
  1809     case KMR_KV_POINTER_UNMANAGED:
  1810         return kmr_compare_opaque;
  /* Returns a record comparison function chosen by the key field type
     of KVS.  Integer keys get kmr_compare_record_integer_.  C-string
     and both pointer kinds get kmr_compare_record_opaque.
     kmr_compare_record_float8_ is returned under a case label not shown
     here (presumably the float8 key type -- confirm). */
  1820 static kmr_record_sorter_t
  1821 kmr_choose_record_sorter(
const KMR_KVS *kvs)
  1823     switch (kvs->c.key_data) {
  /* The case label for this assertion is not shown; the assertion
     rejects a key type of KMR_KV_BAD. */
  1825         xassert(kvs->c.key_data != KMR_KV_BAD);
  1827     case KMR_KV_INTEGER:
  1829         return kmr_compare_record_integer_;
  1832         return kmr_compare_record_float8_;
  1834     case KMR_KV_CSTRING:
  1835     case KMR_KV_POINTER_OWNED:
  1836     case KMR_KV_POINTER_UNMANAGED:
  1837         return kmr_compare_record_opaque;
  1850 kmr_copy_record_shuffle_fn(
const struct kmr_kv_box kv,
  1861 kmr_copy_record_sorting_fn(
const struct kmr_kv_box kv,
  1879     return KMR_CMP3(v0, v1);
  1886     kmr_assert_kvs_ok(kvi, kvo, 1, 1);
  1887     assert(kmr_shuffle_compatible_p(kvo, kvi));
  1888     KMR *mr = kvi->c.mr;
  1891     if (mr->kmrviz_trace) {
  1904     _Bool twostep = !mr->one_step_sort;
  1905     _Bool primekey = ((kvi->c.key_data == KMR_KV_INTEGER)
  1906                       || (kvi->c.key_data == KMR_KV_FLOAT8));
  1907     double timestamp[5];
  1909     long cnt = kvi->c.element_count;
  1910     timestamp[0] = MPI_Wtime();
  1916         .nothreading = opt.nothreading
  1919         cc = 
kmr_map(kvi, 0, ev, inspect, kmr_copy_record_shuffle_fn);
  1920         assert(cc == MPI_SUCCESS);
  1922         cc = 
kmr_map(kvi, 0, ev, inspect, kmr_copy_record_sorting_fn);
  1923         assert(cc == MPI_SUCCESS);
  1927     assert(cc == MPI_SUCCESS);
  1929     timestamp[1] = MPI_Wtime();
  1930     if (shuffling || twostep || primekey) {
  1931         if (mr->single_thread || opt.nothreading) {
  1935                       mr->sort_threads_depth);
  1938     timestamp[2] = MPI_Wtime();
  1939     if (!shuffling && !primekey) {
  1941         long *runs = 
kmr_malloc(
sizeof(
long) * (
size_t)cnt);
  1951                     cc = KMR_CMP3(ev[i - 1].v, ev[i].v);
  1953                 assert(nruns < cnt);
  1957             assert(i == cnt && (cnt == 0 || runs[nruns - 1] == cnt));
  1959             nruns = (cnt == 0 ? 0 : 1);
  1962         kmr_record_sorter_t cmp1 = kmr_choose_record_sorter(kvi);
  1963         if (mr->single_thread || opt.nothreading) {
  1964             for (
long k = 0; k < nruns; k++) {
  1965                 long j = (k == 0 ? 0 : runs[k - 1]);
  1969                     qsort(&ev[j], (
size_t)(i - j),
  1971                           (kmr_qsorter_t)cmp1);
  1975             KMR_OMP_PARALLEL_FOR_
  1976                 for (
long k = 0; k < nruns; k++) {
  1977                     long j = (k == 0 ? 0 : runs[k - 1]);
  1981                         qsort(&ev[j], (
size_t)(i - j),
  1983                               (kmr_qsorter_t)cmp1);
  1987         kmr_free(runs, (
sizeof(
long) * (
size_t)cnt));
  1989     timestamp[3] = MPI_Wtime();
  1990     size_t sz = kvi->c.storage_netsize;
  1991     cc = kmr_allocate_block(kvo, sz);
  1992     assert(cc == MPI_SUCCESS);
  1994     for (
long i = 0 ; i < cnt; i++) {
  1996         kmr_add_kv_nomutex(kvo, kv, 0, 0);
  1998     timestamp[4] = MPI_Wtime();
  1999     assert(sz == 0 || kmr_kvs_entry_tail_p(kvo->c.adding_point));
  2000     assert(sz == 0 || kvo->c.block_count == 1);
  2002     kmr_assert_on_tail_marker(kvo);
  2005     _Bool tracing = mr->trace_sorting;
  2006     if (tracing && (5 <= mr->verbosity)) {
  2007         fprintf(stderr, (
";;KMR [%05d] kmr_sort_locally"  2008                          " time=(%f %f %f %f) (msec)\n"),
  2010                 ((timestamp[1] - timestamp[0]) * 1e3),
  2011                 ((timestamp[2] - timestamp[1]) * 1e3),
  2012                 ((timestamp[3] - timestamp[2]) * 1e3),
  2013                 ((timestamp[4] - timestamp[3]) * 1e3));
  2026     if (mr->kmrviz_trace) {
  2054     kmr_assert_kvs_ok(kvi, kvo, 1, 1);
  2055     assert(kmr_shuffle_compatible_p(kvo, kvi));
  2056     struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1,
  2058     kmr_check_fn_options(kvi->c.mr, kmr_supported, opt, __func__);
  2059     _Bool ranking = opt.key_as_rank;
  2060     kmr_sort_locally_lo(kvi, kvo, shuffling, ranking, opt);
  2071 kmr_count_entries(
KMR_KVS *kvs, _Bool bound_in_block)
  2073     kvs->c.current_block = kvs->c.first_block;
  2074     struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
  2080         e = kmr_kvs_next(kvs, e, bound_in_block);
  2096     kmr_assert_kvs_ok(kvi, kvo, 1, 1);
  2097     assert(kmr_shuffle_compatible_p(kvo, kvi));
  2098     KMR *mr = kvi->c.mr;
  2099     struct kmr_option kmr_supported = {.inspect = 1, .key_as_rank = 1,
  2101     kmr_check_fn_options(mr, kmr_supported, opt, __func__);
  2102     _Bool ranking = opt.key_as_rank;
  2105     if (mr->kmrviz_trace) {
  2112     if (kvi->c.magic == KMR_KVS_PUSHOFF) {
  2113         kmr_pushoff_make_stationary(kvi);
  2115     if (kvi->c.shuffled_in_pushoff) {
  2116         assert(!mr->ckpt_enable);
  2134     enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvi->c.key_data);
  2135     enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvi->c.value_data);
  2139     kmr_sort_locally_lo(kvi, kvs1, 1, ranking, n_opt);
  2140     assert(kvs1->c.stowed);
  2143     assert(!kmr_fields_pointer_p(kvs1));
  2144     assert(kvs1->c.block_count <= 1);
  2147     int nprocs = mr->nprocs;
  2148     long cnt = kvs1->c.element_count;
  2149     long *ssz = 
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
  2150     long *sdp = 
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
  2151     long *rsz = 
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
  2152     long *rdp = 
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
  2153     for (
int r = 0; r < nprocs; r++) {
  2158     assert(kvs1->c.current_block == 0);
  2159     kvs1->c.current_block = kvs1->c.first_block;
  2160     struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs1, kvs1->c.first_block);
  2161     for (
long i = 0; i < cnt; i++) {
  2164         int r = (ranking ? (int)kv.k.i : kmr_pitch_rank(kv, kvs1));
  2165         assert(0 <= r && r < nprocs);
  2166         if (ranking && !(0 <= r && r < nprocs)) {
  2167             kmr_error(mr, 
"key entries are not ranks");
  2170             kmr_error(mr, 
"key-value entries are not sorted (internal error)");
  2172         ssz[r] += (long)kmr_kvs_entry_netsize(e);
  2174         e = kmr_kvs_next(kvs1, e, 0);
  2178     assert(cc == MPI_SUCCESS);
  2181     for (
int r = 0; r < nprocs; r++) {
  2187     cc = kmr_allocate_block(kvo, (
size_t)recvsz);
  2188     assert(cc == MPI_SUCCESS);
  2194     assert(cc == MPI_SUCCESS);
  2195     long ocnt = kmr_count_entries(kvo, 1);
  2196     assert(kvo->c.sorted == 0);
  2197     kvo->c.element_count = ocnt;
  2199         assert(kvo->c.block_count == 1);
  2200         rb->partial_element_count = ocnt;
  2201         rb->fill_size = (size_t)recvsz;
  2203     kmr_kvs_adjust_adding_point(kvo);
  2215     assert(kvo->c.element_count == 0 || kvo->c.storage_netsize != 0);
  2216     xassert(!kmr_fields_pointer_p(kvo));
  2218     kmr_free(ssz, (
sizeof(
long) * (
size_t)nprocs));
  2219     kmr_free(sdp, (
sizeof(
long) * (
size_t)nprocs));
  2220     kmr_free(rsz, (
sizeof(
long) * (
size_t)nprocs));
  2221     kmr_free(rdp, (
sizeof(
long) * (
size_t)nprocs));
  2225     if (mr->kmrviz_trace) {
  2242     kmr_assert_kvs_ok(kvi, kvo, 1, 1);
  2243     KMR *mr = kvi->c.mr;
  2244     struct kmr_option kmr_supported = {.inspect = 1, .rank_zero = 1,
  2246     kmr_check_fn_options(mr, kmr_supported, opt, __func__);
  2247     int nprocs = mr->nprocs;
  2248     int rank = mr->rank;
  2253     if (mr->kmrviz_trace) {
  2258     if (kmr_fields_pointer_p(kvi) || kvi->c.block_count > 1) {
  2264         cc = kmr_collapse_as_opaque(kvi, kvs1, 1);
  2265         assert(cc == MPI_SUCCESS);
  2270     kmr_assert_on_tail_marker(kvs1);
  2271     assert(kvs1->c.block_count <= 1);
  2278                 assert(cc == MPI_SUCCESS);
  2282                 assert(cc == MPI_SUCCESS);
  2288     long *rsz = 
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
  2289     long *rdp = 
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
  2291     long ssz = (long)kvs1->c.storage_netsize;
  2293     assert(cc == MPI_SUCCESS);
  2295     if (!opt.rank_zero || rank == 0) {
  2296         for (
int r = 0; r < nprocs; r++) {
  2301     if (!(kvo->c.key_data == kvs1->c.key_data
  2302           && kvo->c.value_data == kvs1->c.value_data)) {
  2303         kmr_error(mr, 
"key-data or value-data types mismatch");
  2305     cc = kmr_allocate_block(kvo, (
size_t)recvsz);
  2306     assert(cc == MPI_SUCCESS);
  2311     cc = 
kmr_allgatherv(mr, opt.rank_zero, sbuf, ssz, rbuf, rsz, rdp);
  2312     assert(cc == MPI_SUCCESS);
  2313     assert(kvo->c.element_count == 0);
  2314     long ocnt = kmr_count_entries(kvo, 1);
  2315     kvo->c.element_count = ocnt;
  2317         rb->partial_element_count = ocnt;
  2318         rb->fill_size = (size_t)recvsz;
  2320     kmr_kvs_adjust_adding_point(kvo);
  2322     assert(cc == MPI_SUCCESS);
  2323     kmr_assert_on_tail_marker(kvo);
  2324     assert(kvo->c.element_count == 0 || kvo->c.storage_netsize != 0);
  2332         assert(cc == MPI_SUCCESS);
  2336         assert(cc == MPI_SUCCESS);
  2339     kmr_free(rsz, (
sizeof(
long) * (
size_t)nprocs));
  2340     kmr_free(rdp, (
sizeof(
long) * (
size_t)nprocs));
  2345     if (mr->kmrviz_trace) {
  2358 kmr_copy_record_fn(
const struct kmr_kv_box kv,
  2374     xassert(kvi->c.current_block == 0);
  2375     kmr_sorter_t cmp = kmr_choose_sorter(kvi);
  2376     KMR *mr = kvi->c.mr;
  2377     long cnt = kvi->c.element_count;
  2381     kvi->c.current_block = kvi->c.first_block;
  2382     struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvi, kvi->c.first_block);
  2394         while (index < start_from) {
  2395             e = kmr_kvs_next(kvi, e, 1);
  2405             assert(index < cnt);
  2406             e = kmr_kvs_next(kvi, e, 1);
  2416             cc = (*cmp)(&kv0, &kv1);
  2422             assert(ej == 0 && e == 0);
  2432         for (
long i = 0; i < n; i++) {
  2435             e = kmr_kvs_next(kvi, e, 1);
  2438         double t0 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
  2439         cc = (*r)(ev, n, kvi, kvo, arg);
  2440         double t1 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
  2441         if (cc != MPI_SUCCESS) {
  2443             snprintf(ee, 
sizeof(ee),
  2444                      "Reduce-fn returned with error cc=%d", cc);
  2447         if (mr->log_traces != 0) {
  2448             kmr_log_reduce(mr, kvi, ev, n, r, (t1 - t0));
  2457     assert(index == cnt);
  2465         kmr_free(ev, (
sizeof(
struct kmr_kv_box) * (
size_t)evsz));
  2471 kmr_reduce_threading(_Bool stop_when_some_added,
  2486         .nothreading = opt.nothreading
  2488     assert(kvi->c.current_block == 0);
  2489     long cnt = kvi->c.element_count;
  2493     cc = 
kmr_map(kvi, 0, ev, inspect, kmr_copy_record_fn);
  2494     assert(cc == MPI_SUCCESS);
  2497     kmr_sorter_t cmp = kmr_choose_sorter(kvi);
  2498     long *runs = 
kmr_malloc(
sizeof(
long) * (
size_t)cnt);
  2514                 cc = (*cmp)(&ev[i - 1], &ev[i]);
  2517             assert(nruns < cnt);
  2521         assert(i == cnt && (cnt == 0 || runs[nruns - 1] == cnt));
  2525             kvo->c.under_threaded_operation = 1;
  2527         KMR *mr = kvi->c.mr;
  2529         KMR_OMP_PARALLEL_FOR_
  2530             for (
long k = 0; k < nruns; k++) {
  2533                     long j = (k == 0 ? 0 : runs[k - 1]);
  2536                     double t0 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
  2537                     int ccx = (*r)(&ev[j], (i - j), kvi, kvo, arg);
  2538                     double t1 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
  2539                     if (ccx != MPI_SUCCESS) {
  2541                         snprintf(ee, 
sizeof(ee),
  2542                                  "Reduce-fn returned with error cc=%d", ccx);
  2545                     if (mr->log_traces != 0) {
  2546                         kmr_log_reduce(mr, kvi, ev, (i - j), r, (t1 - t0));
  2556                     if (stop_when_some_added) {
  2558                         if (mr->stop_at_some_check_globally) {
  2561                             done = (kvo->c.element_count != 0);
  2573             kvo->c.under_threaded_operation = 0;
  2585     kmr_free(runs, (
sizeof(
long) * (
size_t)cnt));
  2586     kmr_free(ev, (
sizeof(
struct kmr_kv_box) * (
size_t)cnt));
  2610             const char *file, 
const int line, 
const char *func)
  2612     kmr_assert_kvs_ok(kvi, kvo, 1, 0);
  2613     KMR *mr = kvi->c.mr;
  2614     struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1,
  2616     kmr_check_fn_options(mr, kmr_supported, opt, __func__);
  2617     struct kmr_option i_opt = kmr_copy_options_i_part(opt);
  2618     struct kmr_option o_opt = kmr_copy_options_o_part(opt);
  2621     if (mr->kmrviz_trace) {
  2631     enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvi->c.key_data);
  2632     enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvi->c.value_data);
  2637     kmr_sort_locally_lo(kvi, kvs1, 0, 0, i_opt);
  2643     if (mr->atwork == 0) {
  2650     if (mr->single_thread || opt.nothreading) {
  2651         cc = kmr_reduce_nothreading(kvs1, kvo, arg, o_opt, r);
  2653         cc = kmr_reduce_threading(stop_when_some_added,
  2654                                   kvs1, kvo, arg, o_opt, r);
  2656     if (mr->atwork == &info) {
  2661     kmr_assert_on_tail_marker(kvs1);
  2668     if (mr->kmrviz_trace) {
  2686     kmr_assert_kvs_ok(kvi, kvo, 1, 0);
  2687     KMR *mr = kvi->c.mr;
  2688     assert(kvi->c.current_block == 0);
  2689     struct kmr_option kmr_supported = {.inspect = 1, .take_ckpt = 1};
  2690     kmr_check_fn_options(mr, kmr_supported, opt, __func__);
  2705     long cnt = kvi->c.element_count;
  2713         cc = 
kmr_map(kvi, 0, ev, inspect, kmr_copy_record_fn);
  2714         assert(cc == MPI_SUCCESS);
  2716         cc = (*r)(&ev[0], cnt, kvi, kvo, arg);
  2717         if (cc != MPI_SUCCESS) {
  2719             snprintf(ee, 
sizeof(ee),
  2720                      "Reduce-fn returned with error cc=%d", cc);
  2733     kmr_free(ev, (
sizeof(
struct kmr_kv_box) * (
size_t)cnt));
  2757     for (
int i = 0; i < nkvs; i++) {
  2758         kmr_assert_i_kvs_ok(kvs[i], 1);
  2760     kmr_assert_o_kvs_ok(kvo, 1);
  2761     if (kvo->c.element_count > 0) {
  2762         KMR *mr = kvo->c.mr;
  2763         kmr_error(mr, 
"kmr_concatenate_kvs: Output kvs has entries");
  2765     kmr_check_fn_options(kvo->c.mr, kmr_noopt, opt, __func__);
  2773     for (
int i = 0; i < nkvs; i++) {
  2774         elements += kvs[i]->c.element_count;
  2775         netsize += kvs[i]->c.storage_netsize;
  2776         blocks += kvs[i]->c.block_count;
  2780             kvs[i]->c.first_block = 0;
  2782                 assert(storage == 0);
  2786                 assert(blocks != 0 && p->next == 0);
  2791             while (p->next != 0) {
  2798     kvo->c.first_block = storage;
  2799     kvo->c.element_count = elements;
  2800     kvo->c.storage_netsize = netsize;
  2801     kvo->c.block_count = blocks;
  2805     kvo->c.current_block = 0;
  2806     kvo->c.adding_point = 0;
  2807     assert(kvo->c.block_count == 0 || kvo->c.first_block != 0);
  2819     kmr_assert_kvs_ok(kvs, 0, 1, 0);
  2820     assert(kvs->c.magic == KMR_KVS_ONCORE);
  2822     long cnt = kvs->c.element_count;
  2823     kvs->c.current_block = kvs->c.first_block;
  2824     struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
  2826     for (
long i = 0; i < cnt && e != 0; i++) {
  2828         e = kmr_kvs_next(kvs, e, 0);
  2830     kvs->c.current_block = 0;
  2833     if (kvs->c.element_count == 0) {
  2837         for (b = kvs->c.first_block; b->next != 0; b = b->next);
  2838         kvs->c.current_block = b;
  2841         e = kmr_kvs_first_entry(kvs, b);
  2843         long cnt = b->partial_element_count;
  2844         for (
long i = 0; i < cnt && e != 0; i++) {
  2846             e = kmr_kvs_next(kvs, e, 1);
  2848         kvs->c.current_block = 0;
  2862     kmr_assert_kvs_ok(kvs, 0, 1, 0);
  2863     assert(kvs->c.magic == KMR_KVS_ONCORE);
  2864     long cnt = MIN(n, kvs->c.element_count);
  2865     kvs->c.current_block = kvs->c.first_block;
  2866     struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
  2867     for (
long i = 0; i < cnt && e != 0; i++) {
  2869         e = kmr_kvs_next(kvs, e, 0);
  2871     kvs->c.current_block = 0;
  2885     kmr_assert_kvs_ok(kvs, 0, 1, 0);
  2886     assert(kvs->c.magic == KMR_KVS_ONCORE);
  2887     long cnt = MIN(n, kvs->c.element_count);
  2888     kvs->c.current_block = kvs->c.first_block;
  2889     struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
  2890     for (
long i = 0; i < cnt && e != 0; i++) {
  2892         e = kmr_kvs_next(kvs, e, 0);
  2894     kvs->c.current_block = 0;
  2903                            long n, _Bool shuffling, _Bool ranking)
  2905     kmr_assert_kvs_ok(kvs, 0, 1, 0);
  2906     assert(kvs->c.magic == KMR_KVS_ONCORE);
  2907     long cnt = MIN(n, kvs->c.element_count);
  2908     kvs->c.current_block = kvs->c.first_block;
  2909     struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
  2910     for (
long i = 0; i < cnt && e != 0; i++) {
  2913             ev[i].v = (ranking ? kv.k.i : kmr_pitch_rank(kv, kvs));
  2916             ev[i].v = kmr_stable_key(kv, kvs);
  2919         e = kmr_kvs_next(kvs, e, 0);
  2921     kvs->c.current_block = 0;
  2933         kmr_error(mr, 
"kmr_legal_minimum_field_size: Bad field");
  2935     case KMR_KV_INTEGER:
  2936         return sizeof(long);
  2938         return sizeof(double);
  2940     case KMR_KV_CSTRING:
  2941     case KMR_KV_POINTER_OWNED:
  2942     case KMR_KV_POINTER_UNMANAGED:
  2945         kmr_error(mr, 
"kmr_legal_minimum_field_size: Bad field");
  2966     KMR *mr = kvo->c.mr;
  2970     long cnt = kvi->c.element_count;
  2974     assert(cc == MPI_SUCCESS);
  2977     for (
long i = 0; i < cnt; i++) {
  2980         assert(cc == MPI_SUCCESS);
  2985         cc = (*r)(bx, 2, kvi, xs, 0);
  2986         if (cc != MPI_SUCCESS) {
  2988             snprintf(ee, 
sizeof(ee),
  2989                      "Reduce-fn returned with error cc=%d", cc);
  2993         assert(cc == MPI_SUCCESS);
  2995         bx[0].klen = bx[1].klen;
  2998         assert(cc == MPI_SUCCESS);
  3003     assert(cc == MPI_SUCCESS);
  3005     if (carryout != 0) {
  3008         assert(cc == MPI_SUCCESS);
  3010         assert(cc == MPI_SUCCESS);
  3012         assert(cc == MPI_SUCCESS);
 int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS). 
int kmr_ckpt_progress_init(KMR_KVS *, KMR_KVS *, struct kmr_option)
It initializes the progress of MapReduce checkpointing. 
Key-Value Stream (abstract). 
int kmr_add_kv1(KMR_KVS *kvs, void *k, int klen, void *v, int vlen)
Adds a key-value pair as given directly by a pointer. 
int kmr_save_kvs(KMR_KVS *kvs, void **dataq, size_t *szq, struct kmr_option opt)
Packs locally the contents of a key-value stream to a byte array. 
kmr_trace_entry_t * kmr_trace_add_entry(KMR *mr, kmr_trace_event_t ev, kmr_trace_entry_t *pre, KMR_KVS *kvi, KMR_KVS *kvo)
Add an entry to the trace. 
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks. 
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *identifying_name)
Makes a new KMR context (a context has type KMR). 
Utilities Private Part (do not include from applications). 
int kmr_add_identity_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
Adds a given key-value pair unmodified. 
Options to Mapping, Shuffling, and Reduction. 
int kmr_finish_swf(KMR *mr)
Clears the lanes of simple workflow. 
void kmr_ckpt_progress_fin(KMR *)
It finalizes the progress of MapReduce checkpointing. 
static KMR_KVS * kmr_create_raw_kvs(KMR *mr, const KMR_KVS *_similar)
Makes a new key-value stream (type KMR_KVS). 
void kmr_ckpt_restore_ckpt(KMR_KVS *)
It restores checkpoint data to kvs. 
int kmr_map_skipping(long from, long stride, long limit, _Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps by skipping the number of entries. 
int kmr_add_string(KMR_KVS *kvs, const char *k, const char *v)
Adds a key-value pair of strings. 
static int kmr_icmp(const void *a0, const void *a1)
Compares the key field of keyed-records for qsort/bsearch. 
#define kmr_malloc(Z)
Allocates memory, or aborts when failed. 
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes. 
int kmr_sort_locally(KMR_KVS *kvi, KMR_KVS *kvo, _Bool shuffling, struct kmr_option opt)
Reorders key-value pairs in a single rank. 
int kmr_map9(_Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m, const char *file, const int line, const char *func)
Maps simply. 
int kmr_ckpt_enabled(KMR *)
Check if checkpoint/restart is enabled. 
static const size_t kmr_kvs_entry_header
Size of an Entry Header. 
Keyed-Record for Sorting. 
int kmr_add_kv_space(KMR_KVS *kvs, const struct kmr_kv_box kv, void **keyp, void **valuep)
Adds a key-value pair, but only allocates a space and returns the pointers to the key and the value p...
long kmr_ckpt_first_unprocessed_kv(KMR *)
It returns the index of the first unprocessed key-value in the input KVS. 
int kmr_move_kvs(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Moves the contents of the input KVI to the output KVO. 
int kmr_map_rank_by_rank(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps sequentially with rank by rank for debugging. 
struct kmr_kvs_entry * kmr_find_kvs_last_entry(KMR_KVS *kvs)
Finds the last entry of a key-value stream. 
void kmr_ckpt_save_kvo_whole(KMR *, KMR_KVS *)
It saves all key-value pairs in the output KVS to a checkpoint data file. 
int kmr_initialize_mpi(int *refargc, char ***refargv)
Checks the initialization state of MPI, and initializes MPI when not. 
int kmr_allgatherv(KMR *mr, _Bool rankzeroonly, void *sbuf, long scnt, void *rbuf, long *rcnts, long *rdsps)
All-gathers data, or gathers data when RANKZEROONLY. 
void kmr_ckpt_save_kvo_block_add(KMR *, KMR_KVS *, long)
It adds a new block of key-value pairs of the output KVS to the checkpoint data file. 
int kmr_map_on_rank_zero(KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on rank0 only. 
kmr_kv_field
Datatypes of Keys or Values. 
int kmr_retrieve_kv_box_entries(KMR_KVS *kvs, struct kmr_kv_box *ev, long n)
Fills local key-value entries in an array of kmr_kv_box for inspection. 
void kmr_isort(void *a, size_t n, size_t es, int depth)
Sorts by comparator on long integers. 
int kmr_take_one(KMR_KVS *kvi, struct kmr_kv_box *kv)
Extracts a single key-value pair locally in the key-value stream KVI. 
int kmr_concatenate_kvs(KMR_KVS *kvs[], int nkvs, KMR_KVS *kvo, struct kmr_option opt)
Concatenates a number of KVSes to one. 
void kmr_ckpt_create_context(KMR *)
Initialize checkpoint context. 
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply. 
Handy Copy of a Key-Value Field. 
int kmr_reduce9(_Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r, const char *file, const int line, const char *func)
Reduces key-value pairs. 
Options to Mapping by Spawns. 
void kmr_ckpt_save_kvo_block_fin(KMR *, KMR_KVS *)
It finalizes saving blocks of key-value pairs of the output KVS to the checkpoint data file...
State during kmr_map_ms(). 
void kmr_ckpt_free_context(KMR *)
Free checkpoint context. 
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs. 
int kmr_ckpt_disable_ckpt(KMR *)
It temporarily disables checkpoint/restart. 
int kmr_add_kv_pushoff(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair. 
void kmr_trace_initialize(KMR *mr)
Initialize a trace. 
int kmr_assert_sorted(KMR_KVS *kvi, _Bool locally, _Bool shuffling, _Bool ranking)
Checks that a key-value stream is sorted. 
void kmr_trace_finalize(KMR *mr)
Finalize a trace. 
void kmr_ckpt_save_kvo_block_init(KMR *, KMR_KVS *)
It initializes saving blocks of key-value pairs of the output KVS to a checkpoint data file...
int kmr_retrieve_keyed_records(KMR_KVS *kvs, struct kmr_keyed_record *ev, long n, _Bool shuffling, _Bool ranking)
Fills keyed records in an array for sorting. 
#define xassert(X)
Asserts and aborts, but it cannot be disabled. 
#define kmr_realloc(P, Z)
Allocates memory, or aborts when failed. 
int kmr_scan_locally(KMR_KVS *kvi, KMR_KVS *carryin, KMR_KVS *kvo, KMR_KVS *carryout, kmr_redfn_t r)
Scans every key-value with a reduce-function locally (independently on each rank). 
int kmr_retrieve_kvs_entries(KMR_KVS *kvs, struct kmr_kvs_entry **ev, long n)
Fills local key-value entries in an array for inspection. 
int kmr_add_kv_done_pushoff(KMR_KVS *kvs)
Marks finished adding key-value pairs, called from kmr_add_kv_done(). 
Options to Mapping on Files. 
int kmr_fin(void)
Clears the environment. 
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
static struct kmr_kv_box kmr_pick_kv(struct kmr_kvs_entry *e, KMR_KVS *kvs)
Returns a handle to a key-value entry – a reverse of kmr_poke_kv(). 
int(* kmr_redfn_t)(const struct kmr_kv_box kv[], const long n, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg)
Reduce-function Type. 
int kmr_gather_sizes(KMR *mr, long siz, long *rbuf)
Calls all-gather for collecting one long-integer. 
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once. 
int kmr_ckpt_enable_ckpt(KMR *, int)
It temporarily enables checkpoint/restart, which has been disabled by calling kmr_ckpt_disable_ckpt(). 
void kmr_free_string(char *s)
Frees a string strduped. 
static void kmr_poke_kv(struct kmr_kvs_entry *e, const struct kmr_kv_box kv, struct kmr_kv_box *xkv, const KMR_KVS *kvs, _Bool reserve_space_only)
Stores a key-value pair at the entry E in the store – a reverse of kmr_pick_kv(). 
Information of Source Code Line. 
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type. 
void kmr_ckpt_lock_start(KMR *)
Defines the start position of the code region that is referred to on restart. 
int kmr_copy_mpi_info(MPI_Info src, MPI_Info dst)
Copies contents of MPI_Info. 
KMR_KVS * kmr_create_kvs7(KMR *mr, enum kmr_kv_field kf, enum kmr_kv_field vf, struct kmr_option opt, const char *file, const int line, const char *func)
Makes a new key-value stream with the specified field data-types. 
int kmr_alltoallv(KMR *mr, void *sbuf, long *scounts, long *sdsps, void *rbuf, long *rcounts, long *rdsps)
Does all-to-all-v, but it takes the arguments by long-integers. 
int kmr_restore_kvs(KMR_KVS *kvo, void *data, size_t sz_, struct kmr_option opt)
Unpacks locally the contents of a key-value stream from a byte array. 
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair. 
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context(). 
int kmr_reduce_as_one(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r)
Calls a reduce-function once as if all key-value pairs had the same key. 
void kmr_ckpt_remove_ckpt(KMR_KVS *)
It removes checkpoint data file. 
void kmr_ckpt_lock_finish(KMR *)
Defines the end position of the code region that is referred to on restart. 
int kmr_exchange_sizes(KMR *mr, long *sbuf, long *rbuf)
Calls all-to-all to exchange one long-integer. 
int kmr_legal_minimum_field_size(KMR *mr, enum kmr_kv_field f)
Returns a minimum byte size of the field: 8 for INTEGER and FLOAT8, 0 for others. ...