15 #include <sys/types.h> 18 #include <sys/param.h> 19 #include <arpa/inet.h> 28 #define MAX(a,b) (((a)>(b))?(a):(b)) 29 #define MIN(a,b) (((a)<(b))?(a):(b)) 31 static const int kmr_kv_buffer_slack_size = 1024;
36 KMR_RPC_NONE, KMR_RPC_GOON, KMR_RPC_DONE
40 kmr_assert_peer_tag(
int tag)
42 assert(KMR_TAG_PEER_0 <= tag && tag < KMR_TAG_PEER_END);
52 #define KMR_RPC_ID_NONE -1 53 #define KMR_RPC_ID_FIN -2 76 if (kmr_fields_pointer_p(kvi)) {
77 kmr_error(mr,
"kmr_map_ms: cannot handle pointer field types");
79 assert(kvo->c.key_data == kvi->c.key_data
80 && kvo->c.value_data == kvi->c.value_data);
81 MPI_Comm comm = mr->comm;
82 int nprocs = mr->nprocs;
83 _Bool tracing5 = (mr->trace_map_ms && (5 <= mr->verbosity));
84 long longcount = kvi->c.element_count;
85 assert(INT_MIN <= longcount && longcount <= INT_MAX);
86 int cnt = (int)longcount;
90 msstates = &(ms->states[0]);
94 ms =
kmr_malloc((hdsz +
sizeof(
char) * (
size_t)cnt));
99 msstates = &(ms->states[0]);
100 for (
int i = 0; i < cnt; i++) {
101 msstates[i] = KMR_RPC_NONE;
104 char *name =
"kmr_map_ms";
106 ";;KMR [%05d] %s: key-count=%d\n",
107 mr->rank, name, cnt);
115 kvi->c.temporary_data = ev;
116 kvi->c.current_block = kvi->c.first_block;
117 struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvi, kvi->c.first_block);
118 for (
int i = 0; i < cnt; i++) {
121 e = kmr_kvs_next(kvi, e, 0);
125 if (ms->dones == cnt && ms->idles == (nprocs - 1)) {
127 for (
int peer = 1; peer < nprocs; peer++) {
129 int req[3] = {0, KMR_RPC_ID_FIN, 0};
130 cc = MPI_Send(req, 3, MPI_INT, peer, KMR_TAG_REQ, comm);
131 assert(cc == MPI_SUCCESS);
133 if (kvi->c.temporary_data != 0) {
134 kmr_free(kvi->c.temporary_data,
136 kvi->c.temporary_data = 0;
141 assert(ms->dones <= ms->kicks);
145 cc = MPI_Recv(req, 3, MPI_INT, MPI_ANY_SOURCE, KMR_TAG_REQ, comm, &st);
146 assert(cc == MPI_SUCCESS);
147 int peer_tag = req[0];
148 int peer = st.MPI_SOURCE;
153 if (
id == KMR_RPC_ID_NONE) {
155 }
else if (
id == KMR_RPC_ID_FIN) {
158 assert(ms->idles <= (nprocs - 1));
162 kmr_assert_peer_tag(peer_tag);
164 cc = MPI_Recv(packed, sz, MPI_BYTE, peer, peer_tag, comm, &st);
165 assert(cc == MPI_SUCCESS);
168 assert(cc == MPI_SUCCESS);
169 struct kmr_option keepopen = {.keep_open = 1};
171 assert(cc == MPI_SUCCESS);
172 kmr_free(packed, (
size_t)sz);
173 assert(msstates[
id] == KMR_RPC_GOON);
174 msstates[id] = KMR_RPC_DONE;
178 if (ms->kicks < cnt) {
181 for (
id = 0;
id < cnt;
id++) {
182 if (msstates[
id] == KMR_RPC_NONE) {
186 assert(
id != KMR_RPC_ID_NONE &&
id != cnt);
188 int sz = (int)kmr_kvs_entry_netsize(e);
190 int ack[2] = {id, sz};
191 cc = MPI_Send(ack, 2, MPI_INT, peer, peer_tag, comm);
192 assert(cc == MPI_SUCCESS);
193 cc = MPI_Send(e, sz, MPI_BYTE, peer, peer_tag, comm);
194 assert(cc == MPI_SUCCESS);
195 assert(msstates[
id] == KMR_RPC_NONE);
196 msstates[id] = KMR_RPC_GOON;
199 char *name =
"kmr_map_ms";
201 ";;KMR [%05d] %s: work=%d to rank=%d\n",
202 mr->rank, name,
id, peer);
207 int ack[2] = {KMR_RPC_ID_NONE, 0};
208 cc = MPI_Send(ack, 2, MPI_INT, peer, peer_tag, comm);
209 assert(cc == MPI_SUCCESS);
225 assert(!kmr_fields_pointer_p(kvi)
226 && kvo->c.key_data == kvi->c.key_data
227 && kvo->c.value_data == kvi->c.value_data);
228 KMR *
const mr = kvi->c.mr;
229 const MPI_Comm comm = mr->comm;
230 const int rank = mr->rank;
231 const enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvo->c.key_data);
232 const enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvo->c.value_data);
234 assert(kvi->c.element_count == 0);
236 const _Bool threading = !(mr->single_thread || opt.nothreading);
238 KMR_OMP_PARALLEL_IF_(threading)
241 int thr = KMR_OMP_GET_THREAD_NUM();
245 int peer_tag = KMR_TAG_PEER(thr);
246 kmr_assert_peer_tag(peer_tag);
249 int req[3] = {peer_tag, KMR_RPC_ID_NONE, 0};
251 cc = MPI_Send(req, 3, MPI_INT, 0, KMR_TAG_REQ, comm);
252 assert(cc == MPI_SUCCESS);
257 cc = MPI_Recv(ack, 2, MPI_INT, 0, peer_tag, comm, &st);
258 assert(cc == MPI_SUCCESS);
261 if (
id == KMR_RPC_ID_NONE) {
266 assert(
id >= 0 && sz > 0);
268 maxsz = (sz + kmr_kv_buffer_slack_size);
273 cc = MPI_Recv(e, sz, MPI_BYTE, 0, peer_tag, comm, &st);
274 assert(cc == MPI_SUCCESS);
280 cc = (*m)(kv, kvi, kvx, arg, id);
281 if (cc != MPI_SUCCESS) {
283 snprintf(ee,
sizeof(ee),
284 "Map-fn returned with error cc=%d", cc);
291 assert(cc == MPI_SUCCESS && packed != 0);
293 assert(packsz <= (
size_t)INT_MAX);
295 int req[3] = {peer_tag, id, sz};
297 cc = MPI_Send(req, 3, MPI_INT, 0, KMR_TAG_REQ, comm);
298 assert(cc == MPI_SUCCESS);
300 cc = MPI_Send(packed, sz, MPI_BYTE, 0, peer_tag, comm);
301 assert(cc == MPI_SUCCESS);
305 assert(cc == MPI_SUCCESS);
306 kmr_free(packed, packsz);
309 kmr_free(e, (
size_t)maxsz);
316 int req[3] = {0, KMR_RPC_ID_FIN, 0};
317 cc = MPI_Send(req, 3, MPI_INT, 0, KMR_TAG_REQ, comm);
318 assert(cc == MPI_SUCCESS);
321 cc = MPI_Recv(req, 3, MPI_INT, 0, KMR_TAG_REQ, comm, &st);
322 assert(cc == MPI_SUCCESS);
323 assert(req[0] == 0 && req[1] == KMR_RPC_ID_FIN && req[2] == 0);
347 kmr_assert_kvs_ok(kvi, kvo, 1, 0);
349 struct kmr_option kmr_supported = {.nothreading = 1, .keep_open = 1};
350 kmr_check_fn_options(mr, kmr_supported, opt, __func__);
353 long cnt = kvi->c.element_count;
354 assert(INT_MIN <= cnt && cnt <= INT_MAX);
359 if (ccr == MPI_SUCCESS) {
361 assert(cc == MPI_SUCCESS);
363 assert(cc == MPI_SUCCESS);
368 assert(cc == MPI_SUCCESS);
370 assert(cc == MPI_SUCCESS);
388 enum kmr_spawn_mode {
389 KMR_SPAWN_INTERACT, KMR_SPAWN_SERIAL, KMR_SPAWN_PARALLEL
428 enum kmr_spawn_mode mode;
440 MPI_Request *replies;
444 char watch_host[MAXHOSTNAMELEN + 10];
451 kmr_sum_on_all_ranks(
KMR *mr,
int v,
int *sum)
457 .klen = (int)
sizeof(
long),
458 .vlen = (int)
sizeof(
long),
463 assert(cc == MPI_SUCCESS);
467 assert(cc == MPI_SUCCESS);
470 assert(cc == MPI_SUCCESS);
480 MPI_Info info = MPI_Info_f2c(finfo);
491 info->u.icomm = s->icomm;
492 info->icomm_ff = MPI_Comm_c2f(s->icomm);
493 info->reply_root = opt.reply_root;
500 if (info->u.icomm != s->icomm) {
501 s->icomm = info->u.icomm;
502 }
else if (info->icomm_ff != MPI_Comm_c2f(s->icomm)) {
503 s->icomm = MPI_Comm_f2c(info->icomm_ff);
513 assert(spw->n_spawns == (
int)kvi->c.element_count);
514 KMR *
const mr = kvi->c.mr;
515 _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
520 kvi->c.current_block = kvi->c.first_block;
521 struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvi, kvi->c.first_block);
522 for (
int w = 0; w < spw->n_spawns; w++) {
524 e = kmr_kvs_next(kvi, e, 0);
530 cc = kmr_sum_on_all_ranks(mr, ((spw->n_spawns > 0) ? 1 : 0), &nranks);
531 assert(cc == MPI_SUCCESS && nranks <= mr->nprocs);
532 spw->n_spawners = nranks;
535 cc = MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_UNIVERSE_SIZE, &usizep, &uflag);
536 if (cc != MPI_SUCCESS || uflag == 0) {
538 snprintf(ee,
sizeof(ee),
"%s: MPI lacks universe size", spw->fn);
541 spw->usize = *usizep;
542 if (spw->usize <= mr->nprocs) {
544 snprintf(ee,
sizeof(ee),
"%s: no dynamic processes in universe",
548 int m = spw->usize - mr->nprocs;
549 if (spw->n_spawners != 0) {
550 m /= spw->n_spawners;
552 spw->spawn_limit = ((mr->spawn_max_processes != 0)
553 ? MIN(mr->spawn_max_processes, m)
556 if (spw->n_spawns > 0) {
558 ";;KMR [%05d] %s: universe-size=%d spawn-limit=%d\n",
559 mr->rank, spw->fn, spw->usize, spw->spawn_limit);
568 char *infoval =
kmr_malloc((
size_t)(MPI_MAX_INFO_VAL + 1));
570 if (info != MPI_INFO_NULL) {
571 cc = MPI_Info_get(info,
"maxprocs", MPI_MAX_INFO_VAL,
573 assert(cc == MPI_SUCCESS);
579 cc = kmr_parse_int(infoval, &v);
580 if (cc == 0 || v < 0) {
582 snprintf(ee,
sizeof(ee),
"%s: bad value in info maxprocs=%s",
592 kmr_free(infoval, (
size_t)(MPI_MAX_INFO_VAL + 1));
597 spw->n_processes = 0;
598 for (
int w = 0; w < spw->n_spawns; w++) {
611 s->icomm = MPI_COMM_NULL;
614 s->alen = (size_t)kv.vlen;
616 memcpy(s->abuf, kv.v.p, (
size_t)kv.vlen);
618 cc = kmr_scan_argv_strings(mr, s->abuf, s->alen,
620 opt.separator_space, spw->fn);
621 assert(cc == MPI_SUCCESS);
622 s->argv0 =
kmr_malloc(
sizeof(
char *) * (
size_t)(maxargc + 1));
623 memset(s->argv0, 0, (
sizeof(
char *) * (
size_t)(maxargc + 1)));
624 cc = kmr_scan_argv_strings(mr, s->abuf, s->alen,
625 (maxargc + 1), &s->argc0, s->argv0,
626 opt.separator_space, spw->fn);
627 assert(cc == MPI_SUCCESS);
628 assert(maxargc == s->argc0);
632 if (s->argc0 > 0 && strncmp(
"maxprocs=", s->argv0[0], 9) == 0) {
634 cc = kmr_parse_int(&s->argv0[0][9], &v);
635 if (cc == 0 || v < 0) {
637 snprintf(ee,
sizeof(ee),
"%s: bad maxprocs=%s",
638 spw->fn, s->argv0[0]);
642 s->argc = (s->argc0 - 1);
643 s->argv = (s->argv0 + 1);
645 s->n_procs = maxprocs;
652 snprintf(ee,
sizeof(ee),
"%s: no arguments", spw->fn);
655 if (s->n_procs <= 0) {
657 snprintf(ee,
sizeof(ee),
"%s: maxprocs not specified",
661 if (s->n_procs > spw->spawn_limit) {
663 snprintf(ee,
sizeof(ee),
664 "%s: maxprocs too large, (maxprocs=%d limit=%d)",
665 spw->fn, s->n_procs, spw->spawn_limit);
669 spw->n_processes += s->n_procs;
680 _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
681 if (s->icomm != MPI_COMM_NULL) {
683 ptrdiff_t done = (s - spw->spawned);
684 fprintf(stderr, (
";;KMR [%05d] %s [%ld]:" 685 " MPI_Comm_free (could block)...\n"),
686 mr->rank, spw->fn, done);
690 if (!mr->spawn_disconnect_but_free) {
691 cc = MPI_Comm_free(&(s->icomm));
693 cc = MPI_Comm_disconnect(&(s->icomm));
695 assert(cc == MPI_SUCCESS);
698 ptrdiff_t done = (s - spw->spawned);
699 fprintf(stderr, (
";;KMR [%05d] %s [%ld]:" 700 " MPI_Comm_free done\n"),
701 mr->rank, spw->fn, done);
715 assert(
sizeof(spw->watch_host) >= 46);
719 struct sockaddr_in sa4;
720 struct sockaddr_in6 sa6;
721 struct sockaddr_storage ss;
723 char hostname[MAXHOSTNAMELEN];
724 char address[INET6_ADDRSTRLEN];
728 int af = mr->spawn_watch_af;
729 assert(af == 0 || af == 4 || af == 6);
730 char *family = (af == 4 ?
"AF_INET" :
"AF_INET6");
732 assert(spw->watch_listener == -1);
733 const int *ports = mr->spawn_watch_port_range;
734 assert(ports[0] != 0 || ports[1] == 0);
735 for (
int port = ports[0]; port <= ports[1]; port++) {
737 sa.sa.sa_family = AF_INET;
738 }
else if (af == 0 || af == 6) {
739 sa.sa.sa_family = AF_INET6;
743 int fd = socket(sa.sa.sa_family, SOCK_STREAM, 0);
746 char *m = strerror(errno);
747 snprintf(ee,
sizeof(ee),
"%s: socket(%s) failed: %s",
752 cc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one,
sizeof(one));
755 char *m = strerror(errno);
756 snprintf(ee,
sizeof(ee),
"%s: setsockopt(SO_REUSEADDR): %s",
758 kmr_warning(mr, 1, ee);
763 memset(&sa, 0,
sizeof(sa));
764 sa.sa4.sin_family = AF_INET;
765 sa.sa4.sin_addr.s_addr = htonl(INADDR_ANY);
766 sa.sa4.sin_port = htons((uint16_t)port);
767 salen =
sizeof(sa.sa4);
768 }
else if (af == 0 || af == 6) {
769 memset(&sa, 0,
sizeof(sa));
770 sa.sa6.sin6_family = AF_INET6;
771 sa.sa6.sin6_addr = in6addr_any;
772 sa.sa6.sin6_port = htons((uint16_t)port);
773 salen =
sizeof(sa.sa6);
781 cc = bind(fd, &sa.sa, salen);
783 if (errno == EADDRINUSE || errno == EINVAL) {
789 char *m = strerror(errno);
790 snprintf(ee,
sizeof(ee),
"%s: bind(%s, port=%d) failed: %s",
791 spw->fn, family, port, m);
798 int backlog = spw->spawn_limit;
799 cc = listen(fd, backlog);
801 if (errno == EADDRINUSE || errno == EINVAL) {
807 char *m = strerror(errno);
808 snprintf(ee,
sizeof(ee),
"%s: listen(%s, port=%d) failed: %s",
809 spw->fn, family, port, m);
814 spw->watch_listener = fd;
818 int fd = spw->watch_listener;
821 snprintf(ee,
sizeof(ee),
"%s: no ports to listen to watch-programs",
828 memset(&sa, 0,
sizeof(sa));
829 socklen_t salen =
sizeof(sa);
830 cc = getsockname(fd, &sa.sa, &salen);
833 char *m = strerror(errno);
834 snprintf(ee,
sizeof(ee),
"%s: getsockname() failed: %s",
840 if (sa.sa.sa_family == AF_INET) {
841 port = ntohs(sa.sa4.sin_port);
842 }
else if (sa.sa.sa_family == AF_INET6) {
843 port = ntohs(sa.sa6.sin6_port);
846 snprintf(ee,
sizeof(ee),
"%s: getsockname(): unknown ip family=%d",
847 spw->fn, sa.sa.sa_family);
852 if (mr->spawn_watch_host_name != 0) {
853 cc = snprintf(hostname,
sizeof(hostname),
854 "%s", mr->spawn_watch_host_name);
855 assert(cc < (
int)
sizeof(hostname));
857 cc = gethostname(hostname,
sizeof(hostname));
860 char *m = strerror(errno);
861 snprintf(ee,
sizeof(ee),
"%s: gethostname() failed: %s",
867 struct addrinfo hints;
868 memset(&hints, 0,
sizeof(hints));
869 hints.ai_flags = AI_ADDRCONFIG;
870 hints.ai_socktype = SOCK_STREAM;
871 hints.ai_protocol = IPPROTO_TCP;
873 hints.ai_family = AF_INET;
874 }
else if (af == 6) {
875 hints.ai_family = AF_INET6;
876 }
else if (af == 0) {
877 hints.ai_family = (AF_INET6 | AI_V4MAPPED);
881 struct addrinfo *addrs = 0;
882 cc = getaddrinfo(hostname, 0, &hints, &addrs);
885 const char *m = gai_strerror(cc);
886 snprintf(ee,
sizeof(ee),
"%s: getaddrinfo(%s) failed: %s",
887 spw->fn, hostname, m);
891 for (p = addrs; p != 0; p = p->ai_next) {
892 if (!(p->ai_family == AF_INET || p->ai_family == AF_INET6)) {
895 if (af == 4 && p->ai_family != AF_INET) {
898 if (af == 6 && p->ai_family != AF_INET6) {
905 snprintf(ee,
sizeof(ee),
"%s: getaddrinfo(%s): no address for host",
910 if (p->ai_family == AF_INET) {
911 void *inaddr = &(((
struct sockaddr_in *)p->ai_addr)->sin_addr);
912 inet_ntop(p->ai_family, inaddr, address,
sizeof(address));
913 }
else if (p->ai_family == AF_INET6) {
914 void *inaddr = &(((
struct sockaddr_in6 *)p->ai_addr)->sin6_addr);
915 inet_ntop(p->ai_family, inaddr, address,
sizeof(address));
918 snprintf(ee,
sizeof(ee),
"%s: getaddrinfo(%s): unknown ip family=%d",
919 spw->fn, hostname, p->ai_family);
924 assert(0 <= index && index < spw->n_spawns);
926 s->watch_port = port;
928 cc = snprintf(spw->watch_host,
sizeof(spw->watch_host),
930 assert(cc < (
int)
sizeof(spw->watch_host));
941 _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
942 assert(0 <= index && index < spw->n_spawns);
944 assert(s->n_procs > 0);
947 struct sockaddr_in sa4;
948 struct sockaddr_in6 sa6;
949 struct sockaddr_storage ss;
953 assert(spw->watch_listener != -1);
954 int fd0 = spw->watch_listener;
955 for (
int count = 0; count < s->n_procs; count++) {
958 struct pollfd fds0, *fds = &fds0;
959 memset(fds, 0, (
sizeof(
struct pollfd) * nfds));
961 fds[0].events = (POLLIN|POLLPRI);
964 assert(mr->spawn_watch_accept_onhold_msec >= (60 * 1000));
965 int msec = mr->spawn_watch_accept_onhold_msec;
966 int nn = poll(fds, nfds, msec);
969 snprintf(ee,
sizeof(ee),
970 "%s: accepting watch-programs timed out" 971 " (msec=%d)", spw->fn, msec);
973 }
else if (nn < 0 && (errno == EAGAIN || errno == EINTR)) {
975 char *m = strerror(errno);
976 snprintf(ee,
sizeof(ee),
977 "%s: poll (for watch-programs) returned: %s",
979 kmr_warning(mr, 1, ee);
983 char *m = strerror(errno);
984 snprintf(ee,
sizeof(ee),
985 "%s: poll (for watch-programs) failed: %s",
992 memset(&sa, 0,
sizeof(sa));
993 socklen_t salen =
sizeof(sa);
994 int fd = accept(fd0, &sa.sa, &salen);
997 char *m = strerror(errno);
998 snprintf(ee,
sizeof(ee),
999 "%s: accept (for watch-programs) failed: %s",
1007 char address[INET6_ADDRSTRLEN];
1009 if (sa.sa.sa_family == AF_INET) {
1010 void *inaddr = &sa.sa4.sin_addr;
1011 inet_ntop(sa.sa.sa_family, inaddr, address,
sizeof(address));
1013 }
else if (sa.sa.sa_family == AF_INET6) {
1014 void *inaddr = &sa.sa6.sin6_addr;
1015 inet_ntop(sa.sa.sa_family, inaddr, address,
sizeof(address));
1019 snprintf(ee,
sizeof(ee),
"%s: accept(): unknown ip family=%d",
1020 spw->fn, sa.sa.sa_family);
1024 fprintf(stderr, (
";;KMR [%05d] %s [%d]:" 1025 " accepting a connection of watch-programs" 1026 " on port=%d from %s (%d/%d)\n"),
1027 mr->rank, spw->fn, index,
1028 s->watch_port, address, (count + 1), s->n_procs);
1033 if (count == 0 || mr->spawn_watch_all) {
1034 assert((s->index + count) <= spw->n_processes);
1035 spw->watches[s->index + count] = fd;
1041 ssize_t wsize = write(fd, &val,
sizeof(
int));
1044 char *m = strerror(errno);
1045 snprintf(ee,
sizeof(ee),
1046 "%s: write (for watch-programs) failed: %s",
1050 assert(wsize ==
sizeof(
int));
1053 ssize_t rsize = read(fd, &rval,
sizeof(
int));
1056 char *m = strerror(errno);
1057 snprintf(ee,
sizeof(ee),
1058 "%s: read (for watch-programs) failed: %s",
1062 assert(rsize ==
sizeof(
int));
1063 assert(val == rval);
1065 if (!(count == 0 || mr->spawn_watch_all)) {
1071 cc = close(spw->watch_listener);
1073 spw->watch_listener = -1;
1080 int w, _Bool replyeach, _Bool replyroot)
1082 assert(0 <= w && w < spw->n_spawns);
1084 enum kmr_spawn_mode mode = spw->mode;
1086 MPI_Request *reqs = spw->replies;
1087 if (mode == KMR_SPAWN_INTERACT) {
1089 assert(s->index + s->count <= spw->n_processes);
1090 for (
int rank = 0; rank < s->count; rank++) {
1091 assert(reqs[s->index + rank] == MPI_REQUEST_NULL);
1092 cc = MPI_Irecv(0, 0, MPI_BYTE,
1093 rank, KMR_TAG_SPAWN_REPLY,
1094 s->icomm, &reqs[s->index + rank]);
1095 assert(cc == MPI_SUCCESS);
1096 assert(reqs[s->index + rank] != MPI_REQUEST_NULL);
1098 }
else if (replyroot) {
1099 assert(w <= spw->n_processes);
1101 assert(reqs[w] == MPI_REQUEST_NULL);
1102 cc = MPI_Irecv(0, 0, MPI_BYTE,
1103 rank, KMR_TAG_SPAWN_REPLY,
1104 s->icomm, &reqs[w]);
1105 assert(cc == MPI_SUCCESS);
1106 assert(reqs[w] != MPI_REQUEST_NULL);
1110 }
else if (mode == KMR_SPAWN_SERIAL) {
1113 assert(s->index + s->count <= spw->n_processes);
1114 for (
int rank = 0; rank < s->count; rank++) {
1115 assert(reqs[s->index + rank] == MPI_REQUEST_NULL);
1116 cc = MPI_Irecv(0, 0, MPI_BYTE,
1117 rank, KMR_TAG_SPAWN_REPLY,
1118 s->icomm, &reqs[s->index + rank]);
1119 assert(cc == MPI_SUCCESS);
1120 assert(reqs[s->index + rank] != MPI_REQUEST_NULL);
1124 assert(mode == KMR_SPAWN_INTERACT || mode == KMR_SPAWN_SERIAL);
1139 _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
1141 MPI_Request *reqs = spw->replies;
1142 if (opt.reply_each) {
1145 int nwait = spw->n_processes;
1146 cc = MPI_Waitany(nwait, reqs, &index, &st);
1147 assert(cc == MPI_SUCCESS && index != MPI_UNDEFINED);
1148 assert(index < spw->n_processes);
1149 assert(reqs[index] == MPI_REQUEST_NULL);
1152 for (
int w = 0; w < spw->n_spawns; w++) {
1154 if (index < (s->index + s->count)) {
1155 assert(s->index <= index);
1163 int count = (opt.reply_each ? s->count : 1);
1165 assert((s->index + count) <= spw->n_processes);
1166 for (
int j = 0; j < count; j++) {
1167 if (reqs[s->index + j] == MPI_REQUEST_NULL) {
1173 fprintf(stderr, (
";;KMR [%05d] %s [%d]: got a reply (%d/%d)\n"),
1174 mr->rank, spw->fn, done, nreplies, count);
1178 _Bool fin = (nreplies == count);
1179 return (fin ? done : -1);
1180 }
else if (opt.reply_root) {
1183 int nwait = spw->n_spawns;
1184 cc = MPI_Waitany(nwait, reqs, &index, &st);
1185 assert(cc == MPI_SUCCESS && index != MPI_UNDEFINED);
1186 assert(index <= spw->n_spawns);
1187 assert(reqs[index] == MPI_REQUEST_NULL);
1189 assert(0 <= done && done < spw->n_spawns);
1192 assert(reqs[done] == MPI_REQUEST_NULL);
1195 fprintf(stderr, (
";;KMR [%05d] %s [%d]: got a root reply\n"),
1196 mr->rank, spw->fn, done);
1203 for (
int w = 0; w < spw->n_spawns; w++) {
1212 fprintf(stderr, (
";;KMR [%05d] %s [%d]: (no checks of replies)\n"),
1213 mr->rank, spw->fn, done);
1233 _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
1238 for (
int w = 0; w < spw->n_spawns; w++) {
1244 assert(nruns != 0 && spw->n_runnings == nruns);
1247 for (
int i = 0; i < spw->n_processes; i++) {
1248 if (spw->watches[i] != -1) {
1254 struct pollfd *fds =
kmr_malloc(
sizeof(
struct pollfd) * (
size_t)nfds);
1258 memset(fds, 0, (
sizeof(
struct pollfd) * nfds));
1260 for (
int i = 0; i < spw->n_processes; i++) {
1261 if (spw->watches[i] != -1) {
1262 assert(fdix < nfds);
1263 fds[fdix].fd = spw->watches[i];
1264 fds[fdix].events = (POLLIN|POLLPRI);
1265 fds[fdix].revents = 0;
1269 assert(fdix == nfds);
1272 fprintf(stderr, (
";;KMR [%05d] %s:" 1273 " waiting for some watch-programs finish\n"),
1280 int nn = poll(fds, nfds, msec);
1285 MPI_Testany(0, 0, &index, &ok, &st);
1290 }
else if (nn < 0 && (errno == EAGAIN || errno == EINTR)) {
1292 char *m = strerror(errno);
1293 snprintf(ee,
sizeof(ee),
1294 (
"poll (for watch-programs) interrupted;" 1295 " continuing: %s"), m);
1296 kmr_warning(mr, 1, ee);
1300 char *m = strerror(errno);
1301 snprintf(ee,
sizeof(ee),
1302 "%s: poll (for watch-programs) failed: %s",
1311 for (nfds_t k = 0; k < nfds; k++) {
1312 if (fds[k].fd != -1 && fds[k].revents != 0) {
1319 snprintf(ee,
sizeof(ee),
"poll (for watch-programs) no FD found");
1320 kmr_warning(mr, 1, ee);
1325 for (
int w = 0; w < spw->n_spawns; w++) {
1327 assert((s->index + s->count) <= spw->n_processes);
1328 for (
int j = 0; j < s->count; j++) {
1329 if (fd == spw->watches[s->index + j]) {
1330 index = (s->index + j);
1336 assert(fd != -1 && index != -1 && done != -1);
1337 assert(0 <= index && index < spw->n_processes);
1338 assert(0 <= done && done < spw->n_spawns);
1341 ssize_t rr = read(fd, garbage,
sizeof(garbage));
1344 assert(fd == spw->watches[index]);
1347 spw->watches[index] = -1;
1349 }
else if (rr > 0) {
1352 }
else if (rr == -1 && (errno == EAGAIN || errno == EINTR)) {
1354 char *m = strerror(errno);
1355 snprintf(ee,
sizeof(ee),
1356 "read (for watch-programs) returned: %s", m);
1357 kmr_warning(mr, 1, ee);
1359 }
else if (rr == -1) {
1361 char *m = strerror(errno);
1362 snprintf(ee,
sizeof(ee),
1363 "%s: read (for watch-programs) failed: %s",
1370 assert(0 <= done && done < spw->n_spawns);
1372 int count = ((mr->spawn_watch_all) ? s->count : 1);
1374 assert((s->index + count) <= spw->n_processes);
1375 for (
int j = 0; j < count; j++) {
1376 if (spw->watches[s->index + j] == -1) {
1382 fprintf(stderr, (
";;KMR [%05d] %s [%d]:" 1383 " detected a watch done (%d/%d)\n"),
1384 mr->rank, spw->fn, done, nreplies, count);
1388 _Bool fin = (nreplies == count);
1390 if (s->icomm != MPI_COMM_NULL) {
1391 cc = kmr_free_comm_with_tracing(mr, spw, s);
1392 assert(cc == MPI_SUCCESS);
1396 kmr_free(fds, (
sizeof(
struct pollfd) * (
size_t)nfds));
1397 return (fin ? done : -1);
1406 enum kmr_spawn_mode mode = spw->mode;
1408 if (mode == KMR_SPAWN_INTERACT) {
1409 done = kmr_wait_for_reply(mr, spw, opt);
1410 }
else if (mode == KMR_SPAWN_SERIAL) {
1411 done = kmr_wait_for_reply(mr, spw, opt);
1412 }
else if (mode == KMR_SPAWN_PARALLEL) {
1413 done = kmr_wait_for_watch(mr, spw, opt);
1419 assert(0 <= done && done < spw->n_spawns);
1421 s->timestamp[3] = MPI_Wtime();
1423 if (mr->spawn_pass_intercomm_in_argument
1424 && mode == KMR_SPAWN_INTERACT) {
1425 assert(mr->spawn_comms != 0);
1426 assert(mr->spawn_comms[done] == &(s->icomm));
1428 kmr_spawn_info_put(&si, s, opt, arg);
1429 cc = (*m)(spw->ev[done], kvi, kvo, &si, done);
1430 if (cc != MPI_SUCCESS) {
1432 snprintf(ee,
sizeof(ee),
1433 "Map-fn returned with error cc=%d", cc);
1436 kmr_spawn_info_get(&si, s);
1438 cc = (*m)(spw->ev[done], kvi, kvo, arg, done);
1439 if (cc != MPI_SUCCESS) {
1441 snprintf(ee,
sizeof(ee),
1442 "Map-fn returned with error cc=%d", cc);
1447 s->timestamp[4] = MPI_Wtime();
1448 if (s->icomm != MPI_COMM_NULL) {
1449 cc = kmr_free_comm_with_tracing(mr, spw, s);
1450 assert(cc == MPI_SUCCESS);
1452 s->timestamp[5] = MPI_Wtime();
1455 spw->n_runnings -= s->count;
1471 kmr_map_spawned_processes(
enum kmr_spawn_mode mode,
char *name,
1476 kmr_assert_kvs_ok(kvi, kvo, 1, 0);
1477 assert(kvi->c.value_data == KMR_KV_OPAQUE
1478 || kvi->c.value_data == KMR_KV_CSTRING);
1479 assert(kvi->c.element_count <= INT_MAX);
1480 _Bool use_reply = (mode == KMR_SPAWN_INTERACT || mode == KMR_SPAWN_SERIAL);
1481 _Bool use_watch = (mode != KMR_SPAWN_INTERACT);
1482 KMR *
const mr = kvi->c.mr;
1483 _Bool tracing5 = (mr->trace_map_spawn && (5 <= mr->verbosity));
1487 char hostport[MAXHOSTNAMELEN + 10];
1493 cc = kmr_install_watch_program(mr, name);
1494 assert(cc == MPI_SUCCESS);
1498 if (mr->spawn_self == MPI_COMM_NULL) {
1499 if (mr->spawn_retry_limit > 0) {
1500 cc = MPI_Comm_dup(MPI_COMM_SELF, &mr->spawn_self);
1501 if (cc != MPI_SUCCESS) {
1502 kmr_error_mpi(mr,
"MPI_Comm_dup(MPI_COMM_SELF)", cc);
1503 MPI_Abort(MPI_COMM_WORLD, 1);
1505 #if (MPI_VERSION == 3) 1506 cc = MPI_Comm_set_errhandler(mr->spawn_self, MPI_ERRORS_RETURN);
1508 cc = MPI_Errhandler_set(mr->spawn_self, MPI_ERRORS_RETURN);
1510 if (cc != MPI_SUCCESS) {
1511 kmr_error_mpi(mr,
"MPI_Errhandler_set()", cc);
1512 MPI_Abort(MPI_COMM_WORLD, 1);
1515 mr->spawn_self = MPI_COMM_SELF;
1519 int cnt = (int)kvi->c.element_count;
1523 spw->n_spawns = cnt;
1524 spw->n_starteds = 0;
1525 spw->n_runnings = 0;
1528 spw->watch_listener = -1;
1529 spw->watch_host[0] = 0;
1532 cc = kmr_list_spawns(spw, kvi, info, opt);
1533 assert(cc == MPI_SUCCESS);
1538 if (opt.take_ckpt) {
1557 assert(spw->replies == 0);
1558 spw->replies =
kmr_malloc(
sizeof(MPI_Request) * (
size_t)spw->n_processes);
1559 for (
int i = 0; i < spw->n_processes; i++) {
1560 spw->replies[i] = MPI_REQUEST_NULL;
1563 if (mode == KMR_SPAWN_PARALLEL) {
1564 assert(spw->watches == 0);
1565 spw->watches =
kmr_malloc(
sizeof(
int) * (
size_t)spw->n_processes);
1566 for (
int i = 0; i < spw->n_processes; i++) {
1567 spw->watches[i] = -1;
1570 assert(mr->spawn_comms == 0);
1571 mr->spawn_size = spw->n_spawns;
1572 mr->spawn_comms =
kmr_malloc(
sizeof(MPI_Comm *) * (
size_t)spw->n_spawns);
1573 for (
int w = 0; w < spw->n_spawns; w++) {
1574 mr->spawn_comms[w] = &(spw->spawned[w].icomm);
1578 if (mr->spawn_gap_msec[0] == 0) {
1582 unsigned int v = (
unsigned int)spw->usize;
1587 gap = (int)((((
long)mr->spawn_gap_msec[1] * usz) / 10)
1588 + mr->spawn_gap_msec[0]);
1593 for (
int w = from; w < spw->n_spawns; w++) {
1598 if ((spw->n_runnings + s->n_procs) > spw->spawn_limit) {
1599 while ((spw->n_runnings + s->n_procs) > spw->spawn_limit) {
1600 cc = kmr_wait_then_map(mr, spw,
1601 kvi, kvo, arg, opt, mapfn);
1602 assert(cc == MPI_SUCCESS);
1607 ";;KMR [%05d] %s: sleeping for spawn gap" 1609 mr->rank, spw->fn, gap);
1616 if (mode == KMR_SPAWN_PARALLEL) {
1617 cc = kmr_listen_to_watch(mr, spw, w);
1618 assert(cc == MPI_SUCCESS);
1620 cc = snprintf(spw->watch_host,
sizeof(spw->watch_host),
"0");
1621 assert(cc < (
int)
sizeof(spw->watch_host));
1631 if (!opt.no_set_infos && strchr(argv1[0],
'=') != 0) {
1632 if (info == MPI_INFO_NULL) {
1633 cc = MPI_Info_create(&infox);
1634 assert(cc == MPI_SUCCESS);
1636 cc = MPI_Info_dup(info, &infox);
1637 assert(cc == MPI_SUCCESS);
1641 while ((sep = strchr(argv1[0],
'=')) != 0) {
1643 char *v = (sep + 1);
1644 if (k != sep && v[0] != 0) {
1646 assert(*sep ==
'=');
1648 cc = MPI_Info_set(infox, k, v);
1649 assert(cc == MPI_SUCCESS);
1653 if (!(k != sep && v[0] != 0)) {
1669 for (argc1 = 0; argv1[argc1] != 0; argc1++);
1671 argc2 = (argc1 + 5);
1672 argv2 =
kmr_malloc(
sizeof(
char *) * (
size_t)(argc2 + 1));
1674 cc = snprintf(hostport,
sizeof(hostport),
1675 "%s/%d", spw->watch_host, s->watch_port);
1676 assert(cc < (
int)
sizeof(hostport));
1678 unsigned int vv = (
unsigned int)random();
1679 cc = snprintf(magic,
sizeof(magic),
"%08xN%dV0%s",
1680 vv, w, ((mr->trace_map_spawn) ?
"T1" :
""));
1681 assert(cc < (
int)
sizeof(magic));
1683 assert(mr->spawn_watch_program != 0);
1684 argv2[0] = mr->spawn_watch_program;
1685 argv2[1] = ((mode == KMR_SPAWN_SERIAL) ?
"seq" :
"mpi");
1686 argv2[2] = hostport;
1689 for (
int i = 0; i < argc1; i++) {
1690 argv2[5 + i] = argv1[i];
1692 argv2[(argc1 + 5)] = 0;
1701 fprintf(stderr, (
";;KMR [%05d] %s [%d]: MPI_Comm_spawn" 1702 " (maxprocs=%d;%s;info:%s)\n"),
1703 mr->rank, spw->fn, w, s->n_procs, ee0, ee1);
1707 s->timestamp[0] = MPI_Wtime();
1710 assert(s->icomm == MPI_COMM_NULL);
1711 int *ec =
kmr_malloc(
sizeof(
int) * (
size_t)s->n_procs);
1717 trys = (mr->spawn_retry_limit + 1);
1720 cc = MPI_Comm_spawn(argv2[0], &(argv2[1]), s->n_procs, infox,
1721 root, mr->spawn_self, &s->icomm, ec);
1722 if (cc != MPI_SUCCESS) {
1725 MPI_Error_class(cc, &xcc);
1726 assert(xcc == MPI_SUCCESS || xcc == MPI_ERR_SPAWN);
1729 if (cc == MPI_SUCCESS || trys == 0) {
1734 ";;KMR [%05d] %s: sleeping after spawn failure" 1736 mr->rank, spw->fn, mr->spawn_retry_gap_msec);
1739 kmr_msleep(mr->spawn_retry_gap_msec, 1);
1744 if (cc != MPI_SUCCESS) {
1746 kmr_error_mpi(mr,
"MPI_Comm_spawn()", cc);
1747 MPI_Abort(MPI_COMM_WORLD, 1);
1749 nspawns = s->n_procs;
1751 assert(nspawns > 0);
1753 s->timestamp[1] = MPI_Wtime();
1755 kmr_free(ec, (
sizeof(
int) * (
size_t)s->n_procs));
1756 if (argv2 != argv1) {
1757 kmr_free(argv2, (
sizeof(
char *) * (
size_t)(argc2 + 1)));
1762 s->index = spw->n_starteds;
1764 spw->n_starteds += nspawns;
1765 spw->n_runnings += nspawns;
1767 if (infox != info) {
1768 cc = MPI_Info_free(&infox);
1769 assert(cc == MPI_SUCCESS);
1773 if (mode == KMR_SPAWN_PARALLEL) {
1774 cc = kmr_accept_on_watch(mr, spw, w);
1775 assert(cc == MPI_SUCCESS);
1778 if (mr->spawn_disconnect_early && mode == KMR_SPAWN_PARALLEL) {
1779 if (s->icomm != MPI_COMM_NULL) {
1780 cc = kmr_free_comm_with_tracing(mr, spw, s);
1781 assert(cc == MPI_SUCCESS);
1785 if (mr->spawn_sync_at_startup && s->icomm != MPI_COMM_NULL) {
1787 cc = MPI_Comm_test_inter(s->icomm, &flag);
1788 assert(cc == MPI_SUCCESS && flag != 0);
1790 cc = MPI_Comm_remote_size(s->icomm, &peernprocs);
1791 assert(cc == MPI_SUCCESS && peernprocs == s->count);
1794 s->timestamp[2] = MPI_Wtime();
1797 cc = kmr_receive_for_reply(mr, spw, w,
1798 opt.reply_each, opt.reply_root);
1799 assert(cc == MPI_SUCCESS);
1803 while (spw->n_runnings > 0) {
1804 cc = kmr_wait_then_map(mr, spw,
1805 kvi, kvo, arg, opt, mapfn);
1806 assert(cc == MPI_SUCCESS);
1810 for (
int w = 0; w < spw->n_spawns; w++) {
1812 fprintf(stderr, (
";;KMR [%05d] %s [%d/%d]" 1814 " spawn=%f setup=%f run=%f mapfn=%f clean=%f" 1816 mr->rank, spw->fn, w, spw->n_spawns,
1817 ((s->timestamp[1] - s->timestamp[0]) * 1e3),
1818 ((s->timestamp[2] - s->timestamp[1]) * 1e3),
1819 ((s->timestamp[3] - s->timestamp[2]) * 1e3),
1820 ((s->timestamp[4] - s->timestamp[3]) * 1e3),
1821 ((s->timestamp[5] - s->timestamp[4]) * 1e3));
1826 assert(mr->spawn_comms != 0);
1828 kmr_free(mr->spawn_comms, (
sizeof(MPI_Comm *) * (
size_t)spw->n_spawns));
1829 mr->spawn_comms = 0;
1831 for (
int w = 0; w < spw->n_spawns; w++) {
1833 assert(s->icomm == MPI_COMM_NULL);
1834 assert(s->abuf != 0);
1835 kmr_free(s->abuf, s->alen);
1837 assert(s->argv0 != 0);
1838 kmr_free(s->argv0, (
sizeof(
char *) * (
size_t)(s->argc0 + 1)));
1842 assert(spw->ev != 0);
1843 kmr_free(spw->ev, (
sizeof(
struct kmr_kv_box) * (
size_t)spw->n_spawns));
1845 assert(spw->spawned != 0);
1846 kmr_free(spw->spawned, (
sizeof(
struct kmr_spawn_state) * (
size_t)spw->n_spawns));
1850 assert(spw->replies != 0);
1851 for (
int i = 0; i < spw->n_processes; i++) {
1852 assert(spw->replies[i] == MPI_REQUEST_NULL);
1854 kmr_free(spw->replies, (
sizeof(MPI_Request) * (
size_t)spw->n_processes));
1857 if (mode == KMR_SPAWN_PARALLEL) {
1858 assert(spw->watches != 0);
1859 for (
int i = 0; i < spw->n_processes; i++) {
1860 assert(spw->watches[i] == -1);
1862 kmr_free(spw->watches, (
sizeof(
int) * (
size_t)spw->n_processes));
1866 assert(spw->watch_listener == -1);
1896 MPI_Comm ic = MPI_COMM_NULL;
1897 cc = MPI_Comm_get_parent(&ic);
1898 assert(cc == MPI_SUCCESS);
1899 if (ic == MPI_COMM_NULL) {
1900 kmr_error(mr, (
"kmr_reply_to_spawner:" 1901 " may be called in a not-spawned process"));
1904 cc = MPI_Send(0, 0, MPI_BYTE, peer, KMR_TAG_SPAWN_REPLY, ic);
1905 assert(cc == MPI_SUCCESS);
1918 if (mr->spawn_comms == 0) {
1919 kmr_error(mr, (
"kmr_get_spawner_communicator() be called" 1920 " outside of kmr_map_via_spawn()"));
1922 if (index >= mr->spawn_size) {
1923 kmr_error(mr, (
"kmr_get_spawner_communicator() be called" 1924 " with index out of range"));
1926 MPI_Comm *comm = mr->spawn_comms[index];
1931 kmr_get_spawner_communicator_ff(
KMR *mr,
long ii,
int *comm)
1934 *comm = MPI_Comm_c2f(*c);
1996 int cc = kmr_map_spawned_processes(KMR_SPAWN_INTERACT,
1997 "kmr_map_via_spawn",
1998 kvi, kvo, arg, info, opt, mapfn);
2042 ssopt.reply_root = 0;
2043 ssopt.reply_each = 1;
2044 int cc = kmr_map_spawned_processes(KMR_SPAWN_PARALLEL,
2045 "kmr_map_parallel_processes",
2046 kvi, kvo, arg, info, ssopt, mapfn);
2072 ssopt.reply_root = 0;
2073 ssopt.reply_each = 1;
2074 int cc = kmr_map_spawned_processes(KMR_SPAWN_SERIAL,
2075 "kmr_map_serial_processes",
2076 kvi, kvo, arg, info, ssopt, mapfn);
2091 KMR *mr = kvi->c.mr;
2092 if (opt.reply_root || opt.reply_each) {
2093 kmr_error(mr,
"kmr_map_processes:" 2094 " options REPLY_ROOT/REPLY_EACH not allowed");
2098 ssopt.reply_root = 0;
2099 ssopt.reply_each = 1;
2101 int cc = kmr_map_spawned_processes(KMR_SPAWN_SERIAL,
2102 "kmr_map_processes",
2103 kvi, kvo, arg, info, ssopt, mapfn);
2106 int cc = kmr_map_spawned_processes(KMR_SPAWN_PARALLEL,
2107 "kmr_map_processes",
2108 kvi, kvo, arg, info, ssopt, mapfn);
2117 kmr_create_dummy_context(
void)
2130 MPI_Comm ic = MPI_COMM_NULL;
2131 cc = MPI_Comm_get_parent(&ic);
2132 assert(cc == MPI_SUCCESS);
2133 if (ic == MPI_COMM_NULL) {
2134 kmr_error(mr, (
"kmr_send_kvs_to_spawner:" 2135 " may be called in a not-spawned process"));
2140 assert(cc == MPI_SUCCESS && data != 0 && sz != 0);
2143 cc = MPI_Send(&siz, 1, MPI_INT, peer, KMR_TAG_SPAWN_REPLY1, ic);
2144 assert(cc == MPI_SUCCESS);
2145 cc = MPI_Send(data, (
int)sz, MPI_BYTE, peer, KMR_TAG_SPAWN_REPLY1, ic);
2146 assert(cc == MPI_SUCCESS);
2165 _Bool replyeach = 1;
2167 KMR *
const mr = kvi->c.mr;
2169 assert(icommr != 0);
2170 MPI_Comm icomm = *icommr;
2173 cc = MPI_Comm_remote_size(icomm, &peernprocs);
2174 assert(cc == MPI_SUCCESS);
2175 int npeers = (replyeach ? peernprocs : 1);
2176 for (
int peerrank = 0; peerrank < npeers; peerrank++) {
2180 cc = MPI_Recv(&sz, 1, MPI_INT,
2181 peerrank, KMR_TAG_SPAWN_REPLY1,
2183 assert(cc == MPI_SUCCESS);
2188 cc = MPI_Recv(data, sz, MPI_BYTE,
2189 peerrank, KMR_TAG_SPAWN_REPLY1,
2191 assert(cc == MPI_SUCCESS);
2194 assert(cc == MPI_SUCCESS);
2195 struct kmr_option keepopen = {.keep_open = 1};
2197 assert(cc == MPI_SUCCESS);
2198 kmr_free(data, (
size_t)sz);
2212 kmr_exec_command_e(_Bool use_exec,
const struct kmr_kv_box kv,
2216 char *name =
"kmr_map_ms_commands";
2217 KMR *mr = kvi->c.mr;
2218 _Bool tracing5 = (mr->trace_map_ms && (5 <= mr->verbosity));
2221 const int maxargc = 128;
2225 char argvstring[160];
2227 char *starter = (use_exec ?
"fork-exec" :
"system");
2228 snprintf(prefix,
sizeof(prefix),
"%s: %s", name, starter);
2230 char *abuf =
kmr_malloc((
size_t)kv.vlen + 1);
2231 char **argv =
kmr_malloc(
sizeof(
char *) * (
size_t)maxargc);
2232 memcpy(abuf, kv.v.p, (
size_t)kv.vlen);
2240 cc = kmr_scan_argv_strings(mr, abuf, (
size_t)kv.vlen, maxargc,
2242 opt.separator_space, name);
2243 assert(cc == MPI_SUCCESS);
2250 ";;KMR [%05d] %s(%s)\n",
2251 mr->rank, prefix, argvstring);
2257 waitstatus = system(abuf);
2258 if (waitstatus == -1) {
2260 char *m = strerror(errno);
2261 snprintf(ee,
sizeof(ee),
"%s() failed: %s for %s(%s)",
2262 prefix, m, starter, argvstring);
2267 if (mr->keep_fds_at_fork) {
2269 }
else if (mr->rlimit_nofile == -1) {
2271 assert(mr->rlimit_nofile > 0);
2272 closefds = mr->rlimit_nofile;
2274 closefds = mr->rlimit_nofile;
2280 char *m = strerror(errno);
2281 snprintf(ee,
sizeof(ee),
"%s: fork() failed: %s",
2286 for (
int fd = 3; fd < closefds; fd++) {
2291 unsetenv(
"LD_PRELOAD");
2292 setenv(
"XOS_MMM_L_HPAGE_TYPE",
"none", 1);
2295 cc = execvp(argv[0], argv);
2298 char *m = strerror(errno);
2299 snprintf(ee,
sizeof(ee),
2300 "%s: execvp failed: %s for execvp(%s)",
2301 name, m, argvstring);
2305 snprintf(ee,
sizeof(ee),
2306 "%s: execvp returned with=%d for execvp(%s)",
2307 name, cc, argvstring);
2312 cc = waitpid(pid, &waitstatus, 0);
2314 if (errno == EINTR) {
2316 snprintf(ee,
sizeof(ee),
2317 "%s: waitpid() interrupted",
2319 kmr_warning(mr, 1, ee);
2323 char *m = strerror(errno);
2324 snprintf(ee,
sizeof(ee),
2325 "%s: waitpid() failed: %s",
2327 kmr_warning(mr, 1, ee);
2338 if (tracing5 || WIFSIGNALED(waitstatus) || WIFSTOPPED(waitstatus)) {
2339 if (WIFEXITED(waitstatus)) {
2340 int n = WEXITSTATUS(waitstatus);
2342 ";;KMR [%05d] %s() done (%d) for %s(%s)\n",
2343 mr->rank, prefix, n, starter, argvstring);
2344 }
else if (WIFSIGNALED(waitstatus)) {
2346 int n = WTERMSIG(waitstatus);
2347 snprintf(ee,
sizeof(ee),
2348 "%s() signaled=%d in %s(%s)\n",
2349 prefix, n, starter, argvstring);
2350 if (mr->map_ms_abort_on_signal) {
2353 kmr_warning(mr, 1, ee);
2355 }
else if (WIFSTOPPED(waitstatus)) {
2358 int n = WSTOPSIG(waitstatus);
2359 snprintf(ee,
sizeof(ee),
2360 "%s() stopped=%d in %s(%s)\n",
2361 prefix, n, starter, argvstring);
2362 if (mr->map_ms_abort_on_signal) {
2365 kmr_warning(mr, 1, ee);
2370 snprintf(ee,
sizeof(ee),
2371 "%s() bad return (?): %s(%s)\n",
2372 prefix, starter, argvstring);
2373 if (mr->map_ms_abort_on_signal) {
2376 kmr_warning(mr, 1, ee);
2382 cc = (*xarg->fn)(kv, kvi, kvo, xarg->arg, index);
2383 assert(cc == MPI_SUCCESS);
2385 kmr_free(abuf, (
size_t)kv.vlen);
2386 kmr_free(argv,
sizeof(
char *) * (
size_t)maxargc);
2398 KMR *mr = kvi->c.mr;
2403 _Bool contains_separator;
2404 if (opt.separator_space) {
2405 contains_separator = 1;
2407 contains_separator = 0;
2408 for (
int i = 0; i < (kv.vlen - 1); i++) {
2409 if (kv.v.p[i] ==
'\0') {
2410 contains_separator = 1;
2416 _Bool use_exec = (mr->map_ms_use_exec || contains_separator);
2417 cc = kmr_exec_command_e(use_exec, kv, kvi, kvo, arg, index);
2446 int kmr_check_exec__(
KMR *mr);
2449 kmr_check_exec__(
KMR *mr)
2452 opt.separator_space = 0;
2460 char key[] =
"key0";
2461 char val[] =
"echo start a subprocess.; sleep 3;" 2462 " echo a process done.";
2463 const int klen =
sizeof(key);
2464 int vlen =
sizeof(val);
int kmr_ckpt_progress_init(KMR_KVS *, KMR_KVS *, struct kmr_option)
It initializes progress tracking for MapReduce checkpointing.
Key-Value Stream (abstract).
static int kmr_map_master(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Delivers key-value pairs as requested.
Utilities Private Part (do not include from applications).
int kmr_make_printable_info_string(char *s, size_t sz, MPI_Info info)
Fills the string buffer with the MPI_Info strings for printing.
Options to Mapping, Shuffling, and Reduction.
void kmr_ckpt_progress_fin(KMR *)
It finalizes the progress of MapReduce checkpointing.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
void kmr_ckpt_save_kvo_each_init(KMR *, KMR_KVS *)
It initializes saving indexed key-value pairs of the output KVS to a checkpoint data file...
void kmr_ckpt_save_kvo_each_fin(KMR *, KMR_KVS *)
It finalizes saving indexed key-value pairs of the output KVS to the checkpoint data file...
#define kmr_malloc(Z)
Allocates memory, or aborts on failure.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
int kmr_save_kvs(KMR_KVS *kvi, void **dataq, size_t *szq, struct kmr_option opt)
Packs locally the contents of a key-value stream to a byte array.
int kmr_ckpt_enabled(KMR *)
Checks whether checkpoint/restart is enabled.
long kmr_ckpt_first_unprocessed_kv(KMR *)
It returns the index of the first unprocessed key-value in the input KVS.
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
void kmr_ckpt_save_kvo_each_add(KMR *, KMR_KVS *, long)
It adds new key-value pairs of the output KVS to the checkpoint data file.
static int kmr_exec_command(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Runs commands in kmr_map_ms_commands().
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
int kmr_map_processes(_Bool nonmpi, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn() to run independent processes.
int kmr_map_via_spawn(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn().
kmr_kv_field
Datatypes of Keys or Values.
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Handy Copy of a Key-Value Field.
Options to Mapping by Spawns.
State during kmr_map_ms().
int kmr_ckpt_disable_ckpt(KMR *)
It temporarily disables checkpoint/restart.
int kmr_map_ms(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps in master-worker mode.
#define kmr_realloc(P, Z)
Allocates memory, or aborts on failure.
int kmr_send_kvs_to_spawner(KMR *mr, KMR_KVS *kvs)
Sends the KVS from a spawned process to the map-function of the spawner.
int kmr_restore_kvs(KMR_KVS *kvo, void *data, size_t sz, struct kmr_option opt)
Unpacks locally the contents of a key-value stream from a byte array.
MPI_Comm * kmr_get_spawner_communicator(KMR *mr, long index)
Obtains (a reference to) a parent inter-communicator of a spawned process.
int kmr_add_identity_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
Adds a given key-value pair unmodified.
int kmr_getdtablesize(KMR *mr)
Does getdtablesize(); it is defined here because getdtablesize() is not POSIX.
int kmr_reply_to_spawner(KMR *mr)
Sends a reply message in the spawned process, which tells it is ready to finish and may have some dat...
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
int kmr_map_parallel_processes(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn() to run independent MPI processes, which will not commun...
static struct kmr_kv_box kmr_pick_kv(struct kmr_kvs_entry *e, KMR_KVS *kvs)
Returns a handle to a key-value entry – a reverse of kmr_poke_kv().
int kmr_make_printable_argv_string(char *s, size_t sz, char **argv)
Fills the string buffer with the argv strings for printing.
int kmr_reduce_as_one(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r)
Calls a reduce-function once as if all key-value pairs had the same key.
int kmr_receive_kvs_from_spawned_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Collects key-value pairs generated by spawned processes.
int kmr_map_ms_commands(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, struct kmr_spawn_option sopt, kmr_mapfn_t m)
Maps in the master-worker mode, specialized to run serial commands.
int kmr_ckpt_enable_ckpt(KMR *, int)
It temporarily re-enables checkpoint/restart that has been disabled by calling kmr_ckpt_disable_ckpt().
static int kmr_map_worker(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Asks the master for a task, then calls a map-function.
int kmr_map_serial_processes(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn() to run serial processes.
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).