23 #include "../config.h" 29 const int kmr_version = KMR_H;
/* Tunable size constants and MIN/MAX helpers for this file.
   NOTE(review): extraction gaps — embedded original line numbers jump;
   comments describe only the code visible here. */
31 #define MIN(a,b) (((a)<(b))?(a):(b)) 32 #define MAX(a,b) (((a)>(b))?(a):(b)) 38 #define BLOCK_SIZE (64 * 1024 * 1024) 43 #define MAP_PARK_SIZE (1024) 48 #define PUSHOFF_SIZE (64 * 1024) 55 kmr_assert_on_tail_marker(
KMR_KVS *kvs)
/* Checks the end-of-data marker of a single-block KVS: the entry just
   past the stored data must be 4-byte aligned and carry klen==vlen==-1
   (the tail sentinel).  Only runs when exactly one block exists. */
57 if (kvs != 0 && kvs->c.block_count == 1) {
59 size_t netsz = kvs->c.storage_netsize;
/* e is presumably the entry at offset netsz — its computation is in a
   missing line; TODO confirm against the full source. */
61 assert((((intptr_t)e) & 3) == 0
62 && e->klen == -1 && e->vlen == -1);
/* One-time sanity checks of platform assumptions KMR relies on.
   NOTE(review): extraction gaps — only visible assertions are described. */
72 kmr_init_2(
int ignore)
/* KMR stores sizes/offsets in long; verify long is wide enough for all
   integer-ish types used interchangeably in the library. */
75 assert(
sizeof(
long) ==
sizeof(
size_t)
76 &&
sizeof(
long) ==
sizeof(ssize_t)
77 &&
sizeof(
long) ==
sizeof(off_t)
78 &&
sizeof(
long) ==
sizeof(uint64_t)
79 &&
sizeof(
long) >=
sizeof(intptr_t)
80 &&
sizeof(
long) >=
sizeof(
void *));
/* Entry/block payloads must start at an aligned offset. */
81 assert(kmr_check_alignment(offsetof(
struct kmr_kvs_entry, c)));
82 assert(kmr_check_alignment(offsetof(
struct kmr_kvs_block, data)));
84 assert(
sizeof(
struct kmr_option) ==
sizeof(
long)
/* MPI_LONG must be a zero-lb, 8-byte type for KMR's wire format. */
89 cc = MPI_Type_get_extent(MPI_LONG, &lb, &extent);
90 assert(cc == MPI_SUCCESS);
91 assert(lb == 0 && extent == 8);
96 int tid = omp_get_thread_num();
/* Verify option bit-fields overlay a single long exactly: clearing the
   one set bit in each option struct must zero the whole word. */
110 union {
struct kmr_option o;
unsigned long i;} opt0 = {.o = opt};
111 union {
struct kmr_file_option o;
unsigned long i;} fopt0 = {.o = fopt};
112 opt0.o.rank_zero = 0;
113 fopt0.o.shuffle_names = 0;
114 assert(kf == KMR_KV_POINTER_UNMANAGED
115 && opt.rank_zero && fopt.shuffle_names
116 && opt0.i == 0 && fopt0.i == 0);
/* Initializes MPI if (and only if) it is not already initialized;
   requests MPI_THREAD_SERIALIZED support when calling MPI_Init_thread.
   Returns nonzero on success of the init call (visible branch only). */
138 cc = MPI_Initialized(&ok);
139 if (cc == MPI_SUCCESS && ok != 0) {
141 }
else if (cc == MPI_SUCCESS && ok == 0) {
143 cc = MPI_Init_thread(refargc, refargv, MPI_THREAD_SERIALIZED, &thlv);
144 return (cc == MPI_SUCCESS);
/* Body of context creation: duplicates the communicator, probes the
   OpenMP/MPI threading environment, then fills every tunable field of
   the context (mr) with its default value, copies the identifying name,
   and loads preferences into an MPI_Info.
   NOTE(review): extraction gaps — embedded line numbers jump; comments
   describe only visible code. */
169 const char *identifying_name)
173 KMR_DEBUGX(memset(mr, 0,
sizeof(
struct kmr_ctx)));
175 cc = MPI_Comm_size(comm, &mr->nprocs);
176 assert(cc == MPI_SUCCESS);
177 cc = MPI_Comm_rank(comm, &mr->rank);
178 assert(cc == MPI_SUCCESS);
/* The context owns a private duplicate of the caller's communicator. */
179 cc = MPI_Comm_dup(comm, &mr->comm);
180 if (cc != MPI_SUCCESS) {
181 kmr_error_mpi(mr,
"MPI_Comm_dup", cc);
182 MPI_Abort(MPI_COMM_WORLD, 1);
186 int omp_thrd = omp_get_thread_limit();
190 assert(omp_thrd >= 1);
/* Warn when MPI's thread support is below SERIALIZED. */
193 cc = MPI_Query_thread(&mpi_thrd);
194 assert(cc == MPI_SUCCESS);
195 assert(mpi_thrd == MPI_THREAD_SINGLE
196 || mpi_thrd == MPI_THREAD_FUNNELED
197 || mpi_thrd == MPI_THREAD_SERIALIZED
198 || mpi_thrd == MPI_THREAD_MULTIPLE);
199 if (mpi_thrd == MPI_THREAD_SINGLE
200 || mpi_thrd == MPI_THREAD_FUNNELED) {
203 char *s = ((mpi_thrd == MPI_THREAD_SINGLE)
204 ?
"MPI_THREAD_SINGLE" 205 :
"MPI_THREAD_FUNNELED");
206 snprintf(ee,
sizeof(ee),
"Thread support of MPI is low: %s", s);
207 kmr_warning(mr, 1, ee);
/* Checkpointing defaults. */
214 mr->ckpt_kvs_id_counter = 0;
217 mr->ckpt_selective = 0;
218 mr->ckpt_no_fsync = 0;
/* Core sizing defaults (see BLOCK_SIZE/MAP_PARK_SIZE/PUSHOFF_SIZE). */
228 mr->mapper_park_size = MAP_PARK_SIZE;
229 mr->preset_block_size = BLOCK_SIZE;
230 mr->malloc_overhead = (int)
sizeof(
void *);
232 mr->atoa_threshold = 512;
233 mr->atoa_size_limit = 0;
234 mr->atoa_requests_limit = 0;
236 mr->sort_trivial = 100000;
237 mr->sort_threshold = 100L;
238 mr->sort_sample_factor = 10000;
239 mr->sort_threads_depth = 5;
241 mr->file_io_block_size = (1024 * 1024);
243 mr->rlimit_nofile = -1;
245 mr->pushoff_block_size = PUSHOFF_SIZE;
246 mr->pushoff_poll_rate = 0;
/* Installation path comes from the build when KMRLIBDIR is defined. */
248 #if defined(KMRLIBDIR) 249 mr->kmr_installation_path = KMRLIBDIR;
251 mr->kmr_installation_path = 0;
/* Process-spawning (watch program) defaults. */
253 mr->spawn_watch_program = 0;
254 mr->spawn_watch_prefix = 0;
255 mr->spawn_watch_host_name = 0;
256 mr->spawn_max_processes = 0;
257 mr->spawn_watch_af = 4;
258 mr->spawn_watch_port_range[0] = 0;
259 mr->spawn_watch_port_range[1] = 0;
260 mr->spawn_gap_msec[0] = 1000;
261 mr->spawn_gap_msec[1] = 10000;
262 mr->spawn_watch_accept_onhold_msec = (60 * 1000);
264 mr->spawn_self = MPI_COMM_NULL;
265 mr->spawn_retry_limit = 20;
266 mr->spawn_retry_gap_msec = (15 * 1000);
/* Simple-workflow defaults. */
268 mr->simple_workflow = 0;
269 mr->swf_spawner_so = 0;
270 mr->swf_spawner_library = 0;
271 mr->swf_args_size = 0;
273 mr->swf_record_history = 1;
274 mr->swf_debug_master = 0;
/* Threading/tracing/debug flags. */
279 mr->single_thread = 0;
280 mr->one_step_sort = 0;
282 mr->trace_sorting = 0;
283 mr->trace_file_io = 0;
284 mr->trace_map_ms = 0;
285 mr->trace_map_spawn = 0;
286 mr->trace_alltoall = 0;
290 mr->file_io_dummy_striping = 1;
291 mr->file_io_always_alltoallv = 0;
292 mr->map_ms_use_exec = 0;
293 mr->map_ms_abort_on_signal = 0;
294 mr->spawn_sync_at_startup = 0;
295 mr->spawn_watch_all = 0;
296 mr->spawn_disconnect_early = 0;
297 mr->spawn_disconnect_but_free = 0;
298 mr->spawn_pass_intercomm_in_argument = 0;
299 mr->keep_fds_at_fork = 0;
/* True when MPI may be called from multiple (serialized) threads. */
301 mr->mpi_thread_support = (mpi_thrd == MPI_THREAD_SERIALIZED
302 || mpi_thrd == MPI_THREAD_MULTIPLE);
304 mr->stop_at_some_check_globally = 0;
305 mr->pushoff_hang_out = 0;
306 mr->pushoff_fast_notice = 0;
307 mr->pushoff_stat = 1;
308 memset(&mr->pushoff_statistics, 0,
sizeof(mr->pushoff_statistics));
310 mr->kmrviz_trace = 0;
/* Copy the job name; strncpy is bounded and the last byte is forced to
   NUL, and the preceding assert guarantees it fits anyway. */
312 if (identifying_name != 0) {
313 size_t s = strlen(identifying_name);
314 assert(s < KMR_JOB_NAME_LEN);
315 strncpy(mr->identifying_name, identifying_name, KMR_JOB_NAME_LEN);
316 mr->identifying_name[KMR_JOB_NAME_LEN - 1] = 0;
318 mr->identifying_name[0] = 0;
/* Load preferences into an MPI_Info, then merge caller-given conf. */
325 cc = MPI_Info_create(&mr->conf);
326 assert(cc == MPI_SUCCESS);
327 cc = kmr_load_preference(mr, mr->conf);
328 assert(cc == MPI_SUCCESS);
329 if (conf != MPI_INFO_NULL) {
331 assert(cc == MPI_SUCCESS);
334 kmr_check_options(mr, mr->conf);
/* World-communicator convenience constructor (body not visible here). */
348 kmr_create_context_world()
/* Fortran-interface constructor: converts Fortran handles (integers) to
   C MPI handles before creating a context. */
355 kmr_create_context_ff(
const int fcomm,
const int finfo,
356 const char *identifying_name)
358 MPI_Comm comm = MPI_Comm_f2c(fcomm);
359 MPI_Info info = MPI_Info_f2c(finfo);
/* Context teardown: warns about still-linked KVSes (with their
   allocation site when recorded), frees MPI resources owned by the
   context, closes the trace log, and finally frees the context itself.
   NOTE(review): extraction gaps — only visible code is described. */
370 if (mr->kvses.head != 0 || mr->kvses.tail != 0) {
371 kmr_warning(mr, 1,
"Some key-value streams remain unfreed");
372 for (
KMR_KVS *p = mr->kvses.head; p != 0; p = p->c.link.next) {
373 if (!KMR_KVS_MAGIC_OK(p->c.magic)) {
374 kmr_warning(mr, 1,
"- unfreed kvs in bad state");
375 }
else if (p->c.magic == KMR_KVS_ONCORE) {
376 if (p->c.info_line0.file != 0) {
378 snprintf(ee, 80,
"- kvs allocated at %s:%d: %s",
379 p->c.info_line0.file, p->c.info_line0.line,
380 p->c.info_line0.func);
381 kmr_warning(mr, 1, ee);
384 kmr_warning(mr, 1,
"- unfreed kvs in bad state");
/* spawn_self is freed only when it is a real duplicated communicator. */
389 if (mr->spawn_self != MPI_COMM_NULL && mr->spawn_self != MPI_COMM_SELF) {
390 cc = MPI_Comm_free(&mr->spawn_self);
391 assert(cc == MPI_SUCCESS);
/* Close the trace log; fclose failure is reported but not fatal. */
402 if (mr->log_traces != 0) {
403 cc = fclose(mr->log_traces);
406 char *m = strerror(errno);
407 snprintf(ee,
sizeof(ee),
"Closing log file failed: %s", m);
408 kmr_warning(mr, 1, ee);
413 cc = MPI_Comm_free(&mr->comm);
414 assert(cc == MPI_SUCCESS);
415 if (mr->conf != MPI_INFO_NULL) {
416 cc = MPI_Info_free(&mr->conf);
417 assert(cc == MPI_SUCCESS);
420 if (mr->spawn_watch_program != 0) {
422 mr->spawn_watch_program = 0;
424 assert(mr->spawn_comms == 0);
429 if (mr->simple_workflow != 0) {
431 mr->simple_workflow = 0;
433 assert(mr->swf_spawner_so == 0);
434 if (mr->swf_spawner_library != 0) {
436 mr->swf_spawner_library = 0;
439 kmr_free(mr,
sizeof(
struct kmr_ctx));
/* Accessor for a KVS's owning context (body not visible), followed by a
   doubly-linked-list unlink of a KVS from the context's kvses list:
   head/tail are patched when the node is at either end. */
444 kmr_get_context_of_kvs(
KMR_KVS const *kvs)
456 KMR_KVS *prev = kvs->c.link.prev;
457 KMR_KVS *next = kvs->c.link.next;
459 prev->c.link.next = next;
461 assert(mr->kvses.head == kvs);
462 mr->kvses.head = next;
465 next->c.link.prev = prev;
467 assert(mr->kvses.tail == kvs);
468 mr->kvses.tail = prev;
/* KVS field initialization.  Two nearly identical sequences appear
   (orig. 481-516 and 530-561) — presumably the on-core and a second
   creation path; TODO confirm against the full source.  A final
   fragment records the caller's file/line/function for diagnostics. */
481 KMR_DEBUGX(memset(kvs, 0,
sizeof(
KMR_KVS)));
482 kvs->c.magic = KMR_KVS_ONCORE;
/* Each KVS gets a fresh checkpoint id from the context counter. */
487 mr->ckpt_kvs_id_counter++;
488 kvs->c.ckpt_kvs_id = mr->ckpt_kvs_id_counter;
489 kvs->c.ckpt_generated_op = 0;
490 kvs->c.ckpt_consumed_op = 0;
493 kvs->c.key_data = KMR_KV_BAD;
494 kvs->c.value_data = KMR_KV_BAD;
495 kvs->c.element_count = 0;
501 kvs->c.shuffled_in_pushoff = 0;
502 kvs->c._uniformly_sized_ = 0;
/* Usable block size excludes the allocator's per-block overhead; a
   single entry may use at most a quarter of a block. */
504 kvs->c.block_size = (mr->preset_block_size - mr->malloc_overhead);
505 kvs->c.element_size_limit = (kvs->c.block_size / 4);
506 kvs->c.storage_netsize = 0;
507 kvs->c.block_count = 0;
508 kvs->c.first_block = 0;
513 kvs->c.under_threaded_operation = 0;
514 kvs->c.current_block = 0;
515 kvs->c.adding_point = 0;
516 kvs->c.temporary_data = 0;
/* Second initialization sequence (same fields, different code path). */
530 kvs->c.magic = KMR_KVS_ONCORE;
538 kvs->c.key_data = KMR_KV_BAD;
539 kvs->c.value_data = KMR_KV_BAD;
540 kvs->c.element_count = 0;
546 kvs->c.shuffled_in_pushoff = 0;
547 kvs->c._uniformly_sized_ = 0;
549 kvs->c.block_size = (mr->preset_block_size - mr->malloc_overhead);
550 kvs->c.element_size_limit = (kvs->c.block_size / 4);
551 kvs->c.storage_netsize = 0;
552 kvs->c.block_count = 0;
553 kvs->c.first_block = 0;
558 kvs->c.under_threaded_operation = 0;
559 kvs->c.current_block = 0;
560 kvs->c.adding_point = 0;
561 kvs->c.temporary_data = 0;
/* Record key/value field types and the allocation site (file:line:func)
   used by the unfreed-KVS warning in context teardown. */
570 const char *file,
const int line,
const char *func)
573 kvs->c.key_data = kf;
574 kvs->c.value_data = vf;
575 kvs->c.info_line0.file = file;
576 kvs->c.info_line0.func = func;
577 kvs->c.info_line0.line = line;
/* Moves the contents of a finished (stowed) on-core KVS kvi into an
   unfinished kvo with identical key/value field types, by transferring
   the block chain and bookkeeping fields, then detaching kvi's blocks. */
594 assert(kvi != 0 && kvo != 0
595 && kvi->c.magic == KMR_KVS_ONCORE
596 && kvo->c.magic == KMR_KVS_ONCORE
597 && kvi->c.oncore && kvo->c.oncore);
598 assert(kvi->c.key_data == kvo->c.key_data
599 && kvi->c.value_data == kvo->c.value_data);
/* Source must be finished (stowed); destination must not be. */
600 assert(kvi->c.stowed && !kvo->c.stowed);
606 kvi->c.first_block = 0;
615 kvo->c.stowed = kvi->c.stowed;
616 kvo->c.nogrow = kvi->c.nogrow;
617 kvo->c.sorted = kvi->c.sorted;
618 kvo->c.element_count = kvi->c.element_count;
619 kvo->c.storage_netsize = kvi->c.storage_netsize;
620 kvo->c.block_count = kvi->c.block_count;
621 kvo->c.first_block = kvi->c.first_block;
622 kvo->c.ms = kvi->c.ms;
/* kvi no longer owns the blocks after the transfer. */
630 kvi->c.first_block = 0;
633 assert(cc == MPI_SUCCESS);
/* Frees an on-core KVS: its blocks, its match-set (ms) side structure
   sized by element count, and any temporary data; then poisons the
   magic so reuse is caught. */
642 kmr_free_kvs_oncore(
KMR_KVS *kvs)
647 kmr_free(b, b->size);
650 if (kvs->c.ms != 0) {
651 long cnt = kvs->c.element_count;
653 + (sizeof(char) * (size_t)cnt));
654 kmr_free(kvs->c.ms, sz);
656 if (kvs->c.temporary_data != 0) {
/* Size 0 passed for temporary_data — size presumably untracked here. */
657 kmr_free(kvs->c.temporary_data, 0);
659 kvs->c.magic = KMR_KVS_BAD;
/* Public free: validates the magic, then dispatches to the on-core or
   push-off variant. */
681 if (!KMR_KVS_MAGIC_OK(kvs->c.magic)) {
682 kmr_error(0,
"kmr_free_kvs: kvs already freed or corrupted");
687 if (kvs->c.magic == KMR_KVS_ONCORE) {
688 cc = kmr_free_kvs_oncore(kvs);
690 }
else if (kvs->c.magic == KMR_KVS_PUSHOFF) {
691 cc = kmr_free_kvs_pushoff(kvs, 1);
694 assert((kvs->c.magic == KMR_KVS_ONCORE)
695 || (kvs->c.magic == KMR_KVS_PUSHOFF));
/* Allocates storage for a KVS that must currently be empty.
   size==0 leaves the KVS storage-less; size==1 is a sentinel meaning
   "one default-sized growable block"; otherwise (lines missing here)
   presumably an exact-size, nogrow block — TODO confirm. */
712 kmr_allocate_block(
KMR_KVS *kvs,
size_t size)
715 assert(kvs->c.element_count == 0 && kvs->c.storage_netsize == 0
716 && kvs->c.block_count == 0 && kvs->c.first_block == 0
717 && kvs->c.current_block == 0 && kvs->c.adding_point == 0);
722 kvs->c.block_size = 0;
725 }
else if (size == 1) {
727 sz = kvs->c.block_size;
728 assert(kvs->c.nogrow == 0);
730 assert(kvs->c.first_block == 0 && kvs->c.current_block == 0
731 && kvs->c.block_count == 0 && kvs->c.adding_point == 0);
735 kvs->c.block_size = sz;
736 kvs->c.storage_netsize = netsz;
740 kmr_kvs_reset_block(kvs, b, sz, netsz);
741 kmr_kvs_insert_block(kvs, b);
/* After bulk-filling a block, moves the adding point to the end of the
   current block's entries so subsequent adds append correctly. */
750 kmr_kvs_adjust_adding_point(
KMR_KVS *kvs)
752 if (kvs->c.block_count == 0) {
753 assert(kvs->c.current_block == 0 && kvs->c.adding_point == 0);
755 assert(kvs->c.current_block != 0 && kvs->c.adding_point != 0);
757 assert(kmr_kvs_first_entry(kvs, b) == kvs->c.adding_point);
758 kvs->c.adding_point = kmr_kvs_last_entry_limit(kvs, b);
759 assert(kvs->c.adding_point == kmr_kvs_adding_point(b));
/* Core (unlocked) add of one key-value pair: rejects oversized entries,
   lazily allocates the first block, starts a new block when the entry
   does not fit, then advances fill counters and the adding point.
   NOTE(review): extraction gaps — only visible code is described. */
770 struct kmr_kv_box *xkv, _Bool reserve_space_only)
772 kmr_assert_kv_sizes(kvs, kv);
773 assert(!kvs->c.nogrow || kvs->c.storage_netsize != 0);
776 size_t sz = kmr_kvs_entry_size_of_box(kvs, kv);
777 if (sz > (kvs->c.element_size_limit)) {
779 snprintf(ee, 80,
"key-value too large (size=%zd)", sz);
/* First add: allocate a default growable block (size sentinel 1). */
782 if (kvs->c.first_block == 0) {
783 assert(kvs->c.element_count == 0);
784 cc = kmr_allocate_block(kvs, 1);
785 assert(cc == MPI_SUCCESS);
/* Entry does not fit: seal the current block with a tail marker and
   chain a new block (only legal while the KVS may still grow). */
787 if (!kmr_kvs_entry_fits_in_block(kvs, kvs->c.current_block, sz)) {
788 assert(!kvs->c.nogrow);
789 kmr_kvs_mark_entry_tail(kvs->c.adding_point);
790 cc = kmr_allocate_block(kvs, 1);
791 assert(cc == MPI_SUCCESS);
795 if (!kvs->c.nogrow) {
796 kvs->c.storage_netsize += kmr_kvs_entry_netsize(e);
798 kvs->c.current_block->partial_element_count++;
799 kvs->c.current_block->fill_size += kmr_kvs_entry_size(kvs, e);
800 kvs->c.adding_point = kmr_kvs_next_entry(kvs, e);
801 kvs->c.element_count++;
/* Public add-kv entry points: a magic-dispatched kmr_add_kv (on-core vs
   push-off), type-dispatch fragments over key/value field kinds, a
   space-reserving add that returns pointers to the stored key/value,
   kmr_add_kv_done (seals the KVS), and a string-pair add.
   NOTE(review): extraction gaps — only visible code is described. */
811 kmr_assert_kvs_ok(0, kvs, 0, 1);
813 if (kvs->c.magic == KMR_KVS_ONCORE) {
816 cc = kmr_add_kv_nomutex(kvs, kv, 0, 0);
819 }
else if (kvs->c.magic == KMR_KVS_PUSHOFF) {
826 assert((kvs->c.magic == KMR_KVS_ONCORE)
827 || (kvs->c.magic == KMR_KVS_PUSHOFF));
/* Key/value field-kind dispatch; BAD and pointer kinds are rejected. */
840 switch (kvs->c.key_data) {
842 xassert(kvs->c.key_data != KMR_KV_BAD);
853 case KMR_KV_POINTER_OWNED:
854 case KMR_KV_POINTER_UNMANAGED:
864 switch (kvs->c.value_data) {
866 xassert(kvs->c.value_data != KMR_KV_BAD);
877 case KMR_KV_POINTER_OWNED:
878 case KMR_KV_POINTER_UNMANAGED:
887 struct kmr_kv_box kv = {.klen = klen, .vlen = vlen, .k = xk, .v = xv};
/* Space-reserving add: stores an entry and hands back in-place key and
   value pointers (only valid for on-core KVS). */
902 void **keyp,
void **valuep)
904 kmr_assert_kvs_ok(0, kvs, 0, 1);
905 assert(kvs->c.magic == KMR_KVS_ONCORE);
913 cc = kmr_add_kv_nomutex(kvs, kv, &xkv, 1);
916 *keyp = (
void *)xkv.k.p;
919 *valuep = (
void *)xkv.v.p;
927 int cc = kmr_add_kv_nomutex(kvs, kv, 0, 0);
/* kmr_add_kv_done: marks the tail entry and clears the adding state;
   calling it twice is an error. */
941 kmr_assert_kvs_ok(0, kvs, 0, 1);
942 if (kvs->c.magic == KMR_KVS_ONCORE) {
944 kmr_error(kvs->c.mr,
"kmr_add_kv_done: may be called already");
946 if (kvs->c.element_count == 0) {
947 assert(kvs->c.current_block == 0 && kvs->c.adding_point == 0);
949 assert(kvs->c.current_block != 0 && kvs->c.adding_point != 0);
950 kmr_kvs_mark_entry_tail(kvs->c.adding_point);
953 kvs->c.current_block = 0;
954 kvs->c.adding_point = 0;
955 assert(kvs->c.block_count == 0 || kvs->c.first_block != 0);
956 }
else if (kvs->c.magic == KMR_KVS_PUSHOFF) {
959 assert((kvs->c.magic == KMR_KVS_ONCORE)
960 || (kvs->c.magic == KMR_KVS_PUSHOFF));
/* String add: both fields must be opaque/cstring; lengths include the
   terminating NUL. */
973 if (!((kvs->c.key_data == KMR_KV_OPAQUE
974 || kvs->c.key_data == KMR_KV_CSTRING)
975 && (kvs->c.value_data == KMR_KV_OPAQUE
976 || kvs->c.value_data == KMR_KV_CSTRING))) {
978 "key-value data-types need be opaque for strings");
980 size_t klen = (strlen(k) + 1);
981 size_t vlen = (strlen(v) + 1);
982 assert(klen <= INT_MAX && vlen <= INT_MAX);
/* Repacks a KVS that uses pointer fields or spans multiple blocks into
   a single exact-size block in kvo, by mapping with the collapse option
   (inspect mode preserved via inspectp). */
1006 kmr_collapse_as_opaque(
KMR_KVS *kvi,
KMR_KVS *kvo, _Bool inspectp)
1008 assert(kvi != 0 && kvo != 0);
1009 assert(kmr_fields_pointer_p(kvi) || kvi->c.block_count > 1);
1011 cc = kmr_allocate_block(kvo, kvi->c.storage_netsize);
1012 assert(cc == MPI_SUCCESS);
1013 struct kmr_option collapse = {.collapse = 1, .inspect = inspectp};
1015 assert(cc == MPI_SUCCESS);
/* Packs a KVS into one contiguous malloc'd image: a KMR_KVS header copy
   (magic switched to ONCORE_PACKED, pointers neutralized) followed by
   the single data block.  Pointer-field or multi-block KVSes are first
   collapsed to opaque.  NOTE(review): extraction gaps. */
1030 kmr_error_at_site(0,
"Null input kvs", 0);
1031 }
else if (!KMR_KVS_MAGIC_OK(kvs->c.magic)) {
1032 kmr_error_at_site(0,
"Bad input kvs (freed or corrupted)", 0);
1034 assert(kvs->c.magic == KMR_KVS_ONCORE);
1035 kmr_check_fn_options(kvs->c.mr, kmr_noopt, opt, __func__);
/* ms/temporary_data are not serialized — warn the caller. */
1037 if (kvs->c.ms != 0 || kvs->c.temporary_data != 0) {
1038 kmr_warning(kvs->c.mr, 5,
1039 "Some fields in KVS may be lost in saved image");
1042 if (kmr_fields_pointer_p(kvs) || (kvs->c.block_count > 1)) {
1043 enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvs->c.key_data);
1044 enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvs->c.value_data);
1046 cc = kmr_collapse_as_opaque(kvs, kvs1, 0);
1047 assert(cc == MPI_SUCCESS);
1048 assert(!kmr_fields_pointer_p(kvs1) && kvs->c.block_count <= 1);
1050 assert(cc == MPI_SUCCESS);
1054 assert(!kmr_fields_pointer_p(kvs));
1055 size_t netsz = kvs->c.storage_netsize;
1057 size_t sz = (
sizeof(
KMR_KVS) + blocksz);
/* malloc failure is reported to the caller as MPI_ERR_BUFFER. */
1058 unsigned char *b = malloc(sz);
1060 return MPI_ERR_BUFFER;
1064 memcpy(h, kvs,
sizeof(
KMR_KVS));
1065 h->c.magic = KMR_KVS_ONCORE_PACKED;
/* The packed header claims exactly one block and no live pointers. */
1069 h->c.block_count = 1;
1070 h->c.first_block = 0;
1071 h->c.current_block = 0;
1072 h->c.adding_point = 0;
1074 h->c.temporary_data = 0;
1075 if (kvs->c.block_count == 0) {
1077 }
else if (kvs->c.block_count == 1) {
1078 memcpy(s, kvs->c.first_block, blocksz);
/* Unpacks a saved image (header + one block) into a fresh on-core KVS:
   validates the packed magic, allocates an exact-size block, copies the
   data, restores field kinds and the element count, and re-derives the
   adding point. */
1095 assert(kvo != 0 && kvo->c.magic == KMR_KVS_ONCORE);
1096 kmr_check_fn_options(kvo->c.mr, kmr_noopt, opt, __func__);
1098 unsigned char *b = data;
1100 unsigned char *s = (b +
sizeof(
KMR_KVS));
1101 if (h->c.magic != KMR_KVS_ONCORE_PACKED) {
1102 kmr_warning(kvo->c.mr, 1,
"Bad packed data, magic mismatch");
1103 return MPI_ERR_TYPE;
1105 size_t netsz = h->c.storage_netsize;
1107 cc = kmr_allocate_block(kvo, netsz);
1108 assert(cc == MPI_SUCCESS);
1110 memcpy(kvo->c.first_block, s, blocksz);
1112 kvo->c.key_data = h->c.key_data;
1113 kvo->c.value_data = h->c.value_data;
1114 assert(kvo->c.sorted == 0);
1115 kvo->c.element_count = h->c.element_count;
1116 kmr_kvs_adjust_adding_point(kvo);
/* Runs the map function over a parked batch of entries ev[0..evcnt-1],
   either serially or under an OpenMP parallel-for, timing each call
   when trace logging is enabled; afterwards reclaims key/value storage
   for pointer-field KVSes.  NOTE(review): extraction gaps. */
1128 kmr_map_parked(
struct kmr_kv_box *ev,
long evcnt,
long mapcount,
1129 _Bool k_reclaim, _Bool v_reclaim,
1134 KMR *mr = kvi->c.mr;
1135 long cnt = kvi->c.element_count;
/* Serial path. */
1136 if (mr->single_thread || opt.nothreading) {
1137 for (
long i = 0; i < evcnt; i++) {
1138 double t0 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
1139 cc = (*m)(ev[i], kvi, kvo, arg, (mapcount + i));
1140 double t1 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
1141 if (cc != MPI_SUCCESS) {
1143 snprintf(ee,
sizeof(ee),
1144 "Map-fn returned with error cc=%d", cc);
1147 if (mr->log_traces != 0) {
/* NOTE(review): logs (mapcount + 1) while the map-fn call above uses
   (mapcount + i) — looks like a typo; verify against upstream KMR. */
1148 kmr_log_map(mr, kvi, &ev[i], (mapcount + 1), cnt,
/* Threaded path: same loop body with a per-iteration status ccx. */
1154 kvo->c.under_threaded_operation = 1;
1156 KMR_OMP_PARALLEL_FOR_
1157 for (
long i = 0; i < evcnt; i++) {
1158 double t0 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
1159 int ccx = (*m)(ev[i], kvi, kvo, arg, (mapcount + i));
1160 double t1 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
1161 if (ccx != MPI_SUCCESS) {
1163 snprintf(ee,
sizeof(ee),
1164 "Map-fn returned with error cc=%d", ccx);
1167 if (mr->log_traces != 0) {
1168 kmr_log_map(mr, kvi, &ev[i], (mapcount + 1), cnt,
1173 kvo->c.under_threaded_operation = 0;
/* Reclaim pointer-kind key/value storage consumed by this batch. */
1176 for (
long i = 0; i < evcnt; i++) {
1178 kmr_free((
void *)ev[i].k.p, (
size_t)ev[i].klen);
1181 kmr_free((
void *)ev[i].v.p, (
size_t)ev[i].vlen);
/* Map driver: walks every entry of kvi block by block, parks entries
   selected by from/stride/limit into ev[], flushes batches through
   kmr_map_parked, supports early stop once kvo receives output, and
   finally frees the park array.  NOTE(review): extraction gaps. */
1193 _Bool stop_when_some_added,
1197 kmr_assert_kvs_ok(kvi, kvo, 1, 0);
1198 assert(from >= 0 && stride > 0 && limit >= 0);
1199 assert(kvi->c.current_block == 0);
/* limit==0 means unlimited. */
1200 limit = ((limit != 0) ? limit : LONG_MAX);
1201 KMR *mr = kvi->c.mr;
1205 if (kvo != 0 && !opt.keep_open) {
1219 if (mr->step_sync) {
1220 cc = MPI_Barrier(mr->comm);
/* FIXME: asserts the constant MPI_SUCCESS (always true); presumably
   meant assert(cc == MPI_SUCCESS). */
1221 assert(MPI_SUCCESS);
1223 if (kvo != 0 && opt.collapse) {
1224 assert(!kmr_fields_pointer_p(kvo));
/* Pointer-kind keys/values are reclaimed unless this is an inspect. */
1226 _Bool k_reclaim = (!opt.inspect && (kmr_key_pointer_p(kvi)));
1227 _Bool v_reclaim = (!opt.inspect && (kmr_value_pointer_p(kvi)));
1228 long evsz = mr->mapper_park_size;
1233 long nextindex = from;
1235 kvi->c.current_block = kvi->c.first_block;
1236 while (index < kvi->c.element_count) {
1237 assert(kvi->c.current_block != 0);
1240 for (
int i = 0; i < b->partial_element_count; i++) {
/* Entry is selected when it lies on the from/stride lattice. */
1242 if (index == nextindex && index < limit) {
1244 nextindex = (index + stride);
1256 kmr_free((
void *)w->p, (
size_t)e->klen);
1261 kmr_free((
void *)w->p, (
size_t)e->vlen);
/* Flush a full park batch. */
1265 if (evcnt >= evsz) {
1266 cc = kmr_map_parked(ev, evcnt, mapcount, k_reclaim, v_reclaim,
1267 kvi, kvo, m, arg, opt);
1268 assert(cc == MPI_SUCCESS);
/* Early-stop: once kvo is non-empty (optionally checked globally),
   skip ahead to the last entry. */
1277 if (stop_when_some_added) {
1279 if (mr->stop_at_some_check_globally) {
1282 done = (kvo->c.element_count != 0);
1286 index = (kvi->c.element_count - 1);
1287 while (b->next != 0) {
1293 e = kmr_kvs_next(kvi, e, 1);
1296 kvi->c.current_block = b->next;
1298 assert(kvi->c.current_block == 0);
/* Flush the final partial batch. */
1300 cc = kmr_map_parked(ev, evcnt, mapcount, k_reclaim, v_reclaim,
1301 kvi, kvo, m, arg, opt);
1302 assert(cc == MPI_SUCCESS);
1311 if (kvo != 0 && !opt.keep_open) {
1323 kmr_free(ev, (
sizeof(
struct kmr_kv_box) * (
size_t)evsz));
/* kmr_map9: option-checked wrapper around the map driver, with kmrviz
   tracing hooks and an atwork info record.  Followed by a rank-ordered
   map: each rank waits for a token from rank-1, maps, then passes the
   token to rank+1, serializing execution across ranks.
   NOTE(review): extraction gaps. */
1350 const char *file,
const int line,
const char *func)
1353 kmr_assert_kvs_ok(kvi, kvo, 1, 0);
1354 KMR *mr = kvi->c.mr;
1355 struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1,
1356 .keep_open = 1, .collapse = 1,
1358 kmr_check_fn_options(mr, kmr_supported, opt, __func__);
1361 if (mr->kmrviz_trace) {
1365 if (mr->atwork == 0) {
1374 kvi, kvo, arg, opt, m);
1376 assert(!opt.inspect && !opt.keep_open);
1380 if (mr->atwork == &info) {
1383 if (mr->kmrviz_trace) {
/* Rank-serialized map. */
1400 KMR *mr = kvi->c.mr;
1401 int nprocs = mr->nprocs;
1403 if (mr->rank != 0) {
/* Zero-length receive acts as the turn token. */
1404 cc = MPI_Recv(0, 0, MPI_INT, (mr->rank - 1),
1405 KMR_TAG_MAP_BY_RANK, mr->comm, MPI_STATUS_IGNORE);
1406 assert(cc == MPI_SUCCESS);
1408 cc =
kmr_map(kvi, kvo, arg, opt, m);
1409 assert(cc == MPI_SUCCESS);
1411 if (mr->rank != (nprocs - 1)) {
1413 cc = MPI_Send(0, 0, MPI_INT, (mr->rank + 1),
1414 KMR_TAG_MAP_BY_RANK, mr->comm);
1415 assert(cc == MPI_SUCCESS);
/* kmr_take_one: extracts the single entry of a one-element KVS; aborts
   with a warning when the KVS has zero or multiple entries.  Followed
   by kmr_map_once: invokes the map function once with a dummy (empty)
   key-value box, optionally on rank zero only.
   NOTE(review): extraction gaps. */
1429 kmr_assert_kvs_ok(kvi, 0, 1, 0);
1430 assert(kvi->c.current_block == 0);
1431 KMR *mr = kvi->c.mr;
1432 kvi->c.current_block = kvi->c.first_block;
1433 if (kvi->c.element_count == 1) {
1434 assert(kvi->c.current_block != 0);
1437 assert(b->partial_element_count == 1);
1440 kvi->c.current_block = 0;
1443 if (kvi->c.element_count == 0) {
1444 kmr_warning(mr, 1,
"kmr_take_one for no entries");
1447 kmr_warning(mr, 1,
"kmr_take_one for multiple entries");
1450 MPI_Abort(MPI_COMM_WORLD, 1);
/* kmr_map_once. */
1463 kmr_assert_kvs_ok(0, kvo, 0, 1);
1464 KMR *mr = kvo->c.mr;
1465 struct kmr_option kmr_supported = {.keep_open = 1, .take_ckpt = 1};
1466 kmr_check_fn_options(mr, kmr_supported, opt, __func__);
1467 int rank = mr->rank;
1471 if (mr->kmrviz_trace) {
/* Run on every rank, or only on rank zero when requested. */
1482 if (!rank_zero_only || rank == 0) {
1483 struct kmr_kv_box kv = {.klen = 0, .vlen = 0, .k.i = 0, .v.i = 0};
1484 cc = (*m)(kv, 0, kvo, arg, 0);
1485 if (cc != MPI_SUCCESS) {
1487 snprintf(ee,
sizeof(ee),
1488 "Map-fn returned with error cc=%d", cc);
1492 if (!opt.keep_open) {
1494 assert(cc == MPI_SUCCESS);
1501 if (mr->kmrviz_trace) {
1519 assert(cc == MPI_SUCCESS);
/* Three alternative implementations of kmr_hash_key_opaque (presumably
   selected by preprocessor conditionals in the missing lines).  The
   first uses the MurmurHash3 64-bit mixing/finalization constants over
   8-byte words plus a zero-padded tail; the others are simple byte
   loops.  NOTE(review): extraction gaps — loop bodies are missing. */
1529 static inline unsigned long 1530 kmr_hash_key_opaque(
const unsigned char *p,
int n)
1532 #define ROT(X,N) ((X) << (N) | (X) >> (64-(N))) 1533 #define KEY(V) (k = (V), k *= 0x87c37b91114253d5UL, \ 1534 k = ROT(k, 31), k *= 0x4cf5ad432745937fUL) 1535 #define MIX() (h ^= k, h = ROT(h, 31), h = h * 5 + 0xe6546b64) 1536 #define FIN() (h ^= (h >> 33), h *= 0xff51afd7ed558ccdUL, h ^= (h >> 33), \ 1537 h *= 0xc4ceb9fe1a85ec53UL, h ^= (h >> 33)) 1538 unsigned long h = 0x85ebca6bUL;
/* NOTE(review): casting p to unsigned long* and dereferencing may be a
   strict-aliasing/alignment hazard — presumably tolerated here. */
1540 const unsigned long *v = (
void *)p;
1542 int rn = (n - (8 * n8));
1543 const unsigned char *r = &p[(8 * n8)];
1544 for (
int i = 0; i < n8; i++) {
/* Tail: remaining rn bytes packed into a zero-initialized 8-byte word. */
1548 union {
unsigned long i;
unsigned char c[8];} u = {.i = 0UL};
1549 for (
int i = 0; i < rn; i++) {
/* Fallback variant: plain byte-wise accumulation. */
1566 static inline unsigned long 1567 kmr_hash_key_opaque(
const unsigned char *p,
int n)
1569 unsigned long hash = 0;
1570 for (i = 0 ; i < n ; i++) {
/* Second fallback variant with an additional strided pass. */
1581 static inline unsigned long 1582 kmr_hash_key_opaque(
const unsigned char *p,
int n)
1584 unsigned long hash = 0;
1588 for (i = 0 ; i < n ; i++) {
1595 for (i = 0 ; i < n ; i += k) {
/* Hashes a key according to the KVS's key field kind: opaque/cstring
   keys use kmr_hash_key_opaque; BAD and pointer kinds are rejected. */
1610 static inline signed long 1613 switch (kvs->c.key_data) {
1615 xassert(kvs->c.key_data != KMR_KV_BAD);
1618 case KMR_KV_CSTRING:
1619 return (
signed long)kmr_hash_key_opaque((
void *)kv.k.p, kv.klen);
1620 case KMR_KV_INTEGER:
1624 case KMR_KV_POINTER_OWNED:
1625 case KMR_KV_POINTER_UNMANAGED:
1626 xassert(kvs->c.key_data != KMR_KV_POINTER_OWNED
1627 && kvs->c.key_data != KMR_KV_POINTER_UNMANAGED);
/* kmr_stable_key_opaque: order-preserving key for opaque keys — packs
   the first sizeof(long) bytes big-endian style (zero-padded), shifted
   right one bit to stay non-negative.  Then kmr_stable_key dispatches
   on the key kind (integers are mapped to an order-preserving
   non-negative form), and kmr_pitch_rank folds the hash into a rank.
   NOTE(review): extraction gaps (e.g. n's declaration is missing). */
1640 kmr_stable_key_opaque(
const struct kmr_kv_box kv)
1642 unsigned char *p = (
unsigned char *)kv.k.p;
1644 unsigned long hash = 0;
1645 for (
int i = 0; i < (int)
sizeof(
long); i++) {
/* Bytes past the key length are treated as zero. */
1646 unsigned char v = ((i < n) ? p[i] : 0);
1647 hash = ((hash << 8) + v);
1649 return (
long)(hash >> 1);
/* kmr_stable_key dispatch. */
1660 switch (kvs->c.key_data) {
1662 xassert(kvs->c.key_data != KMR_KV_BAD);
1665 case KMR_KV_CSTRING:
1666 return kmr_stable_key_opaque(kv);
1667 case KMR_KV_INTEGER:
/* Map negative integers above the positives while keeping order.
   NOTE(review): (-v0) overflows for LONG_MIN — presumed tolerated. */
1672 long v1 = ((v0 >= 0L) ? v0 : ((-v0) | (1L << 63)));
1676 case KMR_KV_POINTER_OWNED:
1677 case KMR_KV_POINTER_UNMANAGED:
1678 xassert(kvs->c.key_data != KMR_KV_POINTER_OWNED
1679 && kvs->c.key_data != KMR_KV_POINTER_UNMANAGED);
/* kmr_pitch_rank: xor-fold the 64-bit hash to 32 bits, then mod the
   process count to pick a destination rank. */
1693 unsigned int nprocs = (
unsigned int)kvs->c.mr->nprocs;
1694 unsigned long v = (
unsigned long)kmr_hash_key(kvs, kv);
1695 unsigned int h = (((v >> 32) ^ v) & ((1L << 32) - 1));
1696 return (
int)(h % nprocs);
/* Key comparators and sorter selection.
   KMR_CMP3 yields -1/0/1 three-way ordering.
   NOTE(review): extraction gaps — several bodies are partial. */
1701 #define KMR_CMP3(X, Y) (((X) == (Y)) ? 0 : ((X) < (Y)) ? -1 : 1) 1708 kmr_compare_lexicographically(
const unsigned char *p,
const int plen,
1709 const unsigned char *q,
const int qlen)
/* Compare the common prefix byte-wise (a memcmp variant also appears,
   presumably under a preprocessor alternative); ties break on length. */
1711 int s = MIN(plen, qlen);
1713 for (
int i = 0; i < s; i++) {
1715 return (p[i] - q[i]);
1719 int cc = memcmp(p, q, (
size_t)s);
1723 return (plen - qlen);
/* Box comparators on the key field. */
1730 kmr_compare_opaque(
const struct kmr_kv_box *p,
1733 return kmr_compare_lexicographically((
unsigned char *)p->k.p, p->klen,
1734 (
unsigned char *)q->k.p, q->klen);
1738 kmr_compare_integer(
const struct kmr_kv_box *p0,
1741 return KMR_CMP3(p0->k.i, p1->k.i);
1745 kmr_compare_float8(
const struct kmr_kv_box *p0,
1748 return KMR_CMP3(p0->k.d, p1->k.d);
/* Record comparators: unpack the stored entry first (as opaque). */
1757 struct kmr_kv_box b0 = kmr_pick_kv2(p0->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1758 struct kmr_kv_box b1 = kmr_pick_kv2(p1->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1759 return kmr_compare_lexicographically((
unsigned char *)b0.k.p, b0.klen,
1760 (
unsigned char *)b1.k.p, b1.klen);
1771 struct kmr_kv_box b0 = kmr_pick_kv2(p0->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1772 struct kmr_kv_box b1 = kmr_pick_kv2(p1->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1775 return KMR_CMP3(v0, v1);
1786 struct kmr_kv_box b0 = kmr_pick_kv2(p0->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1787 struct kmr_kv_box b1 = kmr_pick_kv2(p1->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1790 return KMR_CMP3(v0, v1);
/* Pick a box comparator by key kind (pointer kinds compare as opaque). */
1796 kmr_choose_sorter(
const KMR_KVS *kvs)
1798 switch (kvs->c.key_data) {
1800 xassert(kvs->c.key_data != KMR_KV_BAD);
1802 case KMR_KV_INTEGER:
1803 return kmr_compare_integer;
1805 return kmr_compare_float8;
1807 case KMR_KV_CSTRING:
1808 case KMR_KV_POINTER_OWNED:
1809 case KMR_KV_POINTER_UNMANAGED:
1810 return kmr_compare_opaque;
/* Same selection for record comparators. */
1820 static kmr_record_sorter_t
1821 kmr_choose_record_sorter(
const KMR_KVS *kvs)
1823 switch (kvs->c.key_data) {
1825 xassert(kvs->c.key_data != KMR_KV_BAD);
1827 case KMR_KV_INTEGER:
1829 return kmr_compare_record_integer_;
1832 return kmr_compare_record_float8_;
1834 case KMR_KV_CSTRING:
1835 case KMR_KV_POINTER_OWNED:
1836 case KMR_KV_POINTER_UNMANAGED:
1837 return kmr_compare_record_opaque;
/* Record-copy map functions used by the local sort (bodies not
   visible), plus a stray three-way compare fragment. */
1850 kmr_copy_record_shuffle_fn(
const struct kmr_kv_box kv,
1861 kmr_copy_record_sorting_fn(
const struct kmr_kv_box kv,
1879 return KMR_CMP3(v0, v1);
/* Local sort: copies records out of kvi, sorts them (one- or two-step;
   the two-step path sorts by a cheap pre-key, finds equal-key runs,
   then qsorts each run with the full comparator, optionally in
   parallel), re-adds records into kvo, and reports per-phase timings.
   NOTE(review): extraction gaps — only visible code is described. */
1886 kmr_assert_kvs_ok(kvi, kvo, 1, 1);
1887 assert(kmr_shuffle_compatible_p(kvo, kvi));
1888 KMR *mr = kvi->c.mr;
1891 if (mr->kmrviz_trace) {
/* Two-step sorting is the default unless one_step_sort is set; integer
   and float keys already sort fully by their prime key. */
1904 _Bool twostep = !mr->one_step_sort;
1905 _Bool primekey = ((kvi->c.key_data == KMR_KV_INTEGER)
1906 || (kvi->c.key_data == KMR_KV_FLOAT8));
1907 double timestamp[5];
1909 long cnt = kvi->c.element_count;
1910 timestamp[0] = MPI_Wtime();
1916 .nothreading = opt.nothreading
/* Copy records out via an inspecting map. */
1919 cc =
kmr_map(kvi, 0, ev, inspect, kmr_copy_record_shuffle_fn);
1920 assert(cc == MPI_SUCCESS);
1922 cc =
kmr_map(kvi, 0, ev, inspect, kmr_copy_record_sorting_fn);
1923 assert(cc == MPI_SUCCESS);
1927 assert(cc == MPI_SUCCESS);
1929 timestamp[1] = MPI_Wtime();
/* First pass sort (serial or threaded by sort_threads_depth). */
1930 if (shuffling || twostep || primekey) {
1931 if (mr->single_thread || opt.nothreading) {
1935 mr->sort_threads_depth);
1938 timestamp[2] = MPI_Wtime();
/* Second step: qsort each run of equal pre-keys with the precise
   record comparator. */
1939 if (!shuffling && !primekey) {
1941 long *runs =
kmr_malloc(
sizeof(
long) * (
size_t)cnt);
1951 cc = KMR_CMP3(ev[i - 1].v, ev[i].v);
1953 assert(nruns < cnt);
1957 assert(i == cnt && (cnt == 0 || runs[nruns - 1] == cnt));
1959 nruns = (cnt == 0 ? 0 : 1);
1962 kmr_record_sorter_t cmp1 = kmr_choose_record_sorter(kvi);
1963 if (mr->single_thread || opt.nothreading) {
1964 for (
long k = 0; k < nruns; k++) {
1965 long j = (k == 0 ? 0 : runs[k - 1]);
1969 qsort(&ev[j], (
size_t)(i - j),
1971 (kmr_qsorter_t)cmp1);
1975 KMR_OMP_PARALLEL_FOR_
1976 for (
long k = 0; k < nruns; k++) {
1977 long j = (k == 0 ? 0 : runs[k - 1]);
1981 qsort(&ev[j], (
size_t)(i - j),
1983 (kmr_qsorter_t)cmp1);
1987 kmr_free(runs, (
sizeof(
long) * (
size_t)cnt));
1989 timestamp[3] = MPI_Wtime();
/* Re-add the sorted records into an exact-size output block. */
1990 size_t sz = kvi->c.storage_netsize;
1991 cc = kmr_allocate_block(kvo, sz);
1992 assert(cc == MPI_SUCCESS);
1994 for (
long i = 0 ; i < cnt; i++) {
1996 kmr_add_kv_nomutex(kvo, kv, 0, 0);
1998 timestamp[4] = MPI_Wtime();
1999 assert(sz == 0 || kmr_kvs_entry_tail_p(kvo->c.adding_point));
2000 assert(sz == 0 || kvo->c.block_count == 1);
2002 kmr_assert_on_tail_marker(kvo);
/* Phase timing report (verbosity >= 5, sorting trace enabled). */
2005 _Bool tracing = mr->trace_sorting;
2006 if (tracing && (5 <= mr->verbosity)) {
2007 fprintf(stderr, (
";;KMR [%05d] kmr_sort_locally" 2008 " time=(%f %f %f %f) (msec)\n"),
2010 ((timestamp[1] - timestamp[0]) * 1e3),
2011 ((timestamp[2] - timestamp[1]) * 1e3),
2012 ((timestamp[3] - timestamp[2]) * 1e3),
2013 ((timestamp[4] - timestamp[3]) * 1e3));
2026 if (mr->kmrviz_trace) {
/* Public kmr_sort_locally wrapper: checks options and delegates. */
2054 kmr_assert_kvs_ok(kvi, kvo, 1, 1);
2055 assert(kmr_shuffle_compatible_p(kvo, kvi));
2056 struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1,
2058 kmr_check_fn_options(kvi->c.mr, kmr_supported, opt, __func__);
2059 _Bool ranking = opt.key_as_rank;
2060 kmr_sort_locally_lo(kvi, kvo, shuffling, ranking, opt);
/* Counts entries by walking the entry chain from the first block;
   bound_in_block limits the walk to the current block. */
2071 kmr_count_entries(
KMR_KVS *kvs, _Bool bound_in_block)
2073 kvs->c.current_block = kvs->c.first_block;
2074 struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
2080 e = kmr_kvs_next(kvs, e, bound_in_block);
/* Shuffle: locally sorts kvi by destination rank, computes per-rank
   send sizes/displacements, exchanges counts and then data (all-to-all,
   calls not visible), and rebuilds kvo's bookkeeping from the received
   block.  NOTE(review): extraction gaps — only visible code described. */
2096 kmr_assert_kvs_ok(kvi, kvo, 1, 1);
2097 assert(kmr_shuffle_compatible_p(kvo, kvi));
2098 KMR *mr = kvi->c.mr;
2099 struct kmr_option kmr_supported = {.inspect = 1, .key_as_rank = 1,
2101 kmr_check_fn_options(mr, kmr_supported, opt, __func__);
2102 _Bool ranking = opt.key_as_rank;
2105 if (mr->kmrviz_trace) {
/* Push-off KVSes are made stationary first; already-shuffled push-off
   input short-circuits (not compatible with checkpointing). */
2112 if (kvi->c.magic == KMR_KVS_PUSHOFF) {
2113 kmr_pushoff_make_stationary(kvi);
2115 if (kvi->c.shuffled_in_pushoff) {
2116 assert(!mr->ckpt_enable);
2134 enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvi->c.key_data);
2135 enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvi->c.value_data);
/* Sort by destination rank into a single-block temporary kvs1. */
2139 kmr_sort_locally_lo(kvi, kvs1, 1, ranking, n_opt);
2140 assert(kvs1->c.stowed);
2143 assert(!kmr_fields_pointer_p(kvs1));
2144 assert(kvs1->c.block_count <= 1);
/* Per-rank send/receive sizes and displacements. */
2147 int nprocs = mr->nprocs;
2148 long cnt = kvs1->c.element_count;
2149 long *ssz =
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
2150 long *sdp =
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
2151 long *rsz =
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
2152 long *rdp =
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
2153 for (
int r = 0; r < nprocs; r++) {
/* Accumulate per-destination byte counts over the sorted entries;
   entries must arrive in nondecreasing rank order. */
2158 assert(kvs1->c.current_block == 0);
2159 kvs1->c.current_block = kvs1->c.first_block;
2160 struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs1, kvs1->c.first_block);
2161 for (
long i = 0; i < cnt; i++) {
2164 int r = (ranking ? (int)kv.k.i : kmr_pitch_rank(kv, kvs1));
2165 assert(0 <= r && r < nprocs);
2166 if (ranking && !(0 <= r && r < nprocs)) {
2167 kmr_error(mr,
"key entries are not ranks");
2170 kmr_error(mr,
"key-value entries are not sorted (internal error)");
2172 ssz[r] += (long)kmr_kvs_entry_netsize(e);
2174 e = kmr_kvs_next(kvs1, e, 0);
2178 assert(cc == MPI_SUCCESS);
2181 for (
int r = 0; r < nprocs; r++) {
/* Receive buffer sized to the total incoming bytes. */
2187 cc = kmr_allocate_block(kvo, (
size_t)recvsz);
2188 assert(cc == MPI_SUCCESS);
2194 assert(cc == MPI_SUCCESS);
/* Rebuild kvo's counters from the received single block. */
2195 long ocnt = kmr_count_entries(kvo, 1);
2196 assert(kvo->c.sorted == 0);
2197 kvo->c.element_count = ocnt;
2199 assert(kvo->c.block_count == 1);
2200 rb->partial_element_count = ocnt;
2201 rb->fill_size = (size_t)recvsz;
2203 kmr_kvs_adjust_adding_point(kvo);
2215 assert(kvo->c.element_count == 0 || kvo->c.storage_netsize != 0);
2216 xassert(!kmr_fields_pointer_p(kvo));
2218 kmr_free(ssz, (
sizeof(
long) * (
size_t)nprocs));
2219 kmr_free(sdp, (
sizeof(
long) * (
size_t)nprocs));
2220 kmr_free(rsz, (
sizeof(
long) * (
size_t)nprocs));
2221 kmr_free(rdp, (
sizeof(
long) * (
size_t)nprocs));
2225 if (mr->kmrviz_trace) {
/* Replicate: gathers every rank's KVS contents onto all ranks (or only
   rank zero with opt.rank_zero) via allgatherv, then rebuilds kvo's
   bookkeeping.  Pointer-field or multi-block input is collapsed to a
   single opaque block first.  NOTE(review): extraction gaps. */
2242 kmr_assert_kvs_ok(kvi, kvo, 1, 1);
2243 KMR *mr = kvi->c.mr;
2244 struct kmr_option kmr_supported = {.inspect = 1, .rank_zero = 1,
2246 kmr_check_fn_options(mr, kmr_supported, opt, __func__);
2247 int nprocs = mr->nprocs;
2248 int rank = mr->rank;
2253 if (mr->kmrviz_trace) {
/* Normalize input to one contiguous opaque block. */
2258 if (kmr_fields_pointer_p(kvi) || kvi->c.block_count > 1) {
2264 cc = kmr_collapse_as_opaque(kvi, kvs1, 1);
2265 assert(cc == MPI_SUCCESS);
2270 kmr_assert_on_tail_marker(kvs1);
2271 assert(kvs1->c.block_count <= 1);
2278 assert(cc == MPI_SUCCESS);
2282 assert(cc == MPI_SUCCESS);
/* Gather every rank's byte count, then displacements. */
2288 long *rsz =
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
2289 long *rdp =
kmr_malloc(
sizeof(
long) * (
size_t)nprocs);
2291 long ssz = (long)kvs1->c.storage_netsize;
2293 assert(cc == MPI_SUCCESS);
/* Receivers allocate the output block; with rank_zero only rank 0
   receives. */
2295 if (!opt.rank_zero || rank == 0) {
2296 for (
int r = 0; r < nprocs; r++) {
2301 if (!(kvo->c.key_data == kvs1->c.key_data
2302 && kvo->c.value_data == kvs1->c.value_data)) {
2303 kmr_error(mr,
"key-data or value-data types mismatch");
2305 cc = kmr_allocate_block(kvo, (
size_t)recvsz);
2306 assert(cc == MPI_SUCCESS);
2311 cc =
kmr_allgatherv(mr, opt.rank_zero, sbuf, ssz, rbuf, rsz, rdp);
2312 assert(cc == MPI_SUCCESS);
/* Rebuild kvo's counters from the received data. */
2313 assert(kvo->c.element_count == 0);
2314 long ocnt = kmr_count_entries(kvo, 1);
2315 kvo->c.element_count = ocnt;
2317 rb->partial_element_count = ocnt;
2318 rb->fill_size = (size_t)recvsz;
2320 kmr_kvs_adjust_adding_point(kvo);
2322 assert(cc == MPI_SUCCESS);
2323 kmr_assert_on_tail_marker(kvo);
2324 assert(kvo->c.element_count == 0 || kvo->c.storage_netsize != 0);
2332 assert(cc == MPI_SUCCESS);
2336 assert(cc == MPI_SUCCESS);
2339 kmr_free(rsz, (
sizeof(
long) * (
size_t)nprocs));
2340 kmr_free(rdp, (
sizeof(
long) * (
size_t)nprocs));
2345 if (mr->kmrviz_trace) {
2358 kmr_copy_record_fn(
const struct kmr_kv_box kv,
2374 xassert(kvi->c.current_block == 0);
2375 kmr_sorter_t cmp = kmr_choose_sorter(kvi);
2376 KMR *mr = kvi->c.mr;
2377 long cnt = kvi->c.element_count;
2381 kvi->c.current_block = kvi->c.first_block;
2382 struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvi, kvi->c.first_block);
2394 while (index < start_from) {
2395 e = kmr_kvs_next(kvi, e, 1);
2405 assert(index < cnt);
2406 e = kmr_kvs_next(kvi, e, 1);
2416 cc = (*cmp)(&kv0, &kv1);
2422 assert(ej == 0 && e == 0);
2432 for (
long i = 0; i < n; i++) {
2435 e = kmr_kvs_next(kvi, e, 1);
2438 double t0 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
2439 cc = (*r)(ev, n, kvi, kvo, arg);
2440 double t1 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
2441 if (cc != MPI_SUCCESS) {
2443 snprintf(ee,
sizeof(ee),
2444 "Reduce-fn returned with error cc=%d", cc);
2447 if (mr->log_traces != 0) {
2448 kmr_log_reduce(mr, kvi, ev, n, r, (t1 - t0));
2457 assert(index == cnt);
2465 kmr_free(ev, (
sizeof(
struct kmr_kv_box) * (
size_t)evsz));
2471 kmr_reduce_threading(_Bool stop_when_some_added,
2486 .nothreading = opt.nothreading
2488 assert(kvi->c.current_block == 0);
2489 long cnt = kvi->c.element_count;
2493 cc =
kmr_map(kvi, 0, ev, inspect, kmr_copy_record_fn);
2494 assert(cc == MPI_SUCCESS);
2497 kmr_sorter_t cmp = kmr_choose_sorter(kvi);
2498 long *runs =
kmr_malloc(
sizeof(
long) * (
size_t)cnt);
2514 cc = (*cmp)(&ev[i - 1], &ev[i]);
2517 assert(nruns < cnt);
2521 assert(i == cnt && (cnt == 0 || runs[nruns - 1] == cnt));
2525 kvo->c.under_threaded_operation = 1;
2527 KMR *mr = kvi->c.mr;
2529 KMR_OMP_PARALLEL_FOR_
2530 for (
long k = 0; k < nruns; k++) {
2533 long j = (k == 0 ? 0 : runs[k - 1]);
2536 double t0 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
2537 int ccx = (*r)(&ev[j], (i - j), kvi, kvo, arg);
2538 double t1 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
2539 if (ccx != MPI_SUCCESS) {
2541 snprintf(ee,
sizeof(ee),
2542 "Reduce-fn returned with error cc=%d", ccx);
2545 if (mr->log_traces != 0) {
2546 kmr_log_reduce(mr, kvi, ev, (i - j), r, (t1 - t0));
2556 if (stop_when_some_added) {
2558 if (mr->stop_at_some_check_globally) {
2561 done = (kvo->c.element_count != 0);
2573 kvo->c.under_threaded_operation = 0;
2585 kmr_free(runs, (
sizeof(
long) * (
size_t)cnt));
2586 kmr_free(ev, (
sizeof(
struct kmr_kv_box) * (
size_t)cnt));
2610 const char *file,
const int line,
const char *func)
2612 kmr_assert_kvs_ok(kvi, kvo, 1, 0);
2613 KMR *mr = kvi->c.mr;
2614 struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1,
2616 kmr_check_fn_options(mr, kmr_supported, opt, __func__);
2617 struct kmr_option i_opt = kmr_copy_options_i_part(opt);
2618 struct kmr_option o_opt = kmr_copy_options_o_part(opt);
2621 if (mr->kmrviz_trace) {
2631 enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvi->c.key_data);
2632 enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvi->c.value_data);
2637 kmr_sort_locally_lo(kvi, kvs1, 0, 0, i_opt);
2643 if (mr->atwork == 0) {
2650 if (mr->single_thread || opt.nothreading) {
2651 cc = kmr_reduce_nothreading(kvs1, kvo, arg, o_opt, r);
2653 cc = kmr_reduce_threading(stop_when_some_added,
2654 kvs1, kvo, arg, o_opt, r);
2656 if (mr->atwork == &info) {
2661 kmr_assert_on_tail_marker(kvs1);
2668 if (mr->kmrviz_trace) {
2686 kmr_assert_kvs_ok(kvi, kvo, 1, 0);
2687 KMR *mr = kvi->c.mr;
2688 assert(kvi->c.current_block == 0);
2689 struct kmr_option kmr_supported = {.inspect = 1, .take_ckpt = 1};
2690 kmr_check_fn_options(mr, kmr_supported, opt, __func__);
2705 long cnt = kvi->c.element_count;
2713 cc =
kmr_map(kvi, 0, ev, inspect, kmr_copy_record_fn);
2714 assert(cc == MPI_SUCCESS);
2716 cc = (*r)(&ev[0], cnt, kvi, kvo, arg);
2717 if (cc != MPI_SUCCESS) {
2719 snprintf(ee,
sizeof(ee),
2720 "Reduce-fn returned with error cc=%d", cc);
2733 kmr_free(ev, (
sizeof(
struct kmr_kv_box) * (
size_t)cnt));
2757 for (
int i = 0; i < nkvs; i++) {
2758 kmr_assert_i_kvs_ok(kvs[i], 1);
2760 kmr_assert_o_kvs_ok(kvo, 1);
2761 if (kvo->c.element_count > 0) {
2762 KMR *mr = kvo->c.mr;
2763 kmr_error(mr,
"kmr_concatenate_kvs: Output kvs has entries");
2765 kmr_check_fn_options(kvo->c.mr, kmr_noopt, opt, __func__);
2773 for (
int i = 0; i < nkvs; i++) {
2774 elements += kvs[i]->c.element_count;
2775 netsize += kvs[i]->c.storage_netsize;
2776 blocks += kvs[i]->c.block_count;
2780 kvs[i]->c.first_block = 0;
2782 assert(storage == 0);
2786 assert(blocks != 0 && p->next == 0);
2791 while (p->next != 0) {
2798 kvo->c.first_block = storage;
2799 kvo->c.element_count = elements;
2800 kvo->c.storage_netsize = netsize;
2801 kvo->c.block_count = blocks;
2805 kvo->c.current_block = 0;
2806 kvo->c.adding_point = 0;
2807 assert(kvo->c.block_count == 0 || kvo->c.first_block != 0);
2819 kmr_assert_kvs_ok(kvs, 0, 1, 0);
2820 assert(kvs->c.magic == KMR_KVS_ONCORE);
2822 long cnt = kvs->c.element_count;
2823 kvs->c.current_block = kvs->c.first_block;
2824 struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
2826 for (
long i = 0; i < cnt && e != 0; i++) {
2828 e = kmr_kvs_next(kvs, e, 0);
2830 kvs->c.current_block = 0;
2833 if (kvs->c.element_count == 0) {
2837 for (b = kvs->c.first_block; b->next != 0; b = b->next);
2838 kvs->c.current_block = b;
2841 e = kmr_kvs_first_entry(kvs, b);
2843 long cnt = b->partial_element_count;
2844 for (
long i = 0; i < cnt && e != 0; i++) {
2846 e = kmr_kvs_next(kvs, e, 1);
2848 kvs->c.current_block = 0;
2862 kmr_assert_kvs_ok(kvs, 0, 1, 0);
2863 assert(kvs->c.magic == KMR_KVS_ONCORE);
2864 long cnt = MIN(n, kvs->c.element_count);
2865 kvs->c.current_block = kvs->c.first_block;
2866 struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
2867 for (
long i = 0; i < cnt && e != 0; i++) {
2869 e = kmr_kvs_next(kvs, e, 0);
2871 kvs->c.current_block = 0;
2885 kmr_assert_kvs_ok(kvs, 0, 1, 0);
2886 assert(kvs->c.magic == KMR_KVS_ONCORE);
2887 long cnt = MIN(n, kvs->c.element_count);
2888 kvs->c.current_block = kvs->c.first_block;
2889 struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
2890 for (
long i = 0; i < cnt && e != 0; i++) {
2892 e = kmr_kvs_next(kvs, e, 0);
2894 kvs->c.current_block = 0;
2903 long n, _Bool shuffling, _Bool ranking)
2905 kmr_assert_kvs_ok(kvs, 0, 1, 0);
2906 assert(kvs->c.magic == KMR_KVS_ONCORE);
2907 long cnt = MIN(n, kvs->c.element_count);
2908 kvs->c.current_block = kvs->c.first_block;
2909 struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
2910 for (
long i = 0; i < cnt && e != 0; i++) {
2913 ev[i].v = (ranking ? kv.k.i : kmr_pitch_rank(kv, kvs));
2916 ev[i].v = kmr_stable_key(kv, kvs);
2919 e = kmr_kvs_next(kvs, e, 0);
2921 kvs->c.current_block = 0;
2933 kmr_error(mr,
"kmr_legal_minimum_field_size: Bad field");
2935 case KMR_KV_INTEGER:
2936 return sizeof(long);
2938 return sizeof(double);
2940 case KMR_KV_CSTRING:
2941 case KMR_KV_POINTER_OWNED:
2942 case KMR_KV_POINTER_UNMANAGED:
2945 kmr_error(mr,
"kmr_legal_minimum_field_size: Bad field");
2966 KMR *mr = kvo->c.mr;
2970 long cnt = kvi->c.element_count;
2974 assert(cc == MPI_SUCCESS);
2977 for (
long i = 0; i < cnt; i++) {
2980 assert(cc == MPI_SUCCESS);
2985 cc = (*r)(bx, 2, kvi, xs, 0);
2986 if (cc != MPI_SUCCESS) {
2988 snprintf(ee,
sizeof(ee),
2989 "Reduce-fn returned with error cc=%d", cc);
2993 assert(cc == MPI_SUCCESS);
2995 bx[0].klen = bx[1].klen;
2998 assert(cc == MPI_SUCCESS);
3003 assert(cc == MPI_SUCCESS);
3005 if (carryout != 0) {
3008 assert(cc == MPI_SUCCESS);
3010 assert(cc == MPI_SUCCESS);
3012 assert(cc == MPI_SUCCESS);
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
int kmr_ckpt_progress_init(KMR_KVS *, KMR_KVS *, struct kmr_option)
It initializes a progress of MapReduce checkpointing.
Key-Value Stream (abstract).
int kmr_add_kv1(KMR_KVS *kvs, void *k, int klen, void *v, int vlen)
Adds a key-value pair as given directly by a pointer.
int kmr_save_kvs(KMR_KVS *kvs, void **dataq, size_t *szq, struct kmr_option opt)
Packs locally the contents of a key-value stream to a byte array.
kmr_trace_entry_t * kmr_trace_add_entry(KMR *mr, kmr_trace_event_t ev, kmr_trace_entry_t *pre, KMR_KVS *kvi, KMR_KVS *kvo)
Add an entry to the trace.
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *identifying_name)
Makes a new KMR context (a context has type KMR).
Utilities Private Part (do not include from applications).
int kmr_add_identity_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
Adds a given key-value pair unmodified.
Options to Mapping, Shuffling, and Reduction.
int kmr_finish_swf(KMR *mr)
Clears the lanes of simple workflow.
void kmr_ckpt_progress_fin(KMR *)
It finalizes the progress of MapReduce checkpointing.
static KMR_KVS * kmr_create_raw_kvs(KMR *mr, const KMR_KVS *_similar)
Makes a new key-value stream (type KMR_KVS).
void kmr_ckpt_restore_ckpt(KMR_KVS *)
It restores checkpoint data to kvs.
int kmr_map_skipping(long from, long stride, long limit, _Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps by skipping the number of entries.
int kmr_add_string(KMR_KVS *kvs, const char *k, const char *v)
Adds a key-value pair of strings.
static int kmr_icmp(const void *a0, const void *a1)
Compares the key field of keyed-records for qsort/bsearch.
#define kmr_malloc(Z)
Allocates memory, or aborts when failed.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
int kmr_sort_locally(KMR_KVS *kvi, KMR_KVS *kvo, _Bool shuffling, struct kmr_option opt)
Reorders key-value pairs in a single rank.
int kmr_map9(_Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m, const char *file, const int line, const char *func)
Maps simply.
int kmr_ckpt_enabled(KMR *)
Check if checkpoint/restart is enabled.
static const size_t kmr_kvs_entry_header
Size of an Entry Header.
Keyed-Record for Sorting.
int kmr_add_kv_space(KMR_KVS *kvs, const struct kmr_kv_box kv, void **keyp, void **valuep)
Adds a key-value pair, but only allocates a space and returns the pointers to the key and the value parts.
long kmr_ckpt_first_unprocessed_kv(KMR *)
It returns the index of the first unprocessed key-value in the input KVS.
int kmr_move_kvs(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Moves the contents of the input KVI to the output KVO.
int kmr_map_rank_by_rank(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps sequentially with rank by rank for debugging.
struct kmr_kvs_entry * kmr_find_kvs_last_entry(KMR_KVS *kvs)
Finds the last entry of a key-value stream.
void kmr_ckpt_save_kvo_whole(KMR *, KMR_KVS *)
It saves all key-value pairs in the output KVS to a checkpoint data file.
int kmr_initialize_mpi(int *refargc, char ***refargv)
Checks the initialization state of MPI, and initializes MPI when not.
int kmr_allgatherv(KMR *mr, _Bool rankzeroonly, void *sbuf, long scnt, void *rbuf, long *rcnts, long *rdsps)
All-gathers data, or gathers data when RANKZEROONLY.
void kmr_ckpt_save_kvo_block_add(KMR *, KMR_KVS *, long)
It adds a new block of key-value pairs of the output KVS to the checkpoint data file.
int kmr_map_on_rank_zero(KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on rank0 only.
kmr_kv_field
Datatypes of Keys or Values.
int kmr_retrieve_kv_box_entries(KMR_KVS *kvs, struct kmr_kv_box *ev, long n)
Fills local key-value entries in an array of kmr_kv_box for inspection.
void kmr_isort(void *a, size_t n, size_t es, int depth)
Sorts by comparator on long integers.
int kmr_take_one(KMR_KVS *kvi, struct kmr_kv_box *kv)
Extracts a single key-value pair locally in the key-value stream KVI.
int kmr_concatenate_kvs(KMR_KVS *kvs[], int nkvs, KMR_KVS *kvo, struct kmr_option opt)
Concatenates a number of KVSes to one.
void kmr_ckpt_create_context(KMR *)
Initialize checkpoint context.
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Handy Copy of a Key-Value Field.
int kmr_reduce9(_Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r, const char *file, const int line, const char *func)
Reduces key-value pairs.
Options to Mapping by Spawns.
void kmr_ckpt_save_kvo_block_fin(KMR *, KMR_KVS *)
It finalizes saving a block of key-value pairs of the output KVS to the checkpoint data file.
State during kmr_map_ms().
void kmr_ckpt_free_context(KMR *)
Free checkpoint context.
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
int kmr_ckpt_disable_ckpt(KMR *)
It temporally disables checkpoint/restart.
int kmr_add_kv_pushoff(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
void kmr_trace_initialize(KMR *mr)
Initialize a trace.
int kmr_assert_sorted(KMR_KVS *kvi, _Bool locally, _Bool shuffling, _Bool ranking)
Checks a key-value stream is sorted.
void kmr_trace_finalize(KMR *mr)
Finalize a trace.
void kmr_ckpt_save_kvo_block_init(KMR *, KMR_KVS *)
It initializes saving blocks of key-value pairs of the output KVS to a checkpoint data file.
int kmr_retrieve_keyed_records(KMR_KVS *kvs, struct kmr_keyed_record *ev, long n, _Bool shuffling, _Bool ranking)
Fills keyed records in an array for sorting.
#define xassert(X)
Asserts and aborts, but it cannot be disabled.
#define kmr_realloc(P, Z)
Allocates memory, or aborts when failed.
int kmr_scan_locally(KMR_KVS *kvi, KMR_KVS *carryin, KMR_KVS *kvo, KMR_KVS *carryout, kmr_redfn_t r)
Scans every key-value with a reduce-function locally (independently on each rank).
int kmr_retrieve_kvs_entries(KMR_KVS *kvs, struct kmr_kvs_entry **ev, long n)
Fills local key-value entries in an array for inspection.
int kmr_add_kv_done_pushoff(KMR_KVS *kvs)
Marks finished adding key-value pairs, called from kmr_add_kv_done().
Options to Mapping on Files.
int kmr_fin(void)
Clears the environment.
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gather.
static struct kmr_kv_box kmr_pick_kv(struct kmr_kvs_entry *e, KMR_KVS *kvs)
Returns a handle to a key-value entry – a reverse of kmr_poke_kv().
int(* kmr_redfn_t)(const struct kmr_kv_box kv[], const long n, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg)
Reduce-function Type.
int kmr_gather_sizes(KMR *mr, long siz, long *rbuf)
Calls all-gather for collecting one long-integer.
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once.
int kmr_ckpt_enable_ckpt(KMR *, int)
It temporally enables checkpoint/restart which has been disabled by calling kmr_ckpt_disable_ckpt().
void kmr_free_string(char *s)
Frees a string strduped.
static void kmr_poke_kv(struct kmr_kvs_entry *e, const struct kmr_kv_box kv, struct kmr_kv_box *xkv, const KMR_KVS *kvs, _Bool reserve_space_only)
Stores a key-value pair at the entry E in the store – a reverse of kmr_pick_kv().
Information of Source Code Line.
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type.
void kmr_ckpt_lock_start(KMR *)
Define the start position of code region that is referred when restart.
int kmr_copy_mpi_info(MPI_Info src, MPI_Info dst)
Copies contents of MPI_Info.
KMR_KVS * kmr_create_kvs7(KMR *mr, enum kmr_kv_field kf, enum kmr_kv_field vf, struct kmr_option opt, const char *file, const int line, const char *func)
Makes a new key-value stream with the specified field data-types.
int kmr_alltoallv(KMR *mr, void *sbuf, long *scounts, long *sdsps, void *rbuf, long *rcounts, long *rdsps)
Does all-to-all-v, but it takes the arguments by long-integers.
int kmr_restore_kvs(KMR_KVS *kvo, void *data, size_t sz_, struct kmr_option opt)
Unpacks locally the contents of a key-value stream from a byte array.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
int kmr_reduce_as_one(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r)
Calls a reduce-function once as if all key-value pairs had the same key.
void kmr_ckpt_remove_ckpt(KMR_KVS *)
It removes checkpoint data file.
void kmr_ckpt_lock_finish(KMR *)
Define the end position of code region that is referred when restart.
int kmr_exchange_sizes(KMR *mr, long *sbuf, long *rbuf)
Calls all-to-all to exchange one long-integer.
int kmr_legal_minimum_field_size(KMR *mr, enum kmr_kv_field f)
Returns a minimum byte size of the field: 8 for INTEGER and FLOAT8, 0 for others.