KMR
testckpt1.c
1 /* testckpt1.c (2014-04-09) */
2 
3 /* K-Means program for testing checkpoint restart.
4 
5  This program can be run like the following.
6 
7  $ mpirun -np 4 ./a.out -c 2 -p 10
8 
9  It will be aborted at the end of the second iteration of k-means
10  algorithm. There will be two types of checkpoint files.
11 
12  1. checkpoint files generated by KMR stored in 'ckptdir0000*'
13  directories.
14  2. a checkpoint file that stores k-means progress.
15  Its file name is 'kmeans_ckpt'.
16 
17  You can restart the program from the state stored in the above
18  checkpoint files. In this case, you don't need to specify
19  parameters. If you do, they will be ignored.
20 
21  $ mpirun -np 4 ./a.out
22 
23  The program will be completed after you run it three times.
24 
25  When run with the following parameters, this program will perform
26  answer checking.
27 
28  $ mpirun -np 2 ./a.out -c 4 -p 10
29  $ mpirun -np 4 ./a.out -c 2 -p 10
30  $ mpirun -np 4 ./a.out -c 4 -p 10
31  $ mpirun -np 8 ./a.out -c 4 -p 10
32 
33 */
34 
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <unistd.h>
38 #include <sys/time.h>
39 #include <limits.h>
40 #include <math.h>
41 #include <mpi.h>
42 #include "kmr.h"
43 
44 #define DEF_NUM_ITERATION 10
45 #define DEF_NUM_POINTS 10000
46 #define DEF_NUM_MEANS 100
47 #define DEF_DIM 3
48 #define DEF_GRID_SIZE 1000
49 
50 #define KMEANS_CKPT_FILE "./kmeans_ckpt"
51 #define KMEANS_CKPT_FILE_OLD "./kmeans_ckpt.bk"
52 
53 struct kmr_option kmr_inspect = { .inspect = 1 };
54 
55 typedef struct {
56  // number of processes
57  int nprocs;
58  // number of k-means iteration
59  int n_iteration;
60  // current iteration number
61  int c_iteration;
62  // size of grid
63  int grid_size;
64  // dimention of points
65  int dim;
66  // number of points
67  int n_points;
68  // points
69  int *points;
70  // number of means
71  int n_means;
72  // means
73  int *means;
74 } kmeans_t;
75 
76 
77 /*
78  * Ths following code for random number is copied from C standard
79  * chapter 7.20.2.
80  *
81  * http://www.open-std.org/jtc1/sc22/wg14/www/docs/n1256.pdf
82  */
83 static unsigned long int _rand_next = 1;
84 
85 static int _rand(void)
86 {
87  _rand_next = _rand_next * 1103515245 + 12345;
88  return (unsigned int)(_rand_next/65536) % 32768;
89 }
90 
91 static void _srand(unsigned int seed)
92 {
93  _rand_next = seed;
94 }
95 
96 
97 static int
98 measure_time(struct timeval *tv)
99 {
100  MPI_Barrier(MPI_COMM_WORLD);
101  if (gettimeofday(tv, NULL) == -1) {
102  perror("gettimeofday");
103  return -1;
104  }
105  return 0;
106 }
107 
108 static double
109 calc_time_diff(struct timeval *tv_s, struct timeval *tv_e)
110 {
111  return ((double)tv_e->tv_sec - (double)tv_s->tv_sec)
112  + ((double)tv_e->tv_usec - (double)tv_s->tv_usec) /1000000.0;
113 }
114 
115 static _Bool
116 is_restart()
117 {
118  _Bool result = 0;
119  int ret = access(KMEANS_CKPT_FILE, R_OK);
120  if (ret == 0) {
121  result = 1;
122  }
123  return result;
124 }
125 
126 /* Parse commandline arguments */
127 static void
128 parse_args(int argc, char **argv, kmeans_t *kmeans)
129 {
130  int c;
131  extern char *optarg;
132 
133  while ((c = getopt(argc, argv, "i:d:c:p:s:")) != EOF) {
134  switch (c) {
135  case 'i':
136  kmeans->n_iteration = atoi(optarg);
137  break;
138  case 'd':
139  kmeans->dim = atoi(optarg);
140  break;
141  case 'c':
142  kmeans->n_means = atoi(optarg);
143  break;
144  case 'p':
145  kmeans->n_points = atoi(optarg);
146  break;
147  case 's':
148  kmeans->grid_size = atoi(optarg);
149  break;
150  case '?':
151  printf("Usage: %s -i <num iteration> -d <vector dimension> "
152  "-c <num clusters> -p <num points> -s <max value>\n",
153  argv[0]);
154  MPI_Abort(MPI_COMM_WORLD, 1);
155  }
156  }
157 
158  if (kmeans->n_iteration <= 0 || kmeans->dim <= 0 || kmeans->n_means <= 0
159  || kmeans->n_points <= 0 || kmeans->grid_size <= 0) {
160  printf("Illegal argument value. All values must be numeric and "
161  "greater than 0\n");
162  MPI_Abort(MPI_COMM_WORLD, 1);
163  }
164 }
165 
166 static int
167 save_kmeans_state(kmeans_t *kmeans)
168 {
169  const char *tmp_file = KMEANS_CKPT_FILE ".tmp";
170  FILE *fp = fopen(tmp_file, "w+");
171  if (fp == NULL) {
172  fprintf(stderr, "Failed to open kmeans state file.\n");
173  return 1;
174  }
175  size_t size = sizeof(kmeans_t);
176  size_t ret = fwrite(&size, sizeof(size_t), 1, fp);
177  if (ret != 1) {
178  fprintf(stderr, "Failed to write size of kmeans data.\n");
179  return 1;
180  }
181  ret = fwrite(kmeans, size, 1, fp);
182  if (ret != 1) {
183  fprintf(stderr, "Failed to write kmeans data.\n");
184  return 1;
185  }
186  size = sizeof(int) * (size_t)kmeans->n_means * (size_t)kmeans->dim;
187  ret = fwrite(&size, sizeof(size_t), 1, fp);
188  if (ret != 1) {
189  fprintf(stderr, "Failed to write size of means array.\n");
190  return 1;
191  }
192  ret = fwrite(kmeans->means, size, 1, fp);
193  if (ret != 1) {
194  fprintf(stderr, "Failed to write means array.\n");
195  return 1;
196  }
197  fflush(fp);
198  fsync(fileno(fp));
199  fclose(fp);
200 
201  rename(KMEANS_CKPT_FILE, KMEANS_CKPT_FILE_OLD);
202  rename(tmp_file, KMEANS_CKPT_FILE);
203  return 0;
204 }
205 
206 static int
207 load_kmeans_state(kmeans_t *kmeans)
208 {
209  FILE *fp = fopen(KMEANS_CKPT_FILE, "r");
210  if (fp == NULL) {
211  fprintf(stderr, "Failed to open kmeans state file.\n");
212  return 1;
213  }
214  size_t size;
215  size_t ret = fread(&size, sizeof(size_t), 1, fp);
216  if (ret != 1) {
217  fprintf(stderr, "Failed to read size of kmeans data.\n");
218  return 1;
219  }
220  ret = fread(kmeans, size, 1, fp);
221  if (ret != 1) {
222  fprintf(stderr, "Failed to read kmeans data.\n");
223  return 1;
224  }
225  kmeans->points = NULL;
226  ret = fread(&size, sizeof(size_t), 1, fp);
227  if (ret != 1) {
228  fprintf(stderr, "Failed to read size of means array.\n");
229  return 1;
230  }
231  if (size != (size_t)kmeans->n_means * (size_t)kmeans->dim * sizeof(int)) {
232  fprintf(stderr, "The size of means array is mismatch.\n");
233  return 1;
234  }
235  kmeans->means = (int*)malloc(size);
236  ret = fread(kmeans->means, size, 1, fp);
237  if (ret != 1) {
238  fprintf(stderr, "Failed to read kmeans data.\n");
239  return 1;
240  }
241  return 0;
242 }
243 
244 static void
245 delete_kmeans_state()
246 {
247  remove(KMEANS_CKPT_FILE_OLD);
248  remove(KMEANS_CKPT_FILE);
249 }
250 
251 /* Generate the points. */
252 static void
253 generate_randoms(int *pts, int size, int max_val)
254 {
255  int i;
256  for (i = 0; i < size; i++)
257  pts[i] = (_rand() % max_val);
258 }
259 
260 /* KMR map function
261  It copies multi-dimension points to KVS.
262 */
263 static int
264 generate_points(const struct kmr_kv_box kv,
265  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
266 {
267  int i;
268  kmeans_t *kmeans = (kmeans_t *)p;
269  int *points = kmeans->points;
270 
271  for (i = 0; i < kmeans->n_points * kmeans->dim; i += kmeans->dim) {
272  struct kmr_kv_box nkv = { .klen = (int)sizeof(long),
273  .vlen = kmeans->dim * (int)sizeof(int),
274  .k.i = i,
275  .v.p = (void *)&points[i] };
276  kmr_add_kv(kvo, nkv);
277  }
278 
279  return MPI_SUCCESS;
280 }
281 
282 /* calculate squared distance
283  */
284 static unsigned int
285 calc_sq_dist(int *v1, int *v2, int dim)
286 {
287  int i;
288  unsigned int sum = 0;
289  for (i = 0; i < dim; i++)
290  sum += (unsigned int)((v1[i] - v2[i]) * (v1[i] - v2[i]));
291  return sum;
292 }
293 
294 /* KMR map function
295  It calculates cluster of a point stored in kv.
296  It emits a Key-Value whose key is cluster id (index of array
297  "kmeans.means") and value is the point.
298 */
299 static int
300 calc_cluster(const struct kmr_kv_box kv,
301  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
302 {
303  int i;
304  kmeans_t *kmeans = (kmeans_t *)p;
305  int dim = kmeans->dim;
306  int *means = kmeans->means;
307  int n_means = kmeans->n_means;
308  int *point = (int *)kv.v.p;
309  int min_idx = 0;
310  unsigned int min_dist = calc_sq_dist(point, &means[0], dim);
311 
312  for (i = 1; i < n_means; i++) {
313  unsigned int dist = calc_sq_dist(point, &means[i * dim], dim);
314  if (dist < min_dist) {
315  min_idx = i;
316  min_dist = dist;
317  }
318  }
319  struct kmr_kv_box nkv = { .klen = (int)sizeof(long),
320  .vlen = dim * (int)sizeof(int),
321  .k.i = min_idx,
322  .v.p = (void *)point };
323  kmr_add_kv(kvo, nkv);
324 
325  return MPI_SUCCESS;
326 }
327 
328 /* KMR reduce function
329  It calculates center of clusters.
330  It emits a Key-Value whose key is cluster id and value is the new center.
331 */
332 static int
333 update_cluster(const struct kmr_kv_box kv[], const long n,
334  const KMR_KVS *kvi, KMR_KVS *kvo, void *p)
335 {
336  int i, j;
337  int cid = (int)kv[0].k.i;
338  kmeans_t *kmeans = (kmeans_t *)p;
339  int dim = kmeans->dim;
340  long sum[dim];
341  int average[dim];
342 
343  for (i = 0; i < dim; i++)
344  sum[i] = 0;
345  for (i = 0; i < n; i++) {
346  int *point = (int *)kv[i].v.p;
347  for (j = 0; j < dim; j++) {
348  sum[j] += point[j];
349  }
350  }
351  for (i = 0; i < dim; i++)
352  average[i] = (int)(sum[i] / n);
353 
354  struct kmr_kv_box nkv = { .klen = sizeof(long),
355  .vlen = dim * (int)sizeof(int),
356  .k.i = cid,
357  .v.p = (void *)average };
358  kmr_add_kv(kvo, nkv);
359 
360  return MPI_SUCCESS;
361 }
362 
363 /* KMR map function
364  It copies centers of clusters stored in kv to "kmeans.means" array.
365 */
366 static int
367 copy_center(const struct kmr_kv_box kv,
368  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
369 {
370  int i;
371  int cid = (int)kv.k.i;
372  int *center = (int *)kv.v.p;
373  kmeans_t *kmeans = (kmeans_t *)p;
374  int dim = kmeans->dim;
375  int *target = &kmeans->means[cid * dim];
376 
377  for (i = 0; i < dim; i++)
378  target[i] = center[i];
379 
380  return MPI_SUCCESS;
381 }
382 
383 static void
384 print_means(int *means, int size, int dim)
385 {
386  int i, j;
387  for (i = 0; i < size; i++) {
388  int *mean = &means[i * dim];
389  fprintf(stderr, "( ");
390  for (j = 0; j < dim; j++)
391  fprintf(stderr, "%d ", mean[j]);
392  fprintf(stderr, ")\n");
393  }
394 }
395 
396 static void
397 print_progress(const char *msg, int rank, int n_itr)
398 {
399  MPI_Barrier(MPI_COMM_WORLD);
400  if (rank == 0) {
401  fprintf(stderr, "ITR[%d]: %s\n", n_itr, msg);
402  }
403  MPI_Barrier(MPI_COMM_WORLD);
404 }
405 
406 static void
407 check_answer(kmeans_t *kmeans)
408 {
409  int count = 0;
410  int *answer = NULL;
411  if (kmeans->nprocs == 2 &&
412  kmeans->dim == DEF_DIM &&
413  kmeans->grid_size == DEF_GRID_SIZE &&
414  kmeans->n_points == 10 &&
415  kmeans->n_means == 4 &&
416  kmeans->n_iteration >= DEF_NUM_ITERATION) {
417  // 2 nodes, 10 points, 4 centers
418  printf("\n"
419  "Case of 2 nodes, 10 points and 4 centers.\n"
420  "Check the answer...");
421  count = 4 * kmeans->dim;
422  answer = (int*)malloc((size_t)count * sizeof(int));
423  answer[0] = 906;
424  answer[1] = 733;
425  answer[2] = 301;
426  answer[3] = 426;
427  answer[4] = 179;
428  answer[5] = 279;
429  answer[6] = 121;
430  answer[7] = 702;
431  answer[8] = 107;
432  answer[9] = 349;
433  answer[10] = 727;
434  answer[11] = 751;
435  } else if (kmeans->nprocs == 4 &&
436  kmeans->dim == DEF_DIM &&
437  kmeans->grid_size == DEF_GRID_SIZE &&
438  kmeans->n_points == 10 &&
439  kmeans->n_means == 2 &&
440  kmeans->n_iteration >= DEF_NUM_ITERATION) {
441  // 4 nodes, 10 points, 2 centers
442  printf("\n"
443  "Case of 4 nodes, 10 points and 2 centers.\n"
444  "Check the answer...");
445  count = 2 * kmeans->dim;
446  answer = (int*)malloc((size_t)count * sizeof(int));
447  answer[0] = 830;
448  answer[1] = 787;
449  answer[2] = 389;
450  answer[3] = 346;
451  answer[4] = 425;
452  answer[5] = 448;
453  } else if (kmeans->nprocs == 4 &&
454  kmeans->dim == DEF_DIM &&
455  kmeans->grid_size == DEF_GRID_SIZE &&
456  kmeans->n_points == 10 &&
457  kmeans->n_means == 4 &&
458  kmeans->n_iteration >= DEF_NUM_ITERATION) {
459  // 4 nodes, 10 points, 4 centers
460  printf("\n"
461  "Case of 4 nodes, 10 points and 4 centers.\n"
462  "Check the answer...");
463  count = 4 * kmeans->dim;
464  answer = (int*)malloc((size_t)count * sizeof(int));
465  answer[0] = 870;
466  answer[1] = 756;
467  answer[2] = 434;
468  answer[3] = 654;
469  answer[4] = 220;
470  answer[5] = 435;
471  answer[6] = 194;
472  answer[7] = 501;
473  answer[8] = 167;
474  answer[9] = 230;
475  answer[10] = 609;
476  answer[11] = 729;
477  } else if (kmeans->nprocs == 8 &&
478  kmeans->dim == DEF_DIM &&
479  kmeans->grid_size == DEF_GRID_SIZE &&
480  kmeans->n_points == 10 &&
481  kmeans->n_means == 4 &&
482  kmeans->n_iteration >= DEF_NUM_ITERATION) {
483  // 8 nodes, 10 points, 4 centers
484  printf("\n"
485  "Case of 8 nodes, 10 points and 4 centers.\n"
486  "Check the answer...");
487  count = 4 * kmeans->dim;
488  answer = (int*)malloc((size_t)count * sizeof(int));
489  answer[0] = 809;
490  answer[1] = 753;
491  answer[2] = 467;
492  answer[3] = 668;
493  answer[4] = 260;
494  answer[5] = 495;
495  answer[6] = 194;
496  answer[7] = 438;
497  answer[8] = 235;
498  answer[9] = 220;
499  answer[10] = 620;
500  answer[11] = 792;
501  }
502 
503  if (count != 0) {
504  _Bool ok = 1;
505  for (int i = 0; i < count; i++) {
506  if (kmeans->means[i] != answer[i]) {
507  ok = 0;
508  break;
509  }
510  }
511  if (ok) {
512  printf("The answer is correct.\n");
513  } else {
514  printf("The answer is wrong.\n");
515  }
516  free(answer);
517  }
518 }
519 
520 ////////////////////////////////////////////////////////////////////////////////
521 
522 int
523 main(int argc, char **argv)
524 {
525  kmeans_t kmeans;
526  int nprocs, rank;
527  int p_iteration = -1;
528 
529  MPI_Init(&argc, &argv);
530  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
531  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
532 
533  kmr_init();
534  MPI_Info info;
535  MPI_Info_create(&info);
536  MPI_Info_set(info, "ckpt_enable", "1");
537  KMR *mr = kmr_create_context(MPI_COMM_WORLD, info, 0);
538  MPI_Info_free(&info);
539 
540  // Initialize using MPI functions
541  _srand((unsigned int)(rank + 1));
542  //srand((rank + 1) * getpid());
543  if (rank == 0){
544  if (is_restart()) {
545  load_kmeans_state(&kmeans);
546  p_iteration = kmeans.c_iteration;
547  kmeans.c_iteration = 0;
548  printf("*** Restarted ***\n\n");
549  } else {
550  kmeans.nprocs = nprocs;
551  kmeans.n_iteration = DEF_NUM_ITERATION;
552  kmeans.c_iteration = 0;
553  kmeans.grid_size = DEF_GRID_SIZE;
554  kmeans.dim = DEF_DIM;
555  kmeans.n_points = DEF_NUM_POINTS;
556  kmeans.n_means = DEF_NUM_MEANS;
557 
558  parse_args(argc, argv, &kmeans);
559  // set initial centers randomly on rank 0
560  kmeans.means = (int *) malloc((size_t)kmeans.n_means * (size_t)kmeans.dim * sizeof(int));
561  generate_randoms(kmeans.means, kmeans.n_means * kmeans.dim,
562  kmeans.grid_size);
563  }
564 
565  printf("#### Configuration ###########################\n");
566  printf("Number of processes = %d\n", nprocs);
567  printf("Iteration = %d\n", kmeans.n_iteration);
568  printf("Dimension = %d\n", kmeans.dim);
569  printf("Number of clusters = %d\n", kmeans.n_means);
570  printf("Number of points = %d\n", kmeans.n_points);
571  printf("##############################################\n");
572  }
573  MPI_Bcast(&(kmeans.n_iteration), 1, MPI_INT, 0, MPI_COMM_WORLD);
574  MPI_Bcast(&(kmeans.c_iteration), 1, MPI_INT, 0, MPI_COMM_WORLD);
575  MPI_Bcast(&(kmeans.grid_size), 1, MPI_INT, 0, MPI_COMM_WORLD);
576  MPI_Bcast(&(kmeans.dim), 1, MPI_INT, 0, MPI_COMM_WORLD);
577  MPI_Bcast(&(kmeans.n_points), 1, MPI_INT, 0, MPI_COMM_WORLD);
578  MPI_Bcast(&(kmeans.n_means), 1, MPI_INT, 0, MPI_COMM_WORLD);
579  MPI_Bcast(&p_iteration, 1, MPI_INT, 0, MPI_COMM_WORLD);
580  if (rank != 0) {
581  kmeans.means = (int *)malloc((size_t)kmeans.n_means * (size_t)kmeans.dim * sizeof(int));
582  }
583  MPI_Bcast(kmeans.means, kmeans.n_means * kmeans.dim, MPI_INT,
584  0, MPI_COMM_WORLD);
585  if (is_restart() != 1) {
586  // set points randomly on each rank
587  kmeans.points = (int *)malloc((size_t)kmeans.n_points * (size_t)kmeans.dim * sizeof(int));
588  generate_randoms(kmeans.points, kmeans.n_points * kmeans.dim,
589  kmeans.grid_size);
590  }
591  KMR_KVS *kvs_points = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
592  kmr_map_once(kvs_points, (void *)&kmeans, kmr_noopt, 0, generate_points);
593  print_progress("map_onece done", rank, 0);
594 
595  // kernel //
596  struct timeval tv_s, tv_e;
597  double time_diff, total_time = 0.0;
598  while (kmeans.c_iteration < kmeans.n_iteration) {
599  // measure iteration start time
600  if (kmeans.c_iteration > p_iteration) {
601  if (measure_time(&tv_s) == -1) {
602  MPI_Abort(MPI_COMM_WORLD, 1);
603  }
604  }
605 
606  // call kmr_map to calculate cluster
607  KMR_KVS *kvs_c2p = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
608  kmr_map(kvs_points, kvs_c2p, (void *)&kmeans, kmr_inspect,
609  calc_cluster);
610  print_progress("map done", rank, kmeans.c_iteration);
611 
612  // call kmr_shuffle to gather points which belong to the same cluster
613  KMR_KVS *kvs_c2p_s = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
614  kmr_shuffle(kvs_c2p, kvs_c2p_s, kmr_noopt);
615  print_progress("shuffle done", rank, kmeans.c_iteration);
616 
617  // call kmr_reduce to update cluster center
618  KMR_KVS *kvs_cluster = kmr_create_kvs(mr,
619  KMR_KV_INTEGER, KMR_KV_OPAQUE);
620  kmr_reduce(kvs_c2p_s, kvs_cluster, (void *)&kmeans, kmr_noopt,
621  update_cluster);
622  print_progress("reduce done", rank, kmeans.c_iteration);
623 
624  if (mr->ckpt_enable && kmeans.c_iteration > (p_iteration + 1)
625  && kmeans.c_iteration == 4) {
626  if (rank == 0) {
627  fprintf(stderr, "Aborted on rank 0 for testing purpose.\n");
628  MPI_Abort(MPI_COMM_WORLD, 1);
629  }
630  }
631 
632  // cal kmr_replicate to share new cluster centers
633  KMR_KVS *kvs_all_clusters =
634  kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
635  kmr_replicate(kvs_cluster, kvs_all_clusters, kmr_noopt);
636  print_progress("replicate done", rank, kmeans.c_iteration);
637 
638  // call kmr_map to copy new cluster centers to kmeans.means array.
639  kmr_map(kvs_all_clusters, NULL, (void *)&kmeans, kmr_noopt,
640  copy_center);
641 
642  // measure iteration end time
643  if (kmeans.c_iteration > p_iteration) {
644  if (measure_time(&tv_e) == -1) {
645  MPI_Abort(MPI_COMM_WORLD, 1);
646  }
647  if (rank == 0) {
648  time_diff = calc_time_diff(&tv_s, &tv_e);
649  total_time += time_diff;
650  //printf("Iteration[%2d]: Elapse time: %f\n", itr, time_diff);
651  print_means(kmeans.means, kmeans.n_means, kmeans.dim);
652  if (save_kmeans_state(&kmeans)) {
653  MPI_Abort(MPI_COMM_WORLD, 1);
654  }
655  }
656  }
657 
658  if (mr->ckpt_enable && kmeans.c_iteration > p_iteration
659  && kmeans.c_iteration == 1) {
660  if (rank == 0) {
661  fprintf(stderr, "Aborted on rank 0 for testing purpose.\n");
662  MPI_Abort(MPI_COMM_WORLD, 1);
663  }
664  }
665 
666  kmeans.c_iteration += 1;
667  }
668  if (rank == 0) {
669  printf("Total elapse time: %f\n", total_time);
670  check_answer(&kmeans);
671  delete_kmeans_state();
672  }
673 
674  free(kmeans.means);
675  if (is_restart() != 1) {
676  free(kmeans.points);
677  }
678  kmr_free_kvs(kvs_points);
679  kmr_free_context(mr);
680  kmr_fin();
681 
682  MPI_Finalize();
683  return 0;
684 }
Key-Value Stream (abstract).
Definition: kmr.h:632
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Definition: kmr.h:88
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:658
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:809
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once.
Definition: kmrbase.c:1460
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2094
KMR Context.
Definition: kmr.h:247
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:679
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:401
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
#define kmr_init()
Sets up the environment.
Definition: kmr.h:794
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:367
KMR Interface.
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
Definition: kmrbase.c:2240
static void parse_args(char *, char *[])
Parses command parameters given for mapper and reducer arguments.
Definition: kmrshell.c:257
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:168