45 #include <sys/types.h> 50 #include <sys/socket.h> 51 #include <netinet/in.h> 52 #include <arpa/inet.h> 59 struct sockaddr_in in4;
60 struct sockaddr_in6 in6;
61 struct sockaddr_storage ss;
65 main(
int argc,
char **argv)
70 fprintf(stderr, (
";;KMR kmrwatch0 error:" 71 " too few arguments: argc=%d\n"), argc);
76 char *hostport = argv[2];
77 char *magic = argv[3];
78 char *separator = argv[4];
82 _Bool quitwithoutfinalize = 0;
83 int lm = (int)strlen(magic);
84 for (
int i = 0; i < lm; i++) {
85 if (magic[i] ==
'V' && i < (lm - 1)) {
86 version = magic[i + 1] -
'0';
88 if (magic[i] ==
'T') {
91 if (magic[i] ==
'X') {
92 quitwithoutfinalize = 1;
97 fprintf(stderr, (
";;KMR kmrwatch0 error:" 98 " version mismatch: %s\n"), magic);
102 if (!(strcmp(mode,
"seq") == 0 || (strcmp(mode,
"mpi") == 0))) {
103 fprintf(stderr, (
";;KMR kmrwatch0 error:" 104 " bad mode (be seq or mpi): %s\n"), mode);
107 if (strcmp(separator,
"--") != 0) {
108 fprintf(stderr, (
";;KMR kmrwatch0 error:" 109 " bad separator, -- needed: %s\n"), separator);
115 fprintf(stderr,
";;KMR kmrwatch0: pid=%d\n", pid);
121 _Bool nonmpi = (mode[0] ==
's');
127 MPI_Comm parent = MPI_COMM_NULL;
131 fprintf(stderr, (
";;KMR kmrwatch0: initializing MPI\n"));
136 cc = MPI_Init(&argc, &argv);
137 assert(cc == MPI_SUCCESS);
140 cc = MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &lev);
141 assert(cc == MPI_SUCCESS);
143 cc = MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
144 assert(cc == MPI_SUCCESS);
145 cc = MPI_Comm_rank(MPI_COMM_WORLD, &rank);
146 assert(cc == MPI_SUCCESS);
147 cc = MPI_Comm_get_parent(&parent);
148 assert(cc == MPI_SUCCESS);
149 if (parent == MPI_COMM_NULL) {
150 fprintf(stderr, (
"KMR kmrwatch0 error:" 151 " MPI_Comm_get_parent() failed\n"));
158 fprintf(stderr, (
";;KMR kmrwatch0:" 159 " starting a process\n"));
166 char *m = strerror(errno);
167 fprintf(stderr, (
"KMR kmrwatch0 error:" 168 " fork() failed: %s\n"), m);
177 cc = execvp(argv[5], &argv[5]);
179 char *m = strerror(errno);
180 fprintf(stderr, (
"KMR kmrwatch0 error:" 181 " execvp(%s) failed: %s\n"),
184 fprintf(stderr, (
"KMR kmrwatch0 error:" 185 " execvp() returned: cc=%d\n"),
192 if (pid != -1 && !nonmpi) {
195 char host[NI_MAXHOST + 6];
199 fprintf(stderr, (
";;KMR kmrwatch0:" 200 " connecting a socket to the spawner\n"));
204 int len = ((int)strlen(hostport) + 1);
205 if (len > (
int)
sizeof(host)) {
206 fprintf(stderr, (
"KMR kmrwatch0 error:" 207 " bad host/port pair (too long): %s\n"),
211 memcpy(host, hostport, (
size_t)len);
216 char *s = rindex(host,
'/');
219 (
"KMR kmrwatch0 error:" 220 " bad host/port pair (no slash): %s\n"), hostport);
225 cc = sscanf(port,
"%d%c", &portno, (
char *)&gomi);
228 (
"KMR kmrwatch0 error:" 229 " bad host/port pair (bad port): %s\n"), hostport);
233 struct addrinfo hints;
234 memset(&hints, 0,
sizeof(hints));
235 hints.ai_flags = (AI_ADDRCONFIG);
236 hints.ai_socktype = SOCK_STREAM;
237 hints.ai_protocol = IPPROTO_TCP;
239 hints.ai_family = AF_INET;
240 }
else if (preferip == 6) {
241 hints.ai_family = AF_INET6;
243 hints.ai_family = AF_UNSPEC;
247 cc = getaddrinfo(host, port, &hints, &ai);
249 char const *m = gai_strerror(cc);
250 fprintf(stderr, (
"KMR kmrwatch0 error:" 251 "getaddrinfo(%s,%s) failed: %s.\n"),
255 struct {
char *s;
struct addrinfo *a;
int e;} errs[10];
256 int addresstries = 0;
257 for (
struct addrinfo *p = ai; p != 0; p = p->ai_next) {
258 union SOCKADDR *sa = (
void *)p->ai_addr;
260 if (p->ai_family == AF_INET) {
261 assert(ntohs(sa->in4.sin_port) == portno);
263 }
else if (p->ai_family == AF_INET6) {
264 assert(ntohs(sa->in6.sin6_port) == portno);
270 if (addresstries >= (
int)(
sizeof(errs) /
sizeof(*errs))) {
273 errs[addresstries - 1].s = 0;
275 fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
277 errs[addresstries - 1].s =
"socket";
278 errs[addresstries - 1].a = p;
279 errs[addresstries - 1].e = errno;
280 char *m = strerror(errno);
282 fprintf(stderr, (
"KMR kmrwatch0 error:" 283 "socket(%s) failed: %s\n"),
291 cc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
294 char *m = strerror(errno);
295 fprintf(stderr, (
"KMR kmrwatch0 error:" 296 "setsockopt(SO_REUSEADDR) failed" 297 " (ignored): %s\n"), m);
299 cc = connect(fd, p->ai_addr, p->ai_addrlen);
301 errs[addresstries - 1].s =
"connect";
302 errs[addresstries - 1].a = p;
303 errs[addresstries - 1].e = errno;
306 char *m = strerror(errno);
309 " connect(%s): %s\n"), hostport, m);
319 if (addresstries == 0) {
320 fprintf(stderr, (
"KMR kmrwatch0 error:" 321 "No address found for %s %s\n"),
324 for (
int i = 0; i < addresstries; i++) {
325 assert(errs[i].s != 0);
326 struct addrinfo *p = errs[i].a;
327 union SOCKADDR *sa = (
void *)p->ai_addr;
330 if (p->ai_family == AF_INET) {
331 addr = &(sa->in4.sin_addr);
333 }
else if (p->ai_family == AF_INET6) {
334 addr = &sa->in6.sin6_addr;
340 char peer[INET6_ADDRSTRLEN];
341 inet_ntop(p->ai_family, addr, peer,
sizeof(peer));
342 char *m = strerror(errs[i].e);
343 if (strcmp(errs[i].s,
"socket") == 0) {
344 fprintf(stderr, (
"KMR kmrwatch0 error:" 345 " socket(%s) failed: %s\n"),
347 }
else if (strcmp(errs[i].s,
"connect") == 0) {
349 (
"KMR kmrwatch0 error:" 350 " connect(%s/%s) failed: %s\n"),
363 ssize_t rsize = read(fd, &val,
sizeof(
int));
365 char *m = strerror(errno);
366 fprintf(stderr, (
"KMR kmrwatch0 error:" 367 "read failed: %s\n"), m);
371 assert(rsize ==
sizeof(
int));
372 ssize_t wsize = write(fd, &val,
sizeof(
int));
374 char *m = strerror(errno);
375 fprintf(stderr, (
"KMR kmrwatch0 error:" 376 "write failed: %s\n"), m);
380 assert(wsize ==
sizeof(
int));
388 fprintf(stderr, (
";;KMR kmrwatch0:" 389 " waiting for a process" 390 " to finish (pid=%d)\n"),
396 memset(&sa, 0,
sizeof(sa));
397 sigemptyset(&sa.sa_mask);
398 sa.sa_flags = (SA_RESETHAND);
400 cc = sigaction(SIGALRM, &sa, 0);
402 char *m = strerror(errno);
403 fprintf(stderr, (
"KMR kmrwatch0 error:" 404 " sigaction(%d): %s\n", SIGALRM, m));
409 cc = waitpid(pid, &st, 0);
411 if (errno == EINTR) {
412 fprintf(stderr, (
"KMR kmrwatch0 error:" 413 " waitpid() interrupted\n"));
415 char *m = strerror(errno);
416 fprintf(stderr, (
"KMR kmrwatch0 error:" 417 " waitpid() failed: %s\n"), m);
423 fprintf(stderr, (
";;KMR kmrwatch0: detected a process done\n"));
429 fprintf(stderr, (
";;KMR kmrwatch0:" 430 " sending a reply for done-notification\n"));
435 cc = MPI_Send(0, 0, MPI_BYTE, peer,
436 KMR_TAG_SPAWN_REPLY, parent);
437 assert(cc == MPI_SUCCESS);
439 if (quitwithoutfinalize) {
441 fprintf(stderr, (
";;KMR kmrwatch0:" 442 " force quit without finalizing MPI\n"));
449 fprintf(stderr, (
";;KMR kmrwatch0: finalizing MPI\n"));
454 assert(cc == MPI_SUCCESS);
457 fprintf(stderr, (
";;KMR kmrwatch0:" 458 " closing a socket for done-notification\n"));
469 fprintf(stderr, (
";;KMR kmrwatch0: done\n"));