KMR
tpch.c
1 /* tpch.c (2014-02-04) */
2 
3 /* It performs SQL queries defined in TPC-H (2.17.0) by map-reduce.
4  See "http://www.tpc.org/tpch/", for references to TPC-H by the
5  Transaction Processing Performance Council. */
6 
7 /* HOW TO RUN:
8  (1) Make database tables.
9  (1.1) Get and unzip "tpch_2_17_0.zip".
10  (1.2) Make in the directory "tpch_2_17_0/dbgen" with:
11  "make -f makefile.suite CC=gcc DATABASE=DB2 MACHINE=LINUX WORKLOAD=TPCH"
12  (1.3) Run "dbgen -f -s 1" ("-s 1" for approximately 1GB of tables).
13  (2) mpirun -np 8 a.out directory-of-tables 7 (for QUERY-7).
14  Queries implemented are 7, 9, 10, 13, 21. See the comment below.
15  (3) Compare with the answer sets in "tpch_2_17_0/dbgen/answers/". */
16 
17 /* The selected queries are Q7, Q9, Q10, Q13, and Q21. The parameters
18  for queries are the given ones in the benchmark for validation
19  runs. Q9 is said to be the most complex one. The others are
20  selected considering the clustering by characteristics of the
21  queries in the paper "Building and Validating a Reduced TPC-H
22  Benchmark (MASCOTS 2006)", although the criteria in the paper are
   not relevant to the workings of map-reduce.  Table name "H" is used
   instead of "PS", so that every table has a one-character name. */
25 
26 /* MEMO: The "lineitem" table has no primary key, and "l_orderkey" is
27  not one. MEMO: It makes printing/filling in a string a bit
28  difficult. */
29 
30 /* INPUT TABLES:
31 
32  N/nation : {n_nationkey, n_name, n_regionkey, n_comment}
33  R/region : {r_regionkey, r_name, r_comment}
34  P/part : {p_partkey, p_name, p_mfgr, p_brand, p_type, p_size,
35  p_container, p_retailprice, p_comment}
36  S/supplier : {s_suppkey, s_name, s_address, s_nationkey, s_phone,
37  s_acctbal, s_comment}
38  H/partsupp : {ps_partkey, ps_suppkey, ps_availqty, ps_supplycost,
39  ps_comment}
40  C/customer : {c_custkey, c_name, c_address, c_nationkey, c_phone,
41  c_acctbal, c_mktsegment, c_comment}
42  O/orders : {o_orderkey, o_custkey, o_orderstatus, o_totalprice,
43  o_orderdate, o_orderpriority, o_clerk, o_shippriority,
44  o_comment}
45  L/lineitem : {l_orderkey, l_partkey, l_suppkey, l_linenumber,
46  l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag,
47  l_linestatus, l_shipdate, l_commitdate, l_receiptdate,
48  l_shipinstruct, l_shipmode, l_comment}
49 
50  * RECORDS OF 1 GB DATA SET (Data rows generated by "dbgen -s 1")
51  region.tbl: 5
52  nation.tbl: 25
53  supplier.tbl: 10,000
54  customer.tbl: 150,000
55  part.tbl: 200,000
56  partsupp.tbl: 800,000
57  orders.tbl: 1,500,000
58  lineitem.tbl: 6,001,215 */
59 
60 #include <mpi.h>
61 #include <stdio.h>
62 #include <stdlib.h>
63 #include <unistd.h>
64 #include <inttypes.h>
65 #include <string.h>
66 #include <fcntl.h>
67 #include <errno.h>
68 #include <sys/types.h>
69 #include <sys/stat.h>
70 #include <sys/resource.h>
71 #include <netinet/in.h>
72 #include <time.h>
73 #include <sys/time.h>
74 #include <assert.h>
75 #include "kmr.h"
76 
/* MIN yields the smaller of A and B.  Each argument may be evaluated
   twice, so do not pass expressions with side effects. */
#define MIN(a,b) (((a)<(b))?(a):(b))
/* A constant false condition; it is used as "assert(NEVERHERE)" to
   mark code paths that are not expected to be reached. */
#define NEVERHERE 0
79 
/* Settings of one run.  NOTE(review): the code that fills and reads
   these fields is outside this part of the file; the per-field notes
   below are inferred from the names only and should be confirmed. */

struct RUN {
    /* Presumably the TPC-H query number to run -- TODO confirm. */
    int query;
    /* Presumably selects a "push-off" mode of KMR -- TODO confirm. */
    int pushoff;
    _Bool load_tables_in_advance;
    _Bool hang_out_communication;
    _Bool redistribute_loaded_tables;
    _Bool use_small_block_size;
    /* Presumably a block size in kilobytes -- TODO confirm. */
    size_t pushoff_block_size_in_kilo;
};
89 
/* File-scope run flags.  NOTE(review): these two look like global
   copies of the same-named fields of struct RUN -- confirm. */

_Bool load_tables_in_advance_;
_Bool redistribute_loaded_tables_;

_Bool files_in_rank_directory = 0;

/* Reporting switches.  When report_count_in_messages is set, pcount()
   and ptime() additionally call kmr_get_element_count() and print the
   element counts. */

_Bool report_count_in_messages = 0;
_Bool report_time_to_read = 0;
_Bool report_pushoff_statistics = 0;

/* With USE_TIME_FUNCTIONS undefined, decode_date(), format_date(),
   and year_value() use the byte-packed date encoding instead of the
   C library time functions. */

#undef USE_TIME_FUNCTIONS

/* MAXCOLS bounds the column and key arrays of a record description
   (lineitem uses all 16 columns). */

#define MAXCOLS (16)
#define NAME_SIZE (25)
103 
104 /* Join argument: first (left) or second (right). */
105 
/* T_FST/T_SND name the first (left) and second (right) argument of a
   join; FST/SND are the same values as plain ints. */
enum {T_FST = 0, T_SND = 1};
#define FST ((int)T_FST)
#define SND ((int)T_SND)
109 
110 /* MEMO: strnstr() and htonll() are not POSIX */
111 
112 /* strnlen() in BSD. */
113 
/* Returns the length of the string S, but at most N.  It never reads
   beyond the first N bytes of S.  It returns 0 for a null S. */

static size_t
strnlen_(const char *s, size_t n)
{
    if (s == 0) {
        return 0;
    }
    size_t len = 0;
    /* Check the bound before dereferencing, so that s[n] is never
       touched. */
    while (len < n && s[len] != 0) {
        len++;
    }
    return len;
}

/* strnstr() in BSD. */

/* Finds the first occurrence of the string S2 in S1, looking at no
   more than the first N bytes of S1 (and stopping at its terminator).
   It returns a pointer to the occurrence, or null when S2 does not
   occur entirely inside that range.  It returns S1 when S1 or S2 is
   null or S2 is empty. */

static char *
strnstr_(const char *s1, const char *s2, size_t n)
{
    if (s1 == 0 || s2 == 0 || *s2 == 0) {
        return (char *)s1;
    }
    size_t len = strnlen_(s2, n);
    if (s2[len] != 0) {
        /* S2 is longer than N bytes; it cannot fit in the range.
           (Matching only a prefix of S2 would be a false hit). */
        return 0;
    }
    /* Here 1 <= len <= n, so (n - len) does not wrap around. */
    size_t last = (n - len);
    for (size_t i = 0; i <= last && s1[i] != 0; i++) {
        /* strncmp() stops at the terminator of S1, so it stays inside
           the string even when fewer than LEN bytes remain. */
        if (s1[i] == *s2 && strncmp(&s1[i], s2, len) == 0) {
            return (char *)&s1[i];
        }
    }
    return 0;
}
148 
149 /* htonll() in SOLARIS. */
150 
/* Converts a 64-bit value from host to network byte order.  The
   result holds the bytes of X in memory from the most significant to
   the least significant one. */

static uint64_t
htonll_(uint64_t x)
{
    /* Upper word goes first, each word in network order. */
    uint32_t words[2];
    words[0] = htonl((uint32_t)(x >> 32));
    words[1] = htonl((uint32_t)x);
    uint64_t packed;
    memcpy(&packed, words, sizeof(packed));
    return packed;
}
161 
162 /* getopt() reduced. */
163 
/* State of xxx_getopt(): the argument of the last option (or null),
   the index of the next element of argv to look at, and the last
   option character examined. */

static char *xxx_optarg;
static int xxx_optind = 1;
static int xxx_optopt;
/*static int xxx_opterr;*/

/* Scans the next option in ARGV.  OPTSTRING lists the option letters;
   a letter followed by a colon takes an argument, which is either the
   rest of the same word ("-ofile") or the next word ("-o file").  It
   returns the option letter, or '?' for an unknown letter or a
   missing argument, or -1 at the end of options (at a non-option
   word, at "-", or after consuming "--").  Options without an
   argument can be clustered ("-ab").  Note it does not advance past
   an unknown letter, thus a caller should stop scanning on '?'. */

static int
xxx_getopt(int argc, char **argv, char *optstring)
{
    /* CLUSTER and POS remember the position inside a word like "-ab"
       between calls.  POS is the offset of the next option letter in
       the word; it restarts at one when the word changes. */
    static char *cluster = 0;
    static int pos = 1;

    if (xxx_optind >= argc) {
        return -1;
    }
    char *p = argv[xxx_optind];
    if (cluster != p) {
        cluster = p;
        pos = 1;
    }
    if (pos == 1) {
        if (p[0] != '-' || p[1] == 0) {
            /* A non-option word or a lone "-". */
            xxx_optopt = p[0];
            return -1;
        }
        if (strcmp(p, "--") == 0) {
            xxx_optopt = p[0];
            xxx_optind++;
            return -1;
        }
    }

    xxx_optopt = p[pos];
    /* A colon is a marker in OPTSTRING, not an option letter. */
    char *o = ((xxx_optopt == ':') ? 0 : strchr(optstring, xxx_optopt));
    if (o == 0) {
        return '?';
    }
    if (o[1] != ':') {
        /* An option without an argument.  Step inside the word when
           more letters follow, otherwise move to the next word.
           (Not advancing here would return the same letter forever). */
        xxx_optarg = 0;
        if (p[pos + 1] == 0) {
            xxx_optind++;
            pos = 1;
        } else {
            pos++;
        }
        return xxx_optopt;
    }
    if (p[pos + 1] != 0) {
        /* The argument is the rest of the word. */
        xxx_optarg = &p[pos + 1];
        xxx_optind++;
        pos = 1;
        return xxx_optopt;
    }
    /* The argument is the next word, when there is one. */
    xxx_optind++;
    pos = 1;
    if (xxx_optind < argc) {
        xxx_optarg = argv[xxx_optind];
        xxx_optind++;
        return xxx_optopt;
    }
    return '?';
}
216 
/* Identifiers of tables.  Each one names an entry in the tables[]
   array (looked up by find_table()), and scan_columns() stores it as
   the marker of the n-tuples it fills.  TAB_N to TAB_L are the input
   tables; the TAB_Qnn_ ones are described in tables[] under the
   corresponding query. */

enum TABLE {
    TAB_NIL = 0,

    /* Input tables: nation, region, part, supplier, partsupp,
       customer, orders, and lineitem. */

    TAB_N, TAB_R, TAB_P, TAB_S, TAB_H, TAB_C, TAB_O, TAB_L,

    /*(Q7)*/

    TAB_Q7_N1, TAB_Q7_NN, TAB_Q7_S1, TAB_Q7_NNS,
    TAB_Q7_C1, TAB_Q7_O1, TAB_Q7_CO, TAB_Q7_L1,
    TAB_Q7_CLO, TAB_Q7_CLNNOS0, TAB_Q7_CLNNOS1, TAB_Q7_REVENUE,

    /*(Q9)*/

    TAB_Q9, TAB_Q9_PH, TAB_Q9_NS, TAB_Q9_PHSN,
    TAB_Q9_LO, TAB_Q9_PHSNLO, TAB_Q9_AMOUNT,

    /*(Q10)*/

    TAB_Q10_L1, TAB_Q10_O1, TAB_Q10_LO, TAB_Q10_C1,
    TAB_Q10_N1, TAB_Q10_CN, TAB_Q10_CLNO0, TAB_Q10_CLNO1,

    /*(Q13)*/

    TAB_Q13_O1, TAB_Q13_CO0, TAB_Q13_CO1,

    /*(Q21)*/

    TAB_Q21_N1, TAB_Q21_NS, TAB_Q21_L1, TAB_Q21_O1,
    TAB_Q21_L3, TAB_Q21_LNS, TAB_Q21_NAME,
    TAB_Q21_NUMWAIT
};

/* Variables of the KVS for the eight input tables.  Their addresses
   are recorded in the entries of tables[]. */

KMR_KVS *N0, *R0, *P0, *S0, *H0, *C0, *O0, *L0;
240 
/* Datatypes of columns.  As scan_columns() stores them: F_ZAHL is a
   long, F_REAL is a double, F_TEXT is the bytes of the field, and
   F_DATE is a long holding a value of decode_date(). */

enum FIELD {F_NIL, F_ZAHL, F_REAL, F_TEXT, F_DATE};

/* A column: its name and its datatype. */

struct COLUMN {
    char *label;
    enum FIELD field;
};

/* Description of records of a table: the columns of a record and the
   columns making a key.  Unused slots of the arrays are left zero;
   column_index_by_name() stops searching at a null label. */

struct RECORD {
    int ncolumns;
    struct COLUMN columns[MAXCOLS];
    int nkeys;
    struct COLUMN keys[MAXCOLS];
};

/* Byte Count Limit to Store a Record. */

#define RECORD_SIZE (1024)
258 
259 struct TABLE_INFO {
260  enum TABLE name;
261  struct {
262  KMR_KVS **variable;
263  char *file;
264  int nread;
265  int nfiles;
266  int nb;
267  void **buffers;
268  size_t *sizes;
269  } data;
270  struct RECORD description;
271 } tables[] = {
272  {TAB_N, {&N0, "nation.tbl", 1, 0, 0, 0, 0},
273  {4, {{"n_nationkey", F_ZAHL},
274  {"n_name", F_TEXT},
275  {"n_regionkey", F_ZAHL},
276  {"n_comment", F_TEXT}},
277  1, {{"n_nationkey", F_ZAHL}}}},
278  {TAB_R, {&R0, "region.tbl", 1, 0, 0, 0, 0},
279  {3, {{"r_regionkey", F_ZAHL},
280  {"r_name", F_TEXT},
281  {"r_comment", F_TEXT}},
282  1, {{"r_regionkey", F_ZAHL}}}},
283  {TAB_P, {&P0, "part.tbl", 2, 0, 0, 0, 0},
284  {9, {{"p_partkey", F_ZAHL},
285  {"p_name", F_TEXT},
286  {"p_mfgr", F_TEXT},
287  {"p_brand", F_TEXT},
288  {"p_type", F_TEXT},
289  {"p_size", F_ZAHL},
290  {"p_container", F_TEXT},
291  {"p_retailprice", F_REAL},
292  {"p_comment", F_TEXT}},
293  1, {{"p_partkey", F_ZAHL}}}},
294  {TAB_S, {&S0, "supplier.tbl", 2, 0, 0, 0, 0},
295  {7, {{"s_suppkey", F_ZAHL},
296  {"s_name", F_TEXT},
297  {"s_address", F_TEXT},
298  {"s_nationkey", F_ZAHL},
299  {"s_phone", F_TEXT},
300  {"s_acctbal", F_REAL},
301  {"s_comment", F_TEXT}},
302  1, {{"s_suppkey", F_ZAHL}}}},
303  {TAB_H, {&H0, "partsupp.tbl", 2, 0, 0, 0, 0},
304  {5, {{"ps_partkey", F_ZAHL},
305  {"ps_suppkey", F_ZAHL},
306  {"ps_availqty", F_ZAHL},
307  {"ps_supplycost", F_REAL},
308  {"ps_comment", F_TEXT}},
309  1, {{"ps_partkey", F_ZAHL}}}},
310  {TAB_C, {&C0, "customer.tbl", 2, 0, 0, 0, 0},
311  {8, {{"c_custkey", F_ZAHL},
312  {"c_name", F_TEXT},
313  {"c_address", F_TEXT},
314  {"c_nationkey", F_ZAHL},
315  {"c_phone", F_TEXT},
316  {"c_acctbal", F_REAL},
317  {"c_mktsegment", F_TEXT},
318  {"c_comment", F_TEXT}},
319  1, {{"c_custkey", F_ZAHL}}}},
320  {TAB_O, {&O0, "orders.tbl", 2, 0, 0, 0, 0},
321  {9, {{"o_orderkey", F_ZAHL},
322  {"o_custkey", F_ZAHL},
323  {"o_orderstatus", F_TEXT},
324  {"o_totalprice", F_REAL},
325  {"o_orderdate", F_DATE},
326  {"o_orderpriority", F_TEXT},
327  {"o_clerk", F_TEXT},
328  {"o_shippriority", F_ZAHL},
329  {"o_comment", F_TEXT}},
330  1, {{"o_orderkey", F_ZAHL}}}},
331  {TAB_L, {&L0, "lineitem.tbl", 2, 0, 0, 0, 0},
332  {16, {{"l_orderkey", F_ZAHL},
333  {"l_partkey", F_ZAHL},
334  {"l_suppkey", F_ZAHL},
335  {"l_linenumber", F_REAL},
336  {"l_quantity", F_REAL},
337  {"l_extendedprice", F_REAL},
338  {"l_discount", F_REAL},
339  {"l_tax", F_REAL},
340  {"l_returnflag", F_TEXT},
341  {"l_linestatus", F_TEXT},
342  {"l_shipdate", F_DATE},
343  {"l_commitdate", F_DATE},
344  {"l_receiptdate", F_DATE},
345  {"l_shipinstruct", F_TEXT},
346  {"l_shipmode", F_TEXT},
347  {"l_comment", F_TEXT}},
348  1, {{"l_orderkey", F_ZAHL}}}},
349 
350  /*(Q7)*/
351 
352  {TAB_Q7_N1, {0},
353  {2, {{"n_nationkey", F_ZAHL},
354  {"n_name", F_TEXT}},
355  0, {{0, F_NIL}}}},
356  {TAB_Q7_NN, {0},
357  {4, {{"n1.n_nationkey", F_ZAHL},
358  {"n1.n_name", F_TEXT},
359  {"n2.n_nationkey", F_ZAHL},
360  {"n2.n_name", F_TEXT}},
361  1, {{"n1.n_nationkey", F_ZAHL}}}},
362  {TAB_Q7_S1, {0},
363  {2, {{"s_nationkey", F_ZAHL},
364  {"s_suppkey", F_ZAHL}},
365  1, {{"s_nationkey", F_ZAHL}}}},
366  {TAB_Q7_NNS, {0},
367  {4, {{"n2.n_nationkey", F_ZAHL},
368  {"s_suppkey", F_ZAHL},
369  {"n1.n_name", F_TEXT},
370  {"n2.n_name", F_TEXT}},
371  2, {{"n2.s_nationkey", F_ZAHL},
372  {"s_suppkey", F_ZAHL}}}},
373  {TAB_Q7_C1, {0},
374  {2, {{"c_custkey", F_ZAHL},
375  {"c_nationkey", F_ZAHL}},
376  1, {{"c_custkey", F_ZAHL}}}},
377  {TAB_Q7_O1, {0},
378  {2, {{"o_custkey", F_ZAHL},
379  {"o_orderkey", F_ZAHL}},
380  1, {{"o_custkey", F_ZAHL}}}},
381  {TAB_Q7_CO, {0},
382  {2, {{"o_orderkey", F_ZAHL},
383  {"c_nationkey", F_ZAHL}},
384  1, {{"o_orderkey", F_ZAHL}}}},
385  {TAB_Q7_L1, {0},
386  {4, {{"l_orderkey", F_ZAHL},
387  {"l_suppkey", F_ZAHL},
388  {"year", F_DATE},
389  {"volume", F_REAL}},
390  1, {{"l_orderkey", F_ZAHL}}}},
391  {TAB_Q7_CLO, {0},
392  {4, {{"c_nationkey", F_ZAHL},
393  {"l_suppkey", F_ZAHL},
394  {"year", F_DATE},
395  {"volume", F_REAL}},
396  2, {{"c_nationkey", F_ZAHL},
397  {"l_suppkey", F_ZAHL}}}},
398  {TAB_Q7_CLNNOS0, {0},
399  {4, {{"n1.n_name", F_TEXT},
400  {"n2.n_name", F_TEXT},
401  {"year", F_DATE},
402  {"volume", F_REAL}},
403  0, {{0, F_NIL}}}},
404  {TAB_Q7_CLNNOS1, {0},
405  {4, {{"n1.n_name", F_TEXT},
406  {"n2.n_name", F_TEXT},
407  {"year", F_DATE},
408  {"volume", F_REAL}},
409  3, {{"n1.n_name", F_TEXT},
410  {"n2.n_name", F_TEXT},
411  {"year", F_DATE}}}},
412  {TAB_Q7_REVENUE, {0},
413  {4, {{"n1.n_name", F_TEXT},
414  {"n2.n_name", F_TEXT},
415  {"year", F_DATE},
416  {"revenue", F_REAL}},
417  3, {{"n1.n_name", F_TEXT},
418  {"n2.n_name", F_TEXT},
419  {"year", F_DATE}}}},
420 
421  /*(Q9)*/
422 
423  {TAB_Q9, {0},
424  {3, {{"n_name", F_TEXT},
425  {"o_orderdate", F_DATE},
426  {"amount", F_REAL}},
427  1, {{"nation+year", F_NIL}}}},
428  {TAB_Q9_PH, {0},
429  {3, {{"ps_partkey", F_ZAHL},
430  {"ps_suppkey", F_ZAHL},
431  {"ps_supplycost", F_REAL}},
432  1, {{"ps_suppkey", F_ZAHL}}}},
433  {TAB_Q9_NS, {0},
434  {2, {{"n_name", F_TEXT},
435  {"s_suppkey", F_ZAHL}},
436  1, {{"s_suppkey", F_ZAHL}}}},
437  {TAB_Q9_PHSN, {0},
438  {4, {{"n_name", F_TEXT},
439  {"ps_partkey", F_ZAHL},
440  {"ps_suppkey", F_ZAHL},
441  {"ps_supplycost", F_REAL}},
442  2, {{"ps_partkey", F_ZAHL},
443  {"ps_suppkey", F_ZAHL}}}},
444  {TAB_Q9_LO, {0},
445  {6, {{"l_discount", F_REAL},
446  {"l_extendedprice", F_REAL},
447  {"l_partkey", F_ZAHL},
448  {"l_quantity", F_REAL},
449  {"l_suppkey", F_ZAHL},
450  {"o_orderdate", F_DATE}},
451  2, {{"l_partkey", F_ZAHL},
452  {"l_suppkey", F_ZAHL}}}},
453  {TAB_Q9_PHSNLO, {0},
454  {6, {{"l_discount", F_REAL},
455  {"l_extendedprice", F_REAL},
456  {"l_quantity", F_REAL},
457  {"n_name", F_TEXT},
458  {"o_orderdate", F_DATE},
459  {"ps_supplycost", F_REAL}},
460  1, {{"o_orderdate", F_DATE}}}},
461  {TAB_Q9_AMOUNT, {0},
462  {3, {{"nation", F_TEXT},
463  {"year", F_DATE},
464  {"amount", F_REAL}},
465  2, {{"nation", F_TEXT},
466  {"year", F_DATE}}}},
467 
468  /*(Q10)*/
469 
470  {TAB_Q10_L1, {0},
471  {2, {{"l_orderkey", F_ZAHL},
472  {"volume", F_REAL}},
473  1, {{"l_orderkey", F_ZAHL}}}},
474  {TAB_Q10_O1, {0},
475  {2, {{"o_orderkey", F_ZAHL},
476  {"o_custkey", F_ZAHL}},
477  1, {{"o_orderkey", F_ZAHL}}}},
478  {TAB_Q10_LO, {0},
479  {2, {{"o_custkey", F_ZAHL},
480  {"volume", F_REAL}},
481  1, {{"o_custkey", F_ZAHL}}}},
482  {TAB_Q10_C1, {0},
483  {7, {{"c_nationkey", F_ZAHL},
484  {"c_custkey", F_ZAHL},
485  {"c_name", F_TEXT},
486  {"c_acctbal", F_REAL},
487  {"c_phone", F_TEXT},
488  {"c_address", F_TEXT},
489  {"c_comment", F_TEXT}},
490  1, {{"c_nationkey", F_ZAHL}}}},
491  {TAB_Q10_N1, {0},
492  {7, {{"n_nationkey", F_ZAHL},
493  {"n_name", F_TEXT}},
494  1, {{"n_nationkey", F_ZAHL}}}},
495  {TAB_Q10_CN, {0},
496  {7, {{"c_custkey", F_ZAHL},
497  {"c_name", F_TEXT},
498  {"c_acctbal", F_REAL},
499  {"c_phone", F_TEXT},
500  {"n_name", F_TEXT},
501  {"c_address", F_TEXT},
502  {"c_comment", F_TEXT}},
503  1, {{"c_custkey", F_ZAHL}}}},
504  {TAB_Q10_CLNO0, {0},
505  {8, {{"c_custkey", F_ZAHL},
506  {"c_name", F_TEXT},
507  {"c_acctbal", F_REAL},
508  {"c_phone", F_TEXT},
509  {"n_name", F_TEXT},
510  {"c_address", F_TEXT},
511  {"c_comment", F_TEXT},
512  {"volume", F_REAL}},
513  7, {{"c_custkey", F_ZAHL},
514  {"c_name", F_TEXT},
515  {"c_acctbal", F_REAL},
516  {"c_phone", F_TEXT},
517  {"n_name", F_TEXT},
518  {"c_address", F_TEXT},
519  {"c_comment", F_TEXT}}}},
520  {TAB_Q10_CLNO1, {0},
521  {8, {{"c_custkey", F_ZAHL},
522  {"c_name", F_TEXT},
523  {"revenue", F_REAL},
524  {"c_acctbal", F_REAL},
525  {"n_name", F_TEXT},
526  {"c_address", F_TEXT},
527  {"c_phone", F_TEXT},
528  {"c_comment", F_TEXT}},
529  1, {{"revenue", F_REAL}}}},
530 
531  /*(Q13)*/
532 
533  {TAB_Q13_O1, {0},
534  {1, {{"o_orderkey", F_ZAHL}},
535  1, {{"o_custkey", F_ZAHL}}}},
536  {TAB_Q13_CO0, {0},
537  {1, {{"c_custkey", F_ZAHL}},
538  1, {{"q13_count", F_ZAHL}}}},
539  {TAB_Q13_CO1, {0},
540  {2, {{"q13_custdist", F_ZAHL},
541  {"q13_count", F_ZAHL}},
542  2, {{"q13_custdist", F_ZAHL},
543  {"q13_count", F_ZAHL}}}},
544 
545  /*(Q21)*/
546 
547  {TAB_Q21_N1, {0},
548  {1, {{"n_nationkey", F_ZAHL}},
549  1, {{"n_nationkey", F_ZAHL}}}},
550  {TAB_Q21_NS, {0},
551  {1, {{"s_suppkey", F_ZAHL},
552  {"s_name", F_TEXT}},
553  1, {{"s_suppkey", F_ZAHL}}}},
554  {TAB_Q21_L1, {0},
555  {2, {{"l_orderkey", F_ZAHL},
556  {"l_suppkey", F_ZAHL}},
557  1, {{"l_suppkey", F_ZAHL}}}},
558  {TAB_Q21_L3, {0},
559  {2, {{"l_orderkey", F_ZAHL},
560  {"l_suppkey", F_ZAHL}},
561  1, {{"l_orderkey", F_ZAHL}}}},
562  {TAB_Q21_LNS, {0},
563  {3, {{"l_orderkey", F_ZAHL},
564  {"l_suppkey", F_ZAHL},
565  {"s_name", F_TEXT}},
566  1, {{"l_orderkey", F_ZAHL}}}},
567  {TAB_Q21_O1, {0},
568  {1, {{"o_orderkey", F_ZAHL}},
569  1, {{"o_orderkey", F_ZAHL}}}},
570  {TAB_Q21_NAME, {0},
571  {1, {{"s_name", F_TEXT}},
572  1, {{"s_name", F_TEXT}}}},
573  {TAB_Q21_NUMWAIT, {0},
574  {2, {{"s_name", F_TEXT},
575  {"numwait", F_ZAHL}},
576  1, {{"numwait", F_ZAHL},
577  {"s_name", F_TEXT}}}}};
578 
579 /* Simple Join Command to join_by_fields(). */
580 
/* INPUTS are the two tables to join and OUTPUT is the table made.
   COLUMNS and KEYS list the NCOLUMNS columns and the NKEYS keys of an
   output record; judging from the disabled assert_column_fields(),
   each pair is {column-index, input-selector}, where the selector is
   zero for the first input and nonzero for the second.
   NOTE(review): the meanings of the TRACE and CNT flags are not
   visible in this part of the file; presumably the CNT ones are
   conditions on the number of records matching on each side --
   confirm against join_by_fields(). */

struct PRODUCT {
    int /*enum TABLE*/ inputs[2];
    int /*enum TABLE*/ output;
    int ncolumns;
    int columns[MAXCOLS][2];
    int nkeys;
    int keys[MAXCOLS][2];
    _Bool trace_product;
    _Bool trace_product_nonempty;
    _Bool cnt0_zero;
    _Bool cnt1_zero;
    _Bool cnt0_one;
    _Bool cnt1_one;
    _Bool cnt0_zero_one;
    _Bool cnt1_zero_one;
    _Bool cnt0_nonzero;
    _Bool cnt1_nonzero;
};
599 
600 /* Simple Select Command to select_by_fields(). */
601 
/* INPUT is the table to select from and OUTPUT is the table made.
   COLUMNS and KEYS hold NCOLUMNS and NKEYS integers; presumably they
   are indexes of columns in an input record, like the ones taken by
   put_columns_by_indexes() -- confirm against select_by_fields(). */

struct SELECT {
    int /*enum TABLE*/ input;
    int /*enum TABLE*/ output;
    int ncolumns;
    int columns[MAXCOLS];
    int nkeys;
    int keys[MAXCOLS];
};
610 
612  enum TABLE input;
613  enum TABLE output;
614  int ncolumns;
615  char *columns[MAXCOLS];
616  int nkeys;
617  char *keys[MAXCOLS];
618 };
619 
/* Argument to scan_line().  TBL describes the table being read.  FN
   is an optional map-function that scan_line() calls on each scanned
   record with ARG as its argument; when FN is null, scan_line() adds
   the record to the output as it is. */

struct SCAN_OP {
    struct TABLE_INFO *tbl;
    kmr_mapfn_t fn;
    void *arg;
};
625 
/* Returns the wall-clock time in seconds elapsed since the first call
   of this function.  (The first call returns zero). */

static double
wtime(void)
{
    /* Time of the first call; zero in tv_sec means it is not set. */
    static struct timeval tv0 = {.tv_sec = 0};
    struct timeval tv;
    int cc = gettimeofday(&tv, 0);
    assert(cc == 0);
    (void)cc; /* (Suppress an unused warning with NDEBUG). */
    if (tv0.tv_sec == 0) {
        tv0 = tv;
        assert(tv0.tv_sec != 0);
    }
    double dt = ((double)(tv.tv_sec - tv0.tv_sec)
                 + ((double)(tv.tv_usec - tv0.tv_usec) * 1e-6));
    return dt;
}
642 
643 /* NOTE: IT IS SYNCHRONIZING WHEN report_count_in_messages. */
644 
645 static void
646 pcount(KMR_KVS *kvs0, KMR_KVS *kvs1, char *msg, _Bool before0after1)
647 {
648  KMR *mr = kvs0->c.mr;
649  if (report_count_in_messages) {
650  long c0;
651  kmr_get_element_count(kvs0, &c0);
652  long c1 = 0;
653  if (kvs1 != 0) {
654  kmr_get_element_count(kvs1, &c1);
655  }
656  if (mr->rank == 0) {
657  char *s0 = (before0after1 == 0 ? "before" : "after");
658  char *s1 = (before0after1 == 0 ? "..." : "");
659  if (kvs1 != 0) {
660  printf("%s %s #=%ld #=%ld%s\n", msg, s0, c0, c1, s1);
661  fflush(0);
662  } else {
663  printf("%s %s #=%ld%s\n", msg, s0, c0, s1);
664  fflush(0);
665  }
666  }
667  } else {
668  if (mr->rank == 0) {
669  char *s0 = (before0after1 == 0 ? "before" : "after");
670  char *s1 = (before0after1 == 0 ? "..." : "");
671  printf("%s %s%s\n", msg, s0, s1);
672  fflush(0);
673  }
674  }
675 }
676 
677 /* NOTE: IT IS SYNCHRONIZING WHEN report_count_in_messages. */
678 
679 static void
680 ptime(KMR_KVS *kvs0, KMR_KVS *kvs1, char *func, char *msg, double dt)
681 {
682  KMR *mr = kvs0->c.mr;
683  if (report_count_in_messages) {
684  if (func == 0) {
685  /* Just synchronize. */
686  long c0;
687  kmr_get_element_count(kvs0, &c0);
688  } else {
689  long c0;
690  kmr_get_element_count(kvs0, &c0);
691  long c1 = 0;
692  if (kvs1 != 0) {
693  kmr_get_element_count(kvs1, &c1);
694  }
695  if (mr->rank == 0) {
696  if (kvs1 != 0) {
697  printf("%s (%s) #=%ld #=%ld in %f sec\n",
698  func, msg, c0, c1, dt);
699  fflush(0);
700  } else {
701  printf("%s (%s) #=%ld in %f sec\n",
702  func, msg, c0, dt);
703  fflush(0);
704  }
705  }
706  }
707  } else {
708  if (func == 0) {
709  /* nothing. */
710  } else if (mr->rank == 0) {
711  printf("%s (%s) in %f sec\n", func, msg, dt);
712  fflush(0);
713  }
714  }
715 }
716 
717 static void
718 phisto(KMR_KVS *kvs, char *msg)
719 {
720  KMR *mr = kvs->c.mr;
721  int nprocs = mr->nprocs;
722  long histo[nprocs];
723  double var[4];
724  kmr_histogram_count_by_ranks(kvs, histo, var, 1);
725  if (mr->rank == 0) {
726  for (int r = 0; r < nprocs; r++) {
727  printf("%s histo[%d]=%ld\n", msg, r, histo[r]);
728  }
729  fflush(0);
730  }
731 }
732 
733 /* Adds a record into KVO. It strips off a tuple wrapping a key, when
734  it is a single-entry to let sorting compare properly. */
735 
736 static void
737 add_record(KMR_KVS *kvo, struct kmr_ntuple *k, struct kmr_ntuple *v)
738 {
739  if (k->n == 1) {
740  struct kmr_ntuple_entry e = kmr_nth_ntuple(k, 0);
741  union kmr_unit_sized k1;
742  switch (kvo->c.key_data) {
743  case KMR_KV_BAD:
744  assert(kvo->c.key_data != KMR_KV_BAD);
745  k1.i = 0;
746  break;
747  case KMR_KV_INTEGER:
748  k1.i = *(long *)e.p;
749  break;
750  case KMR_KV_FLOAT8:
751  k1.d = *(double *)e.p;
752  break;
753  case KMR_KV_OPAQUE:
754  case KMR_KV_CSTRING:
755  case KMR_KV_POINTER_OWNED:
756  case KMR_KV_POINTER_UNMANAGED:
757  k1.p = e.p;
758  break;
759  default:
760  assert(NEVERHERE);
761  k1.i = 0;
762  break;
763  }
764  struct kmr_kv_box kv = {
765  .klen = e.len,
766  .k = k1,
767  .vlen = kmr_size_ntuple(v),
768  .v.p = (void *)v
769  };
770  kmr_add_kv(kvo, kv);
771  } else {
772  struct kmr_kv_box kv = {
773  .klen = kmr_size_ntuple(k),
774  .k.p = (void *)k,
775  .vlen = kmr_size_ntuple(v),
776  .v.p = (void *)v
777  };
778  kmr_add_kv(kvo, kv);
779  }
780 }
781 
782 static struct TABLE_INFO *
783 find_table(enum TABLE table)
784 {
785  int ntables = (sizeof(tables) / sizeof(tables[0]));
786  for (int i = 0; i < ntables; i++) {
787  if (tables[i].name == table) {
788  return &(tables[i]);
789  }
790  }
791  assert(NEVERHERE);
792  return 0;
793 }
794 
795 static struct RECORD *
796 find_description(enum TABLE table)
797 {
798  struct TABLE_INFO *tbl = find_table(table);
799  return &(tbl->description);
800 }
801 
802 static int
803 column_index_by_name(struct RECORD *description, char *name)
804 {
805  for (int i = 0; i < MAXCOLS; i++) {
806  if (description->columns[i].label == 0) {
807  break;
808  }
809  if (strcmp(description->columns[i].label, name) == 0) {
810  return i;
811  }
812  }
813  assert(NEVERHERE);
814  return 0;
815 }
816 
817 static struct kmr_ntuple_entry
818 column_by_name(struct kmr_ntuple *u, struct RECORD *description, char *name)
819 {
820  int c = column_index_by_name(description, name);
821  struct kmr_ntuple_entry e = kmr_nth_ntuple(u, c);
822  return e;
823 }
824 
825 static void
826 put_columns_by_indexes(KMR *mr, struct kmr_ntuple *v, size_t vsz,
827  struct kmr_ntuple *u, int *cols, int ncols)
828 {
829  for (int i = 0; i < ncols; i++) {
830  struct kmr_ntuple_entry e = kmr_nth_ntuple(u, cols[i]);
831  kmr_put_ntuple_entry(mr, v, (int)vsz, e);
832  }
833 }
834 
835 static void
836 put_columns_by_names(KMR *mr, struct kmr_ntuple *v, size_t vsz,
837  struct kmr_ntuple *u, struct RECORD *d,
838  char **columns, int ncolumns)
839 {
840  for (int i = 0; i < ncolumns; i++) {
841  char *name = columns[i];
842  struct kmr_ntuple_entry e = column_by_name(u, d, name);
843  kmr_put_ntuple_entry(mr, v, (int)vsz, e);
844  }
845 }
846 
/* (Disabled code).  Checks a join specification against the table
   descriptions.  It asserts that the output table has NCOLUMNS
   columns and NKEYS keys, and that each selected input column has the
   same datatype as the corresponding output column or key.  A choice
   is a pair {column-index, input-selector}, where the selector is
   zero for the first input and nonzero for the second. */

#if 0
static void
assert_column_fields(int inputs[2], int output,
                     int columns[][2], int ncolumns,
                     int keys[][2], int nkeys)
{
    struct RECORD *a = find_description((enum TABLE)inputs[0]);
    struct RECORD *b = find_description((enum TABLE)inputs[1]);
    struct RECORD *o = find_description((enum TABLE)output);
    assert(o->ncolumns == ncolumns);
    for (int i = 0; i < ncolumns; i++) {
        int *choice = columns[i];
        assert(choice[0] < (choice[1] == 0 ? a : b)->ncolumns);
        enum FIELD fi = ((choice[1] == 0 ? a : b)->columns[choice[0]]).field;
        enum FIELD fo = o->columns[i].field;
        assert(fi == fo);
    }
    assert(o->nkeys == nkeys);
    for (int i = 0; i < nkeys; i++) {
        int *choice = keys[i];
        assert(choice[0] < (choice[1] == 0 ? a : b)->ncolumns);
        enum FIELD fi = ((choice[1] == 0 ? a : b)->columns[choice[0]]).field;
        enum FIELD fo = o->keys[i].field;
        assert(fi == fo);
    }
}
#endif
874 
875 /* It returns an integer for a date string, which keeps ordering and
876  is invertible. (Returns time_t for a date string (ISO) or -1). */
877 
/* Decodes a date string P of the form "YYYY-MM-DD" and returns an
   integer, or returns -1 for a null or malformed string.  Without
   USE_TIME_FUNCTIONS, the integer packs the eight digit characters
   into eight bytes, the first digit in the most significant byte, so
   that it keeps the ordering of dates and format_date() inverts it.
   It only looks at the first ten characters, and it stops at the
   first bad one (thus it never reads past the terminator of a short
   string). */

static time_t
decode_date(char *p)
{
    if (p == 0) {
        return (time_t)-1;
    }
#ifdef USE_TIME_FUNCTIONS
    {
        struct tm tm;
        memset(&tm, 0, sizeof(tm));
        char *end = strptime(p, "%F", &tm);
        /* strptime() returns null on a parse failure. */
        if ((end == 0) || ((end - p) != 10)) {
            return (time_t)-1;
        }
        time_t tv = mktime(&tm);
        if (tv == (time_t)-1) {
            return (time_t)-1;
        }
        return tv;
    }
#else
    {
        assert(sizeof(time_t) >= 8);
        /*'1995-03-15'*/
        long v = 0;
        for (int i = 0; i < 10; i++) {
            char c = p[i];
            if (i == 4 || i == 7) {
                if (c != '-') {
                    return (time_t)-1;
                }
            } else {
                if (!('0' <= c && c <= '9')) {
                    return (time_t)-1;
                }
                /* (It never overflows, as digits are below 0x40). */
                v = ((v << 8) | (long)c);
            }
        }
        time_t tv = v;
        return tv;
    }
#endif /*USE_TIME_FUNCTIONS*/
}
912 
913 /* Fills a string for a date and returns its length or returns 0. */
914 
/* Fills the buffer S of the size SZ with a date string for TV, and
   returns its length, or returns zero on failure.  Without
   USE_TIME_FUNCTIONS, it inverts decode_date(): it writes exactly ten
   characters "YYYY-MM-DD" with no terminator, and SZ needs to be ten
   or more. */

static size_t
format_date(char *s, size_t sz, time_t tv)
{
#ifdef USE_TIME_FUNCTIONS
    {
        struct tm tmbuf;
        struct tm *tm = localtime_r(&tv, &tmbuf);
        if (tm == 0) {
            return 0;
        }
        size_t cx = strftime(s, sz, "%F", tm);
        return cx;
    }
#else
    {
        assert(sz >= 10);
        long bits = tv;
        /* Digits come out from the most significant byte downward;
           the dashes take the positions 4 and 7. */
        int shift = (8 * 7);
        for (int i = 0; i < 10; i++) {
            if (i == 4 || i == 7) {
                s[i] = '-';
            } else {
                s[i] = (char)((bits >> shift) & 0xff);
                shift -= 8;
            }
        }
        return 10;
    }
#endif /*USE_TIME_FUNCTIONS*/
}
946 
947 /* Returns the year part (as January first) of the date. */
948 
/* Returns the January first of the year of DATE.  Without
   USE_TIME_FUNCTIONS, it replaces the low four bytes of a value of
   decode_date() (the month and day digits) with the digits "0101". */

static time_t
year_value(time_t date)
{
#ifdef USE_TIME_FUNCTIONS
    {
        struct tm tm;
        struct tm *tmx = localtime_r(&date, &tm);
        assert(tmx != 0);
        tm.tm_sec = 0;
        tm.tm_min = 0;
        tm.tm_hour = 0;
        tm.tm_mday = 1;
        tm.tm_mon = 0;
        /*tm.tm_year*/
        /*tm.tm_wday*/
        /*tm.tm_yday*/
        /*tm.tm_isdst*/
        time_t year = mktime(&tm);
        assert(year != (time_t)-1);
        return year;
    }
#else
    {
        /* Mask of the month and day bytes. */
        long monthday = (((long)0xff << 8*3) | ((long)0xff << 8*2)
                         | ((long)0xff << 8*1) | ((long)0xff << 8*0));
        /* Digits of "0101". */
        long jan1 = (((long)'0' << 8*3) | ((long)'1' << 8*2)
                     | ((long)'0' << 8*1) | ((long)'1' << 8*0));
        long packed = ((date & ~monthday) | jan1);
        return (time_t)packed;
    }
#endif /*USE_TIME_FUNCTIONS*/
}
993 
994 static long
995 get_int_column_by_index(struct kmr_ntuple *u, struct COLUMN *columns, int nth)
996 {
997  assert(columns[nth].field == F_ZAHL);
998  struct kmr_ntuple_entry e = kmr_nth_ntuple(u, nth);
999  long v = *(long *)(e.p);
1000  return v;
1001 }
1002 
1003 static double
1004 get_real_column_by_index(struct kmr_ntuple *u, struct COLUMN *columns, int nth)
1005 {
1006  assert(columns[nth].field == F_REAL);
1007  struct kmr_ntuple_entry e = kmr_nth_ntuple(u, nth);
1008  double v = *(double *)(e.p);
1009  return v;
1010 }
1011 
1012 static double
1013 get_real_column(struct kmr_ntuple *u, struct RECORD *description,
1014  char *name)
1015 {
1016  int nth = column_index_by_name(description, name);
1017  assert(description->columns[nth].field == F_REAL);
1018  struct kmr_ntuple_entry e = kmr_nth_ntuple(u, nth);
1019  double v = *(double *)(e.p);
1020  return v;
1021 }
1022 
/* (Disabled code).  Returns a pointer to the text in the NTH entry of
   the n-tuple U.  It asserts the NTH column is F_TEXT.  Note that
   scan_columns() stores a text without a terminator, thus the length
   of the entry is needed to use it. */

#if 0
static char *
get_text_column_by_index(struct kmr_ntuple *u, struct COLUMN *columns, int nth)
{
    assert(columns[nth].field == F_TEXT);
    struct kmr_ntuple_entry e = kmr_nth_ntuple(u, nth);
    char *v = e.p;
    return v;
}
#endif
1033 
1034 static time_t
1035 get_date_column_by_index(struct kmr_ntuple *u, struct COLUMN *columns, int nth)
1036 {
1037  assert(columns[nth].field == F_DATE);
1038  struct kmr_ntuple_entry e = kmr_nth_ntuple(u, nth);
1039  time_t v = *(time_t *)(e.p);
1040  return v;
1041 }
1042 
1043 static time_t
1044 get_date_column(struct kmr_ntuple *u, struct RECORD *description, char *name)
1045 {
1046  int nth = column_index_by_name(description, name);
1047  assert(description->columns[nth].field == F_DATE);
1048  struct kmr_ntuple_entry e = kmr_nth_ntuple(u, nth);
1049  time_t v = *(time_t *)(e.p);
1050  return v;
1051 }
1052 
1053 /* Scans a (newline-terminated) input line and fills an n-tuple U, in
1054  accordance with datatypes given by descriptions DESCS[column]. */
1055 
1056 static void
1057 scan_columns(KMR *mr, struct kmr_ntuple *u, int bufsz,
1058  char *line, size_t linesz, struct TABLE_INFO *tbl)
1059 {
1060  int cc;
1061 
1062  int marker = (int)tbl->name;
1063  struct RECORD *description = &(tbl->description);
1064  struct COLUMN *descs = description->columns;
1065  int ndescs = description->ncolumns;
1066 
1067  kmr_reset_ntuple(u, ndescs, marker);
1068  char * const end = (line + linesz);
1069  char *p = line;
1070  for (int i = 0; i < ndescs; i++) {
1071  char *s = p;
1072  while (s < end && *s != '|') {
1073  s++;
1074  }
1075  char * const q = ((s == end) ? 0 : s);
1076  if (q == 0) {
1077  fprintf(stderr, "Fewer fields in line (%s)\n", line);
1078  MPI_Abort(MPI_COMM_WORLD, 1);
1079  return;
1080  }
1081 
1082  assert(*q == '|');
1083  *q = 0;
1084  switch (descs[i].field) {
1085  default:
1086  case F_NIL:
1087  assert(NEVERHERE);
1088  break;
1089  case F_ZAHL: {
1090  long v;
1091  char gomi[4];
1092  cc = sscanf(p, "%ld%c", &v, gomi);
1093  if (cc != 1) {
1094  *q = '|';
1095  fprintf(stderr, "Bad integer in %d-th field in line (%s)\n",
1096  i, line);
1097  MPI_Abort(MPI_COMM_WORLD, 1);
1098  return;
1099  }
1100  kmr_put_ntuple(mr, u, bufsz, &v, (int)sizeof(v));
1101  break;
1102  }
1103  case F_REAL: {
1104  double v;
1105  char gomi[4];
1106  cc = sscanf(p, "%lf%c", &v, gomi);
1107  if (cc != 1) {
1108  *q = '|';
1109  fprintf(stderr, "Bad real in %d-th field in line (%s)\n",
1110  i, line);
1111  MPI_Abort(MPI_COMM_WORLD, 1);
1112  return;
1113  }
1114  kmr_put_ntuple(mr, u, bufsz, &v, (int)sizeof(v));
1115  break;
1116  }
1117  case F_TEXT: {
1118  char *v = p;
1119  int len = (int)(q - p);
1120  kmr_put_ntuple(mr, u, bufsz, v, len);
1121  break;
1122  }
1123  case F_DATE: {
1124  time_t tv = decode_date(p);
1125  if (tv == (time_t)-1) {
1126  *q = '|';
1127  fprintf(stderr, "Bad date in %d-th field in line (%s)\n",
1128  i, line);
1129  MPI_Abort(MPI_COMM_WORLD, 1);
1130  return;
1131  }
1132  long v = tv;
1133  kmr_put_ntuple(mr, u, bufsz, &v, (int)sizeof(v));
1134  break;
1135  }
1136  }
1137  *q = '|';
1138  p = (q + 1);
1139  }
1140  if (*p != '\n') {
1141  fprintf(stderr, "(warning) Extra characters in line (%s)\n", line);
1142  fflush(0);
1143  }
1144 }
1145 
1146 static int
1147 scan_line(const struct kmr_kv_box kv0,
1148  const KMR_KVS *kvi, KMR_KVS *kvo, void *p0,
1149  const long index)
1150 {
1151  char line[RECORD_SIZE];
1152  char vbuf[RECORD_SIZE];
1153  struct kmr_ntuple *v = (void *)vbuf;
1154 
1155  KMR *mr = kvo->c.mr;
1156 
1157  struct SCAN_OP *scanner = p0;
1158  struct TABLE_INFO *tbl = scanner->tbl;
1159  char *p = (void *)kv0.v.p;
1160  size_t linesz = (size_t)kv0.vlen;
1161 
1162  assert(linesz < sizeof(line));
1163  memcpy(line, p, linesz);
1164  line[linesz] = 0;
1165 
1166  scan_columns(mr, v, sizeof(vbuf), line, linesz, tbl);
1167 
1168  struct kmr_ntuple_entry key = kmr_nth_ntuple(v, 0);
1169  struct kmr_kv_box kv = {
1170  .klen = key.len,
1171  .k.p = key.p,
1172  .vlen = kmr_size_ntuple(v),
1173  .v.p = (void *)v
1174  };
1175 
1176  if (scanner->fn == 0) {
1177  kmr_add_kv(kvo, kv);
1178  } else {
1179  (*scanner->fn)(kv, kvi, kvo, scanner->arg, index);
1180  }
1181  return MPI_SUCCESS;
1182 }
1183 
/* Forward declarations of the file loaders.
   load_table_files_in_memory() loads a file or files of the table TBL
   found in DIRECTORY; it calls load_one_table_file_in_memory() with a
   file name.  NOTE(review): SINGLEFILE is passed as one when the
   table is in a single file read by rank0 -- confirm the other use at
   the definition. */

static void load_table_files_in_memory(int nprocs, int rank, char *directory,
                                       struct TABLE_INFO *tbl);
static void load_one_table_file_in_memory(int nprocs, int rank,
                                          struct TABLE_INFO *tbl,
                                          char *filename, _Bool singlefile);
1189 
1190 static void
1191 load_input_tables(int nprocs, int rank, char *directory,
1192  enum TABLE *tbls, int ntbls)
1193 {
1194  assert(directory != 0);
1195 
1196  if (rank == 0) {
1197  printf("reading table files (in advance)...\n");
1198  fflush(0);
1199  }
1200  MPI_Barrier(MPI_COMM_WORLD);
1201  double t0 = wtime();
1202 
1203  for (int i = 0; i < ntbls; i++) {
1204  struct TABLE_INFO *tbl = find_table(tbls[i]);
1205  assert(tbl->data.nread != 0);
1206  load_table_files_in_memory(nprocs, rank, directory, tbl);
1207  }
1208 
1209  MPI_Barrier(MPI_COMM_WORLD);
1210  double t1 = wtime();
1211  if (rank == 0) {
1212  printf("reading table files (in advance) in %f sec\n", (t1 - t0));
1213  fflush(0);
1214  }
1215 }
1216 
1217 static void
1218 load_table_files_in_memory(int nprocs, int rank, char *directory,
1219  struct TABLE_INFO *tbl)
1220 {
1221  char filename[256];
1222  int cc;
1223 
1224  assert(directory != 0);
1225 
1226  if (tbl->data.nread == 0) {
1227  assert(NEVERHERE);
1228  } else if (tbl->data.nread == 1) {
1229  if (rank == 0) {
1230  cc = snprintf(filename, sizeof(filename), "%s/%s",
1231  directory, tbl->data.file);
1232  assert(cc < (int)sizeof(filename));
1233  load_one_table_file_in_memory(nprocs, rank, tbl, filename, 1);
1234  }
1235  } else if (tbl->data.nread == 2) {
1236  cc = snprintf(filename, sizeof(filename), "%s/%s",
1237  directory, tbl->data.file);
1238  assert(cc < (int)sizeof(filename));
1239  cc = access(filename, R_OK);
1240  if (cc == 0) {
1241  /* Single file. */
1242  if (rank == 0) {
1243  load_one_table_file_in_memory(nprocs, rank, tbl, filename, 1);
1244  }
1245  } else if (errno == ENOENT) {
1246  /* Multiple files, load by suffix for ranks. */
1247  int count = 0;
1248  for (int j = 0; j < 50; j++) {
1249  /* (Start from 1 because 1-origin). */
1250  int n = (1 + (j * nprocs) + rank);
1251  cc = snprintf(filename, sizeof(filename), "%s/%s.%d",
1252  directory, tbl->data.file, n);
1253  assert(cc < (int)sizeof(filename));
1254  cc = access(filename, R_OK);
1255  if (cc == 0) {
1256  load_one_table_file_in_memory(nprocs, rank, tbl, filename, 0);
1257  count++;
1258  } else if (errno == ENOENT) {
1259  break;
1260  } else {
1261  perror("access tbl file");
1262  MPI_Abort(MPI_COMM_WORLD, 1);
1263  return;
1264  }
1265  }
1266  } else {
1267  perror("access tbl file");
1268  MPI_Abort(MPI_COMM_WORLD, 1);
1269  return;
1270  }
1271  } else {
1272  assert(NEVERHERE);
1273  }
1274 }
1275 
1276 static void
1277 load_one_table_file_in_memory(int nprocs, int rank, struct TABLE_INFO *tbl,
1278  char *filename, _Bool singlefile)
1279 {
1280  int cc;
1281 
1282  int nth = tbl->data.nfiles;
1283  if (!(nth < tbl->data.nb)) {
1284  int nb = (singlefile ? 1 : (((nth + 1) + 7) & ~7));
1285  assert(nth < nb);
1286  void **bb = realloc(tbl->data.buffers, (sizeof(void *) * (size_t)nb));
1287  if (bb == 0) {
1288  perror("realloc tbl buffer");
1289  MPI_Abort(MPI_COMM_WORLD, 1);
1290  return;
1291  }
1292  for (int i = 0; i < nb; i++) {
1293  if (i < tbl->data.nfiles) {
1294  assert(bb[i] != 0);
1295  } else {
1296  bb[i] = 0;
1297  }
1298  }
1299  tbl->data.buffers = bb;
1300 
1301  size_t *ss = realloc(tbl->data.sizes, (sizeof(size_t) * (size_t)nb));
1302  if (ss == 0) {
1303  perror("realloc tbl buffer");
1304  MPI_Abort(MPI_COMM_WORLD, 1);
1305  return;
1306  }
1307  for (int i = 0; i < nb; i++) {
1308  if (i < tbl->data.nfiles) {
1309  assert(ss[i] != 0);
1310  } else {
1311  ss[i] = 0;
1312  }
1313  }
1314  tbl->data.sizes = ss;
1315 
1316  tbl->data.nb = nb;
1317  }
1318 
1319  assert((nth < tbl->data.nb) && (tbl->data.buffers != 0)
1320  && (tbl->data.buffers[nth] == 0));
1321 
1322  {
1323  int fd = open(filename, O_RDONLY, 0);
1324  if (fd == -1) {
1325  char ee[80];
1326  snprintf(ee, sizeof(ee), "open(%s) failed", filename);
1327  perror(ee);
1328  MPI_Abort(MPI_COMM_WORLD, 1);
1329  return;
1330  }
1331 
1332  struct stat s;
1333  cc = fstat(fd, &s);
1334  if (cc != 0) {
1335  char ee[80];
1336  snprintf(ee, sizeof(ee), "fstat(%s) failed", filename);
1337  perror(ee);
1338  MPI_Abort(MPI_COMM_WORLD, 1);
1339  return;
1340  }
1341 
1342  off_t fsz = s.st_size;
1343  assert(fsz > 0);
1344 
1345  /* TAKE CEILING FOR BUFFER SIZE. */
1346 
1347  size_t bsz = (size_t)(((fsz + 8) + (1024 - 1)) & (~(1024 - 1)));
1348  assert(bsz >= (size_t)fsz);
1349  char *b = malloc(bsz);
1350  if (b == 0) {
1351  char ee[80];
1352  snprintf(ee, sizeof(ee), "malloc(%ld) failed", fsz);
1353  perror(ee);
1354  MPI_Abort(MPI_COMM_WORLD, 1);
1355  return;
1356  }
1357 
1358  double t0 = wtime();
1359 
1360  off_t chunk = (8 * 1024 * 1024);
1361  off_t rd = 0;
1362  while (rd < fsz) {
1363  size_t rr = (size_t)MIN((fsz - rd), chunk);
1364  ssize_t cx = read(fd, (b + rd), rr);
1365  if (cx <= 0) {
1366  char ee[80];
1367  snprintf(ee, sizeof(ee), "read(%s) failed", filename);
1368  perror(ee);
1369  MPI_Abort(MPI_COMM_WORLD, 1);
1370  return;
1371  }
1372  rd += cx;
1373  }
1374  cc = close(fd);
1375  assert(cc == 0);
1376 
1377  double t1 = wtime();
1378 
1379  assert((nth == tbl->data.nfiles) && (tbl->data.buffers[nth] == 0));
1380  tbl->data.buffers[nth] = b;
1381  tbl->data.sizes[nth] = (size_t)fsz;
1382  tbl->data.nfiles++;
1383 
1384  if (report_time_to_read) {
1385  fprintf(stderr, "[%05d] reading (%s) sz=%ld in %f sec\n",
1386  rank, filename, fsz, (t1 - t0));
1387  fflush(0);
1388  }
1389  }
1390 
1391 #if 0
1392  long cnt = kvo->c.element_count;
1393  printf("[%d] reading partial table (%s) n=%ld %f sec\n",
1394  rank, filename, cnt, (t1 - t0));
1395  fflush(0);
1396 #endif
1397 
1398  return;
1399 }
1400 
1401 static void scan_table_in_memory(KMR_KVS *kvo, struct TABLE_INFO *tbl,
1402  struct SCAN_OP *scanner);
1403 
1404 static void
1405 scan_table_files(KMR_KVS *kvo, enum TABLE table, kmr_mapfn_t kvput, void *arg,
1406  struct RUN *run)
1407 {
1408  KMR *mr = kvo->c.mr;
1409 
1410  struct TABLE_INFO *tbl = find_table(table);
1411  assert(tbl->data.nread != 0);
1412 
1413  if (mr->rank == 0) {
1414  printf("scanning table file (%s)...\n", tbl->data.file);
1415  fflush(0);
1416  }
1417 
1418  double t0 = wtime();
1419 
1420  KMR_KVS *kvs0;
1421  if (run->redistribute_loaded_tables) {
1422  kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1423  } else {
1424  kvs0 = kvo;
1425  }
1426  struct SCAN_OP scanner = {
1427  .tbl = tbl,
1428  .fn = kvput,
1429  .arg = arg
1430  };
1431  scan_table_in_memory(kvs0, tbl, &scanner);
1432 
1433  if (run->redistribute_loaded_tables) {
1434  assert(kvs0 != kvo);
1435  kmr_distribute(kvs0, kvo, 0, kmr_noopt);
1436  }
1437 
1438  *(tbl->data.variable) = kvo;
1439 
1440  double t1 = wtime();
1441 
1442  ptime(kvo, 0, "scanning table file", tbl->data.file, (t1 - t0));
1443 }
1444 
1445 static void
1446 scan_table_in_memory(KMR_KVS *kvo, struct TABLE_INFO *tbl,
1447  struct SCAN_OP *scanner)
1448 {
1449  KMR *mr = kvo->c.mr;
1450 
1451  for (int i = 0; i < tbl->data.nfiles; i++) {
1452  void *b = tbl->data.buffers[i];
1453  size_t sz = tbl->data.sizes[i];
1454  struct kmr_option keepopen = {.keep_open = 1};
1455  kmr_map_getline_in_memory_(mr, b, sz, 0,
1456  kvo, scanner, keepopen, scan_line);
1457  }
1458  kmr_add_kv_done(kvo);
1459 }
1460 
1461 static void
1462 scan_table_files_in_advance(KMR *mr, enum TABLE *tbls, int ntbls,
1463  struct RUN *run)
1464 {
1465  if (mr->rank == 0) {
1466  printf("scanning tables (in advance)...\n"); fflush(0);
1467  }
1468  double t0 = wtime();
1469 
1470  for (int i = 0; i < ntbls; i++) {
1471  struct TABLE_INFO *tbl = find_table(tbls[i]);
1472  assert(tbl->data.nread != 0);
1473  KMR_KVS *kvs = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1474  scan_table_files(kvs, tbl->name, 0, 0, run);
1475  }
1476 
1477  double t1 = wtime();
1478  if (mr->rank == 0) {
1479  printf("scanning tables in %f sec\n", (t1 - t0)); fflush(0);
1480  }
1481 }
1482 
1483 /* Dumps an n-tuple U to a line buffer, in accordance with datatypes
1484  given by descriptions. */
1485 
1486 static void
1487 dump_line(KMR *mr, char *line, int linesz, struct kmr_ntuple *u,
1488  struct COLUMN descs[], int ndescs)
1489 {
1490  int cc;
1491  assert(u->n == ndescs);
1492  char *q;
1493  q = line;
1494  char *end = &line[linesz];
1495  for (int i = 0; i < ndescs; i++) {
1496  if (i > 0) {
1497  cc = snprintf(q, (size_t)(end - q), "|");
1498  q += cc;
1499  }
1500  struct kmr_ntuple_entry p = kmr_nth_ntuple(u, i);
1501  switch (descs[i].field) {
1502  default:
1503  case F_NIL:
1504  assert(NEVERHERE);
1505  break;
1506  case F_ZAHL: {
1507  assert(p.len == sizeof(long));
1508  long v = *(long *)(p.p);
1509  cc = snprintf(q, (size_t)(end - q), "%ld", v);
1510  if (cc < 0) {
1511  fprintf(stderr, "Bad integer in %d-th field\n", i);
1512  MPI_Abort(MPI_COMM_WORLD, 1);
1513  return;
1514  }
1515  q += cc;
1516  break;
1517  }
1518  case F_REAL: {
1519  assert(p.len == sizeof(double));
1520  double v = *(double *)(p.p);
1521  cc = snprintf(q, (size_t)(end - q), "%lf", v);
1522  if (cc < 0) {
1523  fprintf(stderr, "Bad real in %d-th field\n", i);
1524  MPI_Abort(MPI_COMM_WORLD, 1);
1525  return;
1526  }
1527  q += cc;
1528  break;
1529  }
1530  case F_TEXT: {
1531  char *v = p.p;
1532  assert((size_t)(p.len + 1) < (size_t)(end - q));
1533  cc = snprintf(q, (size_t)(p.len + 1), "%s", v);
1534  if (cc < 0) {
1535  fprintf(stderr, "Bad text in %d-th field\n", i);
1536  MPI_Abort(MPI_COMM_WORLD, 1);
1537  return;
1538  }
1539  /*assert(p.len <= cc);*/
1540  q += MIN(p.len, cc);
1541  break;
1542  }
1543  case F_DATE: {
1544  assert(p.len == sizeof(time_t));
1545  time_t *v = p.p;
1546  size_t cx = format_date(q, (size_t)(end - q), *v);
1547  if (cx == 0) {
1548  fprintf(stderr, "Bad date in %d-th field\n", i);
1549  MPI_Abort(MPI_COMM_WORLD, 1);
1550  return;
1551  }
1552  q += (int)cx;
1553  break;
1554  }
1555  }
1556  }
1557 }
1558 
#if 0
/* (Disabled).  Map-function that prints a key-value pair as
   "key:value", where both parts are formatted by dump_line() using
   the record description found by the marker of the value n-tuple.
   A single-field key is wrapped in a temporary n-tuple first. */

static int
dump_kv(const struct kmr_kv_box kv0,
        const KMR_KVS *kvi, KMR_KVS *kvo, void *p, const long i)
{
    KMR *mr = kvi->c.mr;
    char text[RECORD_SIZE];
    char keybuf[RECORD_SIZE];

    struct kmr_ntuple *value = (void *)kv0.v.p;
    struct RECORD *desc = find_description((enum TABLE)value->marker);

    struct kmr_ntuple *key;
    if (desc->nkeys == 1) {
        key = (void *)keybuf;
        kmr_reset_ntuple(key, 1, -1);
        kmr_put_ntuple(mr, key, (int)sizeof(keybuf), kv0.k.p, kv0.klen);
    } else {
        key = (void *)kv0.k.p;
    }
    dump_line(kvi->c.mr, text, (int)sizeof(text), key,
              desc->keys, desc->nkeys);
    printf("%s:", text);

    dump_line(mr, text, (int)sizeof(text), value,
              desc->columns, desc->ncolumns);
    printf("%s\n", text);
    return MPI_SUCCESS;
}
#endif
1589 
1590 static int
1591 dump_value(const struct kmr_kv_box kv0,
1592  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, const long i)
1593 {
1594  KMR *mr = kvi->c.mr;
1595  char line[RECORD_SIZE];
1596 
1597  struct kmr_ntuple *u = (void *)kv0.v.p;
1598  struct RECORD *d = find_description((enum TABLE)u->marker);
1599 
1600  dump_line(mr, line, (int)sizeof(line), u, d->columns, d->ncolumns);
1601  //printf("%s [%d]\n", line, mr->rank);
1602  printf("%s\n", line);
1603  return MPI_SUCCESS;
1604 }
1605 
1606 static void
1607 dump_table(KMR_KVS *kvs, enum TABLE table)
1608 {
1609  struct RECORD *d = find_description(table);
1610  struct kmr_option inspect = {.nothreading = 1, .inspect = 1};
1611  kmr_map_rank_by_rank(kvs, 0, d, inspect, dump_value);
1612 }
1613 
1614 #define CREATE_KVS(MR, KEY, PUSHOFF) \
1615  create_kvs1((MR), (KEY), KMR_KV_OPAQUE, \
1616  (PUSHOFF), __FILE__, __LINE__, __func__)
1617 
1618 static KMR_KVS *
1619 create_kvs1(KMR *mr, enum kmr_kv_field kf, enum kmr_kv_field vf,
1620  _Bool pushoff,
1621  const char *file, const int line, const char *func)
1622 {
1623  KMR_KVS *kvs;
1624  if (pushoff) {
1625  kvs = kmr_create_pushoff_kvs(mr, kf, KMR_KV_OPAQUE,
1626  kmr_noopt, file, line, func);
1627  } else {
1628  kvs = kmr_create_kvs7(mr, kf, KMR_KV_OPAQUE,
1629  kmr_noopt, file, line, func);
1630  }
1631  return kvs;
1632 }
1633 
1634 /* Chooses the key by nth of the n-tuple. */
1635 
1636 static int
1637 key_by_nth(const struct kmr_kv_box kv0,
1638  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, const long i)
1639 {
1640  int *nth = p;
1641  struct kmr_ntuple *u = (void *)kv0.v.p;
1642  struct kmr_ntuple_entry e = kmr_nth_ntuple(u, *nth);
1643  struct kmr_kv_box kv = {
1644  .klen = e.len,
1645  .k.p = e.p,
1646  .vlen = kv0.vlen,
1647  .v.p = kv0.v.p
1648  };
1649  kmr_add_kv(kvo, kv);
1650  return MPI_SUCCESS;
1651 }
1652 
1653 /* Just selects fields as specified (without calculations). */
1654 
1655 static int
1656 select_by_fields(const struct kmr_kv_box kv0,
1657  const KMR_KVS *kvi, KMR_KVS *kvo,
1658  void *p, const long i)
1659 {
1660  char kbuf[RECORD_SIZE];
1661  struct kmr_ntuple *k = (void *)kbuf;
1662  char vbuf[RECORD_SIZE];
1663  struct kmr_ntuple *v = (void *)vbuf;
1664 
1665  struct SELECT *selector = p;
1666  KMR *mr = kvo->c.mr;
1667  struct kmr_ntuple *u = (void *)kv0.v.p;
1668  assert(u->marker == (int)selector->input);
1669 
1670  int klen;
1671  void *kval;
1672  if (selector->nkeys == 0) {
1673  klen = 0;
1674  kval = 0;
1675  } else if (selector->nkeys == 1) {
1676  struct kmr_ntuple_entry e = kmr_nth_ntuple(u, selector->keys[0]);
1677  klen = e.len;
1678  kval = e.p;
1679  } else {
1680  kmr_reset_ntuple(k, selector->nkeys, 0);
1681  put_columns_by_indexes(mr, k, sizeof(kbuf), u,
1682  selector->keys, selector->nkeys);
1683  klen = kmr_size_ntuple(k);
1684  kval = k;
1685  }
1686 
1687  kmr_reset_ntuple(v, selector->ncolumns, selector->output);
1688  put_columns_by_indexes(mr, v, sizeof(vbuf), u,
1689  selector->columns, selector->ncolumns);
1690 
1691  struct kmr_kv_box kv = {
1692  .klen = klen,
1693  .k.p = kval,
1694  .vlen = kmr_size_ntuple(v),
1695  .v.p = (void *)v
1696  };
1697  kmr_add_kv(kvo, kv);
1698 
1699  return MPI_SUCCESS;
1700 }
1701 
1702 /* Just joins fields as specified (without calculations). */
1703 
1704 static int
1705 join_by_fields(const struct kmr_kv_box kv[], const long n,
1706  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
1707 {
1708  struct PRODUCT *producer = p;
1709  struct kmr_ntuple **vv[2];
1710  long cnt[2];
1711  kmr_separate_ntuples(kvo->c.mr, kv, n, vv, cnt, producer->inputs, 1);
1712  kmr_product_ntuples(kvo, vv, cnt, producer->output,
1713  producer->columns, producer->ncolumns,
1714  producer->keys, producer->nkeys);
1715 
1716  if (producer->trace_product) {
1717  fprintf(stderr, "prod %ld %ld\n", cnt[0], cnt[1]); fflush(0);
1718  }
1719  if (producer->trace_product_nonempty && cnt[0] != 0 && cnt[1] != 0) {
1720  fprintf(stderr, "prod %ld %ld\n", cnt[0], cnt[1]); fflush(0);
1721  }
1722  assert((!producer->cnt0_zero || cnt[0] == 0)
1723  && (!producer->cnt0_one || cnt[0] == 1)
1724  && (!producer->cnt0_zero_one || (cnt[0] == 0 || cnt[0] == 1))
1725  && (!producer->cnt0_nonzero || cnt[0] != 0));
1726  assert((!producer->cnt1_zero || cnt[1] == 0)
1727  && (!producer->cnt1_one || cnt[1] == 1)
1728  && (!producer->cnt1_zero_one || (cnt[1] == 0 || cnt[1] == 1))
1729  && (!producer->cnt1_nonzero || cnt[1] != 0));
1730 
1731  free(vv[0]);
1732  free(vv[1]);
1733  return MPI_SUCCESS;
1734 }
1735 
1736 static KMR_KVS *
1737 JOIN1(KMR_KVS *input0, enum kmr_kv_field outputkf,
1738  kmr_redfn_t join, void *arg, char *msg, _Bool pushoff)
1739 {
1740  KMR *mr = input0->c.mr;
1741  /*char msg[80];*/
1742  /*snprintf(msg, sizeof(msg), "join (%s)", m);*/
1743 
1744  ptime(input0, 0, 0, 0, 0.0);
1745  double t0 = wtime();
1746  KMR_KVS *kvs1 = CREATE_KVS(mr, input0->c.key_data, 0);
1747  kmr_shuffle(input0, kvs1, kmr_noopt);
1748  double t1 = wtime();
1749  ptime(kvs1, 0, "shuffle", msg, (t1 - t0));
1750  pcount(kvs1, 0, msg, 0);
1751  KMR_KVS *kvs2 = CREATE_KVS(mr, outputkf, pushoff);
1752  kmr_reduce(kvs1, kvs2, arg, kmr_noopt, join);
1753  pcount(kvs2, 0, msg, 1);
1754  return kvs2;
1755 }
1756 
1757 static inline KMR_KVS *
1758 JOIN2(KMR_KVS *input0, KMR_KVS *input1, enum kmr_kv_field outputkf,
1759  kmr_redfn_t join, void *arg, char *m, _Bool pushoff)
1760 {
1761  KMR *mr = input0->c.mr;
1762  char msg[80];
1763  snprintf(msg, sizeof(msg), "join (%s)", m);
1764 
1765  ptime(input0, input1, 0, 0, 0.0);
1766  double t0 = wtime();
1767  assert(input0->c.key_data == input1->c.key_data);
1768  enum kmr_kv_field inputkf = input0->c.key_data;
1769  KMR_KVS *kvs0 = CREATE_KVS(mr, inputkf, 0);
1770  kmr_shuffle(input0, kvs0, kmr_noopt);
1771  KMR_KVS *kvs1 = CREATE_KVS(mr, inputkf, 0);
1772  kmr_shuffle(input1, kvs1, kmr_noopt);
1773  double t1 = wtime();
1774  ptime(kvs0, kvs1, "shuffle", m, (t1 - t0));
1775  KMR_KVS *vec[] = {kvs0, kvs1};
1776  KMR_KVS *kvs2 = CREATE_KVS(mr, inputkf, 0);
1777  kmr_concatenate_kvs(vec, 2, kvs2, kmr_noopt);
1778  pcount(kvs2, 0, msg, 0);
1779  KMR_KVS *kvs3 = CREATE_KVS(mr, outputkf, pushoff);
1780  kmr_reduce(kvs2, kvs3, arg, kmr_noopt, join);
1781  pcount(kvs3, 0, msg, 1);
1782  return kvs3;
1783 }
1784 
1785 static inline KMR_KVS *
1786 JOINP(KMR_KVS *input0, KMR_KVS *input1, enum kmr_kv_field outputkf,
1787  struct PRODUCT *join, char *m, _Bool pushoff)
1788 {
1789  KMR *mr = input0->c.mr;
1790  char msg[80];
1791  snprintf(msg, sizeof(msg), "join (%s)", m);
1792 
1793  ptime(input0, input1, 0, 0, 0.0);
1794  double t0 = wtime();
1795  assert(input0->c.key_data == input1->c.key_data);
1796  enum kmr_kv_field inputkf = input0->c.key_data;
1797  KMR_KVS *kvs0 = CREATE_KVS(mr, inputkf, 0);
1798  kmr_shuffle(input0, kvs0, kmr_noopt);
1799  KMR_KVS *kvs1 = CREATE_KVS(mr, inputkf, 0);
1800  kmr_shuffle(input1, kvs1, kmr_noopt);
1801  double t1 = wtime();
1802  ptime(kvs0, kvs1, "shuffle", m, (t1 - t0));
1803  KMR_KVS *vec[] = {kvs0, kvs1};
1804  KMR_KVS *kvs2 = CREATE_KVS(mr, inputkf, 0);
1805  kmr_concatenate_kvs(vec, 2, kvs2, kmr_noopt);
1806  pcount(kvs2, 0, msg, 0);
1807  KMR_KVS *kvs3 = CREATE_KVS(mr, outputkf, pushoff);
1808  kmr_reduce(kvs2, kvs3, join, kmr_noopt, join_by_fields);
1809  pcount(kvs3, 0, msg, 1);
1810  return kvs3;
1811 }
1812 
1813 /* ================================================================ */
1814 
1815 /* QUERY (Q7)
1816 
1817  VALIDATION-RUN PARAMETERS (NATION1=FRANCE, NATION2=GERMANY)
1818 
1819  [QUERY#0]
1820 
1821  select: shipping = (supp_nation, cust_nation, l_year, volume)
1822  supp_nation = n1.n_name,
1823  cust_nation = n2.n_name,
1824  l_year = extract(year from l_shipdate),
1825  volume = (l_extendedprice * (1 - l_discount))
1826 
1827  from: n1 = nation, n2 = nation
1828 
1829  where:
1830  s_suppkey = l_suppkey
1831  && o_orderkey = l_orderkey
1832  && c_custkey = o_custkey
1833  && s_nationkey = n1.n_nationkey
1834  && c_nationkey = n2.n_nationkey
1835  && ((n1.n_name = 'FRANCE' && n2.n_name = 'GERMANY')
1836  || (n1.n_name = 'GERMANY' && n2.n_name = 'FRANCE'))
1837  && l_shipdate between(date '1995-01-01', date '1996-12-31')
1838 
1839  [QUERY#1]
1840 
1841  select: (supp_nation, cust_nation, l_year, revenue)
1842  group-by: (supp_nation, cust_nation, l_year)
1843  revenue = sum(volume)
1844 
1845  from: shipping
1846 
1847  [QUERY#2]
1848 
1849  select: (supp_nation, cust_nation, l_year, revenue)
1850  order-by: (supp_nation, cust_nation, l_year)
1851 
1852  from: shipping
1853 
1854  [SCHEDULE#0]
1855 
1856  (N+N):
1857  select: (n1.n_name = 'FRANCE' && n2.n_name = 'GERMANY') and the reverse
1858  output: NN: {n1.n_nationkey*, n1.n_name, n2.n_nationkey, n2.n_name}
1859 
1860  (S):
1861  output: S1 = {s_nationkey*, s_suppkey}
1862 
1863  (S1+NN):
1864  select: s_nationkey = n1.n_nationkey
1865  output: NNS = {n2.n_nationkey*, s_suppkey*, n1.n_name, n2.n_name}
1866 
1867  (C):
1868  output: C1 = {c_custkey*, c_nationkey}
1869 
1870  (O):
1871  output: O1 = {o_custkey*, o_orderkey}
1872 
1873  (C+O):
1874  select: c_custkey = o_custkey
1875  output: CO = {o_orderkey*, c_nationkey}
1876 
1877  (L):
1878  select: l_shipdate between(date '1995-01-01', date '1996-12-31')
1879  output: L1 = {l_orderkey*, l_suppkey, year, volume}
1880 
1881  (L+CO):
1882  select: o_orderkey = l_orderkey
1883  output: CLO = {c_nationkey*, l_suppkey*, year, volume}
1884 
1885  (CLO+NNS):
1886  select: c_nationkey = n2.n_nationkey, s_suppkey = l_suppkey
1887  output: CLNNOS = {n1.n_name*, n2.n_name*, year*, volume}
1888 
1889  (CLNNOS):
1890  let: revenue = sum(volume)
1891  output: CLNNOS1 = {n1.n_name*, n2.n_name*, year*, revenue} */
1892 
1893 static struct SELECT q7_select_s = {
1894  .input = TAB_S,
1895  .output = TAB_Q7_S1,
1896  /*{s_nationkey*, s_suppkey}*/
1897  .ncolumns = 2,
1898  .columns = {3, 0},
1899  .nkeys = 1,
1900  .keys = {3}
1901 };
1902 
1903 static struct SELECT q7_select_c = {
1904  .input = TAB_C,
1905  .output = TAB_Q7_C1,
1906  /*{c_custkey*, c_nationkey}*/
1907  .ncolumns = 2,
1908  .columns = {0, 3},
1909  .nkeys = 1,
1910  .keys = {0}
1911 };
1912 
1913 static struct SELECT q7_select_o = {
1914  .input = TAB_O,
1915  .output = TAB_Q7_O1,
1916  /*{o_custkey*, o_orderkey}*/
1917  .ncolumns = 2,
1918  .columns = {1, 0},
1919  .nkeys = 1,
1920  .keys = {1}
1921 };
1922 
1923 static struct PRODUCT q7_join_nn_s = {
1924  .inputs = {TAB_Q7_S1, TAB_Q7_NN},
1925  .output = TAB_Q7_NNS,
1926  /*{n2.n_nationkey*, s_suppkey*, n1.n_name, n2.n_name}*/
1927  .ncolumns = 4,
1928  .columns = {{2, SND}, {1, FST}, {1, SND}, {3, SND}},
1929  .nkeys = 2,
1930  .keys = {{2, SND}, {1, FST}},
1931  .trace_product = 0
1932 };
1933 
1934 static struct PRODUCT q7_join_c_o = {
1935  .inputs = {TAB_Q7_C1, TAB_Q7_O1},
1936  .output = TAB_Q7_CO,
1937  /*{o_orderkey*, c_nationkey}*/
1938  .ncolumns = 2,
1939  .columns = {{1, SND}, {1, FST}},
1940  .nkeys = 1,
1941  .keys = {{1, SND}},
1942  .trace_product = 0
1943 };
1944 
1945 static struct PRODUCT q7_join_l_co = {
1946  .inputs = {TAB_Q7_L1, TAB_Q7_CO},
1947  .output = TAB_Q7_CLO,
1948  /*{c_nationkey*, l_suppkey*, year, volume}*/
1949  .ncolumns = 4,
1950  .columns = {{1, SND}, {1, FST}, {2, FST}, {3, FST}},
1951  .nkeys = 2,
1952  .keys = {{1, SND}, {1, FST}},
1953  .cnt1_one = 1,
1954  .trace_product_nonempty = 0
1955 };
1956 
1957 struct PRODUCT q7_join_clo_nns = {
1958  .inputs = {TAB_Q7_CLO, TAB_Q7_NNS},
1959  .output = TAB_Q7_CLNNOS0,
1960  /*{n1.n_name*, n2.n_name*, year*, volume}*/
1961  .ncolumns = 4,
1962  .columns = {{2, SND}, {3, SND}, {2, FST}, {3, FST}},
1963  .nkeys = 0,
1964  .keys = {{0, FST}},
1965  //.cnt0_nonzero = 1,
1966  .cnt1_zero_one = 1,
1967  .trace_product_nonempty = 0
1968 };
1969 
1970 static int
1971 q7_select_nations(const struct kmr_kv_box kv0, const KMR_KVS *kvi,
1972  KMR_KVS *kvo, void *p, const long i)
1973 {
1974  char kbuf[RECORD_SIZE];
1975  struct kmr_ntuple *k = (void *)kbuf;
1976  char vbuf[RECORD_SIZE];
1977  struct kmr_ntuple *v = (void *)vbuf;
1978 
1979  enum TABLE input = TAB_N;
1980  enum TABLE output = TAB_Q7_N1;
1981  /*{n_nationkey, n_name}*/
1982  struct RECORD *d = find_description(input);
1983 
1984  KMR *mr = kvo->c.mr;
1985  struct kmr_ntuple *u = (void *)kv0.v.p;
1986  struct kmr_ntuple_entry name = column_by_name(u, d, "n_name");
1987 
1988  char *s0 = "FRANCE";
1989  char *s1 = "GERMANY";
1990  if (strncmp((char *)name.p, s0, strlen(s0)) == 0
1991  || strncmp((char *)name.p, s1, strlen(s1)) == 0) {
1992  char *cols[] = {"n_nationkey", "n_name"};
1993 
1994  kmr_reset_ntuple(k, 0, 0);
1995  kmr_reset_ntuple(v, 2, output);
1996  put_columns_by_names(mr, v, sizeof(vbuf), u, d, cols, 2);
1997  add_record(kvo, k, v);
1998  }
1999 
2000  return MPI_SUCCESS;
2001 }
2002 
2003 static int
2004 q7_pair_names(const struct kmr_kv_box kv[], const long n,
2005  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
2006 {
2007  char kbuf[RECORD_SIZE];
2008  struct kmr_ntuple *k = (void *)kbuf;
2009  char vbuf[RECORD_SIZE];
2010  struct kmr_ntuple *v = (void *)vbuf;
2011 
2012  enum TABLE input = TAB_N;
2013  enum TABLE output = TAB_Q7_NN;
2014  /*{n1.n_nationkey* n1.n_nationkey, n1.n_name, n2.n_nationkey, n2.n_name}*/
2015  struct RECORD *d = find_description(input);
2016  char *cols[] = {"n_nationkey", "n_name"};
2017 
2018  KMR *mr = kvo->c.mr;
2019  assert(n == 2);
2020  struct kmr_ntuple *u0 = (void *)kv[0].v.p;
2021  struct kmr_ntuple *u1 = (void *)kv[1].v.p;
2022 
2023  {
2024  struct kmr_ntuple_entry key0 = column_by_name(u0, d, "n_nationkey");
2025 
2026  kmr_reset_ntuple(k, 1, 0);
2027  kmr_put_ntuple_entry(mr, k, (int)sizeof(kbuf), key0);
2028  kmr_reset_ntuple(v, 4, output);
2029  put_columns_by_names(mr, v, sizeof(vbuf), u0, d, cols, 2);
2030  put_columns_by_names(mr, v, sizeof(vbuf), u1, d, cols, 2);
2031  add_record(kvo, k, v);
2032  }
2033 
2034  {
2035  struct kmr_ntuple_entry key1 = column_by_name(u1, d, "n_nationkey");
2036 
2037  kmr_reset_ntuple(k, 1, 0);
2038  kmr_put_ntuple_entry(mr, k, (int)sizeof(kbuf), key1);
2039  kmr_reset_ntuple(v, 4, output);
2040  put_columns_by_names(mr, v, sizeof(vbuf), u1, d, cols, 2);
2041  put_columns_by_names(mr, v, sizeof(vbuf), u0, d, cols, 2);
2042  add_record(kvo, k, v);
2043  }
2044 
2045  return MPI_SUCCESS;
2046 }
2047 
2048 /* (l_shipdate between(date '1995-01-01', date '1996-12-31'). */
2049 
2050 static int
2051 q7_select_by_date(const struct kmr_kv_box kv0,
2052  const KMR_KVS *kvi, KMR_KVS *kvo,
2053  void *p, const long i)
2054 {
2055  char kbuf[RECORD_SIZE];
2056  struct kmr_ntuple *k = (void *)kbuf;
2057  char vbuf[RECORD_SIZE];
2058  struct kmr_ntuple *v = (void *)vbuf;
2059 
2060  enum TABLE input = TAB_L;
2061  enum TABLE output = TAB_Q7_L1;
2062  /*{l_orderkey*, l_suppkey, year, volume}*/
2063  struct RECORD *d = find_description(input);
2064  struct RECORD *dx = find_description(output);
2065 
2066  time_t *dt = p;
2067  KMR *mr = kvo->c.mr;
2068 
2069  struct kmr_ntuple *u = (void *)kv0.v.p;
2070  assert(u->marker == (int)input);
2071 
2072  time_t shipdate = get_date_column(u, d, "l_shipdate");
2073  if (dt[0] <= shipdate && shipdate <= dt[1]) {
2074  struct kmr_ntuple_entry orderkey = column_by_name(u, d, "l_orderkey");
2075  struct kmr_ntuple_entry suppkey = column_by_name(u, d, "l_suppkey");
2076 
2077  time_t year = year_value(shipdate);
2078  long iyear = year;
2079  double extendedprice = get_real_column(u, d, "l_extendedprice");
2080  double discount = get_real_column(u, d, "l_discount");
2081  double volume = (extendedprice * (1 - discount));
2082 
2083  kmr_reset_ntuple(k, 1, 0);
2084  kmr_put_ntuple_entry(mr, k, (int)sizeof(kbuf), orderkey);
2085  kmr_reset_ntuple(v, dx->ncolumns, output);
2086  kmr_put_ntuple_entry(mr, v, (int)sizeof(vbuf), orderkey);
2087  kmr_put_ntuple_entry(mr, v, (int)sizeof(vbuf), suppkey);
2088  kmr_put_ntuple(mr, v, (int)sizeof(vbuf), &iyear, sizeof(iyear));
2089  kmr_put_ntuple(mr, v, (int)sizeof(vbuf), &volume, sizeof(volume));
2090  add_record(kvo, k, v);
2091  }
2092 
2093  return MPI_SUCCESS;
2094 }
2095 
2096 static int
2097 q7_make_sort_keys(const struct kmr_kv_box kv0,
2098  const KMR_KVS *kvi, KMR_KVS *kvo,
2099  void *p, const long i)
2100 {
2101  char kbuf[RECORD_SIZE];
2102  struct kmr_ntuple *k = (void *)kbuf;
2103 
2104  enum TABLE input = TAB_Q7_CLNNOS0;
2105  /*enum TABLE output = TAB_Q7_CLNNOS1;*/
2106  struct RECORD *d = find_description(input);
2107 
2108  KMR *mr = kvo->c.mr;
2109  struct kmr_ntuple *u = (void *)kv0.v.p;
2110  assert(u->marker == (int)input);
2111 
2112  struct kmr_ntuple_entry name1 = kmr_nth_ntuple(u, 0);
2113  struct kmr_ntuple_entry name2 = kmr_nth_ntuple(u, 1);
2114  time_t year = get_date_column_by_index(u, d->columns, 2);
2115 
2116  char nbuf[NAME_SIZE];
2117  assert((name1.len + name2.len) <= NAME_SIZE);
2118  memset(nbuf, 0, sizeof(nbuf));
2119  memcpy(&nbuf[0], name1.p, name1.len);
2120  memcpy(&nbuf[name1.len], name2.p, name2.len);
2121  uint64_t beyear = htonll_((uint64_t)(year));
2122 
2123  kmr_reset_ntuple(k, 2, 0);
2124  kmr_put_ntuple(mr, k, (int)sizeof(kbuf), nbuf, NAME_SIZE);
2125  kmr_put_ntuple(mr, k, (int)sizeof(kbuf), &beyear, sizeof(beyear));
2126  add_record(kvo, k, (void *)kv0.v.p);
2127 
2128  return MPI_SUCCESS;
2129 }
2130 
2131 static int
2132 q7_sum_volume(const struct kmr_kv_box kv[], const long n,
2133  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
2134 {
2135  char vbuf[RECORD_SIZE];
2136  struct kmr_ntuple *v = (void *)vbuf;
2137 
2138  enum TABLE input = TAB_Q7_CLNNOS1;
2139  enum TABLE output = TAB_Q7_REVENUE;
2140  struct RECORD *d = find_description(input);
2141 
2142  KMR *mr = kvo->c.mr;
2143  struct kmr_ntuple *u0 = (void *)kv[0].v.p;
2144  assert(u0->marker == TAB_Q7_CLNNOS0);
2145 
2146  double revenue;
2147  revenue = 0.0;
2148  for (long i = 0; i < n; i++) {
2149  struct kmr_ntuple *u = (void *)kv[i].v.p;
2150  double volume = get_real_column_by_index(u, d->columns, 3);
2151  revenue += volume;
2152  }
2153 
2154  int cols[] = {0, 1, 2};
2155 
2156  assert(d->nkeys >= 2);
2157  kmr_reset_ntuple(v, 4, output);
2158  put_columns_by_indexes(mr, v, sizeof(vbuf), u0, cols, 3);
2159  kmr_put_ntuple(mr, v, (int)sizeof(vbuf), &revenue, sizeof(revenue));
2160  add_record(kvo, (void *)kv[0].k.p, v);
2161 
2162  return MPI_SUCCESS;
2163 }
2164 
2165 static KMR_KVS *
2166 q7(KMR *mr, struct RUN *run)
2167 {
2168  _Bool pushoff = ((run->pushoff == 0) ? 0 : 1);
2169  struct kmr_option rankzero = {.rank_zero = 1};
2170 
2171  if (pushoff) {
2172  if (mr->rank == 0) {printf("q7 (with push-off)...\n"); fflush(0);}
2173  } else {
2174  if (mr->rank == 0) {printf("q7...\n"); fflush(0);}
2175  }
2176 
2177  /*(N+N)*/
2178 
2179  if (mr->rank == 0) {printf("q7 (n+n)...\n"); fflush(0);}
2180 
2181  if (run->load_tables_in_advance) {
2182  /*empty*/
2183  } else {
2184  KMR_KVS *n_ = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
2185  scan_table_files(n_, TAB_N, 0, 0, run);
2186  }
2187 
2188  KMR_KVS *n1 = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
2189  kmr_map(N0, n1, 0, kmr_noopt, q7_select_nations);
2190  KMR_KVS *n2 = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
2191  kmr_replicate(n1, n2, rankzero);
2192  pcount(n2, 0, "join (n+n)", 0);
2193  KMR_KVS *nn = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
2194  kmr_reduce(n2, nn, 0, kmr_noopt, q7_pair_names);
2195  pcount(nn, 0, "join (n+n)", 1);
2196 
2197  if (mr->rank == 0) {printf("q7 (s)...\n"); fflush(0);}
2198 
2199  KMR_KVS *s1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2200  if (run->load_tables_in_advance) {
2201  kmr_map(S0, s1, &q7_select_s, kmr_noopt, select_by_fields);
2202  } else {
2203  scan_table_files(s1, TAB_S, select_by_fields, &q7_select_s, run);
2204  }
2205 
2206  if (mr->rank == 0) {printf("q7 (nn+s)...\n"); fflush(0);}
2207 
2208  KMR_KVS *nns = JOINP(s1, nn, KMR_KV_OPAQUE,
2209  &q7_join_nn_s, "nn+s", pushoff);
2210 
2211  //dump_table(nns, TAB_Q7_NNS);
2212 
2213  if (mr->rank == 0) {printf("q7 (c)...\n"); fflush(0);}
2214 
2215  KMR_KVS *c1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2216  if (run->load_tables_in_advance) {
2217  kmr_map(C0, c1, &q7_select_c, kmr_noopt, select_by_fields);
2218  } else {
2219  scan_table_files(c1, TAB_C, select_by_fields, &q7_select_c, run);
2220  }
2221 
2222  if (mr->rank == 0) {printf("q7 (o)...\n"); fflush(0);}
2223 
2224  KMR_KVS *o1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2225  if (run->load_tables_in_advance) {
2226  kmr_map(O0, o1, &q7_select_o, kmr_noopt, select_by_fields);
2227  } else {
2228  scan_table_files(o1, TAB_O, select_by_fields, &q7_select_o, run);
2229  }
2230 
2231  if (mr->rank == 0) {printf("q7 (c+o)...\n"); fflush(0);}
2232 
2233  KMR_KVS *co = JOINP(c1, o1, KMR_KV_OPAQUE,
2234  &q7_join_c_o, "c+o", pushoff);
2235 
2236  if (mr->rank == 0) {printf("q7 (l)...\n"); fflush(0);}
2237 
2238  time_t tv[2];
2239  tv[0] = decode_date("1995-01-01");
2240  tv[1] = decode_date("1996-12-31");
2241  assert((tv[0] != (time_t)-1) && (tv[1] != (time_t)-1));
2242  KMR_KVS *l1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2243  if (run->load_tables_in_advance) {
2244  kmr_map(L0, l1, tv, kmr_noopt, q7_select_by_date);
2245  } else {
2246  scan_table_files(l1, TAB_L, q7_select_by_date, tv, run);
2247  }
2248 
2249  //dump_table(l1, TAB_Q7_L1);
2250 
2251  if (mr->rank == 0) {printf("q7 (co+l)...\n"); fflush(0);}
2252 
2253  KMR_KVS *clo = JOINP(l1, co, KMR_KV_OPAQUE,
2254  &q7_join_l_co, "co+l", pushoff);
2255 
2256  //dump_table(clo, TAB_Q7_CLO);
2257 
2258  if (mr->rank == 0) {printf("q7 (clo+nns)...\n"); fflush(0);}
2259 
2260  KMR_KVS *clnnos2 = JOINP(clo, nns, KMR_KV_OPAQUE,
2261  &q7_join_clo_nns, "clo+nns", 0);
2262  KMR_KVS *clnnos3 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2263  kmr_map(clnnos2, clnnos3, 0, kmr_noopt, q7_make_sort_keys);
2264 
2265  KMR_KVS *revenue0 = JOIN1(clnnos3, KMR_KV_OPAQUE,
2266  q7_sum_volume, 0, "sum(volume)", 0);
2267  KMR_KVS *revenue1 = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
2268  kmr_sort(revenue0, revenue1, kmr_noopt);
2269 
2270  return revenue1;
2271 }
2272 
2273 /* ================================================================ */
2274 
2275 /* QUERY (Q9)
2276 
2277  VALIDATION-RUN PARAMETERS (COLOR=green)
2278 
2279  [QUERY#0]
2280 
2281  select: (nation, o_year, amount)
2282  nation = n_name
2283  o_year = extract(year from o_orderdate)
2284  amount = (l_extendedprice*(1-l_discount)-ps_supplycost*l_quantity)
2285 
2286  where:
2287  s_suppkey = l_suppkey
2288  && ps_suppkey = l_suppkey
2289  && ps_partkey = l_partkey
2290  && p_partkey = l_partkey
2291  && o_orderkey = l_orderkey
2292  && s_nationkey = n_nationkey
2293  && p_name like '%green%'
2294 
2295  [QUERY#1]
2296 
2297  select: (nation, o_year, sum_profit)
2298  o_year = extract(year, o_orderdate)
2299  sum_profit = sum(amount)
2300  group-by: (nation, o_year)
2301 
2302  [QUERY#2]
2303 
2304  select: (nation, o_year, sum_profit)
2305  order-by: (nation, o_year descending)
2306 
2307  (P+H):
2308  select: ps_partkey* = p_partkey* (= l_partkey) && (p_name like '%green%')
2309  output: PH = {ps_partkey, ps_suppkey*, ps_supplycost}
2310 
2311  (S+N):
2312  select: s_nationkey = n_nationkey*
2313  output: SN = {n_name, s_suppkey*}
2314 
2315  ((P+H)+(S+N)):
2316  select: s_suppkey = ps_suppkey (= l_suppkey)
2317  output: PHSN = {n_name, ps_partkey*, ps_suppkey*, ps_supplycost}
2318 
2319  (L+O):
2320  select: o_orderkey* = l_orderkey*
2321  output: LO = {l_discount, l_extendedprice, l_partkey*, l_quantity,
2322  l_suppkey*, o_orderdate}
2323 
2324  (((P+H)+(S+N))+(L+O)):
2325  select: (ps_partkey, ps_suppkey) = (l_partkey, l_suppkey)
2326  let: amount = (l_extendedprice*(1-l_discount)-ps_supplycost*l_quantity)
2327  output: LHNOPS = {n_name, o_orderdate, amount} */
2328 
2329 static struct PRODUCT q9_join_p_h = {
2330  .inputs = {TAB_P, TAB_H},
2331  .output = TAB_Q9_PH,
2332  /*{ps_partkey, ps_suppkey*, ps_supplycost}*/
2333  .ncolumns = 3,
2334  .columns = {{0, SND}, {1, SND}, {3, SND}},
2335  .nkeys = 1,
2336  .keys = {{1, SND}},
2337  /*assert(cnt[0] == 0 || (cnt[0] == 1 && cnt[1] != 0));*/
2338  .cnt0_zero_one = 1,
2339  .trace_product = 0
2340 };
2341 
2342 static struct PRODUCT q9_join_s_n = {
2343  .inputs = {TAB_S, TAB_N},
2344  .output = TAB_Q9_NS,
2345  /*{n_name, s_suppkey*}*/
2346  .ncolumns = 2,
2347  .columns = {{1, SND}, {0, FST}},
2348  .nkeys = 1,
2349  .keys = {{0, FST}},
2350  /*assert(cnt[0] != 0 && cnt[1] == 1);*/
2351  .trace_product = 0
2352 };
2353 
2354 static struct PRODUCT q9_join_hp_ns = {
2355  .inputs = {TAB_Q9_PH, TAB_Q9_NS},
2356  .output = TAB_Q9_PHSN,
2357  /*{n_name, ps_partkey*, ps_suppkey*, ps_supplycost}*/
2358  .ncolumns = 4,
2359  .columns = {{0, SND}, {0, FST}, {1, FST}, {2, FST}},
2360  .nkeys = 2,
2361  .keys = {{0, FST}, {1, FST}},
2362  /*assert(cnt[0] == 0 || (cnt[0] != 0 && cnt[1] == 1));*/
2363  .trace_product = 0
2364 };
2365 
2366 static struct PRODUCT q9_join_l_o = {
2367  .inputs = {TAB_L, TAB_O},
2368  .output = TAB_Q9_LO,
2369  /*{l_discount, l_extendedprice, l_partkey*,
2370  l_quantity, l_suppkey*, o_orderdate}*/
2371  .ncolumns = 6,
2372  .columns = {{6, FST}, {5, FST}, {1, FST}, {4, FST}, {2, FST}, {4, SND}},
2373  .nkeys = 2,
2374  .keys = {{1, FST}, {2, FST}},
2375  /*assert(cnt[0] != 0 && cnt[1] == 1);*/
2376  .trace_product = 0
2377 };
2378 
2379 static struct PRODUCT q9_join_hnps_lo = {
2380  .inputs = {TAB_Q9_PHSN, TAB_Q9_LO},
2381  .output = TAB_Q9_PHSNLO,
2382  /*{l_discount, l_extendedprice, l_quantity,
2383  n_name, o_orderdate*, ps_supplycost}*/
2384  .ncolumns = 6,
2385  .columns = {{0, SND}, {1, SND}, {3, SND}, {0, FST}, {5, SND}, {3, FST}},
2386  .nkeys = 1,
2387  .keys = {{5, SND}},
2388  /*assert(cnt[0] == 0 || cnt[0] == 1);*/
2389  .cnt0_zero_one = 1,
2390  .trace_product = 0
2391 };
2392 
2393 static int
2394 q9_select_by_name(const struct kmr_kv_box kv0,
2395  const KMR_KVS *kvi, KMR_KVS *kvo,
2396  void *p, const long i)
2397 {
2398  enum TABLE input = TAB_P;
2399  struct RECORD *d0 = find_description(input);
2400  assert(d0->columns[1].field == F_TEXT);
2401 
2402  struct kmr_ntuple *u = (void *)kv0.v.p;
2403  struct kmr_ntuple_entry e = kmr_nth_ntuple(u, 1);
2404  char *pos = strnstr_(e.p, "green", (size_t)e.len);
2405  if (pos != 0) {
2406  kmr_add_kv(kvo, kv0);
2407  }
2408  return MPI_SUCCESS;
2409 }
2410 
2411 static int
2412 q9_calculate_amount(const struct kmr_kv_box kv0,
2413  const KMR_KVS *kvi, KMR_KVS *kvo,
2414  void *p, const long i)
2415 {
2416  char kbuf[RECORD_SIZE];
2417  struct kmr_ntuple *k = (void *)kbuf;
2418  char vbuf[RECORD_SIZE];
2419  struct kmr_ntuple *v = (void *)vbuf;
2420 
2421  enum TABLE inputoutput = TAB_Q9_PHSNLO;
2422  struct RECORD *d = find_description(inputoutput);
2423 
2424  KMR *mr = kvo->c.mr;
2425  struct kmr_ntuple *u = (void *)kv0.v.p;
2426  double l_discount = get_real_column_by_index(u, d->columns, 0);
2427  double l_extendedprice = get_real_column_by_index(u, d->columns, 1);
2428  double l_quantity = get_real_column_by_index(u, d->columns, 2);
2429  struct kmr_ntuple_entry n_name = kmr_nth_ntuple(u, 3);
2430  time_t o_orderdate = get_date_column_by_index(u, d->columns, 4);
2431  double l_supplycost = get_real_column_by_index(u, d->columns, 5);
2432 
2433  time_t year = year_value(o_orderdate);
2434 
2435  double amount = ((l_extendedprice * (1 - l_discount))
2436  - (l_supplycost * l_quantity));
2437 
2438  {
2439  char nbuf[NAME_SIZE];
2440  assert(n_name.len <= NAME_SIZE);
2441  memset(nbuf, 0, sizeof(nbuf));
2442  memcpy(nbuf, n_name.p, n_name.len);
2443  uint64_t beyear = htonll_((uint64_t)(-year));
2444 
2445  kmr_reset_ntuple(k, 2, 0);
2446  kmr_put_ntuple(mr, k, (int)sizeof(kbuf), nbuf, NAME_SIZE);
2447  kmr_put_ntuple(mr, k, (int)sizeof(kbuf), &beyear, sizeof(beyear));
2448 
2449  long iyear = year;
2450 
2451  kmr_reset_ntuple(v, 3, TAB_Q9_AMOUNT);
2452  kmr_put_ntuple_entry(mr, v, (int)sizeof(vbuf), n_name);
2453  kmr_put_ntuple(mr, v, (int)sizeof(vbuf), &iyear, sizeof(iyear));
2454  kmr_put_ntuple(mr, v, (int)sizeof(vbuf), &amount, sizeof(amount));
2455  }
2456  add_record(kvo, k, v);
2457 
2458  return MPI_SUCCESS;
2459 }
2460 
2461 static int
2462 q9_sum_amount(const struct kmr_kv_box kv[], const long n,
2463  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
2464 {
2465  char vbuf[RECORD_SIZE];
2466  struct kmr_ntuple *v = (void *)vbuf;
2467 
2468  enum TABLE inputoutput = TAB_Q9_AMOUNT;
2469  struct RECORD *d = find_description(inputoutput);
2470 
2471  KMR *mr = kvo->c.mr;
2472 
2473  double sum;
2474  sum = 0.0;
2475  for (long i = 0; i < n; i++) {
2476  struct kmr_ntuple *u = (void *)kv[i].v.p;
2477  double amount = get_real_column_by_index(u, d->columns, 2);
2478  sum += amount;
2479  }
2480 
2481  {
2482  struct kmr_ntuple *u = (void *)kv[0].v.p;
2483  struct kmr_ntuple_entry name = kmr_nth_ntuple(u, 0);
2484  struct kmr_ntuple_entry year = kmr_nth_ntuple(u, 1);
2485 
2486  kmr_reset_ntuple(v, 3, inputoutput);
2487  kmr_put_ntuple(mr, v, (int)sizeof(vbuf), name.p, name.len);
2488  kmr_put_ntuple(mr, v, (int)sizeof(vbuf), year.p, year.len);
2489  kmr_put_ntuple(mr, v, (int)sizeof(vbuf), &sum, sizeof(sum));
2490  }
2491 
2492  struct kmr_kv_box nkv = {
2493  .klen = kv[0].klen,
2494  .k = kv[0].k,
2495  .vlen = kmr_size_ntuple(v),
2496  .v.p = (void *)v
2497  };
2498  kmr_add_kv(kvo, nkv);
2499 
2500  return MPI_SUCCESS;
2501 }
2502 
2503 static KMR_KVS *
2504 q9(KMR *mr, struct RUN *run)
2505 {
2506  _Bool pushoff = ((run->pushoff == 0) ? 0 : 1);
2507  int nth;
2508 
2509  if (pushoff) {
2510  if (mr->rank == 0) {printf("q9 (with push-off)...\n"); fflush(0);}
2511  } else {
2512  if (mr->rank == 0) {printf("q9...\n"); fflush(0);}
2513  }
2514 
2515  /*(P+H)*/
2516 
2517  if (mr->rank == 0) {printf("q9 (p+ps)...\n"); fflush(0);}
2518 
2519  if (run->load_tables_in_advance) {
2520  /*empty*/
2521  } else {
2522  KMR_KVS *h0 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2523  scan_table_files(h0, TAB_H, 0, 0, run);
2524  }
2525 
2526  KMR_KVS *p1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2527  if (run->load_tables_in_advance) {
2528  kmr_map(P0, p1, 0, kmr_noopt, q9_select_by_name);
2529  } else {
2530  scan_table_files(p1, TAB_P, q9_select_by_name, 0, run);
2531  }
2532 
2533  KMR_KVS *pps2 = JOINP(p1, H0, KMR_KV_OPAQUE,
2534  &q9_join_p_h, "p+ps", pushoff);
2535 
2536  /*(S+N)*/
2537 
2538  if (mr->rank == 0) {printf("q9 (s+n)...\n"); fflush(0);}
2539 
2540  if (run->load_tables_in_advance) {
2541  /*empty*/
2542  } else {
2543  KMR_KVS *n0 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2544  scan_table_files(n0, TAB_N, 0, 0, run);
2545  }
2546 
2547  KMR_KVS *s0x = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2548  nth = 3; /*s_nationkey*/
2549  if (run->load_tables_in_advance) {
2550  kmr_map(S0, s0x, &nth, kmr_noopt, key_by_nth);
2551  } else {
2552  scan_table_files(s0x, TAB_S, key_by_nth, &nth, run);
2553  }
2554 
2555  KMR_KVS *sn2 = JOINP(s0x, N0, KMR_KV_OPAQUE,
2556  &q9_join_s_n, "n+s", pushoff);
2557 
2558  /*((P+H)+(S+N))*/
2559 
2560  if (mr->rank == 0) {printf("q9 (p+ps)+(s+n)...\n"); fflush(0);}
2561 
2562  KMR_KVS *ppssn2 = JOINP(pps2, sn2, KMR_KV_OPAQUE,
2563  &q9_join_hp_ns, "ns+pps", pushoff);
2564 
2565  /*(L+O)*/
2566 
2567  if (mr->rank == 0) {printf("q9 (l+o)...\n"); fflush(0);}
2568 
2569  KMR_KVS *l1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2570  nth = 0; /*l_orderkey*/
2571  if (run->load_tables_in_advance) {
2572  kmr_map(L0, l1, &nth, kmr_noopt, key_by_nth);
2573  } else {
2574  scan_table_files(l1, TAB_L, key_by_nth, &nth, run);
2575  }
2576 
2577  KMR_KVS *o1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2578  nth = 0; /*o_orderkey*/
2579  if (run->load_tables_in_advance) {
2580  kmr_map(O0, o1, &nth, kmr_noopt, key_by_nth);
2581  } else {
2582  scan_table_files(o1, TAB_O, key_by_nth, &nth, run);
2583  }
2584 
2585  KMR_KVS *lo2 = JOINP(l1, o1, KMR_KV_OPAQUE,
2586  &q9_join_l_o, "l+o", pushoff);
2587 
2588  /*(((P+H)+(S+N))+(L+O))*/
2589 
2590  if (mr->rank == 0) {printf("q9 (((p+ps)+(s+n))+(l+o))...\n"); fflush(0);}
2591 
2592  KMR_KVS *ppssnlo2 = JOINP(ppssn2, lo2, KMR_KV_OPAQUE,
2593  &q9_join_hnps_lo, "lo+nppss", 0);
2594  KMR_KVS *ppssnlo3 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2595  kmr_map(ppssnlo2, ppssnlo3, 0, kmr_noopt, q9_calculate_amount);
2596 
2597  pcount(ppssnlo3, 0, "q9_calculate_amount", 1);
2598 
2599  KMR_KVS *ppssnlo5 = JOIN1(ppssnlo3, KMR_KV_OPAQUE,
2600  q9_sum_amount, 0, "sum(amount)", 0);
2601 
2602  KMR_KVS *ppssnlo6 = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
2603  kmr_sort(ppssnlo5, ppssnlo6, kmr_noopt);
2604 
2605  return ppssnlo6;
2606 }
2607 
2608 /* ================================================================ */
2609 
2610 /* QUERY (Q10)
2611 
2612  VALIDATION-RUN PARAMETERS (DATE=1993-10-01)
2613 
2614  [QUERY#0]
2615 
2616  select: (c_custkey, c_name, revenue, c_acctbal,
2617  n_name, c_address, c_phone, c_comment)
2618  revenue = sum(l_extendedprice * (1 - l_discount))
2619 
2620  where:
2621  c_custkey = o_custkey
2622  && l_orderkey = o_orderkey
2623  && o_orderdate >= date '1993-10-01'
2624  && o_orderdate < date '1993-10-01' + interval '3' month
2625  && l_returnflag = 'R'
2626  && c_nationkey = n_nationkey
2627 
2628  group-by: (c_custkey, c_name, c_acctbal, c_phone,
2629  n_name, c_address, c_comment)
2630 
2631  [QUERY#1]
2632 
2633  select: (c_custkey, c_name, revenue, c_acctbal,
2634  n_name, c_address, c_phone, c_comment)
2635 
2636  order-by: (revenue descending)
2637  where: rownum <= 20;
2638 
2639  [SCHEDULE]
2640 
2641  (L):
2642  select: l_returnflag = 'R'
2643  let: volume = (l_extendedprice * (1 - l_discount))
2644  output: L1 = {l_orderkey*, volume}
2645 
2646  (O):
2647  select: '1993-10-01' <= o_orderdate < '1994-01-01'
2648  output: O1 = {o_orderkey*, o_custkey}
2649 
2650  (L1+O1):
2651  select: l_orderkey+ = o_orderkey+
2652  output: LO = {o_custkey*, volume}
2653 
2654  (C):
2655  output: C1 = {c_nationkey*, c_custkey, c_name, c_acctbal,
2656  c_phone, c_address, c_comment}
2657 
2658  (N):
2659  output: N1 = {n_nationkey*, n_name}
2660 
2661  (C+N):
2662  select: c_nationkey = n_nationkey+
2663  output: CN = {c_custkey*, c_name, c_acctbal, c_phone,
2664  n_name, c_address, c_comment*}
2665 
2666  (CN+LO):
2667  select: c_custkey+ = o_custkey
2668  output: CLNO0 = {c_custkey*, c_name*, c_acctbal*, c_phone*,
2669  n_name*, c_address*, c_comment*, volume}
2670 
2671  (CLNO1):
2672  let: revenue = sum(volume)
2673  output: CLNO1 = {-revenue*; c_custkey, c_name, revenue,
2674  c_acctbal, n_name, c_address, c_phone, c_comment} */
2675 
2676 static struct SELECT q10_select_c = {
2677  .input = TAB_C,
2678  .output = TAB_Q10_C1,
2679  /*{c_nationkey*, c_custkey, c_name, c_acctbal,
2680  c_phone, c_address, c_comment}*/
2681  .ncolumns = 7,
2682  .columns = {3, 0, 1, 5, 4, 2, 7},
2683  .nkeys = 1,
2684  .keys = {3}
2685 };
2686 
2687 static struct SELECT q10_select_n = {
2688  .input = TAB_N,
2689  .output = TAB_Q10_N1,
2690  /*{n_nationkey*, n_name}*/
2691  .ncolumns = 2,
2692  .columns = {0, 1},
2693  .nkeys = 1,
2694  .keys = {0}
2695 };
2696 
2697 static struct PRODUCT q10_join_l_o = {
2698  .inputs = {TAB_Q10_L1, TAB_Q10_O1},
2699  .output = TAB_Q10_LO,
2700  /*{o_custkey*, volume}*/
2701  .ncolumns = 2,
2702  .columns = {{1, SND}, {1, FST}},
2703  .nkeys = 1,
2704  .keys = {{1, SND}},
2705  .trace_product = 0
2706 };
2707 
2708 static struct PRODUCT q10_join_c_n = {
2709  .inputs = {TAB_Q10_C1, TAB_Q10_N1},
2710  .output = TAB_Q10_CN,
2711  /*{c_custkey*, c_name, c_acctbal, c_phone,
2712  n_name, c_address, c_comment}*/
2713  .ncolumns = 7,
2714  .columns = {{1, FST}, {2, FST}, {3, FST}, {4, FST},
2715  {1, SND}, {5, FST}, {6, FST}},
2716  .nkeys = 1,
2717  .keys = {{1, FST}},
2718  .trace_product = 0
2719 };
2720 
2721 static struct PRODUCT q10_join_cn_lo = {
2722  .inputs = {TAB_Q10_CN, TAB_Q10_LO},
2723  .output = TAB_Q10_CLNO0,
2724  /*{c_custkey*, c_name*, c_acctbal*, c_phone*,
2725  n_name*, c_address*, c_comment*, volume}*/
2726  .ncolumns = 8,
2727  .columns = {{0, FST}, {1, FST}, {2, FST}, {3, FST},
2728  {4, FST}, {5, FST}, {6, FST}, {1, SND}},
2729  .nkeys = 7,
2730  .keys = {{0, FST}, {1, FST}, {2, FST}, {3, FST},
2731  {4, FST}, {5, FST}, {6, FST}},
2732  .trace_product = 0
2733 };
2734 
2735 static int
2736 q10_select_by_flag(const struct kmr_kv_box kv0, const KMR_KVS *kvi,
2737  KMR_KVS *kvo, void *p, const long i)
2738 {
2739  char kbuf[RECORD_SIZE];
2740  struct kmr_ntuple *k = (void *)kbuf;
2741  char vbuf[RECORD_SIZE];
2742  struct kmr_ntuple *v = (void *)vbuf;
2743 
2744  enum TABLE input = TAB_L;
2745  enum TABLE output = TAB_Q10_L1;
2746  struct RECORD *d = find_description(input);
2747 
2748  KMR *mr = kvo->c.mr;
2749  struct kmr_ntuple *u = (void *)kv0.v.p;
2750  struct kmr_ntuple_entry returnflag = column_by_name(u, d, "l_returnflag");
2751  char *s0 = "R";
2752  if (strncmp((char *)returnflag.p, s0, 1) == 0) {
2753  double extendedprice = get_real_column(u, d, "l_extendedprice");
2754  double discount = get_real_column(u, d, "l_discount");
2755  double volume = (extendedprice * (1 - discount));
2756  struct kmr_ntuple_entry orderkey = column_by_name(u, d, "l_orderkey");
2757 
2758  kmr_reset_ntuple(k, 1, 0);
2759  kmr_put_ntuple_entry(mr, k, (int)sizeof(kbuf), orderkey);
2760  kmr_reset_ntuple(v, 2, output);
2761  kmr_put_ntuple_entry(mr, v, (int)sizeof(vbuf), orderkey);
2762  kmr_put_ntuple(mr, v, (int)sizeof(vbuf), &volume, sizeof(volume));
2763  add_record(kvo, k, v);
2764  }
2765 
2766  return MPI_SUCCESS;
2767 }
2768 
2769 static int
2770 q10_select_by_date(const struct kmr_kv_box kv0, const KMR_KVS *kvi,
2771  KMR_KVS *kvo, void *p, const long i)
2772 {
2773  char kbuf[RECORD_SIZE];
2774  struct kmr_ntuple *k = (void *)kbuf;
2775  char vbuf[RECORD_SIZE];
2776  struct kmr_ntuple *v = (void *)vbuf;
2777 
2778  enum TABLE input = TAB_O;
2779  enum TABLE output = TAB_Q10_O1;
2780  struct RECORD *d = find_description(input);
2781 
2782  time_t *tv = p;
2783  KMR *mr = kvo->c.mr;
2784  struct kmr_ntuple *u = (void *)kv0.v.p;
2785  assert(u->marker == (int)input);
2786 
2787  time_t orderdate = get_date_column(u, d, "o_orderdate");
2788  if (tv[0] <= orderdate && orderdate <= tv[1]) {
2789  struct kmr_ntuple_entry orderkey = column_by_name(u, d, "o_orderkey");
2790 
2791  char *cols[] = {"o_orderkey", "o_custkey"};
2792 
2793  kmr_reset_ntuple(k, 1, 0);
2794  kmr_put_ntuple_entry(mr, k, (int)sizeof(kbuf), orderkey);
2795  kmr_reset_ntuple(v, 2, output);
2796  put_columns_by_names(mr, v, sizeof(vbuf), u, d, cols, 2);
2797  add_record(kvo, k, v);
2798  }
2799  return MPI_SUCCESS;
2800 }
2801 
2802 static int
2803 q10_sum_volume(const struct kmr_kv_box kv[], const long n,
2804  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
2805 {
2806  char kbuf[RECORD_SIZE];
2807  struct kmr_ntuple *k = (void *)kbuf;
2808  char vbuf[RECORD_SIZE];
2809  struct kmr_ntuple *v = (void *)vbuf;
2810 
2811  enum TABLE input = TAB_Q10_CLNO0;
2812  enum TABLE output = TAB_Q10_CLNO1;
2813  struct RECORD *d = find_description(input);
2814 
2815  KMR *mr = kvo->c.mr;
2816  struct kmr_ntuple *u0 = (void *)kv[0].v.p;
2817  assert(u0->marker == (int)input);
2818 
2819  double revenue;
2820  revenue = 0.0;
2821  for (long i = 0; i < n; i++) {
2822  struct kmr_ntuple *u = (void *)kv[i].v.p;
2823  double volume = get_real_column_by_index(u, d->columns, 7);
2824  revenue += volume;
2825  }
2826 
2827  char *cols0[] = {"c_custkey", "c_name"};
2828  char *cols1[] = {"c_acctbal", "n_name", "c_address",
2829  "c_phone", "c_comment"};
2830 
2831  double negrevenue = -revenue;
2832 
2833  kmr_reset_ntuple(k, 1, 0);
2834  kmr_put_ntuple(mr, k, (int)sizeof(kbuf), &negrevenue, sizeof(double));
2835  kmr_reset_ntuple(v, 8, output);
2836  put_columns_by_names(mr, v, sizeof(vbuf), u0, d, cols0, 2);
2837  kmr_put_ntuple(mr, v, (int)sizeof(vbuf), &revenue, sizeof(revenue));
2838  put_columns_by_names(mr, v, sizeof(vbuf), u0, d, cols1, 5);
2839  add_record(kvo, k, v);
2840 
2841  return MPI_SUCCESS;
2842 }
2843 
2844 static KMR_KVS *
2845 q10(KMR *mr, struct RUN *run)
2846 {
2847  _Bool pushoff = ((run->pushoff == 0) ? 0 : 1);
2848 
2849  if (pushoff) {
2850  if (mr->rank == 0) {printf("q10 (with push-off)...\n"); fflush(0);}
2851  } else {
2852  if (mr->rank == 0) {printf("q10...\n"); fflush(0);}
2853  }
2854 
2855  /*(L)*/
2856 
2857  if (mr->rank == 0) {printf("q10 (l)...\n"); fflush(0);}
2858 
2859  KMR_KVS *l1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2860  if (run->load_tables_in_advance) {
2861  kmr_map(L0, l1, 0, kmr_noopt, q10_select_by_flag);
2862  } else {
2863  scan_table_files(l1, TAB_L, q10_select_by_flag, 0, run);
2864  }
2865 
2866  /*(O)*/
2867 
2868  if (mr->rank == 0) {printf("q10 (o)...\n"); fflush(0);}
2869 
2870  time_t tv[2];
2871  tv[0] = decode_date("1993-10-01");
2872  assert(tv[0] != (time_t)-1);
2873  tv[1] = decode_date("1994-01-01");
2874  assert(tv[1] != (time_t)-1);
2875  KMR_KVS *o1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2876  if (run->load_tables_in_advance) {
2877  kmr_map(O0, o1, tv, kmr_noopt, q10_select_by_date);
2878  } else {
2879  scan_table_files(o1, TAB_O, q10_select_by_date, tv, run);
2880  }
2881 
2882  /*(L1+O1)*/
2883 
2884  KMR_KVS *lo = JOINP(l1, o1, KMR_KV_OPAQUE,
2885  &q10_join_l_o, "l+o", pushoff);
2886 
2887  /*(C+N)*/
2888 
2889  if (mr->rank == 0) {printf("q10 (c)...\n"); fflush(0);}
2890 
2891  KMR_KVS *c1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2892  if (run->load_tables_in_advance) {
2893  kmr_map(C0, c1, &q10_select_c, kmr_noopt, select_by_fields);
2894  } else {
2895  scan_table_files(c1, TAB_C, select_by_fields, &q10_select_c, run);
2896  }
2897 
2898  if (mr->rank == 0) {printf("q10 (n)...\n"); fflush(0);}
2899 
2900  KMR_KVS *n1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
2901  if (run->load_tables_in_advance) {
2902  kmr_map(N0, n1, &q10_select_n, kmr_noopt, select_by_fields);
2903  } else {
2904  scan_table_files(n1, TAB_N, select_by_fields, &q10_select_n, run);
2905  }
2906 
2907  if (mr->rank == 0) {printf("q10 (c+n)...\n"); fflush(0);}
2908 
2909  KMR_KVS *cn = JOINP(c1, n1, KMR_KV_OPAQUE,
2910  &q10_join_c_n, "c+n", pushoff);
2911 
2912  /*(CN+LO)*/
2913 
2914  if (mr->rank == 0) {printf("q10 (cn+lo)...\n"); fflush(0);}
2915 
2916  KMR_KVS *clno0 = JOINP(cn, lo, KMR_KV_OPAQUE,
2917  &q10_join_cn_lo, "cn+lo", pushoff);
2918 
2919  if (mr->rank == 0) {printf("q10 (sum(volume))...\n"); fflush(0);}
2920 
2921  KMR_KVS *clno1 = JOIN1(clno0, KMR_KV_FLOAT8,
2922  q10_sum_volume, 0, "sum(volume)", 0);
2923 
2924  //phisto(clno1, "sum(volumne)");
2925 
2926  KMR_KVS *clno2 = CREATE_KVS(mr, KMR_KV_FLOAT8, 0);
2927  kmr_sort(clno1, clno2, kmr_noopt);
2928 
2929  pcount(clno2, 0, "sort", 1);
2930 
2931  KMR_KVS *clno3 = CREATE_KVS(mr, KMR_KV_FLOAT8, 0);
2932  kmr_choose_first_part(clno2, clno3, 20, kmr_noopt);
2933 
2934  pcount(clno3, 0, "choose", 1);
2935 
2936  return clno3;
2937 }
2938 
2939 /* ================================================================ */
2940 
2941 /* QUERY (Q13)
2942 
2943  VALIDATION-RUN PARAMETERS (WORD1=special, WORD2=requests)
2944 
2945  [QUERY#0]
2946 
2947  select: #left-outer-join (customer, orders)
2948  on: c_custkey = o_custkey
2949  && o_comment not like '%special%requests%'
2950 
2951  [QUERY#1]
2952 
2953  select: c_orders = (c_custkey, count(o_orderkey))
2954  group-by: (c_custkey)
2955 
2956  [QUERY#2]
2957 
2958  select: (c_count, custdist=count(*))
2959  from: c_orders
2960  group-by: (c_count)
2961 
2962  order-by: (custdist descending, c_count descending)
2963 
2964  [SCHEDULE#0]
2965 
2966  (O1):
2967  select: o_comment not like '%special%requests%'
2968  output: O1 = {o_custkey*, o_orderkey}
2969 
2970  (C+O1):
2971  select: (c_custkey* = o_custkey)
2972  let: q13_count = #count(o_orderkey)
2973  output: CO0 = {q13_count*, c_custkey}
2974 
2975  (CO0):
2976  select: (q13_count)
2977  let: q13_custdist = #count(c_custkey)
2978  output: CO1 = {q13_custdist* (descending), q13_count* (descending)} */
2979 
2980 static int
2981 q13_select_by_string(const struct kmr_kv_box kv0,
2982  const KMR_KVS *kvi, KMR_KVS *kvo,
2983  void *p, const long i)
2984 {
2985  char kbuf[RECORD_SIZE];
2986  struct kmr_ntuple *k = (void *)kbuf;
2987  char vbuf[RECORD_SIZE];
2988  struct kmr_ntuple *v = (void *)vbuf;
2989 
2990  enum TABLE input = TAB_O;
2991  enum TABLE output = TAB_Q13_O1;
2992  struct RECORD *d = find_description(input);
2993 
2994  KMR *mr = kvo->c.mr;
2995  struct kmr_ntuple *u = (void *)kv0.v.p;
2996 
2997  assert(d->columns[8].field == F_TEXT);
2998  struct kmr_ntuple_entry comment = kmr_nth_ntuple(u, 8);
2999  char *p0 = (comment.p);
3000  char *end = &(p0[comment.len]);
3001  char *p1 = strnstr_(p0, "special", (size_t)(end - p0));
3002  char *p2 = ((p1 == 0) ? 0 : strnstr_(p1, "requests", (size_t)(end - p1)));
3003 
3004  if (!(p1 != 0 && p2 != 0)) {
3005  struct kmr_ntuple_entry orderkey = kmr_nth_ntuple(u, 0);
3006  struct kmr_ntuple_entry custkey = kmr_nth_ntuple(u, 1);
3007 
3008  kmr_reset_ntuple(k, 1, 0);
3009  kmr_put_ntuple_entry(mr, k, (int)sizeof(kbuf), custkey);
3010  kmr_reset_ntuple(v, 1, output);
3011  kmr_put_ntuple_entry(mr, v, (int)sizeof(vbuf), orderkey);
3012  add_record(kvo, k, v);
3013  }
3014  return MPI_SUCCESS;
3015 }
3016 
3017 static int
3018 q13_join_c_o(const struct kmr_kv_box kv[], const long n,
3019  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
3020 {
3021  char kbuf[RECORD_SIZE];
3022  struct kmr_ntuple *k = (void *)kbuf;
3023  char vbuf[RECORD_SIZE];
3024  struct kmr_ntuple *v = (void *)vbuf;
3025 
3026  int inputs[2] = {TAB_C, TAB_Q13_O1};
3027  int output = TAB_Q13_CO0;
3028  /*struct RECORD *dx = find_description((enum TABLE)output);*/
3029 
3030  KMR *mr = kvo->c.mr;
3031 
3032  struct kmr_ntuple **vv[2];
3033  long cnt[2];
3034  kmr_separate_ntuples(kvo->c.mr, kv, n, vv, cnt, inputs, 1);
3035  /*fprintf(stderr, "prod %ld x %ld\n", cnt[0], cnt[1]); fflush(0);*/
3036  assert(cnt[0] == 1);
3037 
3038  long count = cnt[1];
3039 
3040  struct kmr_ntuple *u0 = vv[0][0];
3041  struct kmr_ntuple_entry custkey = kmr_nth_ntuple(u0, 0);
3042  assert(custkey.len == sizeof(long));
3043 
3044  kmr_reset_ntuple(k, 1, 0);
3045  kmr_put_ntuple(mr, k, (int)sizeof(kbuf), &count, sizeof(long));
3046  kmr_reset_ntuple(v, 1, output);
3047  kmr_put_ntuple_entry(mr, v, (int)sizeof(vbuf), custkey);
3048  add_record(kvo, k, v);
3049 
3050  free(vv[0]);
3051  free(vv[1]);
3052  return MPI_SUCCESS;
3053 }
3054 
3055 static int
3056 q13_join_co(const struct kmr_kv_box kv[], const long n,
3057  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
3058 {
3059  char kbuf[RECORD_SIZE];
3060  struct kmr_ntuple *k = (void *)kbuf;
3061  char vbuf[RECORD_SIZE];
3062  struct kmr_ntuple *v = (void *)vbuf;
3063 
3064  /*enum TABLE input = TAB_Q13_CO0;*/
3065  enum TABLE output = TAB_Q13_CO1;
3066 
3067  KMR *mr = kvo->c.mr;
3068  assert(kv[0].klen == sizeof(long));
3069 
3070  long count = kv[0].k.i;
3071  assert(n > 0);
3072  long custdist = n;
3073 
3074  assert(sizeof(uint64_t) == sizeof(long));
3075  uint64_t becustdist = htonll_((uint64_t)(-custdist));
3076  uint64_t becount = htonll_((uint64_t)(-count));
3077 
3078  kmr_reset_ntuple(k, 2, 0);
3079  kmr_put_ntuple_long(mr, k, (int)sizeof(kbuf), (long)becustdist);
3080  kmr_put_ntuple_long(mr, k, (int)sizeof(kbuf), (long)becount);
3081  kmr_reset_ntuple(v, 2, output);
3082  kmr_put_ntuple_long(mr, v, (int)sizeof(vbuf), count);
3083  kmr_put_ntuple_long(mr, v, (int)sizeof(vbuf), custdist);
3084  add_record(kvo, k, v);
3085 
3086  return MPI_SUCCESS;
3087 }
3088 
3089 static KMR_KVS *
3090 q13(KMR *mr, struct RUN *run)
3091 {
3092  _Bool pushoff = ((run->pushoff == 0) ? 0 : 1);
3093 
3094  if (pushoff) {
3095  if (mr->rank == 0) {printf("q13 (with push-off)...\n"); fflush(0);}
3096  } else {
3097  if (mr->rank == 0) {printf("q13...\n"); fflush(0);}
3098  }
3099 
3100  /*(O)*/
3101 
3102  if (mr->rank == 0) {printf("q13 (o)...\n"); fflush(0);}
3103 
3104  KMR_KVS *o1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
3105  if (run->load_tables_in_advance) {
3106  kmr_map(O0, o1, 0, kmr_noopt, q13_select_by_string);
3107  } else {
3108  scan_table_files(o1, TAB_O, q13_select_by_string, 0, run);
3109  }
3110 
3111  pcount(o1, 0, "select (o)", 1);
3112 
3113  /*(C+O)*/
3114 
3115  if (run->load_tables_in_advance) {
3116  /*empty*/
3117  } else {
3118  KMR_KVS *c0 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
3119  scan_table_files(c0, TAB_C, 0, 0, run);
3120  }
3121 
3122  KMR_KVS *co2 = JOIN2(C0, o1, KMR_KV_INTEGER,
3123  q13_join_c_o, 0, "c+o", pushoff);
3124 
3125  KMR_KVS *co4 = JOIN1(co2, KMR_KV_OPAQUE,
3126  q13_join_co, 0, "c+o", 0);
3127 
3128  KMR_KVS *co5 = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
3129  kmr_sort(co4, co5, kmr_noopt);
3130 
3131  return co5;
3132 }
3133 
3134 /* ================================================================ */
3135 
3136 /* QUERY (Q21)
3137 
3138  VALIDATION-RUN PARAMETERS (NATION=SAUDI ARABIA)
3139 
3140  [QUERY#0]
3141 
3142  select: TBL0(l1) = *
3143 
3144  from:
3145  l2 = lineitem
3146 
3147  where:
3148  l2.l_orderkey = l1.l_orderkey
3149  && l2.l_suppkey <> l1.l_suppkey
3150 
3151  [QUERY#1]
3152 
3153  select: TBL1(l1) = *
3154 
3155  from:
3156  l3 = lineitem
3157 
3158  where:
3159  l3.l_orderkey = l1.l_orderkey
3160  && l3.l_suppkey <> l1.l_suppkey
3161  && l3.l_receiptdate > l3.l_commitdate
3162 
3163  [QUERY#2]
3164 
3165  select: (s_name, numwait)
3166  numwait = count(*)
3167 
3168  from:
3169  supplier,
3170  l1 = lineitem,
3171  orders,
3172  nation
3173 
3174  where:
3175  s_suppkey = l1.l_suppkey
3176  and o_orderkey = l1.l_orderkey
3177  and o_orderstatus = 'F'
3178  and l1.l_receiptdate > l1.l_commitdate
3179  and exists (TBL0(l1))
3180  and not exists (TBL1(l1))
3181  and s_nationkey = n_nationkey
3182  and n_name = 'SAUDI ARABIA'
3183 
3184  group-by: s_name
3185  order-by: (numwait descending, s_name)
3186  where:
3187  rownum <= 100
3188 
3189  [SCHEDULE]
3190 
3191  (N1):
3192  select: n_name = 'SAUDI ARABIA'
3193  output N1 = {n_nationkey*}
3194 
3195  (N+S):
3196  select: s_nationkey = n_nationkey
3197  output: NS (:TAB_Q21_NS) = {s_suppkey*, s_name}
3198 
3199  (L1):
3200  select: l1.l_receiptdate > l1.l_commitdate
3201  output: L1 = {l_orderkey, l_suppkey*}
3202 
3203  (L1+NS):
3204  select: s_suppkey = l1.l_suppkey
3205  output: LNS (:TAB_Q21_LNS) = {l_orderkey*, l_suppkey, s_name}
3206 
3207  (O1):
3208  select o_orderstatus = 'F'
3209  output: O1 = {o_orderkey*}
3210 
3211  (LNS+O):
3212  select: o_orderkey = l1.l_orderkey
3213  output: LNOS0 (:TAB_Q21_LNS) = {l_orderkey*, l_suppkey, s_name}
3214 
3215  (COPY L TWICE):
3216 
3217  (L2):
3218  output: L2 = {l2.l_orderkey*, l2.l_suppkey}
3219 
3220  (LNOS0+L2):
3221  exists: l1.l_orderkey = l2.l_orderkey && l1.l_suppkey <> l2.l_suppkey
3222  output: LNOS2 (:TAB_Q21_LNS) = {l_orderkey*, l_suppkey, s_name}
3223 
3224  (L3):
3225  select: l3.l_receiptdate > l3.l_commitdate
3226  output: L3 = {l3.l_orderkey*, l3.l_suppkey}
3227 
3228  (LNOS2+L3):
3229  not-exists: l3.l_orderkey = l1.l_orderkey && l3.l_suppkey <> l1.l_suppkey
3230  output: NAME = {s_name}
3231 
3232  (NUMWAIT):
3233  select: s_name
3234  let: numwait = count(*)
3235  output: NUMWAIT = {-numwait*, s_name}
3236 
3237  (SORT):
3238  order-by: (numwait descending, s_name)
3239  filter: (rownum <= 100) */
3240 
3241 static struct PRODUCT q21_join_n_s = {
3242  .inputs = {TAB_Q21_N1, TAB_S},
3243  .output = TAB_Q21_NS,
3244  /*{s_suppkey*, s_name}*/
3245  .ncolumns = 2,
3246  .columns = {{0, SND}, {1, SND}},
3247  .nkeys = 1,
3248  .keys = {{0, SND}},
3249  .trace_product = 0
3250 };
3251 
3252 static struct PRODUCT q21_join_l_ns = {
3253  .inputs = {TAB_Q21_L1, TAB_Q21_NS},
3254  .output = TAB_Q21_LNS,
3255  /*{l_orderkey*, l_suppkey, s_name}*/
3256  .ncolumns = 3,
3257  .columns = {{0, FST}, {1, FST}, {1, SND}},
3258  .nkeys = 1,
3259  .keys = {{0, FST}},
3260  .trace_product = 0
3261 };
3262 
3263 static struct PRODUCT q21_join_lns_o = {
3264  .inputs = {TAB_Q21_LNS, TAB_Q21_O1},
3265  .output = TAB_Q21_LNS,
3266  /*{l_orderkey*, l_suppkey, s_name}*/
3267  .ncolumns = 3,
3268  .columns = {{0, FST}, {1, FST}, {2, FST}},
3269  .nkeys = 1,
3270  .keys = {{0, FST}},
3271  .trace_product = 0
3272 };
3273 
3274 static int
3275 q21_select_n_by_name(const struct kmr_kv_box kv0,
3276  const KMR_KVS *kvi, KMR_KVS *kvo,
3277  void *p, const long i)
3278 {
3279  char kbuf[RECORD_SIZE];
3280  struct kmr_ntuple *k = (void *)kbuf;
3281  char vbuf[RECORD_SIZE];
3282  struct kmr_ntuple *v = (void *)vbuf;
3283 
3284  enum TABLE input = TAB_N;
3285  enum TABLE output = TAB_Q21_N1;
3286  /*{n_nationkey*}*/
3287  struct RECORD *d = find_description(input);
3288 
3289  KMR *mr = kvo->c.mr;
3290  struct kmr_ntuple *u = (void *)kv0.v.p;
3291 
3292  assert(d->columns[1].field == F_TEXT);
3293  struct kmr_ntuple_entry n_name = kmr_nth_ntuple(u, 1);
3294  char *s0 = "SAUDI ARABIA";
3295  size_t len = strlen(s0);
3296  if (((size_t)n_name.len == len)
3297  && (strncmp((char *)n_name.p, s0, len) == 0)) {
3298  struct kmr_ntuple_entry n_nationkey = kmr_nth_ntuple(u, 0);
3299 
3300  kmr_reset_ntuple(k, 1, 0);
3301  kmr_put_ntuple_entry(mr, k, (int)sizeof(kbuf), n_nationkey);
3302  kmr_reset_ntuple(v, 1, output);
3303  kmr_put_ntuple_entry(mr, v, (int)sizeof(vbuf), n_nationkey);
3304  add_record(kvo, k, v);
3305  }
3306  return MPI_SUCCESS;
3307 }
3308 
3309 static int
3310 q21_select_l1_by_date(const struct kmr_kv_box kv0, const KMR_KVS *kvi,
3311  KMR_KVS *kvo, void *p, const long i)
3312 {
3313  char kbuf[RECORD_SIZE];
3314  struct kmr_ntuple *k = (void *)kbuf;
3315  char vbuf[RECORD_SIZE];
3316  struct kmr_ntuple *v = (void *)vbuf;
3317 
3318  enum TABLE input = TAB_L;
3319  enum TABLE output = TAB_Q21_L1;
3320  struct RECORD *d = find_description(input);
3321 
3322  KMR *mr = kvo->c.mr;
3323  struct kmr_ntuple *u = (void *)kv0.v.p;
3324 
3325  time_t receiptdate = get_date_column(u, d, "l_receiptdate");
3326  time_t commitdate = get_date_column(u, d, "l_commitdate");
3327  if (receiptdate > commitdate) {
3328  struct kmr_ntuple_entry orderkey = column_by_name(u, d, "l_orderkey");
3329  struct kmr_ntuple_entry suppkey = column_by_name(u, d, "l_suppkey");
3330 
3331  kmr_reset_ntuple(k, 1, 0);
3332  kmr_put_ntuple_entry(mr, k, (int)sizeof(kbuf), suppkey);
3333  kmr_reset_ntuple(v, 2, output);
3334  kmr_put_ntuple_entry(mr, v, (int)sizeof(vbuf), orderkey);
3335  kmr_put_ntuple_entry(mr, v, (int)sizeof(vbuf), suppkey);
3336  add_record(kvo, k, v);
3337  }
3338 
3339  return MPI_SUCCESS;
3340 }
3341 
3342 static int
3343 q21_select_l3_by_date(const struct kmr_kv_box kv0, const KMR_KVS *kvi,
3344  KMR_KVS *kvo, void *p, const long i)
3345 {
3346  char kbuf[RECORD_SIZE];
3347  struct kmr_ntuple *k = (void *)kbuf;
3348  char vbuf[RECORD_SIZE];
3349  struct kmr_ntuple *v = (void *)vbuf;
3350 
3351  enum TABLE input = TAB_L;
3352  enum TABLE output = TAB_Q21_L3;
3353  struct RECORD *d = find_description(input);
3354 
3355  KMR *mr = kvo->c.mr;
3356  struct kmr_ntuple *u = (void *)kv0.v.p;
3357 
3358  time_t receiptdate = get_date_column(u, d, "l_receiptdate");
3359  time_t commitdate = get_date_column(u, d, "l_commitdate");
3360  if (receiptdate > commitdate) {
3361  struct kmr_ntuple_entry orderkey = column_by_name(u, d, "l_orderkey");
3362  struct kmr_ntuple_entry suppkey = column_by_name(u, d, "l_suppkey");
3363 
3364  kmr_reset_ntuple(k, 1, 0);
3365  kmr_put_ntuple_entry(mr, k, (int)sizeof(kbuf), orderkey);
3366  kmr_reset_ntuple(v, 2, output);
3367  kmr_put_ntuple_entry(mr, v, (int)sizeof(vbuf), orderkey);
3368  kmr_put_ntuple_entry(mr, v, (int)sizeof(vbuf), suppkey);
3369  add_record(kvo, k, v);
3370  }
3371 
3372  return MPI_SUCCESS;
3373 }
3374 
3375 static struct SELECT q21_copy_l = {
3376  .input = TAB_L,
3377  .output = TAB_Q21_L3,
3378  .ncolumns = 2,
3379  .columns = {0, 2},
3380  .nkeys = 1,
3381  .keys = {0}
3382 };
3383 
3384 static int
3385 q21_select_o_by_flag(const struct kmr_kv_box kv0, const KMR_KVS *kvi,
3386  KMR_KVS *kvo, void *p, const long i)
3387 {
3388  char vbuf[RECORD_SIZE];
3389  struct kmr_ntuple *v = (void *)vbuf;
3390 
3391  enum TABLE input = TAB_O;
3392  enum TABLE output = TAB_Q21_O1;
3393  struct RECORD *d = find_description(input);
3394 
3395  KMR *mr = kvo->c.mr;
3396  struct kmr_ntuple *u = (void *)kv0.v.p;
3397  struct kmr_ntuple_entry status = column_by_name(u, d, "o_orderstatus");
3398  char *s0 = (char *)status.p;
3399  if (status.len == 1 && s0[0] == 'F') {
3400  struct kmr_ntuple_entry orderkey = column_by_name(u, d, "o_orderkey");
3401 
3402  kmr_reset_ntuple(v, 1, output);
3403  kmr_put_ntuple_entry(mr, v, (int)sizeof(vbuf), orderkey);
3404 
3405  struct kmr_kv_box kv = {
3406  .klen = orderkey.len,
3407  .k.p = orderkey.p,
3408  .vlen = kmr_size_ntuple(v),
3409  .v.p = (void *)v
3410  };
3411  kmr_add_kv(kvo, kv);
3412  }
3413 
3414  return MPI_SUCCESS;
3415 }
3416 
3417 static int
3418 q21_join_lnos_l2(const struct kmr_kv_box kv[], const long n,
3419  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
3420 {
3421  char kbuf[RECORD_SIZE];
3422  struct kmr_ntuple *k = (void *)kbuf;
3423  //char vbuf[RECORD_SIZE];
3424  //struct kmr_ntuple *v = (void *)vbuf;
3425 
3426  int inputs[] = {TAB_Q21_LNS, TAB_Q21_L3};
3427  //enum TABLE output = TAB_Q21_LNS;
3428  struct RECORD *d0 = find_description((enum TABLE)inputs[0]);
3429  struct RECORD *d1 = find_description((enum TABLE)inputs[1]);
3430 
3431  KMR *mr = kvo->c.mr;
3432  struct kmr_ntuple **vv[2];
3433  long cnt[2];
3434  kmr_separate_ntuples(mr, kv, n, vv, cnt, inputs, 1);
3435 
3436  for (long i0 = 0; i0 < cnt[0]; i0++) {
3437  struct kmr_ntuple *u0 = (void *)vv[0][i0];
3438  long suppkey0 = get_int_column_by_index(u0, d0->columns, 1);
3439  _Bool exists = 0;
3440  for (long i1 = 0; i1 < cnt[1]; i1++) {
3441  struct kmr_ntuple *u1 = (void *)vv[1][i1];
3442  long suppkey1 = get_int_column_by_index(u1, d1->columns, 1);
3443  if (suppkey0 != suppkey1) {
3444  exists = 1;
3445  break;
3446  }
3447  }
3448  if (exists) {
3449  struct kmr_ntuple_entry orderkey = kmr_nth_ntuple(u0, 0);
3450  kmr_reset_ntuple(k, 1, 0);
3451  kmr_put_ntuple_entry(mr, k, (int)sizeof(kbuf), orderkey);
3452  add_record(kvo, k, u0);
3453  }
3454  }
3455 
3456  return MPI_SUCCESS;
3457 }
3458 
3459 static int
3460 q21_join_lnos_l3(const struct kmr_kv_box kv[], const long n,
3461  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
3462 {
3463  char kbuf[RECORD_SIZE];
3464  struct kmr_ntuple *k = (void *)kbuf;
3465  char vbuf[RECORD_SIZE];
3466  struct kmr_ntuple *v = (void *)vbuf;
3467 
3468  int inputs[] = {TAB_Q21_LNS, TAB_Q21_L3};
3469  //enum TABLE output = TAB_Q21_NAME;
3470  /*{s_name*, s_name}*/
3471  struct RECORD *d0 = find_description((enum TABLE)inputs[0]);
3472  struct RECORD *d1 = find_description((enum TABLE)inputs[1]);
3473 
3474  KMR *mr = kvo->c.mr;
3475  struct kmr_ntuple **vv[2];
3476  long cnt[2];
3477  kmr_separate_ntuples(mr, kv, n, vv, cnt, inputs, 1);
3478 
3479  for (long i0 = 0; i0 < cnt[0]; i0++) {
3480  struct kmr_ntuple *u0 = (void *)vv[0][i0];
3481  long suppkey0 = get_int_column_by_index(u0, d0->columns, 1);
3482  _Bool nonexists = 1;
3483  for (long i1 = 0; i1 < cnt[1]; i1++) {
3484  struct kmr_ntuple *u1 = (void *)vv[1][i1];
3485  long suppkey1 = get_int_column_by_index(u1, d1->columns, 1);
3486  if (suppkey0 != suppkey1) {
3487  nonexists = 0;
3488  break;
3489  }
3490  }
3491  if (nonexists) {
3492  struct kmr_ntuple_entry name = kmr_nth_ntuple(u0, 2);
3493  kmr_reset_ntuple(k, 1, 0);
3494  kmr_put_ntuple_entry(mr, k, (int)sizeof(kbuf), name);
3495  kmr_reset_ntuple(v, 1, TAB_Q21_NAME);
3496  kmr_put_ntuple_entry(mr, v, (int)sizeof(kbuf), name);
3497  add_record(kvo, k, v);
3498  }
3499  }
3500 
3501  return MPI_SUCCESS;
3502 }
3503 
3504 static int
3505 q21_join_numwait(const struct kmr_kv_box kv[], const long n,
3506  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
3507 {
3508  char kbuf[RECORD_SIZE];
3509  struct kmr_ntuple *k = (void *)kbuf;
3510  char vbuf[RECORD_SIZE];
3511  struct kmr_ntuple *v = (void *)vbuf;
3512 
3513  enum TABLE input = TAB_Q21_NAME;
3514  enum TABLE output = TAB_Q21_NUMWAIT;
3515  /*{-numwait*, s_name}*/
3516  struct RECORD *d = find_description((enum TABLE)input);
3517 
3518  KMR *mr = kvo->c.mr;
3519 
3520  struct kmr_ntuple *u = (void *)kv[0].v.p;
3521  struct kmr_ntuple_entry s_name = column_by_name(u, d, "s_name");
3522 
3523  long negnumwait = -n;
3524 
3525  char nbuf[NAME_SIZE];
3526  assert(s_name.len <= NAME_SIZE);
3527  memset(nbuf, 0, sizeof(nbuf));
3528  memcpy(nbuf, s_name.p, s_name.len);
3529 
3530  kmr_reset_ntuple(k, 2, 0);
3531  kmr_put_ntuple(mr, k, (int)sizeof(kbuf), &negnumwait, sizeof(long));
3532  kmr_put_ntuple(mr, k, (int)sizeof(kbuf), nbuf, NAME_SIZE);
3533  kmr_reset_ntuple(v, 2, output);
3534  kmr_put_ntuple_entry(mr, v, (int)sizeof(vbuf), s_name);
3535  kmr_put_ntuple(mr, v, (int)sizeof(vbuf), &n, (int)sizeof(long));
3536  add_record(kvo, k, v);
3537 
3538  return MPI_SUCCESS;
3539 }
3540 
3541 static KMR_KVS *
3542 q21(KMR *mr, struct RUN *run)
3543 {
3544  _Bool pushoff = ((run->pushoff == 0) ? 0 : 1);
3545  struct kmr_option inspect = {.inspect = 1};
3546 
3547  if (pushoff) {
3548  if (mr->rank == 0) {printf("q21 (with push-off)...\n"); fflush(0);}
3549  } else {
3550  if (mr->rank == 0) {printf("q21...\n"); fflush(0);}
3551  }
3552 
3553  /*(N)*/
3554 
3555  KMR_KVS *n1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
3556  if (run->load_tables_in_advance) {
3557  kmr_map(N0, n1, 0, kmr_noopt, q21_select_n_by_name);
3558  } else {
3559  scan_table_files(n1, TAB_N, q21_select_n_by_name, 0, run);
3560  }
3561 
3562  KMR_KVS *s1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
3563  int nth = 3; /*s_nationkey*/
3564  if (run->load_tables_in_advance) {
3565  kmr_map(S0, s1, &nth, kmr_noopt, key_by_nth);
3566  } else {
3567  scan_table_files(s1, TAB_S, key_by_nth, &nth, run);
3568  }
3569 
3570  /*(N+S)*/
3571 
3572  KMR_KVS *ns0 = JOINP(n1, s1, KMR_KV_OPAQUE,
3573  &q21_join_n_s, "n+s", pushoff);
3574 
3575  /*(L1+NS)*/
3576 
3577  if (run->load_tables_in_advance) {
3578  /*empty*/
3579  } else {
3580  KMR_KVS *l0 = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
3581  scan_table_files(l0, TAB_L, 0, 0, run);
3582  }
3583 
3584  KMR_KVS *l1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
3585  kmr_map(L0, l1, 0, inspect, q21_select_l1_by_date);
3586 
3587  pcount(l1, 0, "l1", 1);
3588 
3589  KMR_KVS *lns0 = JOINP(l1, ns0, KMR_KV_OPAQUE,
3590  &q21_join_l_ns, "l+ns", pushoff);
3591 
3592  /*(O1)*/
3593 
3594  KMR_KVS *o1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
3595  if (run->load_tables_in_advance) {
3596  kmr_map(O0, o1, 0, kmr_noopt, q21_select_o_by_flag);
3597  } else {
3598  scan_table_files(o1, TAB_O, q21_select_o_by_flag, 0, run);
3599  }
3600 
3601  pcount(o1, 0, "o1", 1);
3602 
3603  /*(LNS+O)*/
3604 
3605  KMR_KVS *lnos0 = JOINP(lns0, o1, KMR_KV_OPAQUE,
3606  &q21_join_lns_o, "lns+o", pushoff);
3607 
3608  //dump_table(lnos0, TAB_Q21_LNS);
3609 
3610  /*(L2+LNOS)*/
3611 
3612  KMR_KVS *l2 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
3613  kmr_map(L0, l2, &q21_copy_l, inspect, select_by_fields);
3614 
3615  KMR_KVS *lnos2 = JOIN2(lnos0, l2, KMR_KV_OPAQUE,
3616  q21_join_lnos_l2, 0, "l2+lnos", pushoff);
3617 
3618  /*(LNOS+L3)*/
3619 
3620  KMR_KVS *l3 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
3621  kmr_map(L0, l3, 0, inspect, q21_select_l3_by_date);
3622  kmr_free_kvs(L0);
3623 
3624  KMR_KVS *name0 = JOIN2(lnos2, l3, KMR_KV_OPAQUE,
3625  q21_join_lnos_l3, 0, "l3+lnos", pushoff);
3626 
3627  //dump_table(name0, TAB_Q21_NAME);
3628 
3629  /*(NUMWAIT)*/
3630 
3631  KMR_KVS *ex2 = JOIN1(name0, KMR_KV_OPAQUE,
3632  q21_join_numwait, 0, "numwait", 0);
3633 
3634  /*(SORT)*/
3635 
3636  KMR_KVS *ex3 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
3637  kmr_sort(ex2, ex3, kmr_noopt);
3638 
3639  pcount(ex3, 0, "sort", 1);
3640  //dump_table(ex3, TAB_Q21_NUMWAIT);
3641 
3642  KMR_KVS *ex4 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
3643  struct kmr_option opt = {.nothreading = 1};
3644  kmr_choose_first_part(ex3, ex4, 100, opt);
3645 
3646  pcount(ex4, 0, "choose", 1);
3647 
3648  return ex4;
3649 }
3650 
3651 /* ================================================================ */
3652 
/* Parses the string S as a decimal integer and checks that it is one
   of the N values in OK.  It returns the value on success.  It
   returns -1 after printing a message prefixed by M to stderr, when S
   is not exactly an integer (trailing characters are rejected) or
   when the value is not listed in OK. */

static int
atoi_safe(char *s, int ok[], int n, char *m)
{
    int value;
    char trailer[4];

    /* The "%c" conversion catches garbage after the digits; only a
       count of exactly one conversion means a clean integer. */
    if (sscanf(s, "%d%c", &value, trailer) != 1) {
        fprintf(stderr, "%s. Not integer (%s).\n", m, s);
        return -1;
    }

    int at = 0;
    while (at < n && ok[at] != value) {
        at++;
    }
    if (at < n) {
        return value;
    }

    fprintf(stderr, "%s. Not acceptable (%d).\n", m, value);
    return -1;
}
3671 
3672 struct RUN runs[50];
3673 int nruns = 0;
3674 
3675 int
3676 main(int argc, char *argv[])
3677 {
3678  assert(sizeof(long) == sizeof(time_t));
3679 
3680  /* For localtime()/mktime(). */
3681 
3682  setenv("TZ", "UTC", 1);
3683  tzset();
3684 
3685  char *helpstring = ("%s directory-of-tables [-C -F -P]"
3686  " query [options] query [options]...\n"
3687  " query={7,9,10,13,21}\n"
3688  " options: [-p po -b sz -a -g -r -s]\n"
3689  " -p po: po={0,1,2}, push-off setting\n"
3690  " -b sz: block size for push-off (in KB)\n"
3691  " -a: scan tables in advance\n"
3692  " -g: stop push-off at the end\n"
3693  " -r: redistribute table entries\n"
3694  " -s: use small block size\n"
3695  " -C: report count in messages\n"
3696  " -F: report file read time\n"
3697  " -P: report push-off statistics\n");
3698 
3699  int nprocs, rank, thlv;
3700  /*MPI_Init(&argc, &argv);*/
3701  MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
3702  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
3703  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
3704 
3705  if (!(argc >= 3)) {
3706  if (rank == 0) {
3707  fprintf(stderr, helpstring, argv[0]);
3708  }
3709  MPI_Finalize();
3710  return 0;
3711  }
3712 
3713 #if 0
3714  /* LET DUMP CORE ONLY AT RANK ZERO. */
3715  if (rank != 0) {
3716  struct rlimit core = {.rlim_cur = 0, .rlim_max = 0};
3717  int cc = setrlimit(RLIMIT_CORE, &core);
3718  if (cc != 0) {perror("setrlimit(core=0)");}
3719  /* CONTINUE RUN. */
3720  }
3721 #endif
3722 
3723  char *directory = argv[1];
3724 
3725  xxx_optind = 2;
3726  int c;
3727  while ((c = xxx_getopt(argc, argv, "hCFP")) != -1) {
3728  switch (c) {
3729  case 'h':
3730  {
3731  if (rank == 0) {
3732  fprintf(stderr, helpstring, argv[0]);
3733  fflush(0);
3734  }
3735  MPI_Finalize();
3736  return 0;
3737  }
3738 
3739  case 'C':
3740  report_count_in_messages = 1;
3741  break;
3742  case 'F':
3743  report_time_to_read = 1;
3744  break;
3745  case 'P':
3746  report_pushoff_statistics = 1;
3747  break;
3748 
3749  case '?':
3750  if (rank == 0) {
3751  fprintf(stderr, "Unknown option (%c)\n", xxx_optopt);
3752  fprintf(stderr, helpstring, argv[0]);
3753  fflush(0);
3754  }
3755  MPI_Finalize();
3756  return 0;
3757  }
3758  }
3759 
3760  if (xxx_optind >= argc) {
3761  fprintf(stderr, helpstring, argv[0]);
3762  fflush(0);
3763  MPI_Finalize();
3764  return 0;
3765  }
3766 
3767  while (xxx_optind < argc) {
3768  int qset[] = {7, 9, 10, 13, 21};
3769  int nq = (int)(sizeof(qset) / sizeof(qset[0]));
3770  int query = atoi_safe(argv[xxx_optind], qset, nq, "Bad query");
3771  if (query == -1) {
3772  if (rank == 0) {
3773  fprintf(stderr, helpstring, argv[0]);
3774  fflush(0);
3775  }
3776  MPI_Finalize();
3777  return 0;
3778  }
3779 
3780  runs[nruns].query = query;
3781  runs[nruns].pushoff = 0;
3782  runs[nruns].load_tables_in_advance = 0;
3783  runs[nruns].hang_out_communication = 0;
3784  runs[nruns].redistribute_loaded_tables = 0;
3785  runs[nruns].use_small_block_size = 0;
3786  runs[nruns].pushoff_block_size_in_kilo = 64;
3787 
3788  xxx_optind++;
3789  while ((c = xxx_getopt(argc, argv, "p:b:agrs")) != -1) {
3790  switch (c) {
3791  case 'p':
3792  {
3793  int okset[] = {0, 1, 2};
3794  int nokset = (int)(sizeof(okset) / sizeof(okset[0]));
3795  int pushoffv = atoi_safe(xxx_optarg, okset, nokset,
3796  "Bad pushoff");
3797  if (pushoffv == -1) {
3798  if (rank == 0) {
3799  fprintf(stderr, helpstring, argv[0]);
3800  fflush(0);
3801  }
3802  MPI_Finalize();
3803  return 0;
3804  }
3805  runs[nruns].pushoff = pushoffv;
3806  break;
3807  }
3808 
3809  case 'b':
3810  {
3811  int bs = atoi(xxx_optarg);
3812  runs[nruns].pushoff_block_size_in_kilo = (size_t)bs;
3813  break;
3814  }
3815 
3816  case 'a':
3817  runs[nruns].load_tables_in_advance = 1;
3818  break;
3819  case 'g':
3820  runs[nruns].hang_out_communication = 1;
3821  break;
3822  case 'r':
3823  runs[nruns].redistribute_loaded_tables = 1;
3824  break;
3825  case 's':
3826  runs[nruns].use_small_block_size = 1;
3827  break;
3828 
3829  case '?':
3830  if (rank == 0) {
3831  fprintf(stderr, "Unknown option (%c)\n", xxx_optopt);
3832  fprintf(stderr, helpstring, argv[0]);
3833  fflush(0);
3834  }
3835  MPI_Finalize();
3836  return 0;
3837  }
3838  }
3839 
3840  nruns++;
3841  if (nruns >= (int)(sizeof(runs)/sizeof(runs[0]))) {
3842  if (rank == 0) {
3843  fprintf(stderr, "run list too long\n");
3844  fprintf(stderr, helpstring, argv[0]);
3845  fflush(0);
3846  }
3847  MPI_Finalize();
3848  return 0;
3849  }
3850  }
3851 
3852  if (rank == 0) {
3853  char ss[88];
3854  int sc = 0;
3855  sc += snprintf((ss + sc), (sizeof(ss) - (size_t)sc),
3856  "Running (%d runs) with:", nruns);
3857  for (int i = 0; i < nruns; i++) {
3858  sc += snprintf((ss + sc), (sizeof(ss) - (size_t)sc),
3859  " %d", runs[i].query);
3860  }
3861  snprintf((ss + sc), (sizeof(ss) - (size_t)sc), "\n");
3862  printf("%s", ss);
3863  fflush(0);
3864  }
3865 
3866  kmr_init();
3867 
3868  /* Load table files into memory. */
3869 
3870  {
3871  enum TABLE t[] = {TAB_N, TAB_R, TAB_P, TAB_S,
3872  TAB_H, TAB_C, TAB_O, TAB_L};
3873  int nt = (int)(sizeof(t) / sizeof(t[0]));
3874  load_input_tables(nprocs, rank, directory, t, nt);
3875  }
3876 
3877  /* Initialize fast-notice once. */
3878 
3879  {
3880  kmr_init_pushoff_fast_notice_(MPI_COMM_WORLD, 1);
3881 
3882  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
3883  mr->pushoff_fast_notice = 1;
3885  kmr_free_context(mr);
3886  }
3887 
3888  for (int i = 0; i < nruns; i++) {
3889  int query = runs[i].query;
3890  _Bool pushoff = ((runs[i].pushoff == 0) ? 0 : 1);
3891  //load_tables_in_advance = runs[i].load_tables_in_advance;
3892  //redistribute_loaded_tables = runs[i].redistribute_loaded_tables;
3893 
3894  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
3895  mr->pushoff_hang_out = runs[i].hang_out_communication;
3896  mr->pushoff_fast_notice = (runs[i].pushoff == 2);
3897  mr->pushoff_stat = report_pushoff_statistics;
3898  if (runs[i].use_small_block_size) {
3899  mr->preset_block_size = (64 * 1024);
3900  } else {
3901  mr->preset_block_size = (64 * 1024 * 1024);
3902  }
3903  if (runs[i].pushoff_block_size_in_kilo != 0) {
3904  mr->pushoff_block_size = (runs[i].pushoff_block_size_in_kilo
3905  * (size_t)1024);
3906  }
3907 
3908  MPI_Barrier(MPI_COMM_WORLD);
3909  usleep(50 * 1000);
3910  if (rank == 0) {
3911  fprintf(stdout, "Run Q%d (pushoff=%d fast_notice=%d)\n",
3912  query, pushoff, mr->pushoff_fast_notice);
3913  fprintf(stderr, "Run Q%d (pushoff=%d fast_notice=%d)\n",
3914  query, pushoff, mr->pushoff_fast_notice);
3915  fflush(0);
3916  }
3917 
3918  usleep(50 * 1000);
3919  if (rank == 0) {
3920  printf("OPTION: (-p) pushoff=%d fast_notice=%d\n",
3921  pushoff, mr->pushoff_fast_notice);
3922  printf("OPTION: (-a) load_tables_in_advance=%d\n",
3923  runs[i].load_tables_in_advance);
3924  printf("OPTION: (-g) pushoff_hang_out=%d\n",
3925  mr->pushoff_hang_out);
3926  printf("OPTION: (-r) redistribute_loaded_tables=%d\n",
3927  runs[i].redistribute_loaded_tables);
3928  printf("OPTION: (-s) preset_block_size=%zd\n",
3929  mr->preset_block_size);
3930  printf("OPTION: (-b) pushoff_block_size=%zd\n",
3931  mr->pushoff_block_size);
3932  printf("OPTION: (-C) report_count_in_messages=%d\n",
3933  report_count_in_messages);
3934  printf("OPTION: (-F) report_time_to_read=%d\n",
3935  report_time_to_read);
3936  printf("OPTION: (-P) report_pushoff_statistics=%d\n",
3937  report_pushoff_statistics);
3938  fflush(0);
3939  }
3940 
3941  double t0 = 0.0;
3942  double t1 = 0.0;
3943  KMR_KVS *r = 0;
3944  enum TABLE tbl = TAB_NIL;
3945  switch (query) {
3946  case 7:
3947  if (runs[i].load_tables_in_advance) {
3948  enum TABLE t7[] = {TAB_N, TAB_S, TAB_C, TAB_O, TAB_L};
3949  /*TAB_R,TAB_P,TAB_H;*/
3950  int nt7 = (int)(sizeof(t7) / sizeof(t7[0]));
3951  scan_table_files_in_advance(mr, t7, nt7, &runs[i]);
3952  }
3953  MPI_Barrier(MPI_COMM_WORLD);
3954  t0 = wtime();
3955  r = q7(mr, &runs[i]);
3956  tbl = TAB_Q7_REVENUE;
3957  MPI_Barrier(MPI_COMM_WORLD);
3958  t1 = wtime();
3959  break;
3960 
3961  case 9:
3962  if (runs[i].load_tables_in_advance) {
3963  enum TABLE t9[] = {TAB_N, TAB_P, TAB_S, TAB_H, TAB_O, TAB_L};
3964  /*TAB_R,TAB_C;*/
3965  int nt9 = (int)(sizeof(t9) / sizeof(t9[0]));
3966  scan_table_files_in_advance(mr, t9, nt9, &runs[i]);
3967  }
3968  MPI_Barrier(MPI_COMM_WORLD);
3969  t0 = wtime();
3970  r = q9(mr, &runs[i]);
3971  tbl = TAB_Q9_AMOUNT;
3972  MPI_Barrier(MPI_COMM_WORLD);
3973  t1 = wtime();
3974  break;
3975 
3976  case 10:
3977  if (runs[i].load_tables_in_advance) {
3978  enum TABLE t10[] = {TAB_N, TAB_C, TAB_O, TAB_L};
3979  /*TAB_R,TAB_P,TAB_S,TAB_H;*/
3980  int nt10 = (int)(sizeof(t10) / sizeof(t10[0]));
3981  scan_table_files_in_advance(mr, t10, nt10, &runs[i]);
3982  }
3983  MPI_Barrier(MPI_COMM_WORLD);
3984  t0 = wtime();
3985  r = q10(mr, &runs[i]);
3986  tbl = TAB_Q10_CLNO1;
3987  MPI_Barrier(MPI_COMM_WORLD);
3988  t1 = wtime();
3989  break;
3990 
3991  case 13:
3992  if (runs[i].load_tables_in_advance) {
3993  enum TABLE t13[] = {TAB_C, TAB_O, TAB_L};
3994  /*TAB_N,TAB_R,TAB_P,TAB_S,TAB_H,TAB_L;*/
3995  int nt13 = (int)(sizeof(t13) / sizeof(t13[0]));
3996  scan_table_files_in_advance(mr, t13, nt13, &runs[i]);
3997  }
3998  MPI_Barrier(MPI_COMM_WORLD);
3999  t0 = wtime();
4000  r = q13(mr, &runs[i]);
4001  tbl = TAB_Q13_CO1;
4002  MPI_Barrier(MPI_COMM_WORLD);
4003  t1 = wtime();
4004  break;
4005 
4006  case 21:
4007  if (runs[i].load_tables_in_advance) {
4008  enum TABLE t21[] = {TAB_N, TAB_S, TAB_O, TAB_L};
4009  /*TAB_R,TAB_P,TAB_H,TAB_C;*/
4010  int nt21 = (int)(sizeof(t21) / sizeof(t21[0]));
4011  scan_table_files_in_advance(mr, t21, nt21, &runs[i]);
4012  }
4013  MPI_Barrier(MPI_COMM_WORLD);
4014  t0 = wtime();
4015  r = q21(mr, &runs[i]);
4016  tbl = TAB_Q21_NUMWAIT;
4017  MPI_Barrier(MPI_COMM_WORLD);
4018  t1 = wtime();
4019  break;
4020  }
4021 
4022  if (rank == 0) {printf("Run Q%d in %f sec\n", query, (t1 - t0));}
4023 
4024  if (r != 0) {
4025  long rcnt;
4026  kmr_get_element_count(r, &rcnt);
4027  if (mr->rank == 0) {
4028  printf("result count=%ld\n", rcnt);
4029  fflush(0);
4030  }
4031  dump_table(r, tbl);
4032 
4033  kmr_free_kvs(r);
4034  }
4035 
4036  if (pushoff && mr->pushoff_stat) {
4037  char *s = "STATISTICS on push-off kvs:\n";
4038  kmr_print_statistics_on_pushoff(mr, s);
4039  }
4040 
4041  kmr_free_context(mr);
4042  }
4043 
4044  kmr_fin_pushoff_fast_notice_();
4045  kmr_fin();
4046  MPI_Finalize();
4047 
4048  return 0;
4049 }
Key-Value Stream (abstract).
Definition: kmr.h:632
Definition: tpch.c:581
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Definition: kmr.h:88
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:658
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:809
Definition: tpch.c:80
KMR_KVS * kmr_create_pushoff_kvs(KMR *mr, enum kmr_kv_field kf, enum kmr_kv_field vf, struct kmr_option opt, const char *, const int, const char *)
Makes a new key-value stream with the specified field data-types.
Definition: kmraltkvs.c:85
void kmr_reset_ntuple(struct kmr_ntuple *u, int n, int marker)
Resets an n-tuple U with N entries and a MARKER.
Definition: kmrmoreops.c:1234
int kmr_put_ntuple(KMR *mr, struct kmr_ntuple *u, const int sz, const void *v, const int vlen)
Adds an entry V with LEN in an n-tuple U whose size is limited to SIZE.
Definition: kmrmoreops.c:1252
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
struct kmr_ntuple_entry kmr_nth_ntuple(struct kmr_ntuple *u, int nth)
Returns an NTH entry of an n-tuple.
Definition: kmrmoreops.c:1197
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2094
int kmr_distribute(KMR_KVS *kvi, KMR_KVS *kvo, _Bool cyclic, struct kmr_option opt)
Distributes key-values so that each rank has approximately the same number of pairs.
Definition: kmrmoreops.c:835
Definition: tpch.c:248
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
Definition: kmrbase.c:939
KMR Context.
Definition: kmr.h:247
int kmr_concatenate_kvs(KMR_KVS *kvs[], int nkvs, KMR_KVS *kvo, struct kmr_option opt)
Concatenates a number of KVSes to one.
Definition: kmrbase.c:2754
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:679
Definition: tpch.c:243
void kmr_check_pushoff_fast_notice_(KMR *mr)
Check if fast-notice works.
Definition: kmraltkvs.c:808
kmr_kv_field
Datatypes of Keys or Values.
Definition: kmr.h:368
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:401
void kmr_init_pushoff_fast_notice_(MPI_Comm, _Bool verbose)
Initializes RDMA for fast-notice.
Definition: kmraltkvs.c:726
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
int kmr_get_element_count(KMR_KVS *kvs, long *v)
Gets the total number of key-value pairs.
Definition: kmrmoreops.c:114
#define kmr_init()
Sets up the environment.
Definition: kmr.h:794
int kmr_sort(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream globally.
Definition: kmrmoreops.c:575
int kmr_size_ntuple(struct kmr_ntuple *u)
Returns the storage size of an n-tuple.
Definition: kmrmoreops.c:1211
int kmr_separate_ntuples(KMR *mr, const struct kmr_kv_box kv[], const long n, struct kmr_ntuple **vv[2], long cnt[2], int markers[2], _Bool disallow_other_entries)
Separates the n-tuples stored in the value part of KV into the two sets by their marker values...
Definition: kmrmoreops.c:1318
int kmr_put_ntuple_long(KMR *mr, struct kmr_ntuple *u, const int sz, long v)
Adds an integer value in an n-tuple U whose size is limited to SIZE.
Definition: kmrmoreops.c:1274
int kmr_product_ntuples(KMR_KVS *kvo, struct kmr_ntuple **vv[2], long cnt[2], int newmarker, int slots[][2], int nslots, int keys[][2], int nkeys)
Makes a direct product of the two sets of n-tuples VV[0] and VV[1] with their counts in CNT[0] and CN...
Definition: kmrmoreops.c:1528
int kmr_histogram_count_by_ranks(KMR_KVS *kvs, long *frq, double *var, _Bool rankzeroonly)
Fills an integer array FRQ[i] with the count of the elements of each rank.
Definition: kmrmoreops.c:1569
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:367
KMR Interface.
int kmr_map_rank_by_rank(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps sequentially with rank by rank for debugging.
Definition: kmrbase.c:1397
Unit-Sized Storage.
Definition: kmr.h:383
Definition: tpch.c:602
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
Definition: kmrbase.c:2240
Definition: tpch.c:620
int kmr_put_ntuple_entry(KMR *mr, struct kmr_ntuple *u, const int sz, struct kmr_ntuple_entry e)
Adds an n-tuple entry E in an n-tuple U whose size is limited to SIZE.
Definition: kmrmoreops.c:1284
N-Tuple.
Definition: kmr.h:778
int(* kmr_redfn_t)(const struct kmr_kv_box kv[], const long n, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg)
Reduce-function Type.
Definition: kmr.h:747
KMR_KVS * kmr_create_kvs7(KMR *mr, enum kmr_kv_field k, enum kmr_kv_field v, struct kmr_option opt, const char *, const int, const char *)
Makes a new key-value stream with the specified field data-types.
Definition: kmrbase.c:568
N-Tuple Argument.
Definition: kmr.h:787
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type.
Definition: kmr.h:736
int kmr_choose_first_part(KMR_KVS *kvi, KMR_KVS *kvo, long n, struct kmr_option opt)
Chooses the first N entries from a key-value stream KVI.
Definition: kmrmoreops.c:1145
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:168