KMR
kmr4py.py
1 ## kmr4py.py
2 ## Copyright (C) 2012-2018 RIKEN R-CCS
3 
"""Python binding for the KMR MapReduce library. This module provides
straightforward wrappers around the C routines. See more about KMR at
"https://github.com/RIKEN-RCCS/kmr". All key-value data is stored in C
structures; Python objects are encoded to (and decoded from) byte
arrays when they are passed to (and from) C. The Python documentation
is minimal, so please refer to the C documentation. It works with
Python 3 (possibly 3.4 and later), but not with 2.x."""
11 
12 ## NOTE: Importing MPI module from mpi4py package initializes for MPI
13 ## execution. Import MPI and then import kmr4py in application codes.
14 
15 from enum import Enum
16 import warnings
17 import ctypes
18 import pickle
19 import inspect
20 import traceback
21 import sys
22 import mpi4py
23 
## Version of this binding (compared with kmr_version in libkmr.so)
## and the KMR release it belongs to.
__version__ = "20201116"
kmrversion = "1.10"

## kmrso holds the loaded libkmr.so library object (None until loaded).
kmrso = None

## kmrso_name holds the libkmr.so name, which can be set before
## calling KMR().
kmrso_name = "libkmr.so"

## _kmrso_version holds the version number read from libkmr.so.
_kmrso_version = None

## Pickle protocol used to marshal "opaque" keys and values.
_pickle_protocol = pickle.DEFAULT_PROTOCOL

## warning_function specifies the function used to issue warnings.
warning_function = warnings.warn

## ignore_exceptions_in_map_fn=True makes exceptions raised in
## mapper/reducer functions ignored.
ignore_exceptions_in_map_fn = True

## print_backtrace_in_map_fn=True makes backtraces printed on
## exceptions in mapper/reducer functions.
print_backtrace_in_map_fn = True

## force_null_terminate_in_cstring specifies to add a null terminator
## to C strings. Do not change it while some KVS'es are live.
force_null_terminate_in_cstring = True
58 
## Opaque C handles (KMR*, KMR_KVS*) are typed as char pointers.
_c_pointer = ctypes.POINTER(ctypes.c_char)
_c_kmr = _c_kvs = _c_pointer

## Untyped pointers: KVS vectors, kv-box vectors, and function pointers.
_c_kvsvec = _c_boxvec = _c_fnp = _c_void_p = ctypes.c_void_p

## Scalar types.
_c_ubyte = ctypes.c_ubyte
_c_bool = ctypes.c_bool
_c_int = ctypes.c_int
_c_uint = ctypes.c_uint
_c_long = ctypes.c_long
_c_uint8 = ctypes.c_uint8
_c_uint32 = ctypes.c_uint32
_c_uint64 = ctypes.c_uint64
_c_double = ctypes.c_double
_c_size_t = ctypes.c_size_t
_c_string = ctypes.c_char_p

## NOTE: _c_funcptr (ctypes' hidden _FuncPtr class) is not defined
## here; _load_kmrso() takes it indirectly from a loaded C function.

## _c_null_pointer_value is a null value for ctypes (a NULL char
## pointer, used to mark released C objects).
_c_null_pointer_value = _c_pointer()
86 
def _c_null_pointer(p):
    """Tells whether a ctypes pointer is null.

    A NULL ctypes pointer is falsy, so this is simply the negation of
    its truth value (None is also treated as null)."""

    return not p
91 
## _name_coding is the text encoding used to pass strings to C
## routines and back. Latin-1 maps code points U+0000..U+00FF to
## single bytes one-to-one.
_name_coding = "latin-1"

def _encode(us):
    """Converts a str into bytes for C using _name_coding."""
    encoded = us.encode(_name_coding)
    return encoded

def _decode(bs):
    """Converts bytes from C into a str using _name_coding."""
    decoded = bs.decode(_name_coding)
    return decoded
102 
class _Slot(Enum):
    """Selects which half of a key-value pair an operation refers to;
    it is passed as the key_or_value argument."""

    Key = 0     # the key field
    Value = 1   # the value field
107 
108 ## Loading C routines of libkmr.so.
109 
def _setup_mpi_constants():
    """Imports values of some MPI constants from libkmr.so. Calling
    kmr_mpi_type_size and kmr_mpi_constant_value does not need MPI to
    be initialized.

    Sets the globals _c_mpi_comm and _c_mpi_info (unsigned ctypes
    integer types of the same size as MPI_Comm and MPI_Info), and
    _mpi_comm_world, _mpi_comm_self, and _mpi_info_null. Raises
    Exception when a handle size is neither 4 nor 8 bytes."""

    global _c_mpi_comm, _c_mpi_info
    global _mpi_comm_world, _mpi_comm_self, _mpi_info_null

    ## Candidate handle types, keyed by their byte size.
    by_size = {ctypes.sizeof(t): t for t in (_c_uint32, _c_uint64)}

    def handle_type(type_name):
        nbytes = kmrso.kmr_mpi_type_size(_encode(type_name))
        if nbytes not in by_size:
            raise Exception("Bad type size unknown: %d" % nbytes)
        return by_size[nbytes]

    def constant(const_name):
        return kmrso.kmr_mpi_constant_value(_encode(const_name))

    _c_mpi_comm = handle_type("MPI_Comm")
    _c_mpi_info = handle_type("MPI_Info")
    _mpi_comm_world = constant("MPI_COMM_WORLD")
    _mpi_comm_self = constant("MPI_COMM_SELF")
    _mpi_info_null = constant("MPI_INFO_NULL")
    return
134 
def _declare_c_routines():
    """Declares the C signatures (argtypes and restype) of the
    libkmr.so routines used by this binding. It must run after
    _setup_mpi_constants(), since some signatures use the MPI handle
    types _c_mpi_comm and _c_mpi_info determined there."""

    ptr = ctypes.POINTER
    signatures = [
        ## Contexts.
        ("kmr_fin", [], _c_int),
        ("kmr_initialize_mpi", [_c_pointer, _c_pointer], _c_int),
        ("kmr_create_context",
         [_c_mpi_comm, _c_mpi_info, _c_string], _c_pointer),
        ("kmr_create_dummy_context", [], _c_pointer),
        ("kmr_free_context", [_c_kmr], None),
        ("kmr_set_option_by_strings", [_c_kmr, _c_string, _c_string], None),

        ## KVS'es.
        ("kmr_create_kvs7",
         [_c_kmr, _c_int, _c_int, _c_option, _c_string, _c_int, _c_string],
         _c_kvs),
        ## kmr_free_kvs was used by KVS.free() but never declared.
        ("kmr_free_kvs", [_c_kvs], _c_int),
        ("kmr_add_kv", [_c_kvs, _c_kvbox], None),
        ("kmr_add_kv_done", [_c_kvs], None),
        ## The counts are returned through a long* output argument
        ## (callers pass ctypes.byref(c_long)); it was undeclared.
        ("kmr_get_element_count", [_c_kvs, ptr(_c_long)], _c_int),
        ("kmr_local_element_count", [_c_kvs, ptr(_c_long)], _c_int),

        ## Mappers.
        ("kmr_map9",
         [_c_bool, _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp,
          _c_string, _c_int, _c_string], None),
        ("kmr_map_once",
         [_c_kvs, _c_void_p, _c_option, _c_bool, _c_fnp], None),
        ("kmr_map_rank_by_rank",
         [_c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp], None),
        ("kmr_map_for_some",
         [_c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp], None),
        ("kmr_map_ms",
         [_c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp], _c_int),
        ("kmr_map_ms_commands",
         [_c_kvs, _c_kvs, _c_void_p, _c_option, _c_spawn_option, _c_fnp],
         _c_int),
        ("kmr_map_via_spawn",
         [_c_kvs, _c_kvs, _c_void_p, _c_mpi_info, _c_spawn_option, _c_fnp],
         None),
        ("kmr_map_processes",
         [_c_bool, _c_kvs, _c_kvs, _c_void_p, _c_mpi_info,
          _c_spawn_option, _c_fnp], None),

        ## Reducers. (kmr_reduce_as_one used to set ".type" instead of
        ## ".restype", which left its restype undeclared.)
        ("kmr_reduce9",
         [_c_bool, _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp,
          _c_string, _c_int, _c_string], None),
        ("kmr_reduce_as_one",
         [_c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp], None),

        ## Data movement and ordering.
        ("kmr_shuffle", [_c_kvs, _c_kvs, _c_option], None),
        ("kmr_replicate", [_c_kvs, _c_kvs, _c_option], None),
        ("kmr_distribute", [_c_kvs, _c_kvs, _c_bool, _c_option], None),
        ("kmr_concatenate_kvs", [_c_kvsvec, _c_int, _c_kvs, _c_option], None),
        ("kmr_reverse", [_c_kvs, _c_kvs, _c_option], None),
        ("kmr_sort", [_c_kvs, _c_kvs, _c_option], None),
        ("kmr_sort_locally", [_c_kvs, _c_kvs, _c_bool, _c_option], None),

        ## Spawning.
        ("kmr_reply_to_spawner", [_c_kmr], None),
        ("kmr_send_kvs_to_spawner", [_c_kmr, _c_kvs], None),
        ("kmr_get_spawner_communicator",
         [_c_void_p, _c_long], ptr(_c_mpi_comm)),

        ## Files.
        ("kmr_read_files_reassemble",
         [_c_kmr, _c_string, _c_int, _c_uint64, _c_uint64,
          ptr(_c_void_p), ptr(_c_uint64)], None),
        ("kmr_read_file_by_segments",
         [_c_kmr, _c_string, _c_int, ptr(_c_void_p), ptr(_c_uint64)], None),
        ("kmr_save_kvs",
         [_c_kvs, ptr(_c_void_p), ptr(_c_size_t), _c_option], None),
        ("kmr_restore_kvs", [_c_kvs, _c_void_p, _c_size_t, _c_option], None),

        ## Inspection and utilities.
        ("kmr_dump_kvs", [_c_kvs, _c_int], None),
        ("kmr_get_key_type_ff", [_c_kvs], _c_int),
        ("kmr_get_value_type_ff", [_c_kvs], _c_int),
        ("kmr_get_nprocs", [_c_kmr], _c_int),
        ("kmr_get_rank", [_c_kmr], _c_int),
        ("kmr_mfree", [_c_void_p, _c_size_t], None),
        ("kmr_stringify_options", [_c_option], _c_string),
        ("kmr_stringify_file_options", [_c_file_option], _c_string),
        ("kmr_stringify_spawn_options", [_c_spawn_option], _c_string),

        ## SWF (static-spawning workflow) routines.
        ("kmr_map_swf",
         [_c_kvs, _c_kvs, _c_void_p, _c_spawn_option, _c_fnp], None),
        ("kmr_init_swf", [_c_kmr, ptr(_c_mpi_comm), _c_int], None),
        ("kmr_detach_swf_workers", [_c_kmr], None),
        ("kmr_stop_swf_workers", [_c_kmr], None),
        ("kmr_finish_swf", [_c_kmr], None),
        ("kmr_split_swf_lanes",
         [_c_kmr, ptr(_c_mpi_comm), _c_int, ptr(_c_string), _c_bool], None),
        ("kmr_dump_swf_lanes", [_c_kmr], None),
        ## KMR._set_swf_verbosity() passes a level, which was undeclared.
        ("kmr_set_swf_verbosity", [_c_kmr, _c_int], None),
    ]

    for (name, argtypes, restype) in signatures:
        cfn = getattr(kmrso, name)
        cfn.argtypes = argtypes
        cfn.restype = restype
    return None

def _load_kmrso(soname = "libkmr.so"):
    """Loads libkmr.so (soname), and initializes some constants depending
    on the library. It also initializes KMR by kmr_init_2(0), and issues
    a RuntimeWarning through warning_function when the library version
    differs from __version__."""

    global kmrso, _kmrso_version
    global _c_funcptr
    global _kv_bad, _kv_opaque, _kv_cstring, _kv_integer, _kv_float8
    global _field_name_type_map, _field_type_name_map
    global _MKMAPFN, _MKREDFN

    ## Load "libkmr.so".

    kmrso = ctypes.CDLL(soname)

    _kmrso_version = ctypes.c_int.in_dll(kmrso, "kmr_version").value
    if (__version__ != str(_kmrso_version)):
        ## Honor the configurable warning_function like the rest of
        ## the module does.
        warning_function(("Version unmatch with libkmr.so;"
                          + " found=" + str(_kmrso_version)
                          + " required=" + __version__),
                         RuntimeWarning)

    ## The MPI constants come first: some signatures need their types.

    kmrso.kmr_mpi_type_size.argtypes = [_c_string]
    kmrso.kmr_mpi_type_size.restype = _c_size_t

    kmrso.kmr_mpi_constant_value.argtypes = [_c_string]
    kmrso.kmr_mpi_constant_value.restype = _c_uint64

    _setup_mpi_constants()

    ## _c_funcptr is ctypes' hidden _FuncPtr class, taken from a loaded
    ## function; it lets C function pointers be passed as mappers.

    _c_funcptr = type(kmrso.kmr_init_2)

    kmrso.kmr_init_2.argtypes = [ctypes.c_int]
    kmrso.kmr_init_2.restype = ctypes.c_int

    ## Initializes KMR at this point.

    kmrso.kmr_init_2(0)

    ## Library dependent constants: the field-type codes.

    def field_code(cname):
        return ctypes.c_int.in_dll(kmrso, cname).value

    _kv_bad = field_code("kmr_kv_field_bad")
    _kv_opaque = field_code("kmr_kv_field_opaque")
    _kv_cstring = field_code("kmr_kv_field_cstring")
    _kv_integer = field_code("kmr_kv_field_integer")
    _kv_float8 = field_code("kmr_kv_field_float8")

    _field_name_type_map = {
        "opaque" : _kv_opaque, "cstring" : _kv_cstring,
        "integer" : _kv_integer, "float8" : _kv_float8}

    _field_type_name_map = {
        code: name for (name, code) in _field_name_type_map.items()}

    ## C-callable function factories for mappers and reducers.

    _MKMAPFN = ctypes.CFUNCTYPE(_c_int, _c_kvbox, _c_kvs, _c_kvs,
                                _c_void_p, _c_long)
    _MKREDFN = ctypes.CFUNCTYPE(_c_int, _c_boxvec, _c_long,
                                _c_kvs, _c_kvs, _c_void_p)

    _declare_c_routines()
    return None
378 
def _string_of_options(o):
    """Returns a print string of options for _c_option, _c_file_option,
    and _c_spawn_option, like "ClassName(flag=1,...)". It lists only
    the fields equal to 1, and never the padding fields gap16/gap32."""

    cls = type(o)
    enabled = [name + "=1"
               for (name, _, _) in cls._fields_
               if name not in ("gap16", "gap32") and getattr(o, name) == 1]
    return "%s(%s)" % (cls.__name__, ",".join(enabled))
390 
class _c_option(ctypes.Structure):
    """kmr_option: one-bit operation flags, passed to C by value. The
    gap16/gap32 fields are padding and are never set here."""

    _fields_ = [
        ("nothreading", _c_uint8, 1),
        ("inspect", _c_uint8, 1),
        ("keep_open", _c_uint8, 1),
        ("key_as_rank", _c_uint8, 1),
        ("rank_zero", _c_uint8, 1),
        ("collapse", _c_uint8, 1),
        ("take_ckpt", _c_uint8, 1),
        ("gap16", _c_uint, 16),
        ("gap32", _c_uint, 32)]

    def __init__(self, opts=None, enabledlist=None):
        """Sets the flags from the dictionary opts. "key", "value", and
        "output" are Python binding only and are skipped. A non-empty
        enabledlist restricts the accepted names; unknown names always
        raise Exception("Bad option: ...")."""

        super(_c_option, self).__init__()
        settable = ("nothreading", "inspect", "keep_open", "key_as_rank",
                    "rank_zero", "collapse", "take_ckpt")
        if enabledlist is None:
            enabledlist = []
        given = {} if opts is None else opts
        for (name, value) in given.items():
            if name in ("key", "value", "output"):
                continue
            restricted = (enabledlist != [] and name not in enabledlist)
            if restricted or name not in settable:
                raise Exception("Bad option: %s" % name)
            setattr(self, name, value)
        return

    def __str__(self):
        return _string_of_options(self)
436 
class _c_file_option(ctypes.Structure):
    """kmr_file_option: one-bit file-reading flags, passed to C by
    value. The gap16/gap32 fields are padding and are never set here."""

    _fields_ = [
        ("each_rank", _c_uint8, 1),
        ("subdirectories", _c_uint8, 1),
        ("list_file", _c_uint8, 1),
        ("shuffle_names", _c_uint8, 1),
        ("gap16", _c_uint, 16),
        ("gap32", _c_uint, 32)]

    def __init__(self, opts=None, enabledlist=None):
        """Sets the flags from the dictionary opts. "key" and "output"
        are Python binding only and are skipped. A non-empty enabledlist
        restricts the accepted names; unknown names always raise
        Exception("Bad option: ...")."""

        super(_c_file_option, self).__init__()
        settable = ("each_rank", "subdirectories", "list_file",
                    "shuffle_names")
        if enabledlist is None:
            enabledlist = []
        given = {} if opts is None else opts
        for (name, value) in given.items():
            if name in ("key", "output"):
                continue
            restricted = (enabledlist != [] and name not in enabledlist)
            if restricted or name not in settable:
                raise Exception("Bad option: %s" % name)
            setattr(self, name, value)
        return

    def __str__(self):
        return _string_of_options(self)
473 
class _c_spawn_option(ctypes.Structure):
    """kmr_spawn_option: one-bit spawning flags, passed to C by value.
    The gap16/gap32 fields are padding and are never set here."""

    _fields_ = [
        ("separator_space", _c_uint8, 1),
        ("reply_each", _c_uint8, 1),
        ("reply_root", _c_uint8, 1),
        ("no_set_infos", _c_uint8, 1),
        ("take_ckpt", _c_uint8, 1),
        ("gap16", _c_uint, 16),
        ("gap32", _c_uint, 32)]

    def __init__(self, opts=None, enabledlist=None):
        """Sets the flags from the dictionary opts. "key" and "output"
        are Python binding only and are skipped. A non-empty enabledlist
        restricts the accepted names; unknown names always raise
        Exception("Bad option: ...")."""

        super(_c_spawn_option, self).__init__()
        if opts is None: opts = {}
        if enabledlist is None: enabledlist = []
        ## Sets the options as dictionary passed.
        for o, v in opts.items():
            if (o == "key" or o == "output"):
                ## "key" and "output" are Python binding only.
                pass
            elif (enabledlist != [] and (o not in enabledlist)):
                raise Exception("Bad option: %s" % o)
            elif (o == "separator_space"):
                ## This branch had lost its assignment, so
                ## separator_space was never set.
                self.separator_space = v
            elif (o == "reply_each"):
                self.reply_each = v
            elif (o == "reply_root"):
                self.reply_root = v
            elif (o == "no_set_infos"):
                self.no_set_infos = v
            elif (o == "take_ckpt"):
                self.take_ckpt = v
            else:
                raise Exception("Bad option: %s" % o)
        return

    def __str__(self):
        return _string_of_options(self)
513 
## Field names of the spawn/file option structures (the padding
## fields gap16/gap32 included).
_spawn_option_list = [field[0] for field in _c_spawn_option._fields_]
_file_option_list = [field[0] for field in _c_file_option._fields_]
516 
class _c_unitsized(ctypes.Union):
    """kmr_unit_sized {const char *p; long i; double d;}: holds either a
    pointer to the content bytes, an integer, or a double, depending
    on the field type of the KVS."""

    _fields_ = [("p", _c_string), ("i", _c_long), ("d", _c_double)]
524 
class _c_kvbox(ctypes.Structure):
    """kmr_kv_box {int klen, vlen; kmr_unit_sized k, v;}: a key-value
    pair as exchanged with C, with the lengths of key and value."""

    _fields_ = [
        ("klen", _c_int), ("vlen", _c_int),
        ("k", _c_unitsized), ("v", _c_unitsized)]

    ## NOTE: Defining __init__ with some arguments makes c-callback
    ## fail to call initializers.

    def __init__(self):
        super(_c_kvbox, self).__init__()

    def set(self, klen, key, vlen, val):
        """Fills all four fields and returns self, so that a box can be
        made in one expression: _c_kvbox().set(...)."""

        (self.klen, self.vlen, self.k, self.v) = (klen, vlen, key, val)
        return self
546 
def _wrap_mapfn(pyfn):
    """Returns a closure which calls a given Python map-function on
    the unmarshalled contents in KVS, wrapped as a C-callable mapper.

    pyfn is called as pyfn((key, val), kvi, kvo, index), where kvi and
    kvo are dummy KVS'es wrapping the C input and output. It returns 0
    (a null function pointer) when pyfn is None, and returns pyfn as is
    when it is already a C function pointer.

    The callback returns 0 on success. Exceptions cannot propagate
    through C, so an exception raised while unmarshalling or in pyfn is
    reported with warning_function (plus a backtrace when
    print_backtrace_in_map_fn is set), and the callback returns 0 when
    ignore_exceptions_in_map_fn is set and -1 otherwise."""

    if (pyfn is None):
        return 0
    elif (isinstance(pyfn, _c_funcptr)):
        return pyfn

    def applyfn(cbox, ckvi, ckvo, carg, cindex):
        try:
            kvi = KVS(ckvi)
            kvo = KVS(ckvo)
            ## Decoding is inside the try: a decoding failure (e.g., a
            ## bad pickle) would otherwise escape into C unhandled.
            key = kvi._decode_content(cbox.klen, cbox.k, _Slot.Key)
            val = kvi._decode_content(cbox.vlen, cbox.v, _Slot.Value)
            pyfn((key, val), kvi, kvo, cindex)
        except BaseException:
            ## Deliberately catch everything: nothing may leave a ctypes
            ## callback, and C needs a definite return code.
            warning_function(("Exception in Python callbacks: %s"
                              % str(sys.exc_info()[1])),
                             RuntimeWarning)
            if (print_backtrace_in_map_fn):
                traceback.print_exc()
            return (0 if ignore_exceptions_in_map_fn else -1)
        return 0

    return _MKMAPFN(applyfn)
570 
def _wrap_redfn(pyfn):
    """Returns a closure which calls a given Python reduce-function on
    the unmarshalled contents in KVS, wrapped as a C-callable reducer.

    pyfn is called as pyfn(kvvec, kvi, kvo), where kvvec is a list of
    (key, val) pairs decoded from the n boxes at cboxvec, and kvi and
    kvo are dummy KVS'es wrapping the C input and output. It returns 0
    (a null function pointer) when pyfn is None, and returns pyfn as is
    when it is already a C function pointer.

    The callback returns 0 on success. Exceptions cannot propagate
    through C, so an exception raised while unmarshalling or in pyfn is
    reported with warning_function (plus a backtrace when
    print_backtrace_in_map_fn is set), and the callback returns 0 when
    ignore_exceptions_in_map_fn is set and -1 otherwise."""

    if (pyfn is None):
        return 0
    elif (isinstance(pyfn, _c_funcptr)):
        return pyfn

    def applyfn(cboxvec, n, ckvi, ckvo, carg):
        try:
            kvi = KVS(ckvi)
            kvo = KVS(ckvo)
            ## cboxvec is the address of an array of n kv-boxes.
            stride = ctypes.sizeof(_c_kvbox)
            kvvec = []
            for i in range(0, n):
                cbox = _c_kvbox.from_address(cboxvec + stride * i)
                key = kvi._decode_content(cbox.klen, cbox.k, _Slot.Key)
                val = kvi._decode_content(cbox.vlen, cbox.v, _Slot.Value)
                kvvec.append((key, val))
            pyfn(kvvec, kvi, kvo)
        except BaseException:
            ## Deliberately catch everything: nothing may leave a ctypes
            ## callback, and C needs a definite return code.
            warning_function(("Exception in Python callbacks: %s"
                              % str(sys.exc_info()[1])),
                             RuntimeWarning)
            if (print_backtrace_in_map_fn):
                traceback.print_exc()
            return (0 if ignore_exceptions_in_map_fn else -1)
        return 0

    return _MKREDFN(applyfn)
599 
def _get_options(opts, with_keyty_valty):
    """Extracts the Python-binding-only options and returns a triple:
    the key field type and the value field type (both "opaque" by
    default), and whether to make an output KVS (True by default).
    When with_keyty_valty is false, giving key= or value= raises
    Exception."""

    if not with_keyty_valty and ("key" in opts or "value" in opts):
        raise Exception("Bad option: key= or value= not allowed")
    return (opts.get("key", "opaque"),
            opts.get("value", "opaque"),
            opts.get("output", True))
610 
def _make_frame_info(frame):
    """Returns (file name, line number, function name) of a Python
    frame, with the two names encoded to bytes for passing to C."""

    code = frame.f_code
    filename = _encode(code.co_filename)
    funcname = _encode(code.co_name)
    return (filename, frame.f_lineno, funcname)
615 
def _filter_spawn_options(opts):
    """Returns a pair of dictionaries: the 1st holds the options named
    in _spawn_option_list (options to spawn), and the 2nd holds all
    the other options."""

    sopts = {o: v for (o, v) in opts.items() if o in _spawn_option_list}
    mopts = {o: v for (o, v) in opts.items() if o not in _spawn_option_list}
    return (sopts, mopts)
628 
class KMR():
    """KMR context."""

    ## attributes: self._ckmr, self.nprocs, self.rank, self.emptykvs,
    ## self._dismissed.

    def __init__(self, comm, info=None):
        """Makes a KMR context with a given MPI communicator (comm),
        which is used in succeeding operations. Info specifies its
        options by MPI_Info. Arguments of comm/info are passed as a
        long integer (assuming either an integer (int) or a pointer in
        C). It also accepts a communicator instance of mpi4py.MPI.Comm,
        or a string "dummy" or "world", as a comm argument. Any other
        comm is ignored (MPI_COMM_WORLD is used), and a non-integer info
        is ignored (MPI_INFO_NULL is used); rank 0 warns when a given
        argument is ignored. Raises Exception when the C context cannot
        be created."""

        if (kmrso is None):
            _load_kmrso(kmrso_name)

        (cinfo, warninfo) = KMR._c_info_of(info)
        (ccomm, warncomm) = KMR._c_comm_of(comm)

        ## self._ckmr holds the C part of a KMR context.
        self._ckmr = kmrso.kmr_create_context(ccomm, cinfo, _encode(""))
        if (_c_null_pointer(self._ckmr)):
            raise Exception("kmr_create_context: failed")

        ## self._dismissed=True disables freeing KVS'es (by memory
        ## management) which remain unconsumed after dismissing a KMR
        ## context, because freeing them would reference dangling
        ## pointers in C.
        self._dismissed = False

        ## self.emptykvs holds an empty KVS needed by map_once,
        ## map_on_rank_zero, read_files_reassemble, and
        ## read_file_by_segments.
        self.emptykvs = KVS(self).free()

        ## self.nprocs and self.rank hold the nprocs and the rank of MPI.
        self.nprocs = kmrso.kmr_get_nprocs(self._ckmr)
        self.rank = kmrso.kmr_get_rank(self._ckmr)

        if (warncomm and (self.rank == 0)):
            warning_function("MPI comm ignored in KMR() constructor.", RuntimeWarning)
        if (warninfo and (self.rank == 0)):
            warning_function("MPI info ignored in KMR() constructor.", RuntimeWarning)
        return

    @staticmethod
    def _c_info_of(info):
        """Converts the info argument of the constructor into a pair
        (MPI_Info handle value for C, ignored). An int is used as is;
        anything else becomes MPI_INFO_NULL, with ignored=True unless
        it is None."""

        if (isinstance(info, int)):
            return (info, False)
        return (_mpi_info_null, (info is not None))

    @staticmethod
    def _c_comm_of(comm):
        """Converts the comm argument of the constructor into a pair
        (MPI_Comm handle value for C, ignored); ignored=True means comm
        was not recognized and MPI_COMM_WORLD is used instead."""

        ## mpi4py.MPI is present only when the application imported it;
        ## without it there cannot be any mpi4py communicator.
        MPI = getattr(mpi4py, "MPI", None)
        if (isinstance(comm, int)):
            return (comm, False)
        elif ((MPI is not None) and isinstance(comm, MPI.Comm)):
            ## Read the raw handle stored in the mpi4py object, using the
            ## MPI_Comm size that libkmr.so reported (_c_mpi_comm),
            ## instead of guessing between uint64 and void*.
            handle = _c_mpi_comm.from_address(MPI._addressof(comm))
            return (handle.value, False)
        elif (comm == "dummy"):
            return (_mpi_comm_self, False)
        elif (comm == "world"):
            return (_mpi_comm_world, False)
        else:
            return (_mpi_comm_world, True)

    def __del__(self):
        self.dismiss()
        return

    def free(self):
        """Dismisses KMR (an alias of dismiss())."""

        self.dismiss()

    def dismiss(self):
        """Dismisses KMR: frees the C context if it is still live, and
        resets the attributes. Calling it more than once is harmless."""

        ## _ckmr may be missing when the constructor failed early (e.g.,
        ## libkmr.so could not be loaded); __del__ still calls this.
        ckmr = getattr(self, "_ckmr", _c_null_pointer_value)
        if (not _c_null_pointer(ckmr)):
            kmrso.kmr_free_context(ckmr)
        self._ckmr = _c_null_pointer_value
        self._dismissed = True
        self.emptykvs = None
        self.nprocs = -1
        self.rank = -1
        return

    def create_kvs(self, **opts):
        """Makes a new KVS (an alias of make_kvs())."""

        ## The result was previously dropped (the alias returned None).
        return self.make_kvs(**opts)

    def make_kvs(self, **opts):
        """Makes a new KVS. The keywords key= and value= give the field
        type names (both "opaque" by default)."""

        (keyty, valty, _) = _get_options(opts, True)
        return KVS(self, keyty, valty)

    def reply_to_spawner(self):
        """Sends a reply message from a spawned process."""

        kmrso.kmr_reply_to_spawner(self._ckmr)
        return

    def get_spawner_communicator(self, index):
        """Obtains a parent communicator of a spawned process. The C
        version returns a reference, but this returns the handle value."""

        commref = kmrso.kmr_get_spawner_communicator(self._ckmr, index)
        return commref.contents.value

    def send_kvs_to_spawner(self, kvs):
        """Sends the KVS from a spawned process to the spawner."""

        kmrso.kmr_send_kvs_to_spawner(self._ckmr, kvs._ckvs)
        return

    def _init_swf(self, splitcomms, masterank):
        """Calls kmr_init_swf with the lane communicators and the
        master rank."""

        kmrso.kmr_init_swf(self._ckmr, splitcomms, masterank)
        return

    def _detach_swf_workers(self):
        """Calls kmr_detach_swf_workers."""

        kmrso.kmr_detach_swf_workers(self._ckmr)
        return

    def _stop_swf_workers(self):
        """Calls kmr_stop_swf_workers."""

        kmrso.kmr_stop_swf_workers(self._ckmr)
        return

    def _finish_swf(self):
        """Calls kmr_finish_swf."""

        kmrso.kmr_finish_swf(self._ckmr)
        return

    def _split_swf_lanes(self, masterrank, description, dump):
        """Calls kmr_split_swf_lanes. The description (a sequence of
        byte strings) is passed as a NULL-terminated char* array. Returns
        the 4-element MPI_Comm array handed to C."""

        comms = (_c_mpi_comm * 4)()
        desc = (ctypes.c_char_p * (len(description) + 1))()
        desc[:-1] = description
        desc[len(description)] = None
        kmrso.kmr_split_swf_lanes(self._ckmr, comms, masterrank, desc, dump)
        return comms

    def _dump_swf_lanes(self):
        """Calls kmr_dump_swf_lanes."""

        kmrso.kmr_dump_swf_lanes(self._ckmr)
        return

    def _set_swf_verbosity(self, level):
        """Calls kmr_set_swf_verbosity with the given level."""

        kmrso.kmr_set_swf_verbosity(self._ckmr, level)
        return

    def set_option(self, k, v):
        """Sets KMR option, taking both arguments by strings."""

        kmrso.kmr_set_option_by_strings(self._ckmr, _encode(k), _encode(v))
        return
802 
## kmr_option flags accepted by each family of operations; each list is
## passed as the enabledlist of _c_option, which rejects other flags.

_enabled_options_of_map = ["nothreading", "inspect", "keep_open", "take_ckpt"]
_enabled_options_of_map_once = ["nothreading", "keep_open", "take_ckpt"]
_enabled_options_of_map_ms = ["nothreading", "keep_open"]
_enabled_options_of_reduce = ["nothreading", "inspect", "take_ckpt"]
_enabled_options_of_reduce_as_one = ["inspect", "take_ckpt"]
_enabled_options_of_shuffle = ["inspect", "key_as_rank", "take_ckpt"]
_enabled_options_of_replicate = ["inspect", "rank_zero", "take_ckpt"]
_enabled_options_of_distribute = ["nothreading", "inspect", "keep_open"]
_enabled_options_of_sort_locally = ["nothreading", "inspect", "key_as_rank"]
_enabled_options_of_sort = ["inspect"]
832 
833 class KVS():
834  """KVS. Note that there are dummy KVS'es which are temporarily
835  created to hold the C structure of the KVS passed to
836  mapper/reducer functions. A dummy KVS has None in its "mr"
837  attribute."""
838 
839  # attributes: self._ckvs, self.mr, self._frameinfo.
840 
841  def __init__(self, kmr_or_ckvs, keyty="opaque", valty="opaque"):
842  """Makes a KVS for a given KMR. Do not call the KVS constructor
843  directly, but use KMR.make_kvs() instead. A KVS is created
844  with the datatypes stored in the key and the value, specified
845  by the keywords "key=" and "value=". The datatype name is a
846  string, one of "opaque", "cstring", "integer", and "float8".
847  Most mappers and reducers (precisely, the methods that accepts
848  a function argument) take keyword arguments for the types,
849  defaulting with key="opaque" and value="opaque". The
850  datatypes affects the sorting order. """
851 
852  self.mr = None
853 
854  """mr attribute holds a KMR context object. Note that mr is
855  not accessible from mapping/reducing functions."""
856 
857  self._ckvs = None
858 
859  """_ckvs attribute holds a kvs in C."""
860 
861  self._frameinfo = None
862 
863  """_frameinfo protects caller line information from garbage
864  collected."""
865 
866  if isinstance(kmr_or_ckvs, KMR):
867  kf = _field_name_type_map[keyty]
868  vf = _field_name_type_map[valty]
869  top = inspect.currentframe().f_back
870  self.mr = kmr_or_ckvs
871  (f, l, n) = _make_frame_info(top)
872  self._ckvs = kmrso.kmr_create_kvs7(
873  self.mr._ckmr, kf, vf, _c_option(), f, l, n)
874  self._frameinfo = (f, l, n)
875  elif isinstance(kmr_or_ckvs, _c_pointer):
876  ## Return a dummy KVS.
877  self.mr = None
878  self._ckvs = kmr_or_ckvs
879  else:
880  raise Exception("Bad call to kvs constructor")
881 
882  def __del__(self):
883  if ((not self._is_dummy()) and (not _c_null_pointer(self._ckvs))):
884  self.free()
885  return
886 
887  def free(self):
888  """Finishes the C part of a KVS."""
889 
890  if (self._is_dummy()):
891  raise Exception("Bad call to free_kvs on dummy KVS")
892  elif (_c_null_pointer(self._ckvs)):
893  raise Exception("Bad call to free_kvs on freed KVS")
894  elif ((not self.mr is None) and self.mr._dismissed):
895  ## Do not free when KMR object is dismissed.
896  pass
897  else:
898  kmrso.kmr_free_kvs(self._ckvs)
899  self._ckvs = _c_null_pointer_value
900  return self
901 
902  def _is_dummy(self):
903  return (self.mr is None)
904 
905  def _consume(self):
906  """Releases a now dangling C pointer."""
907 
908  self._ckvs = _c_null_pointer_value
909 
910  def _encode_content(self, o, key_or_value):
911  """Marshalls an object with regard to the field type. It
912  retuns a 3-tuple, with length, value-union, and the 3nd to
913  keep a reference to a buffer."""
914 
915  kvty = self.get_field_type(key_or_value)
916  u = _c_unitsized()
917  if (kvty == "opaque"):
918  data = pickle.dumps(o, _pickle_protocol)
919  u.p = data
920  return (len(data), u, data)
921  elif (kvty == "cstring"):
922  if (not isinstance(o, str)):
923  raise Exception("Not 8-bit string for cstring: %s" % o)
924  ## (Add null for C string).
925  os = ((o + "\0") if force_null_terminate_in_cstring else o)
926  data = _encode(os)
927  u.p = data
928  return (len(data), u, data)
929  elif (kvty == "integer"):
930  u.i = o
931  return (ctypes.sizeof(_c_long), u, None)
932  elif (kvty == "float8"):
933  u.d = o
934  return (ctypes.sizeof(_c_double), u, None)
935  else:
936  raise Exception("Bad field type: %s" % kvty)
937 
938  def _decode_content(self, siz, u, key_or_value):
939  """Unmarshalls an object with regard to the field type. It
940  returns integer 0 when the length is 0 (it is for a dummy
941  key-value used in kmr_map_once() etc)."""
942 
943  if (siz == 0):
944  return 0
945  else:
946  kvty = self.get_field_type(key_or_value)
947  if (kvty == "opaque"):
948  data = ctypes.string_at(u.i, siz)
949  o = pickle.loads(data)
950  return o
951  elif (kvty == "cstring"):
952  ## (Delete null added for C string).
953  siz1 = ((siz - 1) if force_null_terminate_in_cstring else siz)
954  data = ctypes.string_at(u.i, siz1)
955  os = _decode(data)
956  return os
957  elif (kvty == "integer"):
958  return u.i
959  elif (kvty == "float8"):
960  return u.d
961  else:
962  raise Exception("Bad field type: %s" % kvty)
963 
964  def get_field_type(self, key_or_value):
965  """Get a field type of a KVS."""
966 
967  if (_c_null_pointer(self._ckvs)):
968  raise Exception("Bad KVS (null C-object)")
969  if (key_or_value == _Slot.Key):
970  kvty = kmrso.kmr_get_key_type_ff(self._ckvs)
971  elif (key_or_value == _Slot.Value):
972  kvty = kmrso.kmr_get_value_type_ff(self._ckvs)
973  else:
974  raise Exception("Bad field %s" % key_or_value.name)
975  if (kvty == _kv_bad):
976  raise Exception("Bad field type value %d in KVS" % kvty)
977  else:
978  return _field_type_name_map[kvty]
979 
980  def add(self, key, val):
981  """Adds a key-value pair."""
982 
983  self.add_kv(key, val)
984  return
985 
986  def add_kv(self, key, val):
987  """Adds a key-value pair."""
988 
989  ## Note it keeps the created string until kmr_add_kv(),
990  ## because kvbox does not hold the references.
991  (klen, k, ks) = self._encode_content(key, _Slot.Key)
992  (vlen, v, vs) = self._encode_content(val, _Slot.Value)
993  cbox = _c_kvbox().set(klen, k, vlen, v)
994  kmrso.kmr_add_kv(self._ckvs, cbox)
995  return
996 
997  def add_kv_done(self):
998  """Finishes adding key-value pairs."""
999 
1000  kmrso.kmr_add_kv_done(self._ckvs)
1001  return
1002 
def get_element_count(self):
    """Gets the total number of key-value pairs.

    The count is written by kmr_get_element_count() into a C long and
    returned as a Python int.  Use local_element_count() for the count
    on this rank only.
    """

    c = _c_long(0)
    kmrso.kmr_get_element_count(self._ckvs, ctypes.byref(c))
    return c.value
1009 
def local_element_count(self):
    """Gets the number of key-value pairs locally.

    The count is written by kmr_local_element_count() into a C long and
    returned as a Python int.
    """

    c = _c_long(0)
    kmrso.kmr_local_element_count(self._ckvs, ctypes.byref(c))
    return c.value
1016 
def map(self, fn, **mopts):
    """Maps simply: applies the mapper `fn` to the pairs of this KVS.

    The output key/value types and whether an output KVS is made come
    from _get_options(mopts); the C options are built by _c_option()
    against _enabled_options_of_map.  The caller's frame information is
    passed to kmr_map9() (presumably for diagnostics).  Unless the
    `inspect` option is set, this KVS is consumed.
    Returns the output KVS, or None when no output KVS is made.
    """

    keyty, valty, want_output = _get_options(mopts, True)
    copts = _c_option(mopts, _enabled_options_of_map)
    cmapfn = _wrap_mapfn(fn)
    c_in = self._ckvs
    if want_output:
        kvo = KVS(self.mr, keyty, valty)
        c_out = kvo._ckvs
    else:
        kvo = None
        c_out = None
    fname, lineno, funcname = _make_frame_info(inspect.currentframe().f_back)
    kmrso.kmr_map9(0, c_in, c_out, 0, copts, cmapfn, fname, lineno, funcname)
    if copts.inspect == 0:
        self._consume()
    return kvo
1030 
def map_once(self, rank_zero_only, fn, **mopts):
    """Maps once, calling `fn` with a dummy key-value pair.

    `rank_zero_only` is passed through to kmr_map_once().  There is no
    input KVS, so nothing is inspected or consumed.  The C options are
    checked against _enabled_options_of_map_once.
    Returns the output KVS, or None when no output KVS is made.
    """

    keyty, valty, want_output = _get_options(mopts, True)
    copts = _c_option(mopts, _enabled_options_of_map_once)
    cmapfn = _wrap_mapfn(fn)
    if want_output:
        kvo = KVS(self.mr, keyty, valty)
        c_out = kvo._ckvs
    else:
        kvo, c_out = None, None
    kmrso.kmr_map_once(c_out, 0, copts, rank_zero_only, cmapfn)
    return kvo
1042 
def map_on_rank_zero(self, fn, **mopts):
    """Maps on rank0 only.

    Calls map_once() with rank_zero_only=True and forwards `mopts` as
    keyword options.  Forwarding them with `*mopts` would pass only the
    option names, as extra positional arguments.
    Returns whatever map_once() returns.
    """

    ## It needs dummy input.
    return self.map_once(True, fn, **mopts)
1048 
def map_rank_by_rank(self, fn, **mopts):
    """Maps sequentially, rank by rank, for debugging (kmr_map_rank_by_rank()).

    Options are read by _get_options() and checked against
    _enabled_options_of_map.  Unless the `inspect` option is set, this
    KVS is consumed.
    Returns the output KVS, or None when no output KVS is made.
    """

    keyty, valty, want_output = _get_options(mopts, True)
    copts = _c_option(mopts, _enabled_options_of_map)
    cmapfn = _wrap_mapfn(fn)
    c_in = self._ckvs
    kvo = None
    c_out = None
    if want_output:
        kvo = KVS(self.mr, keyty, valty)
        c_out = kvo._ckvs
    kmrso.kmr_map_rank_by_rank(c_in, c_out, 0, copts, cmapfn)
    if copts.inspect == 0:
        self._consume()
    return kvo
1061 
def map_for_some(self, fn, **mopts):
    """Maps until some key-value pairs are added (kmr_map_for_some()).

    Options are read by _get_options() and checked against
    _enabled_options_of_map.  Unless the `inspect` option is set, this
    KVS is consumed.
    Returns the output KVS, or None when no output KVS is made.
    """

    keyty, valty, want_output = _get_options(mopts, True)
    copts = _c_option(mopts, _enabled_options_of_map)
    c_in = self._ckvs
    cmapfn = _wrap_mapfn(fn)
    kvo = None
    c_out = None
    if want_output:
        kvo = KVS(self.mr, keyty, valty)
        c_out = kvo._ckvs
    kmrso.kmr_map_for_some(c_in, c_out, 0, copts, cmapfn)
    if copts.inspect == 0:
        self._consume()
    return kvo
1074 
def map_ms(self, fn, **mopts):
    """Maps in master-worker mode (kmr_map_ms()).

    Options are read by _get_options() and checked against
    _enabled_options_of_map_ms.  This KVS is always consumed.
    Returns the output KVS, or None when no output KVS is made.
    """

    keyty, valty, want_output = _get_options(mopts, True)
    copts = _c_option(mopts, _enabled_options_of_map_ms)
    cmapfn = _wrap_mapfn(fn)
    c_in = self._ckvs
    kvo = None
    c_out = None
    if want_output:
        kvo = KVS(self.mr, keyty, valty)
        c_out = kvo._ckvs
    ## Repeat the call until it returns 0 (assuming MPI_SUCCESS==0).
    while kmrso.kmr_map_ms(c_in, c_out, 0, copts, cmapfn) != 0:
        pass
    self._consume()
    return kvo
1090 
def map_ms_commands(self, fn, **xopts):
    """Maps in master-worker mode and runs serial commands.

    `xopts` is split by _filter_spawn_options() into spawn options and
    map options.  Map options are checked against
    _enabled_options_of_map_ms.  kmr_map_ms_commands() is repeated until
    it returns 0.  This KVS is always consumed.
    Returns the output KVS, or None when no output KVS is made.
    """

    spawn_opts, map_opts = _filter_spawn_options(xopts)
    keyty, valty, want_output = _get_options(map_opts, True)
    copts = _c_option(map_opts, _enabled_options_of_map_ms)
    cspawn = _c_spawn_option(spawn_opts)
    cmapfn = _wrap_mapfn(fn)
    c_in = self._ckvs
    kvo = None
    c_out = None
    if want_output:
        kvo = KVS(self.mr, keyty, valty)
        c_out = kvo._ckvs
    while kmrso.kmr_map_ms_commands(c_in, c_out, 0, copts, cspawn,
                                    cmapfn) != 0:
        pass
    self._consume()
    return kvo
1107 
def map_via_spawn(self, fn, **xopts):
    """Maps on processes started by MPI_Comm_spawn() (kmr_map_via_spawn()).

    `xopts` is split by _filter_spawn_options() into spawn options and
    map options.  The C spawn options and _mpi_info_null are passed to C.
    This KVS is always consumed.
    Returns the output KVS, or None when no output KVS is made.
    """

    spawn_opts, map_opts = _filter_spawn_options(xopts)
    keyty, valty, want_output = _get_options(map_opts, True)
    ## The converted map options are not passed to C; the call is kept
    ## (presumably it rejects options outside _enabled_options_of_map).
    _c_option(map_opts, _enabled_options_of_map)
    cspawn = _c_spawn_option(spawn_opts)
    cmapfn = _wrap_mapfn(fn)
    c_in = self._ckvs
    kvo = None
    c_out = None
    if want_output:
        kvo = KVS(self.mr, keyty, valty)
        c_out = kvo._ckvs
    kmrso.kmr_map_via_spawn(c_in, c_out, 0, _mpi_info_null, cspawn, cmapfn)
    self._consume()
    return kvo
1122 
def map_processes(self, nonmpi, fn, **sopts):
    """Maps on processes started by MPI_Comm_spawn() (kmr_map_processes()).

    `nonmpi` is passed as the first argument of kmr_map_processes().
    `sopts` is read both by _get_options() (output types and whether an
    output KVS is made) and by _c_spawn_option().  The info argument is
    _mpi_info_null.  This KVS is always consumed.
    Returns the output KVS, or None when no output KVS is made.
    """

    keyty, valty, want_output = _get_options(sopts, True)
    cspawn = _c_spawn_option(sopts)
    cmapfn = _wrap_mapfn(fn)
    c_in = self._ckvs
    kvo = None
    c_out = None
    if want_output:
        kvo = KVS(self.mr, keyty, valty)
        c_out = kvo._ckvs
    kmrso.kmr_map_processes(nonmpi, c_in, c_out, 0, _mpi_info_null,
                            cspawn, cmapfn)
    self._consume()
    return kvo
1136 
def map_parallel_processes(self, fn, **sopts):
    """Maps on processes started by MPI_Comm_spawn().

    Same as map_processes() with nonmpi=False.
    """

    nonmpi = False
    return self.map_processes(nonmpi, fn, **sopts)
1141 
def map_serial_processes(self, fn, **sopts):
    """Maps on processes started by MPI_Comm_spawn().

    Same as map_processes() with nonmpi=True.
    """

    nonmpi = True
    return self.map_processes(nonmpi, fn, **sopts)
1146 
def reduce(self, fn, **mopts):
    """Reduces key-value pairs with the reducer `fn` (kmr_reduce9()).

    Options are read by _get_options() and checked against
    _enabled_options_of_reduce.  The caller's frame information is
    passed to C (presumably for diagnostics).  Unless the `inspect`
    option is set, this KVS is consumed.
    Returns the output KVS, or None when no output KVS is made.
    """

    keyty, valty, want_output = _get_options(mopts, True)
    copts = _c_option(mopts, _enabled_options_of_reduce)
    credfn = _wrap_redfn(fn)
    c_in = self._ckvs
    kvo = None
    c_out = None
    if want_output:
        kvo = KVS(self.mr, keyty, valty)
        c_out = kvo._ckvs
    fname, lineno, funcname = _make_frame_info(inspect.currentframe().f_back)
    kmrso.kmr_reduce9(0, c_in, c_out, 0, copts, credfn,
                      fname, lineno, funcname)
    if copts.inspect == 0:
        self._consume()
    return kvo
1160 
def reduce_as_one(self, fn, **mopts):
    """Reduces once as if all pairs had the same key (kmr_reduce_as_one()).

    Options are read by _get_options() and checked against
    _enabled_options_of_reduce_as_one.  Unless the `inspect` option is
    set, this KVS is consumed.
    Returns the output KVS, or None when no output KVS is made.
    """

    keyty, valty, want_output = _get_options(mopts, True)
    copts = _c_option(mopts, _enabled_options_of_reduce_as_one)
    credfn = _wrap_redfn(fn)
    c_in = self._ckvs
    kvo = None
    c_out = None
    if want_output:
        kvo = KVS(self.mr, keyty, valty)
        c_out = kvo._ckvs
    kmrso.kmr_reduce_as_one(c_in, c_out, 0, copts, credfn)
    if copts.inspect == 0:
        self._consume()
    return kvo
1173 
def reduce_for_some(self, fn, **mopts):
    """Reduces until some key-value pairs are added.

    Calls kmr_reduce9() with 1 as its first argument (reduce() passes 0).
    Options are checked against _enabled_options_of_reduce.  Unless the
    `inspect` option is set, this KVS is consumed.
    Returns the output KVS, or None when no output KVS is made.
    """

    keyty, valty, want_output = _get_options(mopts, True)
    copts = _c_option(mopts, _enabled_options_of_reduce)
    credfn = _wrap_redfn(fn)
    c_in = self._ckvs
    kvo = None
    c_out = None
    if want_output:
        kvo = KVS(self.mr, keyty, valty)
        c_out = kvo._ckvs
    ## (NOTE: It passes the frame of reduce_for_some itself, not the
    ## caller's; the frame is not kept in a local to avoid a cycle.)
    fname, lineno, funcname = _make_frame_info(inspect.currentframe())
    kmrso.kmr_reduce9(1, c_in, c_out, 0, copts, credfn,
                      fname, lineno, funcname)
    if copts.inspect == 0:
        self._consume()
    return kvo
1188 
def reverse(self, **mopts):
    """Makes a new pair by swapping the key and the value.

    The output KVS takes this KVS's value type as its key type and its
    key type as its value type.  An output KVS is required: when
    _get_options() reports that none should be made, Exception is
    raised.  Unless the `inspect` option is set, this KVS is consumed.
    Returns the output KVS.
    """

    keyty = self.get_field_type(_Slot.Key)
    valty = self.get_field_type(_Slot.Value)
    (_, _, mkkvo) = _get_options(mopts, False)
    cmopts = _c_option(mopts, _enabled_options_of_map)
    ## An explicit check instead of `assert`: assertions are stripped
    ## under `python -O`, and then a None output would reach kmr_reverse().
    if mkkvo is not True:
        raise Exception("reverse() needs an output KVS")
    kvo = KVS(self.mr, valty, keyty)
    kmrso.kmr_reverse(self._ckvs, kvo._ckvs, cmopts)
    if cmopts.inspect == 0:
        self._consume()
    return kvo
1203 
def shuffle(self, **mopts):
    """Shuffles key-value pairs (kmr_shuffle()).

    The output KVS, if made, has the same key and value types as this
    KVS.  Options are checked against _enabled_options_of_shuffle.
    Unless the `inspect` option is set, this KVS is consumed.
    Returns the output KVS, or None when no output KVS is made.
    """

    key_type = self.get_field_type(_Slot.Key)
    value_type = self.get_field_type(_Slot.Value)
    _, _, want_output = _get_options(mopts, False)
    copts = _c_option(mopts, _enabled_options_of_shuffle)
    c_in = self._ckvs
    if want_output:
        kvo = KVS(self.mr, key_type, value_type)
        c_out = kvo._ckvs
    else:
        kvo = c_out = None
    kmrso.kmr_shuffle(c_in, c_out, copts)
    if copts.inspect == 0:
        self._consume()
    return kvo
1217 
def replicate(self, **mopts):
    """Replicates key-value pairs to be visible on all ranks (kmr_replicate()).

    The output KVS, if made, has the same key and value types as this
    KVS.  Options are checked against _enabled_options_of_replicate.
    Unless the `inspect` option is set, this KVS is consumed.
    Returns the output KVS, or None when no output KVS is made.
    """

    key_type = self.get_field_type(_Slot.Key)
    value_type = self.get_field_type(_Slot.Value)
    _, _, want_output = _get_options(mopts, False)
    copts = _c_option(mopts, _enabled_options_of_replicate)
    c_in = self._ckvs
    if want_output:
        kvo = KVS(self.mr, key_type, value_type)
        c_out = kvo._ckvs
    else:
        kvo = c_out = None
    kmrso.kmr_replicate(c_in, c_out, copts)
    if copts.inspect == 0:
        self._consume()
    return kvo
1231 
def distribute(self, cyclic, **mopts):
    """Distributes pairs approximately evenly to ranks (kmr_distribute()).

    `cyclic` is passed through to kmr_distribute().  The output KVS, if
    made, has the same key and value types as this KVS.  Options are
    checked against _enabled_options_of_distribute.  Unless the
    `inspect` option is set, this KVS is consumed.
    Returns the output KVS, or None when no output KVS is made.
    """

    key_type = self.get_field_type(_Slot.Key)
    value_type = self.get_field_type(_Slot.Value)
    _, _, want_output = _get_options(mopts, False)
    copts = _c_option(mopts, _enabled_options_of_distribute)
    c_in = self._ckvs
    if want_output:
        kvo = KVS(self.mr, key_type, value_type)
        c_out = kvo._ckvs
    else:
        kvo = c_out = None
    kmrso.kmr_distribute(c_in, c_out, cyclic, copts)
    if copts.inspect == 0:
        self._consume()
    return kvo
1245 
def sort_locally(self, shuffling, **mopts):
    """Reorders key-value pairs in a single rank (kmr_sort_locally()).

    `shuffling` is passed through to kmr_sort_locally().  The output KVS,
    if made, has the same key and value types as this KVS.  Options are
    checked against _enabled_options_of_sort_locally.  Unless the
    `inspect` option is set, this KVS is consumed.
    Returns the output KVS, or None when no output KVS is made.
    """

    key_type = self.get_field_type(_Slot.Key)
    value_type = self.get_field_type(_Slot.Value)
    _, _, want_output = _get_options(mopts, False)
    copts = _c_option(mopts, _enabled_options_of_sort_locally)
    c_in = self._ckvs
    if want_output:
        kvo = KVS(self.mr, key_type, value_type)
        c_out = kvo._ckvs
    else:
        kvo = c_out = None
    kmrso.kmr_sort_locally(c_in, c_out, shuffling, copts)
    if copts.inspect == 0:
        self._consume()
    return kvo
1259 
def sort(self, **mopts):
    """Sorts a KVS globally (kmr_sort()).

    The output KVS, if made, has the same key and value types as this
    KVS.  Options are checked against _enabled_options_of_sort.  Unless
    the `inspect` option is set, this KVS is consumed.
    Returns the output KVS, or None when no output KVS is made.
    """

    key_type = self.get_field_type(_Slot.Key)
    value_type = self.get_field_type(_Slot.Value)
    _, _, want_output = _get_options(mopts, False)
    copts = _c_option(mopts, _enabled_options_of_sort)
    c_in = self._ckvs
    if want_output:
        kvo = KVS(self.mr, key_type, value_type)
        c_out = kvo._ckvs
    else:
        kvo = c_out = None
    kmrso.kmr_sort(c_in, c_out, copts)
    if copts.inspect == 0:
        self._consume()
    return kvo
1273 
def concatenate(self, *morekvs):
    """Concatenates this KVS and the KVS'es in `morekvs` into one.

    The output KVS takes this KVS's key and value types.  All input
    KVS'es, `morekvs` first and then this one, are consumed.
    Returns the output KVS.
    """

    key_type = self.get_field_type(_Slot.Key)
    value_type = self.get_field_type(_Slot.Value)
    inputs = (self,) + tuple(morekvs)
    count = len(inputs)
    c_inputs = (_c_kvs * count)()
    for index, kvs in enumerate(inputs):
        c_inputs[index] = kvs._ckvs
    kvo = KVS(self.mr, key_type, value_type)
    kmrso.kmr_concatenate_kvs(c_inputs, _c_int(count), kvo._ckvs, _c_option())
    for kvs in morekvs:
        kvs._consume()
    self._consume()
    return kvo
1292 
def read_files_reassemble(self, filename, color, offset, bytes_):
    """Reassembles files read by ranks (kmr_read_files_reassemble()).

    `filename` is encoded with _encode(); `color`, `offset` and `bytes_`
    are passed through.  The C buffer returned is copied into a
    bytearray and then released with kmr_mfree(), even if the copy
    fails.  A NULL buffer of size 0 gives an empty bytearray; a NULL
    buffer with a nonzero size raises Exception.
    """

    buf = _c_void_p()
    siz = _c_uint64(0)
    kmrso.kmr_read_files_reassemble(
        self.mr._ckmr, _encode(filename), color, offset, bytes_,
        ctypes.byref(buf), ctypes.byref(siz))
    addr = buf.value
    nbytes = siz.value
    if not addr:
        ## ctypes gives None for a NULL c_void_p; from_address(None) would
        ## fail with an unhelpful TypeError, and there is nothing to free.
        if nbytes != 0:
            raise Exception("Bad buffer (null C-object) of %d bytes" % nbytes)
        return bytearray()
    try:
        data = bytearray((_c_ubyte * nbytes).from_address(addr))
    finally:
        kmrso.kmr_mfree(addr, nbytes)
    return data
1306 
def read_file_by_segments(self, filename, color):
    """Reads one file by segments and reassembles (kmr_read_file_by_segments()).

    `filename` is encoded with _encode(); `color` is passed through.
    The C buffer returned is copied into a bytearray and then released
    with kmr_mfree(), even if the copy fails.  A NULL buffer of size 0
    gives an empty bytearray; a NULL buffer with a nonzero size raises
    Exception.
    """

    buf = _c_void_p()
    siz = _c_uint64(0)
    kmrso.kmr_read_file_by_segments(
        self.mr._ckmr, _encode(filename), color,
        ctypes.byref(buf), ctypes.byref(siz))
    addr = buf.value
    nbytes = siz.value
    if not addr:
        ## ctypes gives None for a NULL c_void_p; from_address(None) would
        ## fail with an unhelpful TypeError, and there is nothing to free.
        if nbytes != 0:
            raise Exception("Bad buffer (null C-object) of %d bytes" % nbytes)
        return bytearray()
    try:
        data = bytearray((_c_ubyte * nbytes).from_address(addr))
    finally:
        kmrso.kmr_mfree(addr, nbytes)
    return data
1320 
def save(self):
    """Packs locally the contents of a KVS to a byte array (kmr_save_kvs()).

    The C buffer returned is copied into a bytearray and then released
    with kmr_mfree(), even if the copy fails.  A NULL buffer of size 0
    gives an empty bytearray; a NULL buffer with a nonzero size raises
    Exception.
    """

    buf = _c_void_p(0)
    siz = _c_size_t(0)
    kmrso.kmr_save_kvs(self._ckvs, ctypes.byref(buf), ctypes.byref(siz),
                       _c_option())
    addr = buf.value
    nbytes = siz.value
    if not addr:
        ## ctypes gives None for a NULL c_void_p; from_address(None) would
        ## fail with an unhelpful TypeError, and there is nothing to free.
        if nbytes != 0:
            raise Exception("Bad buffer (null C-object) of %d bytes" % nbytes)
        return bytearray()
    try:
        data = bytearray((_c_ubyte * nbytes).from_address(addr))
    finally:
        kmrso.kmr_mfree(addr, nbytes)
    return data
1333 
def restore(self, data):
    """Unpacks locally the contents of a KVS from a byte array (kmr_restore_kvs()).

    `data` is a byte sequence such as the one returned by save().  It is
    handed to C through a ctypes view made with from_buffer(), which
    needs a writable buffer.  Anything other than a bytearray (e.g.
    bytes) is therefore copied into one first.  The output KVS is
    created with "opaque" key and value types before restoring.
    Returns the new KVS.
    """

    kvo = KVS(self.mr, "opaque", "opaque")
    buf = data if isinstance(data, bytearray) else bytearray(data)
    siz = len(buf)
    addr = (_c_ubyte * siz).from_buffer(buf)
    kmrso.kmr_restore_kvs(kvo._ckvs, addr, siz, _c_option())
    return kvo
1342 
def _map_swf(self, fn, **xopts):
    """Maps with kmr_map_swf() using spawn options (internal).

    `xopts` is split by _filter_spawn_options() into spawn options and
    map options.  Only the C spawn options are passed to kmr_map_swf().
    This KVS is always consumed.
    Returns the output KVS, or None when no output KVS is made.
    """
    spawn_opts, map_opts = _filter_spawn_options(xopts)
    keyty, valty, want_output = _get_options(map_opts, True)
    ## The converted map options are not passed to C; the call is kept
    ## (presumably it rejects options outside _enabled_options_of_map).
    _c_option(map_opts, _enabled_options_of_map)
    cspawn = _c_spawn_option(spawn_opts)
    cmapfn = _wrap_mapfn(fn)
    c_in = self._ckvs
    kvo = None
    c_out = None
    if want_output:
        kvo = KVS(self.mr, keyty, valty)
        c_out = kvo._ckvs
    kmrso.kmr_map_swf(c_in, c_out, 0, cspawn, cmapfn)
    self._consume()
    return kvo
1356 
def fin():
    """Finishes using KMR4PY by calling kmr_fin() of the C library.

    Returns None.
    """

    kmrso.kmr_fin()
1362 
def listify(kvs):
    """Returns an array of LOCAL contents.

    Makes a list with one slot per local pair of `kvs`.  Slot i is filled
    with the pair the mapper receives at index i.  The map runs with
    output=False and inspect=True, and its result is asserted to be None.
    """

    slots = [None] * kvs.local_element_count()

    def _store(kv, kvi, kvo, i, *_data):
        slots[i] = kv
        return 0

    result = kvs.map(_store, output=False, inspect=True)
    assert result is None
    return slots
1373 
def _stringify_options(o):
    """Returns kmr_stringify_options(o) decoded to a Python string."""
    raw = kmrso.kmr_stringify_options(o)
    return _decode(raw)
1376 
def _stringify_file_options(o):
    """Returns kmr_stringify_file_options(o) decoded to a Python string."""
    raw = kmrso.kmr_stringify_file_options(o)
    return _decode(raw)
1379 
def _stringify_spawn_options(o):
    """Returns kmr_stringify_spawn_options(o) decoded to a Python string."""
    raw = kmrso.kmr_stringify_spawn_options(o)
    return _decode(raw)
1382 
def _check_ctypes_values():
    """Checks if ctypes values are properly used.

    Raises Exception unless _c_null_pointer() accepts
    _c_null_pointer_value as a null pointer.
    """

    if _c_null_pointer(_c_null_pointer_value):
        return
    raise Exception("BAD: C null pointer has a wrong value.")
1388 
def _check_passing_options():
    """Checks if the options are passed properly from Python to C.

    For each option structure (_c_option, _c_file_option and
    _c_spawn_option), every field except the gap16/gap32 padding is set
    alone to 1.  The C stringify function must then return exactly that
    field name; otherwise Exception is raised.
    """

    checks = (
        (_c_option, _stringify_options),
        (_c_file_option, _stringify_file_options),
        (_c_spawn_option, _stringify_spawn_options),
    )
    padding = ("gap16", "gap32")
    for option_type, stringify in checks:
        for name, _ctype, _width in option_type._fields_:
            if name in padding:
                continue
            rendered = stringify(option_type({name : 1}))
            if name != rendered:
                raise Exception("BAD: %s != %s" % (str(name), str(rendered)))
1404 
1405 ## Document generator "pdoc" requires loading kmr4py.py without
1406 ## initializing MPI.
1407 ##if (sys.modules["mpi4py"].__name__ != "dummy_mpi4py"):
1408 ## _load_kmrso(kmrso_name)
1409 
1410 # Copyright (C) 2012-2018 RIKEN R-CCS
1411 # This library is distributed WITHOUT ANY WARRANTY. This library can be
1412 # redistributed and/or modified under the terms of the BSD 2-Clause License.
def map_for_some(self, fn, mopts)
Definition: kmr4py.py:1062
nothreading
Sets the options as dictionary passed.
Definition: kmr4py.py:417
def reduce_for_some(self, fn, mopts)
Definition: kmr4py.py:1174
def _encode_content(self, o, key_or_value)
Definition: kmr4py.py:910
def get_spawner_communicator(self, index)
Definition: kmr4py.py:745
def add_kv(self, key, val)
Definition: kmr4py.py:986
def map_processes(self, nonmpi, fn, sopts)
Definition: kmr4py.py:1123
def free(self)
Definition: kmr4py.py:887
separator_space
Sets the options as dictionary passed.
Definition: kmr4py.py:498
def add_kv_done(self)
Definition: kmr4py.py:997
def map_via_spawn(self, fn, xopts)
Definition: kmr4py.py:1108
def reduce_as_one(self, fn, mopts)
Definition: kmr4py.py:1161
def shuffle(self, mopts)
Definition: kmr4py.py:1204
def dismiss(self)
Definition: kmr4py.py:716
mr
Return a dummy KVS.
Definition: kmr4py.py:852
def concatenate(self, morekvs)
Definition: kmr4py.py:1274
def create_kvs(self, opts)
Definition: kmr4py.py:728
def read_files_reassemble(self, filename, color, offset, bytes_)
Definition: kmr4py.py:1293
def map_rank_by_rank(self, fn, mopts)
Definition: kmr4py.py:1049
def map_on_rank_zero(self, fn, mopts)
Definition: kmr4py.py:1043
def make_kvs(self, opts)
Definition: kmr4py.py:733
def map_ms(self, fn, mopts)
Definition: kmr4py.py:1075
def get_element_count(self)
Definition: kmr4py.py:1003
def save(self)
Definition: kmr4py.py:1321
each_rank
Sets the options as dictionary passed.
Definition: kmr4py.py:460
def reduce(self, fn, mopts)
Definition: kmr4py.py:1147
def map_ms_commands(self, fn, xopts)
Definition: kmr4py.py:1091
def local_element_count(self)
Definition: kmr4py.py:1010
def sort_locally(self, shuffling, mopts)
Definition: kmr4py.py:1246
def reverse(self, mopts)
Definition: kmr4py.py:1189
def map_serial_processes(self, fn, sopts)
Definition: kmr4py.py:1142
def distribute(self, cyclic, mopts)
Definition: kmr4py.py:1232
def __init__(self)
NOTE: Defining init with some arguments makes c-callback fail to call initializers.
Definition: kmr4py.py:537
def replicate(self, mopts)
Definition: kmr4py.py:1218
def sort(self, mopts)
Definition: kmr4py.py:1260
def set_option(self, k, v)
Definition: kmr4py.py:797
def _consume(self)
Definition: kmr4py.py:905
def free(self)
Definition: kmr4py.py:711
def _is_dummy(self)
Definition: kmr4py.py:902
def get_field_type(self, key_or_value)
Definition: kmr4py.py:964
def map_once(self, rank_zero_only, fn, mopts)
Definition: kmr4py.py:1031
def map(self, fn, mopts)
Definition: kmr4py.py:1017
def __init__(self, comm, info=None)
attributes: self._ckmr, self.nprocs, self.rank, self.emptykvs, self._dismissed.
Definition: kmr4py.py:635
def add(self, key, val)
Definition: kmr4py.py:980
def restore(self, data)
Definition: kmr4py.py:1334
def map_parallel_processes(self, fn, sopts)
Definition: kmr4py.py:1137
def reply_to_spawner(self)
Definition: kmr4py.py:739
def __init__(self, kmr_or_ckvs, keyty="opaque", valty="opaque")
Definition: kmr4py.py:841
def read_file_by_segments(self, filename, color)
Definition: kmr4py.py:1307
_ckvs
Do not free when KMR object is dismissed.
Definition: kmr4py.py:857
def send_kvs_to_spawner(self, kvs)
Definition: kmr4py.py:752