KMR
phoenix-kmeans.c
1 /* Copyright (c) 2007-2009, Stanford University
2 * All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are met:
6 * * Redistributions of source code must retain the above copyright
7 * notice, this list of conditions and the following disclaimer.
8 * * Redistributions in binary form must reproduce the above copyright
9 * notice, this list of conditions and the following disclaimer in the
10 * documentation and/or other materials provided with the distribution.
11 * * Neither the name of Stanford University nor the names of its
12 * contributors may be used to endorse or promote products derived from
13 * this software without specific prior written permission.
14 *
15 * THIS SOFTWARE IS PROVIDED BY STANFORD UNIVERSITY ``AS IS'' AND ANY
16 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18 * DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE FOR ANY
19 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25 */
26 
27 /* This is an example in "test" directory in Phoenix-2.0.0, converted
28  for KMR (2013-04-24). REWRITTEN MUCH. */
29 
30 /* KMR MEMO: Phoenix map-reduce performs the sequence of split, map,
31  reduce, and (built-in) merge. */
32 
33 #include <mpi.h>
34 #include <stdio.h>
35 #include <strings.h>
36 #include <string.h>
37 #include <stddef.h>
38 #include <stdlib.h>
39 #include <unistd.h>
40 #include <assert.h>
41 #include <string.h>
42 #include <math.h>
43 #if 0
44 #include "stddefines.h"
45 #include "map_reduce.h"
46 #endif
47 #include <stdbool.h>
48 #include "kmr.h"
49 
50 #define DEF_NUM_POINTS 100000
51 #define DEF_NUM_MEANS 100
52 #define DEF_DIM 3
53 #define DEF_GRID_SIZE 1000
54 
55 /* Paramters. These constants copied on all nodes. */
56 
57 int dim; // dimension
58 int grid_size; // range (size) in each dimension
59 int num_points; // number of vectors
60 int num_means; // number of clusters
61 
62 int unit_size;
63 
64 /* POINTS is the problem and replicated on all nodes. MEANS is a
65  replicated copy of means, updated at each loop. CLUSTERS indexes
66  in means and is only on rank0. */
67 
68 int *points = 0; // [num_points][dim]
69 int *means = 0; // [num_means][dim]
70 int *clusters = 0; // [num_points]
71 
72 /* Position record in splitter. */
73 
74 int next_point = 0;
75 
76 /* Count of clusters are modified. Total is taken on rank0. */
77 
78 int modified;
79 
80 /* Range of points of each work, and a copy of CLUSTERS in that
81  range. */
82 
83 typedef struct {
84  int start;
85  int length;
86  /* and part of clusters data */
88 
89 #define dprintf(...) fprintf(stdout, __VA_ARGS__)
90 
91 static inline void
92 get_time(struct timeval *tv)
93 {
94  gettimeofday(tv, 0);
95 }
96 
97 /* Prints out the mean values. */
98 
99 static void
100 dump_means(int *_, int size)
101 {
102  for (int i = 0; i < size; i++) {
103  for (int j = 0; j < dim; j++) {
104  dprintf("%5d ", means[i * dim + j]);
105  }
106  dprintf("\n");
107  }
108 }
109 
110 static int
111 safe_atoi(const char *s)
112 {
113  char gomi;
114  int v, cc;
115  assert(s != 0 && s[0] != 0);
116  cc = sscanf(s, "%d%c", &v, &gomi);
117  assert(cc == 1);
118  return v;
119 }
120 
121 /* Parses the user arguments. */
122 
123 static void
124 parse_args(int argc, char **argv)
125 {
126  int c;
127  extern char *optarg;
128  extern int optind;
129 
130  num_points = DEF_NUM_POINTS;
131  num_means = DEF_NUM_MEANS;
132  dim = DEF_DIM;
133  grid_size = DEF_GRID_SIZE;
134 
135  while ((c = getopt(argc, argv, "d:c:p:s:")) != EOF) {
136  switch (c) {
137  case 'd':
138  dim = safe_atoi(optarg);
139  break;
140  case 'c':
141  num_means = safe_atoi(optarg);
142  break;
143  case 'p':
144  num_points = safe_atoi(optarg);
145  break;
146  case 's':
147  grid_size = safe_atoi(optarg);
148  break;
149  case '?':
150  printf("Usage: %s -d <vector dimension> -c <num clusters> -p <num points> -s <max value>\n", argv[0]);
151  exit(1);
152  }
153  }
154 
155  if (dim <= 0 || num_means <= 0 || num_points <= 0 || grid_size <= 0) {
156  printf("Illegal argument value. All values must be numeric and greater than 0\n");
157  exit(1);
158  }
159 
160  printf("Dimension = %d\n", dim);
161  printf("Number of clusters = %d\n", num_means);
162  printf("Number of points = %d\n", num_points);
163  printf("Size of each dimension = %d\n", grid_size);
164 }
165 
166 static int
167 setup_config(const struct kmr_kv_box kv,
168  const KMR_KVS *kvs, KMR_KVS *kvo, void *p, long i_)
169 {
170  assert(kvs == 0 && kv.klen == 0 && kv.vlen == 0);
171  assert(kvo->c.mr->rank == 0);
172  char v[256];
173 
174  snprintf(v, 256, "%d", num_points);
175  kmr_add_string(kvo, "num_points", v);
176  snprintf(v, 256, "%d", num_means);
177  kmr_add_string(kvo, "num_means", v);
178  snprintf(v, 256, "%d", dim);
179  kmr_add_string(kvo, "dim", v);
180  snprintf(v, 256, "%d", grid_size);
181  kmr_add_string(kvo, "grid_size", v);
182 
183  char *k0 = "points";
184  int klen0 = (int)(strlen(k0) + 1);
185  kmr_add_kv1(kvo, k0, klen0, points,
186  ((int)sizeof(int) * num_points * dim));
187 
188  char *k1 = "means";
189  int klen1 = (int)(strlen(k1) + 1);
190  kmr_add_kv1(kvo, k1, klen1, means,
191  ((int)sizeof(int) * num_means * dim));
192 
193  return MPI_SUCCESS;
194 }
195 
196 static void
197 copy_config(KMR_KVS *kvs)
198 {
199  const char *s;
200  int cc;
201  cc = kmr_find_string(kvs, "num_points", &s);
202  assert(cc == MPI_SUCCESS);
203  num_points = safe_atoi(s);
204  cc = kmr_find_string(kvs, "num_means", &s);
205  assert(cc == MPI_SUCCESS);
206  num_means = safe_atoi(s);
207  cc = kmr_find_string(kvs, "dim", &s);
208  assert(cc == MPI_SUCCESS);
209  dim = safe_atoi(s);
210  cc = kmr_find_string(kvs, "grid_size", &s);
211  assert(cc == MPI_SUCCESS);
212  grid_size = safe_atoi(s);
213 
214  if (points != 0) {
215  free(points);
216  }
217  if (means != 0) {
218  free(means);
219  }
220 
221  cc = kmr_find_string(kvs, "points", &s);
222  assert(cc == MPI_SUCCESS);
223  points = malloc(sizeof(int) * (size_t)(num_points * dim));
224  assert(points != 0);
225  memcpy(points, s, (sizeof(int) * (size_t)(num_points * dim)));
226 
227  cc = kmr_find_string(kvs, "means", &s);
228  assert(cc == MPI_SUCCESS);
229  means = malloc(sizeof(int) * (size_t)(num_means * dim));
230  assert(means != 0);
231  memcpy(means, s, (sizeof(int) * (size_t)(num_means * dim)));
232 }
233 
234 /* Generates the points. */
235 
236 static void
237 generate_points(int *pts, int size)
238 {
239  for (int i = 0; i < size; i++) {
240  for (int j = 0; j < dim; j++) {
241  pts[i * dim + j] = (rand() % grid_size);
242  }
243  }
244 }
245 
246 /* Gets the squared distance between two points. */
247 
248 static inline unsigned int
249 get_sq_dist(int *v1, int *v2)
250 {
251  unsigned int sum;
252  sum = 0;
253  for (int i = 0; i < dim; i++) {
254  int p1 = v1[i];
255  int p2 = v2[i];
256  sum += (unsigned int)((p1 - p2) * (p1 - p2));
257  }
258  return sum;
259 }
260 
261 /* Adds vectors. */
262 
263 static inline void
264 add_to_sum(int *sum, int *point)
265 {
266  for (int i = 0; i < dim; i++) {
267  sum[i] += point[i];
268  }
269 }
270 
271 /* Divides points to map tasks. Tasks is represented by a range of
272  partion of points along with associated clusters. UNITS is a chuck
273  size (which will fit to the L1 cache size). It is called until
274  returning 0 to split input data to a number of map tasks. */
275 
276 static int
277 kmeans_splitter(int units, int id, KMR_KVS *kvo)
278 /*int kmeans_splitter(void *data_in, int req_units, map_args_t *out)*/
279 {
280  assert(points != 0 && means != 0 && clusters != 0);
281  assert(units > 0 && kvo != 0);
282 
283  if (next_point >= num_points) {
284  return 0;
285  }
286 
287  int s = next_point;
288  int w;
289  if ((next_point + units) < num_points) {
290  w = units;
291  } else {
292  w = (num_points - next_point);
293  }
294  next_point += units;
295 
296  size_t sz = (sizeof(kmeans_mapdata_t) + (sizeof(int) * (size_t)w));
297  kmeans_mapdata_t *md = malloc(sz);
298  assert(md != 0);
299  md->start = s;
300  md->length = w;
301  int *partofclusters = (void *)((char *)md + sizeof(kmeans_mapdata_t));
302  memcpy(partofclusters, &clusters[s], (sizeof(int) * (size_t)w));
303  struct kmr_kv_box kv = {.klen = (int)sizeof(long),
304  .vlen = (int)sz,
305  .k.i = id,
306  .v.p = (void *)md};
307  kmr_add_kv(kvo, kv);
308  free(md);
309  return 1;
310 }
311 
312 static int
313 phoenix_split(const struct kmr_kv_box kv0,
314  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, long i_)
315 {
316  assert(kvs0 == 0 && kv0.klen == 0 && kv0.vlen == 0 && kvo != 0);
317  const int DEFAULT_CACHE_SIZE = (64 * 1024);
318  int chunk_size = (DEFAULT_CACHE_SIZE / unit_size);
319  next_point = 0;
320  int id = 0;
321  while (kmeans_splitter(chunk_size, id, kvo)) {
322  id++;
323  }
324  return MPI_SUCCESS;
325 }
326 
327 /* KEY_CMP and LOCATOR are not used. */
328 
329 /* Casts by dropping constantness. */
330 
331 #define LOOSE_CAST(X) ((void *)(X))
332 
333 /* Finds the cluster that is most suitable for a given set of points.
334  PARTOFCLUSTERS is a partial copy of CLUSTERS of the range. */
335 
336 static void
337 find_clusters(int start, int length, int *partofclusters, KMR_KVS *kvo)
338 /*void find_clusters(int *points, keyval_t *means, int *clusters, int size)*/
339 {
340  for (int i = 0; i < length; i++) {
341  int p = (start + i);
342  unsigned int mindist = get_sq_dist(&points[p * dim], &means[0]);
343  int minidx = 0;
344  for (int j = 1; j < num_means; j++) {
345  unsigned int d = get_sq_dist(&points[p * dim], &means[j * dim]);
346  if (d < mindist) {
347  mindist = d;
348  minidx = j;
349  }
350  }
351  if (partofclusters[i] != minidx) {
352  partofclusters[i] = minidx;
353  modified++;
354  }
355  struct kmr_kv_box nkv = {.klen = (int)sizeof(long),
356  .vlen = (int)(sizeof(int) * (size_t)dim),
357  .k.i = minidx,
358  .v.p = (const char *)(&points[p * dim])};
359  kmr_add_kv(kvo, nkv);
360  }
361 }
362 
363 /* Maps on the tasks. The value field is the partions of CLUSTERS.
364  It takes the second KVS which is used to share the updates to the
365  partitions of CLUSUTERS. */
366 
367 static int
368 kmeans_map(const struct kmr_kv_box kv,
369  const KMR_KVS *kvs, KMR_KVS *kvo, void *p, long i_)
370 /*void kmeans_map(map_args_t *args)*/
371 {
372  assert((size_t)kv.klen == sizeof(long)
373  && (size_t)kv.vlen > sizeof(kmeans_mapdata_t));
374  kmeans_mapdata_t *md = LOOSE_CAST(kv.v.p);
375  int *partofclusters = (void *)((char *)md + sizeof(kmeans_mapdata_t));
376  find_clusters(md->start, md->length, partofclusters, kvo);
377  KMR_KVS *kvo1 = p;
378  struct kmr_kv_box nkv = {.klen = (int)sizeof(kmeans_mapdata_t),
379  .vlen = (int)(sizeof(int) * (size_t)md->length),
380  .k.p = (void *)md,
381  .v.p = (void *)partofclusters};
382  kmr_add_kv(kvo1, nkv);
383  return MPI_SUCCESS;
384 }
385 
386 /* Updates mean point to the average. A key is an index in the means,
387  and values are points near the mean. */
388 
389 static int
390 reduce_to_average(const struct kmr_kv_box kv[], const long n,
391  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
392 /*void kmeans_reduce(void *key_in, iterator_t *itr)*/
393 {
394  long meanindex = kv[0].k.i;
395  int ave[dim];
396  for (int i = 0; i < dim; i++) {
397  ave[i] = 0;
398  }
399  for (int i = 0; i < n; i++) {
400  assert((size_t)kv[i].vlen == (sizeof(int) * (size_t)dim));
401  assert(kv[0].k.i == meanindex);
402  int *point = (int *)kv[i].v.p;
403  add_to_sum(ave, point);
404  }
405  for (int i = 0; i < dim; i++) {
406  ave[i] = (int)(ave[i] / n);
407  }
408  struct kmr_kv_box nkv = {.klen = (int)sizeof(long),
409  .vlen = (int)(sizeof(int) * (size_t)dim),
410  .k.i = meanindex,
411  .v.p = (void *)ave};
412  kmr_add_kv(kvo, nkv);
413  return MPI_SUCCESS;
414 }
415 
416 /* Stores the point to the MEANS array. */
417 
418 static int
419 store_means(const struct kmr_kv_box kv,
420  const KMR_KVS *kvs, KMR_KVS *kvo, void *p, long i_)
421 {
422  assert((size_t)kv.klen == sizeof(long)
423  && (size_t)kv.vlen == (sizeof(int) * (size_t)dim));
424  long meanindex = kv.k.i;
425  int *point = (void *)kv.v.p;
426  int *m = &means[meanindex * dim];
427  for (int i = 0; i < dim; i++) {
428  m[i] = point[i];
429  }
430  return MPI_SUCCESS;
431 }
432 
433 static int
434 store_clusters(const struct kmr_kv_box kv,
435  const KMR_KVS *kvs, KMR_KVS *kvo, void *p, long i_)
436 {
437  kmeans_mapdata_t *md = LOOSE_CAST(kv.k.p);
438  assert((size_t)kv.klen == sizeof(kmeans_mapdata_t)
439  && (size_t)kv.vlen == (sizeof(int) * (size_t)md->length));
440  int *partofclusters = (void *)kv.v.p;
441  for (int i = 0; i < md->length; i++) {
442  clusters[md->start + i] = partofclusters[i];
443  }
444  return MPI_SUCCESS;
445 }
446 
447 /* Updates MODIFIED by the ior of the collected values. It is called
448  once on rank0. */
449 
450 static int
451 check_modified(const struct kmr_kv_box kv[], const long n,
452  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
453 {
454  /*assert(n == nprocs);*/
455  int m;
456  m = 0;
457  for (int i = 0; i < n; i++) {
458  assert(kv[i].k.i == 0);
459  m += (int)kv[i].v.i;
460  }
461  modified = m;
462  return MPI_SUCCESS;
463 }
464 
465 int
466 main(int argc, char **argv)
467 {
468  srand(20120930);
469 
470  int nprocs, rank, thlv;
471  MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
472  /*MPI_Init(&argc, &argv);*/
473  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
474  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
475  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
476 
477  /* Enlarge element size of key-value pairs. */
478  mr->preset_block_size = (6 * 1024 * 1024);
479 
480  struct timeval begin, end;
481 #ifdef TIMING
482  unsigned int library_time = 0;
483  unsigned int inter_library_time = 0;
484 #endif
485 
486  get_time(&begin);
487 
488  if (rank == 0) {
489  parse_args(argc, argv);
490  }
491 
492  if (rank == 0) {
493  /* Generate problem points. */
494  points = malloc(sizeof(int) * (size_t)(num_points * dim));
495  assert(points != 0);
496  generate_points(points, num_points);
497  /* Generate initial means. */
498  means = malloc(sizeof(int) * (size_t)(num_means * dim));
499  assert(means != 0);
500  generate_points(means, num_means);
501  }
502 
503  KMR_KVS *cnf0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
504  kmr_map_on_rank_zero(cnf0, 0, kmr_noopt, setup_config);
505  KMR_KVS *cnf1 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
506  kmr_replicate(cnf0, cnf1, kmr_noopt);
507  copy_config(cnf1);
508  kmr_free_kvs(cnf1);
509 
510  unit_size = (int)(sizeof(int) * (size_t)dim);
511 
512  clusters = malloc(sizeof(int) * (size_t)num_points);
513  assert(clusters != 0);
514  memset(clusters, -1, (sizeof(int) * (size_t)num_points));
515 
516  modified = 1;
517 
518  /*map_reduce_init();*/
519 
520 #if 0
521  /* Setup map reduce args */
522  memset(&map_reduce_args, 0, sizeof(map_reduce_args_t));
523  map_reduce_args.task_data = &kmeans_data;
524  map_reduce_args.map = kmeans_map;
525  map_reduce_args.reduce = kmeans_reduce;
526  map_reduce_args.splitter = kmeans_splitter;
527  map_reduce_args.locator = kmeans_locator;
528  map_reduce_args.key_cmp = mykeycmp;
529  map_reduce_args.unit_size = kmeans_data.unit_size;
530  map_reduce_args.partition = NULL; // use default
531  map_reduce_args.result = &kmeans_vals;
532  map_reduce_args.data_size = (num_points + num_means) * dim * sizeof(int);
533  map_reduce_args.L1_cache_size = atoi(GETENV("MR_L1CACHESIZE"));//1024 * 8;
534  map_reduce_args.num_map_threads = atoi(GETENV("MR_NUMTHREADS"));//8;
535  map_reduce_args.num_reduce_threads = atoi(GETENV("MR_NUMTHREADS"));//16;
536  map_reduce_args.num_merge_threads = atoi(GETENV("MR_NUMTHREADS"));//8;
537  map_reduce_args.num_procs = atoi(GETENV("MR_NUMPROCS"));//16;
538  map_reduce_args.key_match_factor = (float)atof(GETENV("MR_KEYMATCHFACTOR"));//2;
539  map_reduce_args.use_one_queue_per_task = true;
540 #endif
541 
542  if (rank == 0) {
543  dprintf("KMeans: Calling MapReduce Scheduler\n");
544  }
545 
546  get_time(&end);
547 
548 #ifdef TIMING
549  fprintf(stderr, "initialize: %u\n", time_diff(&end, &begin));
550 #endif
551 
552  while (modified > 0) {
553  if (rank == 0) {
554  dprintf("number of modified cluster points=%d\n", modified);
555  fflush(stdout);
556  }
557 
558  modified = 0;
559 
560  get_time(&begin);
561  /*map_reduce(&map_reduce_args);*/
562 
563  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
564  kmr_map_on_rank_zero(kvs0, 0, kmr_noopt, phoenix_split);
565 
566  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
567  kmr_shuffle(kvs0, kvs1, kmr_noopt);
568 
569  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
570  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
571  kmr_map(kvs1, kvs2, (void *)kvs3, kmr_noopt, kmeans_map);
572  kmr_add_kv_done(kvs3);
573 
574  KMR_KVS *kvs4 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
575  kmr_shuffle(kvs2, kvs4, kmr_noopt);
576 
577  KMR_KVS *kvs5 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
578  kmr_reduce(kvs4, kvs5, 0, kmr_noopt, reduce_to_average);
579 
580  /* Replicate and store MEANS. */
581 
582  KMR_KVS *kvs6 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
583  kmr_replicate(kvs5, kvs6, kmr_noopt);
584  kmr_map(kvs6, 0, 0, kmr_noopt, store_means);
585 
586  /* Replicate and store CLUSTERS. */
587 
588  struct kmr_option rankzero = {.rank_zero = 1};
589  KMR_KVS *kvs7 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
590  kmr_replicate(kvs3, kvs7, rankzero);
591  kmr_map(kvs7, 0, 0, kmr_noopt, store_clusters);
592 
593  /* Take sum of the modified flag. */
594 
595  KMR_KVS *kvs8 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
596  struct kmr_kv_box kv = {.klen = sizeof(long),
597  .vlen = sizeof(long),
598  .k.i = 0,
599  .v.i = modified};
600  kmr_add_kv(kvs8, kv);
601  kmr_add_kv_done(kvs8);
602  KMR_KVS *kvs9 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
603  kmr_replicate(kvs8, kvs9, kmr_noopt);
604  kmr_reduce(kvs9, 0, 0, kmr_noopt, check_modified);
605 
606  get_time(&end);
607 
608 #ifdef TIMING
609  library_time += time_diff(&end, &begin);
610 #endif
611 
612 #ifdef TIMING
613  inter_library_time += time_diff(&end, &end);
614 #endif
615  }
616 
617 #ifdef TIMING
618  if (rank == 0) {
619  fprintf(stderr, "library: %u\n", library_time);
620  fprintf(stderr, "inter library: %u\n", inter_library_time);
621  }
622 #endif
623 
624  get_time(&begin);
625 
626  /*map_reduce_finalize();*/
627 
628  if (rank == 0) {
629  dprintf("\n");
630  printf("KMeans: MapReduce Completed\n");
631 
632  dprintf("\n\nFinal means:\n");
633  dump_means(means, num_means);
634  }
635 
636  if (rank == 0) {
637  dprintf("Cleaning up\n");
638  }
639 
640  free(points);
641  free(means);
642  free(clusters);
643 
644  kmr_free_context(mr);
645 
646  get_time(&end);
647 
648 #ifdef TIMING
649  if (rank == 0) {
650  fprintf(stderr, "finalize: %u\n", time_diff(&end, &begin));
651  }
652 #endif
653 
654  MPI_Finalize();
655  return 0;
656 }
Key-Value Stream (abstract).
Definition: kmr.h:632
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Definition: kmr.h:88
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:658
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:809
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2094
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
Definition: kmrbase.c:939
KMR Context.
Definition: kmr.h:247
int kmr_add_kv1(KMR_KVS *kvs, void *k, int klen, void *v, int vlen)
Adds a key-value pair as given directly by a pointer.
Definition: kmrbase.c:837
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:679
int kmr_find_string(KMR_KVS *kvi, const char *k, const char **vq)
Finds the key K in the key-value stream KVS.
Definition: kmrmoreops.c:73
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:401
static int safe_atoi(const char *s)
Parses an integer string.
Definition: kmrdp.cpp:370
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:367
KMR Interface.
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
Definition: kmrbase.c:2240
static void parse_args(char *, char *[])
Parses command parameters given for mapper and reducer arguments.
Definition: kmrshell.c:257
int kmr_add_string(KMR_KVS *kvs, const char *k, const char *v)
Adds a key-value pair of strings.
Definition: kmrbase.c:971
int kmr_map_on_rank_zero(KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on rank0 only.
Definition: kmrbase.c:1514
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:168