KMR
kmrdp.cpp
Go to the documentation of this file.
1 /* kmrdp.cpp (2014-02-04) */
2 /* Copyright (C) 2012-2018 RIKEN R-CCS (only for modifications) */
3 /* Copyright (C) 2011-2012 AKIYAMA Lab., Tokyo Institute of Technology */
4 
5 //============================================================================
6 //
7 // Software Name : MPIDP
8 //
9 // Contact address : Tokyo Institute of Technology, AKIYAMA Lab.
10 //
11 //============================================================================
12 
13 /* This is a rewrite of "mpidp.cpp", the original banner is above. */
14 
15 /** \file kmrdp.cpp MPI-DP Implementation with KMR. MPI-DP is a tool
16  developed by Akiyama Lab., Tokyo Institute of Technology (titec),
17  which provides an environment for running almost independent
18  (data-intensive, genome search) tasks using MPI, with master-worker
19  job scheduling. This is a rewrite of MPI-DP version 1.0.3. MEMO:
20  RETRY is ignored. \htmlinclude kmrdp-help.html */
21 
22 #define MPIDP_VERSION "1.0.3"
23 #define MPIDP_LASTUPDATED "2011/12/12"
24 
#include <mpi.h>
#include <assert.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <time.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <algorithm>
#include <exception>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>
#include "kmr.h"
38 
39 using namespace std;
40 
41 /* Specifies new interface of KMR-DP. */
42 
43 #define KMRDP 1
44 
/** Checks a condition and aborts the whole MPI job when it does not
    hold.  Unlike assert(), it does not look at NDEBUG, so it is
    always active.  The message uses the Solaris wording when __SVR4
    is defined and the Linux wording otherwise.  (The function name
    is not reported, because "__func__" is not defined by the C++
    standard, although most compilers provide it).
    \hideinitializer */

#ifdef __SVR4
#define xassert(X) \
    ((X) ? (void)(0) \
     : (fprintf(stderr, \
                "Assertion failed: %s, file %s, line %d\n", \
                #X, __FILE__, __LINE__), \
        (void)MPI_Abort(MPI_COMM_WORLD, 1)))
#else
#define xassert(X) \
    ((X) ? (void)(0) \
     : (fprintf(stderr, \
                "%s:%d: Assertion '%s' failed.\n", \
                __FILE__, __LINE__, #X), \
        (void)MPI_Abort(MPI_COMM_WORLD, 1)))
#endif
65 
/** Application Code Entry.  It is necessary to rename "main" to
    "application" in the application.  NOTE IT IS OF C-LINKAGE (that
    is, extern "C").  The main exists in MPI-DP, which sets up MPI,
    reads the configuration table, and then calls the entry of the
    application.  With the KMR-DP interface (KMRDP defined non-zero),
    the application also supplies two function pointers,
    APPLICATION_INIT and APPLICATION_FIN; main calls each of them
    once, before and after running the tasks, when it is non-null.
    The old interface is selected when KMRDP is undefined or zero. */

#if !defined(KMRDP) || (KMRDP == 0)
extern "C" int application(int argc, char *argv[]);
#else
extern "C" int application(int argc, char *argv[]);
extern int (*application_init)(int argc, char *argv[]);
extern int (*application_fin)(int argc, char *argv[]);
#endif
79 
/** Task Log (for MPIDP).  It is the outcome of one run of one task.
    TASKNO is the task number (the key of the task entry), RANK is
    the rank that ran it, RESULT is the value returned by the
    application (or -1 when it threw), and FILEOK tells about the
    output file: 1 when it exists, -1 when it does not, and 0 when no
    output file is named.  It is passed between ranks as raw bytes,
    so it has to stay a plain structure of ints. */

struct TaskRecord {
    int taskno;
    int rank;
    int result;
    int fileok;
};
90 
91 /** Per-Task Log (for MPIDP). */
92 
93 typedef struct {
94  vector<TaskRecord> records;
95  int try_count;
96 } TaskLog;
97 
98 /** Per-Rank Worker Log (for MPIDP). */
99 
100 typedef struct {
101  vector<TaskRecord> records;
102  int failure;
103 } RankLog;
104 
105 /** A Tool to Run Tasks under MPI. MPI-DP runs tasks which are almost
106  independent with master-worker scheduling. It reads a "jobs-list"
107  table, starts MPI processes, and then calls an application entry
108  point. _ARGC and _ARGV are copies of ones passed to main.
109  _APPARGV is the unhandled part of argv which is passed to the
110  application as argv. _TABLE_NUMOF_FIELDS on rank0 holds the
111  number of fields in the jobs-list table. */
112 
113 class MPIDP {
114  public:
115  KMR *_mr;
116 
117  /* Rank0 fields: */
118 
119  int _argc;
120  char **_argv;
121  vector<string> _appargv;
122 
123  int _out_file_position;
124  int _ntry_limit;
125  int _worker_life;
126 
127  string _title;
128  string _jobs_list_file;
129  string _parameters;
130  vector<string> _table_list;
131  int _table_numof_fields;
132 
133  char *_host_names;
134  TaskLog *_task_logs;
135  RankLog *_rank_logs;
136 
137  ofstream _logging;
138 
139  private:
140  /* Disallow copying: */
141  MPIDP(MPIDP &dp) {}
142  const MPIDP & operator=(const MPIDP &dp);
143 
144  public:
145  MPIDP() {
146  _host_names = 0;
147  _task_logs = 0;
148  _rank_logs = 0;
149  }
150 
151  virtual ~MPIDP() {
152  if (_host_names != 0) {
153  delete _host_names;
154  _host_names = 0;
155  }
156  if (_task_logs != 0) {
157  delete [] _task_logs;
158  _task_logs = 0;
159  }
160  if (_rank_logs != 0) {
161  delete [] _rank_logs;
162  _rank_logs = 0;
163  }
164  }
165 
166  virtual void check_command_line();
167  virtual void read_jobs_list();
168  virtual void put_task_list(KMR_KVS *kvs);
169  virtual vector<string> make_argv_for_task(int index, int retry);
170  virtual void put_conf(KMR_KVS *confkvs);
171  virtual void copy_conf(KMR_KVS *confkvs);
172  virtual void start_task(struct kmr_kv_box kv, const KMR_KVS *kvs,
173  KMR_KVS *kvo);
174  virtual void collect_results(const struct kmr_kv_box kv[], const long n,
175  const KMR_KVS *kvs, KMR_KVS *kvo);
176  virtual void write_report(ofstream &logging);
177 };
178 
179 /* String filled for checking initialized parameter list. */
180 
181 static const char *dummy_string = "MPIDP";
182 
183 string erase_spaces(const string &s0, const size_t len);
184 string replace_pattern(const string &s, const string &key,
185  const string &value);
186 
187 extern "C" int gather_names(const struct kmr_kv_box kv[], const long n,
188  const KMR_KVS *kvs, KMR_KVS *kvo, void *p);
189 extern "C" int setup_rank0(const struct kmr_kv_box kv,
190  const KMR_KVS *kvs, KMR_KVS *kvo, void *p,
191  const long i);
192 extern "C" int list_tasks_rank0(const struct kmr_kv_box kv,
193  const KMR_KVS *kvs, KMR_KVS *kvo, void *p,
194  const long i);
195 extern "C" int start_task(struct kmr_kv_box kv, const KMR_KVS *kvs,
196  KMR_KVS *kvo, void *dp_, const long i);
197 extern "C" int collect_results(const struct kmr_kv_box kv[], const long n,
198  const KMR_KVS *kvs, KMR_KVS *kvo, void *dp_);
199 extern "C" int write_report(const struct kmr_kv_box kv,
200  const KMR_KVS *kvs, KMR_KVS *kvo, void *p,
201  const long i);
202 
203 /** A pointer to an MPIDP object for debugging. */
204 
206 
207 /** A pointer to an embedded KMR context for debugging. */
208 
209 extern "C" KMR *kmr_dp_mr;
210 KMR *kmr_dp_mr = 0;
211 
212 /** Initializes MPI and then starts the application. */
213 
214 int main(int argc, char *argv[])
215 {
216  int cc;
217  MPIDP mpidp;
218  MPIDP *dp = &mpidp;
219  double stime, etime;
220  int len;
221  char name[MPI_MAX_PROCESSOR_NAME];
222 
223  kmr_dp = dp;
224 
225  /* Start MPI. */
226 
227  int thlv;
228  MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
229  if (thlv == MPI_THREAD_SINGLE) {
230  cerr << "[Warning] MPI thread support is single" << endl;
231  }
232  int nprocs, rank;
233  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
234  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
235  stime = MPI_Wtime();
236 
237  dp->_argc = argc;
238  dp->_argv = argv;
239 
240  cc = kmr_init();
241  assert(cc == MPI_SUCCESS);
242  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
243  dp->_mr = mr;
244  kmr_dp_mr = mr;
245 
246  MPI_Get_processor_name(name, &len);
247  KMR_KVS *namekvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
248  struct kmr_kv_box kv;
249  kv.klen = (int)sizeof(long);
250  kv.k.i = 0;
251  kv.vlen = (int)sizeof(MPI_MAX_PROCESSOR_NAME);
252  kv.v.p = name;
253  cc = kmr_add_kv(namekvs0, kv);
254  assert(cc == MPI_SUCCESS);
255  cc = kmr_add_kv_done(namekvs0);
256  assert(cc == MPI_SUCCESS);
257 
258  KMR_KVS *namekvs = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
259  struct kmr_option opt0 = kmr_noopt;
260  opt0.key_as_rank = 1;
261  cc = kmr_shuffle(namekvs0, namekvs, opt0);
262  assert(cc == MPI_SUCCESS);
263  cc = kmr_reduce(namekvs, 0, dp, kmr_noopt, gather_names);
264  assert(cc == MPI_SUCCESS);
265 
266  /* Read configuration on the master process then copy. */
267 
268  KMR_KVS *confkvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
269  cc = kmr_map_on_rank_zero(confkvs0, dp, kmr_noopt, setup_rank0);
270  assert(cc == MPI_SUCCESS);
271 
272  KMR_KVS *confkvs = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
273  cc = kmr_replicate(confkvs0, confkvs, kmr_noopt);
274  assert(cc == MPI_SUCCESS);
275 
276  dp->copy_conf(confkvs);
277  cc = kmr_free_kvs(confkvs);
278  assert(cc == MPI_SUCCESS);
279 
280  KMR_KVS *taskskvs = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
281  cc = kmr_map_on_rank_zero(taskskvs, dp, kmr_noopt, list_tasks_rank0);
282  assert(cc == MPI_SUCCESS);
283 
284  /*if (rank == 0) {printf("setup done\n"); fflush(stdout);}*/
285 
286  MPI_Barrier(MPI_COMM_WORLD);
287  if (application_init != 0) {
288  double t0 = MPI_Wtime();
289  (*application_init)(argc, argv);
290  MPI_Barrier(MPI_COMM_WORLD);
291  double t1 = MPI_Wtime();
292  if (mr->trace_kmrdp && rank == 0) {
293  fprintf(stderr, ";;KMR kmrdp:init() (%f msec)\n",
294  ((t1 - t0) * 1e3));
295  fflush(0);
296  }
297  }
298 
299  double tbody0 = MPI_Wtime();
300 
301  KMR_KVS *resultskvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
302  do {
303  struct kmr_option opt = kmr_noopt;
304  opt.nothreading = 1;
305  cc = kmr_map_ms(taskskvs, resultskvs0, dp, opt, start_task);
306  } while (cc == MPI_ERR_ROOT);
307 
308  MPI_Barrier(MPI_COMM_WORLD);
309  double tbody1 = MPI_Wtime();
310 
311  if (mr->trace_kmrdp && rank == 0) {
312  fprintf(stderr, ";;KMR kmrdp:application() (%f msec)\n",
313  ((tbody1 - tbody0) * 1e3));
314  fflush(stdout);
315  }
316 
317  if (application_fin != 0) {
318  double t0 = MPI_Wtime();
319  (*application_fin)(argc, argv);
320  MPI_Barrier(MPI_COMM_WORLD);
321  double t1 = MPI_Wtime();
322  if (mr->trace_kmrdp && rank == 0) {
323  fprintf(stderr, ";;KMR kmrdp:fin() (%f msec)\n",
324  ((t1 - t0) * 1e3));
325  fflush(stdout);
326  }
327  }
328 
329  /*if (rank == 0) {printf("map done\n"); fflush(stdout);}*/
330 
331  MPI_Barrier(MPI_COMM_WORLD);
332  /*if (rank == 0) {printf("kv_stats:\n");}*/
333  /*kmr_dump_kvs_stats(resultskvs0, 1);*/
334  /*fflush(stdout);*/
335 
336  KMR_KVS *resultskvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
337  struct kmr_option opt1 = kmr_noopt;
338  opt1.key_as_rank = 1;
339  cc = kmr_shuffle(resultskvs0, resultskvs1, opt1);
340  assert(cc == MPI_SUCCESS);
341  cc = kmr_reduce(resultskvs1, 0, dp, kmr_noopt, collect_results);
342  assert(cc == MPI_SUCCESS);
343 
344  /*if (rank == 0) {printf("reduce done\n"); fflush(stdout);}*/
345 
346  KMR_KVS *nonekvs = kmr_create_kvs(mr, KMR_KV_BAD, KMR_KV_BAD);
347  cc = kmr_map_on_rank_zero(nonekvs, dp, kmr_noopt, write_report);
348  assert(cc == MPI_SUCCESS);
349  cc = kmr_free_kvs(nonekvs);
350  assert(cc == MPI_SUCCESS);
351 
352  etime = MPI_Wtime();
353 
354  if (rank == 0) {
355  dp->_logging << "\nElapsed time = "
356  << etime - stime << " sec." << endl;
357  }
358 
359  cc = kmr_free_context(mr);
360  assert(cc == MPI_SUCCESS);
361  dp->_mr = 0;
362 
363  MPI_Finalize();
364 
365  return 0;
366 }
367 
368 /** Parses an integer string. It is a safe "atoi". */
369 
370 static int safe_atoi(const char *s) {
371  char gomi;
372  int v, cc;
373  xassert(s != 0 && s[0] != 0);
374  cc = sscanf(s, "%d%c", &v, &gomi);
375  xassert(cc == 1);
376  return v;
377 }
378 
/** Replaces the KEY by the VALUE in the source string S.  It
    replaces all non-overlapping occurrences, scanning from the left,
    and returns the result as a new string.  The text inserted for
    VALUE is not scanned again.  It returns S unchanged when KEY is
    empty (an empty key matches everywhere and would never advance
    the scan). */

std::string replace_pattern(const std::string &s, const std::string &key,
                            const std::string &value)
{
    if (key.empty()) {
        return s;
    }
    std::string r;
    r.reserve(s.length());
    size_t lastpos = 0;
    size_t pos;
    while ((pos = s.find(key, lastpos)) != std::string::npos) {
        /* Copy the text before the match, then the replacement. */
        r.append(s, lastpos, (pos - lastpos));
        r.append(value);
        lastpos = pos + key.length();
    }
    r.append(s, lastpos, std::string::npos);
    return r;
}
396 
/** Deletes space characters (' ' only) in the string appearing
    within LEN characters from the start (counting after removing
    spaces).  That is, spaces are dropped until LEN characters have
    been kept, and the remainder is copied as it is. */

std::string erase_spaces(const std::string &s0, const size_t len)
{
    std::string kept;
    kept.reserve(s0.length());
    for (std::string::const_iterator p = s0.begin(); p != s0.end(); ++p) {
        const bool in_prefix = (kept.length() < len);
        if (in_prefix && *p == ' ') {
            continue;
        }
        kept.push_back(*p);
    }
    return kept;
}
413 
414 /** Collects host names on all nodes. It is a reduce-function. It in
415  effect runs only on rank#0. */
416 
417 extern "C"
418 int gather_names(const struct kmr_kv_box kv[], const long n,
419  const KMR_KVS *kvs, KMR_KVS *kvo, void *dp_)
420 {
421  /*MPIDP *dp = reinterpret_cast<MPIDP*>(dp_);*/
422  MPIDP *dp = (MPIDP *)dp_;
423  int nprocs = kvs->c.mr->nprocs;
424  int rank = kvs->c.mr->rank;
425  xassert(n == nprocs && rank == 0);
426  dp->_host_names = new char [nprocs * MPI_MAX_PROCESSOR_NAME];
427  for (int i = 0; i < n; i++) {
428  strncpy(&(dp->_host_names[i * MPI_MAX_PROCESSOR_NAME]),
429  (char *)kv[i].v.p, MPI_MAX_PROCESSOR_NAME);
430  }
431  return MPI_SUCCESS;
432 }
433 
434 /** Reads the command-line options and the jobs-list table, and then
435  puts the parameters into the KVS. It is a map-function, and runs
436  only on rank#0. */
437 
438 extern "C"
439 int setup_rank0(const struct kmr_kv_box kv,
440  const KMR_KVS *kvs, KMR_KVS *kvo, void *dp_, const long i)
441 {
442  /*MPIDP *dp = reinterpret_cast<MPIDP*>(dp_);*/
443  MPIDP *dp = (MPIDP *)dp_;
444  xassert(kvs == 0 && kv.klen == 0 && kv.vlen == 0);
445  dp->check_command_line();
446  dp->read_jobs_list();
447  dp->put_conf(kvo);
448  return MPI_SUCCESS;
449 }
450 
451 /** Checks the command-line options. */
452 
454 {
455  /* Give initial values to variables; overwritten later. */
456 
457  _title = "MPIDP ";
458  _title += MPIDP_VERSION;
459  _parameters = dummy_string;
460  _out_file_position = 0;
461  _ntry_limit = 10;
462  _worker_life = 3;
463 
464  /* Open log file. */
465 
466  string logfile = "./mpidp.log";
467  for (int i = 1; i < _argc; i++) {
468  if (strncmp(_argv[i], "-lg", 3) == 0) {
469  logfile = _argv[++i];
470  }
471  }
472  _logging.open(logfile.c_str());
473  if (!_logging) {
474  cerr << "[ERROR] Unable to open log file (" << logfile << ")"
475  << endl;
476  MPI_Abort(MPI_COMM_WORLD, 1);
477  exit(1);
478  }
479 
480  _logging << "MPIDP ver. " << MPIDP_VERSION << " (KMR-DP)" << endl;
481  _logging << "mpidp@bi.cs.titech.ac.jp last updated: "
482  << MPIDP_LASTUPDATED << endl << endl;
483  _logging << "#Ranks = " << _mr->nprocs << endl;
484 
485  /* Count #ranks on the same rank#0 host. */
486 
487  int nprocess = 1;
488  string *hosts = new string [_mr->nprocs];
489  hosts[0] = &_host_names[0];
490  for (int i = 1; i < _mr->nprocs; i++) {
491  hosts[i] = &_host_names[i * MPI_MAX_PROCESSOR_NAME];
492  if (hosts[i] == hosts[0]) {
493  nprocess++;
494  } else {
495  break;
496  }
497  }
498  delete [] hosts;
499 
500  _logging << "#Nodes = " << _mr->nprocs/nprocess
501  << " (#Rank/#Nodes = " << nprocess << ")" << endl;
502 
503  _logging << endl << "Used nodes - name(rank) :";
504  for (int i = 0; i < _mr->nprocs; i++) {
505  if (i % 5 == 0) {
506  _logging << endl;
507  }
508  _logging << " " << &_host_names[i * MPI_MAX_PROCESSOR_NAME]
509  << "(" << i << ")";
510  }
511  _logging << endl << endl;
512  _logging.flush();
513 
514  /* Check command-line options and save remainings in _appargv. */
515 
516  _appargv.push_back(_argv[0]);
517  for (int i = 1; i < _argc; i++) {
518  if (strncmp(_argv[i], "-tb", 3) == 0) {
519  i++;
520  _jobs_list_file = _argv[i];
521  _logging << "Table file : -tb " << _jobs_list_file << endl;
522  } else if (strncmp(_argv[i], "-ot", 3) == 0) {
523  i++;
524  _out_file_position = safe_atoi(_argv[i]);
525  _logging << "Output option : -ot " << _out_file_position << endl;
526  } else if (strncmp(_argv[i], "-rt", 3) == 0) {
527  i++;
528  _ntry_limit = safe_atoi(_argv[i]);
529  _logging << "Retry option : -rt " << _ntry_limit << endl;
530  } else if (strncmp(_argv[i], "-wl", 3) == 0) {
531  i++;
532  _worker_life = safe_atoi(_argv[i]);
533  _logging << "Worker life : -wl " << _worker_life << endl;
534  } else if (strncmp(_argv[i], "-pg", 3) == 0) {
535  i++;
536  _logging << "Program name : -pg " << _argv[i] << endl;
537  } else if (strncmp(_argv[i], "-lg", 3) == 0) {
538  i++;
539  _logging << "Log file : -lg " << _argv[i] << endl;
540  } else {
541  _appargv.push_back(_argv[i]);
542  }
543  }
544 
545  if (_appargv.size() > 1) {
546  _logging << "Other options :";
547  for (size_t i = 1; i < _appargv.size(); i++) {
548  _logging << " " << _appargv[i];
549  }
550  _logging << endl;
551  }
552  _logging << endl;
553 }
554 
555 /** Puts the run conditions into the KVS, which will be copied to all
556  processes. It runs only on rank#0. (There is nothing to be
557  copied, currently). */
558 
560 {
561  xassert(kvs->c.mr->rank == 0);
562 # if 0
563  const char *k;
564  const char *v;
565  char sbuf[256];
566 
567  k = "_out_file_position";
568  snprintf(sbuf, 256, "%d", _out_file_position);
569  v = sbuf;
570  cc = kmr_add_string(kvs, k, v);
571  assert(cc == MPI_SUCCESS);
572 #endif
573 }
574 
575 /** Initializes the MPI-DP object by copying run conditions stored as
576  the key-value pairs. Copying on rank#0 is totally redundant, as
577  overwriting fields by the same contents. (There is nothing to be
578  copied, currently). */
579 
580 void MPIDP::copy_conf(KMR_KVS *confkvs)
581 {
582 #if 0
583  const char *s;
584  int cc = kmr_find_string(confkvs, "_out_file_position", &s);
585  xassert(cc == MPI_SUCCESS);
586  _out_file_position = safe_atoi(s);
587 #endif
588 }
589 
590 /** Opens and reads jobs-list file. */
591 
593 {
594  if (_jobs_list_file.length() == 0) {
595  cerr << "[ERROR] No table file specified" << endl;
596  MPI_Abort(MPI_COMM_WORLD, 1);
597  exit(1);
598  }
599  ifstream input(_jobs_list_file.c_str(), ios::in);
600  if (!input) {
601  cerr << "[ERROR] Unable to open table file (" << _jobs_list_file
602  << ")" << endl;
603  MPI_Abort(MPI_COMM_WORLD, 1);
604  exit(1);
605  }
606 
607  for (;;) {
608  string table;
609  if (!getline(input, table)) {
610  break;
611  }
612  table = erase_spaces(table, 7);
613  if (table.length() == 0) {
614  /* Skip an empty line. */
615  } else if (string("%#").find_first_of(table[0]) != string::npos) {
616  /* Skip a comment line. */
617  } else if (strncasecmp(table.c_str(), "TITLE=", 6) == 0) {
618  _title = table.substr(6);
619  _logging << "TITLE=" << _title << endl;
620  } else if (strncasecmp(table.c_str(), "PARAM=", 6) == 0) {
621  _parameters = table.substr(6);
622  _logging << "PARAM=" << _parameters << endl;
623  } else {
624  _table_list.push_back(table);
625  //csize = max(table.length(), csize);
626  }
627  }
628  _logging << endl;
629 
630  input.close();
631 
632  if (strncmp(_parameters.c_str(), dummy_string, 5) == 0) {
633  cerr << "[ERROR] PARAM= field not found in table file" << endl;
634  MPI_Abort(MPI_COMM_WORLD, 1);
635  exit(1);
636  }
637 
638  int nfields = 1;
639  for (size_t i = 0; i < _table_list[0].length(); i++) {
640  if (_table_list[0][i] == '\t') {
641  nfields++;
642  }
643  }
644  _table_numof_fields = nfields;
645 }
646 
647 /** Puts tasks in the KVS. It is a map-function, and runs only on
648  rank#0. */
649 
650 extern "C"
651 int list_tasks_rank0(const struct kmr_kv_box kv,
652  const KMR_KVS *kvs, KMR_KVS *kvo, void *dp_, const long i)
653 {
654  MPIDP *dp = (MPIDP *)dp_;
655  xassert(kvs == 0 && kv.klen == 0 && kv.vlen == 0);
656  xassert(kvo->c.key_data == KMR_KV_INTEGER
657  && kvo->c.value_data == KMR_KV_OPAQUE);
658  dp->put_task_list(kvo);
659  return MPI_SUCCESS;
660 }
661 
662 /** Puts task entries of TABLE_LIST into KVS, after substituting
663  variables in parameters and packing argv in a single string. */
664 
666 {
667  for (int i = 0; i < (int)_table_list.size(); i++) {
668  vector<string> newargv = make_argv_for_task(i, 0);
669 
670  size_t sz = 0;
671  for (size_t j = 0; j < newargv.size(); j++) {
672  sz += (newargv[j].length() + 1);
673  }
674  sz += 1;
675  char *cargv = new char [sz];
676  size_t po = 0;
677  for (size_t j = 0; j < newargv.size(); j++) {
678  strncpy(&cargv[po], newargv[j].c_str(), (newargv[j].length() + 1));
679  po += (newargv[j].length() + 1);
680  }
681  xassert((po + 1) == sz);
682  cargv[po] = 0;
683 
684  struct kmr_kv_box kv;
685  kv.klen = (int)sizeof(long);
686  kv.k.i = i;
687  kv.vlen = (int)sz;
688  kv.v.p = cargv;
689  kmr_add_kv(kvs, kv);
690 
691  delete cargv;
692  }
693 }
694 
695 /** Makes argument-list (argv) from a task list entry, by substituting
696  variables in parameters. The first argument is used for the
697  output file name (or a string "-" if an output file position is
698  not specified), and it should be removed when passing to the
699  application. Non-zero RETRY is meaningless. IT COPIES THE
700  RESULT. */
701 
702 vector<string> MPIDP::make_argv_for_task(int index, int retry)
703 {
704  xassert(0 <= retry && retry <= 999);
705 
706  string fields = _table_list[index];
707  char *cfields = new char [fields.length() + 1];
708  strncpy(cfields, fields.c_str(), (fields.length() + 1));
709 
710  string outputfilename = "-";
711  string newparams = _parameters;
712  int i = 0;
713  for (char *e = strtok(cfields, "\t"); e != 0; e = strtok(NULL, "\t")) {
714  xassert(i < _table_numof_fields);
715  char key[5];
716  snprintf(key, 5, "$%d", (i + 1));
717  string k = key;
718  string v = e;
719  if (_out_file_position == (i + 1)) {
720  if (retry != 0) {
721  char tag[5];
722  snprintf(tag, 5, ".%d", retry);
723  v += tag;
724  }
725  outputfilename = v;
726  }
727  newparams = replace_pattern(newparams, k, v);
728  i++;
729  }
730  xassert(i == _table_numof_fields);
731 
732  char *cparams = new char [newparams.length() + 1];
733  strncpy(cparams, newparams.c_str(), (newparams.length() + 1));
734 
735  vector<string> newargv;
736  newargv.push_back(outputfilename);
737  newargv.insert(newargv.end(), _appargv.begin(), _appargv.end());
738  for (char *e = strtok(cparams, " "); e != 0; e = strtok(NULL, " ")) {
739  newargv.push_back(e);
740  }
741 
742  delete cfields;
743  delete cparams;
744 
745  return newargv;
746 }
747 
748 /** Starts an application using argv passed in value data. It is a
749  map-function. */
750 
751 extern "C"
752 int start_task(struct kmr_kv_box kv, const KMR_KVS *kvs,
753  KMR_KVS *kvo, void *dp_, const long i)
754 {
755  MPIDP *dp = (MPIDP *)dp_;
756  dp->start_task(kv, kvs, kvo);
757  return MPI_SUCCESS;
758 }
759 
760 /* Counts argv list. */
761 
762 static int count_args(char *args, size_t len)
763 {
764  int wargc = 0;
765  size_t po = 0;
766  for (;;) {
767  xassert(po < len);
768  size_t z = strlen(&args[po]);
769  if (z == 0) {
770  break;
771  }
772  wargc++;
773  po += (z + 1);
774  }
775  xassert((po + 1) == len);
776  return wargc;
777 }
778 
779 /* Scans and fills argv list. */
780 
781 static void scan_args(char **wargv, int wargc, char *args, size_t len)
782 {
783  size_t po = 0;
784  for (int i = 0; i < wargc; i++) {
785  xassert(po < len);
786  wargv[i] = &args[po];
787  size_t z = strlen(&args[po]);
788  xassert(z != 0);
789  po += (z + 1);
790  }
791  xassert((po + 1) == len);
792 }
793 
794 /** Starts an application using argv in value data. Note that the
795  passed argv has an extra entry indicating the output file name in
796  the first, and it is dropped. */
797 
798 void MPIDP::start_task(struct kmr_kv_box kv, const KMR_KVS *kvs, KMR_KVS *kvo)
799 {
800  int rank = kvs->c.mr->rank;
801  xassert(rank != 0);
802  xassert(kvs->c.key_data == KMR_KV_INTEGER
803  && kvs->c.value_data == KMR_KV_OPAQUE);
804 
805  int taskno = (int)kv.k.i;
806  char *cargv = (char *)kv.v.p;
807 
808  int wargc = count_args(cargv, (size_t)kv.vlen);
809  xassert(wargc > 1);
810  char **wargv = new char * [wargc];
811  scan_args(wargv, wargc, cargv, (size_t)kv.vlen);
812 
813  string outputfilename = wargv[0];
814  int xargc = wargc - 1;
815  char **xargv = wargv + 1;
816 
817  if (0) {
818  string fields;
819  fields += xargv[0];
820  for (int i = 1; i < xargc; i++) {
821  fields += " ";
822  fields += xargv[i];
823  }
824  printf("Running task=%d on rank=%d param=%s\n",
825  taskno, rank, fields.c_str());
826  fflush(stdout);
827  }
828 
829  /* RUN THE APPLICATION. */
830 
831  xassert(taskno < (1 << 20));
832 
833  TaskRecord tc;
834  tc.taskno = taskno;
835  tc.rank = rank;
836  struct stat buf;
837  int cc;
838  try {
839  cc = application(xargc, xargv);
840  tc.result = cc;
841  } catch (char *e) {
842  cerr << "[ERROR] application exception : " << e << endl;
843  tc.result = -1;
844  }
845 
846  if (outputfilename != "-") {
847  cc = stat(outputfilename.c_str(), &buf);
848  tc.fileok = ((cc == 0) ? 1 : -1);
849  } else {
850  tc.fileok = 0;
851  }
852 
853  long zero = 0;
854  cc = kmr_add_kv1(kvo, &zero, (int)sizeof(long), &tc, (int)sizeof(tc));
855  assert(cc == MPI_SUCCESS);
856 
857  delete wargv;
858 }
859 
860 /** Collects result status in ir[] on rank#0. It is a
861  reduce-function. It in effect runs only on rank#0. */
862 
863 extern "C"
864 int collect_results(const struct kmr_kv_box kv[], const long n,
865  const KMR_KVS *kvs, KMR_KVS *kvo, void *dp_)
866 {
867  MPIDP *dp = (MPIDP *)dp_;
868  dp->collect_results(kv, n, kvs, kvo);
869  return MPI_SUCCESS;
870 }
871 
872 /** Collects result status in ir[] on rank#0. */
873 
874 void MPIDP::collect_results(const struct kmr_kv_box kv[], const long n,
875  const KMR_KVS *kvs, KMR_KVS *kvo)
876 {
877  xassert(kvo == 0);
878  xassert(kvs->c.mr->rank == 0);
879  xassert((size_t)n == _table_list.size());
880 
881  xassert(_task_logs == 0);
882  _task_logs = new TaskLog [n];
883 
884  xassert(_rank_logs == 0);
885  int nprocs = kvs->c.mr->nprocs;
886  _rank_logs = new RankLog [nprocs];
887 
888  for (int i = 0; i < n; i++) {
889  _task_logs[i].try_count = 0;
890  }
891  for (int i = 0; i < n; i++) {
892  xassert(kv[i].klen == sizeof(long)
893  && kv[i].vlen == sizeof(TaskRecord));
894  //long k = kv[i].k.i;
895  TaskRecord tc = *((TaskRecord *)kv[i].v.p);
896  _task_logs[i].records.push_back(tc);
897  _rank_logs[tc.rank].records.push_back(tc);
898  }
899 }
900 
901 /** Writes jobs and workers report in log. It is a map-function, and
902  runs only on rank#0. */
903 
904 extern "C"
905 int write_report(const struct kmr_kv_box kv,
906  const KMR_KVS *kvs, KMR_KVS *kvo, void *dp_, const long i)
907 {
908  xassert(kvs == 0 && kv.klen == 0 && kv.vlen == 0);
909  MPIDP *dp = (MPIDP *)dp_;
910  dp->write_report(dp->_logging);
911  return MPI_SUCCESS;
912 }
913 
914 /** Writes jobs and workers report in log. */
915 
916 void MPIDP::write_report(ofstream &logging)
917 {
918  size_t ntasks = _table_list.size();
919 
920  /* Write result for tasks. */
921 
922  logging << "JOB table :" << endl;
923  for (size_t i = 0; i < ntasks; i++) {
924  xassert(_task_logs[i].records.size() > 0);
925  logging << _task_logs[i].records[0].taskno
926  << " EXEC=" << _task_logs[i].try_count;
927  for (size_t j = 0; j < _task_logs[i].records.size(); j++) {
928  TaskRecord tc = _task_logs[i].records[j];
929  logging << " (WID=" << tc.taskno;
930  logging << " END=" << 1;
931  logging << " RET=" << tc.result;
932  logging << " FILE=" << tc.fileok << ")";
933  }
934  logging << endl;
935  }
936 
937  /* Write result for ranks. */
938 
939  int nprocs = _mr->nprocs;
940  logging << "\nWorker table :" << endl;
941  for (int i = 1; i < nprocs; i++) {
942  logging << i << "\t" << _rank_logs[i].records.size();
943  for (size_t j = 0; j < _rank_logs[i].records.size(); j++) {
944  TaskRecord tc = _rank_logs[i].records[j];
945  logging << "\t" << tc.taskno << "\t" << tc.result;
946  }
947  logging << endl;
948  }
949 
950  return;
951 }
952 
953 /*
954 Copyright (C) 2012-2018 RIKEN R-CCS
955 This library is distributed WITHOUT ANY WARRANTY. This library can be
956 redistributed and/or modified under the terms of the BSD 2-Clause License.
957 */
958 
959 /* Local Variables: */
960 /* c-basic-offset: 4 */
961 /* End: */
KMR * kmr_dp_mr
A pointer to an embedded KMR context for debugging.
Definition: kmrdp.cpp:209
Key-Value Stream (abstract).
Definition: kmr.h:632
virtual void copy_conf(KMR_KVS *confkvs)
Initializes the MPI-DP object by copying run conditions stored as the key-value pairs.
Definition: kmrdp.cpp:580
Per-Task Log (for MPIDP).
Definition: kmrdp.cpp:93
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Definition: kmr.h:88
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:658
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:809
int gather_names(const struct kmr_kv_box kv[], const long n, const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
Collects host names on all nodes.
Definition: kmrdp.cpp:418
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
virtual void start_task(struct kmr_kv_box kv, const KMR_KVS *kvs, KMR_KVS *kvo)
Starts an application using argv in value data.
Definition: kmrdp.cpp:798
int write_report(const struct kmr_kv_box kv, const KMR_KVS *kvs, KMR_KVS *kvo, void *p, const long i)
Writes jobs and workers report in log.
Definition: kmrdp.cpp:905
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2094
Task Log (for MPIDP).
Definition: kmrdp.cpp:84
int main(int argc, char *argv[])
Initializes MPI and then starts the application.
Definition: kmrdp.cpp:214
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
Definition: kmrbase.c:939
KMR Context.
Definition: kmr.h:247
int kmr_add_kv1(KMR_KVS *kvs, void *k, int klen, void *v, int vlen)
Adds a key-value pair as given directly by a pointer.
Definition: kmrbase.c:837
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:679
int list_tasks_rank0(const struct kmr_kv_box kv, const KMR_KVS *kvs, KMR_KVS *kvo, void *p, const long i)
Puts tasks in the KVS.
Definition: kmrdp.cpp:651
int kmr_find_string(KMR_KVS *kvi, const char *k, const char **vq)
Finds the key K in the key-value stream KVS.
Definition: kmrmoreops.c:73
Per-Rank Worker Log (for MPIDP).
Definition: kmrdp.cpp:100
virtual vector< string > make_argv_for_task(int index, int retry)
Makes argument-list (argv) from a task list entry, by substituting variables in parameters.
Definition: kmrdp.cpp:702
Handy Copy of a Key-Value Field.
Definition: kmr.h:401
int collect_results(const struct kmr_kv_box kv[], const long n, const KMR_KVS *kvs, KMR_KVS *kvo, void *dp_)
Collects result status in ir[] on rank#0.
Definition: kmrdp.cpp:864
#define kmr_init()
Sets up the environment.
Definition: kmr.h:794
int setup_rank0(const struct kmr_kv_box kv, const KMR_KVS *kvs, KMR_KVS *kvo, void *p, const long i)
Reads the command-line options and the jobs-list table, and then puts the parameters into the KVS...
Definition: kmrdp.cpp:439
virtual void check_command_line()
Checks the command-line options.
Definition: kmrdp.cpp:453
MPIDP * kmr_dp
A pointer to an MPIDP object for debugging.
Definition: kmrdp.cpp:205
virtual void collect_results(const struct kmr_kv_box kv[], const long n, const KMR_KVS *kvs, KMR_KVS *kvo)
Collects result status in ir[] on rank#0.
Definition: kmrdp.cpp:874
static int safe_atoi(const char *s)
Parses an integer string.
Definition: kmrdp.cpp:370
#define xassert(X)
Asserts and aborts, but it cannot be disabled.
Definition: kmrdp.cpp:51
string replace_pattern(const string &s, const string &key, const string &value)
Replaces the KEY by the VALUE in the source string S.
Definition: kmrdp.cpp:381
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:367
KMR Interface.
int kmr_map_ms(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps in master-worker mode.
Definition: kmrmapms.c:344
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
Definition: kmrbase.c:2240
int application(int argc, char *argv[])
Application Code Entry.
Definition: testdp.c:38
int kmr_add_string(KMR_KVS *kvs, const char *k, const char *v)
Adds a key-value pair of strings.
Definition: kmrbase.c:971
A Tool to Run Tasks under MPI.
Definition: kmrdp.cpp:113
string erase_spaces(const string &s0, const size_t len)
Deletes white-spaces in the string appearing within LEN characters from the start (counting after rem...
Definition: kmrdp.cpp:400
virtual void put_conf(KMR_KVS *confkvs)
Puts the run conditions into the KVS, which will be copied to all processes.
Definition: kmrdp.cpp:559
int kmr_map_on_rank_zero(KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on rank0 only.
Definition: kmrbase.c:1514
virtual void read_jobs_list()
Opens and reads jobs-list file.
Definition: kmrdp.cpp:592
virtual void put_task_list(KMR_KVS *kvs)
Puts task entries of TABLE_LIST into KVS, after substituting variables in parameters and packing argv...
Definition: kmrdp.cpp:665
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:168
int start_task(struct kmr_kv_box kv, const KMR_KVS *kvs, KMR_KVS *kvo, void *dp_, const long i)
Starts an application using argv passed in value data.
Definition: kmrdp.cpp:752
virtual void write_report(ofstream &logging)
Writes jobs and workers report in log.
Definition: kmrdp.cpp:916