13 #include <sys/types.h> 23 #include "../config.h" 29 static void kmr_ckpt_init_environment(
KMR *);
30 static int kmr_ckpt_check_restart(
KMR *,
int **,
int *,
int *);
31 static void kmr_ckpt_restore_prev_progress(
KMR *,
int *,
int);
32 static void kmr_ckpt_restore_prev_state(
KMR *,
const char *,
int*,
int,
int);
33 static void kmr_ckpt_restore_prev_state_each_rank(
KMR *,
37 static void kmr_ckpt_merge_ignore_ckpt_data(
long,
40 static void kmr_ckpt_merge_store_ckpt_data(
long,
int,
long,
43 static void kmr_ckpt_merge_update_ckpt_data(
long,
int,
long,
long,
47 static void kmr_ckpt_merge_sort_data(
KMR *,
const char *,
long,
49 static void kmr_ckpt_merge_write_file(
KMR *,
const char *,
52 static void kmr_ckpt_init_log(
KMR *,
const char *);
53 static void kmr_ckpt_fin_log(
KMR *);
54 static FILE *kmr_ckpt_open_log(
KMR *,
const char *,
struct kmr_ckpt_log *,
56 static void kmr_ckpt_log_whole_start(
KMR *);
57 static void kmr_ckpt_log_whole_finish(
KMR *);
58 static void kmr_ckpt_log_block_start(
KMR *,
KMR_KVS *);
59 static void kmr_ckpt_log_block_add(
KMR *,
long,
long);
60 static void kmr_ckpt_log_block_finish(
KMR *);
61 static void kmr_ckpt_log_index_start(
KMR *,
KMR_KVS *);
62 static void kmr_ckpt_log_index_add(
KMR *,
long,
long);
63 static void kmr_ckpt_log_index_finish(
KMR *);
64 static void kmr_ckpt_log_delete_start(
KMR *,
long);
65 static void kmr_ckpt_log_delete_finish(
KMR *,
long);
66 static void kmr_ckpt_log_deletable(
KMR *,
long );
67 static void kmr_ckpt_log_progress(
KMR *);
68 static void kmr_ckpt_log_skipped(
KMR *);
70 static void kmr_ckpt_delete_ckpt_data(
KMR *,
long);
71 static void kmr_ckpt_delete_ckpt_files(
KMR *,
const char *,
int);
72 static void kmr_ckpt_save_ckpt(
KMR_KVS *);
73 static void kmr_ckpt_kv_record_init(
KMR *,
KMR_KVS *);
74 static long kmr_ckpt_kv_record_add(
KMR_KVS *);
75 static void kmr_ckpt_kv_record_fin(
KMR *);
76 static FILE *kmr_ckpt_open(
KMR_KVS *,
const char *);
77 static FILE *kmr_ckpt_open_path(
KMR *,
const char *,
const char *);
78 static void kmr_ckpt_save_nprocs(
KMR *,
const char *);
79 static void kmr_ckpt_make_fname(
const char *,
const char *,
80 enum kmr_ckpt_type,
int,
long,
char *,
size_t);
81 static void kmr_ckpt_get_data_flist(
KMR *,
const char *,
84 static void kmr_ckpt_flush(
KMR *, FILE *);
86 static void kmr_ckpt_list_init(
struct kmr_ckpt_list *, kmr_ckpt_list_alocfn_t,
87 kmr_ckpt_list_freefn_t, kmr_ckpt_list_compfn_t);
89 static void kmr_ckpt_list_add(
struct kmr_ckpt_list *,
void *);
90 static void *kmr_ckpt_list_del(
struct kmr_ckpt_list *,
void *);
91 static void *kmr_ckpt_list_search(
struct kmr_ckpt_list *,
void *);
92 static void *kmr_ckpt_list_rsearch(
struct kmr_ckpt_list *,
void *);
95 static void kmr_ckpt_int_list_add(
struct kmr_ckpt_list *,
long);
96 static long kmr_ckpt_int_list_del(
struct kmr_ckpt_list *,
long);
97 static long kmr_ckpt_int_list_search(
struct kmr_ckpt_list *,
long);
98 static long kmr_ckpt_int_list_rsearch(
struct kmr_ckpt_list *,
long);
123 mr->ckpt_ctx = ckptctx;
124 snprintf(ckptctx->ckpt_dname, KMR_CKPT_DIRLEN,
"./%s%05d",
125 KMR_CKPT_DIRNAME, mr->rank);
126 ckptctx->prev_mode = KMR_CKPT_ALL;
127 ckptctx->ckpt_log_fp = NULL;
128 ckptctx->progress_counter = 0;
129 ckptctx->prev_progress = 0;
130 ckptctx->prev_global_progress = 0;
131 ckptctx->cur_kvi_id = KMR_CKPT_DUMMY_ID;
132 ckptctx->cur_kvo_id = KMR_CKPT_DUMMY_ID;
133 ckptctx->ckpt_data_fp = NULL;
134 ckptctx->saved_element_count = 0;
135 ckptctx->saved_adding_point = NULL;
136 ckptctx->saved_current_block = NULL;
137 ckptctx->kv_positions = NULL;
138 ckptctx->kv_positions_count = 0;
139 ckptctx->lock_id = 0;
140 ckptctx->lock_counter = 0;
141 ckptctx->initialized = 0;
142 ckptctx->slct_cur_take_ckpt = 0;
143 if (mr->ckpt_selective) {
146 kmr_ckpt_int_list_init(ckptctx->slct_skip_ops);
148 ckptctx->slct_skip_ops = NULL;
151 if (mr->ckpt_enable) {
152 kmr_ckpt_init_environment(mr);
166 MPI_Barrier(mr->comm);
167 kmr_ckpt_fin_log(mr);
168 kmr_ckpt_delete_ckpt_files(mr, ckptctx->ckpt_dname, mr->rank);
169 kmr_free(ckptctx->kv_positions,
170 sizeof(
struct kv_position) * (
size_t)ckptctx->kv_positions_count);
172 if (mr->ckpt_selective) {
173 kmr_ckpt_int_list_free(ckptctx->slct_skip_ops);
174 kmr_free(ckptctx->slct_skip_ops,
sizeof(
struct kmr_ckpt_list));
186 kmr_ckpt_init_environment(
KMR *mr)
189 if (ckptctx->initialized) {
194 int *prev_ranks = NULL;
195 int prev_rank_count = 0;
197 int restarted = kmr_ckpt_check_restart(mr, &prev_ranks, &prev_rank_count,
201 int cc = MPI_Allreduce(&restarted, &all_restarted, 1, MPI_INT,
203 assert(cc == MPI_SUCCESS);
204 assert(restarted == all_restarted);
208 char tmp_dname[KMR_CKPT_DIRLEN];
209 snprintf(tmp_dname, KMR_CKPT_DIRLEN,
"./tmp_%s%05d",
210 KMR_CKPT_DIRNAME, mr->rank);
211 kmr_ckpt_delete_ckpt_files(mr, tmp_dname, mr->rank);
212 int cc = mkdir(tmp_dname, S_IRWXU);
214 char msg[KMR_CKPT_MSGLEN];
215 snprintf(msg,
sizeof(msg),
216 "Failed to create a directory for checkpoint %s", tmp_dname);
222 kmr_ckpt_restore_prev_progress(mr, prev_ranks, prev_rank_count);
223 kmr_ckpt_restore_prev_state(mr, tmp_dname, prev_ranks, prev_rank_count,
228 char log_fpath[KMR_CKPT_PATHLEN];
229 kmr_ckpt_make_fname(tmp_dname, KMR_CKPT_FNAME_PREFIX, KMR_CKPT_LOG,
230 mr->rank, 0, log_fpath,
sizeof(log_fpath));
231 kmr_ckpt_init_log(mr, log_fpath);
235 kmr_ckpt_save_nprocs(mr, tmp_dname);
239 for (
int i = 0; i < prev_rank_count; i++) {
240 char old_dname[KMR_CKPT_DIRLEN];
241 snprintf(old_dname, KMR_CKPT_DIRLEN,
"./%s%05d.old",
242 KMR_CKPT_DIRNAME, prev_ranks[i]);
243 kmr_ckpt_delete_ckpt_files(mr, old_dname, prev_ranks[i]);
244 char cur_dname[KMR_CKPT_DIRLEN];
245 snprintf(cur_dname, KMR_CKPT_DIRLEN,
"./%s%05d",
246 KMR_CKPT_DIRNAME, prev_ranks[i]);
248 cc = stat(cur_dname, &sb);
250 cc = rename(cur_dname, old_dname);
254 MPI_Barrier(mr->comm);
255 cc = rename(tmp_dname, ckptctx->ckpt_dname);
259 kmr_free(prev_ranks,
sizeof(
int) * (
size_t)prev_rank_count);
261 ckptctx->initialized = 1;
267 kmr_ckpt_check_restart(
KMR *mr,
int **target_ranks,
int *target_rank_count,
271 _Bool force_start_from_scratch = 0;
274 int cc = stat(ckptctx->ckpt_dname, &sb);
276 if (!S_ISDIR(sb.st_mode)) {
277 char msg[KMR_CKPT_MSGLEN];
278 snprintf(msg,
sizeof(msg),
279 "Non-directory file for checkpoint directory %s " 281 ckptctx->ckpt_dname);
285 char fpath[KMR_CKPT_PATHLEN];
286 kmr_ckpt_make_fname(ckptctx->ckpt_dname, KMR_CKPT_FNAME_PREFIX,
287 KMR_CKPT_LOG, mr->rank, 0, fpath,
sizeof(fpath));
288 cc = access(fpath, R_OK);
291 unsigned long log_size = 0;
292 FILE *fp = kmr_ckpt_open_log(mr, fpath, &log_hdr, &log_size);
294 assert(mr->rank == log_hdr.rank);
295 assert(log_hdr.nprocs > 0);
297 force_start_from_scratch = 1;
298 char msg[KMR_CKPT_MSGLEN];
299 snprintf(msg,
sizeof(msg),
300 "Log file exists, but no log is recorded in %s. " 301 "All logs are discarded and start from scratch",
303 kmr_warning(mr, 1, msg);
305 int quotient = log_hdr.nprocs / mr->nprocs;
306 int rest = log_hdr.nprocs % mr->nprocs;
307 int cnt = quotient + ((mr->rank < rest) ? 1 : 0);
309 *target_ranks = (
int*)
kmr_malloc(
sizeof(
int) * (size_t)cnt);
310 int offset = mr->rank * quotient +
311 ((mr->rank < rest) ? mr->rank : rest);
312 for (
int i = 0; i < cnt; i++) {
313 (*target_ranks)[i] = offset + i;
316 *target_rank_count = cnt;
317 *target_nprocs = log_hdr.nprocs;
318 if (mr->nprocs > log_hdr.nprocs) {
320 char msg[KMR_CKPT_MSGLEN];
321 snprintf(msg,
sizeof(msg),
322 "Currently restart with bigger number of processes " 326 if (mr->ckpt_selective && mr->nprocs != log_hdr.nprocs) {
327 char msg[KMR_CKPT_MSGLEN];
328 snprintf(msg,
sizeof(msg),
329 "Restart with different number of processes " 330 "is not supported in selective mode");
333 ckptctx->prev_mode = log_hdr.mode;
335 char msg[KMR_CKPT_MSGLEN];
336 snprintf(msg,
sizeof(msg),
337 "Structure of a checkpoint directory may be wrong %s. " 338 "Delete all checkpoint directories",
339 ckptctx->ckpt_dname);
343 if (errno != ENOENT) {
344 char msg[KMR_CKPT_MSGLEN];
345 snprintf(msg,
sizeof(msg),
346 "Unknown error on checkpoint directory %s",
347 ckptctx->ckpt_dname);
350 assert(*target_rank_count == 0);
354 if (*target_rank_count > 0) {
355 for (
int i = 1; i < *target_rank_count; i++) {
357 int t_rank = (*target_ranks)[i];
358 char dpath[KMR_CKPT_DIRLEN];
359 snprintf(dpath, KMR_CKPT_DIRLEN,
"./%s%05d", KMR_CKPT_DIRNAME,
361 char fpath[KMR_CKPT_PATHLEN];
362 kmr_ckpt_make_fname(dpath, KMR_CKPT_FNAME_PREFIX, KMR_CKPT_LOG,
363 t_rank, 0, fpath,
sizeof(fpath));
364 cc = access(fpath, R_OK);
367 unsigned long log_size = 0;
368 FILE *fp = kmr_ckpt_open_log(mr, fpath, &log_hdr, &log_size);
370 if (log_hdr.nprocs < 0) {
374 force_start_from_scratch = 1;
375 char msg[KMR_CKPT_MSGLEN];
376 snprintf(msg,
sizeof(msg),
377 "Log file exists, but no log is recorded in %s. " 378 "All logs are discarded and start from scratch",
380 kmr_warning(mr, 1, msg);
386 kmr_free(*target_ranks, (
size_t)*target_rank_count);
387 char msg[KMR_CKPT_MSGLEN];
388 snprintf(msg,
sizeof(msg),
389 "Wrong structure of checkpoint directory %s. ",
394 if (!force_start_from_scratch) {
403 kmr_ckpt_restore_prev_progress_all(
KMR *mr,
404 int *target_ranks,
int target_rank_count)
407 long min_progress = -1;
410 for (
int i = 0; i < target_rank_count; i++) {
411 int rank = target_ranks[i];
412 char dpath[KMR_CKPT_DIRLEN];
413 snprintf(dpath, KMR_CKPT_DIRLEN,
"./%s%05d", KMR_CKPT_DIRNAME, rank);
414 char fpath[KMR_CKPT_PATHLEN];
415 kmr_ckpt_make_fname(dpath, KMR_CKPT_FNAME_PREFIX, KMR_CKPT_LOG,
416 rank, 0, fpath,
sizeof(fpath));
418 unsigned long total, size = 0;
419 FILE *fp = kmr_ckpt_open_log(mr, fpath, &log_hdr, &total);
420 long max_done_op = 0, cur_op = 0;
421 _Bool num_procs_locked = 0;
422 while (size < total) {
424 size_t rc = fread((
void *)&e,
sizeof(e), 1, fp);
426 char msg[KMR_CKPT_MSGLEN];
427 snprintf(msg,
sizeof(msg),
428 "Failed to read a checkpoint log entry");
432 case KMR_CKPT_LOG_WHOLE_START:
433 case KMR_CKPT_LOG_BLOCK_START:
434 case KMR_CKPT_LOG_INDEX_START:
437 case KMR_CKPT_LOG_WHOLE_FINISH:
438 case KMR_CKPT_LOG_BLOCK_FINISH:
439 case KMR_CKPT_LOG_INDEX_FINISH:
440 max_done_op = cur_op;
443 case KMR_CKPT_LOG_SKIPPED:
444 max_done_op = e.op_seqno;
446 case KMR_CKPT_LOG_LOCK_START:
447 assert(num_procs_locked == 0);
448 num_procs_locked = 1;
450 case KMR_CKPT_LOG_LOCK_FINISH:
451 assert(num_procs_locked == 1);
452 num_procs_locked = 0;
458 if (num_procs_locked && target_rank_count > 1) {
460 char msg[KMR_CKPT_MSGLEN];
461 snprintf(msg,
sizeof(msg),
462 "Fault occurred in a critical region and can not restart " 463 "with the different number of processes. " 464 "Restart with the same number of processes with " 465 "the previous run.");
468 if (min_progress < 0) {
469 min_progress = max_done_op;
471 if (max_done_op < min_progress) {
472 min_progress = max_done_op;
476 assert(min_progress >= 0);
479 long global_min_progress;
480 int cc = MPI_Allreduce(&min_progress, &global_min_progress, 1, MPI_LONG,
482 assert(cc == MPI_SUCCESS);
484 ckptctx->prev_progress = min_progress;
485 ckptctx->prev_global_progress = global_min_progress;
490 kmr_ckpt_restore_prev_progress_selective(
KMR *mr,
int *target_ranks,
491 int target_rank_count)
493 long min_progress = -1, max_progress = -1;
495 for (
int i = 0; i < target_rank_count; i++) {
496 int rank = target_ranks[i];
497 char dpath[KMR_CKPT_DIRLEN];
498 snprintf(dpath, KMR_CKPT_DIRLEN,
"./%s%05d", KMR_CKPT_DIRNAME, rank);
499 char fpath[KMR_CKPT_PATHLEN];
500 kmr_ckpt_make_fname(dpath, KMR_CKPT_FNAME_PREFIX, KMR_CKPT_LOG,
501 rank, 0, fpath,
sizeof(fpath));
503 unsigned long total, size = 0;
504 FILE *fp = kmr_ckpt_open_log(mr, fpath, &log_hdr, &total);
505 long target_kvs_id = KMR_CKPT_DUMMY_ID;
508 kmr_ckpt_kvs_chains_init(&chains);
511 kmr_ckpt_int_list_init(&kvses);
512 _Bool num_procs_locked = 0;
513 while (size < total) {
515 size_t rc = fread((
void *)&e,
sizeof(e), 1, fp);
517 char msg[KMR_CKPT_MSGLEN];
518 snprintf(msg,
sizeof(msg),
519 "Failed to read a checkpoint log entry");
524 .kvo_id = e.kvo_id };
527 case KMR_CKPT_LOG_WHOLE_START:
528 target_kvs_id = e.kvo_id;
530 case KMR_CKPT_LOG_WHOLE_FINISH:
531 kmr_ckpt_int_list_add(&kvses, target_kvs_id);
532 target_kvs_id = KMR_CKPT_DUMMY_ID;
534 case KMR_CKPT_LOG_DELETABLE:
535 v = kmr_ckpt_int_list_del(&kvses, e.kvo_id);
536 assert(v == e.kvo_id);
538 case KMR_CKPT_LOG_PROGRESS:
539 case KMR_CKPT_LOG_SKIPPED:
540 if (op.kvi_id == KMR_CKPT_DUMMY_ID) {
541 kmr_ckpt_kvs_chains_new_chain(&chains, op);
543 kmr_ckpt_kvs_chains_connect(&chains, op);
546 case KMR_CKPT_LOG_LOCK_START:
547 assert(num_procs_locked == 0);
548 num_procs_locked = 1;
550 case KMR_CKPT_LOG_LOCK_FINISH:
551 assert(num_procs_locked == 1);
552 num_procs_locked = 0;
558 if (num_procs_locked) {
564 long open_min_progress = LONG_MAX;
565 long open_max_progress = 0;
567 for (
int j = 0; j < chains.chainlst_size; j++) {
571 if (last_op->op_seqno > last_op_id) {
572 last_op_id = last_op->op_seqno;
574 if (last_op->kvo_id != KMR_CKPT_DUMMY_ID) {
577 for (item = list->tail; item != 0; item = item->prev) {
580 long v = kmr_ckpt_int_list_search(&kvses, op->kvo_id);
581 if (v == op->kvo_id) {
582 if (op->op_seqno < open_min_progress) {
583 open_min_progress = op->op_seqno;
585 if (op->op_seqno > open_max_progress) {
586 open_max_progress = op->op_seqno;
593 if (open_min_progress == LONG_MAX && open_max_progress == 0) {
594 open_min_progress = last_op_id;
595 open_max_progress = last_op_id;
599 struct kmr_ckpt_list *skip_ops = mr->ckpt_ctx->slct_skip_ops;
600 for (
int j = 0; j < chains.chainlst_size; j++) {
604 if (last_op->op_seqno <= open_min_progress) {
609 if (head_op->op_seqno > open_max_progress) {
612 if (last_op->kvo_id == KMR_CKPT_DUMMY_ID) {
616 for (item = list->head; item != 0; item = item->next) {
619 if (op->op_seqno > open_min_progress) {
620 kmr_ckpt_int_list_add(skip_ops, op->op_seqno);
629 for (item = list->tail; item != 0; item = item->prev) {
632 long v = kmr_ckpt_int_list_search(&kvses, op->kvo_id);
633 if (v == op->kvo_id) {
637 if (op->op_seqno > open_min_progress) {
638 kmr_ckpt_int_list_add(skip_ops, op->op_seqno);
644 kmr_ckpt_kvs_chains_free(&chains);
645 kmr_ckpt_int_list_free(&kvses);
646 min_progress = open_min_progress;
647 max_progress = open_max_progress;
650 assert(max_progress >= 0 && min_progress >= 0);
651 mr->ckpt_ctx->prev_progress = max_progress;
652 mr->ckpt_ctx->prev_global_progress = min_progress;
657 kmr_ckpt_restore_prev_progress(
KMR *mr,
658 int *target_ranks,
int target_rank_count)
660 if (!mr->ckpt_selective) {
661 kmr_ckpt_restore_prev_progress_all(mr, target_ranks,
664 kmr_ckpt_restore_prev_progress_selective(mr, target_ranks,
671 kmr_ckpt_restore_prev_state(
KMR *mr,
const char *wdpath,
672 int *target_ranks,
int target_rank_count,
677 (
char **)
kmr_malloc(
sizeof(
char *) * (size_t)target_rank_count);
680 int *nfiles = (
int *)
kmr_malloc(
sizeof(
int) * (size_t)target_rank_count);
681 int max_merge_count = 0;
682 for (
int i = 0; i < target_rank_count; i++) {
683 rdpaths[i] = (
char*)
kmr_malloc(
sizeof(
char) * KMR_CKPT_DIRLEN);
684 snprintf(rdpaths[i], KMR_CKPT_DIRLEN,
"./%s%05d",
685 KMR_CKPT_DIRNAME, target_ranks[i]);
686 kmr_ckpt_get_data_flist(mr, rdpaths[i], &dataflsts[i], &nfiles[i], 1);
687 max_merge_count += nfiles[i];
692 merge_ctx.max_each_merge = target_rank_count;
693 merge_ctx.merges_count = 0;
697 for (
int i = 0; i < target_rank_count; i++) {
699 prev_state.prev_rank = target_ranks[i];
700 prev_state.prev_nprocs = prev_nprocs;
701 prev_state.ckpt_dir = rdpaths[i];
702 prev_state.dataflst = dataflsts[i];
703 prev_state.dataflst_size = nfiles[i];
704 kmr_ckpt_restore_prev_state_each_rank(mr, &prev_state, &merge_ctx);
710 for (
int i = 0; i < target_rank_count; i++) {
711 fprintf(stderr,
"index: %d\n", i);
712 fprintf(stderr,
" rdpath: %s\n", rdpaths[i]);
713 fprintf(stderr,
" nfiles: %d\n", nfiles[i]);
714 for (
int j = 0; j < nfiles[i]; j++) {
716 fprintf(stderr,
" ckptflst: %ld, %s/%s\n",
717 file->kvs_id, file->dname, file->fname);
720 fprintf(stderr,
"max_merge_count: %d\n", max_merge_count);
722 fprintf(stderr,
"\n\n");
724 fprintf(stderr,
"merge_count: %d\n", merge_ctx.merges_count);
725 for (
int i = 0; i < merge_ctx.merges_count; i++) {
726 fprintf(stderr,
"merge\n");
727 fprintf(stderr,
" rank: %d\n", merge_ctx.merges[i].rank);
728 fprintf(stderr,
" kvs_id: %ld\n", merge_ctx.merges[i].kvs_id);
729 fprintf(stderr,
" src_lst: %d\n",
730 merge_ctx.merges[i].src_lst_count);
731 for (
int j = 0; j < merge_ctx.merges[i].src_lst_count; j++) {
733 &(merge_ctx.merges[i].src_lst[j]);
734 fprintf(stderr,
" rank: %d, n_kvi: %ld, n_kvo: %ld\n",
735 source->rank, source->n_kvi, source->n_kvo);
736 fprintf(stderr,
" file: %s/%s\n",
737 source->file->dname, source->file->fname);
738 if (merge_ctx.merges[i].src_lst[j].done_ikv_lst_size > 0) {
739 fprintf(stderr,
" done ikvs index: ");
740 for (
int k = 0; k < source->done_ikv_lst_size; k++) {
741 fprintf(stderr,
"%ld,", source->done_ikv_lst[k]);
743 fprintf(stderr,
"\n");
751 for (
int i = 0; i < merge_ctx.merges_count; i++) {
753 for (
int j = 0; j < merge->src_lst_count; j++) {
754 if (merge->src_lst[j].n_kvi > 0 &&
755 merge->src_lst[j].done_ikv_lst_size > 0) {
757 kmr_ckpt_merge_sort_data(mr, wdpath, merge->kvs_id,
764 for (
int i = 0; i < merge_ctx.merges_count; i++) {
765 kmr_ckpt_merge_write_file(mr, wdpath, &merge_ctx.merges[i]);
770 for (
int i = 0; i < merge_ctx.merges_count; i++) {
772 for (
int j = 0; j < merge->src_lst_count; j++) {
774 if (mg_src->kvi_op_seqno > 0) {
775 ckptctx->kv_positions_count++;
783 for (
int i = 0; i < merge_ctx.merges_count; i++) {
786 for (
int j = 0; j < merge->src_lst_count; j++) {
788 if (mg_src->n_kvi > 0) {
789 struct kv_position *kvpos = &ckptctx->kv_positions[idx];
792 kvpos->op_seqno = mg_src->kvi_op_seqno;
793 kvpos->start_from = mg_src->n_kvi;
795 assert(mg_src->kvi_op_seqno == kvpos->op_seqno);
796 kvpos->start_from += mg_src->n_kvi;
807 for (
int i = 0; i < ckptctx->kv_positions_count; i++) {
808 fprintf(stderr,
"op_seqno: %ld, start_from: %ld\n",
809 ckptctx->kv_positions[i].op_seqno,
810 ckptctx->kv_positions[i].start_from);
815 for (
int i = 0; i < merge_ctx.merges_count; i++) {
816 for (
int j = 0; j < merge_ctx.merges[i].src_lst_count; j++) {
817 if (merge_ctx.merges[i].src_lst[j].done_ikv_lst_size > 0) {
819 kmr_free(mg_src->done_ikv_lst,
820 sizeof(
long) * (
size_t)mg_src->done_ikv_lst_size);
821 char fpath[KMR_CKPT_PATHLEN];
822 snprintf(fpath, KMR_CKPT_PATHLEN,
"%s/%s",
823 mg_src->file->dname, mg_src->file->fname);
829 kmr_free(merge_ctx.merges,
831 for (
int i = 0; i < target_rank_count; i++) {
832 kmr_free(dataflsts[i],
834 kmr_free(rdpaths[i],
sizeof(
char) * KMR_CKPT_DIRLEN);
838 kmr_free(nfiles,
sizeof(
int) * (
size_t)target_rank_count);
839 kmr_free(rdpaths,
sizeof(
char *) * (
size_t)target_rank_count);
845 kmr_ckpt_restore_prev_state_each_rank_all
850 char logfile[KMR_CKPT_PATHLEN];
851 kmr_ckpt_make_fname(prev_state->ckpt_dir, KMR_CKPT_FNAME_PREFIX,
852 KMR_CKPT_LOG, prev_state->prev_rank, 0,
853 logfile,
sizeof(logfile));
855 unsigned long total, size = 0;
856 FILE *fp = kmr_ckpt_open_log(mr, logfile, &log_hdr, &total);
861 kmr_ckpt_kvs_chains_init(&kvs_chains);
865 long nkvi = 0, nkvo = 0;
867 long undel_kvs_id = 0;
869 while (size < total) {
871 size_t rc = fread((
void *)&e,
sizeof(e), 1, fp);
873 char msg[KMR_CKPT_MSGLEN];
874 snprintf(msg,
sizeof(msg),
875 "Failed to read a checkpoint log entry");
880 .kvo_id = e.kvo_id };
882 case KMR_CKPT_LOG_WHOLE_START:
884 if (cur_op <= ckptctx->prev_global_progress) {
888 if (e.kvi_id == KMR_CKPT_DUMMY_ID) {
891 kmr_ckpt_merge_ignore_ckpt_data(e.kvo_id, prev_state,
893 kmr_ckpt_kvs_chains_new_chain(&kvs_chains, op);
897 int cc = kmr_ckpt_merge_check_ignorable(&kvs_chains,
900 kmr_ckpt_merge_ignore_ckpt_data(e.kvo_id, prev_state,
902 kmr_ckpt_kvs_chains_connect(&kvs_chains, op);
906 if (cur_op > ckptctx->prev_progress) {
910 case KMR_CKPT_LOG_WHOLE_FINISH:
911 assert(e.op_seqno == cur_op);
912 kmr_ckpt_merge_store_ckpt_data(e.kvo_id, mr->rank, -1,
913 prev_state, merge_ctx);
916 case KMR_CKPT_LOG_BLOCK_START:
918 assert(e.kvi_id != KMR_CKPT_DUMMY_ID);
919 if (cur_op <= ckptctx->prev_global_progress) {
925 int cc = kmr_ckpt_merge_check_ignorable(&kvs_chains, e.kvi_id);
927 kmr_ckpt_merge_ignore_ckpt_data(e.kvo_id, prev_state,
929 kmr_ckpt_kvs_chains_connect(&kvs_chains, op);
932 if (cur_op > ckptctx->prev_progress) {
938 case KMR_CKPT_LOG_BLOCK_ADD:
939 assert(e.op_seqno == cur_op);
940 if (cur_op > ckptctx->prev_progress) {
946 case KMR_CKPT_LOG_BLOCK_FINISH:
947 assert(e.op_seqno == cur_op);
948 kmr_ckpt_merge_store_ckpt_data(e.kvo_id, mr->rank, -1,
949 prev_state, merge_ctx);
951 if (cur_op > ckptctx->prev_progress) {
956 case KMR_CKPT_LOG_INDEX_START:
958 assert(e.kvi_id != KMR_CKPT_DUMMY_ID);
959 if (cur_op <= ckptctx->prev_global_progress) {
965 int cc = kmr_ckpt_merge_check_ignorable(&kvs_chains, e.kvi_id);
967 kmr_ckpt_merge_ignore_ckpt_data(e.kvo_id, prev_state,
969 kmr_ckpt_kvs_chains_connect(&kvs_chains, op);
972 if (cur_op > ckptctx->prev_progress) {
974 kmr_ckpt_int_list_init(&spawn_dones);
979 case KMR_CKPT_LOG_INDEX_ADD:
980 assert(e.op_seqno == cur_op);
981 if (cur_op > ckptctx->prev_progress) {
983 kmr_ckpt_int_list_add(&spawn_dones, e.n_kvi);
988 case KMR_CKPT_LOG_INDEX_FINISH:
989 assert(e.op_seqno == cur_op);
990 kmr_ckpt_merge_store_ckpt_data(e.kvo_id, mr->rank, -1,
991 prev_state, merge_ctx);
993 if (cur_op > ckptctx->prev_progress) {
994 kmr_ckpt_int_list_free(&spawn_dones);
999 case KMR_CKPT_LOG_DELETE_START:
1000 undel_kvs_id = e.kvi_id;
1002 case KMR_CKPT_LOG_DELETE_FINISH:
1003 assert(e.kvi_id == undel_kvs_id);
1006 case KMR_CKPT_LOG_SKIPPED:
1007 kmr_ckpt_merge_store_ckpt_data(e.kvo_id, mr->rank, -1,
1008 prev_state, merge_ctx);
1015 switch (last_log.state) {
1016 case KMR_CKPT_LOG_WHOLE_START:
1017 kmr_ckpt_merge_ignore_ckpt_data(last_log.kvo_id, prev_state,
1020 case KMR_CKPT_LOG_BLOCK_START:
1021 kmr_ckpt_merge_ignore_ckpt_data(last_log.kvo_id, prev_state,
1024 case KMR_CKPT_LOG_BLOCK_ADD:
1025 kmr_ckpt_merge_store_ckpt_data(last_log.kvo_id, mr->rank, nkvo,
1026 prev_state, merge_ctx);
1027 kmr_ckpt_merge_update_ckpt_data(last_log.kvi_id, mr->rank,
1028 last_log.op_seqno, nkvi, NULL,
1029 prev_state, merge_ctx);
1031 case KMR_CKPT_LOG_INDEX_START:
1032 kmr_ckpt_merge_ignore_ckpt_data(last_log.kvo_id, prev_state,
1035 case KMR_CKPT_LOG_INDEX_ADD:
1036 kmr_ckpt_merge_store_ckpt_data(last_log.kvo_id, mr->rank, nkvo,
1037 prev_state, merge_ctx);
1038 assert(nkvi >= spawn_dones.size);
1039 kmr_ckpt_merge_update_ckpt_data(last_log.kvi_id, mr->rank,
1040 last_log.op_seqno, nkvi,
1041 &spawn_dones, prev_state,
1043 kmr_ckpt_int_list_free(&spawn_dones);
1047 if (undel_kvs_id != 0) {
1049 kmr_ckpt_merge_ignore_ckpt_data(undel_kvs_id, prev_state, merge_ctx);
1051 kmr_ckpt_kvs_chains_free(&kvs_chains);
1054 for (
int i = 0; i < prev_state->dataflst_size; i++) {
1055 if (prev_state->dataflst[i].checked != 1) {
1056 char msg[KMR_CKPT_MSGLEN];
1057 snprintf(msg,
sizeof(msg),
1058 "Checkpoint state is wrong. " 1059 "Delete all checkpoint and restart again");
1068 kmr_ckpt_restore_prev_state_each_rank_selective
1072 char logfile[KMR_CKPT_PATHLEN];
1073 kmr_ckpt_make_fname(prev_state->ckpt_dir, KMR_CKPT_FNAME_PREFIX,
1074 KMR_CKPT_LOG, prev_state->prev_rank, 0,
1075 logfile,
sizeof(logfile));
1077 unsigned long total, size = 0;
1078 FILE *fp = kmr_ckpt_open_log(mr, logfile, &log_hdr, &total);
1082 kmr_ckpt_int_list_init(&kvses);
1083 while (size < total) {
1085 size_t rc = fread((
void *)&e,
sizeof(e), 1, fp);
1087 char msg[KMR_CKPT_MSGLEN];
1088 snprintf(msg,
sizeof(msg),
1089 "Failed to read a checkpoint log entry");
1093 case KMR_CKPT_LOG_DELETABLE:
1094 kmr_ckpt_int_list_add(&kvses, e.kvo_id);
1100 for (
int i = 0; i < prev_state->dataflst_size; i++) {
1102 long v = kmr_ckpt_int_list_rsearch(&kvses, file->kvs_id);
1103 if (v == file->kvs_id) {
1105 kmr_ckpt_merge_ignore_ckpt_data(file->kvs_id, prev_state,
1108 kmr_ckpt_merge_store_ckpt_data(file->kvs_id, mr->rank, -1,
1109 prev_state, merge_ctx);
1112 kmr_ckpt_int_list_free(&kvses);
1115 for (
int i = 0; i < prev_state->dataflst_size; i++) {
1116 if (prev_state->dataflst[i].checked != 1) {
1117 char msg[KMR_CKPT_MSGLEN];
1118 snprintf(msg,
sizeof(msg),
1119 "Checkpoint state is wrong. " 1120 "Delete all checkpoint and restart again");
1128 kmr_ckpt_restore_prev_state_each_rank(
KMR *mr,
1132 if (!mr->ckpt_selective) {
1133 kmr_ckpt_restore_prev_state_each_rank_all(mr, prev_state, merge_ctx);
1135 kmr_ckpt_restore_prev_state_each_rank_selective(mr, prev_state,
1145 struct kmr_ckpt_list *c = kmr_ckpt_kvs_chains_find(chains, kvi_id);
1154 kmr_ckpt_find_data_file(
long kvs_id,
1158 for (
int i = 0; i < nfiles; i++) {
1159 if (dataflst[i].kvs_id == kvs_id) {
1160 file = &dataflst[i];
1169 kmr_ckpt_merge_ignore_ckpt_data(
long kvo_id,
1174 kmr_ckpt_find_data_file(kvo_id, prev_state->dataflst,
1175 prev_state->dataflst_size);
1183 if (file->merged == 1) {
1185 for (
int i = 0; i < merge_ctx->merges_count; i++) {
1186 if (merge_ctx->merges[i].kvs_id == kvo_id) {
1187 merge = &merge_ctx->merges[i];
1194 for (
int i = 0; i < merge->src_lst_count; i++) {
1195 if (merge->src_lst[i].rank == prev_state->prev_rank) {
1197 mg_src = &merge->src_lst[i];
1201 assert(idx != -1 && mg_src != 0);
1203 if (mg_src->done_ikv_lst_size != 0) {
1204 kmr_free(mg_src->done_ikv_lst,
1205 sizeof(
long) * (
size_t)mg_src->done_ikv_lst_size);
1207 if (merge->src_lst_count == 1) {
1210 for (
int i = idx; i < merge->src_lst_count - 1; i++) {
1215 memset(&merge->src_lst[merge->src_lst_count - 1], 0,
1218 merge->src_lst_count -= 1;
1225 kmr_ckpt_merge_store_ckpt_data(
long kvo_id,
int rank,
long nkvo,
1230 kmr_ckpt_find_data_file(kvo_id, prev_state->dataflst,
1231 prev_state->dataflst_size);
1232 if (file == 0 || file->merged == 1) {
1239 int cnt = merge_ctx->merges_count;
1240 for (
int i = 0; i < cnt; i++) {
1241 if (merge_ctx->merges[i].kvs_id == kvo_id) {
1242 merge = &merge_ctx->merges[i];
1248 merge = &merge_ctx->merges[cnt++];
1250 merge->kvs_id = kvo_id;
1253 merge->src_lst_count = 0;
1254 merge_ctx->merges_count = cnt;
1257 mg_src->rank = prev_state->prev_rank;
1259 mg_src->n_kvo = nkvo;
1260 mg_src->done_ikv_lst = 0;
1261 mg_src->done_ikv_lst_size = 0;
1262 mg_src->kvi_op_seqno = -1;
1263 mg_src->file = file;
1264 merge->src_lst_count += 1;
1268 kmr_ckpt_cmp_long(
const void *v1,
const void *v2)
1270 long _v1 = *((
long *)v1);
1271 long _v2 = *((
long *)v2);
1274 }
else if ( _v1 < _v2 ) {
1283 kmr_ckpt_merge_update_ckpt_data(
long kvi_id,
int rank,
1284 long kvi_op_seqno,
long nkvi,
1290 kmr_ckpt_find_data_file(kvi_id, prev_state->dataflst,
1291 prev_state->dataflst_size);
1293 assert(file->checked == 1 && file->merged == 1);
1296 for (
int i = 0; i < merge_ctx->merges_count; i++) {
1297 if (merge_ctx->merges[i].kvs_id == kvi_id) {
1298 merge = &merge_ctx->merges[i];
1304 for (
int i = 0; i < merge->src_lst_count; i++) {
1305 if (merge->src_lst[i].rank == prev_state->prev_rank) {
1306 mg_src = &merge->src_lst[i];
1310 assert(mg_src != 0);
1311 assert(mg_src->n_kvo == -1);
1312 mg_src->n_kvi = nkvi;
1313 mg_src->kvi_op_seqno = kvi_op_seqno;
1314 if (done_ikv_lst->size != 0) {
1315 mg_src->done_ikv_lst =
1316 (
long *)
kmr_malloc(
sizeof(
long) * (size_t)done_ikv_lst->size);
1319 for (item = done_ikv_lst->head; item != 0; item = item->next) {
1320 mg_src->done_ikv_lst[idx] = *(
long *)item->val;
1323 qsort(mg_src->done_ikv_lst, (
size_t)done_ikv_lst->size,
sizeof(
long),
1325 mg_src->done_ikv_lst_size = done_ikv_lst->size;
1333 kmr_ckpt_merge_sort_data(
KMR *mr,
const char *wdpath,
long kvs_id,
1336 assert(mrg_src->file->merged == 1);
1339 ndata->kvs_id = kvs_id;
1342 snprintf(ndata->fname,
sizeof(ndata->fname),
"%s.sorted",
1343 mrg_src->file->fname);
1344 strncpy(ndata->dname, wdpath,
sizeof(ndata->dname) - 1);
1346 char dst_fpath[KMR_CKPT_PATHLEN];
1347 snprintf(dst_fpath, KMR_CKPT_PATHLEN,
"%s/%s", ndata->dname, ndata->fname);
1348 int cc = access(dst_fpath, F_OK);
1350 FILE *wfp = kmr_ckpt_open_path(mr, dst_fpath,
"w");
1352 char tmp_fpath[KMR_CKPT_PATHLEN];
1353 snprintf(tmp_fpath, KMR_CKPT_PATHLEN,
"%s/%s.rest",
1354 ndata->dname, ndata->fname);
1355 cc = access(tmp_fpath, F_OK);
1358 char src_fpath[KMR_CKPT_PATHLEN];
1359 snprintf(src_fpath, KMR_CKPT_PATHLEN,
"%s/%s",
1360 mrg_src->file->dname, mrg_src->file->fname);
1362 cc = stat(src_fpath, &sb);
1364 char msg[KMR_CKPT_MSGLEN];
1365 snprintf(msg,
sizeof(msg),
1366 "Failed to access a checkpoint data file %s", src_fpath);
1369 FILE *rfp = kmr_ckpt_open_path(mr, src_fpath,
"r");
1374 size_t rc = fread((
void *)&hdr, hdrsiz, 1, rfp);
1376 char msg[KMR_CKPT_MSGLEN];
1377 snprintf(msg,
sizeof(msg),
1378 "Failed to read a checkpoint data file %s", src_fpath);
1381 rc = fwrite((
void *)&hdr, hdrsiz, 1, wfp);
1383 char msg[KMR_CKPT_MSGLEN];
1384 snprintf(msg,
sizeof(msg),
1385 "Failed to write a checkpoint data file %s", dst_fpath);
1390 size_t total_size = (size_t)sb.st_size - hdrsiz;
1391 size_t cur_size = 0;
1394 size_t read_size = 0;
1395 long idx = 0, start_idx = 0;
1398 FILE *wfp2 = kmr_ckpt_open_path(mr, tmp_fpath,
"w");
1399 while (read_size < total_size) {
1403 rc = fread((
void *)&e, kv_hdrsiz, 1, rfp);
1405 char msg[KMR_CKPT_MSGLEN];
1406 snprintf(msg,
sizeof(msg),
1407 "Failed to read a checkpoint data file %s", src_fpath);
1412 if (bufsiz < kv_bdysiz) {
1416 rc = fread(buf, kv_bdysiz, 1, rfp);
1418 char msg[KMR_CKPT_MSGLEN];
1419 snprintf(msg,
sizeof(msg),
1420 "Failed to read a checkpoint data file %s", src_fpath);
1426 for (
long i = start_idx; i < mrg_src->done_ikv_lst_size; i++) {
1427 if (idx == mrg_src->done_ikv_lst[i]) {
1434 cur_size += kv_hdrsiz + kv_bdysiz;
1439 rc = fwrite((
void *)&e, kv_hdrsiz, 1, twfp);
1441 char msg[KMR_CKPT_MSGLEN];
1442 snprintf(msg,
sizeof(msg),
1443 "Failed to write a checkpoint data file %s",
1447 rc = fwrite(buf, kv_bdysiz, 1, twfp);
1449 char msg[KMR_CKPT_MSGLEN];
1450 snprintf(msg,
sizeof(msg),
1451 "Failed to write a checkpoint data file %s",
1455 read_size += kv_hdrsiz + kv_bdysiz;
1458 kmr_free(buf, bufsiz);
1459 kmr_ckpt_flush(mr, wfp2);
1465 cc = stat(tmp_fpath, &sb);
1467 char msg[KMR_CKPT_MSGLEN];
1468 snprintf(msg,
sizeof(msg),
1469 "Failed to access a checkpoint data file %s", tmp_fpath);
1472 rfp = kmr_ckpt_open_path(mr, tmp_fpath,
"r");
1474 rc = fread(buf, (
size_t)sb.st_size, 1, rfp);
1476 char msg[KMR_CKPT_MSGLEN];
1477 snprintf(msg,
sizeof(msg),
1478 "Failed to read a checkpoint data file %s", tmp_fpath);
1481 rc = fwrite(buf, (
size_t)sb.st_size, 1, wfp);
1483 char msg[KMR_CKPT_MSGLEN];
1484 snprintf(msg,
sizeof(msg),
1485 "Failed to write a checkpoint data file %s", dst_fpath);
1488 kmr_free(buf, (
size_t)sb.st_size);
1489 assert((cur_size + (
size_t)sb.st_size) == total_size);
1492 kmr_ckpt_flush(mr, wfp);
1495 mrg_src->file = ndata;
1500 kmr_ckpt_merge_write_file(
KMR *mr,
const char *wdpath,
1503 char dst_fpath[KMR_CKPT_PATHLEN];
1504 kmr_ckpt_make_fname(wdpath, KMR_CKPT_FNAME_PREFIX,
1505 KMR_CKPT_DATA, merge->rank, merge->kvs_id,
1506 dst_fpath,
sizeof(dst_fpath));
1507 int cc = access(dst_fpath, F_OK);
1509 FILE *wfp = kmr_ckpt_open_path(mr, dst_fpath,
"w");
1512 char hdr_src_fpath[KMR_CKPT_PATHLEN];
1513 snprintf(hdr_src_fpath, KMR_CKPT_PATHLEN,
"%s/%s",
1514 merge->src_lst[0].file->dname, merge->src_lst[0].file->fname);
1515 FILE *rfp = kmr_ckpt_open_path(mr, hdr_src_fpath,
"r");
1519 size_t rc = fread((
void *)&hdr, hdrsiz, 1, rfp);
1521 char msg[KMR_CKPT_MSGLEN];
1522 snprintf(msg,
sizeof(msg),
1523 "Failed to read a checkpoint data file %s", hdr_src_fpath);
1526 hdr.nprocs = mr->nprocs;
1527 hdr.rank = mr->rank;
1528 rc = fwrite((
void *)&hdr, hdrsiz, 1, wfp);
1530 char msg[KMR_CKPT_MSGLEN];
1531 snprintf(msg,
sizeof(msg),
1532 "Failed to write a checkpoint data file %s", dst_fpath);
1543 struct merge_file *mfs = (
struct merge_file *)
1544 kmr_malloc(
sizeof(
struct merge_file) * (size_t)merge->src_lst_count);
1545 for (
int i = 0; i < merge->src_lst_count; i++) {
1547 assert(file->merged == 1);
1548 char fpath[KMR_CKPT_PATHLEN];
1549 snprintf(fpath, KMR_CKPT_PATHLEN,
"%s/%s", file->dname, file->fname);
1551 cc = stat(fpath, &sb);
1553 char msg[KMR_CKPT_MSGLEN];
1554 snprintf(msg,
sizeof(msg),
1555 "Failed to access a checkpoint data file %s", fpath);
1559 mfs[i].fp = kmr_ckpt_open_path(mr, fpath,
"r");
1560 mfs[i].size = (size_t)sb.st_size - hdrsiz;
1561 mfs[i].cur_size = 0;
1562 fseek(mfs[i].fp, (
long)hdrsiz, SEEK_SET);
1566 for (
int i = 0; i < merge->src_lst_count; i++) {
1568 if (mg_src->n_kvi > 0) {
1569 assert(mg_src->n_kvo == -1);
1572 unsigned char *buf = (
unsigned char *)
kmr_malloc(bufsiz);
1573 while (mfs[i].cur_size < mfs[i].size) {
1574 if (kvicnt >= mg_src->n_kvi) {
1580 rc = fread((
void *)&e, kv_hdrsiz, 1, mfs[i].fp);
1582 char msg[KMR_CKPT_MSGLEN];
1583 snprintf(msg,
sizeof(msg),
1584 "Failed to read a checkpoint data file");
1589 if (bufsiz < kv_bdysiz) {
1593 rc = fread((
void *)buf, kv_bdysiz, 1, mfs[i].fp);
1595 char msg[KMR_CKPT_MSGLEN];
1596 snprintf(msg,
sizeof(msg),
1597 "Failed to read a checkpoint data file");
1601 rc = fwrite((
void *)&e, kv_hdrsiz, 1, wfp);
1603 char msg[KMR_CKPT_MSGLEN];
1604 snprintf(msg,
sizeof(msg),
1605 "Failed to write a checkpoint data file %s",
1609 rc = fwrite((
void *)buf, kv_bdysiz, 1, wfp);
1611 char msg[KMR_CKPT_MSGLEN];
1612 snprintf(msg,
sizeof(msg),
1613 "Failed to write a checkpoint data file %s",
1618 mfs[i].cur_size += kv_hdrsiz + kv_bdysiz;
1620 kmr_free(buf, bufsiz);
1625 for (
int i = 0; i < merge->src_lst_count; i++) {
1629 unsigned char *buf = (
unsigned char *)
kmr_malloc(bufsiz);
1630 while (mfs[i].cur_size < mfs[i].size) {
1631 if ((mg_src->n_kvo >= 0) && (kvocnt >= mg_src->n_kvo)) {
1637 rc = fread((
void *)&e, kv_hdrsiz, 1, mfs[i].fp);
1639 char msg[KMR_CKPT_MSGLEN];
1640 snprintf(msg,
sizeof(msg),
1641 "Failed to read a checkpoint data file");
1646 if (bufsiz < kv_bdysiz) {
1650 rc = fread((
void *)buf, kv_bdysiz, 1, mfs[i].fp);
1652 char msg[KMR_CKPT_MSGLEN];
1653 snprintf(msg,
sizeof(msg),
1654 "Failed to read a checkpoint data file");
1658 rc = fwrite((
void *)&e, kv_hdrsiz, 1, wfp);
1660 char msg[KMR_CKPT_MSGLEN];
1661 snprintf(msg,
sizeof(msg),
1662 "Failed to write a checkpoint data file %s",
1666 rc = fwrite((
void *)buf, kv_bdysiz, 1, wfp);
1668 char msg[KMR_CKPT_MSGLEN];
1669 snprintf(msg,
sizeof(msg),
1670 "Failed to write a checkpoint data file %s",
1675 mfs[i].cur_size += kv_hdrsiz + kv_bdysiz;
1677 kmr_free(buf, bufsiz);
1680 for (
int i = 0; i < merge->src_lst_count; i++) {
1683 kmr_free(mfs,
sizeof(
struct merge_file) * (
size_t)merge->src_lst_count);
1684 kmr_ckpt_flush(mr, wfp);
1694 kmr_ckpt_init_log(
KMR *mr,
const char *log_fpath)
1697 memset((
void *)&ckptld, 0,
sizeof(ckptld));
1698 if (mr->ckpt_selective) {
1699 ckptld.mode = KMR_CKPT_SELECTIVE;
1701 ckptld.mode = KMR_CKPT_ALL;
1703 ckptld.nprocs = mr->nprocs;
1704 ckptld.rank = mr->rank;
1705 FILE *fp = kmr_ckpt_open_path(mr, log_fpath,
"w");
1707 size_t ret = fwrite((
void *)&ckptld, size, 1, fp);
1709 char msg[KMR_CKPT_MSGLEN];
1710 snprintf(msg,
sizeof(msg),
1711 "Failed to write header of checkpoint log %s", log_fpath);
1714 kmr_ckpt_flush(mr, fp);
1715 mr->ckpt_ctx->ckpt_log_fp = fp;
1719 kmr_ckpt_fin_log(
KMR *mr)
1721 fclose(mr->ckpt_ctx->ckpt_log_fp);
1729 ckptctx->ckpt_log_fp);
1731 char msg[KMR_CKPT_MSGLEN];
1732 snprintf(msg,
sizeof(msg),
"Failed to add checkpoint log");
1735 kmr_ckpt_flush(mr, ckptctx->ckpt_log_fp);
1739 kmr_ckpt_save_log2(
KMR *mr,
int state)
1743 ckptle.op_seqno = ckptctx->progress_counter;
1744 ckptle.kvi_id = ckptctx->cur_kvi_id;
1745 ckptle.kvo_id = ckptctx->cur_kvo_id;
1746 ckptle.state = state;
1749 kmr_ckpt_save_log_raw(mr, &ckptle);
1753 kmr_ckpt_save_log4(
KMR *mr,
int state,
long nkvi,
long nkvo)
1757 ckptle.op_seqno = ckptctx->progress_counter;
1758 ckptle.kvi_id = ckptctx->cur_kvi_id;
1759 ckptle.kvo_id = ckptctx->cur_kvo_id;
1760 ckptle.state = state;
1761 ckptle.n_kvi = nkvi;
1762 ckptle.n_kvo = nkvo;
1763 kmr_ckpt_save_log_raw(mr, &ckptle);
1767 kmr_ckpt_save_log_del(
KMR *mr,
int state,
long kvs_id)
1770 ckptle.op_seqno = -1;
1771 ckptle.kvi_id = kvs_id;
1772 ckptle.kvo_id = kvs_id;
1773 ckptle.state = state;
1776 kmr_ckpt_save_log_raw(mr, &ckptle);
1780 kmr_ckpt_save_log_lock(
KMR *mr,
int state)
1783 ckptle.op_seqno = -1;
1784 ckptle.kvi_id = KMR_CKPT_DUMMY_ID;
1785 ckptle.kvo_id = KMR_CKPT_DUMMY_ID;
1786 ckptle.state = state;
1789 kmr_ckpt_save_log_raw(mr, &ckptle);
1796 kmr_ckpt_log_whole_start(
KMR *mr)
1798 kmr_ckpt_save_log2(mr, KMR_CKPT_LOG_WHOLE_START);
1805 kmr_ckpt_log_whole_finish(
KMR *mr)
1807 kmr_ckpt_save_log2(mr, KMR_CKPT_LOG_WHOLE_FINISH);
1815 kmr_ckpt_log_block_start(
KMR *mr,
KMR_KVS *kvo)
1818 long nkvo = (kvo == 0) ? 0 : kvo->c.element_count;
1819 kmr_ckpt_save_log4(mr, KMR_CKPT_LOG_BLOCK_START, nkvi, nkvo);
1828 kmr_ckpt_log_block_add(
KMR *mr,
long nkvi,
long nkvo)
1830 kmr_ckpt_save_log4(mr, KMR_CKPT_LOG_BLOCK_ADD, nkvi, nkvo);
1838 kmr_ckpt_log_block_finish(
KMR *mr)
1840 kmr_ckpt_save_log2(mr, KMR_CKPT_LOG_BLOCK_FINISH);
1848 kmr_ckpt_log_index_start(
KMR *mr,
KMR_KVS *kvo)
1851 long nkvo = (kvo == 0) ? 0 : kvo->c.element_count;
1852 kmr_ckpt_save_log4(mr, KMR_CKPT_LOG_INDEX_START, nkvi, nkvo);
1861 kmr_ckpt_log_index_add(
KMR *mr,
long ikv_index,
long nkvo)
1863 kmr_ckpt_save_log4(mr, KMR_CKPT_LOG_INDEX_ADD, ikv_index, nkvo);
1870 kmr_ckpt_log_index_finish(
KMR *mr)
1872 kmr_ckpt_save_log2(mr, KMR_CKPT_LOG_INDEX_FINISH);
1880 kmr_ckpt_log_delete_start(
KMR *mr,
long kvs_id)
1882 kmr_ckpt_save_log_del(mr, KMR_CKPT_LOG_DELETE_START, kvs_id);
1890 kmr_ckpt_log_delete_finish(
KMR *mr,
long kvs_id)
1892 kmr_ckpt_save_log_del(mr, KMR_CKPT_LOG_DELETE_FINISH, kvs_id);
1901 kmr_ckpt_log_deletable(
KMR *mr,
long kvs_id)
1903 kmr_ckpt_save_log_del(mr, KMR_CKPT_LOG_DELETABLE, kvs_id);
1910 kmr_ckpt_log_progress(
KMR *mr)
1913 if (!(ckptctx->cur_kvi_id == KMR_CKPT_DUMMY_ID &&
1914 ckptctx->cur_kvo_id == KMR_CKPT_DUMMY_ID) ) {
1915 kmr_ckpt_save_log2(mr, KMR_CKPT_LOG_PROGRESS);
1923 kmr_ckpt_log_skipped(
KMR *mr)
1925 kmr_ckpt_save_log2(mr, KMR_CKPT_LOG_SKIPPED);
1936 kmr_ckpt_save_log_lock(mr, KMR_CKPT_LOG_LOCK_START);
1947 kmr_ckpt_save_log_lock(mr, KMR_CKPT_LOG_LOCK_FINISH);
1951 kmr_ckpt_open_log(
KMR *mr,
const char *path,
struct kmr_ckpt_log *log_hdr,
1952 unsigned long *size)
1955 int cc = stat(path, &sb);
1957 char msg[KMR_CKPT_MSGLEN];
1958 snprintf(msg,
sizeof(msg),
1959 "Failed to access a checkpoint log %s", path);
1962 FILE *fp = kmr_ckpt_open_path(mr, path,
"r");
1964 size_t rc = fread((
void *)log_hdr, hdrsz, 1, fp);
1966 char msg[KMR_CKPT_MSGLEN];
1967 snprintf(msg,
sizeof(msg),
1968 "Failed to read a checkpoint log %s", path);
1971 assert(sb.st_size >= 0);
1972 assert((
size_t)sb.st_size >= hdrsz);
1973 *size = (size_t)sb.st_size - hdrsz;
1984 kmr_ckpt_write_file_p(
KMR *mr)
1986 assert(mr->ckpt_enable);
1987 if (mr->ckpt_selective && !mr->ckpt_ctx->slct_cur_take_ckpt) {
1996 kmr_ckpt_delete_ckpt_data(
KMR *mr,
long kvs_id)
1998 char fpath[KMR_CKPT_PATHLEN];
2000 kmr_ckpt_make_fname(ckptctx->ckpt_dname, KMR_CKPT_FNAME_PREFIX,
2001 KMR_CKPT_DATA, mr->rank, kvs_id, fpath,
sizeof(fpath));
2002 int cc = access(fpath, F_OK);
2004 kmr_ckpt_log_delete_start(mr, kvs_id);
2007 kmr_ckpt_log_delete_finish(mr, kvs_id);
2017 kmr_ckpt_delete_ckpt_files(
KMR *mr,
const char *target_dir,
int rank)
2020 int cc = stat(target_dir, &sb);
2022 if (!S_ISDIR(sb.st_mode)) {
2023 char msg[KMR_CKPT_MSGLEN];
2024 snprintf(msg,
sizeof(msg),
2025 "File %s should not exist or " 2026 "if exists, shoud be a directory.", target_dir);
2036 kmr_ckpt_get_data_flist(mr, target_dir, &dataflst, &nfiles, 0);
2037 for (
int i = 0; i < nfiles; i++) {
2038 char fpath[KMR_CKPT_PATHLEN];
2039 snprintf(fpath,
sizeof(fpath),
"%s/%s", target_dir, dataflst[i].fname);
2040 cc = access(fpath, F_OK);
2045 "Failed to delete checkpoint file %s on rank[%05d]\n",
2049 if (dataflst != NULL) {
2054 char fpath[KMR_CKPT_PATHLEN];
2055 kmr_ckpt_make_fname(target_dir, KMR_CKPT_FNAME_PREFIX, KMR_CKPT_LOG,
2056 rank, 0, fpath,
sizeof(fpath));
2057 cc = access(fpath, F_OK);
2063 if (mr->rank == 0) {
2064 char fpath[KMR_CKPT_PATHLEN];
2065 memset(fpath, 0,
sizeof(fpath));
2066 snprintf(fpath,
sizeof(fpath),
"%s/nprocs", target_dir);
2067 cc = access(fpath, F_OK);
2073 cc = rmdir(target_dir);
2081 kmr_ckpt_init_data_file(
KMR *mr,
const char *dname ,
const char *fname,
2084 char fpath[KMR_CKPT_PATHLEN];
2085 snprintf(fpath, KMR_CKPT_PATHLEN,
"%s/%s", dname, fname);
2086 int cc = access(fpath, F_OK);
2088 char msg[KMR_CKPT_MSGLEN];
2089 snprintf(msg,
sizeof(msg),
2090 "Failed to access checkpoint file %s", fpath);
2096 FILE *fp = kmr_ckpt_open_path(mr, fpath,
"r");
2097 size_t rc = fread((
void *)&hdr, hdrsz, 1, fp);
2099 file->kvs_id = hdr.kvs_id;
2101 char msg[KMR_CKPT_MSGLEN];
2102 snprintf(msg,
sizeof(msg),
2103 "Failed to read checkpoint file %s. Ignore this file",
2105 kmr_warning(mr, 1, msg);
2106 file->kvs_id = KMR_CKPT_DUMMY_ID;
2111 strncpy(file->fname, fname,
sizeof(file->fname) - 1);
2112 strncpy(file->dname, dname,
sizeof(file->dname) - 1);
2119 kmr_ckpt_save_ckpt(
KMR_KVS *kvs) {
2121 size_t tsize = kvs->c.storage_netsize + offsetof(
struct kmr_ckpt_data, data);
2123 memset(buf, 0, tsize);
2126 ckpt->nprocs = kvs->c.mr->nprocs;
2127 ckpt->rank = kvs->c.mr->rank;
2128 ckpt->kvs_id = kvs->c.ckpt_kvs_id;
2129 ckpt->key_data = kvs->c.key_data;
2130 ckpt->value_data = kvs->c.value_data;
2132 enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvs->c.key_data);
2133 enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvs->c.value_data);
2135 unsigned char *p = (
unsigned char *)&ckpt->data[0];
2136 ckptctx->saved_current_block = kvs->c.current_block;
2137 kvs->c.current_block = kvs->c.first_block;
2139 while (cnt < kvs->c.element_count) {
2140 assert(kvs->c.current_block != 0);
2144 for (
long i = 0; i < b->partial_element_count; i++) {
2147 kmr_poke_kv2((
struct kmr_kvs_entry *)p, ev, 0, keyf, valf, 0);
2149 e = kmr_kvs_next(kvs, e, 1);
2152 kvs->c.current_block = b->next;
2154 kvs->c.current_block = ckptctx->saved_current_block;
2156 FILE *fp = kmr_ckpt_open(kvs,
"w");
2157 size_t ret = fwrite(buf, tsize, 1, fp);
2159 char msg[KMR_CKPT_MSGLEN];
2160 snprintf(msg,
sizeof(msg),
"Checkpoint: save checkpoint error write failed");
2161 kmr_error(kvs->c.mr, msg);
2163 kmr_ckpt_flush(kvs->c.mr, fp);
2172 kmr_ckpt_kv_record_init_data(
KMR *mr,
KMR_KVS *kvs)
2175 mr->ckpt_ctx->ckpt_data_fp = NULL;
2180 char fpath[KMR_CKPT_PATHLEN];
2182 kmr_ckpt_make_fname(ckptctx->ckpt_dname, KMR_CKPT_FNAME_PREFIX,
2183 KMR_CKPT_DATA, mr->rank, kvs->c.ckpt_kvs_id,
2184 fpath,
sizeof(fpath));
2185 int cc = access(fpath, W_OK);
2188 fp = kmr_ckpt_open_path(mr, fpath,
"a+");
2189 }
else if (cc < 0 && errno == ENOENT) {
2192 memset((
void *)&ckpt, 0,
sizeof(ckpt));
2193 ckpt.nprocs = mr->nprocs;
2194 ckpt.rank = mr->rank;
2195 ckpt.kvs_id = kvs->c.ckpt_kvs_id;
2196 ckpt.key_data = kvs->c.key_data;
2197 ckpt.value_data = kvs->c.value_data;
2198 fp = kmr_ckpt_open_path(mr, fpath,
"w+");
2200 size_t ret = fwrite((
void *)&ckpt, size, 1, fp);
2202 char msg[KMR_CKPT_MSGLEN];
2203 snprintf(msg,
sizeof(msg),
2204 "Failed to write header of checkpoint file %s", fpath);
2207 kmr_ckpt_flush(mr, fp);
2212 ckptctx->ckpt_data_fp = fp;
2220 kmr_ckpt_kv_record_init(
KMR *mr,
KMR_KVS *kvo)
2223 kmr_ckpt_kv_record_init_data(mr, kvo);
2227 ckptctx->saved_current_block = kvo->c.current_block;
2228 ckptctx->saved_adding_point = kvo->c.adding_point;
2229 ckptctx->saved_element_count = kvo->c.element_count;
2240 kmr_ckpt_kv_record_add(
KMR_KVS *kvo)
2247 assert(ckptctx->ckpt_data_fp != NULL);
2248 long cnt = kvo->c.element_count - ckptctx->saved_element_count;
2252 b = kvo->c.first_block;
2256 e = kmr_kvs_first_entry(kvo, b);
2259 for (
long i = 0; i < cnt; i++) {
2260 if (kmr_kvs_entry_tail_p(e)) {
2263 e = kmr_kvs_first_entry(kvo, b);
2266 size_t size = kmr_kvs_entry_netsize(e);
2267 size_t ret = fwrite((
void *)e, size, 1, ckptctx->ckpt_data_fp);
2269 char msg[KMR_CKPT_MSGLEN];
2270 snprintf(msg,
sizeof(msg),
2271 "Failed to add kv to a checkpoint file");
2272 kmr_error(kvo->c.mr, msg);
2274 e = kmr_kvs_next_entry(kvo, e);
2276 kmr_ckpt_flush(kvo->c.mr, ckptctx->ckpt_data_fp);
2277 ckptctx->saved_current_block = b;
2278 ckptctx->saved_adding_point = e;
2279 ckptctx->saved_element_count = kvo->c.element_count;
2288 kmr_ckpt_kv_record_fin(
KMR *mr)
2292 if (ckptctx->ckpt_data_fp != NULL) {
2293 kmr_ckpt_flush(mr, ckptctx->ckpt_data_fp);
2294 fclose(ckptctx->ckpt_data_fp);
2297 ckptctx->ckpt_data_fp = NULL;
2298 ckptctx->saved_element_count = 0;
2299 ckptctx->saved_adding_point = NULL;
2300 ckptctx->saved_current_block = NULL;
2308 kmr_ckpt_get_data_flist(
KMR *mr,
const char *dname,
2313 int cc = stat(dname, &sb);
2318 if (!S_ISDIR(sb.st_mode)) {
2319 fprintf(stderr,
"File %s is not a directory.\n", dname);
2325 long nmax = pathconf(dname, _PC_NAME_MAX);
2327 direntsz = (64 * 1024);
2329 direntsz = (offsetof(
struct dirent, d_name) + (size_t)nmax + 1);
2332 struct dirent *dent;
2337 fprintf(stderr,
"Failed to open directory %s.\n", dname);
2342 char prefix[KMR_CKPT_PATHLEN];
2343 snprintf(prefix, KMR_CKPT_PATHLEN, KMR_CKPT_FNAME_PREFIX
"_data_");
2345 while (readdir_r(d, (
void *)b, &dent) == 0) {
2349 cc = strncmp(dent->d_name, prefix, strlen(prefix));
2357 memset(dataflst, 0, siz);
2361 while (readdir_r(d, (
void *)b, &dent) == 0) {
2365 cc = strncmp(dent->d_name, prefix, strlen(prefix));
2367 kmr_ckpt_init_data_file(mr, dname, dent->d_name, setall,
2380 kmr_ckpt_save_nprocs(
KMR *mr,
const char *dname)
2383 const char *target = (dname != 0) ? dname : ckptctx->ckpt_dname;
2384 char fpath[KMR_CKPT_PATHLEN], wstring[128], msg[KMR_CKPT_MSGLEN];
2385 memset(fpath, 0,
sizeof(fpath));
2386 snprintf(fpath,
sizeof(fpath),
"%s/nprocs", target);
2387 int cc = access(fpath, R_OK);
2391 FILE *fp = kmr_ckpt_open_path(mr, fpath,
"w");
2393 snprintf(msg,
sizeof(msg),
2394 "Failed to open nprocs file %s", fpath);
2397 memset(wstring, 0,
sizeof(wstring));
2398 snprintf(wstring,
sizeof(wstring),
"nprocs=%d\n", mr->nprocs);
2399 size_t ret = fwrite(wstring, strlen(wstring), 1, fp);
2401 snprintf(msg,
sizeof(msg),
"Failed to save nprocs to file %s",
2405 kmr_ckpt_flush(mr, fp);
2412 kmr_ckpt_open(
KMR_KVS *kvs,
const char *mode)
2414 char fpath[KMR_CKPT_PATHLEN];
2415 KMR *mr = kvs->c.mr;
2417 kmr_ckpt_make_fname(ckptctx->ckpt_dname, KMR_CKPT_FNAME_PREFIX,
2418 KMR_CKPT_DATA, mr->rank, kvs->c.ckpt_kvs_id,
2419 fpath,
sizeof(fpath));
2420 FILE *fp = kmr_ckpt_open_path(mr, fpath, mode);
2426 kmr_ckpt_open_path(
KMR *mr,
const char *fpath,
const char *mode)
2428 FILE *fp = fopen(fpath, mode);
2430 char msg[KMR_CKPT_MSGLEN];
2431 snprintf(msg,
sizeof(msg),
2432 "Failed to open a checkpoint file %s", fpath);
2435 int cc = fcntl(fileno(fp), F_SETFD, FD_CLOEXEC);
2442 kmr_ckpt_make_fname(
const char *dirname,
const char *fprefix,
2443 enum kmr_ckpt_type type,
2444 int rank,
long kvs_id,
char *fpath,
size_t len)
2446 memset(fpath, 0, len);
2447 assert(type == KMR_CKPT_DATA || type == KMR_CKPT_LOG);
2448 if (type == KMR_CKPT_DATA) {
2449 snprintf(fpath, len-1,
"%s/%s_data_%05d_%03ld",
2450 dirname, fprefix, rank, kvs_id);
2451 }
else if (type == KMR_CKPT_LOG) {
2452 snprintf(fpath, len-1,
"%s/%s_log_%05d",
2453 dirname, fprefix, rank);
2459 kmr_ckpt_flush(
KMR *mr, FILE *fp)
2462 if (!mr->ckpt_no_fsync) {
2463 int cc = fsync(fileno(fp));
2482 if (mr->ckpt_enable == 1 && ckptctx->initialized) {
2498 if (mr->ckpt_enable == 1 && ckptctx->initialized &&
2499 ckptctx->lock_id == 0) {
2500 mr->ckpt_enable = 0;
2501 ckptctx->lock_id = ++ckptctx->lock_counter;
2502 return ckptctx->lock_id;
2519 if (mr->ckpt_enable == 0 && ckptctx->initialized &&
2520 ckptctx->lock_id == lock_id) {
2521 mr->ckpt_enable = 1;
2522 ckptctx->lock_id = 0;
2538 if (mr->ckpt_selective) {
2542 long op_seqno = ckptctx->progress_counter;
2543 long start_from = 0;
2544 for (
int i = 0; i < ckptctx->kv_positions_count; i++) {
2545 if (ckptctx->kv_positions[i].op_seqno == op_seqno) {
2546 start_from = ckptctx->kv_positions[i].start_from;
2560 KMR *mr = kvs->c.mr;
2561 char fpath[KMR_CKPT_PATHLEN];
2562 kmr_ckpt_make_fname(mr->ckpt_ctx->ckpt_dname, KMR_CKPT_FNAME_PREFIX,
2563 KMR_CKPT_DATA, mr->rank, kvs->c.ckpt_kvs_id,
2564 fpath,
sizeof(fpath));
2565 int cc = access(fpath, R_OK);
2571 cc = stat(fpath, &sb);
2573 char msg[KMR_CKPT_MSGLEN];
2574 snprintf(msg,
sizeof(msg),
2575 "Failed to access a checkpoint file %s", fpath);
2576 kmr_error(kvs->c.mr, msg);
2578 size_t siz = (size_t)sb.st_size;
2580 FILE *fp = kmr_ckpt_open_path(kvs->c.mr, fpath,
"r");
2581 size_t ret = fread(buf, siz, 1, fp);
2583 char msg[KMR_CKPT_MSGLEN];
2584 snprintf(msg,
sizeof(msg),
2585 "Failed to load a checkpoint file %s", fpath);
2586 kmr_error(kvs->c.mr, msg);
2593 while (cur_siz < siz) {
2601 cur_siz += kmr_kvs_entry_netsize(e);
2602 e = kmr_kvs_next_entry(kvs, e);
2605 assert(cur_siz == siz);
2615 KMR *mr = kvs->c.mr;
2616 if (!mr->ckpt_selective) {
2618 kmr_ckpt_delete_ckpt_data(mr, kvs->c.ckpt_kvs_id);
2621 char fpath[KMR_CKPT_PATHLEN];
2623 kmr_ckpt_make_fname(ckptctx->ckpt_dname, KMR_CKPT_FNAME_PREFIX,
2624 KMR_CKPT_DATA, mr->rank, kvs->c.ckpt_kvs_id,
2625 fpath,
sizeof(fpath));
2626 int cc = access(fpath, F_OK);
2628 kmr_ckpt_log_deletable(mr, kvs->c.ckpt_kvs_id);
2641 if (kmr_ckpt_write_file_p(mr)) {
2642 kmr_ckpt_log_whole_start(mr);
2643 kmr_ckpt_save_ckpt(kvo);
2644 kmr_ckpt_log_whole_finish(mr);
2657 if (!mr->ckpt_selective) {
2658 kmr_ckpt_log_block_start(mr, kvo);
2659 kmr_ckpt_kv_record_init(mr, kvo);
2673 if (!mr->ckpt_selective) {
2674 long nkvo = kmr_ckpt_kv_record_add(kvo);
2675 kmr_ckpt_log_block_add(mr, nkvi, nkvo);
2688 if (!mr->ckpt_selective) {
2689 kmr_ckpt_kv_record_fin(mr);
2690 kmr_ckpt_log_block_finish(mr);
2706 if (!mr->ckpt_selective) {
2707 kmr_ckpt_log_index_start(mr, kvo);
2708 kmr_ckpt_kv_record_init(mr, kvo);
2721 if (!mr->ckpt_selective) {
2722 long nkvo = kmr_ckpt_kv_record_add(kvo);
2723 kmr_ckpt_log_index_add(mr, ikv_index, nkvo);
2736 if (!mr->ckpt_selective) {
2737 kmr_ckpt_kv_record_fin(mr);
2738 kmr_ckpt_log_index_finish(mr);
2756 KMR *mr = (kvo != 0) ? kvo->c.mr : kvi->c.mr;
2757 if (opt.keep_open) {
2758 char msg[KMR_CKPT_MSGLEN];
2759 snprintf(msg,
sizeof(msg),
2760 "'keep_open' option can't be used when checkpoint/restart" 2767 ckptctx->progress_counter += 1;
2769 kvi->c.ckpt_consumed_op = ckptctx->progress_counter;
2772 kvo->c.ckpt_generated_op = ckptctx->progress_counter;
2774 assert(ckptctx->cur_kvi_id == KMR_CKPT_DUMMY_ID);
2775 assert(ckptctx->cur_kvo_id == KMR_CKPT_DUMMY_ID);
2777 ckptctx->cur_kvi_id = kvi->c.ckpt_kvs_id;
2779 ckptctx->cur_kvi_id = KMR_CKPT_DUMMY_ID;
2782 ckptctx->cur_kvo_id = kvo->c.ckpt_kvs_id;
2784 ckptctx->cur_kvo_id = KMR_CKPT_DUMMY_ID;
2789 long progress = ckptctx->progress_counter;
2790 if (!mr->ckpt_selective) {
2792 if (progress <= ckptctx->prev_global_progress) {
2794 }
else if (progress > ckptctx->prev_global_progress &&
2795 progress <= ckptctx->prev_progress ) {
2799 if (kvi->c.element_count == 0) {
2810 if (progress <= ckptctx->prev_global_progress) {
2812 }
else if (progress > ckptctx->prev_global_progress &&
2813 progress <= ckptctx->prev_progress ) {
2814 long v = kmr_ckpt_int_list_del(ckptctx->slct_skip_ops, progress);
2815 if (v == progress) {
2825 kmr_ckpt_log_skipped(mr);
2826 ckptctx->cur_kvi_id = KMR_CKPT_DUMMY_ID;
2827 ckptctx->cur_kvo_id = KMR_CKPT_DUMMY_ID;
2832 if (mr->ckpt_selective) {
2833 if (opt.take_ckpt) {
2834 ckptctx->slct_cur_take_ckpt = 1;
2848 kmr_ckpt_log_progress(mr);
2850 if (mr->ckpt_selective) {
2851 ckptctx->slct_cur_take_ckpt = 0;
2853 ckptctx->cur_kvi_id = KMR_CKPT_DUMMY_ID;
2854 ckptctx->cur_kvo_id = KMR_CKPT_DUMMY_ID;
2864 kmr_ckpt_list_alocfn_t alocfn,
2865 kmr_ckpt_list_freefn_t freefn,
2866 kmr_ckpt_list_compfn_t compfn)
2871 list->alocfn = alocfn;
2872 list->freefn = freefn;
2873 list->compfn = compfn;
2881 for (item = list->head; item != 0; ) {
2884 (*(list->freefn))(del);
2886 kmr_ckpt_list_init(list, list->alocfn, list->freefn, list->compfn);
2896 if (list->size == KMR_CKPT_LIST_MAX) {
2898 list->head = list->head->next;
2899 list->head->prev = 0;
2900 (*(list->freefn))(item->val);
2901 kmr_free(item, isize);
2905 item->val = (*(list->alocfn))(val);
2908 if (list->head == 0) {
2912 list->tail->next = item;
2913 item->prev = list->tail;
2927 for (item = list->head; item != 0; item = item->next) {
2928 if ((*(list->compfn))(item->val, val) == 0) {
2934 void *ret = item->val;
2935 if (!(item == list->head || item == list->tail)) {
2936 item->prev->next = item->next;
2937 item->next->prev = item->prev;
2939 if (item == list->head) {
2940 list->head = item->next;
2941 if (list->head != 0) {
2942 list->head->prev = 0;
2945 if (item == list->tail) {
2946 list->tail = item->prev;
2947 if (list->tail != 0) {
2948 list->tail->next = 0;
2965 for (item = list->head; item != 0; item = item->next) {
2966 if ((*(list->compfn))(item->val, val) == 0) {
2976 kmr_ckpt_list_rsearch(
struct kmr_ckpt_list *list,
void *val)
2979 for (item = list->tail; item != 0; item = item->prev) {
2980 if ((*(list->compfn))(item->val, val) == 0) {
2989 kmr_ckpt_int_list_alocfn(
void *val)
2998 kmr_ckpt_int_list_freefn(
void *val)
3000 kmr_free(val,
sizeof(
long));
3005 kmr_ckpt_int_list_compfn(
void *v1,
void *v2)
3007 long _v1 = *(
long *)v1;
3008 long _v2 = *(
long *)v2;
3011 }
else if ( _v1 < _v2 ) {
3022 kmr_ckpt_list_init(list, kmr_ckpt_int_list_alocfn,
3023 kmr_ckpt_int_list_freefn, kmr_ckpt_int_list_compfn);
3030 kmr_ckpt_list_free(list);
3038 kmr_ckpt_list_add(list, &val);
3047 long *v = (
long *)kmr_ckpt_list_del(list, &val);
3058 kmr_ckpt_int_list_search(
struct kmr_ckpt_list *list,
long val)
3060 long *v = (
long *)kmr_ckpt_list_search(list, &val);
3071 kmr_ckpt_int_list_rsearch(
struct kmr_ckpt_list *list,
long val)
3073 long *v = (
long *)kmr_ckpt_list_rsearch(list, &val);
3084 static void test_kmr_ckpt_int_list()
3087 kmr_ckpt_int_list_init(&list);
3088 long v = kmr_ckpt_int_list_del(&list, 1);
3090 assert(list.size == 0);
3091 v = kmr_ckpt_int_list_search(&list, 1);
3093 assert(list.size == 0);
3094 v = kmr_ckpt_int_list_rsearch(&list, 1);
3096 assert(list.size == 0);
3097 kmr_ckpt_int_list_add(&list, 10);
3098 assert(list.size == 1);
3099 kmr_ckpt_int_list_add(&list, 20);
3100 assert(list.size == 2);
3101 kmr_ckpt_int_list_add(&list, 30);
3102 assert(list.size == 2);
3103 v = kmr_ckpt_int_list_search(&list, 10);
3105 v = kmr_ckpt_int_list_rsearch(&list, 10);
3107 v = kmr_ckpt_int_list_search(&list, 20);
3109 v = kmr_ckpt_int_list_rsearch(&list, 20);
3111 v = kmr_ckpt_int_list_search(&list, 30);
3113 v = kmr_ckpt_int_list_rsearch(&list, 30);
3115 v = kmr_ckpt_int_list_del(&list, 1);
3117 assert(list.size == 2);
3118 v = kmr_ckpt_int_list_del(&list, 20);
3120 assert(list.size == 1);
3121 v = kmr_ckpt_int_list_del(&list, 30);
3123 assert(list.head == 0);
3124 assert(list.tail == 0);
3125 kmr_ckpt_int_list_free(&list);
3126 fprintf(stderr,
"interger list test done.\n");
3132 kmr_ckpt_opr_list_alocfn(
void *val)
3141 kmr_ckpt_opr_list_freefn(
void *val)
3148 kmr_ckpt_opr_list_compfn(
void *v1,
void *v2)
3152 if ( _v1.op_seqno > _v2.op_seqno ) {
3154 }
else if ( _v1.op_seqno < _v2.op_seqno ) {
3165 kmr_ckpt_list_init(list, kmr_ckpt_opr_list_alocfn,
3166 kmr_ckpt_opr_list_freefn, kmr_ckpt_opr_list_compfn);
3173 kmr_ckpt_list_free(list);
3181 kmr_ckpt_list_add(list, &op);
3188 chains->chainlst = 0;
3189 chains->chainlst_size = 0;
3196 for (
int i = 0; i < chains->chainlst_size; i++) {
3198 kmr_ckpt_opr_list_free(list);
3200 kmr_ckpt_kvs_chains_init(chains);
3208 int idx = chains->chainlst_size;
3209 chains->chainlst_size += 1;
3212 sizeof(
struct kmr_ckpt_list) * (size_t)chains->chainlst_size);
3214 kmr_ckpt_opr_list_init(list);
3215 kmr_ckpt_opr_list_add(list, op);
3223 struct kmr_ckpt_list *list = kmr_ckpt_kvs_chains_find(chains, op.kvi_id);
3225 kmr_ckpt_opr_list_add(list, op);
3227 kmr_ckpt_kvs_chains_new_chain(chains, op);
3237 for (
int i = 0; i < chains->chainlst_size; i++) {
3241 if (last_op->kvo_id == KMR_CKPT_DUMMY_ID) {
3246 for (item = list->tail; item != 0; item = item->prev) {
3249 if (op->kvo_id == kvo_id) {
3259 static void test_kmr_ckpt_kvs_chains()
3262 kmr_ckpt_kvs_chains_init(&chains);
3264 .kvi_id = KMR_CKPT_DUMMY_ID,
3266 kmr_ckpt_kvs_chains_new_chain(&chains, op1);
3267 assert(chains.chainlst_size == 1);
3271 kmr_ckpt_kvs_chains_connect(&chains, op2);
3272 assert(chains.chainlst_size == 1);
3273 assert(chains.chainlst[0].size == 2);
3275 .kvi_id = KMR_CKPT_DUMMY_ID,
3277 kmr_ckpt_kvs_chains_new_chain(&chains, op3);
3278 assert(chains.chainlst_size == 2);
3279 kmr_ckpt_kvs_chains_free(&chains);
3280 fprintf(stderr,
"kvs chains test done.\n");
Key-Value Stream (abstract).
void kmr_ckpt_save_kvo_each_add(KMR *mr, KMR_KVS *kvo, long ikv_index)
It adds new key-value pairs of the output KVS to the checkpoint data file.
Utilities Private Part (do not include from applications).
Options to Mapping, Shuffling, and Reduction.
void kmr_ckpt_remove_ckpt(KMR_KVS *kvs)
It removes checkpoint data file.
#define KMR_ALIGN(X)
Rounds up a given size to the alignment restriction (currently eight bytes).
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
void kmr_ckpt_lock_start(KMR *mr)
Define the start position of code region that is referred when restart.
int kmr_ckpt_enable_ckpt(KMR *mr, int lock_id)
It temporally enables checkpoint/restart which has been disabled by calling kmr_ckpt_disable_ckpt().
int kmr_ckpt_disable_ckpt(KMR *mr)
It temporally disables checkpoint/restart.
#define kmr_malloc(Z)
Allocates memory, or aborts when failed.
void kmr_ckpt_save_kvo_block_fin(KMR *mr, KMR_KVS *kvo)
It finalizes saving block of key-value pairs of the output KVS to the checkpoint data file...
void kmr_ckpt_save_kvo_whole(KMR *mr, KMR_KVS *kvo)
It saves all key-value pairs in the output KVS to a checkpoint data file.
void kmr_ckpt_free_context(KMR *mr)
Free checkpoint context.
void kmr_ckpt_save_kvo_each_fin(KMR *mr, KMR_KVS *kvo)
It finalizes saving indexed key-value pairs of the output KVS to the checkpoint data file...
kmr_kv_field
Datatypes of Keys or Values.
void kmr_ckpt_save_kvo_block_add(KMR *mr, KMR_KVS *kvo, long nkvi)
It adds a new block of key-value pairs of the output KVS to the checkpoint data file.
long kmr_ckpt_first_unprocessed_kv(KMR *mr)
It returns the index of the first unprocessed key-value in the input KVS.
Handy Copy of a Key-Value Field.
void kmr_ckpt_save_kvo_each_init(KMR *mr, KMR_KVS *kvo)
It initializes saving indexed key-value pairs of the output KVS to a checkpoint data file...
void kmr_ckpt_progress_fin(KMR *mr)
It finalizes the progress of MapReduce checkpointing.
void kmr_ckpt_lock_finish(KMR *mr)
Define the end position of code region that is referred when restart.
void kmr_ckpt_create_context(KMR *mr)
Initialize checkpoint context.
#define kmr_realloc(P, Z)
Allocates memory, or aborts when failed.
Checkpoint/Restart Support.
int kmr_ckpt_progress_init(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
It initializes a progress of MapReduce checkpointing.
int kmr_ckpt_enabled(KMR *mr)
Check if checkpoint/restart is enabled.
static struct kmr_kv_box kmr_pick_kv(struct kmr_kvs_entry *e, KMR_KVS *kvs)
Returns a handle to a key-value entry – a reverse of kmr_poke_kv().
void kmr_ckpt_save_kvo_block_init(KMR *mr, KMR_KVS *kvo)
It initializes saving blocks of key-value pairs of the output KVS to a checkpoint data file...
void kmr_ckpt_restore_ckpt(KMR_KVS *kvs)
It restores checkpoint data to kvs.