67 #include <sys/types.h>    69 #include <sys/param.h>    82 #define ARGSTRLEN (8 * 1024)    88 #define DEFAULT_PROCS 1    91 #define TMPDIR_PREFIX "./KMRRUN_TMP"    96 static void kmrrun_abort(
int, 
const char *, ...);
    97 static int add_command_kv(
KMR_KVS *, 
int, 
char **, 
char *, 
int);
    98 static int generate_mapcmd_kvs(
const struct kmr_kv_box,
   100 static int run_kv_generator(
const struct kmr_kv_box,
   102 static int write_kvs(
const struct kmr_kv_box[], 
const long,
   104 static int generate_redcmd_kvs(
const struct kmr_kv_box,
   108 static void create_tmpdir(
KMR *, 
char *, 
size_t);
   109 static void delete_tmpdir(
KMR *, 
char *);
   /*
    * kmrrun_abort: prints a printf-style message on stderr (va_start +
    * vfprintf) and then terminates the MPI job with
    * MPI_Abort(MPI_COMM_WORLD, 1).
    *
    * rank   - rank of the calling process; how it is used is not visible
    *          in this listing.
    * format - printf-style format string, followed by its arguments.
    *
    * NOTE(review): several lines of this function are missing from the
    * listing.  The deeper indentation of the va_start/vfprintf lines
    * suggests they sit inside a conditional (presumably on `rank`) -
    * confirm against the full file.
    */
   124 kmrrun_abort(
int rank, 
const char *format, ...)
   128         va_start(arg, format);
   129         vfprintf(stderr, format, arg);
   132     MPI_Abort(MPI_COMM_WORLD, 1);
   /*
    * add_command_kv: assembles one command-description string in a
    * malloc'ed buffer and wraps it in a struct kmr_kv_box as the value.
    *
    * The buffer is filled, in order, with the text "maxprocs=<n_procs>",
    * the words of cmd[] (up to ARGSIZ entries, stopping at the first NULL)
    * and finally infile, followed by a terminating '\0'.
    *
    * kvo     - output key-value stream (its use is in lines not shown).
    * id      - presumably the key of the new pair - TODO confirm; the
    *           initializer for the key field is not visible here.
    * cmd     - NULL-terminated argument vector of the command.
    * infile  - input file name appended after the command words.
    * n_procs - number written into the "maxprocs=" prefix.
    *
    * Callers compare the return value against MPI_SUCCESS.
    *
    * NOTE(review): this listing omits several lines (separators written
    * between the pieces, the add to kvo, and any free of `value`).
    */
   140 add_command_kv(
KMR_KVS *kvo, 
int id, 
char **cmd, 
char *infile, 
int n_procs)
   143     char *cp, *np, *value;
   147     snprintf(maxprocs, 31, 
"maxprocs=%d", n_procs);
   /* First pass: sum the lengths of the command words, one extra byte each. */
   150     for (cmdlen = 0, i = 0; i < 
ARGSIZ; i++) {
   151         if (cmd[i] == NULL) {
   154         cmdlen += (int)strlen(cmd[i]) + 1;
   /* Total size: maxprocs text + 1, all command words, infile + 1. */
   156     vlen = (int)strlen(maxprocs) + 1 + cmdlen + (int)strlen(infile) + 1;
   /* NOTE(review): the malloc() result is written to on the very next
      source line (157 -> 158), i.e. without a NULL check. */
   157     value = (
char *)malloc((
size_t)vlen * 
sizeof(char));
   158     memcpy(value, maxprocs, strlen(maxprocs));
   159     cp = value + strlen(maxprocs);
   /* Second pass: copy each command word behind the maxprocs prefix. */
   161     for (i = 0; i < 
ARGSIZ; i++) {
   162         if (cmd[i] == NULL) {
   165         int len = (int)strlen(cmd[i]);
   166         memcpy(cp, cmd[i], (
size_t)len);
   /* The input file name goes last, followed by the terminator. */
   172     memcpy(cp, infile, strlen(infile));
   173     *(cp + strlen(infile)) = 
'\0';
   /* Looks for ' ' characters in the assembled string; what is done at
      each one is in lines missing from this listing. */
   178         if ((np = strchr((
const char*)cp, 
' ')) != NULL) {
   /* Key length is sizeof(long); the value is the buffer built above. */
   186     struct kmr_kv_box nkv = { .klen = 
sizeof(long),
   187                               .vlen = vlen * (
int)
sizeof(char),
   189                               .v.p  = (
void *)value };
   /*
    * generate_mapcmd_kvs: map function that produces mapper command
    * key-values from the input path stored in info->infile.
    *
    * The path is stat()ed.  If it is a directory, its entries are read and
    * add_command_kv() is called for every entry that stat() reports as a
    * regular file, using "<path>/<entry name>" as the input file.
    * Otherwise add_command_kv() is called once for the path itself with
    * id 0 and info->num_procs.
    *
    * Messages "File[%s] error" and "File[%s] is not regular file or
    * directory" are written to stderr when stat() fails or when the path
    * is neither a directory nor a regular file.
    *
    * NOTE(review): the remaining parameters and the origin of `info`,
    * `kvo` and `id` are in lines missing from this listing.
    */
   200 generate_mapcmd_kvs(
const struct kmr_kv_box kv,
   205     char *path = info->infile;
   208     if (stat(path, &status) < 0) {
   209         fprintf(stderr, 
"File[%s] error\n", path);
   212     if (!S_ISDIR(status.st_mode) && !S_ISREG(status.st_mode)) {
   213         fprintf(stderr, 
"File[%s] is not regular file or directory\n", path);
   217     if (S_ISDIR(status.st_mode)) {  
   /* Size the dirent buffer from the file system's maximum name length;
      a fixed 64 KiB is the alternative (the condition is not shown). */
   219         long nmax = pathconf(path, _PC_NAME_MAX);
   221             direntsz = (64 * 1024);
   223             direntsz = (offsetof(
struct dirent, d_name) + (size_t)nmax + 1);
   226         struct dirent *dentp;
   /* NOTE(review): readdir_r() returns 0 at end-of-directory and reports
      it by setting the result pointer to NULL, so `>= 0` alone does not
      end the loop; presumably a NULL test on dentp follows in the lines
      not shown - TODO confirm.  readdir_r() is also deprecated in glibc. */
   235         while (readdir_r(d, (
void *)b, &dentp) >= 0) {
   237             char fullpath[MAXPATHLEN];
   242             ret = snprintf(fullpath, 
sizeof(fullpath), 
"%s/%s",
   243                            path, dentp->d_name);
   249             if (stat(fullpath, &substat) < 0) {
   /* Only regular files inside the directory become mapper inputs. */
   252             if (S_ISREG(substat.st_mode)) {
   253                 ret = add_command_kv(kvo, 
id, info->cmd_args, fullpath,
   255                 if (ret != MPI_SUCCESS) {
   /* Path that is not a directory: a single command with id 0. */
   264         ret = add_command_kv(kvo, 0, info->cmd_args, path, info->num_procs);
   /*
    * Body fragment; the function header is not part of this listing.
    * Presumably this is run_kv_generator(), judging by the forward
    * declaration and the "kv generator" perror texts - TODO confirm.
    *
    * When a generator command is configured (info->cmd_args[0] != NULL):
    *  - child (pid == 0): closes pipefd[0], redirects stdout to pipefd[1]
    *    with dup2(), closes pipefd[1], builds an argument vector from
    *    info->cmd_args plus `infile`, and calls execv();
    *  - parent: closes pipefd[1], reads the child's output line by line
    *    through a FILE opened on pipefd[0], looks for '\n' and ' ' in each
    *    line and sets klen/vlen of a key-value from the two parts, then
    *    closes pipefd[0].
    * Warnings are printed for lines that are too long or lack a final
    * newline, and for lines without a key-value pair.
    *
    * NOTE(review): the pipe()/fork() calls, the wait for the child and the
    * add of each key-value are in lines missing from this listing.
    */
   280     if (info->cmd_args[0] != NULL) {
   285             perror(
"pipe for kv generator");
   291             perror(
"fork kv generator");
   293         } 
else if (pid == 0) {
   /* Child: the read end is not needed; stdout becomes the write end. */
   295             ret = close(pipefd[0]);
   297                 perror(
"pipe close kv generator");
   300             ret = dup2(pipefd[1], STDOUT_FILENO);
   302                 perror(
"dup2 pipe kv generator");
   305             ret = close(pipefd[1]);
   307                 perror(
"pipe close kv generator");
   /* Walks the bytes of the incoming value to locate `infile`; the loop
      body is not shown in this listing. */
   312             char *cp, *infile = NULL;
   313             for (cp = (
char *)kv.v.p; cp < kv.v.p + kv.vlen - 1; cp++) {
   /* Copy the configured arguments; `infile` is stored in the other
      branch (presumably at the first NULL slot - TODO confirm). */
   319             char *cmd_args[
ARGSIZ+1] = { NULL };
   321             for (i = 0; i <= 
ARGSIZ; i++) {
   322                 if (info->cmd_args[i] != NULL) {
   323                     cmd_args[i] = info->cmd_args[i];
   325                     cmd_args[i] = infile;
   330             ret = execv(cmd_args[0], cmd_args);
   332                 perror(
"execv kv generator");
   /* Parent: closes the write end of the pipe before reading. */
   337             ret = close(pipefd[1]);
   339                 perror(
"pipe close kv generator");
   /* NOTE(review): no check of the fdopen() result is visible before the
      stream is passed to fgets() on the next source line (346 -> 347). */
   346             FILE* chld_out = fdopen(pipefd[0], 
"r");
   347             while (fgets(line, 
sizeof(line), chld_out) != NULL) {
   348                 char *cp = strchr(line, 
'\n');
   /* The first ' ' separates key and value; the value starts right after. */
   355                 cp = strchr(line, 
' ');
   363                 char *value = (cp + 1);
   /* Both lengths include the terminating NUL. */
   365                 nkv.klen = (int)strlen(key) + 1;
   366                 nkv.vlen = (int)strlen(value) + 1;
   370                 if (ret != MPI_SUCCESS) {
   375                     fprintf(stderr, (
"warning: Line too long or "   376                                      "missing last newline.\n"));
   379                     fprintf(stderr, (
"warning: Some lines have "   380                                      "no key-value pairs (ignored).\n"));
   /* NOTE(review): the descriptor is closed with close(); whether the FILE
      from fdopen() is also fclose()d is not visible here. */
   384             ret = close(pipefd[0]);
   386                 perror(
"pipe close kv generator");
   /*
    * write_kvs: reduce function that writes the n key-value pairs in kv[]
    * to a text file, one "<key> <value>" line per pair.
    *
    * The file name is "<p>/<rank>/<key of kv[0]>", where p is the opaque
    * argument (cast to char *) and rank is kvs->c.mr->rank.  A key-value
    * is then prepared with the key length of kv[0] and, as value length,
    * strlen(filename) + 1.
    *
    * kv - array of key-value pairs; keys and values are printed with %s,
    *      so they are assumed to be NUL-terminated strings.
    * n  - number of entries in kv[].
    *
    * NOTE(review): the remaining parameters, the fclose() of fp and the
    * add of the new key-value are in lines missing from this listing.
    */
   399 write_kvs(
const struct kmr_kv_box kv[], 
const long n,
   404     char filename[PATHLEN];
   406     snprintf(filename, PATHLEN, 
"%s/%d/%s",
   407              (
char *)p, kvs->c.mr->rank, kv[0].k.p);
   408     if ((fp = fopen(filename, 
"w")) == NULL) {
   409         perror(
"open file with write mode");
   412     for (
long i = 0; i < n; i++) {
   413         fprintf(fp, 
"%s %s\n", kv[i].k.p, kv[i].v.p);
   /* The emitted pair reuses the key length of the first input pair. */
   419     nkv.klen = kv[0].klen;
   421     nkv.vlen = (int)strlen(filename) + 1;
   424     if (ret != MPI_SUCCESS) {
   /*
    * generate_redcmd_kvs: map function that turns one incoming key-value
    * into a reducer command by calling add_command_kv() with
    * info->cmd_args as the command and the incoming value (cast to
    * char *) as the input file name; the id passed is (int)i_.
    *
    * NOTE(review): the remaining parameters and the origin of `info`,
    * `kvo` and `i_` are in lines missing from this listing.
    */
   435 generate_redcmd_kvs(
const struct kmr_kv_box kv,
   441     ret = add_command_kv(kvo, (
int)i_, info->cmd_args, (
char *)kv.v.p,
   /*
    * Body fragment; the function header is not part of this listing
    * (presumably delete_file(), which main() passes to kmr_map() - TODO
    * confirm).  It treats the value of the incoming key-value as a file
    * name and tests for its existence with access(F_OK); the action taken
    * afterwards is not visible here.
    */
   453     char *file_name = (
char*)kv.v.p;
   454     int ret = access(file_name, F_OK);
   /*
    * Body fragment; the function header is not part of this listing
    * (presumably parse_args(char *, char *[]) - TODO confirm).
    *
    * If the command string does not start with '.' or '/', a notice is
    * printed that the command is assumed to be in the current directory
    * and the string is shifted two characters to the right (presumably to
    * make room for a "./" prefix written in lines not shown).  A string
    * that is too long aborts the job with MPI_Abort(MPI_COMM_WORLD, 1).
    * The string is then scanned for ' ' separators while `ap` advances
    * through argary[], bounded by &argary[ARGSIZ-1].
    */
   471     if (!(argstr[0] == 
'.' || argstr[0] == 
'/')) {
   /* len includes the terminating NUL. */
   473         int len = (int)strlen(argstr) + 1;
   475             fprintf(stderr, 
"command line is too long.\n");
   476             MPI_Abort(MPI_COMM_WORLD, 1);
   478         fprintf(stderr, 
"The command is assumed to be located in "   479                 "the current directory.\n");
   /* Shift right by two, copying from the end backwards.
      NOTE(review): since len already counts the NUL, the first iteration
      (i == len) also moves the byte just past the terminator. */
   480         for (
int i = len; i >= 0; i--) {
   481             argstr[i+2] = argstr[i];
   493         if ((np = strchr((
const char*)cp, 
' ')) != NULL) {
   /* Checks `ap` against the last slot of argary[]. */
   496         if (++ap >= &argary[
ARGSIZ-1]) {
   /*
    * This span holds two functions; the listing fuses the end of
    * create_tmpdir() and the header of delete_tmpdir() on one line.
    *
    * create_tmpdir: builds the name TMPDIR_PREFIX "<time(NULL)>" in
    * tmpdir and creates that directory with mode 0777 (the indentation
    * suggests this happens inside a conditional, presumably on rank 0 -
    * TODO confirm).  The name is then broadcast from rank 0 over mr->comm
    * with MPI_Bcast, and every rank creates its own "<tmpdir>/<rank>"
    * subdirectory.  Failures are reported through kmrrun_abort().
    *   mr         - KMR context; mr->comm and mr->rank are used.
    *   tmpdir     - buffer that receives the directory name.
    *   tmpdir_len - size of that buffer, also the broadcast count.
    *
    * delete_tmpdir: removes the rank-local "<tmpdir>/<rank>" directory
    * with rmdir(), calls MPI_Barrier(mr->comm), and then reports a
    * failure to delete the top-level directory (that removal itself is in
    * lines missing from this listing).
    */
   514 create_tmpdir(
KMR *mr, 
char *tmpdir, 
size_t tmpdir_len)
   518         int cur_time = (int)time(NULL);
   519         snprintf(tmpdir, tmpdir_len, TMPDIR_PREFIX
"%d", cur_time);
   520         int ret = mkdir(tmpdir, 0777);
   522             char *errmsg = strerror(errno);
   523             kmrrun_abort(0, 
"Error on creating the temporal directory: %s\n",
   /* Share the directory name chosen above with all ranks. */
   527     int ret = MPI_Bcast(tmpdir, (
int)tmpdir_len, MPI_CHAR, 0, mr->comm);
   528     if (ret != MPI_SUCCESS) {
   529         kmrrun_abort(mr->rank, 
"MPI_Bcast failed.\n");
   /* Each rank gets a private subdirectory named after its rank. */
   532     char rankdir[PATHLEN];
   533     snprintf(rankdir, PATHLEN, 
"%s/%d", tmpdir, mr->rank);
   534     ret = mkdir(rankdir, 0777);
   536         char *errmsg = strerror(errno);
   537         kmrrun_abort(mr->rank,
   538                      "Error on creating the rank local temporal directory: "   545 delete_tmpdir(
KMR *mr, 
char *tmpdir)
   /* delete_tmpdir body: remove this rank's subdirectory first. */
   548     char rankdir[PATHLEN];
   549     snprintf(rankdir, PATHLEN, 
"%s/%d", tmpdir, mr->rank);
   550     int ret = rmdir(rankdir);
   552         char *errmsg = strerror(errno);
   554                 "Rank[%05d] Failed to delete rank local temporal directory: "   555                 "%s\n", mr->rank, errmsg);
   /* Barrier between the per-rank removal and the top-level handling. */
   557     MPI_Barrier(mr->comm);
   562             char *errmsg = strerror(errno);
   564                     "Failed to delete the temporal directory: %s\n", errmsg);
   /*
    * main: entry point of the kmrrun driver.
    *
    * Visible flow:
    *  1. MPI_Init, then parse options with getopt_long() using the short
    *     option string "m:r:k:n:" and the long option "ckpt".  The
    *     arguments are copied into margbuf (mapper), rargbuf (reducer),
    *     kargbuf (key-value generator) and para_arg (process counts,
    *     "m_num[:r_num]").  Exactly one non-option argument, the input
    *     file, is required, and a mapper must be given; otherwise the
    *     usage message is printed through kmrrun_abort().
    *  2. Build an MPI_Info (setting "ckpt_enable" to "1" when requested)
    *     and set mr->spawn_gap_msec[] to 500 and 1000.
    *  3. Generate mapper commands with kmr_map_once(generate_mapcmd_kvs),
    *     shuffle them, and run them with run_kv_generator as the map
    *     function.
    *  4. If a reducer was given: create the temporary directory, write
    *     the key-values to files with kmr_reduce(write_kvs), generate
    *     reducer commands with kmr_map(generate_redcmd_kvs), run them,
    *     remove the files with kmr_map(delete_file), and delete the
    *     temporary directory.
    * Every failing step ends the job through kmrrun_abort().
    *
    * NOTE(review): many lines (declarations, the switch on `opt`, context
    * and stream creation, the spawning calls, finalization) are missing
    * from this listing.
    */
   571 main(
int argc, 
char *argv[])
   574     char *mapper = NULL, *reducer = NULL, *infile = NULL;
   578     int map_procs = DEFAULT_PROCS, red_procs = DEFAULT_PROCS;
   582         "Usage %s [-n m_num[:r_num]] -m mapper [-k kvgenerator]\n"   583         "     [-r reducer] [--ckpt]\n"   586     MPI_Init(&argc, &argv);
   587     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
   590         int option_index = 0;
   592         static struct option long_options[] = {
   593             {
"ckpt", no_argument, 0, 0},
   597         opt = getopt_long(argc, argv, 
"m:r:k:n:",
   598                           long_options, &option_index);
   606             if (strcmp(
"ckpt", long_options[option_index].name) == 0) {
   /* Mapper command: copied, including its NUL, into margbuf. */
   611             asz = (strlen(optarg) + 1);
   613                 kmrrun_abort(rank, 
"Argument too long for mapper (%s)\n",
   616             memcpy(margbuf, optarg, asz);
   /* Reducer command: copied into rargbuf. */
   621             asz = (strlen(optarg) + 1);
   623                 kmrrun_abort(rank, 
"Argument too long for reducer (%s)\n",
   626             memcpy(rargbuf, optarg, asz);
   /* Process counts.  NOTE(review): unlike the other options, no "too
      long" message is visible for this copy - confirm that para_arg is
      bounds-checked in the line not shown. */
   631             asz = (strlen(optarg) + 1);
   633             memcpy(para_arg, optarg, asz);
   634             cp = strchr(para_arg, 
':');
   /* One number is used for both map and reduce; otherwise the text
      after ':' gives the reducer count.  strtol() is called with a NULL
      end pointer, so malformed numbers are not detected here. */
   637                 map_procs = (int)strtol(para_arg, NULL, 10);
   638                 red_procs = map_procs;
   643                 map_procs = (int)strtol(para_arg, NULL, 10);
   644                 red_procs = (int)strtol(np, NULL, 10);
   /* Key-value generator command: copied into kargbuf. */
   648             asz = (strlen(optarg) + 1);
   650                 kmrrun_abort(rank, 
"Argument too long for key-value "   651                              "generator (%s)\n", optarg);
   653             memcpy(kargbuf, optarg, asz);
   657             kmrrun_abort(rank, usage_msg, argv[0]);
   /* Exactly one positional argument (the input file) is accepted. */
   661     if ((argc - optind) != 1) {
   662         kmrrun_abort(rank, usage_msg, argv[0]);
   664         infile = argv[optind];
   668     if (mapper == NULL) {
   669         kmrrun_abort(rank, usage_msg, argv[0]);
   /* The info object is freed right after being filled; presumably the
      KMR context is created from it in the lines not shown. */
   674     MPI_Info_create(&info);
   675     if (ckpt_enable == 1) {
   676         ret = MPI_Info_set(info, 
"ckpt_enable", 
"1");
   679     MPI_Info_free(&info);
   680     mr->spawn_gap_msec[0] = 500;
   681     mr->spawn_gap_msec[1] = 1000;
   /* nonmpi is set when exactly one process per mapper was requested. */
   685     _Bool nonmpi = (map_procs == 1) ? 1 : 0;
   686     struct cmdinfo mapinfo = { margv, infile, map_procs };
   688     ret = 
kmr_map_once(kvs_commands, &mapinfo, kmr_noopt, 1,
   689                        generate_mapcmd_kvs);
   690     if (ret != MPI_SUCCESS) {
   691         kmrrun_abort(rank, 
"kmr_map_once failed.\n");
   694     ret = 
kmr_shuffle(kvs_commands, kvs_commands2, kmr_noopt);
   695     if (ret != MPI_SUCCESS) {
   696         kmrrun_abort(rank, 
"kmr_shuffle failed.\n");
   /* Generator settings: kargv, no input file, one process. */
   701     struct cmdinfo gkvinfo = { kargv, NULL, 1 };
   703                             MPI_INFO_NULL, kmr_snoopt, run_kv_generator);
   704     if (ret != MPI_SUCCESS) {
   705         kmrrun_abort(rank, 
"executing mapper failed.\n");
   /* The reduce phase runs only when a reducer command was supplied. */
   708     if (reducer != NULL) {
   712         if (ret != MPI_SUCCESS) {
   713             kmrrun_abort(rank, 
"shuffling failed.\n");
   717         char tmpdir[PATHLEN];
   718         create_tmpdir(mr, tmpdir, PATHLEN);
   722         ret = 
kmr_reduce(kvs_red, kvs_file, tmpdir, kmr_noopt, write_kvs);
   723         if (ret != MPI_SUCCESS) {
   724             kmrrun_abort(rank, 
"writing key-values to files failed.\n");
   728         nonmpi = (red_procs == 1) ? 1 : 0;
   729         struct cmdinfo redinfo = { rargv, NULL, red_procs };
   /* kvs_file is mapped with .inspect = 1 here and mapped again below by
      delete_file. */
   731         struct kmr_option kmr_inspect = { .inspect = 1 };
   732         ret = 
kmr_map(kvs_file, kvs_commands, &redinfo, kmr_inspect,
   733                       generate_redcmd_kvs);
   734         if (ret != MPI_SUCCESS) {
   735             kmrrun_abort(rank, 
"kmr_map failed.\n");
   741         if (ret != MPI_SUCCESS) {
   742             kmrrun_abort(rank, 
"executing reducer failed.\n");
   /* Clean up the intermediate files, then the temporary directories. */
   746         ret = 
kmr_map(kvs_file, NULL, NULL, kmr_noopt, delete_file);
   747         if (ret != MPI_SUCCESS) {
   748             kmrrun_abort(rank, 
"deleting file failed.\n");
   752         delete_tmpdir(mr, tmpdir);
 Key-Value Stream (abstract). 
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs. 
Options to Mapping, Shuffling, and Reduction. 
#define ARGSTRLEN
Buffer string size of arguments to mapper and reducer programs. 
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair. 
#define ARGSIZ
Maximum number of arguments to mapper and reducer programs. 
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once. 
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes. 
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks. 
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS). 
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply. 
Handy Copy of a Key-Value Field. 
int kmr_fin(void)
Clears the environment. 
#define kmr_init()
Sets up the environment. 
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context(). 
static void parse_args(char *, char *[])
Parses command parameters given for mapper and reducer arguments. 
#define LINELEN
Maximum length of a line of data. 
int kmr_map_processes(_Bool nonmpi, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn() to run independent processes. 
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).