68 #include <sys/types.h> 70 #include <sys/resource.h> 71 #include <netinet/in.h> 77 #define MIN(a,b) (((a)<(b))?(a):(b)) 83 _Bool load_tables_in_advance;
84 _Bool hang_out_communication;
85 _Bool redistribute_loaded_tables;
86 _Bool use_small_block_size;
87 size_t pushoff_block_size_in_kilo;
90 _Bool load_tables_in_advance_;
91 _Bool redistribute_loaded_tables_;
93 _Bool files_in_rank_directory = 0;
95 _Bool report_count_in_messages = 0;
96 _Bool report_time_to_read = 0;
97 _Bool report_pushoff_statistics = 0;
99 #undef USE_TIME_FUNCTIONS 102 #define NAME_SIZE (25) 106 enum {T_FST = 0, T_SND = 1};
#define FST ((int)T_FST)
#define SND ((int)T_SND)

/* Returns the length of the string S, examining no more than N bytes.
   It is a local stand-in for strnlen().  The bound is tested BEFORE
   each byte is read, so S need not be NUL-terminated within its first
   N bytes (the earlier ordering read S[N] in that case).
   NOTE(review): the return type and linkage lines are not visible in
   this listing; "static size_t" is assumed -- confirm. */

static size_t
strnlen_(const char *s, size_t n)
{
    const char *p = s;
    const char *limit = (s + n);
    while (p < limit && *p != 0) {
        p++;
    }
    return (size_t)(p - s);
}

/* Finds the first occurrence of the string S2 in S1, where a match
   must lie entirely within the first N bytes of S1.  It is a local
   stand-in for the BSD strnstr().  It returns a pointer to the match,
   or null when there is none.  The length of S2 is itself measured
   with a bound of N, so at most N bytes of S2 take part in the
   comparison.
   NOTE(review): the value returned for a null or empty argument is not
   visible in this listing; S1 is returned, which is what BSD strnstr()
   does for an empty S2 -- confirm. */

static char *
strnstr_(const char *s1, const char *s2, size_t n)
{
    if (s1 == 0 || s2 == 0 || *s2 == 0) {
        return (char *)s1;
    }
    if (n == 0) {
        /* Nothing may be examined; without this guard the scan below
           would still look at s1[0] and could report a match. */
        return 0;
    }
    size_t len = strnlen_(s2, n);
    /* One past the last position at which LEN bytes still fit in N. */
    const char *limit = (s1 + n - len + 1);
    char c0 = *s2;
    /* Test the bound first so that s1[N] is never read. */
    for (const char *p = s1; p < limit && *p != 0; p++) {
        if (*p == c0 && strncmp(p, s2, len) == 0) {
            return (char *)p;
        }
    }
    return 0;
}
154 union {uint64_t ll; uint32_t i[2];} v;
155 uint32_t lo = (uint32_t)x;
156 uint32_t hi = (uint32_t)(x >> 32);
164 static char *xxx_optarg;
165 static int xxx_optind = 1;
166 static int xxx_optopt;
170 xxx_getopt(
int argc,
char **argv,
char *optstring)
173 if (xxx_optind >= argc) {
176 char *p = argv[xxx_optind];
180 }
else if (strcmp(p,
"-") == 0) {
183 }
else if (strcmp(p,
"--") == 0) {
189 char *o = strchr(optstring, xxx_optopt);
192 }
else if (o[1] !=
':') {
201 if (xxx_optind < argc) {
202 xxx_optarg = argv[xxx_optind];
220 TAB_N, TAB_R, TAB_P, TAB_S, TAB_H, TAB_C, TAB_O, TAB_L,
222 TAB_Q7_N1, TAB_Q7_NN, TAB_Q7_S1, TAB_Q7_NNS,
223 TAB_Q7_C1, TAB_Q7_O1, TAB_Q7_CO, TAB_Q7_L1,
224 TAB_Q7_CLO, TAB_Q7_CLNNOS0, TAB_Q7_CLNNOS1, TAB_Q7_REVENUE,
226 TAB_Q9, TAB_Q9_PH, TAB_Q9_NS, TAB_Q9_PHSN,
227 TAB_Q9_LO, TAB_Q9_PHSNLO, TAB_Q9_AMOUNT,
229 TAB_Q10_L1, TAB_Q10_O1, TAB_Q10_LO, TAB_Q10_C1,
230 TAB_Q10_N1, TAB_Q10_CN, TAB_Q10_CLNO0, TAB_Q10_CLNO1,
232 TAB_Q13_O1, TAB_Q13_CO0, TAB_Q13_CO1,
234 TAB_Q21_N1, TAB_Q21_NS, TAB_Q21_L1, TAB_Q21_O1,
235 TAB_Q21_L3, TAB_Q21_LNS, TAB_Q21_NAME,
239 KMR_KVS *N0, *R0, *P0, *S0, *H0, *C0, *O0, *L0;
241 enum FIELD {F_NIL, F_ZAHL, F_REAL, F_TEXT, F_DATE};
250 struct COLUMN columns[MAXCOLS];
252 struct COLUMN keys[MAXCOLS];
257 #define RECORD_SIZE (1024) 270 struct RECORD description;
272 {TAB_N, {&N0,
"nation.tbl", 1, 0, 0, 0, 0},
273 {4, {{
"n_nationkey", F_ZAHL},
275 {
"n_regionkey", F_ZAHL},
276 {
"n_comment", F_TEXT}},
277 1, {{
"n_nationkey", F_ZAHL}}}},
278 {TAB_R, {&R0,
"region.tbl", 1, 0, 0, 0, 0},
279 {3, {{
"r_regionkey", F_ZAHL},
281 {
"r_comment", F_TEXT}},
282 1, {{
"r_regionkey", F_ZAHL}}}},
283 {TAB_P, {&P0,
"part.tbl", 2, 0, 0, 0, 0},
284 {9, {{
"p_partkey", F_ZAHL},
290 {
"p_container", F_TEXT},
291 {
"p_retailprice", F_REAL},
292 {
"p_comment", F_TEXT}},
293 1, {{
"p_partkey", F_ZAHL}}}},
294 {TAB_S, {&S0,
"supplier.tbl", 2, 0, 0, 0, 0},
295 {7, {{
"s_suppkey", F_ZAHL},
297 {
"s_address", F_TEXT},
298 {
"s_nationkey", F_ZAHL},
300 {
"s_acctbal", F_REAL},
301 {
"s_comment", F_TEXT}},
302 1, {{
"s_suppkey", F_ZAHL}}}},
303 {TAB_H, {&H0,
"partsupp.tbl", 2, 0, 0, 0, 0},
304 {5, {{
"ps_partkey", F_ZAHL},
305 {
"ps_suppkey", F_ZAHL},
306 {
"ps_availqty", F_ZAHL},
307 {
"ps_supplycost", F_REAL},
308 {
"ps_comment", F_TEXT}},
309 1, {{
"ps_partkey", F_ZAHL}}}},
310 {TAB_C, {&C0,
"customer.tbl", 2, 0, 0, 0, 0},
311 {8, {{
"c_custkey", F_ZAHL},
313 {
"c_address", F_TEXT},
314 {
"c_nationkey", F_ZAHL},
316 {
"c_acctbal", F_REAL},
317 {
"c_mktsegment", F_TEXT},
318 {
"c_comment", F_TEXT}},
319 1, {{
"c_custkey", F_ZAHL}}}},
320 {TAB_O, {&O0,
"orders.tbl", 2, 0, 0, 0, 0},
321 {9, {{
"o_orderkey", F_ZAHL},
322 {
"o_custkey", F_ZAHL},
323 {
"o_orderstatus", F_TEXT},
324 {
"o_totalprice", F_REAL},
325 {
"o_orderdate", F_DATE},
326 {
"o_orderpriority", F_TEXT},
328 {
"o_shippriority", F_ZAHL},
329 {
"o_comment", F_TEXT}},
330 1, {{
"o_orderkey", F_ZAHL}}}},
331 {TAB_L, {&L0,
"lineitem.tbl", 2, 0, 0, 0, 0},
332 {16, {{
"l_orderkey", F_ZAHL},
333 {
"l_partkey", F_ZAHL},
334 {
"l_suppkey", F_ZAHL},
335 {
"l_linenumber", F_REAL},
336 {
"l_quantity", F_REAL},
337 {
"l_extendedprice", F_REAL},
338 {
"l_discount", F_REAL},
340 {
"l_returnflag", F_TEXT},
341 {
"l_linestatus", F_TEXT},
342 {
"l_shipdate", F_DATE},
343 {
"l_commitdate", F_DATE},
344 {
"l_receiptdate", F_DATE},
345 {
"l_shipinstruct", F_TEXT},
346 {
"l_shipmode", F_TEXT},
347 {
"l_comment", F_TEXT}},
348 1, {{
"l_orderkey", F_ZAHL}}}},
353 {2, {{
"n_nationkey", F_ZAHL},
357 {4, {{
"n1.n_nationkey", F_ZAHL},
358 {
"n1.n_name", F_TEXT},
359 {
"n2.n_nationkey", F_ZAHL},
360 {
"n2.n_name", F_TEXT}},
361 1, {{
"n1.n_nationkey", F_ZAHL}}}},
363 {2, {{
"s_nationkey", F_ZAHL},
364 {
"s_suppkey", F_ZAHL}},
365 1, {{
"s_nationkey", F_ZAHL}}}},
367 {4, {{
"n2.n_nationkey", F_ZAHL},
368 {
"s_suppkey", F_ZAHL},
369 {
"n1.n_name", F_TEXT},
370 {
"n2.n_name", F_TEXT}},
371 2, {{
"n2.s_nationkey", F_ZAHL},
372 {
"s_suppkey", F_ZAHL}}}},
374 {2, {{
"c_custkey", F_ZAHL},
375 {
"c_nationkey", F_ZAHL}},
376 1, {{
"c_custkey", F_ZAHL}}}},
378 {2, {{
"o_custkey", F_ZAHL},
379 {
"o_orderkey", F_ZAHL}},
380 1, {{
"o_custkey", F_ZAHL}}}},
382 {2, {{
"o_orderkey", F_ZAHL},
383 {
"c_nationkey", F_ZAHL}},
384 1, {{
"o_orderkey", F_ZAHL}}}},
386 {4, {{
"l_orderkey", F_ZAHL},
387 {
"l_suppkey", F_ZAHL},
390 1, {{
"l_orderkey", F_ZAHL}}}},
392 {4, {{
"c_nationkey", F_ZAHL},
393 {
"l_suppkey", F_ZAHL},
396 2, {{
"c_nationkey", F_ZAHL},
397 {
"l_suppkey", F_ZAHL}}}},
398 {TAB_Q7_CLNNOS0, {0},
399 {4, {{
"n1.n_name", F_TEXT},
400 {
"n2.n_name", F_TEXT},
404 {TAB_Q7_CLNNOS1, {0},
405 {4, {{
"n1.n_name", F_TEXT},
406 {
"n2.n_name", F_TEXT},
409 3, {{
"n1.n_name", F_TEXT},
410 {
"n2.n_name", F_TEXT},
412 {TAB_Q7_REVENUE, {0},
413 {4, {{
"n1.n_name", F_TEXT},
414 {
"n2.n_name", F_TEXT},
416 {
"revenue", F_REAL}},
417 3, {{
"n1.n_name", F_TEXT},
418 {
"n2.n_name", F_TEXT},
424 {3, {{
"n_name", F_TEXT},
425 {
"o_orderdate", F_DATE},
427 1, {{
"nation+year", F_NIL}}}},
429 {3, {{
"ps_partkey", F_ZAHL},
430 {
"ps_suppkey", F_ZAHL},
431 {
"ps_supplycost", F_REAL}},
432 1, {{
"ps_suppkey", F_ZAHL}}}},
434 {2, {{
"n_name", F_TEXT},
435 {
"s_suppkey", F_ZAHL}},
436 1, {{
"s_suppkey", F_ZAHL}}}},
438 {4, {{
"n_name", F_TEXT},
439 {
"ps_partkey", F_ZAHL},
440 {
"ps_suppkey", F_ZAHL},
441 {
"ps_supplycost", F_REAL}},
442 2, {{
"ps_partkey", F_ZAHL},
443 {
"ps_suppkey", F_ZAHL}}}},
445 {6, {{
"l_discount", F_REAL},
446 {
"l_extendedprice", F_REAL},
447 {
"l_partkey", F_ZAHL},
448 {
"l_quantity", F_REAL},
449 {
"l_suppkey", F_ZAHL},
450 {
"o_orderdate", F_DATE}},
451 2, {{
"l_partkey", F_ZAHL},
452 {
"l_suppkey", F_ZAHL}}}},
454 {6, {{
"l_discount", F_REAL},
455 {
"l_extendedprice", F_REAL},
456 {
"l_quantity", F_REAL},
458 {
"o_orderdate", F_DATE},
459 {
"ps_supplycost", F_REAL}},
460 1, {{
"o_orderdate", F_DATE}}}},
462 {3, {{
"nation", F_TEXT},
465 2, {{
"nation", F_TEXT},
471 {2, {{
"l_orderkey", F_ZAHL},
473 1, {{
"l_orderkey", F_ZAHL}}}},
475 {2, {{
"o_orderkey", F_ZAHL},
476 {
"o_custkey", F_ZAHL}},
477 1, {{
"o_orderkey", F_ZAHL}}}},
479 {2, {{
"o_custkey", F_ZAHL},
481 1, {{
"o_custkey", F_ZAHL}}}},
483 {7, {{
"c_nationkey", F_ZAHL},
484 {
"c_custkey", F_ZAHL},
486 {
"c_acctbal", F_REAL},
488 {
"c_address", F_TEXT},
489 {
"c_comment", F_TEXT}},
490 1, {{
"c_nationkey", F_ZAHL}}}},
492 {7, {{
"n_nationkey", F_ZAHL},
494 1, {{
"n_nationkey", F_ZAHL}}}},
496 {7, {{
"c_custkey", F_ZAHL},
498 {
"c_acctbal", F_REAL},
501 {
"c_address", F_TEXT},
502 {
"c_comment", F_TEXT}},
503 1, {{
"c_custkey", F_ZAHL}}}},
505 {8, {{
"c_custkey", F_ZAHL},
507 {
"c_acctbal", F_REAL},
510 {
"c_address", F_TEXT},
511 {
"c_comment", F_TEXT},
513 7, {{
"c_custkey", F_ZAHL},
515 {
"c_acctbal", F_REAL},
518 {
"c_address", F_TEXT},
519 {
"c_comment", F_TEXT}}}},
521 {8, {{
"c_custkey", F_ZAHL},
524 {
"c_acctbal", F_REAL},
526 {
"c_address", F_TEXT},
528 {
"c_comment", F_TEXT}},
529 1, {{
"revenue", F_REAL}}}},
534 {1, {{
"o_orderkey", F_ZAHL}},
535 1, {{
"o_custkey", F_ZAHL}}}},
537 {1, {{
"c_custkey", F_ZAHL}},
538 1, {{
"q13_count", F_ZAHL}}}},
540 {2, {{
"q13_custdist", F_ZAHL},
541 {
"q13_count", F_ZAHL}},
542 2, {{
"q13_custdist", F_ZAHL},
543 {
"q13_count", F_ZAHL}}}},
548 {1, {{
"n_nationkey", F_ZAHL}},
549 1, {{
"n_nationkey", F_ZAHL}}}},
551 {1, {{
"s_suppkey", F_ZAHL},
553 1, {{
"s_suppkey", F_ZAHL}}}},
555 {2, {{
"l_orderkey", F_ZAHL},
556 {
"l_suppkey", F_ZAHL}},
557 1, {{
"l_suppkey", F_ZAHL}}}},
559 {2, {{
"l_orderkey", F_ZAHL},
560 {
"l_suppkey", F_ZAHL}},
561 1, {{
"l_orderkey", F_ZAHL}}}},
563 {3, {{
"l_orderkey", F_ZAHL},
564 {
"l_suppkey", F_ZAHL},
566 1, {{
"l_orderkey", F_ZAHL}}}},
568 {1, {{
"o_orderkey", F_ZAHL}},
569 1, {{
"o_orderkey", F_ZAHL}}}},
571 {1, {{
"s_name", F_TEXT}},
572 1, {{
"s_name", F_TEXT}}}},
573 {TAB_Q21_NUMWAIT, {0},
574 {2, {{
"s_name", F_TEXT},
575 {
"numwait", F_ZAHL}},
576 1, {{
"numwait", F_ZAHL},
577 {
"s_name", F_TEXT}}}}};
585 int columns[MAXCOLS][2];
587 int keys[MAXCOLS][2];
589 _Bool trace_product_nonempty;
606 int columns[MAXCOLS];
615 char *columns[MAXCOLS];
629 static struct timeval tv0 = {.tv_sec = 0};
632 cc = gettimeofday(&tv, 0);
634 if (tv0.tv_sec == 0) {
636 assert(tv0.tv_sec != 0);
638 double dt = ((double)(tv.tv_sec - tv0.tv_sec)
639 + ((double)(tv.tv_usec - tv0.tv_usec) * 1e-6));
646 pcount(
KMR_KVS *kvs0,
KMR_KVS *kvs1,
char *msg, _Bool before0after1)
648 KMR *mr = kvs0->c.mr;
649 if (report_count_in_messages) {
657 char *s0 = (before0after1 == 0 ?
"before" :
"after");
658 char *s1 = (before0after1 == 0 ?
"..." :
"");
660 printf(
"%s %s #=%ld #=%ld%s\n", msg, s0, c0, c1, s1);
663 printf(
"%s %s #=%ld%s\n", msg, s0, c0, s1);
669 char *s0 = (before0after1 == 0 ?
"before" :
"after");
670 char *s1 = (before0after1 == 0 ?
"..." :
"");
671 printf(
"%s %s%s\n", msg, s0, s1);
680 ptime(
KMR_KVS *kvs0,
KMR_KVS *kvs1,
char *func,
char *msg,
double dt)
682 KMR *mr = kvs0->c.mr;
683 if (report_count_in_messages) {
697 printf(
"%s (%s) #=%ld #=%ld in %f sec\n",
698 func, msg, c0, c1, dt);
701 printf(
"%s (%s) #=%ld in %f sec\n",
710 }
else if (mr->rank == 0) {
711 printf(
"%s (%s) in %f sec\n", func, msg, dt);
718 phisto(
KMR_KVS *kvs,
char *msg)
721 int nprocs = mr->nprocs;
726 for (
int r = 0; r < nprocs; r++) {
727 printf(
"%s histo[%d]=%ld\n", msg, r, histo[r]);
742 switch (kvo->c.key_data) {
744 assert(kvo->c.key_data != KMR_KV_BAD);
751 k1.d = *(
double *)e.p;
755 case KMR_KV_POINTER_OWNED:
756 case KMR_KV_POINTER_UNMANAGED:
783 find_table(
enum TABLE table)
785 int ntables = (
sizeof(tables) /
sizeof(tables[0]));
786 for (
int i = 0; i < ntables; i++) {
787 if (tables[i].name == table) {
796 find_description(
enum TABLE table)
799 return &(tbl->description);
803 column_index_by_name(
struct RECORD *description,
char *name)
805 for (
int i = 0; i < MAXCOLS; i++) {
806 if (description->columns[i].label == 0) {
809 if (strcmp(description->columns[i].label, name) == 0) {
818 column_by_name(struct
kmr_ntuple *u,
struct RECORD *description,
char *name)
820 int c = column_index_by_name(description, name);
826 put_columns_by_indexes(
KMR *mr,
struct kmr_ntuple *v,
size_t vsz,
829 for (
int i = 0; i < ncols; i++) {
836 put_columns_by_names(
KMR *mr,
struct kmr_ntuple *v,
size_t vsz,
838 char **columns,
int ncolumns)
840 for (
int i = 0; i < ncolumns; i++) {
841 char *name = columns[i];
849 assert_column_fields(
int inputs[2],
int output,
850 int columns[][2],
int ncolumns,
851 int keys[][2],
int nkeys)
853 struct RECORD *a = find_description((
enum TABLE)inputs[0]);
854 struct RECORD *b = find_description((
enum TABLE)inputs[1]);
855 struct RECORD *o = find_description((
enum TABLE)output);
856 assert(o->ncolumns == ncolumns);
857 for (
int i = 0; i < ncolumns; i++) {
858 int *choice = columns[i];
859 assert(choice[0] < (choice[1] == 0 ? a : b)->ncolumns);
860 enum FIELD fi = ((choice[1] == 0 ? a : b)->columns[choice[0]]).field;
861 enum FIELD fo = o->columns[i].field;
864 assert(o->nkeys == nkeys);
865 for (
int i = 0; i < nkeys; i++) {
866 int *choice = keys[i];
867 assert(choice[0] < (choice[1] == 0 ? a : b)->ncolumns);
868 enum FIELD fi = ((choice[1] == 0 ? a : b)->columns[choice[0]]).field;
869 enum FIELD fo = o->keys[i].field;
881 #ifdef USE_TIME_FUNCTIONS 884 char *end = strptime(p,
"%F", &tm);
885 time_t tv = mktime(&tm);
886 if ((tv == (time_t)-1) || ((end - p) != 10)) {
893 assert(
sizeof(time_t) >= 8);
895 if ((p[4] !=
'-') || (p[7] !=
'-')) {
898 for (
int i = 0; i < 10; i++) {
899 if ((i != 4 && i != 7) && !(
'0' <= p[i] && p[i] <=
'9')) {
903 long v = (((long)p[0] << 8*7) | ((long)p[1] << 8*6)
904 | ((long)p[2] << 8*5) | ((long)p[3] << 8*4)
905 | ((long)p[5] << 8*3) | ((long)p[6] << 8*2)
906 | ((long)p[8] << 8*1) | ((long)p[9] << 8*0));
916 format_date(
char *s,
size_t sz, time_t tv)
918 #ifdef USE_TIME_FUNCTIONS 921 struct tm *tm = localtime_r(&tv, &tmbuf);
925 size_t cx = strftime(s, sz,
"%F", tm);
932 s[0] = (char)((v >> 8*7) & 0xff);
933 s[1] = (char)((v >> 8*6) & 0xff);
934 s[2] = (char)((v >> 8*5) & 0xff);
935 s[3] = (char)((v >> 8*4) & 0xff);
937 s[5] = (char)((v >> 8*3) & 0xff);
938 s[6] = (char)((v >> 8*2) & 0xff);
940 s[8] = (char)((v >> 8*1) & 0xff);
941 s[9] = (char)((v >> 8*0) & 0xff);
950 year_value(time_t date)
952 #ifdef USE_TIME_FUNCTIONS 955 struct tm *tmx = localtime_r(&date, &tm);
966 time_t year = mktime(&tm);
967 assert(year != (time_t)-1);
970 char tv0[32], tv1[32];
972 localtime_r(&date, &tm0);
973 strftime(tv0,
sizeof(tv0),
"%F", &tm0);
974 localtime_r(&year, &tm1);
975 strftime(tv1,
sizeof(tv1),
"%F", &tm1);
976 printf(
"date=%s year=%s\n", tv0, tv1); fflush(0);
983 long m = (((long)0xff << 8*3) | ((long)0xff << 8*2)
984 | ((long)0xff << 8*1) | ((long)0xff << 8*0));
985 long z = (((long)
'0' << 8*3) | ((long)
'1' << 8*2)
986 | ((long)
'0' << 8*1) | ((long)
'1' << 8*0));
987 long v = ((date & ~m) | z);
995 get_int_column_by_index(
struct kmr_ntuple *u,
struct COLUMN *columns,
int nth)
997 assert(columns[nth].field == F_ZAHL);
999 long v = *(
long *)(e.p);
1004 get_real_column_by_index(
struct kmr_ntuple *u,
struct COLUMN *columns,
int nth)
1006 assert(columns[nth].field == F_REAL);
1008 double v = *(
double *)(e.p);
1016 int nth = column_index_by_name(description, name);
1017 assert(description->columns[nth].field == F_REAL);
1019 double v = *(
double *)(e.p);
1025 get_text_column_by_index(
struct kmr_ntuple *u,
struct COLUMN *columns,
int nth)
1027 assert(columns[nth].field == F_TEXT);
1035 get_date_column_by_index(
struct kmr_ntuple *u,
struct COLUMN *columns,
int nth)
1037 assert(columns[nth].field == F_DATE);
1039 time_t v = *(time_t *)(e.p);
1044 get_date_column(
struct kmr_ntuple *u,
struct RECORD *description,
char *name)
1046 int nth = column_index_by_name(description, name);
1047 assert(description->columns[nth].field == F_DATE);
1049 time_t v = *(time_t *)(e.p);
1058 char *line,
size_t linesz,
struct TABLE_INFO *tbl)
1062 int marker = (int)tbl->name;
1063 struct RECORD *description = &(tbl->description);
1064 struct COLUMN *descs = description->columns;
1065 int ndescs = description->ncolumns;
1068 char *
const end = (line + linesz);
1070 for (
int i = 0; i < ndescs; i++) {
1072 while (s < end && *s !=
'|') {
1075 char *
const q = ((s == end) ? 0 : s);
1077 fprintf(stderr,
"Fewer fields in line (%s)\n", line);
1078 MPI_Abort(MPI_COMM_WORLD, 1);
1084 switch (descs[i].field) {
1092 cc = sscanf(p,
"%ld%c", &v, gomi);
1095 fprintf(stderr,
"Bad integer in %d-th field in line (%s)\n",
1097 MPI_Abort(MPI_COMM_WORLD, 1);
1106 cc = sscanf(p,
"%lf%c", &v, gomi);
1109 fprintf(stderr,
"Bad real in %d-th field in line (%s)\n",
1111 MPI_Abort(MPI_COMM_WORLD, 1);
1119 int len = (int)(q - p);
1124 time_t tv = decode_date(p);
1125 if (tv == (time_t)-1) {
1127 fprintf(stderr,
"Bad date in %d-th field in line (%s)\n",
1129 MPI_Abort(MPI_COMM_WORLD, 1);
1141 fprintf(stderr,
"(warning) Extra characters in line (%s)\n", line);
1151 char line[RECORD_SIZE];
1152 char vbuf[RECORD_SIZE];
1155 KMR *mr = kvo->c.mr;
1159 char *p = (
void *)kv0.v.p;
1160 size_t linesz = (
size_t)kv0.vlen;
1162 assert(linesz <
sizeof(line));
1163 memcpy(line, p, linesz);
1166 scan_columns(mr, v,
sizeof(vbuf), line, linesz, tbl);
1176 if (scanner->fn == 0) {
1179 (*scanner->fn)(kv, kvi, kvo, scanner->arg, index);
1184 static void load_table_files_in_memory(
int nprocs,
int rank,
char *directory,
1186 static void load_one_table_file_in_memory(
int nprocs,
int rank,
1188 char *filename, _Bool singlefile);
1191 load_input_tables(
int nprocs,
int rank,
char *directory,
1192 enum TABLE *tbls,
int ntbls)
1194 assert(directory != 0);
1197 printf(
"reading table files (in advance)...\n");
1200 MPI_Barrier(MPI_COMM_WORLD);
1201 double t0 = wtime();
1203 for (
int i = 0; i < ntbls; i++) {
1204 struct TABLE_INFO *tbl = find_table(tbls[i]);
1205 assert(tbl->data.nread != 0);
1206 load_table_files_in_memory(nprocs, rank, directory, tbl);
1209 MPI_Barrier(MPI_COMM_WORLD);
1210 double t1 = wtime();
1212 printf(
"reading table files (in advance) in %f sec\n", (t1 - t0));
1218 load_table_files_in_memory(
int nprocs,
int rank,
char *directory,
1224 assert(directory != 0);
1226 if (tbl->data.nread == 0) {
1228 }
else if (tbl->data.nread == 1) {
1230 cc = snprintf(filename,
sizeof(filename),
"%s/%s",
1231 directory, tbl->data.file);
1232 assert(cc < (
int)
sizeof(filename));
1233 load_one_table_file_in_memory(nprocs, rank, tbl, filename, 1);
1235 }
else if (tbl->data.nread == 2) {
1236 cc = snprintf(filename,
sizeof(filename),
"%s/%s",
1237 directory, tbl->data.file);
1238 assert(cc < (
int)
sizeof(filename));
1239 cc = access(filename, R_OK);
1243 load_one_table_file_in_memory(nprocs, rank, tbl, filename, 1);
1245 }
else if (errno == ENOENT) {
1248 for (
int j = 0; j < 50; j++) {
1250 int n = (1 + (j * nprocs) + rank);
1251 cc = snprintf(filename,
sizeof(filename),
"%s/%s.%d",
1252 directory, tbl->data.file, n);
1253 assert(cc < (
int)
sizeof(filename));
1254 cc = access(filename, R_OK);
1256 load_one_table_file_in_memory(nprocs, rank, tbl, filename, 0);
1258 }
else if (errno == ENOENT) {
1261 perror(
"access tbl file");
1262 MPI_Abort(MPI_COMM_WORLD, 1);
1267 perror(
"access tbl file");
1268 MPI_Abort(MPI_COMM_WORLD, 1);
1277 load_one_table_file_in_memory(
int nprocs,
int rank,
struct TABLE_INFO *tbl,
1278 char *filename, _Bool singlefile)
1282 int nth = tbl->data.nfiles;
1283 if (!(nth < tbl->data.nb)) {
1284 int nb = (singlefile ? 1 : (((nth + 1) + 7) & ~7));
1286 void **bb = realloc(tbl->data.buffers, (
sizeof(
void *) * (
size_t)nb));
1288 perror(
"realloc tbl buffer");
1289 MPI_Abort(MPI_COMM_WORLD, 1);
1292 for (
int i = 0; i < nb; i++) {
1293 if (i < tbl->data.nfiles) {
1299 tbl->data.buffers = bb;
1301 size_t *ss = realloc(tbl->data.sizes, (
sizeof(
size_t) * (
size_t)nb));
1303 perror(
"realloc tbl buffer");
1304 MPI_Abort(MPI_COMM_WORLD, 1);
1307 for (
int i = 0; i < nb; i++) {
1308 if (i < tbl->data.nfiles) {
1314 tbl->data.sizes = ss;
1319 assert((nth < tbl->data.nb) && (tbl->data.buffers != 0)
1320 && (tbl->data.buffers[nth] == 0));
1323 int fd = open(filename, O_RDONLY, 0);
1326 snprintf(ee,
sizeof(ee),
"open(%s) failed", filename);
1328 MPI_Abort(MPI_COMM_WORLD, 1);
1336 snprintf(ee,
sizeof(ee),
"fstat(%s) failed", filename);
1338 MPI_Abort(MPI_COMM_WORLD, 1);
1342 off_t fsz = s.st_size;
1347 size_t bsz = (size_t)(((fsz + 8) + (1024 - 1)) & (~(1024 - 1)));
1348 assert(bsz >= (
size_t)fsz);
1349 char *b = malloc(bsz);
1352 snprintf(ee,
sizeof(ee),
"malloc(%ld) failed", fsz);
1354 MPI_Abort(MPI_COMM_WORLD, 1);
1358 double t0 = wtime();
1360 off_t chunk = (8 * 1024 * 1024);
1363 size_t rr = (size_t)MIN((fsz - rd), chunk);
1364 ssize_t cx = read(fd, (b + rd), rr);
1367 snprintf(ee,
sizeof(ee),
"read(%s) failed", filename);
1369 MPI_Abort(MPI_COMM_WORLD, 1);
1377 double t1 = wtime();
1379 assert((nth == tbl->data.nfiles) && (tbl->data.buffers[nth] == 0));
1380 tbl->data.buffers[nth] = b;
1381 tbl->data.sizes[nth] = (size_t)fsz;
1384 if (report_time_to_read) {
1385 fprintf(stderr,
"[%05d] reading (%s) sz=%ld in %f sec\n",
1386 rank, filename, fsz, (t1 - t0));
1392 long cnt = kvo->c.element_count;
1393 printf(
"[%d] reading partial table (%s) n=%ld %f sec\n",
1394 rank, filename, cnt, (t1 - t0));
1408 KMR *mr = kvo->c.mr;
1411 assert(tbl->data.nread != 0);
1413 if (mr->rank == 0) {
1414 printf(
"scanning table file (%s)...\n", tbl->data.file);
1418 double t0 = wtime();
1421 if (run->redistribute_loaded_tables) {
1431 scan_table_in_memory(kvs0, tbl, &scanner);
1433 if (run->redistribute_loaded_tables) {
1434 assert(kvs0 != kvo);
1438 *(tbl->data.variable) = kvo;
1440 double t1 = wtime();
1442 ptime(kvo, 0,
"scanning table file", tbl->data.file, (t1 - t0));
1449 KMR *mr = kvo->c.mr;
1451 for (
int i = 0; i < tbl->data.nfiles; i++) {
1452 void *b = tbl->data.buffers[i];
1453 size_t sz = tbl->data.sizes[i];
1454 struct kmr_option keepopen = {.keep_open = 1};
1455 kmr_map_getline_in_memory_(mr, b, sz, 0,
1456 kvo, scanner, keepopen, scan_line);
1462 scan_table_files_in_advance(
KMR *mr,
enum TABLE *tbls,
int ntbls,
1465 if (mr->rank == 0) {
1466 printf(
"scanning tables (in advance)...\n"); fflush(0);
1468 double t0 = wtime();
1470 for (
int i = 0; i < ntbls; i++) {
1471 struct TABLE_INFO *tbl = find_table(tbls[i]);
1472 assert(tbl->data.nread != 0);
1474 scan_table_files(kvs, tbl->name, 0, 0, run);
1477 double t1 = wtime();
1478 if (mr->rank == 0) {
1479 printf(
"scanning tables in %f sec\n", (t1 - t0)); fflush(0);
1487 dump_line(
KMR *mr,
char *line,
int linesz,
struct kmr_ntuple *u,
1488 struct COLUMN descs[],
int ndescs)
1491 assert(u->n == ndescs);
1494 char *end = &line[linesz];
1495 for (
int i = 0; i < ndescs; i++) {
1497 cc = snprintf(q, (
size_t)(end - q),
"|");
1501 switch (descs[i].field) {
1507 assert(p.len ==
sizeof(
long));
1508 long v = *(
long *)(p.p);
1509 cc = snprintf(q, (
size_t)(end - q),
"%ld", v);
1511 fprintf(stderr,
"Bad integer in %d-th field\n", i);
1512 MPI_Abort(MPI_COMM_WORLD, 1);
1519 assert(p.len ==
sizeof(
double));
1520 double v = *(
double *)(p.p);
1521 cc = snprintf(q, (
size_t)(end - q),
"%lf", v);
1523 fprintf(stderr,
"Bad real in %d-th field\n", i);
1524 MPI_Abort(MPI_COMM_WORLD, 1);
1532 assert((
size_t)(p.len + 1) < (
size_t)(end - q));
1533 cc = snprintf(q, (
size_t)(p.len + 1),
"%s", v);
1535 fprintf(stderr,
"Bad text in %d-th field\n", i);
1536 MPI_Abort(MPI_COMM_WORLD, 1);
1540 q += MIN(p.len, cc);
1544 assert(p.len ==
sizeof(time_t));
1546 size_t cx = format_date(q, (
size_t)(end - q), *v);
1548 fprintf(stderr,
"Bad date in %d-th field\n", i);
1549 MPI_Abort(MPI_COMM_WORLD, 1);
1564 KMR *mr = kvi->c.mr;
1565 char line[RECORD_SIZE];
1566 char buf[RECORD_SIZE];
1569 struct RECORD *d = find_description((
enum TABLE)u->marker);
1571 if (d->nkeys == 1) {
1575 dump_line(kvi->c.mr, line, (
int)
sizeof(line), k,
1577 printf(
"%s:", line);
1580 dump_line(kvi->c.mr, line, (
int)
sizeof(line), k, d->keys, d->nkeys);
1581 printf(
"%s:", line);
1584 dump_line(mr, line, (
int)
sizeof(line), u, d->columns, d->ncolumns);
1585 printf(
"%s\n", line);
1594 KMR *mr = kvi->c.mr;
1595 char line[RECORD_SIZE];
1598 struct RECORD *d = find_description((
enum TABLE)u->marker);
1600 dump_line(mr, line, (
int)
sizeof(line), u, d->columns, d->ncolumns);
1602 printf(
"%s\n", line);
1607 dump_table(
KMR_KVS *kvs,
enum TABLE table)
1609 struct RECORD *d = find_description(table);
1610 struct kmr_option inspect = {.nothreading = 1, .inspect = 1};
1614 #define CREATE_KVS(MR, KEY, PUSHOFF) \ 1615 create_kvs1((MR), (KEY), KMR_KV_OPAQUE, \ 1616 (PUSHOFF), __FILE__, __LINE__, __func__) 1621 const char *file,
const int line,
const char *func)
1626 kmr_noopt, file, line, func);
1629 kmr_noopt, file, line, func);
1656 select_by_fields(
const struct kmr_kv_box kv0,
1658 void *p,
const long i)
1660 char kbuf[RECORD_SIZE];
1662 char vbuf[RECORD_SIZE];
1665 struct SELECT *selector = p;
1666 KMR *mr = kvo->c.mr;
1668 assert(u->marker == (
int)selector->input);
1672 if (selector->nkeys == 0) {
1675 }
else if (selector->nkeys == 1) {
1681 put_columns_by_indexes(mr, k,
sizeof(kbuf), u,
1682 selector->keys, selector->nkeys);
1688 put_columns_by_indexes(mr, v,
sizeof(vbuf), u,
1689 selector->columns, selector->ncolumns);
1705 join_by_fields(
const struct kmr_kv_box kv[],
const long n,
1713 producer->columns, producer->ncolumns,
1714 producer->keys, producer->nkeys);
1716 if (producer->trace_product) {
1717 fprintf(stderr,
"prod %ld %ld\n", cnt[0], cnt[1]); fflush(0);
1719 if (producer->trace_product_nonempty && cnt[0] != 0 && cnt[1] != 0) {
1720 fprintf(stderr,
"prod %ld %ld\n", cnt[0], cnt[1]); fflush(0);
1722 assert((!producer->cnt0_zero || cnt[0] == 0)
1723 && (!producer->cnt0_one || cnt[0] == 1)
1724 && (!producer->cnt0_zero_one || (cnt[0] == 0 || cnt[0] == 1))
1725 && (!producer->cnt0_nonzero || cnt[0] != 0));
1726 assert((!producer->cnt1_zero || cnt[1] == 0)
1727 && (!producer->cnt1_one || cnt[1] == 1)
1728 && (!producer->cnt1_zero_one || (cnt[1] == 0 || cnt[1] == 1))
1729 && (!producer->cnt1_nonzero || cnt[1] != 0));
1738 kmr_redfn_t join,
void *arg,
char *msg, _Bool pushoff)
1740 KMR *mr = input0->c.mr;
1744 ptime(input0, 0, 0, 0, 0.0);
1745 double t0 = wtime();
1746 KMR_KVS *kvs1 = CREATE_KVS(mr, input0->c.key_data, 0);
1748 double t1 = wtime();
1749 ptime(kvs1, 0,
"shuffle", msg, (t1 - t0));
1750 pcount(kvs1, 0, msg, 0);
1751 KMR_KVS *kvs2 = CREATE_KVS(mr, outputkf, pushoff);
1752 kmr_reduce(kvs1, kvs2, arg, kmr_noopt, join);
1753 pcount(kvs2, 0, msg, 1);
1759 kmr_redfn_t join,
void *arg,
char *m, _Bool pushoff)
1761 KMR *mr = input0->c.mr;
1763 snprintf(msg,
sizeof(msg),
"join (%s)", m);
1765 ptime(input0, input1, 0, 0, 0.0);
1766 double t0 = wtime();
1767 assert(input0->c.key_data == input1->c.key_data);
1769 KMR_KVS *kvs0 = CREATE_KVS(mr, inputkf, 0);
1771 KMR_KVS *kvs1 = CREATE_KVS(mr, inputkf, 0);
1773 double t1 = wtime();
1774 ptime(kvs0, kvs1,
"shuffle", m, (t1 - t0));
1775 KMR_KVS *vec[] = {kvs0, kvs1};
1776 KMR_KVS *kvs2 = CREATE_KVS(mr, inputkf, 0);
1778 pcount(kvs2, 0, msg, 0);
1779 KMR_KVS *kvs3 = CREATE_KVS(mr, outputkf, pushoff);
1780 kmr_reduce(kvs2, kvs3, arg, kmr_noopt, join);
1781 pcount(kvs3, 0, msg, 1);
1787 struct PRODUCT *join,
char *m, _Bool pushoff)
1789 KMR *mr = input0->c.mr;
1791 snprintf(msg,
sizeof(msg),
"join (%s)", m);
1793 ptime(input0, input1, 0, 0, 0.0);
1794 double t0 = wtime();
1795 assert(input0->c.key_data == input1->c.key_data);
1797 KMR_KVS *kvs0 = CREATE_KVS(mr, inputkf, 0);
1799 KMR_KVS *kvs1 = CREATE_KVS(mr, inputkf, 0);
1801 double t1 = wtime();
1802 ptime(kvs0, kvs1,
"shuffle", m, (t1 - t0));
1803 KMR_KVS *vec[] = {kvs0, kvs1};
1804 KMR_KVS *kvs2 = CREATE_KVS(mr, inputkf, 0);
1806 pcount(kvs2, 0, msg, 0);
1807 KMR_KVS *kvs3 = CREATE_KVS(mr, outputkf, pushoff);
1808 kmr_reduce(kvs2, kvs3, join, kmr_noopt, join_by_fields);
1809 pcount(kvs3, 0, msg, 1);
1893 static struct SELECT q7_select_s = {
1895 .output = TAB_Q7_S1,
1903 static struct SELECT q7_select_c = {
1905 .output = TAB_Q7_C1,
1913 static struct SELECT q7_select_o = {
1915 .output = TAB_Q7_O1,
1923 static struct PRODUCT q7_join_nn_s = {
1924 .inputs = {TAB_Q7_S1, TAB_Q7_NN},
1925 .output = TAB_Q7_NNS,
1928 .columns = {{2, SND}, {1, FST}, {1, SND}, {3, SND}},
1930 .keys = {{2, SND}, {1, FST}},
1934 static struct PRODUCT q7_join_c_o = {
1935 .inputs = {TAB_Q7_C1, TAB_Q7_O1},
1936 .output = TAB_Q7_CO,
1939 .columns = {{1, SND}, {1, FST}},
1945 static struct PRODUCT q7_join_l_co = {
1946 .inputs = {TAB_Q7_L1, TAB_Q7_CO},
1947 .output = TAB_Q7_CLO,
1950 .columns = {{1, SND}, {1, FST}, {2, FST}, {3, FST}},
1952 .keys = {{1, SND}, {1, FST}},
1954 .trace_product_nonempty = 0
1957 struct PRODUCT q7_join_clo_nns = {
1958 .inputs = {TAB_Q7_CLO, TAB_Q7_NNS},
1959 .output = TAB_Q7_CLNNOS0,
1962 .columns = {{2, SND}, {3, SND}, {2, FST}, {3, FST}},
1967 .trace_product_nonempty = 0
1972 KMR_KVS *kvo,
void *p,
const long i)
1974 char kbuf[RECORD_SIZE];
1976 char vbuf[RECORD_SIZE];
1979 enum TABLE input = TAB_N;
1980 enum TABLE output = TAB_Q7_N1;
1982 struct RECORD *d = find_description(input);
1984 KMR *mr = kvo->c.mr;
1988 char *s0 =
"FRANCE";
1989 char *s1 =
"GERMANY";
1990 if (strncmp((
char *)name.p, s0, strlen(s0)) == 0
1991 || strncmp((
char *)name.p, s1, strlen(s1)) == 0) {
1992 char *cols[] = {
"n_nationkey",
"n_name"};
1996 put_columns_by_names(mr, v,
sizeof(vbuf), u, d, cols, 2);
1997 add_record(kvo, k, v);
2004 q7_pair_names(
const struct kmr_kv_box kv[],
const long n,
2007 char kbuf[RECORD_SIZE];
2009 char vbuf[RECORD_SIZE];
2012 enum TABLE input = TAB_N;
2013 enum TABLE output = TAB_Q7_NN;
2015 struct RECORD *d = find_description(input);
2016 char *cols[] = {
"n_nationkey",
"n_name"};
2018 KMR *mr = kvo->c.mr;
2029 put_columns_by_names(mr, v,
sizeof(vbuf), u0, d, cols, 2);
2030 put_columns_by_names(mr, v,
sizeof(vbuf), u1, d, cols, 2);
2031 add_record(kvo, k, v);
2040 put_columns_by_names(mr, v,
sizeof(vbuf), u1, d, cols, 2);
2041 put_columns_by_names(mr, v,
sizeof(vbuf), u0, d, cols, 2);
2042 add_record(kvo, k, v);
2051 q7_select_by_date(
const struct kmr_kv_box kv0,
2053 void *p,
const long i)
2055 char kbuf[RECORD_SIZE];
2057 char vbuf[RECORD_SIZE];
2060 enum TABLE input = TAB_L;
2061 enum TABLE output = TAB_Q7_L1;
2063 struct RECORD *d = find_description(input);
2064 struct RECORD *dx = find_description(output);
2067 KMR *mr = kvo->c.mr;
2070 assert(u->marker == (
int)input);
2072 time_t shipdate = get_date_column(u, d,
"l_shipdate");
2073 if (dt[0] <= shipdate && shipdate <= dt[1]) {
2077 time_t year = year_value(shipdate);
2079 double extendedprice = get_real_column(u, d,
"l_extendedprice");
2080 double discount = get_real_column(u, d,
"l_discount");
2081 double volume = (extendedprice * (1 - discount));
2089 kmr_put_ntuple(mr, v, (
int)
sizeof(vbuf), &volume,
sizeof(volume));
2090 add_record(kvo, k, v);
2097 q7_make_sort_keys(
const struct kmr_kv_box kv0,
2099 void *p,
const long i)
2101 char kbuf[RECORD_SIZE];
2104 enum TABLE input = TAB_Q7_CLNNOS0;
2106 struct RECORD *d = find_description(input);
2108 KMR *mr = kvo->c.mr;
2110 assert(u->marker == (
int)input);
2114 time_t year = get_date_column_by_index(u, d->columns, 2);
2116 char nbuf[NAME_SIZE];
2117 assert((name1.len + name2.len) <= NAME_SIZE);
2118 memset(nbuf, 0,
sizeof(nbuf));
2119 memcpy(&nbuf[0], name1.p, name1.len);
2120 memcpy(&nbuf[name1.len], name2.p, name2.len);
2121 uint64_t beyear = htonll_((uint64_t)(year));
2125 kmr_put_ntuple(mr, k, (
int)
sizeof(kbuf), &beyear,
sizeof(beyear));
2126 add_record(kvo, k, (
void *)kv0.v.p);
2132 q7_sum_volume(
const struct kmr_kv_box kv[],
const long n,
2135 char vbuf[RECORD_SIZE];
2138 enum TABLE input = TAB_Q7_CLNNOS1;
2139 enum TABLE output = TAB_Q7_REVENUE;
2140 struct RECORD *d = find_description(input);
2142 KMR *mr = kvo->c.mr;
2144 assert(u0->marker == TAB_Q7_CLNNOS0);
2148 for (
long i = 0; i < n; i++) {
2150 double volume = get_real_column_by_index(u, d->columns, 3);
2154 int cols[] = {0, 1, 2};
2156 assert(d->nkeys >= 2);
2158 put_columns_by_indexes(mr, v,
sizeof(vbuf), u0, cols, 3);
2159 kmr_put_ntuple(mr, v, (
int)
sizeof(vbuf), &revenue,
sizeof(revenue));
2160 add_record(kvo, (
void *)kv[0].k.p, v);
2166 q7(
KMR *mr,
struct RUN *run)
2168 _Bool pushoff = ((run->pushoff == 0) ? 0 : 1);
2169 struct kmr_option rankzero = {.rank_zero = 1};
2172 if (mr->rank == 0) {printf(
"q7 (with push-off)...\n"); fflush(0);}
2174 if (mr->rank == 0) {printf(
"q7...\n"); fflush(0);}
2179 if (mr->rank == 0) {printf(
"q7 (n+n)...\n"); fflush(0);}
2181 if (run->load_tables_in_advance) {
2185 scan_table_files(n_, TAB_N, 0, 0, run);
2188 KMR_KVS *n1 = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
2189 kmr_map(N0, n1, 0, kmr_noopt, q7_select_nations);
2190 KMR_KVS *n2 = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
2192 pcount(n2, 0,
"join (n+n)", 0);
2193 KMR_KVS *nn = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
2194 kmr_reduce(n2, nn, 0, kmr_noopt, q7_pair_names);
2195 pcount(nn, 0,
"join (n+n)", 1);
2197 if (mr->rank == 0) {printf(
"q7 (s)...\n"); fflush(0);}
2199 KMR_KVS *s1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2200 if (run->load_tables_in_advance) {
2201 kmr_map(S0, s1, &q7_select_s, kmr_noopt, select_by_fields);
2203 scan_table_files(s1, TAB_S, select_by_fields, &q7_select_s, run);
2206 if (mr->rank == 0) {printf(
"q7 (nn+s)...\n"); fflush(0);}
2208 KMR_KVS *nns = JOINP(s1, nn, KMR_KV_OPAQUE,
2209 &q7_join_nn_s,
"nn+s", pushoff);
2213 if (mr->rank == 0) {printf(
"q7 (c)...\n"); fflush(0);}
2215 KMR_KVS *c1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2216 if (run->load_tables_in_advance) {
2217 kmr_map(C0, c1, &q7_select_c, kmr_noopt, select_by_fields);
2219 scan_table_files(c1, TAB_C, select_by_fields, &q7_select_c, run);
2222 if (mr->rank == 0) {printf(
"q7 (o)...\n"); fflush(0);}
2224 KMR_KVS *o1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2225 if (run->load_tables_in_advance) {
2226 kmr_map(O0, o1, &q7_select_o, kmr_noopt, select_by_fields);
2228 scan_table_files(o1, TAB_O, select_by_fields, &q7_select_o, run);
2231 if (mr->rank == 0) {printf(
"q7 (c+o)...\n"); fflush(0);}
2233 KMR_KVS *co = JOINP(c1, o1, KMR_KV_OPAQUE,
2234 &q7_join_c_o,
"c+o", pushoff);
2236 if (mr->rank == 0) {printf(
"q7 (l)...\n"); fflush(0);}
2239 tv[0] = decode_date(
"1995-01-01");
2240 tv[1] = decode_date(
"1996-12-31");
2241 assert((tv[0] != (time_t)-1) && (tv[1] != (time_t)-1));
2242 KMR_KVS *l1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2243 if (run->load_tables_in_advance) {
2244 kmr_map(L0, l1, tv, kmr_noopt, q7_select_by_date);
2246 scan_table_files(l1, TAB_L, q7_select_by_date, tv, run);
2251 if (mr->rank == 0) {printf(
"q7 (co+l)...\n"); fflush(0);}
2253 KMR_KVS *clo = JOINP(l1, co, KMR_KV_OPAQUE,
2254 &q7_join_l_co,
"co+l", pushoff);
2258 if (mr->rank == 0) {printf(
"q7 (clo+nns)...\n"); fflush(0);}
2260 KMR_KVS *clnnos2 = JOINP(clo, nns, KMR_KV_OPAQUE,
2261 &q7_join_clo_nns,
"clo+nns", 0);
2262 KMR_KVS *clnnos3 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2263 kmr_map(clnnos2, clnnos3, 0, kmr_noopt, q7_make_sort_keys);
2265 KMR_KVS *revenue0 = JOIN1(clnnos3, KMR_KV_OPAQUE,
2266 q7_sum_volume, 0,
"sum(volume)", 0);
2267 KMR_KVS *revenue1 = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
2268 kmr_sort(revenue0, revenue1, kmr_noopt);
2329 static struct PRODUCT q9_join_p_h = {
2330 .inputs = {TAB_P, TAB_H},
2331 .output = TAB_Q9_PH,
2334 .columns = {{0, SND}, {1, SND}, {3, SND}},
2342 static struct PRODUCT q9_join_s_n = {
2343 .inputs = {TAB_S, TAB_N},
2344 .output = TAB_Q9_NS,
2347 .columns = {{1, SND}, {0, FST}},
2354 static struct PRODUCT q9_join_hp_ns = {
2355 .inputs = {TAB_Q9_PH, TAB_Q9_NS},
2356 .output = TAB_Q9_PHSN,
2359 .columns = {{0, SND}, {0, FST}, {1, FST}, {2, FST}},
2361 .keys = {{0, FST}, {1, FST}},
2366 static struct PRODUCT q9_join_l_o = {
2367 .inputs = {TAB_L, TAB_O},
2368 .output = TAB_Q9_LO,
2372 .columns = {{6, FST}, {5, FST}, {1, FST}, {4, FST}, {2, FST}, {4, SND}},
2374 .keys = {{1, FST}, {2, FST}},
2379 static struct PRODUCT q9_join_hnps_lo = {
2380 .inputs = {TAB_Q9_PHSN, TAB_Q9_LO},
2381 .output = TAB_Q9_PHSNLO,
2385 .columns = {{0, SND}, {1, SND}, {3, SND}, {0, FST}, {5, SND}, {3, FST}},
2394 q9_select_by_name(
const struct kmr_kv_box kv0,
2396 void *p,
const long i)
2398 enum TABLE input = TAB_P;
2399 struct RECORD *d0 = find_description(input);
2400 assert(d0->columns[1].field == F_TEXT);
2404 char *pos = strnstr_(e.p,
"green", (
size_t)e.len);
2412 q9_calculate_amount(
const struct kmr_kv_box kv0,
2414 void *p,
const long i)
2416 char kbuf[RECORD_SIZE];
2418 char vbuf[RECORD_SIZE];
2421 enum TABLE inputoutput = TAB_Q9_PHSNLO;
2422 struct RECORD *d = find_description(inputoutput);
2424 KMR *mr = kvo->c.mr;
2426 double l_discount = get_real_column_by_index(u, d->columns, 0);
2427 double l_extendedprice = get_real_column_by_index(u, d->columns, 1);
2428 double l_quantity = get_real_column_by_index(u, d->columns, 2);
2430 time_t o_orderdate = get_date_column_by_index(u, d->columns, 4);
2431 double l_supplycost = get_real_column_by_index(u, d->columns, 5);
2433 time_t year = year_value(o_orderdate);
2435 double amount = ((l_extendedprice * (1 - l_discount))
2436 - (l_supplycost * l_quantity));
2439 char nbuf[NAME_SIZE];
2440 assert(n_name.len <= NAME_SIZE);
2441 memset(nbuf, 0,
sizeof(nbuf));
2442 memcpy(nbuf, n_name.p, n_name.len);
2443 uint64_t beyear = htonll_((uint64_t)(-year));
2447 kmr_put_ntuple(mr, k, (
int)
sizeof(kbuf), &beyear,
sizeof(beyear));
2454 kmr_put_ntuple(mr, v, (
int)
sizeof(vbuf), &amount,
sizeof(amount));
2456 add_record(kvo, k, v);
2462 q9_sum_amount(
const struct kmr_kv_box kv[],
const long n,
2465 char vbuf[RECORD_SIZE];
2468 enum TABLE inputoutput = TAB_Q9_AMOUNT;
2469 struct RECORD *d = find_description(inputoutput);
2471 KMR *mr = kvo->c.mr;
2475 for (
long i = 0; i < n; i++) {
2477 double amount = get_real_column_by_index(u, d->columns, 2);
2504 q9(
KMR *mr,
struct RUN *run)
2506 _Bool pushoff = ((run->pushoff == 0) ? 0 : 1);
2510 if (mr->rank == 0) {printf(
"q9 (with push-off)...\n"); fflush(0);}
2512 if (mr->rank == 0) {printf(
"q9...\n"); fflush(0);}
2517 if (mr->rank == 0) {printf(
"q9 (p+ps)...\n"); fflush(0);}
2519 if (run->load_tables_in_advance) {
2522 KMR_KVS *h0 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2523 scan_table_files(h0, TAB_H, 0, 0, run);
2526 KMR_KVS *p1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2527 if (run->load_tables_in_advance) {
2528 kmr_map(P0, p1, 0, kmr_noopt, q9_select_by_name);
2530 scan_table_files(p1, TAB_P, q9_select_by_name, 0, run);
2533 KMR_KVS *pps2 = JOINP(p1, H0, KMR_KV_OPAQUE,
2534 &q9_join_p_h,
"p+ps", pushoff);
2538 if (mr->rank == 0) {printf(
"q9 (s+n)...\n"); fflush(0);}
2540 if (run->load_tables_in_advance) {
2543 KMR_KVS *n0 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2544 scan_table_files(n0, TAB_N, 0, 0, run);
2547 KMR_KVS *s0x = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2549 if (run->load_tables_in_advance) {
2550 kmr_map(S0, s0x, &nth, kmr_noopt, key_by_nth);
2552 scan_table_files(s0x, TAB_S, key_by_nth, &nth, run);
2555 KMR_KVS *sn2 = JOINP(s0x, N0, KMR_KV_OPAQUE,
2556 &q9_join_s_n,
"n+s", pushoff);
2560 if (mr->rank == 0) {printf(
"q9 (p+ps)+(s+n)...\n"); fflush(0);}
2562 KMR_KVS *ppssn2 = JOINP(pps2, sn2, KMR_KV_OPAQUE,
2563 &q9_join_hp_ns,
"ns+pps", pushoff);
2567 if (mr->rank == 0) {printf(
"q9 (l+o)...\n"); fflush(0);}
2569 KMR_KVS *l1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2571 if (run->load_tables_in_advance) {
2572 kmr_map(L0, l1, &nth, kmr_noopt, key_by_nth);
2574 scan_table_files(l1, TAB_L, key_by_nth, &nth, run);
2577 KMR_KVS *o1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2579 if (run->load_tables_in_advance) {
2580 kmr_map(O0, o1, &nth, kmr_noopt, key_by_nth);
2582 scan_table_files(o1, TAB_O, key_by_nth, &nth, run);
2585 KMR_KVS *lo2 = JOINP(l1, o1, KMR_KV_OPAQUE,
2586 &q9_join_l_o,
"l+o", pushoff);
2590 if (mr->rank == 0) {printf(
"q9 (((p+ps)+(s+n))+(l+o))...\n"); fflush(0);}
2592 KMR_KVS *ppssnlo2 = JOINP(ppssn2, lo2, KMR_KV_OPAQUE,
2593 &q9_join_hnps_lo,
"lo+nppss", 0);
2594 KMR_KVS *ppssnlo3 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2595 kmr_map(ppssnlo2, ppssnlo3, 0, kmr_noopt, q9_calculate_amount);
2597 pcount(ppssnlo3, 0,
"q9_calculate_amount", 1);
2599 KMR_KVS *ppssnlo5 = JOIN1(ppssnlo3, KMR_KV_OPAQUE,
2600 q9_sum_amount, 0,
"sum(amount)", 0);
2602 KMR_KVS *ppssnlo6 = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
2603 kmr_sort(ppssnlo5, ppssnlo6, kmr_noopt);
2676 static struct SELECT q10_select_c = {
2678 .output = TAB_Q10_C1,
2682 .columns = {3, 0, 1, 5, 4, 2, 7},
2687 static struct SELECT q10_select_n = {
2689 .output = TAB_Q10_N1,
2697 static struct PRODUCT q10_join_l_o = {
2698 .inputs = {TAB_Q10_L1, TAB_Q10_O1},
2699 .output = TAB_Q10_LO,
2702 .columns = {{1, SND}, {1, FST}},
2708 static struct PRODUCT q10_join_c_n = {
2709 .inputs = {TAB_Q10_C1, TAB_Q10_N1},
2710 .output = TAB_Q10_CN,
2714 .columns = {{1, FST}, {2, FST}, {3, FST}, {4, FST},
2715 {1, SND}, {5, FST}, {6, FST}},
2721 static struct PRODUCT q10_join_cn_lo = {
2722 .inputs = {TAB_Q10_CN, TAB_Q10_LO},
2723 .output = TAB_Q10_CLNO0,
2727 .columns = {{0, FST}, {1, FST}, {2, FST}, {3, FST},
2728 {4, FST}, {5, FST}, {6, FST}, {1, SND}},
2730 .keys = {{0, FST}, {1, FST}, {2, FST}, {3, FST},
2731 {4, FST}, {5, FST}, {6, FST}},
2737 KMR_KVS *kvo,
void *p,
const long i)
2739 char kbuf[RECORD_SIZE];
2741 char vbuf[RECORD_SIZE];
2744 enum TABLE input = TAB_L;
2745 enum TABLE output = TAB_Q10_L1;
2746 struct RECORD *d = find_description(input);
2748 KMR *mr = kvo->c.mr;
2752 if (strncmp((
char *)returnflag.p, s0, 1) == 0) {
2753 double extendedprice = get_real_column(u, d,
"l_extendedprice");
2754 double discount = get_real_column(u, d,
"l_discount");
2755 double volume = (extendedprice * (1 - discount));
2762 kmr_put_ntuple(mr, v, (
int)
sizeof(vbuf), &volume,
sizeof(volume));
2763 add_record(kvo, k, v);
2771 KMR_KVS *kvo,
void *p,
const long i)
2773 char kbuf[RECORD_SIZE];
2775 char vbuf[RECORD_SIZE];
2778 enum TABLE input = TAB_O;
2779 enum TABLE output = TAB_Q10_O1;
2780 struct RECORD *d = find_description(input);
2783 KMR *mr = kvo->c.mr;
2785 assert(u->marker == (
int)input);
2787 time_t orderdate = get_date_column(u, d,
"o_orderdate");
2788 if (tv[0] <= orderdate && orderdate <= tv[1]) {
2791 char *cols[] = {
"o_orderkey",
"o_custkey"};
2796 put_columns_by_names(mr, v,
sizeof(vbuf), u, d, cols, 2);
2797 add_record(kvo, k, v);
2803 q10_sum_volume(
const struct kmr_kv_box kv[],
const long n,
2806 char kbuf[RECORD_SIZE];
2808 char vbuf[RECORD_SIZE];
2811 enum TABLE input = TAB_Q10_CLNO0;
2812 enum TABLE output = TAB_Q10_CLNO1;
2813 struct RECORD *d = find_description(input);
2815 KMR *mr = kvo->c.mr;
2817 assert(u0->marker == (
int)input);
2821 for (
long i = 0; i < n; i++) {
2823 double volume = get_real_column_by_index(u, d->columns, 7);
2827 char *cols0[] = {
"c_custkey",
"c_name"};
2828 char *cols1[] = {
"c_acctbal",
"n_name",
"c_address",
2829 "c_phone",
"c_comment"};
2831 double negrevenue = -revenue;
2834 kmr_put_ntuple(mr, k, (
int)
sizeof(kbuf), &negrevenue,
sizeof(
double));
2836 put_columns_by_names(mr, v,
sizeof(vbuf), u0, d, cols0, 2);
2837 kmr_put_ntuple(mr, v, (
int)
sizeof(vbuf), &revenue,
sizeof(revenue));
2838 put_columns_by_names(mr, v,
sizeof(vbuf), u0, d, cols1, 5);
2839 add_record(kvo, k, v);
2845 q10(
KMR *mr,
struct RUN *run)
2847 _Bool pushoff = ((run->pushoff == 0) ? 0 : 1);
2850 if (mr->rank == 0) {printf(
"q10 (with push-off)...\n"); fflush(0);}
2852 if (mr->rank == 0) {printf(
"q10...\n"); fflush(0);}
2857 if (mr->rank == 0) {printf(
"q10 (l)...\n"); fflush(0);}
2859 KMR_KVS *l1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2860 if (run->load_tables_in_advance) {
2861 kmr_map(L0, l1, 0, kmr_noopt, q10_select_by_flag);
2863 scan_table_files(l1, TAB_L, q10_select_by_flag, 0, run);
2868 if (mr->rank == 0) {printf(
"q10 (o)...\n"); fflush(0);}
2871 tv[0] = decode_date(
"1993-10-01");
2872 assert(tv[0] != (time_t)-1);
2873 tv[1] = decode_date(
"1994-01-01");
2874 assert(tv[1] != (time_t)-1);
2875 KMR_KVS *o1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2876 if (run->load_tables_in_advance) {
2877 kmr_map(O0, o1, tv, kmr_noopt, q10_select_by_date);
2879 scan_table_files(o1, TAB_O, q10_select_by_date, tv, run);
2884 KMR_KVS *lo = JOINP(l1, o1, KMR_KV_OPAQUE,
2885 &q10_join_l_o,
"l+o", pushoff);
2889 if (mr->rank == 0) {printf(
"q10 (c)...\n"); fflush(0);}
2891 KMR_KVS *c1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2892 if (run->load_tables_in_advance) {
2893 kmr_map(C0, c1, &q10_select_c, kmr_noopt, select_by_fields);
2895 scan_table_files(c1, TAB_C, select_by_fields, &q10_select_c, run);
2898 if (mr->rank == 0) {printf(
"q10 (n)...\n"); fflush(0);}
2900 KMR_KVS *n1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2901 if (run->load_tables_in_advance) {
2902 kmr_map(N0, n1, &q10_select_n, kmr_noopt, select_by_fields);
2904 scan_table_files(n1, TAB_N, select_by_fields, &q10_select_n, run);
2907 if (mr->rank == 0) {printf(
"q10 (c+n)...\n"); fflush(0);}
2909 KMR_KVS *cn = JOINP(c1, n1, KMR_KV_OPAQUE,
2910 &q10_join_c_n,
"c+n", pushoff);
2914 if (mr->rank == 0) {printf(
"q10 (cn+lo)...\n"); fflush(0);}
2916 KMR_KVS *clno0 = JOINP(cn, lo, KMR_KV_OPAQUE,
2917 &q10_join_cn_lo,
"cn+lo", pushoff);
2919 if (mr->rank == 0) {printf(
"q10 (sum(volume))...\n"); fflush(0);}
2921 KMR_KVS *clno1 = JOIN1(clno0, KMR_KV_FLOAT8,
2922 q10_sum_volume, 0,
"sum(volume)", 0);
2926 KMR_KVS *clno2 = CREATE_KVS(mr, KMR_KV_FLOAT8, 0);
2929 pcount(clno2, 0,
"sort", 1);
2931 KMR_KVS *clno3 = CREATE_KVS(mr, KMR_KV_FLOAT8, 0);
2934 pcount(clno3, 0,
"choose", 1);
2981 q13_select_by_string(
const struct kmr_kv_box kv0,
2983 void *p,
const long i)
2985 char kbuf[RECORD_SIZE];
2987 char vbuf[RECORD_SIZE];
2990 enum TABLE input = TAB_O;
2991 enum TABLE output = TAB_Q13_O1;
2992 struct RECORD *d = find_description(input);
2994 KMR *mr = kvo->c.mr;
2997 assert(d->columns[8].field == F_TEXT);
2999 char *p0 = (comment.p);
3000 char *end = &(p0[comment.len]);
3001 char *p1 = strnstr_(p0,
"special", (
size_t)(end - p0));
3002 char *p2 = ((p1 == 0) ? 0 : strnstr_(p1,
"requests", (
size_t)(end - p1)));
3004 if (!(p1 != 0 && p2 != 0)) {
3012 add_record(kvo, k, v);
3018 q13_join_c_o(
const struct kmr_kv_box kv[],
const long n,
3021 char kbuf[RECORD_SIZE];
3023 char vbuf[RECORD_SIZE];
3026 int inputs[2] = {TAB_C, TAB_Q13_O1};
3027 int output = TAB_Q13_CO0;
3030 KMR *mr = kvo->c.mr;
3036 assert(cnt[0] == 1);
3038 long count = cnt[1];
3042 assert(custkey.len ==
sizeof(
long));
3048 add_record(kvo, k, v);
3056 q13_join_co(
const struct kmr_kv_box kv[],
const long n,
3059 char kbuf[RECORD_SIZE];
3061 char vbuf[RECORD_SIZE];
3065 enum TABLE output = TAB_Q13_CO1;
3067 KMR *mr = kvo->c.mr;
3068 assert(kv[0].klen ==
sizeof(
long));
3070 long count = kv[0].k.i;
3074 assert(
sizeof(uint64_t) ==
sizeof(
long));
3075 uint64_t becustdist = htonll_((uint64_t)(-custdist));
3076 uint64_t becount = htonll_((uint64_t)(-count));
3084 add_record(kvo, k, v);
3090 q13(
KMR *mr,
struct RUN *run)
3092 _Bool pushoff = ((run->pushoff == 0) ? 0 : 1);
3095 if (mr->rank == 0) {printf(
"q13 (with push-off)...\n"); fflush(0);}
3097 if (mr->rank == 0) {printf(
"q13...\n"); fflush(0);}
3102 if (mr->rank == 0) {printf(
"q13 (o)...\n"); fflush(0);}
3104 KMR_KVS *o1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
3105 if (run->load_tables_in_advance) {
3106 kmr_map(O0, o1, 0, kmr_noopt, q13_select_by_string);
3108 scan_table_files(o1, TAB_O, q13_select_by_string, 0, run);
3111 pcount(o1, 0,
"select (o)", 1);
3115 if (run->load_tables_in_advance) {
3118 KMR_KVS *c0 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
3119 scan_table_files(c0, TAB_C, 0, 0, run);
3122 KMR_KVS *co2 = JOIN2(C0, o1, KMR_KV_INTEGER,
3123 q13_join_c_o, 0,
"c+o", pushoff);
3125 KMR_KVS *co4 = JOIN1(co2, KMR_KV_OPAQUE,
3126 q13_join_co, 0,
"c+o", 0);
3128 KMR_KVS *co5 = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
3241 static struct PRODUCT q21_join_n_s = {
3242 .inputs = {TAB_Q21_N1, TAB_S},
3243 .output = TAB_Q21_NS,
3246 .columns = {{0, SND}, {1, SND}},
3252 static struct PRODUCT q21_join_l_ns = {
3253 .inputs = {TAB_Q21_L1, TAB_Q21_NS},
3254 .output = TAB_Q21_LNS,
3257 .columns = {{0, FST}, {1, FST}, {1, SND}},
3263 static struct PRODUCT q21_join_lns_o = {
3264 .inputs = {TAB_Q21_LNS, TAB_Q21_O1},
3265 .output = TAB_Q21_LNS,
3268 .columns = {{0, FST}, {1, FST}, {2, FST}},
3275 q21_select_n_by_name(
const struct kmr_kv_box kv0,
3277 void *p,
const long i)
3279 char kbuf[RECORD_SIZE];
3281 char vbuf[RECORD_SIZE];
3284 enum TABLE input = TAB_N;
3285 enum TABLE output = TAB_Q21_N1;
3287 struct RECORD *d = find_description(input);
3289 KMR *mr = kvo->c.mr;
3292 assert(d->columns[1].field == F_TEXT);
3294 char *s0 =
"SAUDI ARABIA";
3295 size_t len = strlen(s0);
3296 if (((
size_t)n_name.len == len)
3297 && (strncmp((
char *)n_name.p, s0, len) == 0)) {
3304 add_record(kvo, k, v);
3311 KMR_KVS *kvo,
void *p,
const long i)
3313 char kbuf[RECORD_SIZE];
3315 char vbuf[RECORD_SIZE];
3318 enum TABLE input = TAB_L;
3319 enum TABLE output = TAB_Q21_L1;
3320 struct RECORD *d = find_description(input);
3322 KMR *mr = kvo->c.mr;
3325 time_t receiptdate = get_date_column(u, d,
"l_receiptdate");
3326 time_t commitdate = get_date_column(u, d,
"l_commitdate");
3327 if (receiptdate > commitdate) {
3336 add_record(kvo, k, v);
3344 KMR_KVS *kvo,
void *p,
const long i)
3346 char kbuf[RECORD_SIZE];
3348 char vbuf[RECORD_SIZE];
3351 enum TABLE input = TAB_L;
3352 enum TABLE output = TAB_Q21_L3;
3353 struct RECORD *d = find_description(input);
3355 KMR *mr = kvo->c.mr;
3358 time_t receiptdate = get_date_column(u, d,
"l_receiptdate");
3359 time_t commitdate = get_date_column(u, d,
"l_commitdate");
3360 if (receiptdate > commitdate) {
3369 add_record(kvo, k, v);
3375 static struct SELECT q21_copy_l = {
3377 .output = TAB_Q21_L3,
3386 KMR_KVS *kvo,
void *p,
const long i)
3388 char vbuf[RECORD_SIZE];
3391 enum TABLE input = TAB_O;
3392 enum TABLE output = TAB_Q21_O1;
3393 struct RECORD *d = find_description(input);
3395 KMR *mr = kvo->c.mr;
3398 char *s0 = (
char *)status.p;
3399 if (status.len == 1 && s0[0] ==
'F') {
3406 .klen = orderkey.len,
3418 q21_join_lnos_l2(
const struct kmr_kv_box kv[],
const long n,
3421 char kbuf[RECORD_SIZE];
3426 int inputs[] = {TAB_Q21_LNS, TAB_Q21_L3};
3428 struct RECORD *d0 = find_description((
enum TABLE)inputs[0]);
3429 struct RECORD *d1 = find_description((
enum TABLE)inputs[1]);
3431 KMR *mr = kvo->c.mr;
3436 for (
long i0 = 0; i0 < cnt[0]; i0++) {
3438 long suppkey0 = get_int_column_by_index(u0, d0->columns, 1);
3440 for (
long i1 = 0; i1 < cnt[1]; i1++) {
3442 long suppkey1 = get_int_column_by_index(u1, d1->columns, 1);
3443 if (suppkey0 != suppkey1) {
3452 add_record(kvo, k, u0);
3460 q21_join_lnos_l3(
const struct kmr_kv_box kv[],
const long n,
3463 char kbuf[RECORD_SIZE];
3465 char vbuf[RECORD_SIZE];
3468 int inputs[] = {TAB_Q21_LNS, TAB_Q21_L3};
3471 struct RECORD *d0 = find_description((
enum TABLE)inputs[0]);
3472 struct RECORD *d1 = find_description((
enum TABLE)inputs[1]);
3474 KMR *mr = kvo->c.mr;
3479 for (
long i0 = 0; i0 < cnt[0]; i0++) {
3481 long suppkey0 = get_int_column_by_index(u0, d0->columns, 1);
3482 _Bool nonexists = 1;
3483 for (
long i1 = 0; i1 < cnt[1]; i1++) {
3485 long suppkey1 = get_int_column_by_index(u1, d1->columns, 1);
3486 if (suppkey0 != suppkey1) {
3497 add_record(kvo, k, v);
3505 q21_join_numwait(
const struct kmr_kv_box kv[],
const long n,
3508 char kbuf[RECORD_SIZE];
3510 char vbuf[RECORD_SIZE];
3513 enum TABLE input = TAB_Q21_NAME;
3514 enum TABLE output = TAB_Q21_NUMWAIT;
3516 struct RECORD *d = find_description((
enum TABLE)input);
3518 KMR *mr = kvo->c.mr;
3523 long negnumwait = -n;
3525 char nbuf[NAME_SIZE];
3526 assert(s_name.len <= NAME_SIZE);
3527 memset(nbuf, 0,
sizeof(nbuf));
3528 memcpy(nbuf, s_name.p, s_name.len);
3531 kmr_put_ntuple(mr, k, (
int)
sizeof(kbuf), &negnumwait,
sizeof(
long));
3536 add_record(kvo, k, v);
3542 q21(
KMR *mr,
struct RUN *run)
3544 _Bool pushoff = ((run->pushoff == 0) ? 0 : 1);
3548 if (mr->rank == 0) {printf(
"q21 (with push-off)...\n"); fflush(0);}
3550 if (mr->rank == 0) {printf(
"q21...\n"); fflush(0);}
3555 KMR_KVS *n1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
3556 if (run->load_tables_in_advance) {
3557 kmr_map(N0, n1, 0, kmr_noopt, q21_select_n_by_name);
3559 scan_table_files(n1, TAB_N, q21_select_n_by_name, 0, run);
3562 KMR_KVS *s1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
3564 if (run->load_tables_in_advance) {
3565 kmr_map(S0, s1, &nth, kmr_noopt, key_by_nth);
3567 scan_table_files(s1, TAB_S, key_by_nth, &nth, run);
3572 KMR_KVS *ns0 = JOINP(n1, s1, KMR_KV_OPAQUE,
3573 &q21_join_n_s,
"n+s", pushoff);
3577 if (run->load_tables_in_advance) {
3580 KMR_KVS *l0 = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
3581 scan_table_files(l0, TAB_L, 0, 0, run);
3584 KMR_KVS *l1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
3585 kmr_map(L0, l1, 0, inspect, q21_select_l1_by_date);
3587 pcount(l1, 0,
"l1", 1);
3589 KMR_KVS *lns0 = JOINP(l1, ns0, KMR_KV_OPAQUE,
3590 &q21_join_l_ns,
"l+ns", pushoff);
3594 KMR_KVS *o1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
3595 if (run->load_tables_in_advance) {
3596 kmr_map(O0, o1, 0, kmr_noopt, q21_select_o_by_flag);
3598 scan_table_files(o1, TAB_O, q21_select_o_by_flag, 0, run);
3601 pcount(o1, 0,
"o1", 1);
3605 KMR_KVS *lnos0 = JOINP(lns0, o1, KMR_KV_OPAQUE,
3606 &q21_join_lns_o,
"lns+o", pushoff);
3612 KMR_KVS *l2 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
3613 kmr_map(L0, l2, &q21_copy_l, inspect, select_by_fields);
3615 KMR_KVS *lnos2 = JOIN2(lnos0, l2, KMR_KV_OPAQUE,
3616 q21_join_lnos_l2, 0,
"l2+lnos", pushoff);
3620 KMR_KVS *l3 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
3621 kmr_map(L0, l3, 0, inspect, q21_select_l3_by_date);
3624 KMR_KVS *name0 = JOIN2(lnos2, l3, KMR_KV_OPAQUE,
3625 q21_join_lnos_l3, 0,
"l3+lnos", pushoff);
3631 KMR_KVS *ex2 = JOIN1(name0, KMR_KV_OPAQUE,
3632 q21_join_numwait, 0,
"numwait", 0);
3639 pcount(ex3, 0,
"sort", 1);
3646 pcount(ex4, 0,
"choose", 1);
3654 atoi_safe(
char *s,
int ok[],
int n,
char *m)
3658 int cc = sscanf(s,
"%d%c", &v, gomi);
3660 fprintf(stderr,
"%s. Not integer (%s).\n", m, s);
3663 for (
int i = 0; i < n; i++) {
3668 fprintf(stderr,
"%s. Not acceptable (%d).\n", m, v);
3672 struct RUN runs[50];
3676 main(
int argc,
char *argv[])
3678 assert(
sizeof(
long) ==
sizeof(time_t));
3682 setenv(
"TZ",
"UTC", 1);
3685 char *helpstring = (
"%s directory-of-tables [-C -F -P]" 3686 " query [options] query [options]...\n" 3687 " query={7,9,10,13,21}\n" 3688 " options: [-p po -b sz -a -g -r -s]\n" 3689 " -p po: po={0,1,2}, push-off setting\n" 3690 " -b sz: block size for push-off (in KB)\n" 3691 " -a: scan tables in advance\n" 3692 " -g: stop push-off at the end\n" 3693 " -r: redistribute table entries\n" 3694 " -s: use small block size\n" 3695 " -C: report count in messages\n" 3696 " -F: report file read time\n" 3697 " -P: report push-off statistics\n");
3699 int nprocs, rank, thlv;
3701 MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
3702 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
3703 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
3707 fprintf(stderr, helpstring, argv[0]);
3716 struct rlimit core = {.rlim_cur = 0, .rlim_max = 0};
3717 int cc = setrlimit(RLIMIT_CORE, &core);
3718 if (cc != 0) {perror(
"setrlimit(core=0)");}
3723 char *directory = argv[1];
3727 while ((c = xxx_getopt(argc, argv,
"hCFP")) != -1) {
3732 fprintf(stderr, helpstring, argv[0]);
3740 report_count_in_messages = 1;
3743 report_time_to_read = 1;
3746 report_pushoff_statistics = 1;
3751 fprintf(stderr,
"Unknown option (%c)\n", xxx_optopt);
3752 fprintf(stderr, helpstring, argv[0]);
3760 if (xxx_optind >= argc) {
3761 fprintf(stderr, helpstring, argv[0]);
3767 while (xxx_optind < argc) {
3768 int qset[] = {7, 9, 10, 13, 21};
3769 int nq = (int)(
sizeof(qset) /
sizeof(qset[0]));
3770 int query = atoi_safe(argv[xxx_optind], qset, nq,
"Bad query");
3773 fprintf(stderr, helpstring, argv[0]);
3780 runs[nruns].query = query;
3781 runs[nruns].pushoff = 0;
3782 runs[nruns].load_tables_in_advance = 0;
3783 runs[nruns].hang_out_communication = 0;
3784 runs[nruns].redistribute_loaded_tables = 0;
3785 runs[nruns].use_small_block_size = 0;
3786 runs[nruns].pushoff_block_size_in_kilo = 64;
3789 while ((c = xxx_getopt(argc, argv,
"p:b:agrs")) != -1) {
3793 int okset[] = {0, 1, 2};
3794 int nokset = (int)(
sizeof(okset) /
sizeof(okset[0]));
3795 int pushoffv = atoi_safe(xxx_optarg, okset, nokset,
3797 if (pushoffv == -1) {
3799 fprintf(stderr, helpstring, argv[0]);
3805 runs[nruns].pushoff = pushoffv;
3811 int bs = atoi(xxx_optarg);
3812 runs[nruns].pushoff_block_size_in_kilo = (size_t)bs;
3817 runs[nruns].load_tables_in_advance = 1;
3820 runs[nruns].hang_out_communication = 1;
3823 runs[nruns].redistribute_loaded_tables = 1;
3826 runs[nruns].use_small_block_size = 1;
3831 fprintf(stderr,
"Unknown option (%c)\n", xxx_optopt);
3832 fprintf(stderr, helpstring, argv[0]);
3841 if (nruns >= (
int)(
sizeof(runs)/
sizeof(runs[0]))) {
3843 fprintf(stderr,
"run list too long\n");
3844 fprintf(stderr, helpstring, argv[0]);
3855 sc += snprintf((ss + sc), (
sizeof(ss) - (
size_t)sc),
3856 "Running (%d runs) with:", nruns);
3857 for (
int i = 0; i < nruns; i++) {
3858 sc += snprintf((ss + sc), (
sizeof(ss) - (
size_t)sc),
3859 " %d", runs[i].query);
3861 snprintf((ss + sc), (
sizeof(ss) - (
size_t)sc),
"\n");
3871 enum TABLE t[] = {TAB_N, TAB_R, TAB_P, TAB_S,
3872 TAB_H, TAB_C, TAB_O, TAB_L};
3873 int nt = (int)(
sizeof(t) /
sizeof(t[0]));
3874 load_input_tables(nprocs, rank, directory, t, nt);
3883 mr->pushoff_fast_notice = 1;
3888 for (
int i = 0; i < nruns; i++) {
3889 int query = runs[i].query;
3890 _Bool pushoff = ((runs[i].pushoff == 0) ? 0 : 1);
3895 mr->pushoff_hang_out = runs[i].hang_out_communication;
3896 mr->pushoff_fast_notice = (runs[i].pushoff == 2);
3897 mr->pushoff_stat = report_pushoff_statistics;
3898 if (runs[i].use_small_block_size) {
3899 mr->preset_block_size = (64 * 1024);
3901 mr->preset_block_size = (64 * 1024 * 1024);
3903 if (runs[i].pushoff_block_size_in_kilo != 0) {
3904 mr->pushoff_block_size = (runs[i].pushoff_block_size_in_kilo
3908 MPI_Barrier(MPI_COMM_WORLD);
3911 fprintf(stdout,
"Run Q%d (pushoff=%d fast_notice=%d)\n",
3912 query, pushoff, mr->pushoff_fast_notice);
3913 fprintf(stderr,
"Run Q%d (pushoff=%d fast_notice=%d)\n",
3914 query, pushoff, mr->pushoff_fast_notice);
3920 printf(
"OPTION: (-p) pushoff=%d fast_notice=%d\n",
3921 pushoff, mr->pushoff_fast_notice);
3922 printf(
"OPTION: (-a) load_tables_in_advance=%d\n",
3923 runs[i].load_tables_in_advance);
3924 printf(
"OPTION: (-g) pushoff_hang_out=%d\n",
3925 mr->pushoff_hang_out);
3926 printf(
"OPTION: (-r) redistribute_loaded_tables=%d\n",
3927 runs[i].redistribute_loaded_tables);
3928 printf(
"OPTION: (-s) preset_block_size=%zd\n",
3929 mr->preset_block_size);
3930 printf(
"OPTION: (-b) pushoff_block_size=%zd\n",
3931 mr->pushoff_block_size);
3932 printf(
"OPTION: (-C) report_count_in_messages=%d\n",
3933 report_count_in_messages);
3934 printf(
"OPTION: (-F) report_time_to_read=%d\n",
3935 report_time_to_read);
3936 printf(
"OPTION: (-P) report_pushoff_statistics=%d\n",
3937 report_pushoff_statistics);
3944 enum TABLE tbl = TAB_NIL;
3947 if (runs[i].load_tables_in_advance) {
3948 enum TABLE t7[] = {TAB_N, TAB_S, TAB_C, TAB_O, TAB_L};
3950 int nt7 = (int)(
sizeof(t7) /
sizeof(t7[0]));
3951 scan_table_files_in_advance(mr, t7, nt7, &runs[i]);
3953 MPI_Barrier(MPI_COMM_WORLD);
3955 r = q7(mr, &runs[i]);
3956 tbl = TAB_Q7_REVENUE;
3957 MPI_Barrier(MPI_COMM_WORLD);
3962 if (runs[i].load_tables_in_advance) {
3963 enum TABLE t9[] = {TAB_N, TAB_P, TAB_S, TAB_H, TAB_O, TAB_L};
3965 int nt9 = (int)(
sizeof(t9) /
sizeof(t9[0]));
3966 scan_table_files_in_advance(mr, t9, nt9, &runs[i]);
3968 MPI_Barrier(MPI_COMM_WORLD);
3970 r = q9(mr, &runs[i]);
3971 tbl = TAB_Q9_AMOUNT;
3972 MPI_Barrier(MPI_COMM_WORLD);
3977 if (runs[i].load_tables_in_advance) {
3978 enum TABLE t10[] = {TAB_N, TAB_C, TAB_O, TAB_L};
3980 int nt10 = (int)(
sizeof(t10) /
sizeof(t10[0]));
3981 scan_table_files_in_advance(mr, t10, nt10, &runs[i]);
3983 MPI_Barrier(MPI_COMM_WORLD);
3985 r = q10(mr, &runs[i]);
3986 tbl = TAB_Q10_CLNO1;
3987 MPI_Barrier(MPI_COMM_WORLD);
3992 if (runs[i].load_tables_in_advance) {
3993 enum TABLE t13[] = {TAB_C, TAB_O, TAB_L};
3995 int nt13 = (int)(
sizeof(t13) /
sizeof(t13[0]));
3996 scan_table_files_in_advance(mr, t13, nt13, &runs[i]);
3998 MPI_Barrier(MPI_COMM_WORLD);
4000 r = q13(mr, &runs[i]);
4002 MPI_Barrier(MPI_COMM_WORLD);
4007 if (runs[i].load_tables_in_advance) {
4008 enum TABLE t21[] = {TAB_N, TAB_S, TAB_O, TAB_L};
4010 int nt21 = (int)(
sizeof(t21) /
sizeof(t21[0]));
4011 scan_table_files_in_advance(mr, t21, nt21, &runs[i]);
4013 MPI_Barrier(MPI_COMM_WORLD);
4015 r = q21(mr, &runs[i]);
4016 tbl = TAB_Q21_NUMWAIT;
4017 MPI_Barrier(MPI_COMM_WORLD);
4022 if (rank == 0) {printf(
"Run Q%d in %f sec\n", query, (t1 - t0));}
4027 if (mr->rank == 0) {
4028 printf(
"result count=%ld\n", rcnt);
4036 if (pushoff && mr->pushoff_stat) {
4037 char *s =
"STATISTICS on push-off kvs:\n";
4038 kmr_print_statistics_on_pushoff(mr, s);
4044 kmr_fin_pushoff_fast_notice_();
Key-Value Stream (abstract).
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Options to Mapping, Shuffling, and Reduction.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
KMR_KVS * kmr_create_pushoff_kvs(KMR *mr, enum kmr_kv_field kf, enum kmr_kv_field vf, struct kmr_option opt, const char *, const int, const char *)
Makes a new key-value stream with the specified field data-types.
void kmr_reset_ntuple(struct kmr_ntuple *u, int n, int marker)
Resets an n-tuple U with N entries and a MARKER.
int kmr_put_ntuple(KMR *mr, struct kmr_ntuple *u, const int sz, const void *v, const int vlen)
Adds an entry V of length VLEN to an n-tuple U whose size is limited to SZ.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
struct kmr_ntuple_entry kmr_nth_ntuple(struct kmr_ntuple *u, int nth)
Returns the NTH entry of an n-tuple.
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
int kmr_distribute(KMR_KVS *kvi, KMR_KVS *kvo, _Bool cyclic, struct kmr_option opt)
Distributes key-values so that each rank has approximately the same number of pairs.
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
int kmr_concatenate_kvs(KMR_KVS *kvs[], int nkvs, KMR_KVS *kvo, struct kmr_option opt)
Concatenates a number of KVSes to one.
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
void kmr_check_pushoff_fast_notice_(KMR *mr)
Checks if fast-notice works.
kmr_kv_field
Datatypes of Keys or Values.
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Handy Copy of a Key-Value Field.
void kmr_init_pushoff_fast_notice_(MPI_Comm, _Bool verbose)
Initializes RDMA for fast-notice.
int kmr_fin(void)
Clears the environment.
int kmr_get_element_count(KMR_KVS *kvs, long *v)
Gets the total number of key-value pairs.
#define kmr_init()
Sets up the environment.
int kmr_sort(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream globally.
int kmr_size_ntuple(struct kmr_ntuple *u)
Returns the storage size of an n-tuple.
int kmr_separate_ntuples(KMR *mr, const struct kmr_kv_box kv[], const long n, struct kmr_ntuple **vv[2], long cnt[2], int markers[2], _Bool disallow_other_entries)
Separates the n-tuples stored in the value part of KV into the two sets by their marker values...
int kmr_put_ntuple_long(KMR *mr, struct kmr_ntuple *u, const int sz, long v)
Adds an integer value V to an n-tuple U whose size is limited to SZ.
int kmr_product_ntuples(KMR_KVS *kvo, struct kmr_ntuple **vv[2], long cnt[2], int newmarker, int slots[][2], int nslots, int keys[][2], int nkeys)
Makes a direct product of the two sets of n-tuples VV[0] and VV[1] with their counts in CNT[0] and CNT[1]...
int kmr_histogram_count_by_ranks(KMR_KVS *kvs, long *frq, double *var, _Bool rankzeroonly)
Fills an integer array FRQ[i] with the count of the elements of each rank.
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
int kmr_map_rank_by_rank(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps sequentially, rank by rank, for debugging.
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gather...
int kmr_put_ntuple_entry(KMR *mr, struct kmr_ntuple *u, const int sz, struct kmr_ntuple_entry e)
Adds an n-tuple entry E to an n-tuple U whose size is limited to SZ.
int(* kmr_redfn_t)(const struct kmr_kv_box kv[], const long n, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg)
Reduce-function Type.
KMR_KVS * kmr_create_kvs7(KMR *mr, enum kmr_kv_field k, enum kmr_kv_field v, struct kmr_option opt, const char *, const int, const char *)
Makes a new key-value stream with the specified field data-types.
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type.
int kmr_choose_first_part(KMR_KVS *kvi, KMR_KVS *kvo, long n, struct kmr_option opt)
Chooses the first N entries from a key-value stream KVI.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).