/* Handy maximum/minimum macros.
   NOTE(review): both arguments are evaluated more than once — do not
   pass expressions with side effects (e.g. MAX(i++, j)). */
#define MAX(a,b) (((a)>(b))?(a):(b))
#define MIN(a,b) (((a)<(b))?(a):(b))

/* Forward declarations of the all-to-all-v implementations chosen by
   the dispatcher below: a plain MPI_Alltoallv wrapper, a naive
   nonblocking send/receive version, and a Bruck-style algorithm that
   pads every message to MAXCNT bytes. */
static int kmr_alltoallv_mpi(KMR *mr, void *sbuf, long *scnts, long *sdsps,
                             void *rbuf, long *rcnts, long *rdsps);
static int kmr_alltoallv_naive(KMR *mr, void *sbuf, long *scnts, long *sdsps,
                               void *rbuf, long *rcnts, long *rdsps);
static int kmr_alltoallv_bruck(KMR *mr, long maxcnt,
                               void *sbuf, long *scnts, long *sdsps,
                               void *rbuf, long *rcnts, long *rdsps);
/* Fixed-size all-to-all used by the Bruck-style variant. */
static int kmr_alltoall_bruck(KMR *mr, void *sbuf, void *rbuf, int cnt);
/* Debugging dump of all-to-all buffers (tracing aid). */
static void kmr_atoa_dump_(KMR *mr, void *sbuf, int sz, char *title, int step);
/* Body fragment of a power-of-two predicate (the signature is not
   visible in this view).  True iff x is positive with exactly one bit
   set: clearing the lowest set bit (x & (x-1)) leaves zero. */
    return ((x > 0) && ((x & (x - 1)) == 0));
/* Body fragment of a power-of-four predicate (signature not visible).
   A power of four is a power of two whose single bit sits at an even
   position; 0x2aaaaaaa masks the odd bit positions.
   NOTE(review): the mask covers bit positions 1..29 only — assumes x
   fits within that range; confirm the caller's bound. */
    return (kmr_powerof2_p(x) && ((x & 0x2aaaaaaa) == 0));
/* Fragment of kmr_exchange_sizes(): exchanges one long integer with
   every rank via MPI_Alltoall.  Surrounding declarations (cc, the
   function head, return) are elided in this view. */
    MPI_Comm comm = mr->comm;
    cc = MPI_Alltoall(sbuf, 1, MPI_LONG, rbuf, 1, MPI_LONG, comm);
    assert(cc == MPI_SUCCESS);
/* Fragment of kmr_gather_sizes(): collects one long integer from
   every rank via MPI_Allgather.  Surrounding declarations and the
   return are elided in this view. */
    MPI_Comm comm = mr->comm;
    cc = MPI_Allgather(&siz, 1, MPI_LONG, rbuf, 1, MPI_LONG, comm);
    assert(cc == MPI_SUCCESS);
/* Fragment of kmr_allgatherv(): gathers to rank zero (RANKZEROONLY)
   or all-gathers variable-sized byte data.  MPI wants int counts and
   displacements, so the long arguments are range-checked and narrowed
   first.  The head of the signature, the declarations of rsz/rdp/self,
   the branch structure, and the return are elided in this view. */
           void *rbuf, long *rcnts, long *rdsps)
    MPI_Comm comm = mr->comm;
    int nprocs = mr->nprocs;
    /* Only the root needs the count/displacement arrays when gathering
       to rank zero. */
    if (!rankzeroonly || self == 0) {
        rsz = kmr_malloc(sizeof(int) * (size_t)nprocs);
        rdp = kmr_malloc(sizeof(int) * (size_t)nprocs);
        for (int r = 0; r < nprocs; r++) {
            /* The long->int narrowing must not lose value. */
            assert(INT_MIN <= rcnts[r] && rcnts[r] <= INT_MAX);
            assert(INT_MIN <= rdsps[r] && rdsps[r] <= INT_MAX);
            rsz[r] = (int)rcnts[r];
            rdp[r] = (int)rdsps[r];
    /* (interior lines elided in this view) */
    cc = MPI_Gatherv(sbuf, (int)scnt, MPI_BYTE,
                     rbuf, rsz, rdp, MPI_BYTE, 0, comm);
    assert(cc == MPI_SUCCESS);
    /* (interior lines elided: the all-gather branch follows) */
    cc = MPI_Allgatherv(sbuf, (int)scnt, MPI_BYTE,
                        rbuf, rsz, rdp, MPI_BYTE, comm);
    assert(cc == MPI_SUCCESS);
    /* (interior lines elided) */
    kmr_free(rsz, (sizeof(int) * (size_t)nprocs));
    kmr_free(rdp, (sizeof(int) * (size_t)nprocs));
/* Fragment of kmr_alltoallv(): dispatcher that picks one of three
   all-to-all-v implementations.  Counts/displacements are byte sizes
   held in longs; LIMIT is the largest byte size usable by the
   MPI_LONG-based wrapper (INT_MAX elements of 8 bytes).  The head of
   the signature, the declaration of maxcnt/cc, the else structure,
   and the return are elided in this view. */
           void *rbuf, long *rcnts, long *rdsps)
    MPI_Comm comm = mr->comm;
    int nprocs = mr->nprocs;
    long LIMIT = ((long)INT_MAX * 8L);
    long cap = ((mr->atoa_size_limit == 0) ? LIMIT : mr->atoa_size_limit);
    assert(((long)INT_MIN * 8L) <= -cap && cap <= ((long)INT_MAX * 8L));
    /* Compute the largest send size; (LIMIT + 1) is a sentinel meaning
       "some value exceeds the cap — fall back to the naive version". */
    for (int r = 0; r < nprocs; r++) {
        if ((scnts[r] <= cap) && (rcnts[r] <= cap)
            && (sdsps[r] <= cap) && (rdsps[r] <= cap)) {
            maxcnt = MAX(maxcnt, scnts[r]);
    /* (interior lines elided: else-branch sets the sentinel) */
            maxcnt = (LIMIT + 1);
    /* (interior lines elided) */
    /* Agree on a global maximum so all ranks take the same branch. */
    cc = MPI_Allreduce(MPI_IN_PLACE, &maxcnt, 1, MPI_LONG, MPI_MAX, comm);
    assert(cc == MPI_SUCCESS);
    if (maxcnt == (LIMIT + 1)) {
        cc = kmr_alltoallv_naive(mr, sbuf, scnts, sdsps, rbuf, rcnts, rdsps);
        assert(cc == MPI_SUCCESS);
    } else if (kmr_powerof4_p(nprocs) && nprocs != 1
               && mr->atoa_threshold != 0
               && maxcnt < mr->atoa_threshold) {
        /* The Bruck variant needs a power-of-four rank count and pays
           off only for small messages (below atoa_threshold). */
        cc = kmr_alltoallv_bruck(mr, maxcnt,
                                 sbuf, scnts, sdsps, rbuf, rcnts, rdsps);
        assert(cc == MPI_SUCCESS);
    /* (interior lines elided: final else uses the MPI wrapper) */
        assert(maxcnt <= cap);
        cc = kmr_alltoallv_mpi(mr, sbuf, scnts, sdsps, rbuf, rcnts, rdsps);
        assert(cc == MPI_SUCCESS);
/* Fragment of kmr_alltoallv_mpi(): thin wrapper over MPI_Alltoallv
   that transfers MPI_LONG elements.  Every byte count/displacement
   must therefore be a multiple of 8 and, after dividing by 8, must
   fit in an int.  The return type line, the declaration of cc, loop
   closers, and the trailing return are elided in this view. */
kmr_alltoallv_mpi(KMR *mr, void *sbuf, long *scnts, long *sdsps,
                  void *rbuf, long *rcnts, long *rdsps)
    MPI_Comm comm = mr->comm;
    int nprocs = mr->nprocs;
    int *ssz = kmr_malloc(sizeof(int) * (size_t)nprocs);
    int *sdp = kmr_malloc(sizeof(int) * (size_t)nprocs);
    int *rsz = kmr_malloc(sizeof(int) * (size_t)nprocs);
    int *rdp = kmr_malloc(sizeof(int) * (size_t)nprocs);
    for (int r = 0; r < nprocs; r++) {
        /* Range checks before the /8 narrowing to int. */
        assert(INT_MIN * 8L <= scnts[r] && scnts[r] <= INT_MAX * 8L);
        assert(INT_MIN * 8L <= rcnts[r] && rcnts[r] <= INT_MAX * 8L);
        assert(INT_MIN * 8L <= sdsps[r] && sdsps[r] <= INT_MAX * 8L);
        assert(INT_MIN * 8L <= rdsps[r] && rdsps[r] <= INT_MAX * 8L);
        /* All values must be whole MPI_LONG units (8-byte multiples). */
        assert(((scnts[r] & 7) == 0)
               && ((rcnts[r] & 7) == 0)
               && ((sdsps[r] & 7) == 0)
               && ((rdsps[r] & 7) == 0));
        ssz[r] = (int)(scnts[r] / 8L);
        rsz[r] = (int)(rcnts[r] / 8L);
        sdp[r] = (int)(sdsps[r] / 8L);
        rdp[r] = (int)(rdsps[r] / 8L);
    /* (interior lines elided) */
    cc = MPI_Alltoallv(sbuf, ssz, sdp, MPI_LONG,
                       rbuf, rsz, rdp, MPI_LONG, comm);
    assert(cc == MPI_SUCCESS);
    kmr_free(ssz, (sizeof(int) * (size_t)nprocs));
    kmr_free(rsz, (sizeof(int) * (size_t)nprocs));
    kmr_free(sdp, (sizeof(int) * (size_t)nprocs));
    kmr_free(rdp, (sizeof(int) * (size_t)nprocs));
/* Fragment of kmr_alltoallv_bruck(): pads each per-rank message into
   a MAXCNT-byte slot so the fixed-size kmr_alltoall_bruck() can be
   used, then unpacks the result to the caller's displacements.  The
   return type, sptr/rptr setup, loop closers, and the trailing return
   are elided in this view.
   NOTE(review): (maxcnt * nprocs) is a long product assumed to fit in
   size_t — confirm the caller's bound (maxcnt < atoa_threshold). */
kmr_alltoallv_bruck(KMR *mr, long maxcnt,
                    void *sbuf, long *scnts, long *sdsps,
                    void *rbuf, long *rcnts, long *rdsps)
    int nprocs = mr->nprocs;
    char *sb = kmr_malloc((size_t)(maxcnt * nprocs));
    char *rb = kmr_malloc((size_t)(maxcnt * nprocs));
    /* Pack: copy each send chunk into its maxcnt-strided slot. */
    for (int i = 0; i < nprocs; i++) {
        memcpy(&sb[maxcnt * i], &sptr[sdsps[i]], (size_t)scnts[i]);
    /* (interior lines elided) */
    cc = kmr_alltoall_bruck(mr, sb, rb, (int)maxcnt);
    assert(cc == MPI_SUCCESS);
    /* Unpack: copy each received slot to the caller's displacement. */
    for (int i = 0; i < nprocs; i++) {
        memcpy(&rptr[rdsps[i]], &rb[maxcnt * i], (size_t)rcnts[i]);
    /* (interior lines elided) */
    kmr_free(sb, (size_t)(maxcnt * nprocs));
    kmr_free(rb, (size_t)(maxcnt * nprocs));
/* Fragment of kmr_alltoallv_wait_requests(): retires some pending
   requests with MPI_Waitsome, asserting per-request success, and
   (in elided lines) compacts the request array, skipping entries
   already set to MPI_REQUEST_NULL.  Presumably returns the new
   request count — the return is not visible; confirm at the caller. */
kmr_alltoallv_wait_requests(KMR *mr, int reqcnt, MPI_Request *rqs,
                            MPI_Status *sts, int *indexes)
    cc = MPI_Waitsome(reqcnt, rqs, &dones, indexes, sts);
    assert(dones != MPI_UNDEFINED);
    if (cc == MPI_ERR_IN_STATUS) {
        /* On aggregate error, check each completed request's status. */
        for (int i = 0; i < dones; i++) {
            assert(0 <= indexes[i] && indexes[i] < reqcnt);
            assert(sts[indexes[i]].MPI_ERROR == MPI_SUCCESS);
    /* (interior lines elided: compaction loop over j) */
    if (rqs[j] != MPI_REQUEST_NULL) {
/* Fragment of kmr_alltoallv_naive(): point-to-point all-to-all-v
   using nonblocking sends/receives.  Large messages are split into
   CHUNK-byte pieces and outstanding requests are bounded by
   REQUESTSLIMIT; ranks are paired in rotated order (self-i receives,
   self+i sends) to spread traffic.  The return type, declarations of
   cc/self/ee/reqcnt/roff/soff/sptr/rptr, several closers, and the
   return are elided in this view. */
kmr_alltoallv_naive(KMR *mr, void *sbuf, long *scnts, long *sdsps,
                    void *rbuf, long *rcnts, long *rdsps)
    MPI_Comm comm = mr->comm;
    int nprocs = mr->nprocs;
    int tag = KMR_TAG_ATOA;
    /* Chunk size caps each transfer at an int-sized byte count. */
    long chunk = ((mr->atoa_size_limit == 0) ? INT_MAX : mr->atoa_size_limit);
    assert(0 < chunk && chunk <= (long)INT_MAX);
    int requestslimit = ((mr->atoa_requests_limit == 0)
                         /* (line elided: default value) */
                         : mr->atoa_requests_limit);
/* Wraps an index into [0,N); assumes the argument is within one
   period of the range. */
#define KMR_WRAPAROUND(I,N) \
    (((I)>=0) ? (((I)<(N)) ? (I) : ((I)-(N))) : ((I)+(N)))
    MPI_Request *rqs = kmr_malloc(sizeof(MPI_Request)
                                  * (size_t)requestslimit);
    MPI_Status *sts = kmr_malloc(sizeof(MPI_Status)
                                 * (size_t)requestslimit);
    int *indexes = kmr_malloc(sizeof(int) * (size_t)requestslimit);
    for (int i = 0; i < nprocs; i++) {
        int src = KMR_WRAPAROUND((self - i), nprocs);
        int dst = KMR_WRAPAROUND((self + i), nprocs);
        long rsize = rcnts[src];
        long rchunks = ((rsize + chunk - 1) / chunk);
        long ssize = scnts[dst];
        long schunks = ((ssize + chunk - 1) / chunk);
        assert((rchunks + schunks) <= INT_MAX);
        /* A single pair must not need more requests than the limit. */
        if (requestslimit < (int)(rchunks + schunks)) {
            snprintf(ee, 160, ("kmr_alltoallv: exceed the limit of requests"
                               " (atoa_requests_limit=%d needed=%ld)"),
                     requestslimit, (rchunks + schunks));
        /* (interior lines elided: error report) */
        /* Post receives chunk by chunk, draining when the limit hits. */
        while (roff < rsize) {
            while (reqcnt >= requestslimit) {
                reqcnt = kmr_alltoallv_wait_requests(mr, reqcnt, rqs, sts,
            /* (line elided: indexes argument) */
            assert(reqcnt < requestslimit);
            int siz = (int)MIN((rsize - roff), chunk);
            cc = MPI_Irecv(&rptr[rdsps[src] + roff], siz, MPI_BYTE,
                           src, tag, comm, &rqs[reqcnt]);
            assert(cc == MPI_SUCCESS);
        /* (interior lines elided: advance roff/reqcnt) */
        /* Post sends the same way. */
        while (soff < ssize) {
            while (reqcnt >= requestslimit) {
                reqcnt = kmr_alltoallv_wait_requests(mr, reqcnt, rqs, sts,
            /* (line elided: indexes argument) */
            assert(reqcnt < requestslimit);
            int siz = (int)MIN((ssize - soff), chunk);
            cc = MPI_Isend(&sptr[sdsps[dst] + soff], siz, MPI_BYTE,
                           dst, tag, comm, &rqs[reqcnt]);
            assert(cc == MPI_SUCCESS);
    /* (interior lines elided) */
    cc = MPI_Waitall(reqcnt, rqs, sts);
    if (cc == MPI_ERR_IN_STATUS) {
        /* On aggregate error, check each request's own status. */
        for (int i = 0; i < reqcnt; i++) {
            assert(sts[i].MPI_ERROR == MPI_SUCCESS);
    /* (interior lines elided) */
    kmr_free(rqs, (sizeof(MPI_Request) * (size_t)requestslimit));
    kmr_free(sts, (sizeof(MPI_Status) * (size_t)requestslimit));
    kmr_free(indexes, (sizeof(int) * (size_t)requestslimit));
#undef KMR_WRAPAROUND

/* Fragment of kmr_alltoall_bruck(): fixed-size all-to-all via a
   radix-4, Bruck-like exchange; asserts nprocs is a multiple of 4
   (the dispatcher only selects this for power-of-four nprocs).  Each
   two-bit stage packs the buffer into four groups by the stage's rank
   bits, keeps its own group by memcpy, and exchanges the other three
   groups with peers whose ranks differ in those bits.  The return
   type, rank/lognprocs/rqs declarations, loop closers, the branch
   between the Sendrecv and Isend/Irecv variants, and the return are
   elided in this view. */
kmr_alltoall_bruck(KMR *mr, void *sbuf, void *rbuf, int cnt)
/* Conditional trace dump (no-op unless trace_alltoall is set). */
#define DUMP_(X0,X1,X2,X3,X4) if (tracing) kmr_atoa_dump_(X0,X1,X2,X3,X4)
    MPI_Comm comm = mr->comm;
    int nprocs = mr->nprocs;
    int tag = KMR_TAG_ATOA;
    _Bool tracing = mr->trace_alltoall;
    assert((nprocs & 3) == 0);
    int nprocs4th = (nprocs / 4);
    while ((1 << lognprocs) < nprocs) {
    /* (line elided: lognprocs++) */
    assert((1 << lognprocs) == nprocs);
    char *buf0 = kmr_malloc((size_t)(cnt * nprocs));
    char *buf1 = kmr_malloc((size_t)(cnt * nprocs));
    memcpy(buf0, sbuf, (size_t)(cnt * nprocs));
    for (int stage = 0; stage < lognprocs; stage += 2) {
        DUMP_(mr, buf0, cnt, "step", stage);
        /* Pack into 4 groups keyed by the two rank bits of this stage. */
        for (int j = 0; j < nprocs4th; j++) {
            for (int i = 0; i < 4; i++) {
                void *s = &buf0[cnt * (i + (j * 4))];
                void *r = &buf1[cnt * (nprocs4th * i + j)];
                memcpy(r, s, (size_t)cnt);
        /* (interior lines elided) */
        DUMP_(mr, buf1, cnt, "pack", stage);
        for (int k = 0; k < 4; k++) {
            int flip = (k << stage);
            int peer = (rank ^ flip);
            int baserank = ((rank >> stage) & 3);
            int basepeer = ((peer >> stage) & 3);
            /* k == 0 case (peer == rank): keep own group locally. */
            void *s = &buf1[cnt * (baserank * nprocs4th)];
            void *r = &buf0[cnt * (baserank * nprocs4th)];
            memcpy(r, s, (size_t)(cnt * nprocs4th));
            /* k != 0 case: swap one quarter with the peer rank. */
            void *s = &buf1[cnt * (basepeer * nprocs4th)];
            void *r = &buf0[cnt * (basepeer * nprocs4th)];
            cc = MPI_Sendrecv(s, (cnt * nprocs4th), MPI_BYTE, peer, tag,
                              r, (cnt * nprocs4th), MPI_BYTE, peer, tag,
                              comm, MPI_STATUS_IGNORE);
            assert(cc == MPI_SUCCESS);
            /* Nonblocking variant: three peers => six requests,
               indexed by (k-1). */
            cc = MPI_Isend(s, (cnt * nprocs4th), MPI_BYTE, peer, tag,
                           comm, &rqs[(k - 1) * 2 + 1]);
            assert(cc == MPI_SUCCESS);
            cc = MPI_Irecv(r, (cnt * nprocs4th), MPI_BYTE, peer, tag,
                           comm, &rqs[(k - 1) * 2]);
            assert(cc == MPI_SUCCESS);
        /* (interior lines elided) */
        cc = MPI_Waitall(6, rqs, MPI_STATUSES_IGNORE);
        assert(cc == MPI_SUCCESS);
        DUMP_(mr, buf0, cnt, "exchange", stage);
    /* (interior lines elided) */
    memcpy(rbuf, buf0, (size_t)(cnt * nprocs));
    kmr_free(buf0, (size_t)(cnt * nprocs));
    kmr_free(buf1, (size_t)(cnt * nprocs));
/* Fragment of kmr_atoa_dump_(): trace aid that gathers every rank's
   buffer (presumably to rank zero — the Gather root argument is in an
   elided line; confirm) and prints the first byte of each
   (sender, destination) block as a hex matrix prefixed ";;KMR".
   Declarations of cc/xbuf, rank guards, and the free of xbuf are
   elided in this view. */
kmr_atoa_dump_(KMR *mr, void *sbuf, int sz, char *title, int step)
    MPI_Comm comm = mr->comm;
    int nprocs = mr->nprocs;
    xbuf = malloc((size_t)(sz * nprocs * nprocs));
    cc = MPI_Gather(sbuf, (sz * nprocs), MPI_BYTE,
                    xbuf, (sz * nprocs), MPI_BYTE,
    /* (line elided: root and comm arguments) */
    assert(cc == MPI_SUCCESS);
    fprintf(stderr, ";;KMR %s (%d)\n", title, step);
    for (int j = 0; j < nprocs; j++) {
        fprintf(stderr, ";;KMR ");
        for (int i = 0; i < nprocs; i++) {
            /* First byte of the block sent by rank i for slot j. */
            fprintf(stderr, "%02x ",
                    (0xff & xbuf[(i * (sz * nprocs)) + (j * sz)]));
        /* (interior lines elided) */
        fprintf(stderr, "\n");
    /* (interior lines elided) */
    fprintf(stderr, ";;KMR\n");
/* Fragment of kmr_exscan(): recursive-doubling scan over key-value
   data.  At each doubling stage a rank exchanges sizes, then payloads,
   with peer (self ^ stage), and folds the peer's contribution into
   the output and/or the running value depending on rank position and
   operator commutativity.  The remaining parameters, declarations of
   cc/ssz/rsz/kvs/kvo/commute/threshold, branch structure, and the
   return are elided in this view.
   NOTE(review): ssz/rsz are exchanged as MPI_LONG but then used as
   MPI byte counts — an elided narrowing/bound is assumed; confirm. */
kmr_exscan(void *sbuf, void *rbuf, int cnt, MPI_Datatype dt, MPI_Op op,
    /* (lines elided: remaining parameters and opening) */
    const int SCANTAG = 60;
    MPI_Comm comm = kvs->c.mr->comm;
    int nprocs = kvs->c.mr->nprocs;
    int self = kvs->c.mr->rank;
    for (int stage = 1; stage < nprocs; stage <<= 1) {
        int peer = (self ^ stage);
        /* Exchange sizes first, then the payloads themselves. */
        cc = MPI_Sendrecv(&ssz, 1, MPI_LONG, peer, SCANTAG,
                          &rsz, 1, MPI_LONG, peer, SCANTAG,
                          comm, MPI_STATUS_IGNORE);
        assert(cc == MPI_SUCCESS);
        cc = MPI_Sendrecv(sbuf, ssz, MPI_BYTE, peer, SCANTAG,
                          rbuf, rsz, MPI_BYTE, peer, SCANTAG,
                          comm, MPI_STATUS_IGNORE);
        assert(cc == MPI_SUCCESS);
        /* Lower-part ranks fold the peer data into the scan output. */
        if ((self & (stage - 1)) != 0) {
            kmr_add_kv_vector(kvo, rbuf, rsz);
        /* (interior lines elided) */
        if (commute || self > peer) {
            kmr_add_kv_vector(kvs, rbuf, rsz);
        /* (interior lines elided: non-commutative ordering case) */
            kmr_add_kv_vector(kvs, rbuf, rsz);
    /* (interior lines elided) */
    if (kvs->element_count > threshold) {
int kmr_allgatherv(KMR *mr, _Bool rankzeroonly, void *sbuf, long scnt, void *rbuf, long *rcnts, long *rdsps)
All-gathers data, or gathers data when RANKZEROONLY.
Utilities Private Part (do not include from applications).
#define kmr_malloc(Z)
Allocates memory, or aborts when failed.
int kmr_exchange_sizes(KMR *mr, long *sbuf, long *rbuf)
Calls all-to-all to exchange one long-integer.
int kmr_alltoallv(KMR *mr, void *sbuf, long *scnts, long *sdsps, void *rbuf, long *rcnts, long *rdsps)
Does all-to-all-v, taking its count and displacement arguments as long integers.
int kmr_gather_sizes(KMR *mr, long siz, long *rbuf)
Calls all-gather for collecting one long-integer.