KMR
kmeans-mrmpi.cpp
1 /* K-Means in MR-MPI (2013-09-19) */
2 
3 #include <stdio.h>
4 #include <stdlib.h>
5 #include <unistd.h>
6 #include <sys/time.h>
7 #include <limits.h>
8 #include <math.h>
9 #include <mpi.h>
10 #include "mapreduce.h"
11 #include "keyvalue.h"
12 
13 #define DEF_NUM_ITERATION 10
14 #define DEF_NUM_POINTS 10000
15 #define DEF_NUM_MEANS 100
16 #define DEF_DIM 3
17 #define DEF_GRID_SIZE 1000
18 
19 using namespace MAPREDUCE_NS;
20 
21 typedef struct {
22  // number of k-means iteration
23  int n_iteration;
24  // size of grid
25  int grid_size;
26  // dimention of points
27  int dim;
28  // number of (initial) points
29  int n_points;
30  // points
31  int *points;
32  // number of means
33  int n_means;
34  // means
35  int *means;
36 } kmeans_t;
37 
38 int
39 measure_time(struct timeval *tv)
40 {
41  MPI_Barrier(MPI_COMM_WORLD);
42  if (gettimeofday(tv, NULL) == -1) {
43  perror("gettimeofday");
44  return -1;
45  }
46  return 0;
47 }
48 
49 double
50 calc_time_diff(struct timeval *tv_s, struct timeval *tv_e)
51 {
52  return ((double)tv_e->tv_sec - (double)tv_s->tv_sec)
53  + ((double)tv_e->tv_usec - (double)tv_s->tv_usec) /1000000.0;
54 }
55 
56 /* Parse commandline arguments */
57 void
58 parse_args(int argc, char **argv, kmeans_t *kmeans)
59 {
60  int c;
61  extern char *optarg;
62 
63  while ((c = getopt(argc, argv, "i:d:c:p:s:")) != EOF) {
64  switch (c) {
65  case 'i':
66  kmeans->n_iteration = atoi(optarg);
67  break;
68  case 'd':
69  kmeans->dim = atoi(optarg);
70  break;
71  case 'c':
72  kmeans->n_means = atoi(optarg);
73  break;
74  case 'p':
75  kmeans->n_points = atoi(optarg);
76  break;
77  case 's':
78  kmeans->grid_size = atoi(optarg);
79  break;
80  case '?':
81  printf("Usage: %s -i <num iteration> -d <vector dimension> "
82  "-c <num clusters> -p <num points> -s <max value>\n", argv[0]);
83  MPI_Abort(MPI_COMM_WORLD, 1);
84  }
85  }
86 
87  if (kmeans->n_iteration <= 0 || kmeans->dim <= 0 || kmeans->n_means <= 0
88  || kmeans->n_points <= 0 || kmeans->grid_size <= 0) {
89  printf("Illegal argument value. All values must be numeric and "
90  "greater than 0\n");
91  MPI_Abort(MPI_COMM_WORLD, 1);
92  }
93 }
94 
95 /* Show statistics of Key-Value */
96 void
97 statistics_kv(int iteration, long kv_cnt)
98 {
99  int rank, nprocs;
100  long *data;
101 
102  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
103  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
104  if (rank == 0) {
105  data = (long *)malloc(nprocs * sizeof(long));
106  } else {
107  data = NULL;
108  }
109  MPI_Gather(&kv_cnt, 1, MPI_LONG, data, 1, MPI_LONG, 0, MPI_COMM_WORLD);
110 
111  if (rank == 0) {
112  int i, min_idx, max_idx;
113  long sum = 0, min = LONG_MAX, max = 0;
114  for (i = 0; i < nprocs; i++) {
115  sum += data[i];
116  if (data[i] < min) {
117  min = data[i];
118  min_idx = i;
119  }
120  if (data[i] > max) {
121  max = data[i];
122  max_idx = i;
123  }
124  }
125  double average = (double)sum / nprocs;
126  double variance = 0.0;
127  for (i = 0; i < nprocs; i++) {
128  variance += ((double)data[i] - average) * ((double)data[i] - average);
129  }
130  variance /= nprocs;
131  double std_dev = sqrt(variance);
132 
133 #if 0
134  printf("Itr[%2d]: KV statistics\n"
135  "\tMin # of KVs is %10ld on Rank[%05d]\n"
136  "\tMax # of KVs is %10ld on Rank[%05d]\n"
137  "\tStd Dev of KVs is %f\n",
138  iteration, min, min_idx, max, max_idx, std_dev);
139 #endif
140  printf("%d,%ld,%ld,%f\n", iteration, min, max, std_dev);
141 
142  free(data);
143  }
144 }
145 
146 /* Generate the points. */
147 void
148 generate_randoms(int *pts, int size, int max_val)
149 {
150  int i;
151  for (i = 0; i < size; i++)
152  pts[i] = (rand() % max_val);
153 }
154 
155 /* MR-MPI map function
156  It copies multi-dimension points to KVS.
157 */
158 void
159 generate_points(int itask, KeyValue *kv, void *ptr)
160 {
161  int i;
162  kmeans_t *kmeans = (kmeans_t *)ptr;
163  int *points = kmeans->points;
164 
165  for (i = 0; i < kmeans->n_points * kmeans->dim; i += kmeans->dim) {
166  kv->add((char *)&i, sizeof(int),
167  (char *)&points[i], kmeans->dim * (int)sizeof(int));
168  }
169 }
170 
171 /* calculate squared distance
172  */
173 unsigned int
174 calc_sq_dist(int *v1, int *v2, int dim)
175 {
176  int i;
177  unsigned int sum = 0;
178  for (i = 0; i < dim; i++)
179  sum += ((v1[i] - v2[i]) * (v1[i] - v2[i]));
180  return sum;
181 }
182 
183 /* MR-MPI map function
184  It calculates cluster of a point stored in kv.
185  It emits a Key-Value whose key is cluster id (index of array
186  "kmeans.means") and value is the point.
187 */
188 void
189 calc_cluster(uint64_t itask, char *key, int keybytes,
190  char *value, int valuebytes, KeyValue *kv, void *ptr)
191 {
192  int i;
193  kmeans_t *kmeans = (kmeans_t *)ptr;
194  int dim = kmeans->dim;
195  int *means = kmeans->means;
196  int n_means = kmeans->n_means;
197  int *point = (int *)value;
198  int min_idx = 0;
199  unsigned int min_dist = calc_sq_dist(point, &means[0], dim);
200 
201  for (i = 1; i < n_means; i++) {
202  unsigned int dist = calc_sq_dist(point, &means[i * dim], dim);
203  if (dist < min_dist) {
204  min_idx = i;
205  min_dist = dist;
206  }
207  }
208  kv->add((char *)&min_idx, sizeof(int),
209  (char *)point, dim * (int)sizeof(int));
210 }
211 
212 /* MR-MPI reduce function
213  It calculates center of clusters.
214  It emits a Key-Value whose key is cluster id and value is the new center.
215 */
216 void
217 update_cluster(char *key, int keybytes, char *multivalue, int nvalues,
218  int *valuebytes, KeyValue *kv, void *ptr)
219 {
220  int i, j;
221  int cid = *key;
222  kmeans_t *kmeans = (kmeans_t *)ptr;
223  int *points = (int *)multivalue;
224  int dim = kmeans->dim;
225  int average[dim];
226 
227  for (i = 0; i < dim; i++)
228  average[i] = 0;
229  for (i = 0; i < nvalues; i++) {
230  int *point = &points[i * dim];
231  for (j = 0; j < dim; j++) {
232  average[j] += point[j];
233  }
234  }
235  for (i = 0; i < dim; i++)
236  average[i] /= nvalues;
237 
238  kv->add((char *)&cid, sizeof(int),
239  (char *)average, dim * (int)sizeof(int));
240 }
241 
242 /* MR-MPI map function
243  It copies centers of clusters stored in kv to "kmeans.means" array.
244 */
245 void
246 copy_center(uint64_t itask, char *key, int keybytes,
247  char *value, int valuebytes, KeyValue *kv, void *ptr)
248 {
249  int i;
250  int cid = *key;
251  int *center = (int *)value;
252  kmeans_t *kmeans = (kmeans_t *)ptr;
253  int dim = kmeans->dim;
254  int *target = &kmeans->means[cid * dim];
255 
256  for (i = 0; i < dim; i++)
257  target[i] = center[i];
258 }
259 
260 #if defined(DEBUG) || defined(FULL_DEBUG)
261 /* MR-MPI scan function
262  It shows points.
263 */
264 void
265 show_points(char *key, int keybytes, char *value, int valuebytes, void *ptr)
266 {
267  int i;
268  char buf[100], buf2[10];
269  kmeans_t *kmeans = (kmeans_t *)ptr;
270  int *point = (int *)value;
271 
272  memset(buf, 0, strlen(buf));
273  for (i = 0; i < kmeans->dim; i++) {
274  sprintf(buf2, "%d ", point[i]);
275  strncat(buf, buf2, strlen(buf2));
276  }
277  fprintf(stderr, "( %s)\n", buf);
278 }
279 
280 /* MR-MPI scan function
281  It shows points with their cluster.
282 */
283 void
284 show_points_w_clusters(char *key, int keybytes,
285  char *value, int valuebytes, void *ptr)
286 {
287  int i;
288  char buf[100], buf2[10];
289  kmeans_t *kmeans = (kmeans_t *)ptr;
290  int cid = *key;
291  int *point = (int *)value;
292  int rank;
293 
294  memset(buf, 0, strlen(buf));
295  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
296  for (i = 0; i < kmeans->dim; i++) {
297  sprintf(buf2, "%d ", point[i]);
298  strncat(buf, buf2, strlen(buf2));
299  }
300  fprintf(stderr, "RANK[%d]: %3d ( %s)\n", rank, cid, buf);
301 }
302 
303 /* MR-MPI scan function
304  It shows points with their cluster. KMV version.
305 */
306 void
307 show_points_w_clusters_m(char *key, int keybytes,
308  char *multivalue, int nvalues, int *valuebytes,
309  void *ptr)
310 {
311  int i, j;
312  char buf[100], buf2[10];
313  kmeans_t *kmeans = (kmeans_t *)ptr;
314  int cid = *key;
315  int *points = (int *)multivalue;
316  int rank;
317 
318  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
319  for (j = 0; j < nvalues; j++) {
320  memset(buf, 0, strlen(buf));
321  int *point = &points[j * kmeans->dim];
322  for (i = 0; i < kmeans->dim; i++) {
323  sprintf(buf2, "%d ", point[i]);
324  strncat(buf, buf2, strlen(buf2));
325  }
326  fprintf(stderr, "RANK[%d]: %3d ( %s)\n", rank, cid, buf);
327  }
328 }
329 
330 void
331 print_means(int *means, int size, int dim)
332 {
333  int i, j;
334  for (i = 0; i < size; i++) {
335  int *mean = &means[i * dim];
336  printf("( ");
337  for (j = 0; j < dim; j++)
338  printf("%d ", mean[j]);
339  printf(")\n");
340  }
341 }
342 #endif
343 
344 ////////////////////////////////////////////////////////////////////////////////
345 
346 int
347 main(int argc, char **argv)
348 {
349  kmeans_t kmeans;
350  int nprocs, rank;
351 
352  MPI_Init(&argc, &argv);
353  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
354  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
355 
356  // Initialize using MPI functions
357  srand((rank + 1) * getpid());
358  if (rank == 0) {
359  kmeans.n_iteration = DEF_NUM_ITERATION;
360  kmeans.grid_size = DEF_GRID_SIZE;
361  kmeans.dim = DEF_DIM;
362  // TODO decide the number of points assigned to each procs randomly
363  kmeans.n_points = DEF_NUM_POINTS;
364  kmeans.n_means = DEF_NUM_MEANS;
365 
366  parse_args(argc, argv, &kmeans);
367 
368  printf("#### Configuration ###########################\n");
369  printf("Number of processes = %d\n", nprocs);
370  printf("Iteration = %d\n", kmeans.n_iteration);
371  printf("Dimension = %d\n", kmeans.dim);
372  printf("Number of clusters = %d\n", kmeans.n_means);
373  printf("Number of points = %d\n", kmeans.n_points);
374  printf("##############################################\n");
375  }
376  MPI_Bcast(&(kmeans.n_iteration), 1, MPI_INT, 0, MPI_COMM_WORLD);
377  MPI_Bcast(&(kmeans.grid_size), 1, MPI_INT, 0, MPI_COMM_WORLD);
378  MPI_Bcast(&(kmeans.dim), 1, MPI_INT, 0, MPI_COMM_WORLD);
379  MPI_Bcast(&(kmeans.n_points), 1, MPI_INT, 0, MPI_COMM_WORLD);
380  MPI_Bcast(&(kmeans.n_means), 1, MPI_INT, 0, MPI_COMM_WORLD);
381  // set initial centers randomly on rank 0
382  kmeans.means = (int *)malloc(kmeans.n_means * kmeans.dim * sizeof(int));
383  if (rank == 0) {
384  generate_randoms(kmeans.means, kmeans.n_means * kmeans.dim,
385  kmeans.grid_size);
386  }
387  MPI_Bcast(kmeans.means, kmeans.n_means * kmeans.dim, MPI_INT,
388  0, MPI_COMM_WORLD);
389  // set points randomly on each rank
390  kmeans.points = (int *)malloc(kmeans.n_points * kmeans.dim * sizeof(int));
391  generate_randoms(kmeans.points, kmeans.n_points * kmeans.dim,
392  kmeans.grid_size);
393 
394  struct timeval tv_ts, tv_te, tv_s, tv_e;
395  // measure kernel start time
396  if (measure_time(&tv_ts) == -1) {
397  MPI_Abort(MPI_COMM_WORLD, 1);
398  }
399 
400  // kernel //
401  int itr = 0;
402  while (itr < kmeans.n_iteration) {
403  MapReduce *mr = new MapReduce(MPI_COMM_WORLD);
404  mr->memsize=512;
405  mr->outofcore = -1;
406  //mr->verbosity = 2;
407  //mr->timer = 1;
408  mr->map(nprocs, generate_points, (void *)&kmeans);
409  statistics_kv(itr + 1, mr->kv->nkv);
410 
411  // measure iteration start time
412  if (measure_time(&tv_s) == -1) {
413  MPI_Abort(MPI_COMM_WORLD, 1);
414  }
415 
416 #ifdef DEBUG
417  if (rank == 0) {
418  printf("Iteration[%2d]: Means\n", itr);
419  print_means(kmeans.means, kmeans.n_means, kmeans.dim);
420  }
421 #endif
422 #ifdef FULL_DEBUG
423  fprintf(stderr, "Iteration[%2d]: Points\n", itr);
424  mr->scan(show_points, (void *)&kmeans);
425 #endif
426 
427  // call map to calculate cluster
428  mr->map(mr, calc_cluster, (void *)&kmeans);
429 #ifdef FULL_DEBUG
430  fprintf(stderr, "Iteration[%2d]: Map result\n", itr);
431  mr->scan(show_points_w_clusters, (void *)&kmeans);
432 #endif
433 
434  // call callate (acutually aggregate & convert) to gather points which
435  // belong to the same cluster
436  mr->aggregate(NULL);
437 #ifdef FULL_DEBUG
438  fprintf(stderr, "Iteration[%2d]: Collate(aggregate) result\n", itr);
439  mr->scan(show_points_w_clusters, (void *)&kmeans);
440 #endif
441 
442  // call callate (acutually aggregate & convert) to gather points which
443  // belong to the same cluster
444  mr->convert();
445 #ifdef FULL_DEBUG
446  fprintf(stderr, "Iteration[%2d]: Collate(convert) result\n", itr);
447  mr->scan(show_points_w_clusters_m, (void *)&kmeans);
448 #endif
449 
450  // call reduce to update cluster center
451  mr->reduce(update_cluster, (void *)&kmeans);
452 #ifdef FULL_DEBUG
453  fprintf(stderr, "Iteration[%2d]: Reduce result\n", itr);
454  mr->scan(show_points_w_clusters, (void *)&kmeans);
455 #endif
456 
457  // cal gather & broadcast to share new cluster centers
458  mr->gather(1);
459  mr->broadcast(0);
460 #ifdef FULL_DEBUG
461  fprintf(stderr, "Iteration[%2d]: Gather & Broadcast result\n", itr);
462  mr->scan(show_points_w_clusters, (void *)&kmeans);
463 #endif
464 
465  // call map to copy new cluster centers to kmeans.means array.
466  mr->map(mr, copy_center, (void *)&kmeans);
467  delete mr;
468  itr += 1;
469 
470  // measure iteration end time
471  if (measure_time(&tv_e) == -1) {
472  MPI_Abort(MPI_COMM_WORLD, 1);
473  }
474  if (rank == 0) {
475  double time_diff = calc_time_diff(&tv_s, &tv_e);
476  printf("Iteration[%2d]: Elapse time: %f\n", itr, time_diff);
477  }
478  }
479 
480  // measure kernel end time
481  if (measure_time(&tv_te) == -1) {
482  MPI_Abort(MPI_COMM_WORLD, 1);
483  }
484 
485  if (rank == 0) {
486  double total_time = calc_time_diff(&tv_ts, &tv_te);
487  printf("Total elapse time: %f\n", total_time);
488  }
489 
490  MPI_Finalize();
491  return 0;
492 }
static void parse_args(char *, char *[])
Parses command parameters given for mapper and reducer arguments.
Definition: kmrshell.c:257