KMR
kmrckpt.c
Go to the documentation of this file.
1 /* kmrckpt.c (2014-04-01) */
2 /* Copyright (C) 2012-2018 RIKEN R-CCS */
3 
4 /** \file kmrckpt.c Checkpoint/Restart Support. */
5 
6 #include <mpi.h>
7 #include <assert.h>
8 #include <stddef.h>
9 #include <stdlib.h>
10 #include <stdio.h>
11 #include <unistd.h>
12 #include <dirent.h>
13 #include <sys/types.h>
14 #include <sys/stat.h>
15 #include <string.h>
16 #include <fcntl.h>
17 #include <errno.h>
18 #include <limits.h>
19 
20 #ifdef _OPENMP
21 #include <omp.h>
22 #endif
23 #include "../config.h"
24 #include "kmr.h"
25 #include "kmrimpl.h"
26 #include "kmrckpt.h"
27 
/* Forward declarations of the file-local (static) helpers, grouped by
   role. */

/* Functions for initialization: restart detection and restoration of the
   previous run's progress and checkpoint data */
static void kmr_ckpt_init_environment(KMR *);
static int kmr_ckpt_check_restart(KMR *, int **, int *, int *);
static void kmr_ckpt_restore_prev_progress(KMR *, int *, int);
static void kmr_ckpt_restore_prev_state(KMR *, const char *, int*, int, int);
static void kmr_ckpt_restore_prev_state_each_rank(KMR *,
                                                  struct kmr_ckpt_prev_state *,
                                                  struct kmr_ckpt_merge_ctx *);
static int kmr_ckpt_merge_check_ignorable(struct kmr_ckpt_kvs_chains *, long);
static void kmr_ckpt_merge_ignore_ckpt_data(long,
                                            struct kmr_ckpt_prev_state *,
                                            struct kmr_ckpt_merge_ctx *);
static void kmr_ckpt_merge_store_ckpt_data(long, int, long,
                                           struct kmr_ckpt_prev_state *,
                                           struct kmr_ckpt_merge_ctx *);
static void kmr_ckpt_merge_update_ckpt_data(long, int, long, long,
                                            struct kmr_ckpt_list *,
                                            struct kmr_ckpt_prev_state *,
                                            struct kmr_ckpt_merge_ctx *);
static void kmr_ckpt_merge_sort_data(KMR *, const char *, long,
                                     struct kmr_ckpt_merge_source *);
static void kmr_ckpt_merge_write_file(KMR *, const char *,
                                      struct kmr_ckpt_merge *);
/* Functions for logging: open/close the per-rank checkpoint log and
   record operation start/progress/finish entries */
static void kmr_ckpt_init_log(KMR *, const char *);
static void kmr_ckpt_fin_log(KMR *);
static FILE *kmr_ckpt_open_log(KMR *, const char *, struct kmr_ckpt_log *,
                               unsigned long *);
static void kmr_ckpt_log_whole_start(KMR *);
static void kmr_ckpt_log_whole_finish(KMR *);
static void kmr_ckpt_log_block_start(KMR *, KMR_KVS *);
static void kmr_ckpt_log_block_add(KMR *, long, long);
static void kmr_ckpt_log_block_finish(KMR *);
static void kmr_ckpt_log_index_start(KMR *, KMR_KVS *);
static void kmr_ckpt_log_index_add(KMR *, long, long);
static void kmr_ckpt_log_index_finish(KMR *);
static void kmr_ckpt_log_delete_start(KMR *, long);
static void kmr_ckpt_log_delete_finish(KMR *, long);
static void kmr_ckpt_log_deletable(KMR *, long );
static void kmr_ckpt_log_progress(KMR *);
static void kmr_ckpt_log_skipped(KMR *);
/* Functions for checkpoint data management: writing, naming, listing and
   deleting checkpoint data files */
static void kmr_ckpt_delete_ckpt_data(KMR *, long);
static void kmr_ckpt_delete_ckpt_files(KMR *, const char *, int);
static void kmr_ckpt_save_ckpt(KMR_KVS *);
static void kmr_ckpt_kv_record_init(KMR *, KMR_KVS *);
static long kmr_ckpt_kv_record_add(KMR_KVS *);
static void kmr_ckpt_kv_record_fin(KMR *);
static FILE *kmr_ckpt_open(KMR_KVS *, const char *);
static FILE *kmr_ckpt_open_path(KMR *, const char *, const char *);
static void kmr_ckpt_save_nprocs(KMR *, const char *);
static void kmr_ckpt_make_fname(const char *, const char *,
                                enum kmr_ckpt_type, int, long, char *, size_t);
static void kmr_ckpt_get_data_flist(KMR *, const char *,
                                    struct kmr_ckpt_data_file **, int *,
                                    _Bool);
static void kmr_ckpt_flush(KMR *, FILE *);
/* Utility functions: a generic list (struct kmr_ckpt_list) with
   allocate/free/compare callbacks, and its specializations for long
   values (int_list), operations (opr_list) and KVS transition chains
   (kvs_chains) */
static void kmr_ckpt_list_init(struct kmr_ckpt_list *, kmr_ckpt_list_alocfn_t,
                               kmr_ckpt_list_freefn_t, kmr_ckpt_list_compfn_t);
static void kmr_ckpt_list_free(struct kmr_ckpt_list *);
static void kmr_ckpt_list_add(struct kmr_ckpt_list *, void *);
static void *kmr_ckpt_list_del(struct kmr_ckpt_list *, void *);
static void *kmr_ckpt_list_search(struct kmr_ckpt_list *, void *);
static void *kmr_ckpt_list_rsearch(struct kmr_ckpt_list *, void *);
static void kmr_ckpt_int_list_init(struct kmr_ckpt_list *);
static void kmr_ckpt_int_list_free(struct kmr_ckpt_list *);
static void kmr_ckpt_int_list_add(struct kmr_ckpt_list *, long);
static long kmr_ckpt_int_list_del(struct kmr_ckpt_list *, long);
static long kmr_ckpt_int_list_search(struct kmr_ckpt_list *, long);
static long kmr_ckpt_int_list_rsearch(struct kmr_ckpt_list *, long);
static void kmr_ckpt_opr_list_init(struct kmr_ckpt_list *);
static void kmr_ckpt_opr_list_free(struct kmr_ckpt_list *);
static void kmr_ckpt_opr_list_add(struct kmr_ckpt_list *,
                                  struct kmr_ckpt_operation);
static void kmr_ckpt_kvs_chains_init(struct kmr_ckpt_kvs_chains *);
static void kmr_ckpt_kvs_chains_free(struct kmr_ckpt_kvs_chains *);
static void kmr_ckpt_kvs_chains_new_chain(struct kmr_ckpt_kvs_chains *,
                                          struct kmr_ckpt_operation);
static void kmr_ckpt_kvs_chains_connect(struct kmr_ckpt_kvs_chains *,
                                        struct kmr_ckpt_operation);
static struct kmr_ckpt_list *
kmr_ckpt_kvs_chains_find(struct kmr_ckpt_kvs_chains *, long);
111 
112 
113 /** Initialize checkpoint context. This function should be called only once
114  when MapReduce data type is initialized.
115 
116  \param[in] mr MapReduce data type
117 */
118 void
120 {
121  struct kmr_ckpt_ctx *
122  ckptctx = kmr_malloc(sizeof(struct kmr_ckpt_ctx));
123  mr->ckpt_ctx = ckptctx;
124  snprintf(ckptctx->ckpt_dname, KMR_CKPT_DIRLEN, "./%s%05d",
125  KMR_CKPT_DIRNAME, mr->rank);
126  ckptctx->prev_mode = KMR_CKPT_ALL;
127  ckptctx->ckpt_log_fp = NULL;
128  ckptctx->progress_counter = 0;
129  ckptctx->prev_progress = 0;
130  ckptctx->prev_global_progress = 0;
131  ckptctx->cur_kvi_id = KMR_CKPT_DUMMY_ID;
132  ckptctx->cur_kvo_id = KMR_CKPT_DUMMY_ID;
133  ckptctx->ckpt_data_fp = NULL;
134  ckptctx->saved_element_count = 0;
135  ckptctx->saved_adding_point = NULL;
136  ckptctx->saved_current_block = NULL;
137  ckptctx->kv_positions = NULL;
138  ckptctx->kv_positions_count = 0;
139  ckptctx->lock_id = 0;
140  ckptctx->lock_counter = 0;
141  ckptctx->initialized = 0;
142  ckptctx->slct_cur_take_ckpt = 0;
143  if (mr->ckpt_selective) {
144  ckptctx->slct_skip_ops = (struct kmr_ckpt_list *)
145  kmr_malloc(sizeof(struct kmr_ckpt_list));
146  kmr_ckpt_int_list_init(ckptctx->slct_skip_ops);
147  } else {
148  ckptctx->slct_skip_ops = NULL;
149  }
150 
151  if (mr->ckpt_enable) {
152  kmr_ckpt_init_environment(mr);
153  }
154 }
155 
156 /** Free checkpoint context. This function should be called only once
157  when MapReduce data type is freed.
158 
159  \param[in] mr MapReduce data type
160 */
161 void
163 {
164  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
165  if (kmr_ckpt_enabled(mr)) {
166  MPI_Barrier(mr->comm);
167  kmr_ckpt_fin_log(mr);
168  kmr_ckpt_delete_ckpt_files(mr, ckptctx->ckpt_dname, mr->rank);
169  kmr_free(ckptctx->kv_positions,
170  sizeof(struct kv_position) * (size_t)ckptctx->kv_positions_count);
171  }
172  if (mr->ckpt_selective) {
173  kmr_ckpt_int_list_free(ckptctx->slct_skip_ops);
174  kmr_free(ckptctx->slct_skip_ops, sizeof(struct kmr_ckpt_list));
175  }
176  kmr_free(ckptctx, sizeof(struct kmr_ckpt_ctx));
177  mr->ckpt_ctx = 0;
178 }
179 
180 /***************************************************************/
181 /* Functions for initializing checkpoint/restart environment */
182 /***************************************************************/
183 
184 /* Initialize checkpoint environment */
185 static void
186 kmr_ckpt_init_environment(KMR *mr)
187 {
188  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
189  if (ckptctx->initialized) {
190  return;
191  }
192 
193  /* Check if restarted or not */
194  int *prev_ranks = NULL;
195  int prev_rank_count = 0;
196  int prev_nprocs = 0;
197  int restarted = kmr_ckpt_check_restart(mr, &prev_ranks, &prev_rank_count,
198  &prev_nprocs);
199  {
200  int all_restarted;
201  int cc = MPI_Allreduce(&restarted, &all_restarted, 1, MPI_INT,
202  MPI_LAND, mr->comm);
203  assert(cc == MPI_SUCCESS);
204  assert(restarted == all_restarted);
205  }
206 
207  /* Create a temporal directory for checkpoint files */
208  char tmp_dname[KMR_CKPT_DIRLEN];
209  snprintf(tmp_dname, KMR_CKPT_DIRLEN, "./tmp_%s%05d",
210  KMR_CKPT_DIRNAME, mr->rank);
211  kmr_ckpt_delete_ckpt_files(mr, tmp_dname, mr->rank);
212  int cc = mkdir(tmp_dname, S_IRWXU);
213  if (cc != 0) {
214  char msg[KMR_CKPT_MSGLEN];
215  snprintf(msg, sizeof(msg),
216  "Failed to create a directory for checkpoint %s", tmp_dname);
217  kmr_error(mr, msg);
218  }
219 
220  /* Load checkpoint files to restart */
221  if (restarted) {
222  kmr_ckpt_restore_prev_progress(mr, prev_ranks, prev_rank_count);
223  kmr_ckpt_restore_prev_state(mr, tmp_dname, prev_ranks, prev_rank_count,
224  prev_nprocs);
225  }
226 
227  /* Initialize a log file */
228  char log_fpath[KMR_CKPT_PATHLEN];
229  kmr_ckpt_make_fname(tmp_dname, KMR_CKPT_FNAME_PREFIX, KMR_CKPT_LOG,
230  mr->rank, 0, log_fpath, sizeof(log_fpath));
231  kmr_ckpt_init_log(mr, log_fpath);
232 
233  /* save nprocs to file */
234  if (mr->rank == 0) {
235  kmr_ckpt_save_nprocs(mr, tmp_dname);
236  }
237 
238  /* Rename directories */
239  for (int i = 0; i < prev_rank_count; i++) {
240  char old_dname[KMR_CKPT_DIRLEN];
241  snprintf(old_dname, KMR_CKPT_DIRLEN, "./%s%05d.old",
242  KMR_CKPT_DIRNAME, prev_ranks[i]);
243  kmr_ckpt_delete_ckpt_files(mr, old_dname, prev_ranks[i]);
244  char cur_dname[KMR_CKPT_DIRLEN];
245  snprintf(cur_dname, KMR_CKPT_DIRLEN, "./%s%05d",
246  KMR_CKPT_DIRNAME, prev_ranks[i]);
247  struct stat sb;
248  cc = stat(cur_dname, &sb);
249  if (cc == 0) {
250  cc = rename(cur_dname, old_dname);
251  assert(cc == 0);
252  }
253  }
254  MPI_Barrier(mr->comm);
255  cc = rename(tmp_dname, ckptctx->ckpt_dname);
256  assert(cc == 0);
257 
258  if (restarted) {
259  kmr_free(prev_ranks, sizeof(int) * (size_t)prev_rank_count);
260  }
261  ckptctx->initialized = 1;
262 }
263 
264 /* Check if this run is restarted or not.
265  It also finds target checkpoint files of the previous run if restarted. */
266 static int
267 kmr_ckpt_check_restart(KMR *mr, int **target_ranks, int *target_rank_count,
268  int *target_nprocs)
269 {
270  int restarted = 0;
271  _Bool force_start_from_scratch = 0;
272  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
273  struct stat sb;
274  int cc = stat(ckptctx->ckpt_dname, &sb);
275  if (cc == 0) {
276  if (!S_ISDIR(sb.st_mode)) {
277  char msg[KMR_CKPT_MSGLEN];
278  snprintf(msg, sizeof(msg),
279  "Non-directory file for checkpoint directory %s "
280  "already exist",
281  ckptctx->ckpt_dname);
282  kmr_error(mr, msg);
283  }
284  /* Read this rank's log and find target ranks in the previous run */
285  char fpath[KMR_CKPT_PATHLEN];
286  kmr_ckpt_make_fname(ckptctx->ckpt_dname, KMR_CKPT_FNAME_PREFIX,
287  KMR_CKPT_LOG, mr->rank, 0, fpath, sizeof(fpath));
288  cc = access(fpath, R_OK);
289  if (cc == 0) {
290  struct kmr_ckpt_log log_hdr;
291  unsigned long log_size = 0;
292  FILE *fp = kmr_ckpt_open_log(mr, fpath, &log_hdr, &log_size);
293  fclose(fp);
294  assert(mr->rank == log_hdr.rank);
295  assert(log_hdr.nprocs > 0);
296  if (log_size == 0) {
297  force_start_from_scratch = 1;
298  char msg[KMR_CKPT_MSGLEN];
299  snprintf(msg, sizeof(msg),
300  "Log file exists, but no log is recorded in %s. "
301  "All logs are discarded and start from scratch",
302  fpath);
303  kmr_warning(mr, 1, msg);
304  }
305  int quotient = log_hdr.nprocs / mr->nprocs;
306  int rest = log_hdr.nprocs % mr->nprocs;
307  int cnt = quotient + ((mr->rank < rest) ? 1 : 0);
308  if (cnt != 0) {
309  *target_ranks = (int*)kmr_malloc(sizeof(int) * (size_t)cnt);
310  int offset = mr->rank * quotient +
311  ((mr->rank < rest) ? mr->rank : rest);
312  for (int i = 0; i < cnt; i++) {
313  (*target_ranks)[i] = offset + i;
314  }
315  }
316  *target_rank_count = cnt;
317  *target_nprocs = log_hdr.nprocs;
318  if (mr->nprocs > log_hdr.nprocs) {
319  // TODO support future
320  char msg[KMR_CKPT_MSGLEN];
321  snprintf(msg, sizeof(msg),
322  "Currently restart with bigger number of processes "
323  "is not supported");
324  kmr_error(mr, msg);
325  }
326  if (mr->ckpt_selective && mr->nprocs != log_hdr.nprocs) {
327  char msg[KMR_CKPT_MSGLEN];
328  snprintf(msg, sizeof(msg),
329  "Restart with different number of processes "
330  "is not supported in selective mode");
331  kmr_error(mr, msg);
332  }
333  ckptctx->prev_mode = log_hdr.mode;
334  } else {
335  char msg[KMR_CKPT_MSGLEN];
336  snprintf(msg, sizeof(msg),
337  "Structure of a checkpoint directory may be wrong %s. "
338  "Delete all checkpoint directories",
339  ckptctx->ckpt_dname);
340  kmr_error(mr, msg);
341  }
342  } else {
343  if (errno != ENOENT) {
344  char msg[KMR_CKPT_MSGLEN];
345  snprintf(msg, sizeof(msg),
346  "Unknown error on checkpoint directory %s",
347  ckptctx->ckpt_dname);
348  kmr_error(mr, msg);
349  }
350  assert(*target_rank_count == 0);
351  }
352 
353  /* Check consistency of target checkpoint log files to decide restart */
354  if (*target_rank_count > 0) {
355  for (int i = 1; i < *target_rank_count; i++) {
356  _Bool success = 1;
357  int t_rank = (*target_ranks)[i];
358  char dpath[KMR_CKPT_DIRLEN];
359  snprintf(dpath, KMR_CKPT_DIRLEN, "./%s%05d", KMR_CKPT_DIRNAME,
360  t_rank);
361  char fpath[KMR_CKPT_PATHLEN];
362  kmr_ckpt_make_fname(dpath, KMR_CKPT_FNAME_PREFIX, KMR_CKPT_LOG,
363  t_rank, 0, fpath, sizeof(fpath));
364  cc = access(fpath, R_OK);
365  if (cc == 0) {
366  struct kmr_ckpt_log log_hdr;
367  unsigned long log_size = 0;
368  FILE *fp = kmr_ckpt_open_log(mr, fpath, &log_hdr, &log_size);
369  fclose(fp);
370  if (log_hdr.nprocs < 0) {
371  success = 0;
372  }
373  if (log_size == 0) {
374  force_start_from_scratch = 1;
375  char msg[KMR_CKPT_MSGLEN];
376  snprintf(msg, sizeof(msg),
377  "Log file exists, but no log is recorded in %s. "
378  "All logs are discarded and start from scratch",
379  fpath);
380  kmr_warning(mr, 1, msg);
381  }
382  } else {
383  success = 0;
384  }
385  if (!success) {
386  kmr_free(*target_ranks, (size_t)*target_rank_count);
387  char msg[KMR_CKPT_MSGLEN];
388  snprintf(msg, sizeof(msg),
389  "Wrong structure of checkpoint directory %s. ",
390  dpath);
391  kmr_error(mr, msg);
392  }
393  }
394  if (!force_start_from_scratch) {
395  restarted = 1;
396  }
397  }
398  return restarted;
399 }
400 
401 /* Restore MapReduce progress of the previous run for all mode. */
402 static void
403 kmr_ckpt_restore_prev_progress_all(KMR *mr,
404  int *target_ranks, int target_rank_count)
405 {
406  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
407  long min_progress = -1;
408 
409  /* read all ckpt logs files to find the minimum progress */
410  for (int i = 0; i < target_rank_count; i++) {
411  int rank = target_ranks[i];
412  char dpath[KMR_CKPT_DIRLEN];
413  snprintf(dpath, KMR_CKPT_DIRLEN, "./%s%05d", KMR_CKPT_DIRNAME, rank);
414  char fpath[KMR_CKPT_PATHLEN];
415  kmr_ckpt_make_fname(dpath, KMR_CKPT_FNAME_PREFIX, KMR_CKPT_LOG,
416  rank, 0, fpath, sizeof(fpath));
417  struct kmr_ckpt_log log_hdr;
418  unsigned long total, size = 0;
419  FILE *fp = kmr_ckpt_open_log(mr, fpath, &log_hdr, &total);
420  long max_done_op = 0, cur_op = 0;
421  _Bool num_procs_locked = 0;
422  while (size < total) {
423  struct kmr_ckpt_log_entry e;
424  size_t rc = fread((void *)&e, sizeof(e), 1, fp);
425  if (rc != 1) {
426  char msg[KMR_CKPT_MSGLEN];
427  snprintf(msg, sizeof(msg),
428  "Failed to read a checkpoint log entry");
429  kmr_error(mr, msg);
430  }
431  switch (e.state) {
432  case KMR_CKPT_LOG_WHOLE_START:
433  case KMR_CKPT_LOG_BLOCK_START:
434  case KMR_CKPT_LOG_INDEX_START:
435  cur_op = e.op_seqno;
436  break;
437  case KMR_CKPT_LOG_WHOLE_FINISH:
438  case KMR_CKPT_LOG_BLOCK_FINISH:
439  case KMR_CKPT_LOG_INDEX_FINISH:
440  max_done_op = cur_op;
441  cur_op = 0;
442  break;
443  case KMR_CKPT_LOG_SKIPPED:
444  max_done_op = e.op_seqno;
445  break;
446  case KMR_CKPT_LOG_LOCK_START:
447  assert(num_procs_locked == 0);
448  num_procs_locked = 1;
449  break;
450  case KMR_CKPT_LOG_LOCK_FINISH:
451  assert(num_procs_locked == 1);
452  num_procs_locked = 0;
453  break;
454  }
455  size += sizeof(e);
456  }
457  fclose(fp);
458  if (num_procs_locked && target_rank_count > 1) {
459  /* Can not restart with the different number of processes */
460  char msg[KMR_CKPT_MSGLEN];
461  snprintf(msg, sizeof(msg),
462  "Fault occurred in a critical region and can not restart "
463  "with the different number of processes. "
464  "Restart with the same number of processes with "
465  "the previous run.");
466  kmr_error(mr, msg);
467  }
468  if (min_progress < 0) {
469  min_progress = max_done_op;
470  } else {
471  if (max_done_op < min_progress) {
472  min_progress = max_done_op;
473  }
474  }
475  }
476  assert(min_progress >= 0);
477 
478  /* Find global minimal progress */
479  long global_min_progress;
480  int cc = MPI_Allreduce(&min_progress, &global_min_progress, 1, MPI_LONG,
481  MPI_MIN, mr->comm);
482  assert(cc == MPI_SUCCESS);
483 
484  ckptctx->prev_progress = min_progress;
485  ckptctx->prev_global_progress = global_min_progress;
486 }
487 
/* Restore MapReduce progress of the previous run for selective mode.

   In selective mode, restart with a different number of processes is
   rejected by kmr_ckpt_check_restart, so target_rank_count is expected
   to be 1.  For each target rank, the log is replayed to build:
     - 'chains': chains of operations linked through their input/output
       KVS ids.  A new chain starts at an operation whose input is
       KMR_CKPT_DUMMY_ID.
     - 'kvses': ids of KVSes whose checkpoint was completed
       (WHOLE_FINISH) and has not since been marked DELETABLE.
   A chain is "open" when its last operation produced an output KVS
   (kvo_id != KMR_CKPT_DUMMY_ID), and "closed" otherwise.

   open_min/max_progress are the smallest and largest seqno of the
   latest operation, in each open chain, whose output KVS is still
   checkpointed.  If no such operation exists, both are the largest
   tail seqno among all chains.  Operations with seqno above
   open_min_progress that can be skipped are added to
   mr->ckpt_ctx->slct_skip_ops.

   On return, prev_progress = max and prev_global_progress = min of
   these values.  Unlike the all mode, no MPI reduction is done here. */
static void
kmr_ckpt_restore_prev_progress_selective(KMR *mr, int *target_ranks,
                                         int target_rank_count)
{
    long min_progress = -1, max_progress = -1;
    /* read all ckpt logs files to find the minimum progress */
    for (int i = 0; i < target_rank_count; i++) {
        int rank = target_ranks[i];
        char dpath[KMR_CKPT_DIRLEN];
        snprintf(dpath, KMR_CKPT_DIRLEN, "./%s%05d", KMR_CKPT_DIRNAME, rank);
        char fpath[KMR_CKPT_PATHLEN];
        kmr_ckpt_make_fname(dpath, KMR_CKPT_FNAME_PREFIX, KMR_CKPT_LOG,
                            rank, 0, fpath, sizeof(fpath));
        struct kmr_ckpt_log log_hdr;
        unsigned long total, size = 0;
        FILE *fp = kmr_ckpt_open_log(mr, fpath, &log_hdr, &total);
        /* KVS whose whole-checkpoint is currently being written */
        long target_kvs_id = KMR_CKPT_DUMMY_ID;
        /* stores kvs transition chains */
        struct kmr_ckpt_kvs_chains chains;
        kmr_ckpt_kvs_chains_init(&chains);
        /* stores id of kvses that has been checkpointed */
        struct kmr_ckpt_list kvses;
        kmr_ckpt_int_list_init(&kvses);
        _Bool num_procs_locked = 0;
        while (size < total) {
            struct kmr_ckpt_log_entry e;
            size_t rc = fread((void *)&e, sizeof(e), 1, fp);
            if (rc != 1) {
                char msg[KMR_CKPT_MSGLEN];
                snprintf(msg, sizeof(msg),
                         "Failed to read a checkpoint log entry");
                kmr_error(mr, msg);
            }
            struct kmr_ckpt_operation op = { .op_seqno = e.op_seqno,
                                             .kvi_id = e.kvi_id,
                                             .kvo_id = e.kvo_id };
            long v;
            switch (e.state) {
            case KMR_CKPT_LOG_WHOLE_START:
                target_kvs_id = e.kvo_id;
                break;
            case KMR_CKPT_LOG_WHOLE_FINISH:
                /* checkpoint of target_kvs_id is complete */
                kmr_ckpt_int_list_add(&kvses, target_kvs_id);
                target_kvs_id = KMR_CKPT_DUMMY_ID;
                break;
            case KMR_CKPT_LOG_DELETABLE:
                /* checkpoint of e.kvo_id is no longer usable */
                v = kmr_ckpt_int_list_del(&kvses, e.kvo_id);
                assert(v == e.kvo_id);
                break;
            case KMR_CKPT_LOG_PROGRESS:
            case KMR_CKPT_LOG_SKIPPED:
                /* an operation without input KVS starts a new chain;
                   otherwise it is appended to the chain of its input */
                if (op.kvi_id == KMR_CKPT_DUMMY_ID) {
                    kmr_ckpt_kvs_chains_new_chain(&chains, op);
                } else {
                    kmr_ckpt_kvs_chains_connect(&chains, op);
                }
                break;
            case KMR_CKPT_LOG_LOCK_START:
                assert(num_procs_locked == 0);
                num_procs_locked = 1;
                break;
            case KMR_CKPT_LOG_LOCK_FINISH:
                assert(num_procs_locked == 1);
                num_procs_locked = 0;
                break;
            }
            size += sizeof(e);
        }
        fclose(fp);
        if (num_procs_locked) {
            /* nothing to do as currently selective mode does not support
               restart with different number of procs */
        }

        /* calculate progress */
        long open_min_progress = LONG_MAX;
        long open_max_progress = 0;
        long last_op_id = 0;
        for (int j = 0; j < chains.chainlst_size; j++) {
            struct kmr_ckpt_list *list = &(chains.chainlst[j]);
            struct kmr_ckpt_operation *last_op =
                (struct kmr_ckpt_operation *)list->tail->val;
            if (last_op->op_seqno > last_op_id) {
                last_op_id = last_op->op_seqno;
            }
            if (last_op->kvo_id != KMR_CKPT_DUMMY_ID) {
                /* chain is open: walk back from the tail to the latest
                   operation whose output KVS is still checkpointed */
                struct kmr_ckpt_list_item *item;
                for (item = list->tail; item != 0; item = item->prev) {
                    struct kmr_ckpt_operation *op =
                        (struct kmr_ckpt_operation *)item->val;
                    long v = kmr_ckpt_int_list_search(&kvses, op->kvo_id);
                    if (v == op->kvo_id) {
                        if (op->op_seqno < open_min_progress) {
                            open_min_progress = op->op_seqno;
                        }
                        if (op->op_seqno > open_max_progress) {
                            open_max_progress = op->op_seqno;
                        }
                        break;
                    }
                }
            }
        }
        /* no open chain has a checkpointed KVS: everything up to the
           last logged operation is done */
        if (open_min_progress == LONG_MAX && open_max_progress == 0) {
            open_min_progress = last_op_id;
            open_max_progress = last_op_id;
        }

        /* initialize the skip operation list */
        struct kmr_ckpt_list *skip_ops = mr->ckpt_ctx->slct_skip_ops;
        for (int j = 0; j < chains.chainlst_size; j++) {
            struct kmr_ckpt_list *list = &(chains.chainlst[j]);
            struct kmr_ckpt_operation *last_op =
                (struct kmr_ckpt_operation *)list->tail->val;
            /* chain ends at or before open_min_progress: nothing to skip */
            if (last_op->op_seqno <= open_min_progress) {
                continue;
            }
            struct kmr_ckpt_operation *head_op =
                (struct kmr_ckpt_operation *)list->head->val;
            /* chain starts after open_max_progress: not skipped */
            if (head_op->op_seqno > open_max_progress) {
                continue;
            }
            if (last_op->kvo_id == KMR_CKPT_DUMMY_ID) {
                /* chain is closed.
                   add all operations larger than 'open_min_progress' */
                struct kmr_ckpt_list_item *item;
                for (item = list->head; item != 0; item = item->next) {
                    struct kmr_ckpt_operation *op =
                        (struct kmr_ckpt_operation *)item->val;
                    if (op->op_seqno > open_min_progress) {
                        kmr_ckpt_int_list_add(skip_ops, op->op_seqno);
                    }
                }
            } else {
                /* chain is open.
                   add all operations larger than 'open_min_progress' and
                   smaller than last-ckpt-saved-operation */
                _Bool f_add = 0;
                struct kmr_ckpt_list_item *item;
                for (item = list->tail; item != 0; item = item->prev) {
                    struct kmr_ckpt_operation *op =
                        (struct kmr_ckpt_operation *)item->val;
                    long v = kmr_ckpt_int_list_search(&kvses, op->kvo_id);
                    if (v == op->kvo_id) {
                        f_add = 1;
                    }
                    if (f_add) {
                        if (op->op_seqno > open_min_progress) {
                            kmr_ckpt_int_list_add(skip_ops, op->op_seqno);
                        }
                    }
                }
            }
        }
        kmr_ckpt_kvs_chains_free(&chains);
        kmr_ckpt_int_list_free(&kvses);
        min_progress = open_min_progress;
        max_progress = open_max_progress;
    }

    assert(max_progress >= 0 && min_progress >= 0);
    mr->ckpt_ctx->prev_progress = max_progress;
    mr->ckpt_ctx->prev_global_progress = min_progress;
}
654 
655 /* Restore MapReduce progress of the previous run. */
656 static void
657 kmr_ckpt_restore_prev_progress(KMR *mr,
658  int *target_ranks, int target_rank_count)
659 {
660  if (!mr->ckpt_selective) {
661  kmr_ckpt_restore_prev_progress_all(mr, target_ranks,
662  target_rank_count);
663  } else {
664  kmr_ckpt_restore_prev_progress_selective(mr, target_ranks,
665  target_rank_count);
666  }
667 }
668 
669 /* Read checkpoints of previous run and create restart files */
670 static void
671 kmr_ckpt_restore_prev_state(KMR *mr, const char *wdpath,
672  int *target_ranks, int target_rank_count,
673  int prev_nprocs)
674 {
675  /* Load checkpoint data files of each rank */
676  char **rdpaths =
677  (char **)kmr_malloc(sizeof(char *) * (size_t)target_rank_count);
678  struct kmr_ckpt_data_file **dataflsts = (struct kmr_ckpt_data_file **)
679  kmr_malloc(sizeof(struct kmr_ckpt_data_file *) * (size_t)target_rank_count);
680  int *nfiles = (int *)kmr_malloc(sizeof(int) * (size_t)target_rank_count);
681  int max_merge_count = 0;
682  for (int i = 0; i < target_rank_count; i++) {
683  rdpaths[i] = (char*)kmr_malloc(sizeof(char) * KMR_CKPT_DIRLEN);
684  snprintf(rdpaths[i], KMR_CKPT_DIRLEN, "./%s%05d",
685  KMR_CKPT_DIRNAME, target_ranks[i]);
686  kmr_ckpt_get_data_flist(mr, rdpaths[i], &dataflsts[i], &nfiles[i], 1);
687  max_merge_count += nfiles[i];
688  }
689 
690  /* Collect information for merging checkpoint data */
691  struct kmr_ckpt_merge_ctx merge_ctx;
692  merge_ctx.max_each_merge = target_rank_count;
693  merge_ctx.merges_count = 0;
694  merge_ctx.merges = (struct kmr_ckpt_merge *)
695  kmr_malloc(sizeof(struct kmr_ckpt_merge) * (size_t)max_merge_count);
696 
697  for (int i = 0; i < target_rank_count; i++) {
698  struct kmr_ckpt_prev_state prev_state;
699  prev_state.prev_rank = target_ranks[i];
700  prev_state.prev_nprocs = prev_nprocs;
701  prev_state.ckpt_dir = rdpaths[i];
702  prev_state.dataflst = dataflsts[i];
703  prev_state.dataflst_size = nfiles[i];
704  kmr_ckpt_restore_prev_state_each_rank(mr, &prev_state, &merge_ctx);
705  }
706 
707 #if 0
708  /* debug print */
709  if (mr->rank == 0) {
710  for (int i = 0; i < target_rank_count; i++) {
711  fprintf(stderr, "index: %d\n", i);
712  fprintf(stderr, " rdpath: %s\n", rdpaths[i]);
713  fprintf(stderr, " nfiles: %d\n", nfiles[i]);
714  for (int j = 0; j < nfiles[i]; j++) {
715  struct kmr_ckpt_data_file *file = &dataflsts[i][j];
716  fprintf(stderr, " ckptflst: %ld, %s/%s\n",
717  file->kvs_id, file->dname, file->fname);
718  }
719  }
720  fprintf(stderr, "max_merge_count: %d\n", max_merge_count);
721 
722  fprintf(stderr, "\n\n");
723 
724  fprintf(stderr, "merge_count: %d\n", merge_ctx.merges_count);
725  for (int i = 0; i < merge_ctx.merges_count; i++) {
726  fprintf(stderr, "merge\n");
727  fprintf(stderr, " rank: %d\n", merge_ctx.merges[i].rank);
728  fprintf(stderr, " kvs_id: %ld\n", merge_ctx.merges[i].kvs_id);
729  fprintf(stderr, " src_lst: %d\n",
730  merge_ctx.merges[i].src_lst_count);
731  for (int j = 0; j < merge_ctx.merges[i].src_lst_count; j++) {
732  struct kmr_ckpt_merge_source *source =
733  &(merge_ctx.merges[i].src_lst[j]);
734  fprintf(stderr, " rank: %d, n_kvi: %ld, n_kvo: %ld\n",
735  source->rank, source->n_kvi, source->n_kvo);
736  fprintf(stderr, " file: %s/%s\n",
737  source->file->dname, source->file->fname);
738  if (merge_ctx.merges[i].src_lst[j].done_ikv_lst_size > 0) {
739  fprintf(stderr, " done ikvs index: ");
740  for (int k = 0; k < source->done_ikv_lst_size; k++) {
741  fprintf(stderr, "%ld,", source->done_ikv_lst[k]);
742  }
743  fprintf(stderr, "\n");
744  }
745  }
746  }
747  }
748 #endif
749 
750  /* Create sorted checkpoint file */
751  for (int i = 0; i < merge_ctx.merges_count; i++) {
752  struct kmr_ckpt_merge *merge = &merge_ctx.merges[i];
753  for (int j = 0; j < merge->src_lst_count; j++) {
754  if (merge->src_lst[j].n_kvi > 0 &&
755  merge->src_lst[j].done_ikv_lst_size > 0) {
756  /* checkpoint data should be sorted */
757  kmr_ckpt_merge_sort_data(mr, wdpath, merge->kvs_id,
758  &merge->src_lst[j]);
759  }
760  }
761  }
762 
763  /* Write merged checkpoint data to files */
764  for (int i = 0; i < merge_ctx.merges_count; i++) {
765  kmr_ckpt_merge_write_file(mr, wdpath, &merge_ctx.merges[i]);
766  }
767 
768  /* Setup map/reduce operation start point */
769  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
770  for (int i = 0; i < merge_ctx.merges_count; i++) {
771  struct kmr_ckpt_merge *merge = &merge_ctx.merges[i];
772  for (int j = 0; j < merge->src_lst_count; j++) {
773  struct kmr_ckpt_merge_source* mg_src = &merge->src_lst[j];
774  if (mg_src->kvi_op_seqno > 0) {
775  ckptctx->kv_positions_count++;
776  break;
777  }
778  }
779  }
780  ckptctx->kv_positions = (struct kv_position *)
781  kmr_malloc(sizeof(struct kv_position) * (size_t)ckptctx->kv_positions_count);
782  int idx = 0;
783  for (int i = 0; i < merge_ctx.merges_count; i++) {
784  struct kmr_ckpt_merge *merge = &merge_ctx.merges[i];
785  _Bool found = 0;
786  for (int j = 0; j < merge->src_lst_count; j++) {
787  struct kmr_ckpt_merge_source *mg_src = &merge->src_lst[j];
788  if (mg_src->n_kvi > 0) {
789  struct kv_position *kvpos = &ckptctx->kv_positions[idx];
790  if (!found){
791  found = 1;
792  kvpos->op_seqno = mg_src->kvi_op_seqno;
793  kvpos->start_from = mg_src->n_kvi;
794  } else {
795  assert(mg_src->kvi_op_seqno == kvpos->op_seqno);
796  kvpos->start_from += mg_src->n_kvi;
797  }
798  }
799  }
800  if (found) {
801  idx++;
802  }
803  }
804 
805 #if 0
806  /* debug print */
807  for (int i = 0; i < ckptctx->kv_positions_count; i++) {
808  fprintf(stderr, "op_seqno: %ld, start_from: %ld\n",
809  ckptctx->kv_positions[i].op_seqno,
810  ckptctx->kv_positions[i].start_from);
811  }
812 #endif
813 
814  /* post process */
815  for (int i = 0; i < merge_ctx.merges_count; i++) {
816  for (int j = 0; j < merge_ctx.merges[i].src_lst_count; j++) {
817  if (merge_ctx.merges[i].src_lst[j].done_ikv_lst_size > 0) {
818  struct kmr_ckpt_merge_source *mg_src = &(merge_ctx.merges[i].src_lst[j]);
819  kmr_free(mg_src->done_ikv_lst,
820  sizeof(long) * (size_t)mg_src->done_ikv_lst_size);
821  char fpath[KMR_CKPT_PATHLEN];
822  snprintf(fpath, KMR_CKPT_PATHLEN, "%s/%s",
823  mg_src->file->dname, mg_src->file->fname);
824  unlink(fpath);
825  kmr_free(mg_src->file, sizeof(struct kmr_ckpt_data_file));
826  }
827  }
828  }
829  kmr_free(merge_ctx.merges,
830  sizeof(struct kmr_ckpt_merge) * (size_t)max_merge_count);
831  for (int i = 0; i < target_rank_count; i++) {
832  kmr_free(dataflsts[i],
833  sizeof(struct kmr_ckpt_data_file) * (size_t)nfiles[i]);
834  kmr_free(rdpaths[i], sizeof(char) * KMR_CKPT_DIRLEN);
835  }
836  kmr_free(dataflsts,
837  sizeof(struct kmr_ckpt_data_file *) * (size_t)target_rank_count);
838  kmr_free(nfiles, sizeof(int) * (size_t)target_rank_count);
839  kmr_free(rdpaths, sizeof(char *) * (size_t)target_rank_count);
840 }
841 
842 /* Read previous checkpoint data of a rank and record merge information
843  for all mode. */
844 static void
845 kmr_ckpt_restore_prev_state_each_rank_all
846 (KMR *mr, struct kmr_ckpt_prev_state *prev_state,
847  struct kmr_ckpt_merge_ctx *merge_ctx)
848 {
849  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
850  char logfile[KMR_CKPT_PATHLEN];
851  kmr_ckpt_make_fname(prev_state->ckpt_dir, KMR_CKPT_FNAME_PREFIX,
852  KMR_CKPT_LOG, prev_state->prev_rank, 0,
853  logfile, sizeof(logfile));
854  struct kmr_ckpt_log log_hdr;
855  unsigned long total, size = 0;
856  FILE *fp = kmr_ckpt_open_log(mr, logfile, &log_hdr, &total);
857 
858  long cur_op = 0;
859  /* list of ignored inconsistent KVSes */
860  struct kmr_ckpt_kvs_chains kvs_chains;
861  kmr_ckpt_kvs_chains_init(&kvs_chains);
862  /* list of completed spawn key-values */
863  struct kmr_ckpt_list spawn_dones;
864  /* used for map/reduce & spawn map */
865  long nkvi = 0, nkvo = 0;
866  /* used for find undeleted kvs */
867  long undel_kvs_id = 0;
868  struct kmr_ckpt_log_entry last_log = { 0, 0, 0, 0, 0, 0 };
869  while (size < total) {
870  struct kmr_ckpt_log_entry e;
871  size_t rc = fread((void *)&e, sizeof(e), 1, fp);
872  if (rc != 1) {
873  char msg[KMR_CKPT_MSGLEN];
874  snprintf(msg, sizeof(msg),
875  "Failed to read a checkpoint log entry");
876  kmr_error(mr, msg);
877  }
878  struct kmr_ckpt_operation op = { .op_seqno = e.op_seqno,
879  .kvi_id = e.kvi_id,
880  .kvo_id = e.kvo_id };
881  switch (e.state) {
882  case KMR_CKPT_LOG_WHOLE_START:
883  cur_op = e.op_seqno;
884  if (cur_op <= ckptctx->prev_global_progress) {
885  /* Operation can be skipped
886  Procesed in KMR_CKPT_LOG_WHOLE_FINISH */
887  } else { /* cur_op > ckptctx->prev_global_progress */
888  if (e.kvi_id == KMR_CKPT_DUMMY_ID) {
889  /* Ignore a kvs generated by kmr_map_once called
890  in this region */
891  kmr_ckpt_merge_ignore_ckpt_data(e.kvo_id, prev_state,
892  merge_ctx);
893  kmr_ckpt_kvs_chains_new_chain(&kvs_chains, op);
894  } else {
895  /* If the kvs is generated from a KVS generated by
896  kmr_map_once in this regin, ignore it */
897  int cc = kmr_ckpt_merge_check_ignorable(&kvs_chains,
898  e.kvi_id);
899  if (cc == 0) {
900  kmr_ckpt_merge_ignore_ckpt_data(e.kvo_id, prev_state,
901  merge_ctx);
902  kmr_ckpt_kvs_chains_connect(&kvs_chains, op);
903  }
904  }
905  }
906  if (cur_op > ckptctx->prev_progress) {
907  last_log = e;
908  }
909  break;
910  case KMR_CKPT_LOG_WHOLE_FINISH:
911  assert(e.op_seqno == cur_op);
912  kmr_ckpt_merge_store_ckpt_data(e.kvo_id, mr->rank, -1,
913  prev_state, merge_ctx);
914  cur_op = 0;
915  break;
916  case KMR_CKPT_LOG_BLOCK_START:
917  cur_op = e.op_seqno;
918  assert(e.kvi_id != KMR_CKPT_DUMMY_ID);
919  if (cur_op <= ckptctx->prev_global_progress) {
920  /* Operation can be skipped
921  Procesed in KMR_CKPT_LOG_BLOCK_FINISH */
922  } else { /* cur_op > ckptctx->prev_global_progress */
923  /* If the kvs is generated from a KVS generated by
924  kmr_map_once in this regin, ignore it */
925  int cc = kmr_ckpt_merge_check_ignorable(&kvs_chains, e.kvi_id);
926  if (cc == 0) {
927  kmr_ckpt_merge_ignore_ckpt_data(e.kvo_id, prev_state,
928  merge_ctx);
929  kmr_ckpt_kvs_chains_connect(&kvs_chains, op);
930  }
931  }
932  if (cur_op > ckptctx->prev_progress) {
933  last_log = e;
934  nkvi = e.n_kvi;
935  nkvo = e.n_kvo;
936  }
937  break;
938  case KMR_CKPT_LOG_BLOCK_ADD:
939  assert(e.op_seqno == cur_op);
940  if (cur_op > ckptctx->prev_progress) {
941  last_log = e;
942  nkvi += e.n_kvi;
943  nkvo += e.n_kvo;
944  }
945  break;
946  case KMR_CKPT_LOG_BLOCK_FINISH:
947  assert(e.op_seqno == cur_op);
948  kmr_ckpt_merge_store_ckpt_data(e.kvo_id, mr->rank, -1,
949  prev_state, merge_ctx);
950  cur_op = 0;
951  if (cur_op > ckptctx->prev_progress) {
952  nkvi = 0;
953  nkvo = 0;
954  }
955  break;
956  case KMR_CKPT_LOG_INDEX_START:
957  cur_op = e.op_seqno;
958  assert(e.kvi_id != KMR_CKPT_DUMMY_ID);
959  if (cur_op <= ckptctx->prev_global_progress) {
960  /* Operation can be skipped
961  Procesed in KMR_CKPT_LOG_BLOCK_FINISH */
962  } else { /* cur_op > ckptctx->prev_global_progress */
963  /* If the kvs is generated from a KVS generated by
964  kmr_map_once in this regin, ignore it */
965  int cc = kmr_ckpt_merge_check_ignorable(&kvs_chains, e.kvi_id);
966  if (cc == 0) {
967  kmr_ckpt_merge_ignore_ckpt_data(e.kvo_id, prev_state,
968  merge_ctx);
969  kmr_ckpt_kvs_chains_connect(&kvs_chains, op);
970  }
971  }
972  if (cur_op > ckptctx->prev_progress) {
973  last_log = e;
974  kmr_ckpt_int_list_init(&spawn_dones);
975  nkvi = e.n_kvi;
976  nkvo = e.n_kvo;
977  }
978  break;
979  case KMR_CKPT_LOG_INDEX_ADD:
980  assert(e.op_seqno == cur_op);
981  if (cur_op > ckptctx->prev_progress) {
982  last_log = e;
983  kmr_ckpt_int_list_add(&spawn_dones, e.n_kvi);
984  nkvi += 1;
985  nkvo += e.n_kvo;
986  }
987  break;
988  case KMR_CKPT_LOG_INDEX_FINISH:
989  assert(e.op_seqno == cur_op);
990  kmr_ckpt_merge_store_ckpt_data(e.kvo_id, mr->rank, -1,
991  prev_state, merge_ctx);
992  cur_op = 0;
993  if (cur_op > ckptctx->prev_progress) {
994  kmr_ckpt_int_list_free(&spawn_dones);
995  nkvi = 0;
996  nkvo = 0;
997  }
998  break;
999  case KMR_CKPT_LOG_DELETE_START:
1000  undel_kvs_id = e.kvi_id;
1001  break;
1002  case KMR_CKPT_LOG_DELETE_FINISH:
1003  assert(e.kvi_id == undel_kvs_id);
1004  undel_kvs_id = 0;
1005  break;
1006  case KMR_CKPT_LOG_SKIPPED:
1007  kmr_ckpt_merge_store_ckpt_data(e.kvo_id, mr->rank, -1,
1008  prev_state, merge_ctx);
1009  break;
1010  }
1011  size += sizeof(e);
1012  }
1013  if (cur_op != 0) {
1014  /* Process the last log */
1015  switch (last_log.state) {
1016  case KMR_CKPT_LOG_WHOLE_START:
1017  kmr_ckpt_merge_ignore_ckpt_data(last_log.kvo_id, prev_state,
1018  merge_ctx);
1019  break;
1020  case KMR_CKPT_LOG_BLOCK_START:
1021  kmr_ckpt_merge_ignore_ckpt_data(last_log.kvo_id, prev_state,
1022  merge_ctx);
1023  break;
1024  case KMR_CKPT_LOG_BLOCK_ADD:
1025  kmr_ckpt_merge_store_ckpt_data(last_log.kvo_id, mr->rank, nkvo,
1026  prev_state, merge_ctx);
1027  kmr_ckpt_merge_update_ckpt_data(last_log.kvi_id, mr->rank,
1028  last_log.op_seqno, nkvi, NULL,
1029  prev_state, merge_ctx);
1030  break;
1031  case KMR_CKPT_LOG_INDEX_START:
1032  kmr_ckpt_merge_ignore_ckpt_data(last_log.kvo_id, prev_state,
1033  merge_ctx);
1034  break;
1035  case KMR_CKPT_LOG_INDEX_ADD:
1036  kmr_ckpt_merge_store_ckpt_data(last_log.kvo_id, mr->rank, nkvo,
1037  prev_state, merge_ctx);
1038  assert(nkvi >= spawn_dones.size);
1039  kmr_ckpt_merge_update_ckpt_data(last_log.kvi_id, mr->rank,
1040  last_log.op_seqno, nkvi,
1041  &spawn_dones, prev_state,
1042  merge_ctx);
1043  kmr_ckpt_int_list_free(&spawn_dones);
1044  break;
1045  }
1046  }
1047  if (undel_kvs_id != 0) {
1048  /* Ignore the ckpt data */
1049  kmr_ckpt_merge_ignore_ckpt_data(undel_kvs_id, prev_state, merge_ctx);
1050  }
1051  kmr_ckpt_kvs_chains_free(&kvs_chains);
1052  fclose(fp);
1053 
1054  for (int i = 0; i < prev_state->dataflst_size; i++) {
1055  if (prev_state->dataflst[i].checked != 1) {
1056  char msg[KMR_CKPT_MSGLEN];
1057  snprintf(msg, sizeof(msg),
1058  "Checkpoint state is wrong. "
1059  "Delete all checkpoint and restart again");
1060  kmr_error(mr, msg);
1061  }
1062  }
1063 }
1064 
1065 /* Read previous checkpoint data of a rank and record merge information
1066  for selective mode. */
1067 static void
1068 kmr_ckpt_restore_prev_state_each_rank_selective
1069 (KMR *mr, struct kmr_ckpt_prev_state *prev_state,
1070  struct kmr_ckpt_merge_ctx *merge_ctx)
1071 {
1072  char logfile[KMR_CKPT_PATHLEN];
1073  kmr_ckpt_make_fname(prev_state->ckpt_dir, KMR_CKPT_FNAME_PREFIX,
1074  KMR_CKPT_LOG, prev_state->prev_rank, 0,
1075  logfile, sizeof(logfile));
1076  struct kmr_ckpt_log log_hdr;
1077  unsigned long total, size = 0;
1078  FILE *fp = kmr_ckpt_open_log(mr, logfile, &log_hdr, &total);
1079 
1080  /* stores id of kvses whose ckpt data files should be deleted */
1081  struct kmr_ckpt_list kvses;
1082  kmr_ckpt_int_list_init(&kvses);
1083  while (size < total) {
1084  struct kmr_ckpt_log_entry e;
1085  size_t rc = fread((void *)&e, sizeof(e), 1, fp);
1086  if (rc != 1) {
1087  char msg[KMR_CKPT_MSGLEN];
1088  snprintf(msg, sizeof(msg),
1089  "Failed to read a checkpoint log entry");
1090  kmr_error(mr, msg);
1091  }
1092  switch (e.state) {
1093  case KMR_CKPT_LOG_DELETABLE:
1094  kmr_ckpt_int_list_add(&kvses, e.kvo_id);
1095  break;
1096  }
1097  size += sizeof(e);
1098  }
1099  struct kmr_ckpt_data_file *dataflst = prev_state->dataflst;
1100  for (int i = 0; i < prev_state->dataflst_size; i++) {
1101  struct kmr_ckpt_data_file *file = &dataflst[i];
1102  long v = kmr_ckpt_int_list_rsearch(&kvses, file->kvs_id);
1103  if (v == file->kvs_id) {
1104  /* ignore should-be-deleted checkpoint data */
1105  kmr_ckpt_merge_ignore_ckpt_data(file->kvs_id, prev_state,
1106  merge_ctx);
1107  } else {
1108  kmr_ckpt_merge_store_ckpt_data(file->kvs_id, mr->rank, -1,
1109  prev_state, merge_ctx);
1110  }
1111  }
1112  kmr_ckpt_int_list_free(&kvses);
1113  fclose(fp);
1114 
1115  for (int i = 0; i < prev_state->dataflst_size; i++) {
1116  if (prev_state->dataflst[i].checked != 1) {
1117  char msg[KMR_CKPT_MSGLEN];
1118  snprintf(msg, sizeof(msg),
1119  "Checkpoint state is wrong. "
1120  "Delete all checkpoint and restart again");
1121  kmr_error(mr, msg);
1122  }
1123  }
1124 }
1125 
1126 /* Read previous checkpoint data of a rank and record merge information. */
1127 static void
1128 kmr_ckpt_restore_prev_state_each_rank(KMR *mr,
1129  struct kmr_ckpt_prev_state *prev_state,
1130  struct kmr_ckpt_merge_ctx *merge_ctx)
1131 {
1132  if (!mr->ckpt_selective) {
1133  kmr_ckpt_restore_prev_state_each_rank_all(mr, prev_state, merge_ctx);
1134  } else {
1135  kmr_ckpt_restore_prev_state_each_rank_selective(mr, prev_state,
1136  merge_ctx);
1137  }
1138 }
1139 
/* Tell whether KVI_ID is part of one of the KVS chains in CHAINS.
   Returns 0 when it is (data derived from it must be ignored),
   and -1 when it is not. */
static int
kmr_ckpt_merge_check_ignorable(struct kmr_ckpt_kvs_chains *chains, long kvi_id)
{
    struct kmr_ckpt_list *chain = kmr_ckpt_kvs_chains_find(chains, kvi_id);
    return (chain == 0) ? -1 : 0;
}
1152 
1153 static struct kmr_ckpt_data_file *
1154 kmr_ckpt_find_data_file(long kvs_id,
1155  struct kmr_ckpt_data_file *dataflst, int nfiles)
1156 {
1157  struct kmr_ckpt_data_file *file = 0;
1158  for (int i = 0; i < nfiles; i++) {
1159  if (dataflst[i].kvs_id == kvs_id) {
1160  file = &dataflst[i];
1161  break;
1162  }
1163  }
1164  return file;
1165 }
1166 
1167 /* Marks the ckpt data as not-to-be-merged. */
1168 static void
1169 kmr_ckpt_merge_ignore_ckpt_data(long kvo_id,
1170  struct kmr_ckpt_prev_state *prev_state,
1171  struct kmr_ckpt_merge_ctx *merge_ctx)
1172 {
1173  struct kmr_ckpt_data_file *file =
1174  kmr_ckpt_find_data_file(kvo_id, prev_state->dataflst,
1175  prev_state->dataflst_size);
1176  if (file == 0) {
1177  return;
1178  }
1179  file->checked = 1;
1180 
1181  /* If the ckpt data is once marked as 'to-be-merged',
1182  remove this ckpt data from merge source list. */
1183  if (file->merged == 1) {
1184  struct kmr_ckpt_merge *merge = 0;
1185  for (int i = 0; i < merge_ctx->merges_count; i++) {
1186  if (merge_ctx->merges[i].kvs_id == kvo_id) {
1187  merge = &merge_ctx->merges[i];
1188  break;
1189  }
1190  }
1191  assert(merge != 0);
1192  int idx = -1;
1193  struct kmr_ckpt_merge_source *mg_src = 0;
1194  for (int i = 0; i < merge->src_lst_count; i++) {
1195  if (merge->src_lst[i].rank == prev_state->prev_rank) {
1196  idx = i;
1197  mg_src = &merge->src_lst[i];
1198  break;
1199  }
1200  }
1201  assert(idx != -1 && mg_src != 0);
1202  /* Delete the found mg_src */
1203  if (mg_src->done_ikv_lst_size != 0) {
1204  kmr_free(mg_src->done_ikv_lst,
1205  sizeof(long) * (size_t)mg_src->done_ikv_lst_size);
1206  }
1207  if (merge->src_lst_count == 1) {
1208  memset(mg_src, 0, sizeof(struct kmr_ckpt_merge_source));
1209  } else {
1210  for (int i = idx; i < merge->src_lst_count - 1; i++) {
1211  struct kmr_ckpt_merge_source *target = &merge->src_lst[i];
1212  struct kmr_ckpt_merge_source *source = &merge->src_lst[i + 1];
1213  memcpy(target, source, sizeof(struct kmr_ckpt_merge_source));
1214  }
1215  memset(&merge->src_lst[merge->src_lst_count - 1], 0,
1216  sizeof(struct kmr_ckpt_merge_source));
1217  }
1218  merge->src_lst_count -= 1;
1219  }
1220  file->merged = 0;
1221 }
1222 
1223 /* Marks the ckpt data as to-be-merged. */
1224 static void
1225 kmr_ckpt_merge_store_ckpt_data(long kvo_id, int rank, long nkvo,
1226  struct kmr_ckpt_prev_state *prev_state,
1227  struct kmr_ckpt_merge_ctx *merge_ctx)
1228 {
1229  struct kmr_ckpt_data_file *file =
1230  kmr_ckpt_find_data_file(kvo_id, prev_state->dataflst,
1231  prev_state->dataflst_size);
1232  if (file == 0 || file->merged == 1) {
1233  return;
1234  }
1235  file->checked = 1;
1236  file->merged = 1;
1237 
1238  struct kmr_ckpt_merge *merge = 0;
1239  int cnt = merge_ctx->merges_count;
1240  for (int i = 0; i < cnt; i++) {
1241  if (merge_ctx->merges[i].kvs_id == kvo_id) {
1242  merge = &merge_ctx->merges[i];
1243  break;
1244  }
1245  }
1246  if (merge == 0) {
1247  /* Initialize a new merge */
1248  merge = &merge_ctx->merges[cnt++];
1249  merge->rank = rank;
1250  merge->kvs_id = kvo_id;
1251  merge->src_lst = (struct kmr_ckpt_merge_source *)
1252  kmr_malloc(sizeof(struct kmr_ckpt_merge_source) * (size_t)merge_ctx->max_each_merge);
1253  merge->src_lst_count = 0;
1254  merge_ctx->merges_count = cnt;
1255  }
1256  struct kmr_ckpt_merge_source *mg_src = &merge->src_lst[merge->src_lst_count];
1257  mg_src->rank = prev_state->prev_rank;
1258  mg_src->n_kvi = -1;
1259  mg_src->n_kvo = nkvo;
1260  mg_src->done_ikv_lst = 0;
1261  mg_src->done_ikv_lst_size = 0;
1262  mg_src->kvi_op_seqno = -1;
1263  mg_src->file = file;
1264  merge->src_lst_count += 1;
1265 }
1266 
/* qsort(3) comparator for long values.  Returns -1, 0 or 1 as the long
   pointed to by V1 is less than, equal to or greater than the one
   pointed to by V2. */
static int
kmr_ckpt_cmp_long(const void *v1, const void *v2)
{
    const long a = *(const long *)v1;
    const long b = *(const long *)v2;
    return (a > b) - (a < b);
}
1280 
1281 /* Update information of ckpt data merge. */
1282 static void
1283 kmr_ckpt_merge_update_ckpt_data(long kvi_id, int rank,
1284  long kvi_op_seqno, long nkvi,
1285  struct kmr_ckpt_list *done_ikv_lst,
1286  struct kmr_ckpt_prev_state *prev_state,
1287  struct kmr_ckpt_merge_ctx *merge_ctx)
1288 {
1289  struct kmr_ckpt_data_file *file =
1290  kmr_ckpt_find_data_file(kvi_id, prev_state->dataflst,
1291  prev_state->dataflst_size);
1292  assert(file != 0);
1293  assert(file->checked == 1 && file->merged == 1);
1294 
1295  struct kmr_ckpt_merge *merge = 0;
1296  for (int i = 0; i < merge_ctx->merges_count; i++) {
1297  if (merge_ctx->merges[i].kvs_id == kvi_id) {
1298  merge = &merge_ctx->merges[i];
1299  break;
1300  }
1301  }
1302  assert(merge != 0);
1303  struct kmr_ckpt_merge_source *mg_src = 0;
1304  for (int i = 0; i < merge->src_lst_count; i++) {
1305  if (merge->src_lst[i].rank == prev_state->prev_rank) {
1306  mg_src = &merge->src_lst[i];
1307  break;
1308  }
1309  }
1310  assert(mg_src != 0);
1311  assert(mg_src->n_kvo == -1);
1312  mg_src->n_kvi = nkvi;
1313  mg_src->kvi_op_seqno = kvi_op_seqno;
1314  if (done_ikv_lst->size != 0) {
1315  mg_src->done_ikv_lst =
1316  (long *)kmr_malloc(sizeof(long) * (size_t)done_ikv_lst->size);
1317  struct kmr_ckpt_list_item *item;
1318  int idx = 0;
1319  for (item = done_ikv_lst->head; item != 0; item = item->next) {
1320  mg_src->done_ikv_lst[idx] = *(long *)item->val;
1321  idx += 1;
1322  }
1323  qsort(mg_src->done_ikv_lst, (size_t)done_ikv_lst->size, sizeof(long),
1324  kmr_ckpt_cmp_long);
1325  mg_src->done_ikv_lst_size = done_ikv_lst->size;
1326  }
1327 }
1328 
1329 /* Sort a previous checkpoint data of a kvs so that processed key-values
1330  are moved to the front and unprocessed ones are moved to the back.
1331  It creates a new checkpoint data and replaces pointer to data. */
1332 static void
1333 kmr_ckpt_merge_sort_data(KMR *mr, const char *wdpath, long kvs_id,
1334  struct kmr_ckpt_merge_source *mrg_src)
1335 {
1336  assert(mrg_src->file->merged == 1);
1337  struct kmr_ckpt_data_file *ndata = (struct kmr_ckpt_data_file *)
1338  kmr_malloc(sizeof(struct kmr_ckpt_data_file));
1339  ndata->kvs_id = kvs_id;
1340  ndata->checked = 1;
1341  ndata->merged = 1;
1342  snprintf(ndata->fname, sizeof(ndata->fname), "%s.sorted",
1343  mrg_src->file->fname);
1344  strncpy(ndata->dname, wdpath, sizeof(ndata->dname) - 1);
1345 
1346  char dst_fpath[KMR_CKPT_PATHLEN];
1347  snprintf(dst_fpath, KMR_CKPT_PATHLEN, "%s/%s", ndata->dname, ndata->fname);
1348  int cc = access(dst_fpath, F_OK);
1349  assert(cc != 0);
1350  FILE *wfp = kmr_ckpt_open_path(mr, dst_fpath, "w");
1351 
1352  char tmp_fpath[KMR_CKPT_PATHLEN];
1353  snprintf(tmp_fpath, KMR_CKPT_PATHLEN, "%s/%s.rest",
1354  ndata->dname, ndata->fname);
1355  cc = access(tmp_fpath, F_OK);
1356  assert(cc != 0);
1357 
1358  char src_fpath[KMR_CKPT_PATHLEN];
1359  snprintf(src_fpath, KMR_CKPT_PATHLEN, "%s/%s",
1360  mrg_src->file->dname, mrg_src->file->fname);
1361  struct stat sb;
1362  cc = stat(src_fpath, &sb);
1363  if (cc != 0) {
1364  char msg[KMR_CKPT_MSGLEN];
1365  snprintf(msg, sizeof(msg),
1366  "Failed to access a checkpoint data file %s", src_fpath);
1367  kmr_error(mr, msg);
1368  }
1369  FILE *rfp = kmr_ckpt_open_path(mr, src_fpath, "r");
1370 
1371  /* Write header */
1372  size_t hdrsiz = offsetof(struct kmr_ckpt_data, data);
1373  struct kmr_ckpt_data hdr;
1374  size_t rc = fread((void *)&hdr, hdrsiz, 1, rfp);
1375  if (rc != 1) {
1376  char msg[KMR_CKPT_MSGLEN];
1377  snprintf(msg, sizeof(msg),
1378  "Failed to read a checkpoint data file %s", src_fpath);
1379  kmr_error(mr, msg);
1380  }
1381  rc = fwrite((void *)&hdr, hdrsiz, 1, wfp);
1382  if (rc != 1) {
1383  char msg[KMR_CKPT_MSGLEN];
1384  snprintf(msg, sizeof(msg),
1385  "Failed to write a checkpoint data file %s", dst_fpath);
1386  kmr_error(mr, msg);
1387  }
1388 
1389  /* Write kv */
1390  size_t total_size = (size_t)sb.st_size - hdrsiz;
1391  size_t cur_size = 0;
1392  /* Write processed kv */
1393  {
1394  size_t read_size = 0;
1395  long idx = 0, start_idx = 0;
1396  size_t bufsiz = 8;
1397  void *buf = kmr_malloc(bufsiz);
1398  FILE *wfp2 = kmr_ckpt_open_path(mr, tmp_fpath, "w");
1399  while (read_size < total_size) {
1400  struct kmr_kvs_entry e;
1401  /* Read */
1402  size_t kv_hdrsiz = offsetof(struct kmr_kvs_entry, c);
1403  rc = fread((void *)&e, kv_hdrsiz, 1, rfp);
1404  if (rc != 1) {
1405  char msg[KMR_CKPT_MSGLEN];
1406  snprintf(msg, sizeof(msg),
1407  "Failed to read a checkpoint data file %s", src_fpath);
1408  kmr_error(mr, msg);
1409  }
1410  size_t kv_bdysiz =
1411  (size_t)KMR_ALIGN(e.klen) + (size_t)KMR_ALIGN(e.vlen);
1412  if (bufsiz < kv_bdysiz) {
1413  bufsiz = kv_bdysiz;
1414  buf = kmr_realloc(buf, bufsiz);
1415  }
1416  rc = fread(buf, kv_bdysiz, 1, rfp);
1417  if (rc != 1) {
1418  char msg[KMR_CKPT_MSGLEN];
1419  snprintf(msg, sizeof(msg),
1420  "Failed to read a checkpoint data file %s", src_fpath);
1421  kmr_error(mr, msg);
1422  }
1423  /* Write */
1424  FILE *twfp;
1425  _Bool incp = 0;
1426  for (long i = start_idx; i < mrg_src->done_ikv_lst_size; i++) {
1427  if (idx == mrg_src->done_ikv_lst[i]) {
1428  incp = 1;
1429  break;
1430  }
1431  }
1432  if (incp) {
1433  twfp = wfp;
1434  cur_size += kv_hdrsiz + kv_bdysiz;
1435  start_idx++;
1436  } else {
1437  twfp = wfp2;
1438  }
1439  rc = fwrite((void *)&e, kv_hdrsiz, 1, twfp);
1440  if (rc != 1) {
1441  char msg[KMR_CKPT_MSGLEN];
1442  snprintf(msg, sizeof(msg),
1443  "Failed to write a checkpoint data file %s",
1444  dst_fpath);
1445  kmr_error(mr, msg);
1446  }
1447  rc = fwrite(buf, kv_bdysiz, 1, twfp);
1448  if (rc != 1) {
1449  char msg[KMR_CKPT_MSGLEN];
1450  snprintf(msg, sizeof(msg),
1451  "Failed to write a checkpoint data file %s",
1452  dst_fpath);
1453  kmr_error(mr, msg);
1454  }
1455  read_size += kv_hdrsiz + kv_bdysiz;
1456  idx++;
1457  }
1458  kmr_free(buf, bufsiz);
1459  kmr_ckpt_flush(mr, wfp2);
1460  fclose(wfp2);
1461  }
1462  fclose(rfp);
1463 
1464  /* Write unprocessed kv */
1465  cc = stat(tmp_fpath, &sb);
1466  if (cc != 0) {
1467  char msg[KMR_CKPT_MSGLEN];
1468  snprintf(msg, sizeof(msg),
1469  "Failed to access a checkpoint data file %s", tmp_fpath);
1470  kmr_error(mr, msg);
1471  }
1472  rfp = kmr_ckpt_open_path(mr, tmp_fpath, "r");
1473  void *buf = kmr_malloc((size_t)sb.st_size);
1474  rc = fread(buf, (size_t)sb.st_size, 1, rfp);
1475  if (rc != 1) {
1476  char msg[KMR_CKPT_MSGLEN];
1477  snprintf(msg, sizeof(msg),
1478  "Failed to read a checkpoint data file %s", tmp_fpath);
1479  kmr_error(mr, msg);
1480  }
1481  rc = fwrite(buf, (size_t)sb.st_size, 1, wfp);
1482  if (rc != 1) {
1483  char msg[KMR_CKPT_MSGLEN];
1484  snprintf(msg, sizeof(msg),
1485  "Failed to write a checkpoint data file %s", dst_fpath);
1486  kmr_error(mr, msg);
1487  }
1488  kmr_free(buf, (size_t)sb.st_size);
1489  assert((cur_size + (size_t)sb.st_size) == total_size);
1490  fclose(rfp);
1491  unlink(tmp_fpath);
1492  kmr_ckpt_flush(mr, wfp);
1493  fclose(wfp);
1494 
1495  mrg_src->file = ndata;
1496 }
1497 
1498 /* Merge checkpoint data files */
1499 static void
1500 kmr_ckpt_merge_write_file(KMR *mr, const char *wdpath,
1501  struct kmr_ckpt_merge *merge)
1502 {
1503  char dst_fpath[KMR_CKPT_PATHLEN];
1504  kmr_ckpt_make_fname(wdpath, KMR_CKPT_FNAME_PREFIX,
1505  KMR_CKPT_DATA, merge->rank, merge->kvs_id,
1506  dst_fpath, sizeof(dst_fpath));
1507  int cc = access(dst_fpath, F_OK);
1508  assert(cc != 0);
1509  FILE *wfp = kmr_ckpt_open_path(mr, dst_fpath, "w");
1510 
1511  /* Write header */
1512  char hdr_src_fpath[KMR_CKPT_PATHLEN];
1513  snprintf(hdr_src_fpath, KMR_CKPT_PATHLEN, "%s/%s",
1514  merge->src_lst[0].file->dname, merge->src_lst[0].file->fname);
1515  FILE *rfp = kmr_ckpt_open_path(mr, hdr_src_fpath, "r");
1516  size_t hdrsiz = offsetof(struct kmr_ckpt_data, data);
1517  /* Update nprocs and rank in header */
1518  struct kmr_ckpt_data hdr;
1519  size_t rc = fread((void *)&hdr, hdrsiz, 1, rfp);
1520  if (rc != 1) {
1521  char msg[KMR_CKPT_MSGLEN];
1522  snprintf(msg, sizeof(msg),
1523  "Failed to read a checkpoint data file %s", hdr_src_fpath);
1524  kmr_error(mr, msg);
1525  }
1526  hdr.nprocs = mr->nprocs;
1527  hdr.rank = mr->rank;
1528  rc = fwrite((void *)&hdr, hdrsiz, 1, wfp);
1529  if (rc != 1) {
1530  char msg[KMR_CKPT_MSGLEN];
1531  snprintf(msg, sizeof(msg),
1532  "Failed to write a checkpoint data file %s", dst_fpath);
1533  kmr_error(mr, msg);
1534  }
1535  fclose(rfp);
1536 
1537  /* Open to-be-merged files */
1538  struct merge_file {
1539  FILE *fp;
1540  size_t size; /* total key-value size in byte */
1541  size_t cur_size; /* read key-value size in byte */
1542  };
1543  struct merge_file *mfs = (struct merge_file *)
1544  kmr_malloc(sizeof(struct merge_file) * (size_t)merge->src_lst_count);
1545  for (int i = 0; i < merge->src_lst_count; i++) {
1546  struct kmr_ckpt_data_file *file = merge->src_lst[i].file;
1547  assert(file->merged == 1);
1548  char fpath[KMR_CKPT_PATHLEN];
1549  snprintf(fpath, KMR_CKPT_PATHLEN, "%s/%s", file->dname, file->fname);
1550  struct stat sb;
1551  cc = stat(fpath, &sb);
1552  if (cc != 0) {
1553  char msg[KMR_CKPT_MSGLEN];
1554  snprintf(msg, sizeof(msg),
1555  "Failed to access a checkpoint data file %s", fpath);
1556  kmr_error(mr, msg);
1557  }
1558  hdrsiz = offsetof(struct kmr_ckpt_data, data);
1559  mfs[i].fp = kmr_ckpt_open_path(mr, fpath, "r");
1560  mfs[i].size = (size_t)sb.st_size - hdrsiz;
1561  mfs[i].cur_size = 0;
1562  fseek(mfs[i].fp, (long)hdrsiz, SEEK_SET);
1563  }
1564 
1565  /* Write processed kv when used as kvi */
1566  for (int i = 0; i < merge->src_lst_count; i++) {
1567  struct kmr_ckpt_merge_source *mg_src = &merge->src_lst[i];
1568  if (mg_src->n_kvi > 0) {
1569  assert(mg_src->n_kvo == -1);
1570  long kvicnt = 0;
1571  size_t bufsiz = 8;
1572  unsigned char *buf = (unsigned char *)kmr_malloc(bufsiz);
1573  while (mfs[i].cur_size < mfs[i].size) {
1574  if (kvicnt >= mg_src->n_kvi) {
1575  break;
1576  }
1577  struct kmr_kvs_entry e;
1578  /* Read */
1579  size_t kv_hdrsiz = offsetof(struct kmr_kvs_entry, c);
1580  rc = fread((void *)&e, kv_hdrsiz, 1, mfs[i].fp);
1581  if (rc != 1) {
1582  char msg[KMR_CKPT_MSGLEN];
1583  snprintf(msg, sizeof(msg),
1584  "Failed to read a checkpoint data file");
1585  kmr_error(mr, msg);
1586  }
1587  size_t kv_bdysiz =
1588  (size_t)KMR_ALIGN(e.klen) + (size_t)KMR_ALIGN(e.vlen);
1589  if (bufsiz < kv_bdysiz) {
1590  bufsiz = kv_bdysiz;
1591  buf = (unsigned char *)kmr_realloc(buf, bufsiz);
1592  }
1593  rc = fread((void *)buf, kv_bdysiz, 1, mfs[i].fp);
1594  if (rc != 1) {
1595  char msg[KMR_CKPT_MSGLEN];
1596  snprintf(msg, sizeof(msg),
1597  "Failed to read a checkpoint data file");
1598  kmr_error(mr, msg);
1599  }
1600  /* Write */
1601  rc = fwrite((void *)&e, kv_hdrsiz, 1, wfp);
1602  if (rc != 1) {
1603  char msg[KMR_CKPT_MSGLEN];
1604  snprintf(msg, sizeof(msg),
1605  "Failed to write a checkpoint data file %s",
1606  dst_fpath);
1607  kmr_error(mr, msg);
1608  }
1609  rc = fwrite((void *)buf, kv_bdysiz, 1, wfp);
1610  if (rc != 1) {
1611  char msg[KMR_CKPT_MSGLEN];
1612  snprintf(msg, sizeof(msg),
1613  "Failed to write a checkpoint data file %s",
1614  dst_fpath);
1615  kmr_error(mr, msg);
1616  }
1617  kvicnt += 1;
1618  mfs[i].cur_size += kv_hdrsiz + kv_bdysiz;
1619  }
1620  kmr_free(buf, bufsiz);
1621  }
1622  }
1623 
1624  /* Write generated kv when used as kvo */
1625  for (int i = 0; i < merge->src_lst_count; i++) {
1626  struct kmr_ckpt_merge_source *mg_src = &merge->src_lst[i];
1627  long kvocnt = 0;
1628  size_t bufsiz = 8;
1629  unsigned char *buf = (unsigned char *)kmr_malloc(bufsiz);
1630  while (mfs[i].cur_size < mfs[i].size) {
1631  if ((mg_src->n_kvo >= 0) && (kvocnt >= mg_src->n_kvo)) {
1632  break;
1633  }
1634  struct kmr_kvs_entry e;
1635  /* Read */
1636  size_t kv_hdrsiz = offsetof(struct kmr_kvs_entry, c);
1637  rc = fread((void *)&e, kv_hdrsiz, 1, mfs[i].fp);
1638  if (rc != 1) {
1639  char msg[KMR_CKPT_MSGLEN];
1640  snprintf(msg, sizeof(msg),
1641  "Failed to read a checkpoint data file");
1642  kmr_error(mr, msg);
1643  }
1644  size_t kv_bdysiz =
1645  (size_t)KMR_ALIGN(e.klen) + (size_t)KMR_ALIGN(e.vlen);
1646  if (bufsiz < kv_bdysiz) {
1647  bufsiz = kv_bdysiz;
1648  buf = (unsigned char *)kmr_realloc(buf, bufsiz);
1649  }
1650  rc = fread((void *)buf, kv_bdysiz, 1, mfs[i].fp);
1651  if (rc != 1) {
1652  char msg[KMR_CKPT_MSGLEN];
1653  snprintf(msg, sizeof(msg),
1654  "Failed to read a checkpoint data file");
1655  kmr_error(mr, msg);
1656  }
1657  /* Write */
1658  rc = fwrite((void *)&e, kv_hdrsiz, 1, wfp);
1659  if (rc != 1) {
1660  char msg[KMR_CKPT_MSGLEN];
1661  snprintf(msg, sizeof(msg),
1662  "Failed to write a checkpoint data file %s",
1663  dst_fpath);
1664  kmr_error(mr, msg);
1665  }
1666  rc = fwrite((void *)buf, kv_bdysiz, 1, wfp);
1667  if (rc != 1) {
1668  char msg[KMR_CKPT_MSGLEN];
1669  snprintf(msg, sizeof(msg),
1670  "Failed to write a checkpoint data file %s",
1671  dst_fpath);
1672  kmr_error(mr, msg);
1673  }
1674  kvocnt += 1;
1675  mfs[i].cur_size += kv_hdrsiz + kv_bdysiz;
1676  }
1677  kmr_free(buf, bufsiz);
1678  }
1679 
1680  for (int i = 0; i < merge->src_lst_count; i++) {
1681  fclose(mfs[i].fp);
1682  }
1683  kmr_free(mfs, sizeof(struct merge_file) * (size_t)merge->src_lst_count);
1684  kmr_ckpt_flush(mr, wfp);
1685  fclose(wfp);
1686 }
1687 
1688 /***************************************************************/
1689 /* Functions for logging */
1690 /***************************************************************/
1691 
1692 /* Initialize checkpoint log file */
1693 static void
1694 kmr_ckpt_init_log(KMR *mr, const char *log_fpath)
1695 {
1696  struct kmr_ckpt_log ckptld;
1697  memset((void *)&ckptld, 0, sizeof(ckptld));
1698  if (mr->ckpt_selective) {
1699  ckptld.mode = KMR_CKPT_SELECTIVE;
1700  } else {
1701  ckptld.mode = KMR_CKPT_ALL;
1702  }
1703  ckptld.nprocs = mr->nprocs;
1704  ckptld.rank = mr->rank;
1705  FILE *fp = kmr_ckpt_open_path(mr, log_fpath, "w");
1706  size_t size = offsetof(struct kmr_ckpt_log, data);
1707  size_t ret = fwrite((void *)&ckptld, size, 1, fp);
1708  if (ret != 1) {
1709  char msg[KMR_CKPT_MSGLEN];
1710  snprintf(msg, sizeof(msg),
1711  "Failed to write header of checkpoint log %s", log_fpath);
1712  kmr_error(mr, msg);
1713  }
1714  kmr_ckpt_flush(mr, fp);
1715  mr->ckpt_ctx->ckpt_log_fp = fp;
1716 }
1717 
1718 static void
1719 kmr_ckpt_fin_log(KMR *mr)
1720 {
1721  fclose(mr->ckpt_ctx->ckpt_log_fp);
1722 }
1723 
1724 static inline void
1725 kmr_ckpt_save_log_raw(KMR *mr, struct kmr_ckpt_log_entry *ckptle)
1726 {
1727  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
1728  size_t ret = fwrite((void *)ckptle, sizeof(struct kmr_ckpt_log_entry), 1,
1729  ckptctx->ckpt_log_fp);
1730  if (ret != 1) {
1731  char msg[KMR_CKPT_MSGLEN];
1732  snprintf(msg, sizeof(msg), "Failed to add checkpoint log");
1733  kmr_error(mr, msg);
1734  }
1735  kmr_ckpt_flush(mr, ckptctx->ckpt_log_fp);
1736 }
1737 
1738 static inline void
1739 kmr_ckpt_save_log2(KMR *mr, int state)
1740 {
1741  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
1742  struct kmr_ckpt_log_entry ckptle;
1743  ckptle.op_seqno = ckptctx->progress_counter;
1744  ckptle.kvi_id = ckptctx->cur_kvi_id;
1745  ckptle.kvo_id = ckptctx->cur_kvo_id;
1746  ckptle.state = state;
1747  ckptle.n_kvi = -1;
1748  ckptle.n_kvo = -1;
1749  kmr_ckpt_save_log_raw(mr, &ckptle);
1750 }
1751 
1752 static inline void
1753 kmr_ckpt_save_log4(KMR *mr, int state, long nkvi, long nkvo)
1754 {
1755  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
1756  struct kmr_ckpt_log_entry ckptle;
1757  ckptle.op_seqno = ckptctx->progress_counter;
1758  ckptle.kvi_id = ckptctx->cur_kvi_id;
1759  ckptle.kvo_id = ckptctx->cur_kvo_id;
1760  ckptle.state = state;
1761  ckptle.n_kvi = nkvi;
1762  ckptle.n_kvo = nkvo;
1763  kmr_ckpt_save_log_raw(mr, &ckptle);
1764 }
1765 
1766 static inline void
1767 kmr_ckpt_save_log_del(KMR *mr, int state, long kvs_id)
1768 {
1769  struct kmr_ckpt_log_entry ckptle;
1770  ckptle.op_seqno = -1;
1771  ckptle.kvi_id = kvs_id;
1772  ckptle.kvo_id = kvs_id;
1773  ckptle.state = state;
1774  ckptle.n_kvi = -1;
1775  ckptle.n_kvo = -1;
1776  kmr_ckpt_save_log_raw(mr, &ckptle);
1777 }
1778 
1779 static inline void
1780 kmr_ckpt_save_log_lock(KMR *mr, int state)
1781 {
1782  struct kmr_ckpt_log_entry ckptle;
1783  ckptle.op_seqno = -1;
1784  ckptle.kvi_id = KMR_CKPT_DUMMY_ID;
1785  ckptle.kvo_id = KMR_CKPT_DUMMY_ID;
1786  ckptle.state = state;
1787  ckptle.n_kvi = -1;
1788  ckptle.n_kvo = -1;
1789  kmr_ckpt_save_log_raw(mr, &ckptle);
1790 }
1791 
1792 /* Log the start of kvs operation.
1793  \param[in] mr MapReduce data type
1794 */
1795 static void
1796 kmr_ckpt_log_whole_start(KMR *mr)
1797 {
1798  kmr_ckpt_save_log2(mr, KMR_CKPT_LOG_WHOLE_START);
1799 }
1800 
1801 /* Log the end of kvs operation.
1802  \param[in] mr MapReduce data type
1803 */
1804 static void
1805 kmr_ckpt_log_whole_finish(KMR *mr)
1806 {
1807  kmr_ckpt_save_log2(mr, KMR_CKPT_LOG_WHOLE_FINISH);
1808 }
1809 
1810 /* Log the start of kvs operation.
1811  \param[in] mr MapReduce data type
1812  \param[in] kvo output KVS
1813 */
1814 static void
1815 kmr_ckpt_log_block_start(KMR *mr, KMR_KVS *kvo)
1816 {
1817  long nkvi = kmr_ckpt_first_unprocessed_kv(mr);
1818  long nkvo = (kvo == 0) ? 0 : kvo->c.element_count;
1819  kmr_ckpt_save_log4(mr, KMR_CKPT_LOG_BLOCK_START, nkvi, nkvo);
1820 }
1821 
1822 /* Log the progress of kvs operation.
1823  \param[in] mr MapReduce data type
1824  \param[in] nkvi number of processed kv in kvi
1825  \param[in] nkvo number of generated kv in kvo
1826 */
1827 static void
1828 kmr_ckpt_log_block_add(KMR *mr, long nkvi, long nkvo)
1829 {
1830  kmr_ckpt_save_log4(mr, KMR_CKPT_LOG_BLOCK_ADD, nkvi, nkvo);
1831 }
1832 
1833 /* Log the end of kvs operation.
1834  \param[in] mr MapReduce data type
1835  \param[in] kvo output KVS
1836 */
1837 static void
1838 kmr_ckpt_log_block_finish(KMR *mr)
1839 {
1840  kmr_ckpt_save_log2(mr, KMR_CKPT_LOG_BLOCK_FINISH);
1841 }
1842 
1843 /* Log the start of kvs operation.
1844  \param[in] mr MapReduce data type
1845  \param[in] kvo output KVS
1846 */
1847 static void
1848 kmr_ckpt_log_index_start(KMR *mr, KMR_KVS *kvo)
1849 {
1850  long nkvi = kmr_ckpt_first_unprocessed_kv(mr);
1851  long nkvo = (kvo == 0) ? 0 : kvo->c.element_count;
1852  kmr_ckpt_save_log4(mr, KMR_CKPT_LOG_INDEX_START, nkvi, nkvo);
1853 }
1854 
1855 /* Log the progress of kvs operation.
1856  \param[in] mr MapReduce data type
1857  \param[in] ikv_index index of processed kv in kvi
1858  \param[in] nkvo number of generated kv in kvo
1859 */
1860 static void
1861 kmr_ckpt_log_index_add(KMR *mr, long ikv_index, long nkvo)
1862 {
1863  kmr_ckpt_save_log4(mr, KMR_CKPT_LOG_INDEX_ADD, ikv_index, nkvo);
1864 }
1865 
1866 /* Log the end of kvs operation.
1867  \param[in] mr MapReduce data type
1868 */
1869 static void
1870 kmr_ckpt_log_index_finish(KMR *mr)
1871 {
1872  kmr_ckpt_save_log2(mr, KMR_CKPT_LOG_INDEX_FINISH);
1873 }
1874 
1875 /* Log the start of kvs delete.
1876  \param[in] mr MapReduce data type
1877  \param[in] kvs_id ID of target KVS
1878 */
1879 static void
1880 kmr_ckpt_log_delete_start(KMR *mr, long kvs_id)
1881 {
1882  kmr_ckpt_save_log_del(mr, KMR_CKPT_LOG_DELETE_START, kvs_id);
1883 }
1884 
1885 /* Log the end of kvs delete.
1886  \param[in] mr MapReduce data type
1887  \param[in] kvs_id ID of target KVS
1888 */
1889 static void
1890 kmr_ckpt_log_delete_finish(KMR *mr, long kvs_id)
1891 {
1892  kmr_ckpt_save_log_del(mr, KMR_CKPT_LOG_DELETE_FINISH, kvs_id);
1893 }
1894 
1895 /* Log that the spcified kvs can be deleted.
1896  (used in selective mode)
1897  \param[in] mr MapReduce data type
1898  \param[in] kvs_id ID of target KVS
1899 */
1900 static void
1901 kmr_ckpt_log_deletable(KMR *mr, long kvs_id)
1902 {
1903  kmr_ckpt_save_log_del(mr, KMR_CKPT_LOG_DELETABLE, kvs_id);
1904 }
1905 
1906 /* Log the progress of kvs operation.
1907  \param[in] mr MapReduce data type
1908 */
1909 static void
1910 kmr_ckpt_log_progress(KMR *mr)
1911 {
1912  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
1913  if (!(ckptctx->cur_kvi_id == KMR_CKPT_DUMMY_ID &&
1914  ckptctx->cur_kvo_id == KMR_CKPT_DUMMY_ID) ) {
1915  kmr_ckpt_save_log2(mr, KMR_CKPT_LOG_PROGRESS);
1916  }
1917 }
1918 
1919 /* Log the skip of kvs operation.
1920  \param[in] mr MapReduce data type
1921 */
1922 static void
1923 kmr_ckpt_log_skipped(KMR *mr)
1924 {
1925  kmr_ckpt_save_log2(mr, KMR_CKPT_LOG_SKIPPED);
1926 }
1927 
/** Define the start position of a code region that is referred to on
    restart.  If an execution is stopped due to an error inside this
    region, restarting with a different number of processes is not
    allowed.

    It writes a KMR_CKPT_LOG_LOCK_START record to the checkpoint log; the
    end of the region is logged separately with KMR_CKPT_LOG_LOCK_FINISH.

    \param[in] mr MapReduce data type
*/
{
    kmr_ckpt_save_log_lock(mr, KMR_CKPT_LOG_LOCK_START);
}
1938 
/** Define the end position of a code region that is referred to on
    restart.  If an execution is stopped due to an error inside this
    region, restarting with a different number of processes is not
    allowed.

    It writes a KMR_CKPT_LOG_LOCK_FINISH record to the checkpoint log,
    closing the region opened by a KMR_CKPT_LOG_LOCK_START record.

    \param[in] mr MapReduce data type
*/
{
    kmr_ckpt_save_log_lock(mr, KMR_CKPT_LOG_LOCK_FINISH);
}
1949 
1950 static FILE *
1951 kmr_ckpt_open_log(KMR *mr, const char *path, struct kmr_ckpt_log *log_hdr,
1952  unsigned long *size)
1953 {
1954  struct stat sb;
1955  int cc = stat(path, &sb);
1956  if (cc != 0) {
1957  char msg[KMR_CKPT_MSGLEN];
1958  snprintf(msg, sizeof(msg),
1959  "Failed to access a checkpoint log %s", path);
1960  kmr_error(mr, msg);
1961  }
1962  FILE *fp = kmr_ckpt_open_path(mr, path, "r");
1963  size_t hdrsz = offsetof(struct kmr_ckpt_log, data);
1964  size_t rc = fread((void *)log_hdr, hdrsz, 1, fp);
1965  if (rc != 1) {
1966  char msg[KMR_CKPT_MSGLEN];
1967  snprintf(msg, sizeof(msg),
1968  "Failed to read a checkpoint log %s", path);
1969  kmr_error(mr, msg);
1970  }
1971  assert(sb.st_size >= 0);
1972  assert((size_t)sb.st_size >= hdrsz);
1973  *size = (size_t)sb.st_size - hdrsz;
1974  return fp;
1975 }
1976 
1977 /***************************************************************/
1978 /* Functions for checkpoint data management */
1979 /***************************************************************/
1980 
1981 /* It returns 1 if a checkpoint data file should be written to disk.
1982  Otherwise it returns 0. */
1983 static _Bool
1984 kmr_ckpt_write_file_p(KMR *mr)
1985 {
1986  assert(mr->ckpt_enable);
1987  if (mr->ckpt_selective && !mr->ckpt_ctx->slct_cur_take_ckpt) {
1988  return 0;
1989  } else {
1990  return 1;
1991  }
1992 }
1993 
1994 /* This deletes a checkpoint data file of the specified kvs */
1995 static void
1996 kmr_ckpt_delete_ckpt_data(KMR *mr, long kvs_id)
1997 {
1998  char fpath[KMR_CKPT_PATHLEN];
1999  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
2000  kmr_ckpt_make_fname(ckptctx->ckpt_dname, KMR_CKPT_FNAME_PREFIX,
2001  KMR_CKPT_DATA, mr->rank, kvs_id, fpath, sizeof(fpath));
2002  int cc = access(fpath, F_OK);
2003  if (cc == 0) {
2004  kmr_ckpt_log_delete_start(mr, kvs_id);
2005  cc = unlink(fpath);
2006  assert(cc == 0);
2007  kmr_ckpt_log_delete_finish(mr, kvs_id);
2008  } else {
2009  /* checkpoint file does not exist. do nothing. */
2010  }
2011 }
2012 
2013 /* This deletes checkpoint files in specified directory.
2014  It also deletes the directory if it becomes empty.
2015  If the directory does not exist, it does nothing. */
2016 static void
2017 kmr_ckpt_delete_ckpt_files(KMR *mr, const char *target_dir, int rank)
2018 {
2019  struct stat sb;
2020  int cc = stat(target_dir, &sb);
2021  if (cc == 0) {
2022  if (!S_ISDIR(sb.st_mode)) {
2023  char msg[KMR_CKPT_MSGLEN];
2024  snprintf(msg, sizeof(msg),
2025  "File %s should not exist or "
2026  "if exists, shoud be a directory.", target_dir);
2027  kmr_error(mr, msg);
2028  }
2029  } else {
2030  return; /* directory does not exist. */
2031  }
2032 
2033  /* Remove all checkpoint data file */
2034  struct kmr_ckpt_data_file *dataflst = NULL;
2035  int nfiles;
2036  kmr_ckpt_get_data_flist(mr, target_dir, &dataflst, &nfiles, 0);
2037  for (int i = 0; i < nfiles; i++) {
2038  char fpath[KMR_CKPT_PATHLEN];
2039  snprintf(fpath, sizeof(fpath), "%s/%s", target_dir, dataflst[i].fname);
2040  cc = access(fpath, F_OK);
2041  if (cc == 0) {
2042  unlink(fpath);
2043  } else {
2044  fprintf(stderr,
2045  "Failed to delete checkpoint file %s on rank[%05d]\n",
2046  fpath, rank);
2047  }
2048  }
2049  if (dataflst != NULL) {
2050  kmr_free(dataflst, sizeof(struct kmr_ckpt_data_file) * (size_t)nfiles);
2051  }
2052  /* Remove checkpoint log file */
2053  {
2054  char fpath[KMR_CKPT_PATHLEN];
2055  kmr_ckpt_make_fname(target_dir, KMR_CKPT_FNAME_PREFIX, KMR_CKPT_LOG,
2056  rank, 0, fpath, sizeof(fpath));
2057  cc = access(fpath, F_OK);
2058  if (cc == 0) {
2059  unlink(fpath);
2060  }
2061  }
2062  /* Delete nprocs file on rank 0 */
2063  if (mr->rank == 0) {
2064  char fpath[KMR_CKPT_PATHLEN];
2065  memset(fpath, 0, sizeof(fpath));
2066  snprintf(fpath, sizeof(fpath), "%s/nprocs", target_dir);
2067  cc = access(fpath, F_OK);
2068  if (cc == 0) {
2069  unlink(fpath);
2070  }
2071  }
2072  /* Delete checkpoint directory */
2073  cc = rmdir(target_dir);
2074  assert(cc == 0);
2075 }
2076 
2077 /* It finds a checkpoint data file named FNAME from DNAME directory and
2078  stores the file info. to FILE. If SETALL is 1, it reads the file to
2079  fill in all fields of FILE. */
2080 static void
2081 kmr_ckpt_init_data_file(KMR *mr, const char *dname , const char *fname,
2082  _Bool setall, struct kmr_ckpt_data_file *file)
2083 {
2084  char fpath[KMR_CKPT_PATHLEN];
2085  snprintf(fpath, KMR_CKPT_PATHLEN, "%s/%s", dname, fname);
2086  int cc = access(fpath, F_OK);
2087  if (cc != 0) {
2088  char msg[KMR_CKPT_MSGLEN];
2089  snprintf(msg, sizeof(msg),
2090  "Failed to access checkpoint file %s", fpath);
2091  kmr_error(mr, msg);
2092  }
2093  if (setall) {
2094  struct kmr_ckpt_data hdr;
2095  size_t hdrsz = offsetof(struct kmr_ckpt_data, data);
2096  FILE *fp = kmr_ckpt_open_path(mr, fpath, "r");
2097  size_t rc = fread((void *)&hdr, hdrsz, 1, fp);
2098  if (rc == 1) {
2099  file->kvs_id = hdr.kvs_id;
2100  } else {
2101  char msg[KMR_CKPT_MSGLEN];
2102  snprintf(msg, sizeof(msg),
2103  "Failed to read checkpoint file %s. Ignore this file",
2104  fpath);
2105  kmr_warning(mr, 1, msg);
2106  file->kvs_id = KMR_CKPT_DUMMY_ID;
2107  file->checked = 1;
2108  }
2109  fclose(fp);
2110  }
2111  strncpy(file->fname, fname, sizeof(file->fname) - 1);
2112  strncpy(file->dname, dname, sizeof(file->dname) - 1);
2113 }
2114 
2115 /* Save all kv in kvs to a checkpoint data file.
2116  \param[in] kvs target KVS
2117 */
2118 static inline void
2119 kmr_ckpt_save_ckpt(KMR_KVS *kvs) {
2120  struct kmr_ckpt_ctx *ckptctx = kvs->c.mr->ckpt_ctx;
2121  size_t tsize = kvs->c.storage_netsize + offsetof(struct kmr_ckpt_data, data);
2122  void *buf = kmr_malloc(tsize);
2123  memset(buf, 0, tsize);
2124 
2125  struct kmr_ckpt_data *ckpt = (struct kmr_ckpt_data *)buf;
2126  ckpt->nprocs = kvs->c.mr->nprocs;
2127  ckpt->rank = kvs->c.mr->rank;
2128  ckpt->kvs_id = kvs->c.ckpt_kvs_id;
2129  ckpt->key_data = kvs->c.key_data;
2130  ckpt->value_data = kvs->c.value_data;
2131 
2132  enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvs->c.key_data);
2133  enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvs->c.value_data);
2134 
2135  unsigned char *p = (unsigned char *)&ckpt->data[0];
2136  ckptctx->saved_current_block = kvs->c.current_block; // save current_block
2137  kvs->c.current_block = kvs->c.first_block;
2138  long cnt = 0;
2139  while (cnt < kvs->c.element_count) {
2140  assert(kvs->c.current_block != 0);
2141  struct kmr_kv_box ev;
2142  struct kmr_kvs_block *b = kvs->c.current_block;
2143  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, b);
2144  for (long i = 0; i < b->partial_element_count; i++) {
2145  assert(e != 0);
2146  ev = kmr_pick_kv(e, kvs);
2147  kmr_poke_kv2((struct kmr_kvs_entry *)p, ev, 0, keyf, valf, 0);
2148  p += kmr_kvs_entry_netsize((struct kmr_kvs_entry *)p);
2149  e = kmr_kvs_next(kvs, e, 1);
2150  cnt++;
2151  }
2152  kvs->c.current_block = b->next;
2153  }
2154  kvs->c.current_block = ckptctx->saved_current_block; // restore current_block
2155 
2156  FILE *fp = kmr_ckpt_open(kvs, "w");
2157  size_t ret = fwrite(buf, tsize, 1, fp);
2158  if (ret != 1) {
2159  char msg[KMR_CKPT_MSGLEN];
2160  snprintf(msg, sizeof(msg), "Checkpoint: save checkpoint error write failed");
2161  kmr_error(kvs->c.mr, msg);
2162  }
2163  kmr_ckpt_flush(kvs->c.mr, fp);
2164  fclose(fp);
2165  if (buf != NULL) {
2166  free(buf);
2167  }
2168 }
2169 
2170 /* Create a checkpoint file only with header and open it */
2171 static inline void
2172 kmr_ckpt_kv_record_init_data(KMR *mr, KMR_KVS *kvs)
2173 {
2174  if (kvs == 0) {
2175  mr->ckpt_ctx->ckpt_data_fp = NULL;
2176  return;
2177  }
2178 
2179  FILE *fp;
2180  char fpath[KMR_CKPT_PATHLEN];
2181  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
2182  kmr_ckpt_make_fname(ckptctx->ckpt_dname, KMR_CKPT_FNAME_PREFIX,
2183  KMR_CKPT_DATA, mr->rank, kvs->c.ckpt_kvs_id,
2184  fpath, sizeof(fpath));
2185  int cc = access(fpath, W_OK);
2186  if (cc == 0) {
2187  /* if checkpoint already exist, open it. */
2188  fp = kmr_ckpt_open_path(mr, fpath, "a+");
2189  } else if (cc < 0 && errno == ENOENT) {
2190  /* if checkpoint does not exist, create it. */
2191  struct kmr_ckpt_data ckpt;
2192  memset((void *)&ckpt, 0, sizeof(ckpt));
2193  ckpt.nprocs = mr->nprocs;
2194  ckpt.rank = mr->rank;
2195  ckpt.kvs_id = kvs->c.ckpt_kvs_id;
2196  ckpt.key_data = kvs->c.key_data;
2197  ckpt.value_data = kvs->c.value_data;
2198  fp = kmr_ckpt_open_path(mr, fpath, "w+");
2199  size_t size = offsetof(struct kmr_ckpt_data, data);
2200  size_t ret = fwrite((void *)&ckpt, size, 1, fp);
2201  if (ret != 1) {
2202  char msg[KMR_CKPT_MSGLEN];
2203  snprintf(msg, sizeof(msg),
2204  "Failed to write header of checkpoint file %s", fpath);
2205  kmr_error(mr, msg);
2206  }
2207  kmr_ckpt_flush(mr, fp);
2208  } else {
2209  fp = 0;
2210  assert(0);
2211  }
2212  ckptctx->ckpt_data_fp = fp;
2213 }
2214 
2215 /* Initialization for KV recording while executing Map/Reduce.
2216  \param[in] kvi input KVS
2217  \param[in] kvo output KVS
2218 */
2219 static void
2220 kmr_ckpt_kv_record_init(KMR *mr, KMR_KVS *kvo)
2221 {
2222  /* initialize kvo checkpoint file */
2223  kmr_ckpt_kv_record_init_data(mr, kvo);
2224  /* set element_count, adding_point, currnet_block to ckpt_ctx */
2225  if (kvo != 0) {
2226  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
2227  ckptctx->saved_current_block = kvo->c.current_block;
2228  ckptctx->saved_adding_point = kvo->c.adding_point;
2229  ckptctx->saved_element_count = kvo->c.element_count;
2230  }
2231 }
2232 
2233 /* Add new kv in kvs to checkpoint data.
2234  As a result of Map/Reduce operation, newly generated kv(s)
2235  are added to checkpoint data file.
2236  \param[in] kvo target KVS
2237  \return number of kv added to a checkpoint data
2238 */
2239 static long
2240 kmr_ckpt_kv_record_add(KMR_KVS *kvo)
2241 {
2242  if (kvo == 0) {
2243  return 0;
2244  }
2245 
2246  struct kmr_ckpt_ctx *ckptctx = kvo->c.mr->ckpt_ctx;
2247  assert(ckptctx->ckpt_data_fp != NULL);
2248  long cnt = kvo->c.element_count - ckptctx->saved_element_count;
2249  assert(cnt >= 0);
2250  struct kmr_kvs_block *b = ckptctx->saved_current_block;
2251  if (b == 0) {
2252  b = kvo->c.first_block;
2253  }
2254  struct kmr_kvs_entry *e = ckptctx->saved_adding_point;
2255  if (e == 0) {
2256  e = kmr_kvs_first_entry(kvo, b);
2257  }
2258 
2259  for (long i = 0; i < cnt; i++) {
2260  if (kmr_kvs_entry_tail_p(e)) {
2261  b = b->next;
2262  assert(b != 0);
2263  e = kmr_kvs_first_entry(kvo, b);
2264  }
2265  /* save data in e as checkpoint data. */
2266  size_t size = kmr_kvs_entry_netsize(e);
2267  size_t ret = fwrite((void *)e, size, 1, ckptctx->ckpt_data_fp);
2268  if (ret != 1) {
2269  char msg[KMR_CKPT_MSGLEN];
2270  snprintf(msg, sizeof(msg),
2271  "Failed to add kv to a checkpoint file");
2272  kmr_error(kvo->c.mr, msg);
2273  }
2274  e = kmr_kvs_next_entry(kvo, e);
2275  }
2276  kmr_ckpt_flush(kvo->c.mr, ckptctx->ckpt_data_fp);
2277  ckptctx->saved_current_block = b;
2278  ckptctx->saved_adding_point = e;
2279  ckptctx->saved_element_count = kvo->c.element_count;
2280  return cnt;
2281 }
2282 
2283 /* Finish KV recording
2284  \param[in] kvi input KVS
2285  \param[in] kvo output KVS
2286 */
2287 static void
2288 kmr_ckpt_kv_record_fin(KMR *mr)
2289 {
2290  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
2291  /* close opened checkpoint data file. */
2292  if (ckptctx->ckpt_data_fp != NULL) {
2293  kmr_ckpt_flush(mr, ckptctx->ckpt_data_fp);
2294  fclose(ckptctx->ckpt_data_fp);
2295  }
2296  /* cleanup context. */
2297  ckptctx->ckpt_data_fp = NULL;
2298  ckptctx->saved_element_count = 0;
2299  ckptctx->saved_adding_point = NULL;
2300  ckptctx->saved_current_block = NULL;
2301 }
2302 
2303 /* List up ckeckpoint data files in the specified directory DNAME.
2304  It sets files to FLIST and its count to NFILES.
2305  If SETALL is 1, it reads data files to initialize all fields
2306  of structure. */
2307 static void
2308 kmr_ckpt_get_data_flist(KMR *mr, const char *dname,
2309  struct kmr_ckpt_data_file **flist, int *nfiles,
2310  _Bool setall)
2311 {
2312  struct stat sb;
2313  int cc = stat(dname, &sb);
2314  if (cc < 0) {
2315  *nfiles = 0;
2316  return;
2317  }
2318  if (!S_ISDIR(sb.st_mode)) {
2319  fprintf(stderr, "File %s is not a directory.\n", dname);
2320  *nfiles = 0;
2321  return;
2322  }
2323 
2324  size_t direntsz;
2325  long nmax = pathconf(dname, _PC_NAME_MAX);
2326  if (nmax == -1) {
2327  direntsz = (64 * 1024);
2328  } else {
2329  direntsz = (offsetof(struct dirent, d_name) + (size_t)nmax + 1);
2330  }
2331  DIR *d;
2332  struct dirent *dent;
2333  char b[direntsz];
2334 
2335  d = opendir(dname);
2336  if (d == NULL) {
2337  fprintf(stderr, "Failed to open directory %s.\n", dname);
2338  *nfiles = 0;
2339  return;
2340  }
2341 
2342  char prefix[KMR_CKPT_PATHLEN];
2343  snprintf(prefix, KMR_CKPT_PATHLEN, KMR_CKPT_FNAME_PREFIX"_data_");
2344  int cnt = 0;
2345  while (readdir_r(d, (void *)b, &dent) == 0) {
2346  if (dent == NULL) {
2347  break;
2348  }
2349  cc = strncmp(dent->d_name, prefix, strlen(prefix));
2350  if (cc == 0) {
2351  cnt++;
2352  }
2353  }
2354 
2355  size_t siz = sizeof(struct kmr_ckpt_data_file) * (size_t)cnt;
2356  struct kmr_ckpt_data_file *dataflst = kmr_malloc(siz);
2357  memset(dataflst, 0, siz);
2358 
2359  rewinddir(d);
2360  cnt = 0;
2361  while (readdir_r(d, (void *)b, &dent) == 0) {
2362  if (dent == NULL) {
2363  break;
2364  }
2365  cc = strncmp(dent->d_name, prefix, strlen(prefix));
2366  if (cc == 0) {
2367  kmr_ckpt_init_data_file(mr, dname, dent->d_name, setall,
2368  &dataflst[cnt]);
2369  cnt++;
2370  }
2371  }
2372  (void)closedir(d);
2373 
2374  *flist = dataflst;
2375  *nfiles = cnt;
2376 }
2377 
2378 /* save mr->nprocs to file. */
2379 static void
2380 kmr_ckpt_save_nprocs(KMR *mr, const char *dname)
2381 {
2382  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
2383  const char *target = (dname != 0) ? dname : ckptctx->ckpt_dname;
2384  char fpath[KMR_CKPT_PATHLEN], wstring[128], msg[KMR_CKPT_MSGLEN];
2385  memset(fpath, 0, sizeof(fpath));
2386  snprintf(fpath, sizeof(fpath), "%s/nprocs", target);
2387  int cc = access(fpath, R_OK);
2388  if (cc == 0) {
2389  return;
2390  } else {
2391  FILE *fp = kmr_ckpt_open_path(mr, fpath, "w");
2392  if (fp == NULL) {
2393  snprintf(msg, sizeof(msg),
2394  "Failed to open nprocs file %s", fpath);
2395  kmr_error(mr, msg);
2396  }
2397  memset(wstring, 0, sizeof(wstring));
2398  snprintf(wstring, sizeof(wstring), "nprocs=%d\n", mr->nprocs);
2399  size_t ret = fwrite(wstring, strlen(wstring), 1, fp);
2400  if (ret != 1) {
2401  snprintf(msg, sizeof(msg), "Failed to save nprocs to file %s",
2402  fpath);
2403  kmr_error(mr, msg);
2404  }
2405  kmr_ckpt_flush(mr, fp);
2406  fclose(fp);
2407  }
2408 }
2409 
2410 /* Open KVS checkpoint data file. */
2411 static FILE *
2412 kmr_ckpt_open(KMR_KVS *kvs, const char *mode)
2413 {
2414  char fpath[KMR_CKPT_PATHLEN];
2415  KMR *mr = kvs->c.mr;
2416  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
2417  kmr_ckpt_make_fname(ckptctx->ckpt_dname, KMR_CKPT_FNAME_PREFIX,
2418  KMR_CKPT_DATA, mr->rank, kvs->c.ckpt_kvs_id,
2419  fpath, sizeof(fpath));
2420  FILE *fp = kmr_ckpt_open_path(mr, fpath, mode);
2421  return fp;
2422 }
2423 
2424 /* Open file using file path. */
2425 static FILE *
2426 kmr_ckpt_open_path(KMR *mr, const char *fpath, const char *mode)
2427 {
2428  FILE *fp = fopen(fpath, mode);
2429  if (fp == NULL) {
2430  char msg[KMR_CKPT_MSGLEN];
2431  snprintf(msg, sizeof(msg),
2432  "Failed to open a checkpoint file %s", fpath);
2433  kmr_error(mr, msg);
2434  }
2435  int cc = fcntl(fileno(fp), F_SETFD, FD_CLOEXEC);
2436  assert(cc == 0);
2437  return fp;
2438 }
2439 
2440 /* Compose checkpoint file name. */
2441 static void
2442 kmr_ckpt_make_fname(const char *dirname, const char *fprefix,
2443  enum kmr_ckpt_type type,
2444  int rank, long kvs_id, char *fpath, size_t len)
2445 {
2446  memset(fpath, 0, len);
2447  assert(type == KMR_CKPT_DATA || type == KMR_CKPT_LOG);
2448  if (type == KMR_CKPT_DATA) {
2449  snprintf(fpath, len-1, "%s/%s_data_%05d_%03ld",
2450  dirname, fprefix, rank, kvs_id);
2451  } else if (type == KMR_CKPT_LOG) {
2452  snprintf(fpath, len-1, "%s/%s_log_%05d",
2453  dirname, fprefix, rank);
2454  }
2455 }
2456 
2457 /* Flush write to the specified file */
2458 static void
2459 kmr_ckpt_flush(KMR *mr, FILE *fp)
2460 {
2461  fflush(fp);
2462  if (!mr->ckpt_no_fsync) {
2463  int cc = fsync(fileno(fp));
2464  assert(cc == 0);
2465  }
2466 }
2467 
2468 /***************************************************************/
2469 /* Public functions */
2470 /***************************************************************/
2471 
/** Check if checkpoint/restart is enabled.

    Checkpoint/restart counts as enabled only when mr->ckpt_enable is 1
    and the checkpoint context has been initialized.  It is therefore
    reported as disabled while temporarily disabled by
    kmr_ckpt_disable_ckpt().

    \param[in] mr MapReduce data type
    \return It returns 1 if checkpoint/restart is enabled.
    Otherwise it returns 0.
*/
int
{
    struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
    if (mr->ckpt_enable == 1 && ckptctx->initialized) {
        return 1;
    } else {
        return 0;
    }
}
2488 
/** It temporarily disables checkpoint/restart.

    Disabling succeeds only when checkpoint/restart is enabled, the
    checkpoint context is initialized, and no lock is currently held
    (ckptctx->lock_id is 0).  On success mr->ckpt_enable is cleared, and a
    new lock id, obtained by incrementing ckptctx->lock_counter, is stored
    and returned.  Pass it to kmr_ckpt_enable_ckpt() to re-enable.
    A call made while a lock is already held fails and returns 0.

    \param[in] mr MapReduce data type
    \return If it succeeds disabling, it returns a lock id.
    Otherwise it returns 0.
*/
{
    struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
    if (mr->ckpt_enable == 1 && ckptctx->initialized &&
        ckptctx->lock_id == 0) {
        mr->ckpt_enable = 0;
        ckptctx->lock_id = ++ckptctx->lock_counter;
        return ckptctx->lock_id;
    } else {
        return 0;
    }
}
2507 
2508 /** It temporally enables checkpoint/restart which has been
2509  disabled by calling kmr_ckpt_disable_ckpt().
2510 
2511  \param[in] mr MapReduce data type
2512  \param[in] lock_id ID of lock returned by kmr_ckpt_disable_ckpt()
2513  \return If it succeeds enabling, it returns 1.
2514  Otherwise it returns 0.
2515 */
2516 int kmr_ckpt_enable_ckpt(KMR *mr, int lock_id)
2517 {
2518  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
2519  if (mr->ckpt_enable == 0 && ckptctx->initialized &&
2520  ckptctx->lock_id == lock_id) {
2521  mr->ckpt_enable = 1;
2522  ckptctx->lock_id = 0;
2523  return 1;
2524  } else {
2525  return 0;
2526  }
2527 }
2528 
/** It returns the index of the first unprocessed key-value in the input
    KVS of the current operation.

    In selective mode it always returns 0.  Otherwise it searches
    ckptctx->kv_positions for the entry whose op_seqno equals the current
    progress counter and returns that entry's start_from.  If there is no
    such entry, it returns 0.

    \param[in] mr MapReduce data type
    \return It returns the index of the first unprocessed key-value
    in the input KVS.
*/
long
{
    if (mr->ckpt_selective) {
        return 0;
    }
    struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
    long op_seqno = ckptctx->progress_counter;
    long start_from = 0;
    /* Linear search; the first entry matching the current operation wins. */
    for (int i = 0; i < ckptctx->kv_positions_count; i++) {
        if (ckptctx->kv_positions[i].op_seqno == op_seqno) {
            start_from = ckptctx->kv_positions[i].start_from;
            break;
        }
    }
    return start_from;
}
2552 
/** It restores checkpoint data to kvs.

    This rank's checkpoint data file of KVS is read entirely into memory,
    and every key-value stored after the file header (struct kmr_ckpt_data
    up to its data field) is added to KVS with kmr_add_kv().
    If the file is not readable (access() with R_OK fails), nothing is
    done.  Failure to stat or read the file is reported with kmr_error().

    NOTE(review): any access() failure, not only a missing file (e.g. a
    permission error), is treated as "no checkpoint data".

    \param[out] kvs an KVS where the checkpoint data will be restored
*/
void
{
    KMR *mr = kvs->c.mr;
    char fpath[KMR_CKPT_PATHLEN];
    kmr_ckpt_make_fname(mr->ckpt_ctx->ckpt_dname, KMR_CKPT_FNAME_PREFIX,
                        KMR_CKPT_DATA, mr->rank, kvs->c.ckpt_kvs_id,
                        fpath, sizeof(fpath));
    int cc = access(fpath, R_OK);
    if (cc != 0) {
        /* checkpoint file does not exist (or is not readable). */
        return;
    }
    struct stat sb;
    cc = stat(fpath, &sb);
    if (cc != 0) {
        char msg[KMR_CKPT_MSGLEN];
        snprintf(msg, sizeof(msg),
                 "Failed to access a checkpoint file %s", fpath);
        kmr_error(kvs->c.mr, msg);
    }
    /* Read the whole file (header + entries) in one go. */
    size_t siz = (size_t)sb.st_size;
    void *buf = kmr_malloc(siz);
    FILE *fp = kmr_ckpt_open_path(kvs->c.mr, fpath, "r");
    size_t ret = fread(buf, siz, 1, fp);
    if (ret != 1) {
        char msg[KMR_CKPT_MSGLEN];
        snprintf(msg, sizeof(msg),
                 "Failed to load a checkpoint file %s", fpath);
        kmr_error(kvs->c.mr, msg);
    }
    fclose(fp);

    /* Walk the entries that follow the header, bounded by the file size.
       The final assert checks that the last entry ends exactly at the end
       of the file. */
    struct kmr_ckpt_data *ckpt = (struct kmr_ckpt_data *)buf;
    size_t cur_siz = offsetof(struct kmr_ckpt_data, data);
    struct kmr_kvs_entry *e = (struct kmr_kvs_entry *)&ckpt->data[0];
    while (cur_siz < siz) {
        struct kmr_kv_box kv;
//        if ( kmr_kvs_entry_tail_p(e) ) {
//            break;
//        }
        assert(e != 0);
        kv = kmr_pick_kv(e, kvs); // This is ok.
        kmr_add_kv(kvs, kv);
        cur_siz += kmr_kvs_entry_netsize(e);
        e = kmr_kvs_next_entry(kvs, e);
    }
    kmr_free(buf, siz);
    assert(cur_siz == siz);
}
2607 
/** It removes checkpoint data file.

    In normal mode, this rank's checkpoint data file of KVS is deleted with
    kmr_ckpt_delete_ckpt_data().  In selective mode the file is kept and,
    if it exists, a "deletable" record for KVS is written to the
    checkpoint log instead.

    \param[in] kvs KVS whose checkpoint data is removed
*/
void
{
    KMR *mr = kvs->c.mr;
    if (!mr->ckpt_selective) {
        /* delete the checkpoint data file */
        kmr_ckpt_delete_ckpt_data(mr, kvs->c.ckpt_kvs_id);
    } else {
        /* just mark as deletable */
        char fpath[KMR_CKPT_PATHLEN];
        struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
        kmr_ckpt_make_fname(ckptctx->ckpt_dname, KMR_CKPT_FNAME_PREFIX,
                            KMR_CKPT_DATA, mr->rank, kvs->c.ckpt_kvs_id,
                            fpath, sizeof(fpath));
        int cc = access(fpath, F_OK);
        if (cc == 0) {
            kmr_ckpt_log_deletable(mr, kvs->c.ckpt_kvs_id);
        }
    }
}
2632 
/** It saves all key-value pairs in the output KVS to a checkpoint data file.

    The file is written only if kmr_ckpt_write_file_p() allows it (in
    selective mode, only for operations that requested a checkpoint).
    The write is bracketed by whole-start/whole-finish log records.

    \param[in] mr MapReduce data type
    \param[in] kvo output KVS
*/
void
{
    if (kmr_ckpt_write_file_p(mr)) {
        kmr_ckpt_log_whole_start(mr);
        kmr_ckpt_save_ckpt(kvo);
        kmr_ckpt_log_whole_finish(mr);
    }
}
2647 
/** It initializes saving blocks of key-value pairs of the output KVS to
    a checkpoint data file.

    In selective mode it does nothing; the whole output KVS is saved when
    saving is finalized.  Otherwise it writes a block-start log record and
    opens the checkpoint data file of KVO, remembering KVO's current end
    so that later additions can be appended.

    \param[in] mr MapReduce data type
    \param[in] kvo output KVS
*/
void
{
    if (!mr->ckpt_selective) {
        kmr_ckpt_log_block_start(mr, kvo);
        kmr_ckpt_kv_record_init(mr, kvo);
    }
}
2662 
/** It adds a new block of key-value pairs of the output KVS to the
    checkpoint data file.

    In selective mode it does nothing.  Otherwise the kv added to KVO
    since the previous recording point are appended to the checkpoint
    data file, and a block-add log record with NKVI and the number of
    appended kv is written.

    \param[in] mr MapReduce data type
    \param[in] kvo output KVS
    \param[in] nkvi number of processed kv in the input KVS
*/
void
{
    if (!mr->ckpt_selective) {
        long nkvo = kmr_ckpt_kv_record_add(kvo);
        kmr_ckpt_log_block_add(mr, nkvi, nkvo);
    }
}
2678 
/** It finalizes saving blocks of key-value pairs of the output KVS to
    the checkpoint data file.

    In normal mode the checkpoint data file is closed and a block-finish
    log record is written.  In selective mode all key-value pairs of KVO
    are saved at once with kmr_ckpt_save_kvo_whole().

    \param[in] mr MapReduce data type
    \param[in] kvo output KVS
*/
void
{
    if (!mr->ckpt_selective) {
        kmr_ckpt_kv_record_fin(mr);
        kmr_ckpt_log_block_finish(mr);
    } else {
        /* in case of selective mode, save all key-values here */
        kmr_ckpt_save_kvo_whole(mr, kvo);
    }
}
2696 
/** It initializes saving indexed key-value pairs of the output KVS
    to a checkpoint data file.

    In selective mode it does nothing; the whole output KVS is saved when
    saving is finalized.  Otherwise it writes an index-start log record
    and opens the checkpoint data file of KVO, remembering KVO's current
    end so that later additions can be appended.

    \param[in] mr MapReduce data type
    \param[in] kvo output KVS
*/
void
{
    if (!mr->ckpt_selective) {
        kmr_ckpt_log_index_start(mr, kvo);
        kmr_ckpt_kv_record_init(mr, kvo);
    }
}
2711 
2712 /** It adds new key-value pairs of the output KVS to the checkpoint data file.
2713 
2714  \param[in] mr MapReduce data type
2715  \param[in] kvo output KVS
2716  \param[in] ikv_index index of processed kv in the input KVS
2717 */
2718 void
2719 kmr_ckpt_save_kvo_each_add(KMR *mr, KMR_KVS *kvo, long ikv_index)
2720 {
2721  if (!mr->ckpt_selective) {
2722  long nkvo = kmr_ckpt_kv_record_add(kvo);
2723  kmr_ckpt_log_index_add(mr, ikv_index, nkvo);
2724  }
2725 }
2726 
/** It finalizes saving indexed key-value pairs of the output KVS
    to the checkpoint data file.

    In normal mode the checkpoint data file is closed and an index-finish
    log record is written.  In selective mode all key-value pairs of KVO
    are saved at once with kmr_ckpt_save_kvo_whole().

    \param[in] mr MapReduce data type
    \param[in] kvo output KVS
*/
void
{
    if (!mr->ckpt_selective) {
        kmr_ckpt_kv_record_fin(mr);
        kmr_ckpt_log_index_finish(mr);
    } else {
        /* in case of selective mode, save all key-values here */
        kmr_ckpt_save_kvo_whole(mr, kvo);
    }
}
2744 
/** It initializes a progress of MapReduce checkpointing.

    It advances the operation counter (progress_counter), stamps KVI as
    consumed and KVO as generated by this operation, records their KVS ids
    as the current input/output, and decides whether the operation can be
    skipped on restart.  The 'keep_open' option is rejected with
    kmr_error().  When the operation is skipped, a "skipped" log record is
    written and the current input/output ids are reset.

    \param[in] kvi input KVS to a MapReduce operation (may be null, as for
    kmr_map_once)
    \param[in] kvo output KVS to the MapReduce operation (may be null)
    \param[in] opt struct kmr_option
    \return It returns 1 if operation can be skipped.
    Otherwise it returns 0.
*/
int
{
    KMR *mr = (kvo != 0) ? kvo->c.mr : kvi->c.mr;
    if (opt.keep_open) {
        char msg[KMR_CKPT_MSGLEN];
        snprintf(msg, sizeof(msg),
                 "'keep_open' option can't be used when checkpoint/restart"
                 " is enabled");
        kmr_error(mr, msg);
    }

    /* initialize progress */
    struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
    ckptctx->progress_counter += 1;
    if (kvi != 0) {
        kvi->c.ckpt_consumed_op = ckptctx->progress_counter;
    }
    if (kvo != 0) {
        kvo->c.ckpt_generated_op = ckptctx->progress_counter;
    }
    /* No other operation may be in progress. */
    assert(ckptctx->cur_kvi_id == KMR_CKPT_DUMMY_ID);
    assert(ckptctx->cur_kvo_id == KMR_CKPT_DUMMY_ID);
    if (kvi != 0) {
        ckptctx->cur_kvi_id = kvi->c.ckpt_kvs_id;
    } else {
        ckptctx->cur_kvi_id = KMR_CKPT_DUMMY_ID;
    }
    if (kvo != 0) {
        ckptctx->cur_kvo_id = kvo->c.ckpt_kvs_id;
    } else {
        ckptctx->cur_kvo_id = KMR_CKPT_DUMMY_ID;
    }

    /* check if the operation can be skipped or not.
       NOTE(review): prev_global_progress / prev_progress appear to hold
       progress restored from the previous run (global and this rank's,
       respectively) -- confirm against the restore code. */
    int do_skip;
    long progress = ckptctx->progress_counter;
    if (!mr->ckpt_selective) {
        /* ckpt all */
        if (progress <= ckptctx->prev_global_progress) {
            do_skip = 1;
        } else if (progress > ckptctx->prev_global_progress &&
                   progress <= ckptctx->prev_progress ) {
            /* Between the two bounds, skip only if the input KVS is
               empty; an operation without input is never skipped here. */
            if (kvi == 0) { /* In case of kmr_map_once */
                do_skip = 0;
            } else {
                if (kvi->c.element_count == 0) {
                    do_skip = 1;
                } else {
                    do_skip = 0;
                }
            }
        } else { /* progress > ckptctx->prev_progress */
            do_skip = 0;
        }
    } else {
        /* ckpt selective */
        if (progress <= ckptctx->prev_global_progress) {
            do_skip = 1;
        } else if (progress > ckptctx->prev_global_progress &&
                   progress <= ckptctx->prev_progress ) {
            /* Skip if this operation number is listed in slct_skip_ops;
               the entry is removed from the list as a side effect. */
            long v = kmr_ckpt_int_list_del(ckptctx->slct_skip_ops, progress);
            if (v == progress) {
                do_skip = 1;
            } else {
                do_skip = 0;
            }
        } else { /* progress > ckptctx->prev_progress */
            do_skip = 0;
        }
    }
    if (do_skip == 1) {
        kmr_ckpt_log_skipped(mr);
        ckptctx->cur_kvi_id = KMR_CKPT_DUMMY_ID;
        ckptctx->cur_kvo_id = KMR_CKPT_DUMMY_ID;
        return 1;
    }

    /* initialize a checkpoint: in selective mode, remember whether this
       operation requested a checkpoint (consulted by
       kmr_ckpt_write_file_p()). */
    if (mr->ckpt_selective) {
        if (opt.take_ckpt) {
            ckptctx->slct_cur_take_ckpt = 1;
        }
    }

    return 0;
}
2840 
/** It finalizes the progress of MapReduce checkpointing.

    It writes a progress log record (only if the current operation has an
    input or output KVS), clears the selective-mode take-checkpoint flag,
    and resets the current input/output KVS ids.

    \param[in] mr MapReduce data type
*/
void
{
    kmr_ckpt_log_progress(mr);
    struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
    if (mr->ckpt_selective) {
        ckptctx->slct_cur_take_ckpt = 0;
    }
    ckptctx->cur_kvi_id = KMR_CKPT_DUMMY_ID;
    ckptctx->cur_kvo_id = KMR_CKPT_DUMMY_ID;
}
2856 
2857 /***************************************************************/
2858 /* Utility functions */
2859 /***************************************************************/
2860 
2861 /* Initialize list */
2862 static void
2863 kmr_ckpt_list_init(struct kmr_ckpt_list *list,
2864  kmr_ckpt_list_alocfn_t alocfn,
2865  kmr_ckpt_list_freefn_t freefn,
2866  kmr_ckpt_list_compfn_t compfn)
2867 {
2868  list->head = 0;
2869  list->tail = 0;
2870  list->size = 0;
2871  list->alocfn = alocfn;
2872  list->freefn = freefn;
2873  list->compfn = compfn;
2874 }
2875 
2876 /* Clear all data in the list */
2877 static void
2878 kmr_ckpt_list_free(struct kmr_ckpt_list *list)
2879 {
2880  struct kmr_ckpt_list_item *item, *del;
2881  for (item = list->head; item != 0; ) {
2882  del = item;
2883  item = item->next;
2884  (*(list->freefn))(del);
2885  }
2886  kmr_ckpt_list_init(list, list->alocfn, list->freefn, list->compfn);
2887 }
2888 
2889 /* Add an item to the tail of the list.
2890  If size of list is over max count, it deletes the oldest item. */
2891 static void
2892 kmr_ckpt_list_add(struct kmr_ckpt_list *list, void *val)
2893 {
2894  struct kmr_ckpt_list_item *item;
2895  size_t isize = sizeof(struct kmr_ckpt_list_item);
2896  if (list->size == KMR_CKPT_LIST_MAX) {
2897  item = list->head;
2898  list->head = list->head->next;
2899  list->head->prev = 0;
2900  (*(list->freefn))(item->val);
2901  kmr_free(item, isize);
2902  list->size -= 1;
2903  }
2904  item = (struct kmr_ckpt_list_item *)kmr_malloc(isize);
2905  item->val = (*(list->alocfn))(val);
2906  item->next = 0;
2907  item->prev = 0;
2908  if (list->head == 0) {
2909  list->head = item;
2910  list->tail = item;
2911  } else {
2912  list->tail->next = item;
2913  item->prev = list->tail;
2914  list->tail = item;
2915  }
2916  list->size += 1;
2917 }
2918 
2919 /* Delete the specified value from the list.
2920  If it successes to delete it returns the value, otherwise it returns NULL.
2921  It searches from the head. */
2922 static void *
2923 kmr_ckpt_list_del(struct kmr_ckpt_list *list, void *val)
2924 {
2925  _Bool found = 0;
2926  struct kmr_ckpt_list_item *item;
2927  for (item = list->head; item != 0; item = item->next) {
2928  if ((*(list->compfn))(item->val, val) == 0) {
2929  found = 1;
2930  break;
2931  }
2932  }
2933  if (found) {
2934  void *ret = item->val;
2935  if (!(item == list->head || item == list->tail)) {
2936  item->prev->next = item->next;
2937  item->next->prev = item->prev;
2938  }
2939  if (item == list->head) {
2940  list->head = item->next;
2941  if (list->head != 0) {
2942  list->head->prev = 0;
2943  }
2944  }
2945  if (item == list->tail) {
2946  list->tail = item->prev;
2947  if (list->tail != 0) {
2948  list->tail->next = 0;
2949  }
2950  }
2951  kmr_free(item, sizeof(struct kmr_ckpt_list_item));
2952  list->size -= 1;
2953  return ret;
2954  } else {
2955  return 0;
2956  }
2957 }
2958 
2959 /* It searchs the specified value from the head of the list.
2960  If the value is found it returns the value, otherwise it returns NULL. */
2961 static void *
2962 kmr_ckpt_list_search(struct kmr_ckpt_list *list, void *val)
2963 {
2964  struct kmr_ckpt_list_item *item;
2965  for (item = list->head; item != 0; item = item->next) {
2966  if ((*(list->compfn))(item->val, val) == 0) {
2967  return item->val;
2968  }
2969  }
2970  return 0;
2971 }
2972 
2973 /* It searchs the specified value from the tail of the list.
2974  If the value is found it returns the value, otherwise it returns NULL. */
2975 static void *
2976 kmr_ckpt_list_rsearch(struct kmr_ckpt_list *list, void *val)
2977 {
2978  struct kmr_ckpt_list_item *item;
2979  for (item = list->tail; item != 0; item = item->prev) {
2980  if ((*(list->compfn))(item->val, val) == 0) {
2981  return item->val;
2982  }
2983  }
2984  return 0;
2985 }
2986 
/* Element allocator for the long-valued list: returns a freshly
   allocated copy of the long that VAL points to. */
static void *
kmr_ckpt_int_list_alocfn(void *val)
{
    const long *src = (const long *)val;
    long *copy = kmr_malloc(sizeof(long));
    copy[0] = src[0];
    return copy;
}
2995 
/* Element deallocator for the long-valued list; the counterpart of
   kmr_ckpt_int_list_alocfn(), releasing one long. */
static void
kmr_ckpt_int_list_freefn(void *val)
{
    long *elem = (long *)val;
    kmr_free(elem, sizeof(long));
}
3002 
/* Three-way comparator for long-valued list elements.
   Returns 1 when *V1 > *V2, -1 when *V1 < *V2, and 0 when equal.
   The difference of two comparisons is used, so no subtraction of the
   values themselves (and hence no overflow) takes place. */
static int
kmr_ckpt_int_list_compfn(void *v1, void *v2)
{
    const long lhs = *(const long *)v1;
    const long rhs = *(const long *)v2;
    return (lhs > rhs) - (lhs < rhs);
}
3017 
3018 /* Initialize integer list */
3019 static void
3020 kmr_ckpt_int_list_init(struct kmr_ckpt_list *list)
3021 {
3022  kmr_ckpt_list_init(list, kmr_ckpt_int_list_alocfn,
3023  kmr_ckpt_int_list_freefn, kmr_ckpt_int_list_compfn);
3024 }
3025 
/* Release every element held by the long-valued LIST, delegating to the
   generic kmr_ckpt_list_free(). */
static void
kmr_ckpt_int_list_free(struct kmr_ckpt_list *list)
{
    kmr_ckpt_list_free(list);
    return;
}
3032 
/* Append a copy of VAL at the tail of LIST via kmr_ckpt_list_add().
   NOTE(review): per the original comment, kmr_ckpt_list_add() drops the
   oldest element once the list exceeds its maximum size; that logic is
   not in this wrapper. */
static void
kmr_ckpt_int_list_add(struct kmr_ckpt_list *list, long val)
{
    long boxed = val;
    kmr_ckpt_list_add(list, &boxed);
}
3040 
/* Delete the first element equal to VAL from LIST, searching from the head.
   Returns the deleted value (i.e. VAL) on success, or -1 when no element
   matched.  -1 is therefore ambiguous if -1 itself is stored.
   kmr_ckpt_list_del() frees only the list node and hands ownership of the
   element to the caller, so the element is released here; dropping the
   pointer without freeing it would leak one long per deletion. */
static long
kmr_ckpt_int_list_del(struct kmr_ckpt_list *list, long val)
{
    long *v = (long *)kmr_ckpt_list_del(list, &val);
    if (v == 0) {
        return -1;
    }
    long ret = *v;
    kmr_ckpt_int_list_freefn(v);
    return ret;
}
3054 
/* Look up VAL in LIST scanning from the head.
   Returns the stored value when found, otherwise -1. */
static long
kmr_ckpt_int_list_search(struct kmr_ckpt_list *list, long val)
{
    const long *hit = (const long *)kmr_ckpt_list_search(list, &val);
    return (hit == 0) ? -1 : *hit;
}
3067 
/* Look up VAL in LIST scanning from the tail.
   Returns the stored value when found, otherwise -1. */
static long
kmr_ckpt_int_list_rsearch(struct kmr_ckpt_list *list, long val)
{
    const long *hit = (const long *)kmr_ckpt_list_rsearch(list, &val);
    return (hit == 0) ? -1 : *hit;
}
3080 
#if 0
/* Self-test for the long-valued list helpers.
   Build the list code with KMR_CKPT_LIST_MAX set to 2 so that the third
   insertion below evicts the oldest element (10). */
static void test_kmr_ckpt_int_list()
{
    struct kmr_ckpt_list lst;
    kmr_ckpt_int_list_init(&lst);

    /* Removal and lookups on an empty list fail and change nothing. */
    long got = kmr_ckpt_int_list_del(&lst, 1);
    assert(got == -1);
    assert(lst.size == 0);
    got = kmr_ckpt_int_list_search(&lst, 1);
    assert(got == -1);
    assert(lst.size == 0);
    got = kmr_ckpt_int_list_rsearch(&lst, 1);
    assert(got == -1);
    assert(lst.size == 0);

    /* Fill past capacity: size saturates at 2. */
    kmr_ckpt_int_list_add(&lst, 10);
    assert(lst.size == 1);
    kmr_ckpt_int_list_add(&lst, 20);
    assert(lst.size == 2);
    kmr_ckpt_int_list_add(&lst, 30);
    assert(lst.size == 2);

    /* 10 was evicted; 20 and 30 are found from either end. */
    got = kmr_ckpt_int_list_search(&lst, 10);
    assert(got == -1);
    got = kmr_ckpt_int_list_rsearch(&lst, 10);
    assert(got == -1);
    got = kmr_ckpt_int_list_search(&lst, 20);
    assert(got == 20);
    got = kmr_ckpt_int_list_rsearch(&lst, 20);
    assert(got == 20);
    got = kmr_ckpt_int_list_search(&lst, 30);
    assert(got == 30);
    got = kmr_ckpt_int_list_rsearch(&lst, 30);
    assert(got == 30);

    /* Deleting a missing value is a no-op; deleting the rest empties it. */
    got = kmr_ckpt_int_list_del(&lst, 1);
    assert(got == -1);
    assert(lst.size == 2);
    got = kmr_ckpt_int_list_del(&lst, 20);
    assert(got == 20);
    assert(lst.size == 1);
    got = kmr_ckpt_int_list_del(&lst, 30);
    assert(got == 30);
    assert(lst.head == 0);
    assert(lst.tail == 0);

    kmr_ckpt_int_list_free(&lst);
    fprintf(stderr, "interger list test done.\n");
}
#endif
3129 
3130 /* allocator for operation item */
3131 static void *
3132 kmr_ckpt_opr_list_alocfn(void *val)
3133 {
3134  struct kmr_ckpt_operation *v = kmr_malloc(sizeof(struct kmr_ckpt_operation));
3135  *v = *(struct kmr_ckpt_operation *)val;
3136  return v;
3137 }
3138 
3139 /* deallocator for operation item */
3140 static void
3141 kmr_ckpt_opr_list_freefn(void *val)
3142 {
3143  kmr_free(val, sizeof(struct kmr_ckpt_operation));
3144 }
3145 
3146 /* comparetor for operation item */
3147 static int
3148 kmr_ckpt_opr_list_compfn(void *v1, void *v2)
3149 {
3150  struct kmr_ckpt_operation _v1 = *(struct kmr_ckpt_operation *)v1;
3151  struct kmr_ckpt_operation _v2 = *(struct kmr_ckpt_operation *)v2;
3152  if ( _v1.op_seqno > _v2.op_seqno ) {
3153  return 1;
3154  } else if ( _v1.op_seqno < _v2.op_seqno ) {
3155  return -1;
3156  } else {
3157  return 0;
3158  }
3159 }
3160 
3161 /* Initialize operation list */
3162 static void
3163 kmr_ckpt_opr_list_init(struct kmr_ckpt_list *list)
3164 {
3165  kmr_ckpt_list_init(list, kmr_ckpt_opr_list_alocfn,
3166  kmr_ckpt_opr_list_freefn, kmr_ckpt_opr_list_compfn);
3167 }
3168 
/* Release every element held by the operation LIST, delegating to the
   generic kmr_ckpt_list_free(). */
static void
kmr_ckpt_opr_list_free(struct kmr_ckpt_list *list)
{
    kmr_ckpt_list_free(list);
    return;
}
3175 
3176 /* Add an operation to the tail of the list.
3177  If size of list is over max count, it deletes the oldest value. */
3178 static void
3179 kmr_ckpt_opr_list_add(struct kmr_ckpt_list *list, struct kmr_ckpt_operation op)
3180 {
3181  kmr_ckpt_list_add(list, &op);
3182 }
3183 
3184 /* Initialize kvs chains. */
3185 static void
3186 kmr_ckpt_kvs_chains_init(struct kmr_ckpt_kvs_chains *chains)
3187 {
3188  chains->chainlst = 0;
3189  chains->chainlst_size = 0;
3190 }
3191 
3192 /* Free kvs chains */
3193 static void
3194 kmr_ckpt_kvs_chains_free(struct kmr_ckpt_kvs_chains *chains)
3195 {
3196  for (int i = 0; i < chains->chainlst_size; i++) {
3197  struct kmr_ckpt_list *list = &chains->chainlst[i];
3198  kmr_ckpt_opr_list_free(list);
3199  }
3200  kmr_ckpt_kvs_chains_init(chains);
3201 }
3202 
3203 /* Create a new chain. */
3204 static void
3205 kmr_ckpt_kvs_chains_new_chain(struct kmr_ckpt_kvs_chains *chains,
3206  struct kmr_ckpt_operation op)
3207 {
3208  int idx = chains->chainlst_size;
3209  chains->chainlst_size += 1;
3210  chains->chainlst = (struct kmr_ckpt_list *)
3211  kmr_realloc(chains->chainlst,
3212  sizeof(struct kmr_ckpt_list) * (size_t)chains->chainlst_size);
3213  struct kmr_ckpt_list *list = &chains->chainlst[idx];
3214  kmr_ckpt_opr_list_init(list);
3215  kmr_ckpt_opr_list_add(list, op);
3216 }
3217 
3218 /* Connect an operation to an existing chain. */
3219 static void
3220 kmr_ckpt_kvs_chains_connect(struct kmr_ckpt_kvs_chains *chains,
3221  struct kmr_ckpt_operation op)
3222 {
3223  struct kmr_ckpt_list *list = kmr_ckpt_kvs_chains_find(chains, op.kvi_id);
3224  if (list != 0) {
3225  kmr_ckpt_opr_list_add(list, op);
3226  } else {
3227  kmr_ckpt_kvs_chains_new_chain(chains, op);
3228  }
3229 }
3230 
3231 /* Find a chain that contains KVS_ID.
3232  If all chains are closed (tha last operation's kvo is not KMR_CKPT_DUMMY_ID),
3233  it returns null. */
3234 static struct kmr_ckpt_list *
3235 kmr_ckpt_kvs_chains_find(struct kmr_ckpt_kvs_chains *chains, long kvo_id)
3236 {
3237  for (int i = 0; i < chains->chainlst_size; i++) {
3238  struct kmr_ckpt_list *list = &chains->chainlst[i];
3239  struct kmr_ckpt_operation *last_op =
3240  (struct kmr_ckpt_operation *)list->tail->val;
3241  if (last_op->kvo_id == KMR_CKPT_DUMMY_ID) {
3242  /* chain is closed */
3243  continue;
3244  }
3245  struct kmr_ckpt_list_item *item;
3246  for (item = list->tail; item != 0; item = item->prev) {
3247  struct kmr_ckpt_operation *op =
3248  (struct kmr_ckpt_operation *)item->val;
3249  if (op->kvo_id == kvo_id) {
3250  return list;
3251  }
3252  }
3253  }
3254  return 0;
3255 }
3256 
#if 0
/* Self-test for KVS chain construction: op1 starts a chain, op2 (which
   consumes op1's output) joins it, and op3 starts a second chain. */
static void test_kmr_ckpt_kvs_chains()
{
    struct kmr_ckpt_kvs_chains ch;
    kmr_ckpt_kvs_chains_init(&ch);

    struct kmr_ckpt_operation first = {
        .op_seqno = 1, .kvi_id = KMR_CKPT_DUMMY_ID, .kvo_id = 1
    };
    kmr_ckpt_kvs_chains_new_chain(&ch, first);
    assert(ch.chainlst_size == 1);

    struct kmr_ckpt_operation second = {
        .op_seqno = 2, .kvi_id = 1, .kvo_id = 2
    };
    kmr_ckpt_kvs_chains_connect(&ch, second);
    assert(ch.chainlst_size == 1);
    assert(ch.chainlst[0].size == 2);

    struct kmr_ckpt_operation third = {
        .op_seqno = 3, .kvi_id = KMR_CKPT_DUMMY_ID, .kvo_id = 3
    };
    kmr_ckpt_kvs_chains_new_chain(&ch, third);
    assert(ch.chainlst_size == 2);

    kmr_ckpt_kvs_chains_free(&ch);
    fprintf(stderr, "kvs chains test done.\n");
}
#endif
3283 
3284 
3285 /*
3286 Copyright (C) 2012-2018 RIKEN R-CCS
3287 This library is distributed WITHOUT ANY WARRANTY. This library can be
3288 redistributed and/or modified under the terms of the BSD 2-Clause License.
3289 */
Key-Value Stream (abstract).
Definition: kmr.h:632
void kmr_ckpt_save_kvo_each_add(KMR *mr, KMR_KVS *kvo, long ikv_index)
It adds new key-value pairs of the output KVS to the checkpoint data file.
Definition: kmrckpt.c:2719
Utilities Private Part (do not include from applications).
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:658
void kmr_ckpt_remove_ckpt(KMR_KVS *kvs)
It removes checkpoint data file.
Definition: kmrckpt.c:2613
#define KMR_ALIGN(X)
Rounds up a given size to the alignment restriction (currently eight bytes).
Definition: kmrimpl.h:75
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:809
void kmr_ckpt_lock_start(KMR *mr)
Defines the start of the code region that is referred to on restart.
Definition: kmrckpt.c:1934
int kmr_ckpt_enable_ckpt(KMR *mr, int lock_id)
It temporarily re-enables checkpoint/restart after it has been disabled by calling kmr_ckpt_disable_ckpt().
Definition: kmrckpt.c:2516
int kmr_ckpt_disable_ckpt(KMR *mr)
It temporarily disables checkpoint/restart.
Definition: kmrckpt.c:2495
#define kmr_malloc(Z)
Allocates memory, or aborts when failed.
Definition: kmrimpl.h:177
void kmr_ckpt_save_kvo_block_fin(KMR *mr, KMR_KVS *kvo)
It finalizes saving block of key-value pairs of the output KVS to the checkpoint data file...
Definition: kmrckpt.c:2686
void kmr_ckpt_save_kvo_whole(KMR *mr, KMR_KVS *kvo)
It saves all key-value pairs in the output KVS to a checkpoint data file.
Definition: kmrckpt.c:2639
Definition: kmr.h:391
void kmr_ckpt_free_context(KMR *mr)
Free checkpoint context.
Definition: kmrckpt.c:162
KMR Context.
Definition: kmr.h:247
void kmr_ckpt_save_kvo_each_fin(KMR *mr, KMR_KVS *kvo)
It finalizes saving indexed key-value pairs of the output KVS to the checkpoint data file...
Definition: kmrckpt.c:2734
kmr_kv_field
Datatypes of Keys or Values.
Definition: kmr.h:368
void kmr_ckpt_save_kvo_block_add(KMR *mr, KMR_KVS *kvo, long nkvi)
It adds a new block of key-value pairs of the output KVS to the checkpoint data file.
Definition: kmrckpt.c:2671
long kmr_ckpt_first_unprocessed_kv(KMR *mr)
It returns the index of the first unprocessed key-value in the input KVS.
Definition: kmrckpt.c:2536
Handy Copy of a Key-Value Field.
Definition: kmr.h:401
void kmr_ckpt_save_kvo_each_init(KMR *mr, KMR_KVS *kvo)
It initializes saving indexed key-value pairs of the output KVS to a checkpoint data file...
Definition: kmrckpt.c:2704
void kmr_ckpt_progress_fin(KMR *mr)
It finalizes the progress of MapReduce checkpointing.
Definition: kmrckpt.c:2846
void kmr_ckpt_lock_finish(KMR *mr)
Defines the end of the code region that is referred to on restart.
Definition: kmrckpt.c:1945
void kmr_ckpt_create_context(KMR *mr)
Initialize checkpoint context.
Definition: kmrckpt.c:119
#define kmr_realloc(P, Z)
Allocates memory, or aborts when failed.
Definition: kmrimpl.h:181
KMR Interface.
Checkpoint/Restart Support.
int kmr_ckpt_progress_init(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
It initializes a progress of MapReduce checkpointing.
Definition: kmrckpt.c:2754
int kmr_ckpt_enabled(KMR *mr)
Check if checkpoint/restart is enabled.
Definition: kmrckpt.c:2479
static struct kmr_kv_box kmr_pick_kv(struct kmr_kvs_entry *e, KMR_KVS *kvs)
Returns a handle to a key-value entry – a reverse of kmr_poke_kv().
Definition: kmrimpl.h:551
Definition: kmrckpt.h:143
void kmr_ckpt_save_kvo_block_init(KMR *mr, KMR_KVS *kvo)
It initializes saving blocks of key-value pairs of the output KVS to a checkpoint data file...
Definition: kmrckpt.c:2655
void kmr_ckpt_restore_ckpt(KMR_KVS *kvs)
It restores checkpoint data to kvs.
Definition: kmrckpt.c:2558