22 #define MPIDP_VERSION "1.0.3" 23 #define MPIDP_LASTUPDATED "2011/12/12" 54 "%s:%d: Assertion '%s' failed.\n", \ 55 __FILE__, __LINE__, #X), \ 56 (void)MPI_Abort(MPI_COMM_WORLD, 1))) 61 "Assertion failed: %s, file %s, line %d\n", \ 62 #X, __FILE__, __LINE__), \ 63 (void)MPI_Abort(MPI_COMM_WORLD, 1))) 72 #if !defined(KMRDP) && (KMRDP == 0) 76 extern int (*application_init)(
int argc,
char *argv[]);
77 extern int (*application_fin)(
int argc,
char *argv[]);
94 vector<TaskRecord> records;
101 vector<TaskRecord> records;
121 vector<string> _appargv;
123 int _out_file_position;
128 string _jobs_list_file;
130 vector<string> _table_list;
131 int _table_numof_fields;
152 if (_host_names != 0) {
156 if (_task_logs != 0) {
157 delete [] _task_logs;
160 if (_rank_logs != 0) {
161 delete [] _rank_logs;
166 virtual void check_command_line();
167 virtual void read_jobs_list();
168 virtual void put_task_list(
KMR_KVS *kvs);
169 virtual vector<string> make_argv_for_task(
int index,
int retry);
170 virtual void put_conf(
KMR_KVS *confkvs);
171 virtual void copy_conf(
KMR_KVS *confkvs);
181 static const char *dummy_string =
"MPIDP";
183 string erase_spaces(
const string &s0,
const size_t len);
185 const string &value);
196 KMR_KVS *kvo,
void *dp_,
const long i);
214 int main(
int argc,
char *argv[])
221 char name[MPI_MAX_PROCESSOR_NAME];
228 MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
229 if (thlv == MPI_THREAD_SINGLE) {
230 cerr <<
"[Warning] MPI thread support is single" << endl;
233 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
234 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
241 assert(cc == MPI_SUCCESS);
246 MPI_Get_processor_name(name, &len);
249 kv.klen = (int)
sizeof(
long);
251 kv.vlen = (int)
sizeof(MPI_MAX_PROCESSOR_NAME);
254 assert(cc == MPI_SUCCESS);
256 assert(cc == MPI_SUCCESS);
260 opt0.key_as_rank = 1;
262 assert(cc == MPI_SUCCESS);
264 assert(cc == MPI_SUCCESS);
270 assert(cc == MPI_SUCCESS);
274 assert(cc == MPI_SUCCESS);
278 assert(cc == MPI_SUCCESS);
282 assert(cc == MPI_SUCCESS);
286 MPI_Barrier(MPI_COMM_WORLD);
287 if (application_init != 0) {
288 double t0 = MPI_Wtime();
289 (*application_init)(argc, argv);
290 MPI_Barrier(MPI_COMM_WORLD);
291 double t1 = MPI_Wtime();
292 if (mr->trace_kmrdp && rank == 0) {
293 fprintf(stderr,
";;KMR kmrdp:init() (%f msec)\n",
299 double tbody0 = MPI_Wtime();
306 }
while (cc == MPI_ERR_ROOT);
308 MPI_Barrier(MPI_COMM_WORLD);
309 double tbody1 = MPI_Wtime();
311 if (mr->trace_kmrdp && rank == 0) {
312 fprintf(stderr,
";;KMR kmrdp:application() (%f msec)\n",
313 ((tbody1 - tbody0) * 1e3));
317 if (application_fin != 0) {
318 double t0 = MPI_Wtime();
319 (*application_fin)(argc, argv);
320 MPI_Barrier(MPI_COMM_WORLD);
321 double t1 = MPI_Wtime();
322 if (mr->trace_kmrdp && rank == 0) {
323 fprintf(stderr,
";;KMR kmrdp:fin() (%f msec)\n",
331 MPI_Barrier(MPI_COMM_WORLD);
338 opt1.key_as_rank = 1;
340 assert(cc == MPI_SUCCESS);
342 assert(cc == MPI_SUCCESS);
348 assert(cc == MPI_SUCCESS);
350 assert(cc == MPI_SUCCESS);
355 dp->_logging <<
"\nElapsed time = " 356 << etime - stime <<
" sec." << endl;
360 assert(cc == MPI_SUCCESS);
374 cc = sscanf(s,
"%d%c", &v, &gomi);
387 while ((pos = s.find(key, pos)) != std::string::npos) {
388 r.append(s, lastpos, (pos - lastpos));
393 r.append(s, lastpos, (s.length() - lastpos));
404 while (i < s.length() && i < len) {
423 int nprocs = kvs->c.mr->nprocs;
424 int rank = kvs->c.mr->rank;
425 xassert(n == nprocs && rank == 0);
426 dp->_host_names =
new char [nprocs * MPI_MAX_PROCESSOR_NAME];
427 for (
int i = 0; i < n; i++) {
428 strncpy(&(dp->_host_names[i * MPI_MAX_PROCESSOR_NAME]),
429 (
char *)kv[i].v.p, MPI_MAX_PROCESSOR_NAME);
444 xassert(kvs == 0 && kv.klen == 0 && kv.vlen == 0);
458 _title += MPIDP_VERSION;
459 _parameters = dummy_string;
460 _out_file_position = 0;
466 string logfile =
"./mpidp.log";
467 for (
int i = 1; i < _argc; i++) {
468 if (strncmp(_argv[i],
"-lg", 3) == 0) {
469 logfile = _argv[++i];
472 _logging.open(logfile.c_str());
474 cerr <<
"[ERROR] Unable to open log file (" << logfile <<
")" 476 MPI_Abort(MPI_COMM_WORLD, 1);
480 _logging <<
"MPIDP ver. " << MPIDP_VERSION <<
" (KMR-DP)" << endl;
481 _logging <<
"mpidp@bi.cs.titech.ac.jp last updated: " 482 << MPIDP_LASTUPDATED << endl << endl;
483 _logging <<
"#Ranks = " << _mr->nprocs << endl;
488 string *hosts =
new string [_mr->nprocs];
489 hosts[0] = &_host_names[0];
490 for (
int i = 1; i < _mr->nprocs; i++) {
491 hosts[i] = &_host_names[i * MPI_MAX_PROCESSOR_NAME];
492 if (hosts[i] == hosts[0]) {
500 _logging <<
"#Nodes = " << _mr->nprocs/nprocess
501 <<
" (#Rank/#Nodes = " << nprocess <<
")" << endl;
503 _logging << endl <<
"Used nodes - name(rank) :";
504 for (
int i = 0; i < _mr->nprocs; i++) {
508 _logging <<
" " << &_host_names[i * MPI_MAX_PROCESSOR_NAME]
511 _logging << endl << endl;
516 _appargv.push_back(_argv[0]);
517 for (
int i = 1; i < _argc; i++) {
518 if (strncmp(_argv[i],
"-tb", 3) == 0) {
520 _jobs_list_file = _argv[i];
521 _logging <<
"Table file : -tb " << _jobs_list_file << endl;
522 }
else if (strncmp(_argv[i],
"-ot", 3) == 0) {
524 _out_file_position =
safe_atoi(_argv[i]);
525 _logging <<
"Output option : -ot " << _out_file_position << endl;
526 }
else if (strncmp(_argv[i],
"-rt", 3) == 0) {
529 _logging <<
"Retry option : -rt " << _ntry_limit << endl;
530 }
else if (strncmp(_argv[i],
"-wl", 3) == 0) {
533 _logging <<
"Worker life : -wl " << _worker_life << endl;
534 }
else if (strncmp(_argv[i],
"-pg", 3) == 0) {
536 _logging <<
"Program name : -pg " << _argv[i] << endl;
537 }
else if (strncmp(_argv[i],
"-lg", 3) == 0) {
539 _logging <<
"Log file : -lg " << _argv[i] << endl;
541 _appargv.push_back(_argv[i]);
545 if (_appargv.size() > 1) {
546 _logging <<
"Other options :";
547 for (
size_t i = 1; i < _appargv.size(); i++) {
548 _logging <<
" " << _appargv[i];
567 k =
"_out_file_position";
568 snprintf(sbuf, 256,
"%d", _out_file_position);
571 assert(cc == MPI_SUCCESS);
594 if (_jobs_list_file.length() == 0) {
595 cerr <<
"[ERROR] No table file specified" << endl;
596 MPI_Abort(MPI_COMM_WORLD, 1);
599 ifstream input(_jobs_list_file.c_str(), ios::in);
601 cerr <<
"[ERROR] Unable to open table file (" << _jobs_list_file
603 MPI_Abort(MPI_COMM_WORLD, 1);
609 if (!getline(input, table)) {
613 if (table.length() == 0) {
615 }
else if (
string(
"%#").find_first_of(table[0]) != string::npos) {
617 }
else if (strncasecmp(table.c_str(),
"TITLE=", 6) == 0) {
618 _title = table.substr(6);
619 _logging <<
"TITLE=" << _title << endl;
620 }
else if (strncasecmp(table.c_str(),
"PARAM=", 6) == 0) {
621 _parameters = table.substr(6);
622 _logging <<
"PARAM=" << _parameters << endl;
624 _table_list.push_back(table);
632 if (strncmp(_parameters.c_str(), dummy_string, 5) == 0) {
633 cerr <<
"[ERROR] PARAM= field not found in table file" << endl;
634 MPI_Abort(MPI_COMM_WORLD, 1);
639 for (
size_t i = 0; i < _table_list[0].length(); i++) {
640 if (_table_list[0][i] ==
'\t') {
644 _table_numof_fields = nfields;
655 xassert(kvs == 0 && kv.klen == 0 && kv.vlen == 0);
656 xassert(kvo->c.key_data == KMR_KV_INTEGER
657 && kvo->c.value_data == KMR_KV_OPAQUE);
667 for (
int i = 0; i < (int)_table_list.size(); i++) {
668 vector<string> newargv = make_argv_for_task(i, 0);
671 for (
size_t j = 0; j < newargv.size(); j++) {
672 sz += (newargv[j].length() + 1);
675 char *cargv =
new char [sz];
677 for (
size_t j = 0; j < newargv.size(); j++) {
678 strncpy(&cargv[po], newargv[j].c_str(), (newargv[j].length() + 1));
679 po += (newargv[j].length() + 1);
685 kv.klen = (int)
sizeof(
long);
704 xassert(0 <= retry && retry <= 999);
706 string fields = _table_list[index];
707 char *cfields =
new char [fields.length() + 1];
708 strncpy(cfields, fields.c_str(), (fields.length() + 1));
710 string outputfilename =
"-";
711 string newparams = _parameters;
713 for (
char *e = strtok(cfields,
"\t"); e != 0; e = strtok(NULL,
"\t")) {
714 xassert(i < _table_numof_fields);
716 snprintf(key, 5,
"$%d", (i + 1));
719 if (_out_file_position == (i + 1)) {
722 snprintf(tag, 5,
".%d", retry);
730 xassert(i == _table_numof_fields);
732 char *cparams =
new char [newparams.length() + 1];
733 strncpy(cparams, newparams.c_str(), (newparams.length() + 1));
735 vector<string> newargv;
736 newargv.push_back(outputfilename);
737 newargv.insert(newargv.end(), _appargv.begin(), _appargv.end());
738 for (
char *e = strtok(cparams,
" "); e != 0; e = strtok(NULL,
" ")) {
739 newargv.push_back(e);
753 KMR_KVS *kvo,
void *dp_,
const long i)
762 static int count_args(
char *args,
size_t len)
768 size_t z = strlen(&args[po]);
781 static void scan_args(
char **wargv,
int wargc,
char *args,
size_t len)
784 for (
int i = 0; i < wargc; i++) {
786 wargv[i] = &args[po];
787 size_t z = strlen(&args[po]);
800 int rank = kvs->c.mr->rank;
802 xassert(kvs->c.key_data == KMR_KV_INTEGER
803 && kvs->c.value_data == KMR_KV_OPAQUE);
805 int taskno = (int)kv.k.i;
806 char *cargv = (
char *)kv.v.p;
808 int wargc = count_args(cargv, (
size_t)kv.vlen);
810 char **wargv =
new char * [wargc];
811 scan_args(wargv, wargc, cargv, (
size_t)kv.vlen);
813 string outputfilename = wargv[0];
814 int xargc = wargc - 1;
815 char **xargv = wargv + 1;
820 for (
int i = 1; i < xargc; i++) {
824 printf(
"Running task=%d on rank=%d param=%s\n",
825 taskno, rank, fields.c_str());
842 cerr <<
"[ERROR] application exception : " << e << endl;
846 if (outputfilename !=
"-") {
847 cc = stat(outputfilename.c_str(), &buf);
848 tc.fileok = ((cc == 0) ? 1 : -1);
854 cc =
kmr_add_kv1(kvo, &zero, (
int)
sizeof(
long), &tc, (
int)
sizeof(tc));
855 assert(cc == MPI_SUCCESS);
879 xassert((
size_t)n == _table_list.size());
885 int nprocs = kvs->c.mr->nprocs;
886 _rank_logs =
new RankLog [nprocs];
888 for (
int i = 0; i < n; i++) {
889 _task_logs[i].try_count = 0;
891 for (
int i = 0; i < n; i++) {
892 xassert(kv[i].klen ==
sizeof(
long)
896 _task_logs[i].records.push_back(tc);
897 _rank_logs[tc.rank].records.push_back(tc);
908 xassert(kvs == 0 && kv.klen == 0 && kv.vlen == 0);
918 size_t ntasks = _table_list.size();
922 logging <<
"JOB table :" << endl;
923 for (
size_t i = 0; i < ntasks; i++) {
924 xassert(_task_logs[i].records.size() > 0);
925 logging << _task_logs[i].records[0].taskno
926 <<
" EXEC=" << _task_logs[i].try_count;
927 for (
size_t j = 0; j < _task_logs[i].records.size(); j++) {
929 logging <<
" (WID=" << tc.taskno;
930 logging <<
" END=" << 1;
931 logging <<
" RET=" << tc.result;
932 logging <<
" FILE=" << tc.fileok <<
")";
939 int nprocs = _mr->nprocs;
940 logging <<
"\nWorker table :" << endl;
941 for (
int i = 1; i < nprocs; i++) {
942 logging << i <<
"\t" << _rank_logs[i].records.size();
943 for (
size_t j = 0; j < _rank_logs[i].records.size(); j++) {
945 logging <<
"\t" << tc.taskno <<
"\t" << tc.result;
KMR * kmr_dp_mr
A pointer to an embedded KMR context for debugging.
Key-Value Stream (abstract).
virtual void copy_conf(KMR_KVS *confkvs)
Initializes the MPI-DP object by copying run conditions stored as the key-value pairs.
Per-Task Log (for MPIDP).
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Options to Mapping, Shuffling, and Reduction.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
int gather_names(const struct kmr_kv_box kv[], const long n, const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
Collects host names on all nodes.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
virtual void start_task(struct kmr_kv_box kv, const KMR_KVS *kvs, KMR_KVS *kvo)
Starts an application using argv in value data.
int write_report(const struct kmr_kv_box kv, const KMR_KVS *kvs, KMR_KVS *kvo, void *p, const long i)
Writes jobs and workers report in log.
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
int main(int argc, char *argv[])
Initializes MPI and then starts the application.
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
int kmr_add_kv1(KMR_KVS *kvs, void *k, int klen, void *v, int vlen)
Adds a key-value pair as given directly by a pointer.
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
int list_tasks_rank0(const struct kmr_kv_box kv, const KMR_KVS *kvs, KMR_KVS *kvo, void *p, const long i)
Puts tasks in the KVS.
int kmr_find_string(KMR_KVS *kvi, const char *k, const char **vq)
Finds the key K in the key-value stream KVS.
Per-Rank Worker Log (for MPIDP).
virtual vector< string > make_argv_for_task(int index, int retry)
Makes argument-list (argv) from a task list entry, by substituting variables in parameters.
Handy Copy of a Key-Value Field.
int collect_results(const struct kmr_kv_box kv[], const long n, const KMR_KVS *kvs, KMR_KVS *kvo, void *dp_)
Collects result status in ir[] on rank#0.
#define kmr_init()
Sets up the environment.
int setup_rank0(const struct kmr_kv_box kv, const KMR_KVS *kvs, KMR_KVS *kvo, void *p, const long i)
Reads the command-line options and the jobs-list table, and then puts the parameters into the KVS...
virtual void check_command_line()
Checks the command-line options.
MPIDP * kmr_dp
A pointer to an MPIDP object for debugging.
virtual void collect_results(const struct kmr_kv_box kv[], const long n, const KMR_KVS *kvs, KMR_KVS *kvo)
Collects result status in ir[] on rank#0.
static int safe_atoi(const char *s)
Parses an integer string.
#define xassert(X)
Asserts and aborts, but it cannot be disabled.
string replace_pattern(const string &s, const string &key, const string &value)
Replaces the KEY by the VALUE in the source string S.
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
int kmr_map_ms(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps in master-worker mode.
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gather.
int application(int argc, char *argv[])
Application Code Entry.
int kmr_add_string(KMR_KVS *kvs, const char *k, const char *v)
Adds a key-value pair of strings.
A Tool to Run Tasks under MPI.
string erase_spaces(const string &s0, const size_t len)
Deletes white-spaces in the string appearing within LEN characters from the start (counting after rem...
virtual void put_conf(KMR_KVS *confkvs)
Puts the run conditions into the KVS, which will be copied to all processes.
int kmr_map_on_rank_zero(KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on rank0 only.
virtual void read_jobs_list()
Opens and reads jobs-list file.
virtual void put_task_list(KMR_KVS *kvs)
Puts task entries of TABLE_LIST into KVS, after substituting variables in parameters and packing argv...
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
int start_task(struct kmr_kv_box kv, const KMR_KVS *kvs, KMR_KVS *kvo, void *dp_, const long i)
Starts an application using argv passed in value data.
virtual void write_report(ofstream &logging)
Writes jobs and workers report in log.