KMR
kmrfiles.c
Go to the documentation of this file.
1 /* kmrfiles.c (2014-02-04) */
2 /* Copyright (C) 2012-2018 RIKEN R-CCS */
3 
4 /** \file kmrfiles.c File Access Support. This provides mappers
5  working on files and directories, especially provides support for
6  the file-system configuration on K. The access practice on the
7  file-system on K has affinity to the z-axis of the TOFU network,
8  to lessen the disturbance to the communication of the other users.
9  Thus, I/O-groups are formed by the nodes on the same z-axis. To
10  respect this access practice, accessing a file should be by nodes
11  with particular positions. The routines defined here ease this
12  coupling. MEMO: This part uses MPI routines directly, because
13  messages here are not eight-byte aligned. */
14 
15 /* _GNU_SOURCE is needed for "pread()" and "getline()". */
16 
17 #if defined(__linux__)
18 #define _GNU_SOURCE
19 #endif
20 
21 #include <mpi.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <unistd.h>
26 #include <libgen.h>
27 #include <limits.h>
28 #include <errno.h>
29 #include <assert.h>
30 #include <fcntl.h>
31 #include <dirent.h>
32 #include <sys/types.h>
33 #include <sys/param.h>
34 #include <sys/stat.h>
35 #include "kmr.h"
36 #include "kmrimpl.h"
37 #include "kmrfefs.h"
38 
/* Small arithmetic helpers. MIN and MAX may already be provided by
   <sys/param.h> (included above), so they are defined only when
   absent. Arguments may be evaluated more than once; do not pass
   expressions with side effects. CEILING(N,D) rounds N/D upwards
   for non-negative N and positive D. NEVERHERE is a false value for
   assertions on unreachable paths. */

#ifndef MIN
#define MIN(a,b) (((a)<(b))?(a):(b))
#endif
#ifndef MAX
#define MAX(a,b) (((a)>(b))?(a):(b))
#endif
#define ABS(X) ((X) < 0 ? -(X) : (X))
#define CEILING(N,D) (((N)+(D)-1)/(D))
#define NEVERHERE 0

/** Read size limit. Large read at once slows down on K, due to the
    limited amount of the kernel buffer (rumored as 128MB). */

#define CHUNK_LIMIT (16 * 1024 * 1024)
49 
50 static const struct kmr_fefs_stripe_info
51 kmr_bad_stripe = {.size=0, .count=0, .offset=0};
52 
/** Segment Reading Information of Each Rank. Entries are stored in
    an array of a group of ranks who participate in the same
    collective read. An entry is associated with RANK. COLOR is a
    given color value (COLOR!=-1). IOGROUP is an I/O-group of a rank.
    INO and SIZE are about a file. READS is an amount of a read.
    OFFSET holds a byte offset of a read. STRIPE holds an index in a
    stripe, where STRIPE=-1 when a rank does not perform reads.
    INGESTING and DIGESTING indicate what a rank performs. HEAD is
    true on a rank which is the minimum in the same color and performs
    reads. A head rank prints diagnostics/debugging messages. */

struct kmr_file_reader {
    int rank;
    int color;
    int iogroup;
    ino_t ino;
    off_t size;
    off_t reads;
    long offset;
    int stripe;
    _Bool ingesting;
    _Bool digesting;
    _Bool head;
};
77 
78 /* ================================================================ */
79 
80 /* Returns an I/O-group (an integer key) of a compute node or an I/O
81  node from a TOFU-coordinate. It projects a TOFU-coordinate to the
82  X-Y. */
83 
84 static int
85 kmr_iogroup(kmr_k_position_t p)
86 {
87  assert(p[0] < 0x7fff && p[1] < 0x7fff);
88  return (p[0] << 16 | p[1]);
89 }
90 
/* Returns a distance (manhattan-distance) between I/O-groups A0 and
   A1, i.e., |x0-x1| + |y0-y1| of the X-Y coordinates packed in the
   keys (X in the upper 16 bits, Y in the lower 16 bits). */

int
kmr_iogroup_distance(int a0, int a1)
{
    int x0 = ((a0 >> 16) & 0xffff);
    int y0 = (a0 & 0xffff);
    int x1 = ((a1 >> 16) & 0xffff);
    int y1 = (a1 & 0xffff);
    /* Compare like coordinates of the two groups (X with X, Y with
       Y), not the X and Y of the same group. */
    return (abs(x0 - x1) + abs(y0 - y1));
}
102 
103 /** Returns an I/O-group (an integer key) of a compute node. */
104 
105 int
107 {
108  int cc;
110  cc = kmr_k_node(mr, p);
111  assert(cc == MPI_SUCCESS);
112  return kmr_iogroup(p);
113 }
114 
115 /** Returns an I/O-group (an integer key) of a disk from an OBDIDX of
116  Lustre file-system. It uses magic expressions (x=obdidx/2048) and
117  (y=(obdidx%2048)/64). */
118 
119 int
121 {
122  int x = (obdidx / 2048);
123  int y = ((obdidx % 2048) / 64);
124  kmr_k_position_t p = {(unsigned short)x, (unsigned short)y, 0, 0};
125  return kmr_iogroup(p);
126 }
127 
128 static inline void
129 kmr_assert_file_readers_are_sorted(struct kmr_file_reader *sgv, long n)
130 {
131  for (long i = 1; i < n; i++) {
132  assert(sgv[i - 1].rank <= sgv[i].rank);
133  }
134 }
135 
136 static int
137 kmr_file_reader_compare(const void *p0, const void *p1)
138 {
139  const struct kmr_file_reader *q0 = p0;
140  const struct kmr_file_reader *q1 = p1;
141  return (q0->rank - q1->rank);
142 }
143 
144 static int
145 kmr_copyout_file_readers(const struct kmr_kv_box kv[], const long n,
146  const KMR_KVS *kvi, KMR_KVS *kvo, void *p)
147 {
148  assert(n > 0);
149  int cc;
150  struct kmr_file_reader sgv[n];
151  long segment = kv[0].k.i;
152  for (long i = 0; i < n; i++) {
153  struct kmr_file_reader *a = (struct kmr_file_reader *)kv[i].v.p;
154  sgv[i] = *a;
155  }
156  qsort(sgv, (size_t)n, sizeof(struct kmr_file_reader),
157  kmr_file_reader_compare);
158  kmr_assert_file_readers_are_sorted(sgv, n);
159  size_t vlen = (sizeof(struct kmr_file_reader) * (size_t)n);
160  assert(vlen <= INT_MAX);
161  struct kmr_kv_box nkv = {
162  .klen = (int)sizeof(long),
163  .vlen = (int)vlen,
164  .k.i = segment,
165  .v.p = (char *)sgv
166  };
167  cc = kmr_add_kv(kvo, nkv);
168  assert(cc == MPI_SUCCESS);
169  return MPI_SUCCESS;
170 }
171 
/* Shares file segment information, which indicates which part
   (position and size) is read by which rank. It returns an array of
   information of the related ranks in SGV of COLORSETSIZE entries.
   COLORSETSIZE is set to the number of ranks having the same color.
   The information of inactive ranks have no entries in SGV. The work
   is independent when the colors differ. COLORINDEX is an index in
   SGV of the local rank. COLORINDEX=-1, when the local rank is
   inactive (not participating in ingesting nor digesting). The SGV
   should be freed by callers. OFFSET and STRIPE information in SGV
   should be filled later (entries are returned with OFFSET=0 and
   STRIPE=-1). Only ingesting ranks stat FILE; their READS is BYTES,
   or the file size minus OFFSET when BYTES=-1, and it is not
   range-checked here. Every rank, active or not, calls this routine,
   because the shuffle, reduce and replicate below are issued
   unconditionally (presumably collective over the communicator of
   MR). */

static int
kmr_share_segment_information(KMR *mr, char *file, int color,
                              _Bool ingesting, _Bool digesting,
                              off_t offset, off_t bytes,
                              struct kmr_file_reader **sgvq,
                              int *colorsetsizeq, int *colorindexq)
{
    assert(sgvq != 0 && colorsetsizeq != 0);
    assert(sizeof(ino_t) <= sizeof(long));
    int cc;
    int nprocs = mr->nprocs;
    int rank = mr->rank;
    _Bool active = (color != -1);
    assert(active == (ingesting || digesting));

    /* Check accessibility and get read size of a file. */

    ino_t ino = 0;
    off_t fsz = 0;
    off_t reads = 0;
    if (ingesting) {
        struct stat s;
        /* Retry stat() when it is interrupted by a signal. */
        do {
            cc = stat(file, &s);
        } while (cc == -1 && errno == EINTR);
        if (cc == 0) {
            /* Existent and accessible, OK. */
            if ((s.st_mode & S_IFMT) != S_IFREG) {
                char ee[160];
                snprintf(ee, sizeof(ee),
                         "File (%s) is not a regular file", file);
                kmr_error(mr, ee);
            }
            ino = s.st_ino;
            fsz = s.st_size;
        } else if (errno == ENOENT) {
            /* Non-existent. */
            assert(cc == -1);
            char ee[160];
            snprintf(ee, sizeof(ee), "File (%s) does not exist", file);
            kmr_error(mr, ee);
            ino = 0;
            fsz = 0;
        } else if (errno == EACCES
                   || errno == ELOOP
                   || errno == ENOLINK
                   || errno == ENOTDIR) {
            /* Non-existent (Really, inaccessible). */
            assert(cc == -1);
            char ee[160];
            char *m = strerror(errno);
            snprintf(ee, sizeof(ee), "File (%s) inaccessible: %s", file, m);
            kmr_error(mr, ee);
            ino = 0;
            fsz = 0;
        } else {
            /* STAT errs. */
            assert(cc == -1);
            char ee[160];
            char *m = strerror(errno);
            snprintf(ee, sizeof(ee), "stat(%s): %s", file, m);
            kmr_error(mr, ee);
            ino = 0;
            fsz = 0;
        }
        /* NOTE(review): READS is negative when OFFSET exceeds the
           file size and BYTES=-1; callers must check the range. */
        reads = ((bytes == -1) ? (fsz - offset) : bytes);
    } else {
        reads = 0;
    }

    /* Collect read size information of each node. */

    struct kmr_file_reader *sgv = 0;
    int colorsetsize = 0;
    {
        /* Each active rank contributes one entry keyed by its color. */
        KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
        if (active) {
            struct kmr_file_reader seg = {
                .rank = rank,
                .color = color,
                .iogroup = kmr_iogroup_of_node(mr),
                .ino = ino,
                .size = fsz,
                .reads = reads,
                .offset = 0,
                .stripe = -1,
                .ingesting = ingesting,
                .digesting = digesting,
                .head = 0
            };
            struct kmr_kv_box nkv = {
                .klen = (int)sizeof(long),
                .vlen = (int)sizeof(struct kmr_file_reader),
                .k.i = color,
                .v.p = (char *)&seg
            };
            cc = kmr_add_kv(kvs0, nkv);
            assert(cc == MPI_SUCCESS);
        }
        kmr_add_kv_done(kvs0);
        /* Bring the entries of a color together, pack them into one
           rank-sorted value (kmr_copyout_file_readers), and replicate
           the packed values of all colors to every rank. */
        KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
        cc = kmr_shuffle(kvs0, kvs1, kmr_noopt);
        assert(cc == MPI_SUCCESS);
        KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
        cc = kmr_reduce(kvs1, kvs2, 0, kmr_noopt, kmr_copyout_file_readers);
        assert(cc == MPI_SUCCESS);
        KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
        cc = kmr_replicate(kvs2, kvs3, kmr_noopt);
        assert(cc == MPI_SUCCESS);
        if (active) {
            /* Take a private copy of the entries of the own color. */
            struct kmr_kv_box key = {
                .klen = (int)sizeof(long),
                .vlen = 0,
                .k.i = color,
                .v.i = 0
            };
            struct kmr_kv_box kv;
            cc = kmr_find_key(kvs3, key, &kv);
            assert(cc == MPI_SUCCESS);
            assert((kv.vlen % (int)sizeof(struct kmr_file_reader)) == 0);
            colorsetsize = (kv.vlen / (int)sizeof(struct kmr_file_reader));
            size_t sz = (sizeof(struct kmr_file_reader)
                         * (size_t)colorsetsize);
            sgv = kmr_malloc(sz);
            memcpy(sgv, kv.v.p, sz);
        } else {
            colorsetsize = 0;
            sgv = 0;
        }
        cc = kmr_free_kvs(kvs3);
        assert(cc == MPI_SUCCESS);
        kmr_assert_file_readers_are_sorted(sgv, colorsetsize);
    }

    /* Marks the leader entry. */
    /* The leader (HEAD) is the ingesting entry of the smallest rank. */

    {
        int r0 = nprocs;
        for (int k = 0; k < colorsetsize; k++) {
            if (sgv[k].ingesting) {
                r0 = MIN(r0, sgv[k].rank);
            }
        }
        assert(colorsetsize == 0 || r0 < nprocs);
        for (int k = 0; k < colorsetsize; k++) {
            if (sgv[k].rank == r0) {
                sgv[k].head = 1;
            }
        }
    }

    /* Find information of the local rank. */

    int colorindex = -1;
    for (int k = 0; k < colorsetsize; k++) {
        if (sgv[k].rank == rank) {
            colorindex = k;
            break;
        }
    }
    assert(active == (colorindex != -1));

    *sgvq = sgv;
    *colorsetsizeq = colorsetsize;
    *colorindexq = colorindex;
    return MPI_SUCCESS;
}
350 
351 /* Takes the maximum repeat count of reads for all the colors. It
352  takes global maximum in all colors because each color works
353  independently. */
354 
355 static int
356 kmr_take_maximum_loop_count(KMR *mr, off_t reads,
357  struct kmr_fefs_stripe *stripe,
358  long *maxloopsq)
359 {
360  int cc;
361  long maxloops = 0;
362  const off_t singlestripe = (stripe->s.size * stripe->s.count);
363  KMR_KVS *kvs6 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
364  if (singlestripe != 0) {
365  long nloops = CEILING(reads, singlestripe);
366  struct kmr_kv_box nkv = {
367  .klen = (int)sizeof(long),
368  .vlen = (int)sizeof(long),
369  .k.i = 0,
370  .v.i = nloops
371  };
372  cc = kmr_add_kv(kvs6, nkv);
373  assert(cc == MPI_SUCCESS);
374  }
375  kmr_add_kv_done(kvs6);
376  KMR_KVS *kvs7 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
377  cc = kmr_replicate(kvs6, kvs7, kmr_noopt);
378  assert(cc == MPI_SUCCESS);
379  cc = kmr_reduce_as_one(kvs7, 0, &maxloops, kmr_noopt,
380  kmr_imax_one_fn);
381  assert(cc == MPI_SUCCESS);
382  assert(maxloops > 0);
383  *maxloopsq = maxloops;
384  return MPI_SUCCESS;
385 }
386 
387 /* Performs reads and gathers data. REASSEMBLING is just used for
388  trace messages. BASEOFFSET is the position of the start of reads.
389  BUFFER is area to collect the result. TOTALSIZE is the size of the
390  data to be returned, and BUFFER should have TOTALSIZE. TMPBUF is a
391  temporary buffer, whose size is STRIPE.SIZE and each single read is
392  limited by it. MAXLOOPS is a loop count. SGV and COLORSETSIZE are
393  information of file readers. STRIPE is striping information.
394  RANKINDEXES holds an index in a stripe of each rank. Ranks with
395  RANKINDEXES=-1 does not perform reads. */
396 
397 static int
398 kmr_read_and_gather(KMR *mr, _Bool reassembling,
399  char *file, int fd, off_t baseoffset,
400  char *buffer, off_t totalsize, char *tmpbuf,
401  struct kmr_fefs_stripe *stripe,
402  long maxloops,
403  struct kmr_file_reader *sgv, int colorsetsize,
404  int colorindex)
405 {
406  int nprocs = mr->nprocs;
407  int rank = mr->rank;
408  const off_t singlestripe = (stripe->s.size * stripe->s.count);
409  _Bool tracing5 = (mr->trace_file_io && (5 <= mr->verbosity));
410  _Bool tracing7 = (mr->trace_file_io && (7 <= mr->verbosity));
411 #if 0
412  long *scnts = kmr_malloc(sizeof(long) * (size_t)nprocs);
413  long *sdsps = kmr_malloc(sizeof(long) * (size_t)nprocs);
414  long *rcnts = kmr_malloc(sizeof(long) * (size_t)nprocs);
415  long *rdsps = kmr_malloc(sizeof(long) * (size_t)nprocs);
416 #endif
417  int *scnts = kmr_malloc(sizeof(int) * (size_t)nprocs);
418  int *sdsps = kmr_malloc(sizeof(int) * (size_t)nprocs);
419  int *rcnts = kmr_malloc(sizeof(int) * (size_t)nprocs);
420  int *rdsps = kmr_malloc(sizeof(int) * (size_t)nprocs);
421  int cc;
422  assert(reassembling || (baseoffset == 0));
423  _Bool head = ((colorindex != -1) ? sgv[colorindex].head : 0);
424 
425  int sgindex = -1;
426  for (int k = 0; k < colorsetsize; k++) {
427  if (sgv[k].rank == rank) {
428  sgindex = k;
429  break;
430  }
431  }
432  _Bool ingesting = ((sgindex != -1) && sgv[sgindex].stripe != -1);
433  _Bool digesting = ((sgindex != -1) && sgv[sgindex].digesting);
434 
435  if (tracing5 && rank == 0) {
436  fprintf(stderr, ";;KMR [%05d] file-read: maxloops=%zd\n",
437  rank, maxloops);
438  fflush(0);
439  }
440  if (tracing5 && !reassembling && head) {
441  int nreaders = 0;
442  for (int k = 0; k < colorsetsize; k++) {
443  //int r = sgv[k].rank;
444  if (sgv[k].stripe != -1) {
445  nreaders++;
446  }
447  }
448  for (int k = 0; k < colorsetsize; k++) {
449  int r = sgv[k].rank;
450  if (sgv[k].stripe != -1) {
451  fprintf(stderr,
452  ";;KMR [%05d] file-read:"
453  " stripe unit index=%d/%d (color=%d nprocs=%d)\n",
454  r, sgv[k].stripe, nreaders,
455  sgv[k].color, colorsetsize);
456  }
457  }
458  fflush(0);
459  }
460 
461  int ndigestings = 0;
462  int segi = -1;
463  for (int k = 0; k < colorsetsize; k++) {
464  if (sgv[k].rank == rank) {
465  segi = k;
466  }
467  if (sgv[k].digesting) {
468  ndigestings++;
469  }
470  }
471  assert(colorsetsize == 0 || segi >= 0);
472 
473  /* Cycle reading one stripe. */
474 
475  for (long i = 0; i < maxloops; i++) {
476 
477  /* Read segments locally. */
478 
479  off_t start = (i * singlestripe);
480  off_t pos0;
481  off_t cnt0;
482  if (ingesting) {
483  assert(sgindex != -1);
484  pos0 = start + ((long)stripe->s.size * sgv[sgindex].stripe);
485  cnt0 = MAX(MIN((sgv[segi].reads - pos0), stripe->s.size), 0);
486  } else {
487  pos0 = 0;
488  cnt0 = 0;
489  }
490 
491  if (tracing5) {
492  if (cnt0 > 0) {
493  fprintf(stderr,
494  ";;KMR [%05d] file-read: offset=%zd size=%zd"
495  " (file=%s)\n",
496  rank, (baseoffset + pos0), cnt0, file);
497  } else {
498  fprintf(stderr, ";;KMR [%05d] file-read: (noread)\n", rank);
499  }
500  fflush(0);
501  }
502 
503  if (cnt0 > 0) {
504  for (off_t rx = 0; rx < cnt0;) {
505  ssize_t cx1;
506  do {
507  off_t chunk = MIN((cnt0 - rx), CHUNK_LIMIT);
508  cx1 = pread(fd, (tmpbuf + rx), (size_t)chunk,
509  (baseoffset + pos0 + rx));
510  } while (cx1 == -1 && errno == EINTR);
511  if (cx1 == -1) {
512  char ee[160];
513  char *m = strerror(errno);
514  snprintf(ee, sizeof(ee), "read(%s): %s", file, m);
515  kmr_error(mr, ee);
516  }
517  rx += cx1;
518  }
519  }
520 
521  /* Collect one stripe (TMPBUF to BUFFER). */
522 
523  {
524  for (int r = 0; r < nprocs; r++) {
525  scnts[r] = 0;
526  sdsps[r] = 0;
527  rcnts[r] = 0;
528  rdsps[r] = 0;
529  }
530  //off_t cnt0roundup = ROUNDUP(cnt0, 8);
531  for (int k = 0; k < colorsetsize; k++) {
532  int r = sgv[k].rank;
533  long sendsz = (sgv[k].digesting ? cnt0 : 0);
534  scnts[r] = (int)sendsz;
535  sdsps[r] = 0;
536  assert(sendsz <= stripe->s.size);
537  assert(sendsz <= INT_MAX);
538  }
539  for (int k = 0; k < colorsetsize; k++) {
540  int r = sgv[k].rank;
541  off_t cntx;
542  off_t offx;
543  if (digesting && sgv[k].stripe != -1) {
544  off_t posx = start + ((int)stripe->s.size * sgv[k].stripe);
545  cntx = MAX(MIN((sgv[k].reads - posx), stripe->s.size), 0);
546  offx = ((cntx != 0) ? (start + sgv[k].offset) : 0);
547  } else {
548  cntx = 0;
549  offx = 0;
550  }
551  //off_t cntxroundup = ROUNDUP(cntx, 8);
552  rcnts[r] = (int)cntx;
553  rdsps[r] = (int)(offx - start);
554  assert((offx + cntx) <= totalsize);
555  assert(cntx <= INT_MAX && (offx - start) <= INT_MAX);
556  }
557 
558  if (tracing7) {
559  for (int r = 0; r < nprocs; r++) {
560  if (scnts[r] != 0 || rcnts[r] != 0) {
561  fprintf(stderr,
562  (";;KMR [%05d] file-read: data exchange:"
563  " *<->[%05d] send=%d (disp=%d);"
564  " recv=%d disp=%zd\n"),
565  rank, r, scnts[r], sdsps[r],
566  rcnts[r], (start + rdsps[r]));
567  }
568  }
569  fflush(0);
570  }
571 
572  char *bufptr = (buffer + start);
573  if (ndigestings == nprocs && !mr->file_io_always_alltoallv) {
574 #if 0
575  cc = kmr_allgatherv(mr, 0, tmpbuf, cnt0,
576  buffer, rcnts, rdsps);
577  assert(cc == MPI_SUCCESS);
578 #endif
579  cc = MPI_Allgatherv(tmpbuf, (int)cnt0, MPI_BYTE,
580  bufptr, rcnts, rdsps, MPI_BYTE,
581  mr->comm);
582  assert(cc == MPI_SUCCESS);
583  } else {
584 #if 0
585  cc = kmr_alltoallv(mr, tmpbuf, scnts, sdsps,
586  buffer, rcnts, rdsps);
587  assert(cc == MPI_SUCCESS);
588 #endif
589  cc = MPI_Alltoallv(tmpbuf, scnts, sdsps, MPI_BYTE,
590  bufptr, rcnts, rdsps, MPI_BYTE,
591  mr->comm);
592  assert(cc == MPI_SUCCESS);
593  }
594  }
595  }
596 #if 0
597  kmr_free(scnts, (sizeof(long) * (size_t)nprocs));
598  kmr_free(sdsps, (sizeof(long) * (size_t)nprocs));
599  kmr_free(rcnts, (sizeof(long) * (size_t)nprocs));
600  kmr_free(rdsps, (sizeof(long) * (size_t)nprocs));
601 #endif
602  kmr_free(scnts, (sizeof(int) * (size_t)nprocs));
603  kmr_free(sdsps, (sizeof(int) * (size_t)nprocs));
604  kmr_free(rcnts, (sizeof(int) * (size_t)nprocs));
605  kmr_free(rdsps, (sizeof(int) * (size_t)nprocs));
606  return MPI_SUCCESS;
607 }
608 
609 /* Assigns each rank which part of the file to read for reassembling.
610  RANKINDEXES is set 0 or -1 for reads or for no reads. RANKINDEXES
611  holds an index to a unit in a stripe, but it is always zero in the
612  reassembling case which reads the whole file. */
613 
614 static int
615 kmr_assign_ranks_trivially(KMR *mr, char *file,
616  struct kmr_file_reader *sgv, int colorsetsize,
617  _Bool leader,
618  struct kmr_fefs_stripe *stripe)
619 {
620  //int nprocs = mr->nprocs;
621  for (int k = 0; k < colorsetsize; k++) {
622  if (sgv[k].ingesting) {
623  sgv[k].stripe = 0;
624  }
625  }
626  off_t sz = 0;
627  for (int k = 0; k < colorsetsize; k++) {
628  if (sgv[k].ingesting) {
629  sgv[k].offset = sz;
630  sz += sgv[k].reads;
631  } else {
632  sgv[k].offset = 0;
633  }
634  }
635  return MPI_SUCCESS;
636 }
637 
638 /** Reassembles files reading by ranks. It is intended to reassembles
639  a file from files split into segments. FILE is a file name. A
640  file name can be null, when the rank does not participate reading
641  (COLOR=-1). COLOR groups ranks (be COLOR>=-1). The files on the
642  ranks with the same COLOR are concatenated, where concatenation is
643  ordered by the rank-order. Read is performed for OFFSET and BYTES
644  on each file. BYTES can be -1 to read an entire file. BUFFER and
645  SIZE are set to the malloced buffer and the size on return. Ranks
646  with non-null FILE retrieve a file (ingest), while ranks with
647  non-zero BUFFER receive contents (digest). Ranks with COLOR=-1 do
648  not participate in file reading. REMARK ON K: It reads a
649  specified file by each rank, assuming the files reside in specific
650  I/O-groups to the ranks. */
651 
652 int
653 kmr_read_files_reassemble(KMR *mr, char *file, int color,
654  off_t offset, off_t bytes,
655  void **buffer, off_t *size)
656 {
657  assert((color != -1) == ((file != 0) || (buffer != 0)));
658  assert(color >= -1);
659  int cc;
660  //int nprocs = mr->nprocs;
661  //int rank = mr->rank;
662  _Bool ingesting = (file != 0);
663  _Bool digesting = (buffer != 0);
664  _Bool active = (color != -1);
665 
666  /* Collect read segment information of each node. */
667 
668  struct kmr_file_reader *sgv;
669  int colorsetsize;
670  int colorindex;
671  cc = kmr_share_segment_information(mr, file, color, ingesting, digesting,
672  offset, bytes,
673  &sgv, &colorsetsize, &colorindex);
674  off_t reads = ((colorindex != -1) ? sgv[colorindex].reads : 0);
675  //_Bool head = ((colorindex != -1) ? sgv[colorindex].head : 0);
676 
677  assert(cc == MPI_SUCCESS);
678  assert(!active || (sgv != 0 && colorsetsize != 0));
679  assert((sgv == 0) == (colorsetsize == 0));
680 
681  if (ingesting && (reads < bytes)) {
682  char ee[160];
683  snprintf(ee, sizeof(ee),
684  "File (%s) too small to read offset=%zd bytes=%zd",
685  file, offset, bytes);
686  kmr_error(mr, ee);
687  }
688  off_t totalsize = 0;
689  for (int k = 0; k < colorsetsize; k++) {
690  if (sgv[k].ingesting) {
691  totalsize += sgv[k].reads;
692  }
693  }
694 
695  /* Dummy single unit stripe. */
696 
697  struct kmr_fefs_stripe stripe = {.s = {
698  .size = (uint32_t)mr->file_io_block_size,
699  .count = 1,
700  .offset = 0}
701  };
702 
703  cc = kmr_assign_ranks_trivially(mr, file, sgv, colorsetsize, /*head*/ 0,
704  &stripe);
705  assert(cc == MPI_SUCCESS);
706  assert(ingesting == (sgv[colorindex].stripe != -1));
707 
708  /* Determine repeat count of reads. */
709 
710  long maxloops = 0;
711  cc = kmr_take_maximum_loop_count(mr, reads, &stripe, &maxloops);
712  assert(cc == MPI_SUCCESS && maxloops > 0);
713 
714  char *b = kmr_malloc((size_t)totalsize);
715  {
716  char *tmpbuf = kmr_malloc((size_t)stripe.s.size);
717  int fd = -1;
718  if (ingesting) {
719  do {
720  fd = open(file, O_RDONLY, 0);
721  } while (fd == -1 && errno == EINTR);
722  if (fd == -1) {
723  char ee[160];
724  char *m = strerror(errno);
725  snprintf(ee, sizeof(ee), "open(%s): %s", file, m);
726  kmr_error(mr, ee);
727  }
728  }
729 
730  cc = kmr_read_and_gather(mr, 1, file, fd, offset,
731  b, totalsize, tmpbuf, &stripe, maxloops,
732  sgv, colorsetsize, colorindex);
733  assert(cc == MPI_SUCCESS);
734 
735  if (ingesting) {
736  do {
737  cc = close(fd);
738  } while (cc == -1 && errno == EINTR);
739  if (cc != 0) {
740  char ee[160];
741  char *m = strerror(errno);
742  snprintf(ee, sizeof(ee), "close(%s): %s", file, m);
743  kmr_error(mr, ee);
744  }
745  }
746  kmr_free(tmpbuf, (size_t)stripe.s.size);
747  }
748  if (sgv != 0) {
749  kmr_free(sgv, (sizeof(struct kmr_file_reader) * (size_t)colorsetsize));
750  }
751  if (buffer != 0) {
752  *buffer = b;
753  } else {
754  kmr_free(b, (size_t)totalsize);
755  }
756  *size = totalsize;
757  return MPI_SUCCESS;
758 }
759 
760 /* Returns striping information of a file. It returns fake OBDINDX in
761  the case of KMR_FILE_DUMMY_STRIPE to avoid raising an error. Dummy
762  striping allows simple testing without Lustre file-system. The
763  fake OBDIDX should match with the position values of ranks returned
764  by kmr_k_node(). */
765 
766 static int
767 kmr_get_stripe(KMR *mr, char *file, int colorsetsize, _Bool leader,
768  struct kmr_fefs_stripe *stripe)
769 {
770  char path[PATH_MAX];
771  int cc;
772  size_t len = strlen(file);
773  assert(len < PATH_MAX);
774  memcpy(path, file, (len + 1));
775  char *d = path;
776  char *f = 0;
777  for (char *p = &path[len - 1]; p >= path; p--) {
778  if (*p == '/') {
779  f = (p + 1);
780  *p = 0;
781  break;
782  }
783  }
784  if (f == 0) {
785  /* No directory part in file name. */
786  d = ".";
787  f = path;
788  }
789  int errori = 0;
790  _Bool dumpi = (mr->trace_file_io && (7 <= mr->verbosity));
791  cc = kmr_fefs_get_stripe(d, f, stripe, &errori, dumpi);
792  switch (cc) {
793  default:
794  assert(0 <= cc && cc <=7);
795  break;
796  case 0:
797  /* OK */
798  break;
799  case 1:
800  /* OS unsupported. */
801  stripe->s = kmr_bad_stripe;
802  break;
803  case 2:
804  /* malloc fails. */
805  stripe->s = kmr_bad_stripe;
806  {
807  char ee[160];
808  char *m = strerror(errori);
809  snprintf(ee, sizeof(ee), "malloc(ostdata): %s", m);
810  kmr_error(mr, ee);
811  }
812  break;
813  case 3:
814  /* open fails. */
815  stripe->s = kmr_bad_stripe;
816  {
817  char ee[160];
818  char *m = strerror(errori);
819  snprintf(ee, sizeof(ee), "open(dir=%s): %s", d, m);
820  kmr_error(mr, ee);
821  }
822  break;
823  case 4:
824  /* ioctl fails (Not Lustre FS). */
825  case 5:
826  /* Bad magic. */
827  case 6:
828  /* Bad version. */
829  case 7:
830  /* Bad pattern. */
831  /* IGNORE ERROR, OK NOT BEING LUSTRE. */
832  stripe->s = kmr_bad_stripe;
833  break;
834  }
835  if (stripe->s.count == 0 && mr->file_io_dummy_striping) {
836  if (leader) {
837  char ee[160];
838  snprintf(ee, sizeof(ee),
839  ("FILE (%s) ASSIGNED WITH DUMMY STRIPE,"
840  " missing striping information"), file);
841  kmr_warning(mr, 5, ee);
842  }
843  stripe->s.size = (uint32_t)mr->file_io_block_size;
844  stripe->s.count = (uint16_t)MIN(colorsetsize, 20000);
845  stripe->s.offset = 0;
846  assert((stripe->s.count * 64) < 0x10000);
847  for (int i = 0; i < stripe->s.count; i++) {
848  stripe->obdidx[i] = (uint16_t)(i * 64);
849  }
850  }
851  return MPI_SUCCESS;
852 }
853 
854 /* Shares striping information. */
855 
856 static int
857 kmr_share_striping_information(KMR *mr, char *file, int color,
858  struct kmr_file_reader *sgv, int colorsetsize,
859  _Bool leader,
860  _Bool ingesting, _Bool digesting,
861  struct kmr_fefs_stripe *stripe)
862 {
863  int cc;
864  _Bool active = (colorsetsize != 0);
865  if (ingesting) {
866  cc = kmr_get_stripe(mr, file, colorsetsize, leader, stripe);
867  assert(cc == MPI_SUCCESS);
868  }
869  KMR_KVS *kvs4 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
870  if (ingesting && leader) {
871  size_t vlen = (kmr_fefs_stripe_info_size
872  + (sizeof(uint16_t) * stripe->s.count));
873  struct kmr_kv_box nkv = {
874  .klen = (int)sizeof(long),
875  .vlen = (int)vlen,
876  .k.i = color,
877  .v.p = (void *)stripe
878  };
879  cc = kmr_add_kv(kvs4, nkv);
880  assert(cc == MPI_SUCCESS);
881  }
882  kmr_add_kv_done(kvs4);
883  KMR_KVS *kvs5 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
884  cc = kmr_replicate(kvs4, kvs5, kmr_noopt);
885  assert(cc == MPI_SUCCESS);
886  if (active && !ingesting) {
887  struct kmr_kv_box key = {
888  .klen = (int)sizeof(long),
889  .vlen = 0,
890  .k.i = color,
891  .v.p = 0
892  };
893  struct kmr_kv_box kv;
894  cc = kmr_find_key(kvs5, key, &kv);
895  assert(cc == MPI_SUCCESS);
896  memcpy(stripe, kv.v.p, (size_t)kv.vlen);
897  }
898  cc = kmr_free_kvs(kvs5);
899  assert(cc == MPI_SUCCESS);
900  return MPI_SUCCESS;
901 }
902 
903 /* Assigns each rank a unit in a stripe, and sets RANKINDEXES which
904  holds an index to a unit in a stripe for each rank. Note it may
905  fix striping information to meet the number of readers. */
906 
907 static int
908 kmr_assign_ranks_to_stripe(KMR *mr, char *file,
909  struct kmr_file_reader *sgv, int colorsetsize,
910  _Bool leader,
911  struct kmr_fefs_stripe *stripe)
912 {
913  _Bool active = (colorsetsize != 0);
914  //int nprocs = mr->nprocs;
915  if (active) {
916  //int nprocs = mr->nprocs;
917  int nreaders = 0;
918  for (int k = 0; k < colorsetsize; k++) {
919  if (sgv[k].ingesting) {
920  nreaders++;
921  }
922  }
923  assert(nreaders > 0);
924  if ((stripe->s.count > nreaders) || (stripe->s.count == 0)) {
925  if ((stripe->s.count == 0) && leader) {
926  char ee[160];
927  snprintf(ee, sizeof(ee),
928  ("FILE (%s) ASSIGNED TO ARBITRARY READERS,"
929  " no striping information"), file);
930  kmr_warning(mr, 5, ee);
931  }
932  if ((stripe->s.count > nreaders) && leader) {
933  char ee[160];
934  snprintf(ee, sizeof(ee),
935  ("FILE (%s) ASSIGNED TO ARBITRARY READERS,"
936  " more stripes than ranks"), file);
937  kmr_warning(mr, 5, ee);
938  }
939 
940  /* Assign readers arbitrary. */
941 
942  stripe->s.count = (uint16_t)nreaders;
943  int sindex = 0;
944  for (int k = 0; k < colorsetsize; k++) {
945  if (sgv[k].ingesting) {
946  //int r = sgv[k].rank;
947  sgv[k].stripe = sindex;
948  sindex++;
949  }
950  }
951  assert(sindex == stripe->s.count);
952  } else {
953  _Bool assigned[stripe->s.count];
954  int nassigned = 0;
955  for (int i = 0; i < stripe->s.count; i++) {
956  assigned[i] = 0;
957  }
958  for (int i = 0; i < stripe->s.count; i++) {
959  int iog = kmr_iogroup_of_obd(stripe->obdidx[i]);
960  for (int k = 0; k < colorsetsize; k++) {
961  //int r = sgv[k].rank;
962  int g = sgv[k].iogroup;
963  if (sgv[k].ingesting && iog == g && sgv[k].stripe != -1) {
964  sgv[k].stripe = i;
965  assigned[i] = 1;
966  nassigned++;
967  break;
968  }
969  }
970  }
971  if ((nassigned < stripe->s.count) && leader) {
972  char ee[160];
973  snprintf(ee, sizeof(ee),
974  ("FILE (%s) ASSIGNED TO ARBITRARY READERS,"
975  " some stripes not covered by ranks"), file);
976  kmr_warning(mr, 5, ee);
977  }
978 
979  /* Assign readers to remaining units in a stripe. */
980 
981  if (nassigned < stripe->s.count) {
982  for (int i = 0; i < stripe->s.count; i++) {
983  if (!assigned[i]) {
984  for (int k = 0; k < colorsetsize; k++) {
985  //int r = sgv[k].rank;
986  if (sgv[k].ingesting && sgv[k].stripe == -1) {
987  sgv[k].stripe = i;
988  assigned[i] = 1;
989  nassigned++;
990  break;
991  }
992  }
993  }
994  }
995  }
996  assert(nassigned == stripe->s.count);
997  }
998  }
999  for (int k = 0; k < colorsetsize; k++) {
1000  //int r = sgv[k].rank;
1001  if (sgv[k].stripe != -1) {
1002  sgv[k].offset = ((int)stripe->s.size * sgv[k].stripe);
1003  } else {
1004  sgv[k].offset = 0;
1005  }
1006  }
1007  return MPI_SUCCESS;
1008 }
1009 
1010 /** Reads one file by segments and reassembles by all-gather. FILE is
1011  a file name. COLOR groups ranks (be COLOR>=-1). The ranks with
1012  the same COLOR collaborate to read a file, and thus, they must
1013  specify the same file (with an identical inode number). BUFFER
1014  and SIZE are set to the malloced buffer and the size on return.
1015  Ranks with non-zero FILE retrieve a file (ingest). Ranks with
1016  non-zero BUFFER receive contents (digest). Ranks with COLOR=-1 do
1017  not participate in file reading, and then arguments should be
1018  FILE=0 and BUFFER=0. */
1019 
1020 int
1021 kmr_read_file_by_segments(KMR *mr, char *file, int color,
1022  void **buffer, off_t *size)
1023 {
1024  assert((color != -1) == ((file != 0) || (buffer != 0)));
1025  assert(color >= -1);
1026  int cc;
1027  //const int nprocs = mr->nprocs;
1028  //const int rank = mr->rank;
1029  _Bool ingesting = (file != 0);
1030  _Bool digesting = (buffer != 0);
1031  _Bool active = (color != -1);
1032 
1033  /* Collect read segment information of each node. */
1034 
1035  struct kmr_file_reader *sgv;
1036  int colorsetsize;
1037  int colorindex;
1038  const off_t offset = 0;
1039  const int bytes = -1;
1040  cc = kmr_share_segment_information(mr, file, color, ingesting, digesting,
1041  offset, bytes,
1042  &sgv, &colorsetsize, &colorindex);
1043  assert(cc == MPI_SUCCESS);
1044  assert(active == (sgv != 0) && active == (colorsetsize != 0));
1045  _Bool head = ((colorindex != -1) ? sgv[colorindex].head : 0);
1046 
1047  /* Check inode number for all the ranks access the same fail. */
1048 
1049  off_t totalsize = 0;
1050  ino_t ino = 0;
1051  for (int k = 0; k < colorsetsize; k++) {
1052  if (sgv[k].ingesting) {
1053  if (totalsize == 0) {
1054  ino = sgv[k].ino;
1055  totalsize = sgv[k].reads;
1056  break;
1057  }
1058  }
1059  }
1060  for (int k = 0; k < colorsetsize; k++) {
1061  if (sgv[k].ingesting) {
1062  if (ino != sgv[k].ino) {
1063  char ee[160];
1064  snprintf(ee, sizeof(ee), "File (%s) with different ino", file);
1065  kmr_error(mr, ee);
1066  }
1067  if (totalsize != sgv[k].reads) {
1068  char ee[160];
1069  snprintf(ee, sizeof(ee), "File (%s) returns different sizes", file);
1070  kmr_error(mr, ee);
1071  }
1072  }
1073  }
1074 
1075  /* Share striping information. */
1076 
1077  struct kmr_fefs_stripe stripe = {.s = kmr_bad_stripe};
1078  cc = kmr_share_striping_information(mr, file, color,
1079  sgv, colorsetsize,
1080  head, ingesting, digesting,
1081  &stripe);
1082  assert(cc == MPI_SUCCESS);
1083 
1084  /* Assign ranks to units in a stripe. */
1085 
1086  cc = kmr_assign_ranks_to_stripe(mr, file, sgv, colorsetsize, head,
1087  &stripe);
1088  assert(cc == MPI_SUCCESS);
1089 
1090  /* Determine repeat count of reads. */
1091 
1092  long maxloops = 0;
1093  cc = kmr_take_maximum_loop_count(mr, totalsize, &stripe, &maxloops);
1094  assert(cc == MPI_SUCCESS && maxloops > 0);
1095 
1096  char *b = kmr_malloc((size_t)totalsize);
1097  {
1098  char *tmpbuf = kmr_malloc((size_t)stripe.s.size);
1099  int fd = -1;
1100  if (colorindex != -1 && sgv[colorindex].stripe != -1) {
1101  do {
1102  fd = open(file, O_RDONLY, 0);
1103  } while (fd == -1 && errno == EINTR);
1104  if (fd == -1) {
1105  char ee[160];
1106  char *m = strerror(errno);
1107  snprintf(ee, sizeof(ee), "open(%s): %s", file, m);
1108  kmr_error(mr, ee);
1109  }
1110  }
1111 
1112  /* Cycle reading one stripe. */
1113 
1114  cc = kmr_read_and_gather(mr, 0, file, fd, offset,
1115  b, totalsize, tmpbuf, &stripe, maxloops,
1116  sgv, colorsetsize, colorindex);
1117  assert(cc == MPI_SUCCESS);
1118 
1119  if (colorindex != -1 && sgv[colorindex].stripe != -1) {
1120  do {
1121  cc = close(fd);
1122  } while (cc == -1 && errno == EINTR);
1123  if (cc != 0) {
1124  char ee[160];
1125  char *m = strerror(errno);
1126  snprintf(ee, sizeof(ee), "close(%s): %s", file, m);
1127  kmr_error(mr, ee);
1128  }
1129  }
1130  kmr_free(tmpbuf, (size_t)stripe.s.size);
1131  }
1132  if (sgv != 0) {
1133  kmr_free(sgv, (sizeof(struct kmr_file_reader) * (size_t)colorsetsize));
1134  }
1135  if (buffer != 0) {
1136  *buffer = b;
1137  } else {
1138  kmr_free(b, (size_t)totalsize);
1139  }
1140  *size = totalsize;
1141  return MPI_SUCCESS;
1142 }
1143 
1144 /* ================================================================ */
1145 
1146 /* (MP-MPI MAP VARIANT 1. MR_map). Nothing provided. */
1147 
1148 /* (MP-MPI MAP VARIANT 2. MR_map_file). */
1149 
/* Always-false flag. A not-yet-implemented entry point asserts on it,
   so calling such a stub fails loudly when assertions are enabled. */
#define COMINGSOON (0)
1151 
1152 /** Adds file names in a key-value stream KVO. It checks the file
1153  name NAMES[i] exists, and adds it for a regular file, or
1154  enumerates it for a directory. */
1155 
1156 int
1157 kmr_file_enumerate(KMR *mr, char *names[], int n, KMR_KVS *kvo,
1158  struct kmr_file_option fopt)
1159 {
1160  assert(!fopt.list_file || fopt.subdirectories);
1161  for (int i = 0; i < n; i++) {
1162  char *path = names[i];
1163  struct stat s;
1164  int cc;
1165  do {
1166  cc = stat(path, &s);
1167  } while (cc == -1 && errno == EINTR);
1168  if (cc == 0) {
1169  if (!(S_ISREG(s.st_mode) || S_ISDIR(s.st_mode))) {
1170  char *m;
1171  if (S_ISFIFO(s.st_mode)) {
1172  m = "fifo special";
1173  } else if (S_ISCHR(s.st_mode)) {
1174  m = "character special";
1175  } else if (S_ISDIR(s.st_mode)) {
1176  m = "directory";
1177  } else if (S_ISBLK(s.st_mode)) {
1178  m = "block special";
1179  } else if (S_ISREG(s.st_mode)) {
1180  m = "regular file";
1181  } else {
1182  m = "unknown";
1183  }
1184  char ee[160];
1185  snprintf(ee, sizeof(ee), "Path (%s): type is %s", path, m);
1186  kmr_error(mr, ee);
1187  } else {
1188  /* Follow to main work. */
1189  }
1190  } else if (errno == ENOENT) {
1191  /* Non-existent. */
1192  char ee[160];
1193  snprintf(ee, sizeof(ee), "Path (%s): nonexistent", path);
1194  kmr_error(mr, ee);
1195  assert(errno != ENOENT);
1196  } else if (errno == EACCES
1197  || errno == ELOOP
1198  || errno == ENOLINK
1199  || errno == ENOTDIR) {
1200  /* Non-existent (Really, inaccessible). */
1201  char ee[160];
1202  char *m = strerror(errno);
1203  snprintf(ee, sizeof(ee), "stat(%s): %s", path, m);
1204  kmr_error(mr, ee);
1205  } else {
1206  /* STAT errs. */
1207  assert(cc == -1);
1208  char ee[160];
1209  char *m = strerror(errno);
1210  snprintf(ee, sizeof(ee), "stat(%s): %s", path, m);
1211  kmr_error(mr, ee);
1212  }
1213 
1214  /* Process file/directory entry. */
1215 
1216  if (S_ISREG(s.st_mode)) {
1217  /* Existent and accessible, regular file. */
1218  if (fopt.list_file) {
1219  FILE *f = 0;
1220  do {
1221  f = fopen(path, "r");
1222  } while (f == 0 && errno == EINTR);
1223  if (f == 0) {
1224  char ee[160];
1225  char *e = strerror(errno);
1226  snprintf(ee, sizeof(ee), "fopen(%s): %s", path, e);
1227  kmr_error(mr, ee);
1228  }
1229  char line[MAXPATHLEN];
1230  while ((fgets(line, sizeof(line), f)) != 0) {
1231  char *e = strchr(line, '\n');
1232  /* Drop a newline. */
1233  if (e != 0) {
1234  *e = 0;
1235  } else {
1236  char ee[160];
1237  if (feof(f)) {
1238  snprintf(ee, sizeof(ee),
1239  ("File (%s) misses newline"
1240  " at the end"), path);
1241  kmr_warning(mr, 5, ee);
1242  } else {
1243  snprintf(ee, sizeof(ee),
1244  "File (%s) misses newline", path);
1245  kmr_error(mr, ee);
1246  }
1247  }
1248  if (e != line) {
1249  e--;
1250  while (e != line && *e == ' ') {
1251  e--;
1252  }
1253  if (e != line) {
1254  e++;
1255  assert(*e == 0 || *e == ' ');
1256  }
1257  *e = 0;
1258  }
1259  char *p = line;
1260  while (*p != 0 && *p == ' ') {
1261  p++;
1262  }
1263  if (*p == 0 || *p == '#') {
1264  continue;
1265  }
1266  /* Suppress list_file, recursing is ill. */
1267  struct kmr_file_option foptx = fopt;
1268  assert(foptx.list_file);
1269  foptx.list_file = 0;
1270  char *na[] = {p};
1271  cc = kmr_file_enumerate(mr, na, 1, kvo, foptx);
1272  assert(cc == MPI_SUCCESS);
1273  }
1274  if (ferror(f)) {
1275  char ee[160];
1276  char *e = strerror(fileno(f));
1277  snprintf(ee, sizeof(ee), "fgets(%s): %s", path, e);
1278  kmr_error(mr, ee);
1279  }
1280  } else {
1281  size_t len = (strlen(path) + 1);
1282  struct kmr_kv_box nkv = {.klen = (int)len,
1283  .vlen = (int)sizeof(long),
1284  .k.p = path,
1285  .v.i = 0};
1286  cc = kmr_add_kv(kvo, nkv);
1287  assert(cc == MPI_SUCCESS);
1288  }
1289  } else if (S_ISDIR(s.st_mode)) {
1290  /* Existent and accessible, directory. */
1291  if (fopt.subdirectories) {
1292  size_t dsz;
1293  long nmax = pathconf(path, _PC_NAME_MAX);
1294  if (nmax == -1) {
1295  dsz = (64 * 1024);
1296  } else {
1297  dsz = (offsetof(struct dirent, d_name) + (size_t)(nmax + 1));
1298  }
1299  DIR *d;
1300  struct dirent *dp;
1301  char b[dsz];
1302  d = opendir(path);
1303  if (d == 0) {
1304  char ee[160];
1305  char *m = strerror(errno);
1306  snprintf(ee, sizeof(ee), "opendir(%s): %s", path, m);
1307  kmr_error(mr, ee);
1308  }
1309  errno = 0;
1310  while ((cc = readdir_r(d, (void *)b, &dp)) == 0) {
1311  char subdir[MAXPATHLEN];
1312  if (dp == 0) {
1313  if (errno != 0) {
1314  char ee[160];
1315  char *m = strerror(errno);
1316  snprintf(ee, sizeof(ee),
1317  "readdir_r(%s): %s", path, m);
1318  kmr_error(mr, ee);
1319  }
1320  /* End of records. */
1321  break;
1322  }
1323  if (*(dp->d_name) == '.') {
1324  continue;
1325  }
1326  cc = snprintf(subdir, sizeof(subdir),
1327  "%s/%s", path, dp->d_name);
1328  if (cc > ((int)sizeof(subdir) - 1)) {
1329  char ee[160];
1330  snprintf(ee, sizeof(ee),
1331  "Path (%s): too long", subdir);
1332  kmr_error(mr, ee);
1333  }
1334  char *na[] = {subdir};
1335  cc = kmr_file_enumerate(mr, na, 1, kvo, fopt);
1336  assert(cc == MPI_SUCCESS);
1337  }
1338  do {
1339  cc = closedir(d);
1340  } while (cc == -1 && errno == EINTR);
1341  if (cc != 0) {
1342  char ee[160];
1343  char *m = strerror(errno);
1344  snprintf(ee, sizeof(ee), "closedir(%s): %s", path, m);
1345  kmr_error(mr, ee);
1346  }
1347  } else {
1348  /* Just ignore directories. */
1349  }
1350  } else {
1351  assert(NEVERHERE);
1352  }
1353  }
1354  return MPI_SUCCESS;
1355 }
1356 
1357 /** Maps on file names. NAMES specifies N file names. The
1358  map-function gets a file name in the key field (the value field is
1359  integer zero). File-option EACH_RANK specifies each rank
1360  independently to enumerate file names, otherwise to work on rank0
1361  only. File-option SUBDIRECTORIES specifies to descend to
1362  subdirectories. It ignores files/directories whose name starting
1363  with dots. File-option LIST_FILE specifies to read contents of
1364  each file for file names. File consists of one file name per
1365  line, and ignores a line beginning with a "#". Whitespaces are
1366  trimed at the beginning and the end. LIST_FILE implies
1367  SUBDIRECTORIES. It enumerates names of regular files only.
1368  File-option SHUFFLE_FILES runs shuffling file names among
1369  ranks. */
1370 
1371 int
1372 kmr_map_file_names(KMR *mr, char **names, int n, struct kmr_file_option fopt,
1373  KMR_KVS *kvo, void *arg,
1374  struct kmr_option opt, kmr_mapfn_t m)
1375 {
1376  struct kmr_option kmr_supported = {.nothreading = 1, .keep_open = 1};
1377  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
1378  //const int nprocs = mr->nprocs;
1379  const int rank = mr->rank;
1380  int cc;
1381  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
1382  if (fopt.each_rank || rank == 0) {
1383  struct kmr_file_option foptx = fopt;
1384  if (foptx.list_file) {
1385  foptx.subdirectories = 1;
1386  }
1387  cc = kmr_file_enumerate(mr, names, n, kvs0, foptx);
1388  } else {
1389  /*assert(n == 0);*/
1390  }
1391  kmr_add_kv_done(kvs0);
1392  KMR_KVS *kvs1;
1393  if (fopt.shuffle_names) {
1394  kvs1 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
1395  cc = kmr_shuffle(kvs0, kvs1, kmr_noopt);
1396  assert(cc == MPI_SUCCESS);
1397  } else {
1398  kvs1 = kvs0;
1399  }
1400  cc = kmr_map(kvs1, kvo, arg, opt, m);
1401  assert(cc == MPI_SUCCESS);
1402  return MPI_SUCCESS;
1403 }
1404 
1405 /* (MP-MPI MAP Variant 3. MR_map_file_char). Nothing provided. */
1406 
1407 /* (MP-MPI MAP VARIANT 4. MR_map_file_str). Nothing provided. */
1408 
#if 0

/* (NOT IMPLEMENTED; compiled out.) Maps on file contents. The
   intended mapper first cuts the contents of the files into chunks
   of roughly equal sizes, and then maps on them; the chunking is
   done on rank0 only. NCHUNKS is the total number of chunks. NAMES
   specifies N file names. SUBDIRS specifies to descend into
   subdirectories. LISTFILE specifies to read each file as a list of
   file names, one file name per line. DELTAUB is the upper bound of
   the number of bytes skipped before the next separator is found
   (presumably the separator given by SEP -- to be confirmed). There
   is no RANK0 flag for this mapper, presumably because chunking is
   always done on rank0 -- to be confirmed. */

int
kmr_map_file_contents(KMR *mr, int nchunks, char **names, int n,
                      _Bool subdirs, _Bool listfile,
                      char *sep, long deltaub,
                      KMR_KVS *kvo, void *arg, struct kmr_option opt,
                      kmr_mapfn_t m)
{
    /* Stub only: trips the assertion whenever it is called. */
    assert(COMINGSOON);
    return MPI_SUCCESS;
}

#endif
1433 
1434 /* (MP-MPI MAP VARIANT 5. MR_map_mr). Nothing provided. */
1435 
1436 /* ================================================================ */
1437 
1438 static int
1439 kmr_map_getline_threading(KMR *mr, FILE *f, long limit, _Bool largebuffering,
1440  KMR_KVS *kvo, void *arg,
1441  struct kmr_option opt, kmr_mapfn_t m)
1442 {
1443  int cc;
1444 
1445  int glineno;
1446  glineno = 0;
1447 
1448 #ifdef _OPENMP
1449  const _Bool threading = !(mr->single_thread || opt.nothreading);
1450 #endif
1451  KMR_OMP_PARALLEL_IF_(threading)
1452  {
1453  char *line;
1454  size_t linesz;
1455  line = 0;
1456  linesz = 0;
1457  long lineno;
1458  ssize_t rc;
1459  for (;;) {
1460  KMR_OMP_CRITICAL_
1461  {
1462  lineno = glineno;
1463  if (lineno < limit) {
1464  rc = getline(&line, &linesz, f);
1465  if (rc != -1) {
1466  glineno++;
1467  }
1468  } else {
1469  rc = 0;
1470  }
1471  }
1472 
1473  if (!(lineno < limit && rc != -1)) {
1474  break;
1475  }
1476 
1477  assert(rc <= INT_MAX);
1478  struct kmr_kv_box kv = {
1479  .klen = sizeof(long),
1480  .vlen = (int)rc,
1481  .k.i = lineno,
1482  .v.p = line
1483  };
1484  cc = (*m)(kv, 0, kvo, arg, lineno);
1485  if (cc != MPI_SUCCESS) {
1486  char ee[80];
1487  snprintf(ee, sizeof(ee),
1488  "Map-fn returned with error cc=%d", cc);
1489  kmr_error(mr, ee);
1490  }
1491  }
1492  if (rc == -1 && ferror(f)) {
1493  char ee[80];
1494  char *w = strerror(errno);
1495  snprintf(ee, sizeof(ee),
1496  "kmr_map_getline: getline at line %ld failed: %s",
1497  lineno, w);
1498  kmr_error(mr, ee);
1499  }
1500 
1501  if (line != 0) {
1502  free(line);
1503  }
1504  }
1505  return MPI_SUCCESS;
1506 }
1507 
1508 static int
1509 kmr_map_getline_nothreading(KMR *mr, FILE *f, long limit, _Bool largebuffering,
1510  KMR_KVS *kvo, void *arg,
1511  struct kmr_option opt, kmr_mapfn_t m)
1512 {
1513  int cc;
1514 
1515  char *line = 0;
1516  size_t linesz = 0;
1517  long lineno = 0;
1518  ssize_t rc = 0;
1519  while ((lineno < limit) && (rc = getline(&line, &linesz, f)) != -1) {
1520  assert(rc <= INT_MAX);
1521  struct kmr_kv_box kv = {
1522  .klen = sizeof(long),
1523  .vlen = (int)rc,
1524  .k.i = lineno,
1525  .v.p = line
1526  };
1527  cc = (*m)(kv, 0, kvo, arg, lineno);
1528  if (cc != MPI_SUCCESS) {
1529  char ee[80];
1530  snprintf(ee, sizeof(ee),
1531  "Map-fn returned with error cc=%d", cc);
1532  kmr_error(mr, ee);
1533  }
1534  lineno++;
1535  }
1536  if (rc == -1 && ferror(f)) {
1537  char ee[80];
1538  char *w = strerror(errno);
1539  snprintf(ee, sizeof(ee),
1540  "kmr_map_getline: getline at line %ld failed: %s",
1541  lineno, w);
1542  kmr_error(mr, ee);
1543  }
1544 
1545  if (line != 0) {
1546  free(line);
1547  }
1548 
1549  return MPI_SUCCESS;
1550 }
1551 
1552 /** Calls a map-function M for each line by getline() on an input F.
1553  A map-function gets a line number in key and a string in value
1554  (the index argument is the same as the key). Calls to getline()
1555  is limited to LIMIT lines (0 for unlimited). It is multi-threaded
1556  and the call order is arbitrary. ARG and OPT are passed verbatim
1557  to a map-function. Effective-options: NOTHREADING, KEEP_OPEN,
1558  TAKE_CKPT. See struct kmr_option. */
1559 
1560 int
1561 kmr_map_getline(KMR *mr, FILE *f, long limit, _Bool largebuffering,
1562  KMR_KVS *kvo, void *arg,
1563  struct kmr_option opt, kmr_mapfn_t m)
1564 {
1565  kmr_assert_kvs_ok(0, kvo, 0, 1);
1566  struct kmr_option kmr_supported = {.nothreading = 1, .keep_open = 1,
1567  .take_ckpt = 1};
1568  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
1569 
1570  if (kmr_ckpt_enabled(mr)) {
1571  if (kmr_ckpt_progress_init(0, kvo, opt)) {
1572  if (kvo != 0 && !opt.keep_open) {
1573  kmr_add_kv_done(kvo);
1574  }
1575  return MPI_SUCCESS;
1576  }
1577  }
1578 
1579  size_t BFSZ = (8 * 1024 * 1024);
1580  int cc;
1581 
1582  if (f != 0) {
1583  if (limit == 0) {
1584  limit = LONG_MAX;
1585  }
1586 
1587  char *b = 0;
1588  if (largebuffering) {
1589  b = kmr_malloc(BFSZ);
1590  cc = setvbuf(f, b, _IOFBF, BFSZ);
1591  if (cc != 0) {
1592  char ee[80];
1593  char *w = strerror(errno);
1594  snprintf(ee, sizeof(ee), "%s: setvbuf failed (ignored): %s",
1595  __func__, w);
1596  kmr_warning(mr, 5, ee);
1597  kmr_free(b, BFSZ);
1598  b = 0;
1599  }
1600  }
1601 
1602  if (mr->single_thread || opt.nothreading) {
1603  cc = kmr_map_getline_nothreading(mr, f, limit, largebuffering,
1604  kvo, arg, opt, m);
1605  } else {
1606  cc = kmr_map_getline_threading(mr, f, limit, largebuffering,
1607  kvo, arg, opt, m);
1608  }
1609 
1610  if (b != 0) {
1611  assert(largebuffering);
1612  cc = setvbuf(f, 0, _IONBF, 0);
1613  if (cc != 0) {
1614  char ee[80];
1615  char *w = strerror(errno);
1616  snprintf(ee, sizeof(ee),
1617  "%s: setvbuf() at the end failed (ignored): %s",
1618  __func__, w);
1619  kmr_warning(mr, 5, ee);
1620  }
1621  /* FREE BUFFER ANYWAY. */
1622  kmr_free(b, BFSZ);
1623  }
1624  }
1625 
1626  if (kvo != 0 && !opt.keep_open) {
1627  kmr_add_kv_done(kvo);
1628  }
1629 
1630  if (kmr_ckpt_enabled(mr)) {
1631  kmr_ckpt_save_kvo_whole(mr, kvo);
1633  }
1634 
1635  return MPI_SUCCESS;
1636 }
1637 
1638 /* Calls a map-function M for each line in a memory buffer.
1639  (TENTATIVE). It is equivalent to calling kmr_map_getline() on a
1640  stream returned by fmemopen(). It calls a mapper with a line
1641  pointing to the inside the buffer. */
1642 
1643 int
1644 kmr_map_getline_in_memory_(KMR *mr, void *b, size_t sz, long limit,
1645  KMR_KVS *kvo, void *arg,
1646  struct kmr_option opt, kmr_mapfn_t m)
1647 {
1648  kmr_assert_kvs_ok(0, kvo, 0, 1);
1649  struct kmr_option kmr_supported = {.nothreading = 1, .keep_open = 1,
1650  .take_ckpt = 1};
1651  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
1652  int cc;
1653 
1654  if (kmr_ckpt_enabled(mr)) {
1655  if (kmr_ckpt_progress_init(0, kvo, opt)) {
1656  if (kvo != 0 && !opt.keep_open) {
1657  kmr_add_kv_done(kvo);
1658  }
1659  return MPI_SUCCESS;
1660  }
1661  }
1662 
1663  if (limit == 0) {
1664  limit = LONG_MAX;
1665  }
1666 
1667  int glineno = 0;
1668  char *gline = b;
1669  char * const end = (gline + sz);
1670 
1671 #ifdef _OPENMP
1672  const _Bool threading = !(mr->single_thread || opt.nothreading);
1673 #endif
1674  KMR_OMP_PARALLEL_IF_(threading)
1675  {
1676  char *line = 0;
1677  size_t linesz = 0;
1678  long lineno = 0;
1679  ssize_t rc;
1680  for (;;) {
1681  KMR_OMP_CRITICAL_
1682  {
1683  if (!((gline < end) && (glineno < limit))) {
1684  line = 0;
1685  linesz = 0;
1686  lineno = glineno;
1687  rc = -1;
1688  } else {
1689  char *p = gline;
1690  while (p < end && *p != '\n') {
1691  p++;
1692  }
1693  if (p < end && *p == '\n') {
1694  p++;
1695  }
1696  line = gline;
1697  linesz = (size_t)(p - gline);
1698  lineno = glineno;
1699  gline = p;
1700  glineno++;
1701  rc = (ssize_t)linesz;
1702  }
1703  }
1704 
1705  if (rc == -1) {
1706  break;
1707  }
1708 
1709  assert(rc <= INT_MAX);
1710  struct kmr_kv_box kv = {
1711  .klen = sizeof(long),
1712  .vlen = (int)rc,
1713  .k.i = lineno,
1714  .v.p = line
1715  };
1716  cc = (*m)(kv, 0, kvo, arg, lineno);
1717  if (cc != MPI_SUCCESS) {
1718  char ee[80];
1719  snprintf(ee, sizeof(ee),
1720  "Map-fn returned with error cc=%d", cc);
1721  kmr_error(mr, ee);
1722  }
1723  }
1724  }
1725 
1726  if (kvo != 0 && !opt.keep_open) {
1727  kmr_add_kv_done(kvo);
1728  }
1729 
1730  if (kmr_ckpt_enabled(mr)) {
1731  kmr_ckpt_save_kvo_whole(mr, kvo);
1733  }
1734 
1735  return MPI_SUCCESS;
1736 }
1737 
1738 /*
1739 Copyright (C) 2012-2018 RIKEN R-CCS
1740 This library is distributed WITHOUT ANY WARRANTY. This library can be
1741 redistributed and/or modified under the terms of the BSD 2-Clause License.
1742 */
int kmr_ckpt_progress_init(KMR_KVS *, KMR_KVS *, struct kmr_option)
It initializes a progress of MapReduce checkpointing.
Definition: kmrckpt.c:2754
int kmr_iogroup_of_node(KMR *mr)
Returns an I/O-group (an integer key) of a compute node.
Definition: kmrfiles.c:106
Key-Value Stream (abstract).
Definition: kmr.h:632
int kmr_read_file_by_segments(KMR *mr, char *file, int color, void **buffer, off_t *size)
Reads one file by segments and reassembles by all-gather.
Definition: kmrfiles.c:1021
Utilities Private Part (do not include from applications).
Lustre Striping Information with OBDIDX.
Definition: kmrfefs.h:20
Lustre Striping Information.
Definition: kmrfefs.h:12
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Definition: kmr.h:88
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:658
void kmr_ckpt_progress_fin(KMR *)
It finalizes the progress of MapReduce checkpointing.
Definition: kmrckpt.c:2846
int kmr_file_enumerate(KMR *mr, char *names[], int n, KMR_KVS *kvo, struct kmr_file_option fopt)
Adds file names in a key-value stream KVO.
Definition: kmrfiles.c:1157
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:809
int kmr_find_key(KMR_KVS *kvi, struct kmr_kv_box ki, struct kmr_kv_box *vo)
Finds a key-value pair for a key.
Definition: kmrmoreops.c:43
#define kmr_malloc(Z)
Allocates memory, or aborts when failed.
Definition: kmrimpl.h:177
Segment Reading Information of Each Rank.
Definition: kmrfiles.c:64
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2094
int kmr_ckpt_enabled(KMR *)
Check if checkpoint/restart is enabled.
Definition: kmrckpt.c:2479
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
Definition: kmrbase.c:939
KMR Context.
Definition: kmr.h:247
int kmr_read_files_reassemble(KMR *mr, char *file, int color, off_t offset, off_t bytes, void **buffer, off_t *size)
Reassembles files reading by ranks.
Definition: kmrfiles.c:653
int kmr_map_file_names(KMR *mr, char **names, int n, struct kmr_file_option fopt, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on file names.
Definition: kmrfiles.c:1372
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:679
unsigned short kmr_k_position_t[4]
Positions of node by (X,Y,Z,ABC), with ABC axes collapsed.
Definition: kmrimpl.h:126
void kmr_ckpt_save_kvo_whole(KMR *, KMR_KVS *)
It saves all key-value pairs in the output KVS to a checkpoint data file.
Definition: kmrckpt.c:2639
int kmr_allgatherv(KMR *mr, _Bool rankzeroonly, void *sbuf, long scnt, void *rbuf, long *rcnts, long *rdsps)
All-gathers data, or gathers data when RANKZEROONLY.
Definition: kmratoa.c:74
int kmr_map_getline(KMR *mr, FILE *f, long limit, _Bool largebuffering, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Calls a map-function M for each line by getline() on an input F.
Definition: kmrfiles.c:1561
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:401
int kmr_fefs_get_stripe(const char *dir, const char *file, struct kmr_fefs_stripe *stripe, int *err, _Bool debug_and_dump)
Gets the OBDIDX information on the file or directory.
Definition: kmrfefs.c:82
Lustre File System (or Fujitsu FEFS) Support.
int kmr_iogroup_of_obd(int obdidx)
Returns an I/O-group (an integer key) of a disk from an OBDIDX of Lustre file-system.
Definition: kmrfiles.c:120
static const size_t kmr_fefs_stripe_info_size
Offset to Striping OBDIDX Information.
Definition: kmrfefs.h:28
KMR Interface.
Options to Mapping on Files.
Definition: kmr.h:683
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
Definition: kmrbase.c:2240
int kmr_k_node(KMR *mr, kmr_k_position_t p)
Gets TOFU position (physical coordinates) of the node.
Definition: kmrutil.c:445
int kmr_reduce_as_one(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r)
Calls a reduce-function once as if all key-value pairs had the same key.
Definition: kmrbase.c:2683
#define CHUNK_LIMIT
Read size limit.
Definition: kmrfiles.c:48
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type.
Definition: kmr.h:736
int kmr_alltoallv(KMR *mr, void *sbuf, long *scounts, long *sdsps, void *rbuf, long *rcounts, long *rdsps)
Does all-to-all-v, but it takes the arguments by long-integers.
Definition: kmratoa.c:124