4 """Python Binding for KMR Map-Reduce Library. This provides 5 straightforward wrappers to the C routines. See more about KMR at 6 "https://github.com/RIKEN-RCCS/kmr". All key-value data is stored in C 7 structures after encoding/decoding Python objects to byte arrays in C. 8 The documentation in Python is minimal, so please refer to the 9 documentation in C. It works with Python3 (possibly 3.4 and later), 24 __version__ =
"20201116" 29 """kmrso holds a libkmr.so library object.""" 31 kmrso_name =
"libkmr.so" 33 """kmrso_name holds a libkmr.so name, which can be set before calling KMR().""" 37 """_kmrso_version holds a version taken from a libkmr.so.""" 39 _pickle_protocol = pickle.DEFAULT_PROTOCOL
41 warning_function = warnings.warn
43 """warning_function specifies the function used to issue warnings.""" 45 ignore_exceptions_in_map_fn =
True 47 """ignore_exceptions_in_map_fn=True makes exceptions ignored.""" 49 print_backtrace_in_map_fn =
True 51 """print_backtrace_in_map_fn=True prints backtraces of 52 exceptions raised in mapper/reducer functions.""" 54 force_null_terminate_in_cstring =
True 56 """force_null_terminate_in_cstring specifies to add a null-terminator 57 in C strings. Do not change it while some KVS'es are live.""" 59 _c_pointer = ctypes.POINTER(ctypes.c_char)
62 _c_kvsvec = ctypes.c_void_p
63 _c_boxvec = ctypes.c_void_p
64 _c_fnp = ctypes.c_void_p
65 _c_void_p = ctypes.c_void_p
66 _c_ubyte = ctypes.c_ubyte
67 _c_bool = ctypes.c_bool
69 _c_uint = ctypes.c_uint
70 _c_long = ctypes.c_long
71 _c_uint8 = ctypes.c_uint8
72 _c_uint32 = ctypes.c_uint32
73 _c_uint64 = ctypes.c_uint64
74 _c_double = ctypes.c_double
75 _c_size_t = ctypes.c_size_t
76 _c_string = ctypes.c_char_p
85 _c_null_pointer_value = _c_pointer()
87 def _c_null_pointer(p):
88 """Returns true if ctypes pointer is null.""" 95 _name_coding =
"latin-1" 98 return us.encode(_name_coding)
101 return bs.decode(_name_coding)
104 """Discrimination of a field indicated by key_or_value.""" 110 def _setup_mpi_constants():
111 """Imports values of some MPI constants. Calling kmr_mpi_type_size 112 and kmr_mpi_constant_value does not require MPI to be initialized.""" 114 def c_type_by_size(siz):
115 if (siz == ctypes.sizeof(_c_uint64)):
117 elif (siz == ctypes.sizeof(_c_uint32)):
120 raise Exception(
"Bad type size unknown: %d" % siz)
123 global _c_mpi_comm, _c_mpi_info
124 global _mpi_comm_world, _mpi_comm_self, _mpi_info_null
126 siz = kmrso.kmr_mpi_type_size(_encode(
"MPI_Comm"))
127 _c_mpi_comm = c_type_by_size(siz)
128 siz = kmrso.kmr_mpi_type_size(_encode(
"MPI_Info"))
129 _c_mpi_info = c_type_by_size(siz)
130 _mpi_comm_world = kmrso.kmr_mpi_constant_value(_encode(
"MPI_COMM_WORLD"))
131 _mpi_comm_self = kmrso.kmr_mpi_constant_value(_encode(
"MPI_COMM_SELF"))
132 _mpi_info_null = kmrso.kmr_mpi_constant_value(_encode(
"MPI_INFO_NULL"))
135 def _load_kmrso(soname = "libkmr.so"):
136 """Loads libkmr.so, and initializes some constants depending on the 139 global kmrso, _kmrso_version
141 global _kv_bad, _kv_opaque, _kv_cstring, _kv_integer, _kv_float8
142 global _field_name_type_map, _field_type_name_map
143 global _MKMAPFN, _MKREDFN
147 kmrso = ctypes.CDLL(soname)
149 _kmrso_version = ctypes.c_int.in_dll(kmrso,
"kmr_version").value
150 if (__version__ != str(_kmrso_version)):
151 warnings.warn((
"Version unmatch with libkmr.so;" 152 +
" found=" + str(_kmrso_version)
153 +
" required=" + __version__),
156 kmrso.kmr_mpi_type_size.argtypes = [_c_string]
157 kmrso.kmr_mpi_type_size.restype = _c_size_t
159 kmrso.kmr_mpi_constant_value.argtypes = [_c_string]
160 kmrso.kmr_mpi_constant_value.restype = _c_uint64
162 _setup_mpi_constants()
164 _c_funcptr = type(kmrso.kmr_init_2)
166 kmrso.kmr_init_2.argtypes = [ctypes.c_int]
167 kmrso.kmr_init_2.restype = ctypes.c_int
175 _kv_bad = ctypes.c_int.in_dll(kmrso,
"kmr_kv_field_bad").value
176 _kv_opaque = ctypes.c_int.in_dll(kmrso,
"kmr_kv_field_opaque").value
177 _kv_cstring = ctypes.c_int.in_dll(kmrso,
"kmr_kv_field_cstring").value
178 _kv_integer = ctypes.c_int.in_dll(kmrso,
"kmr_kv_field_integer").value
179 _kv_float8 = ctypes.c_int.in_dll(kmrso,
"kmr_kv_field_float8").value
181 _field_name_type_map = {
182 "opaque" : _kv_opaque,
"cstring" : _kv_cstring,
183 "integer" : _kv_integer,
"float8" : _kv_float8}
185 _field_type_name_map = dict(
186 map(tuple, map(reversed, _field_name_type_map.items())))
190 _MKMAPFN = ctypes.CFUNCTYPE(_c_int, _c_kvbox, _c_kvs, _c_kvs,
192 _MKREDFN = ctypes.CFUNCTYPE(_c_int, _c_boxvec, _c_long,
193 _c_kvs, _c_kvs, _c_void_p)
195 kmrso.kmr_fin.argtypes = []
196 kmrso.kmr_fin.restype = _c_int
198 kmrso.kmr_initialize_mpi.argtypes = [_c_pointer, _c_pointer]
199 kmrso.kmr_initialize_mpi.restype = _c_int
201 kmrso.kmr_create_context.argtypes = [_c_mpi_comm, _c_mpi_info, _c_string]
202 kmrso.kmr_create_context.restype = _c_pointer
204 kmrso.kmr_create_dummy_context.argtypes = []
205 kmrso.kmr_create_dummy_context.restype = _c_pointer
207 kmrso.kmr_free_context.argtypes = [_c_kmr]
208 kmrso.kmr_free_context.restype =
None 210 kmrso.kmr_set_option_by_strings.argtypes = [_c_kmr, _c_string, _c_string]
211 kmrso.kmr_set_option_by_strings.restype =
None 213 kmrso.kmr_create_kvs7.argtypes = [
214 _c_kmr, _c_int, _c_int, _c_option, _c_string, _c_int, _c_string]
215 kmrso.kmr_create_kvs7.restype = _c_kvs
217 kmrso.kmr_add_kv.argtypes = [_c_kvs, _c_kvbox]
218 kmrso.kmr_add_kv.restype =
None 220 kmrso.kmr_add_kv_done.argtypes = [_c_kvs]
221 kmrso.kmr_add_kv_done.restype =
None 223 kmrso.kmr_get_element_count.argtypes = [_c_kvs]
224 kmrso.kmr_get_element_count.restype = _c_long
226 kmrso.kmr_local_element_count.argtypes = [_c_kvs]
227 kmrso.kmr_local_element_count.restype = _c_long
229 kmrso.kmr_map9.argtypes = [
230 _c_bool, _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp,
231 _c_string, _c_int, _c_string]
232 kmrso.kmr_map9.restype =
None 234 kmrso.kmr_map_once.argtypes = [_c_kvs, _c_void_p, _c_option,
236 kmrso.kmr_map_once.restype =
None 238 kmrso.kmr_map_rank_by_rank.argtypes = [
239 _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp]
240 kmrso.kmr_map_rank_by_rank.restype =
None 242 kmrso.kmr_map_for_some.argtypes = [
243 _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp]
244 kmrso.kmr_map_for_some.restype =
None 246 kmrso.kmr_map_ms.argtypes = [_c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp]
247 kmrso.kmr_map_ms.restype = _c_int
249 kmrso.kmr_map_ms_commands.argtypes = [
250 _c_kvs, _c_kvs, _c_void_p, _c_option, _c_spawn_option, _c_fnp]
251 kmrso.kmr_map_ms_commands.restype = _c_int
253 kmrso.kmr_map_via_spawn.argtypes = [
254 _c_kvs, _c_kvs, _c_void_p, _c_mpi_info, _c_spawn_option, _c_fnp]
255 kmrso.kmr_map_via_spawn.restype =
None 257 kmrso.kmr_map_processes.argtypes = [
258 _c_bool, _c_kvs, _c_kvs, _c_void_p, _c_mpi_info,
259 _c_spawn_option, _c_fnp]
260 kmrso.kmr_map_processes.restype =
None 262 kmrso.kmr_reduce9.argtypes = [
263 _c_bool, _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp,
264 _c_string, _c_int, _c_string]
265 kmrso.kmr_reduce9.restype =
None 267 kmrso.kmr_reduce_as_one.argtypes = [
268 _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp]
# ctypes reads the return type from "restype"; assigning to ".type" is
# silently ignored and would leave the default c_int in place.  None
# matches the declarations of the other reducers (e.g. kmr_reduce9).
kmrso.kmr_reduce_as_one.restype = None
kmrso.kmr_shuffle.argtypes = [_c_kvs, _c_kvs, _c_option]
272 kmrso.kmr_shuffle.restype =
None 274 kmrso.kmr_replicate.argtypes = [_c_kvs, _c_kvs, _c_option]
275 kmrso.kmr_replicate.restype =
None 277 kmrso.kmr_distribute.argtypes = [_c_kvs, _c_kvs, _c_bool, _c_option]
278 kmrso.kmr_distribute.restype =
None 280 kmrso.kmr_concatenate_kvs.argtypes = [_c_kvsvec, _c_int, _c_kvs, _c_option]
281 kmrso.kmr_concatenate_kvs.restype =
None 283 kmrso.kmr_reverse.argtypes = [_c_kvs, _c_kvs, _c_option]
284 kmrso.kmr_reverse.restype =
None 286 kmrso.kmr_sort.argtypes = [_c_kvs, _c_kvs, _c_option]
287 kmrso.kmr_sort.restype =
None 289 kmrso.kmr_sort_locally.argtypes = [_c_kvs, _c_kvs, _c_bool, _c_option]
290 kmrso.kmr_sort_locally.restype =
None 292 kmrso.kmr_reply_to_spawner.argtypes = [_c_kmr]
293 kmrso.kmr_reply_to_spawner.restype =
None 295 kmrso.kmr_send_kvs_to_spawner.argtypes = [_c_kmr, _c_kvs]
296 kmrso.kmr_send_kvs_to_spawner.restype =
None 298 kmrso.kmr_get_spawner_communicator.argtypes = [_c_void_p, _c_long]
299 kmrso.kmr_get_spawner_communicator.restype = ctypes.POINTER(_c_mpi_comm)
301 kmrso.kmr_read_files_reassemble.argtypes = [
302 _c_kmr, _c_string, _c_int, _c_uint64, _c_uint64,
303 ctypes.POINTER(_c_void_p), ctypes.POINTER(_c_uint64)]
304 kmrso.kmr_read_files_reassemble.restype =
None 306 kmrso.kmr_read_file_by_segments.argtypes = [
307 _c_kmr, _c_string, _c_int,
308 ctypes.POINTER(_c_void_p), ctypes.POINTER(_c_uint64)]
309 kmrso.kmr_read_file_by_segments.restype =
None 311 kmrso.kmr_save_kvs.argtypes = [
312 _c_kvs, ctypes.POINTER(_c_void_p), ctypes.POINTER(_c_size_t),
314 kmrso.kmr_save_kvs.restype =
None 316 kmrso.kmr_restore_kvs.argtypes = [
317 _c_kvs, _c_void_p, _c_size_t, _c_option]
318 kmrso.kmr_restore_kvs.restype =
None 320 kmrso.kmr_dump_kvs.argtypes = [_c_kvs, _c_int]
321 kmrso.kmr_dump_kvs.restype =
None 323 kmrso.kmr_get_key_type_ff.argtypes = [_c_kvs]
324 kmrso.kmr_get_key_type_ff.restype = _c_int
326 kmrso.kmr_get_value_type_ff.argtypes = [_c_kvs]
327 kmrso.kmr_get_value_type_ff.restype = _c_int
329 kmrso.kmr_get_nprocs.argtypes = [_c_kmr]
330 kmrso.kmr_get_nprocs.restype = _c_int
332 kmrso.kmr_get_rank.argtypes = [_c_kmr]
333 kmrso.kmr_get_rank.restype = _c_int
335 kmrso.kmr_mfree.argtypes = [_c_void_p, _c_size_t]
336 kmrso.kmr_mfree.restype =
None 338 kmrso.kmr_stringify_options.argtypes = [_c_option]
339 kmrso.kmr_stringify_options.restype = _c_string
341 kmrso.kmr_stringify_file_options.argtypes = [_c_file_option]
342 kmrso.kmr_stringify_file_options.restype = _c_string
344 kmrso.kmr_stringify_spawn_options.argtypes = [_c_spawn_option]
345 kmrso.kmr_stringify_spawn_options.restype = _c_string
347 kmrso.kmr_map_swf.argtypes = [
348 _c_kvs, _c_kvs, _c_void_p, _c_spawn_option, _c_fnp]
349 kmrso.kmr_map_swf.restype =
None 351 kmrso.kmr_init_swf.argtypes = [
352 _c_kmr, ctypes.POINTER(_c_mpi_comm), _c_int]
353 kmrso.kmr_init_swf.restype =
None 355 kmrso.kmr_detach_swf_workers.argtypes = [_c_kmr]
356 kmrso.kmr_detach_swf_workers.restype =
None 358 kmrso.kmr_stop_swf_workers.argtypes = [_c_kmr]
359 kmrso.kmr_stop_swf_workers.restype =
None 361 kmrso.kmr_finish_swf.argtypes = [_c_kmr]
362 kmrso.kmr_finish_swf.restype =
None 364 kmrso.kmr_split_swf_lanes.argtypes = [
365 _c_kmr, ctypes.POINTER(_c_mpi_comm), _c_int,
366 ctypes.POINTER(_c_string), _c_bool]
367 kmrso.kmr_split_swf_lanes.restype =
None 369 kmrso.kmr_dump_swf_lanes.argtypes = [_c_kmr]
370 kmrso.kmr_dump_swf_lanes.restype =
None 372 kmrso.kmr_set_swf_verbosity.argtypes = [_c_kmr]
373 kmrso.kmr_set_swf_verbosity.restype =
None  # right-hand side of the "kmr_set_swf_verbosity.restype =" that ends the preceding line


def _string_of_options(o):
    """Returns a print string of the options set in an option structure.

    Accepts a _c_option, _c_file_option, or _c_spawn_option.  Every
    field listed in the class's _fields_ whose value is 1 is named,
    except the padding fields "gap16" and "gap32"; the result reads like
    "ClassName(opt1,opt2)".
    """
    klass = type(o)
    enabled = [name for (name, _ctype, _width) in klass._fields_
               if name not in ("gap16", "gap32") and getattr(o, name) == 1]
    return "%s(%s)" % (klass.__name__, ",".join(enabled))
395 (
"nothreading", _c_uint8, 1),
396 (
"inspect", _c_uint8, 1),
397 (
"keep_open", _c_uint8, 1),
398 (
"key_as_rank", _c_uint8, 1),
399 (
"rank_zero", _c_uint8, 1),
400 (
"collapse", _c_uint8, 1),
401 (
"take_ckpt", _c_uint8, 1),
402 (
"gap16", _c_uint, 16),
403 (
"gap32", _c_uint, 32)]
405 def __init__(self, opts=None, enabledlist=None):
406 super(_c_option, self).__init__()
407 if opts
is None: opts = {}
408 if enabledlist
is None: enabledlist = []
410 for o, v
in iter(opts.items()):
411 if (o
in [
"key",
"value",
"output"]):
414 elif (enabledlist != []
and (o
not in enabledlist)):
415 raise Exception(
"Bad option: %s" % o)
416 elif (o ==
"nothreading"):
418 elif (o ==
"inspect"):
420 elif (o ==
"keep_open"):
422 elif (o ==
"key_as_rank"):
424 elif (o ==
"rank_zero"):
426 elif (o ==
"collapse"):
428 elif (o ==
"take_ckpt"):
431 raise Exception(
"Bad option: %s" % o)
435 return _string_of_options(self)
438 """kmr_file_option.""" 441 (
"each_rank", _c_uint8, 1),
442 (
"subdirectories", _c_uint8, 1),
443 (
"list_file", _c_uint8, 1),
444 (
"shuffle_names", _c_uint8, 1),
445 (
"gap16", _c_uint, 16),
446 (
"gap32", _c_uint, 32)]
448 def __init__(self, opts=None, enabledlist=None):
449 super(_c_file_option, self).__init__()
450 if opts
is None: opts = {}
451 if enabledlist
is None: enabledlist = []
453 for o, v
in iter(opts.items()):
454 if (o ==
"key" or o ==
"output"):
457 elif (enabledlist != []
and (o
not in enabledlist)):
458 raise Exception(
"Bad option: %s" % o)
459 elif (o ==
"each_rank"):
461 elif (o ==
"subdirectories"):
463 elif (o ==
"list_file"):
465 elif (o ==
"shuffle_names"):
468 raise Exception(
"Bad option: %s" % o)
472 return _string_of_options(self)
475 """kmr_spawn_option.""" 478 (
"separator_space", _c_uint8, 1),
479 (
"reply_each", _c_uint8, 1),
480 (
"reply_root", _c_uint8, 1),
481 (
"no_set_infos", _c_uint8, 1),
482 (
"take_ckpt", _c_uint8, 1),
483 (
"gap16", _c_uint, 16),
484 (
"gap32", _c_uint, 32)]
486 def __init__(self, opts=None, enabledlist=None):
487 super(_c_spawn_option, self).__init__()
488 if opts
is None: opts = {}
489 if enabledlist
is None: enabledlist = []
491 for o, v
in iter(opts.items()):
492 if (o ==
"key" or o ==
"output"):
495 elif (enabledlist != []
and (o
not in enabledlist)):
496 raise Exception(
"Bad option: %s" % o)
497 elif (o ==
"separator_space"):
499 elif (o ==
"reply_each"):
501 elif (o ==
"reply_root"):
503 elif (o ==
"no_set_infos"):
505 elif (o ==
"take_ckpt"):
508 raise Exception(
"Bad option: %s" % o)
512 return _string_of_options(self)
514 _spawn_option_list = [_k
for (_k, _, _)
in _c_spawn_option._fields_]
515 _file_option_list = [_k
for (_k, _, _)
in _c_file_option._fields_]
518 """kmr_unit_sized {const char *p; long i; double d;}.""" 526 """kmr_kv_box {int klen, vlen; kmr_unit_sized k, v;}.""" 540 def set(self, klen, key, vlen, val):
547 def _wrap_mapfn(pyfn):
548 """Returns a closure which calls a given Python map-function on 549 the unmarshalled contents in KVS.""" 553 elif (isinstance(pyfn, _c_funcptr)):
556 def applyfn(cbox, ckvi, ckvo, carg, cindex):
559 key = kvi._decode_content(cbox.klen, cbox.k, _Slot.Key)
560 val = kvi._decode_content(cbox.vlen, cbox.v, _Slot.Value)
562 pyfn((key, val), kvi, kvo, cindex)
564 warning_function((
"Exception in Python callbacks: %s" 565 % str(sys.exc_info()[1])),
567 if (print_backtrace_in_map_fn): traceback.print_exc()
568 return (0
if ignore_exceptions_in_map_fn
else -1)
569 return _MKMAPFN(applyfn)
571 def _wrap_redfn(pyfn):
572 """Returns a closure which calls a given Python reduce-function on 573 the unmarshalled contents in KVS.""" 577 elif (isinstance(pyfn, _c_funcptr)):
580 def applyfn(cboxvec, n, ckvi, ckvo, carg):
584 for i
in range(0, n):
585 pos = (cboxvec + ctypes.sizeof(_c_kvbox) * i)
586 cbox = _c_kvbox.from_address(pos)
587 key = kvi._decode_content(cbox.klen, cbox.k, _Slot.Key)
588 val = kvi._decode_content(cbox.vlen, cbox.v, _Slot.Value)
589 kvvec.append((key, val))
591 pyfn(kvvec, kvi, kvo)
593 warning_function((
"Exception in Python callbacks: %s" 594 % str(sys.exc_info()[1])),
596 if (print_backtrace_in_map_fn): traceback.print_exc()
597 return (0
if ignore_exceptions_in_map_fn
else -1)
598 return _MKREDFN(applyfn)
def _get_options(opts, with_keyty_valty):
    """Extracts the options shared by the KVS mapping/reducing methods.

    Returns a triple (keyty, valty, mkkvo): the key field type and the
    value field type taken from "key=" and "value=" (each defaulting to
    "opaque"), and the "output=" flag telling whether an output KVS is
    to be created (defaulting to True).  When with_keyty_valty is false
    the caller does not accept field types, and an Exception is raised
    if "key=" or "value=" is present.
    """
    if not with_keyty_valty:
        if "key" in opts or "value" in opts:
            raise Exception("Bad option: key= or value= not allowed")
    return (opts.get("key", "opaque"),
            opts.get("value", "opaque"),
            opts.get("output", True))
611 def _make_frame_info(frame):
614 return (_encode(co.co_filename), sp.f_lineno, _encode(co.co_name))
616 def _filter_spawn_options(opts):
617 """Returns a pair of dictionaries, the 1st holds options to spawn, 618 and the 2nd holds the other options.""" 622 for o, v
in iter(opts.items()):
623 if (o
in _spawn_option_list):
627 return (sopts, mopts)
636 """Makes a KMR context with a given MPI communicator (comm), 637 which is used in succeeding operations. Info specifies its 638 options by MPI_Info. Arguments of comm/info are passed as a 639 long integer (assuming either an integer (int) or a pointer in 640 C). It also accepts an communicator instance of mpi4py.MPI.Comm, 641 a string "dummy" or "world" as a comm argument.""" 644 _load_kmrso(kmrso_name)
646 if (isinstance(info, (int))):
650 warninfo = (info !=
None)
651 cinfo = _mpi_info_null
652 if (isinstance(comm, (int))):
655 elif (isinstance(comm, mpi4py.MPI.Comm)):
657 comm_ptr = mpi4py.MPI._addressof(comm)
658 if (mpi4py.MPI._sizeof(mpi4py.MPI.Comm) == ctypes.sizeof(_c_uint64)):
662 ccomm = MPI_Comm.from_address(comm_ptr)
663 elif (comm ==
"dummy"):
665 ccomm = _mpi_comm_self
666 elif (comm ==
"world"):
668 ccomm = _mpi_comm_world
671 ccomm = _mpi_comm_world
673 self.
_ckmr = kmrso.kmr_create_context(ccomm, cinfo, _encode(
""))
675 """self._ckmr holds the C part of a KMR context.""" 677 if (_c_null_pointer(self.
_ckmr)):
678 raise Exception(
"kmr_create_context: failed")
682 """self._dismissed=True disables freeing KVS'es (by memory 683 management) which remain unconsumed after dismissing a KMR 684 context. It is because freeing them causes referencing 685 dangling pointers in C.""" 689 """self.emptykvs holds an empty KVS needed by map_once, 690 map_on_rank_zero, read_files_reassemble, and 691 read_file_by_segments.""" 695 """self.nprocs holds an nprocs of MPI.""" 697 self.
rank = kmrso.kmr_get_rank(self.
_ckmr)
699 """self.rank holds a rank of MPI.""" 701 if (warncomm
and (self.
rank == 0)):
702 warning_function(
"MPI comm ignored in KMR() constructor.", RuntimeWarning)
703 if (warninfo
and (self.
rank == 0)):
704 warning_function(
"MPI info ignored in KMR() constructor.", RuntimeWarning)
712 """Dismisses KMR (an alias of dismiss()).""" 719 if (
not _c_null_pointer(self.
_ckmr)):
720 kmrso.kmr_free_context(self.
_ckmr)
721 self.
_ckmr = _c_null_pointer_value
729 """Makes a new KVS (an alias of make_kvs()).""" 734 """Makes a new KVS.""" 736 (keyty, valty, _) = _get_options(opts,
True)
737 return KVS(self, keyty, valty)
740 """Sends a reply message from a spawned process.""" 742 kmrso.kmr_reply_to_spawner(self.
_ckmr)
746 """Obtains a parent communicator of a spawned process. C version 747 returns a reference, but this returns an entity""" 749 commref = kmrso.kmr_get_spawner_communicator(self.
_ckmr, index)
750 return commref.contents.value
753 """Sends the KVS from a spawned process to the spawner.""" 755 kmrso.kmr_send_kvs_to_spawner(self.
_ckmr, kvs._ckvs)
758 def _init_swf(self, splitcomms, masterank):
760 kmrso.kmr_init_swf(self.
_ckmr, splitcomms, masterank)
763 def _detach_swf_workers(self):
765 kmrso.kmr_detach_swf_workers(self.
_ckmr)
768 def _stop_swf_workers(self):
770 kmrso.kmr_stop_swf_workers(self.
_ckmr)
773 def _finish_swf(self):
775 kmrso.kmr_finish_swf(self.
_ckmr)
778 def _split_swf_lanes(self, masterrank, description, dump):
780 comms = (_c_mpi_comm * 4)()
781 desc = (ctypes.c_char_p * (len(description) + 1))()
782 desc[:-1] = description
783 desc[len(description)] =
None 784 kmrso.kmr_split_swf_lanes(self.
_ckmr, comms, masterrank, desc, dump)
787 def _dump_swf_lanes(self):
789 kmrso.kmr_dump_swf_lanes(self.
_ckmr)
792 def _set_swf_verbosity(self, level):
794 kmrso.kmr_set_swf_verbosity(self.
_ckmr, level)
798 """Sets KMR option, taking both arguments by strings.""" 800 kmrso.kmr_set_option_by_strings(self.
_ckmr, _encode(k), _encode(v))
803 _enabled_options_of_map = [
804 "nothreading",
"inspect",
"keep_open",
"take_ckpt"]
806 _enabled_options_of_map_once = [
807 "nothreading",
"keep_open",
"take_ckpt"]
809 _enabled_options_of_map_ms = [
810 "nothreading",
"keep_open"]
812 _enabled_options_of_reduce = [
813 "nothreading",
"inspect",
"take_ckpt"]
815 _enabled_options_of_reduce_as_one = [
816 "inspect",
"take_ckpt"]
818 _enabled_options_of_shuffle = [
819 "inspect",
"key_as_rank",
"take_ckpt"]
821 _enabled_options_of_replicate = [
822 "inspect",
"rank_zero",
"take_ckpt"]
824 _enabled_options_of_distribute = [
825 "nothreading",
"inspect",
"keep_open"]
827 _enabled_options_of_sort_locally = [
828 "nothreading",
"inspect",
"key_as_rank"]
830 _enabled_options_of_sort = [
834 """KVS. Note that there are dummy KVS'es which are temporarily 835 created to hold the C structure of the KVS passed to 836 mapper/reducer functions. A dummy KVS has None in its "mr" 841 def __init__(self, kmr_or_ckvs, keyty="opaque", valty="opaque"):
842 """Makes a KVS for a given KMR. Do not call the KVS constructor 843 directly, but use KMR.make_kvs() instead. A KVS is created 844 with the datatypes stored in the key and the value, specified 845 by the keywords "key=" and "value=". The datatype name is a 846 string, one of "opaque", "cstring", "integer", and "float8". 847 Most mappers and reducers (precisely, the methods that accept 848 a function argument) take keyword arguments for the types, 849 defaulting with key="opaque" and value="opaque". The 850 datatypes affect the sorting order. """ 854 """mr attribute holds a KMR context object. Note that mr is 855 not accessible from mapping/reducing functions.""" 859 """_ckvs attribute holds a kvs in C.""" 863 """_frameinfo protects caller line information from garbage 866 if isinstance(kmr_or_ckvs, KMR):
867 kf = _field_name_type_map[keyty]
868 vf = _field_name_type_map[valty]
869 top = inspect.currentframe().f_back
870 self.
mr = kmr_or_ckvs
871 (f, l, n) = _make_frame_info(top)
872 self.
_ckvs = kmrso.kmr_create_kvs7(
875 elif isinstance(kmr_or_ckvs, _c_pointer):
878 self.
_ckvs = kmr_or_ckvs
880 raise Exception(
"Bad call to kvs constructor")
883 if ((
not self.
_is_dummy())
and (
not _c_null_pointer(self.
_ckvs))):
888 """Finishes the C part of a KVS.""" 891 raise Exception(
"Bad call to free_kvs on dummy KVS")
892 elif (_c_null_pointer(self.
_ckvs)):
893 raise Exception(
"Bad call to free_kvs on freed KVS")
894 elif ((
not self.
mr is None)
and self.
mr._dismissed):
898 kmrso.kmr_free_kvs(self.
_ckvs)
899 self.
_ckvs = _c_null_pointer_value
903 return (self.
mr is None)
906 """Releases a now dangling C pointer.""" 908 self.
_ckvs = _c_null_pointer_value
910 def _encode_content(self, o, key_or_value):
911 """Marshals an object with regard to the field type. It 912 returns a 3-tuple: the length, the value-union, and a 3rd element 913 holding a reference to the encoded buffer (or None) to keep it alive.""" 917 if (kvty ==
"opaque"):
918 data = pickle.dumps(o, _pickle_protocol)
920 return (len(data), u, data)
921 elif (kvty ==
"cstring"):
922 if (
not isinstance(o, str)):
923 raise Exception(
"Not 8-bit string for cstring: %s" % o)
925 os = ((o +
"\0")
if force_null_terminate_in_cstring
else o)
928 return (len(data), u, data)
929 elif (kvty ==
"integer"):
931 return (ctypes.sizeof(_c_long), u,
None)
932 elif (kvty ==
"float8"):
934 return (ctypes.sizeof(_c_double), u,
None)
936 raise Exception(
"Bad field type: %s" % kvty)
938 def _decode_content(self, siz, u, key_or_value):
939 """Unmarshalls an object with regard to the field type. It 940 returns integer 0 when the length is 0 (it is for a dummy 941 key-value used in kmr_map_once() etc).""" 947 if (kvty ==
"opaque"):
948 data = ctypes.string_at(u.i, siz)
949 o = pickle.loads(data)
951 elif (kvty ==
"cstring"):
953 siz1 = ((siz - 1)
if force_null_terminate_in_cstring
else siz)
954 data = ctypes.string_at(u.i, siz1)
957 elif (kvty ==
"integer"):
959 elif (kvty ==
"float8"):
962 raise Exception(
"Bad field type: %s" % kvty)
965 """Get a field type of a KVS.""" 967 if (_c_null_pointer(self.
_ckvs)):
968 raise Exception(
"Bad KVS (null C-object)")
969 if (key_or_value == _Slot.Key):
970 kvty = kmrso.kmr_get_key_type_ff(self.
_ckvs)
971 elif (key_or_value == _Slot.Value):
972 kvty = kmrso.kmr_get_value_type_ff(self.
_ckvs)
974 raise Exception(
"Bad field %s" % key_or_value.name)
975 if (kvty == _kv_bad):
976 raise Exception(
"Bad field type value %d in KVS" % kvty)
978 return _field_type_name_map[kvty]
981 """Adds a key-value pair.""" 987 """Adds a key-value pair.""" 993 cbox =
_c_kvbox().set(klen, k, vlen, v)
994 kmrso.kmr_add_kv(self.
_ckvs, cbox)
998 """Finishes adding key-value pairs.""" 1000 kmrso.kmr_add_kv_done(self.
_ckvs)
1004 """Gets the total number of key-value pairs.""" 1007 kmrso.kmr_get_element_count(self.
_ckvs, ctypes.byref(c))
1011 """Gets the number of key-value pairs locally.""" 1014 kmrso.kmr_local_element_count(self.
_ckvs, ctypes.byref(c))
1020 (keyty, valty, mkkvo) = _get_options(mopts,
True)
1021 cmopts =
_c_option(mopts, _enabled_options_of_map)
1022 cfn = _wrap_mapfn(fn)
1024 kvo = (
KVS(self.
mr, keyty, valty)
if mkkvo
else None)
1025 ckvo = (kvo._ckvs
if (kvo
is not None)
else None)
1026 (f, l, n) = _make_frame_info(inspect.currentframe().f_back)
1027 kmrso.kmr_map9(0, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n))
1028 if (cmopts.inspect == 0): self.
_consume()
1032 """Maps once with a dummy key-value pair.""" 1035 (keyty, valty, mkkvo) = _get_options(mopts,
True)
1036 cmopts =
_c_option(mopts, _enabled_options_of_map_once)
1037 cfn = _wrap_mapfn(fn)
1038 kvo = (
KVS(self.
mr, keyty, valty)
if mkkvo
else None)
1039 ckvo = (kvo._ckvs
if (kvo
is not None)
else None)
1040 kmrso.kmr_map_once(ckvo, 0, cmopts, rank_zero_only, cfn)
def map_on_rank_zero(self, fn, **mopts):
    """Maps on rank0 only.

    A shorthand for map_once(True, fn, **mopts): the mapper fn is called
    once with a dummy key-value pair, with rank_zero_only set.  Keyword
    options are forwarded unchanged, so key=, value=, output= etc. mean
    the same as in map_once().
    """
    # Options must be forwarded as keywords: "*mopts" would pass the
    # option names as extra positional arguments and map_once() would
    # raise TypeError whenever any option is given.
    return self.map_once(True, fn, **mopts)
1050 """Maps sequentially with rank by rank for debugging.""" 1052 (keyty, valty, mkkvo) = _get_options(mopts,
True)
1053 cmopts =
_c_option(mopts, _enabled_options_of_map)
1054 cfn = _wrap_mapfn(fn)
1056 kvo = (
KVS(self.
mr, keyty, valty)
if mkkvo
else None)
1057 ckvo = (kvo._ckvs
if (kvo
is not None)
else None)
1058 kmrso.kmr_map_rank_by_rank(ckvi, ckvo, 0, cmopts, cfn)
1059 if (cmopts.inspect == 0): self.
_consume()
1063 """Maps until some key-value are added.""" 1065 (keyty, valty, mkkvo) = _get_options(mopts,
True)
1066 cmopts =
_c_option(mopts, _enabled_options_of_map)
1068 cfn = _wrap_mapfn(fn)
1069 kvo = (
KVS(self.
mr, keyty, valty)
if mkkvo
else None)
1070 ckvo = (kvo._ckvs
if (kvo
is not None)
else None)
1071 kmrso.kmr_map_for_some(ckvi, ckvo, 0, cmopts, cfn)
1072 if (cmopts.inspect == 0): self.
_consume()
1076 """Maps in master-worker mode.""" 1079 (keyty, valty, mkkvo) = _get_options(mopts,
True)
1080 cmopts =
_c_option(mopts, _enabled_options_of_map_ms)
1081 cfn = _wrap_mapfn(fn)
1083 kvo = (
KVS(self.
mr, keyty, valty)
if mkkvo
else None)
1084 ckvo = (kvo._ckvs
if (kvo
is not None)
else None)
1087 rr = kmrso.kmr_map_ms(ckvi, ckvo, 0, cmopts, cfn)
1092 """Maps in master-worker mode, and runs serial commands.""" 1094 (sopts, mopts) = _filter_spawn_options(xopts)
1095 (keyty, valty, mkkvo) = _get_options(mopts,
True)
1096 cmopts =
_c_option(mopts, _enabled_options_of_map_ms)
1098 cfn = _wrap_mapfn(fn)
1100 kvo = (
KVS(self.
mr, keyty, valty)
if mkkvo
else None)
1101 ckvo = (kvo._ckvs
if (kvo
is not None)
else None)
1104 rr = kmrso.kmr_map_ms_commands(ckvi, ckvo, 0, cmopts, csopts, cfn)
1109 """Maps on processes started by MPI_Comm_spawn().""" 1111 (sopts, mopts) = _filter_spawn_options(xopts)
1112 (keyty, valty, mkkvo) = _get_options(mopts,
True)
1113 cmopts =
_c_option(mopts, _enabled_options_of_map)
1115 cfn = _wrap_mapfn(fn)
1117 kvo = (
KVS(self.
mr, keyty, valty)
if mkkvo
else None)
1118 ckvo = (kvo._ckvs
if (kvo
is not None)
else None)
1119 kmrso.kmr_map_via_spawn(ckvi, ckvo, 0, _mpi_info_null, csopts, cfn)
1124 """Maps on processes started by MPI_Comm_spawn().""" 1126 (keyty, valty, mkkvo) = _get_options(sopts,
True)
1128 cfn = _wrap_mapfn(fn)
1130 kvo = (
KVS(self.
mr, keyty, valty)
if mkkvo
else None)
1131 ckvo = (kvo._ckvs
if (kvo
is not None)
else None)
1132 kmrso.kmr_map_processes(nonmpi, ckvi, ckvo, 0, _mpi_info_null,
1138 """Maps on processes started by MPI_Comm_spawn().""" 1143 """Maps on processes started by MPI_Comm_spawn().""" 1148 """Reduces key-value pairs.""" 1150 (keyty, valty, mkkvo) = _get_options(mopts,
True)
1151 cmopts =
_c_option(mopts, _enabled_options_of_reduce)
1152 cfn = _wrap_redfn(fn)
1154 kvo = (
KVS(self.
mr, keyty, valty)
if mkkvo
else None)
1155 ckvo = (kvo._ckvs
if (kvo
is not None)
else None)
1156 (f, l, n) = _make_frame_info(inspect.currentframe().f_back)
1157 kmrso.kmr_reduce9(0, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n))
1158 if (cmopts.inspect == 0): self.
_consume()
1162 """ Reduces once as if all pairs had the same key.""" 1164 (keyty, valty, mkkvo) = _get_options(mopts,
True)
1165 cmopts =
_c_option(mopts, _enabled_options_of_reduce_as_one)
1166 cfn = _wrap_redfn(fn)
1168 kvo = (
KVS(self.
mr, keyty, valty)
if mkkvo
else None)
1169 ckvo = (kvo._ckvs
if (kvo
is not None)
else None)
1170 kmrso.kmr_reduce_as_one(ckvi, ckvo, 0, cmopts, cfn)
1171 if (cmopts.inspect == 0): self.
_consume()
1175 """Reduces until some key-value are added.""" 1177 (keyty, valty, mkkvo) = _get_options(mopts,
True)
1178 cmopts =
_c_option(mopts, _enabled_options_of_reduce)
1179 cfn = _wrap_redfn(fn)
1181 kvo = (
KVS(self.
mr, keyty, valty)
if mkkvo
else None)
1182 ckvo = (kvo._ckvs
if (kvo
is not None)
else None)
# Record the CALLER's file/line/function (as map() and reduce() do), not
# the location of this wrapper method itself.
(f, l, n) = _make_frame_info(inspect.currentframe().f_back)
1185 kmrso.kmr_reduce9(1, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n))
1186 if (cmopts.inspect == 0): self.
_consume()
1190 """Makes a new pair by swapping the key and the value.""" 1194 (_, _, mkkvo) = _get_options(mopts,
False)
1195 cmopts =
_c_option(mopts, _enabled_options_of_map)
1196 assert (mkkvo
is True)
1198 kvo = (
KVS(self.
mr, valty, keyty)
if mkkvo
else None)
1199 ckvo = (kvo._ckvs
if (kvo
is not None)
else None)
1200 kmrso.kmr_reverse(ckvi, ckvo, cmopts)
1201 if (cmopts.inspect == 0): self.
_consume()
1205 """Shuffles key-value pairs.""" 1209 (_, _, mkkvo) = _get_options(mopts,
False)
1210 cmopts =
_c_option(mopts, _enabled_options_of_shuffle)
1212 kvo = (
KVS(self.
mr, keyty, valty)
if mkkvo
else None)
1213 ckvo = (kvo._ckvs
if (kvo
is not None)
else None)
1214 kmrso.kmr_shuffle(ckvi, ckvo, cmopts)
1215 if (cmopts.inspect == 0): self.
_consume()
1219 """Replicates key-value pairs to be visible on all ranks.""" 1223 (_, _, mkkvo) = _get_options(mopts,
False)
1224 cmopts =
_c_option(mopts, _enabled_options_of_replicate)
1226 kvo = (
KVS(self.
mr, keyty, valty)
if mkkvo
else None)
1227 ckvo = (kvo._ckvs
if (kvo
is not None)
else None)
1228 kmrso.kmr_replicate(ckvi, ckvo, cmopts)
1229 if (cmopts.inspect == 0): self.
_consume()
1233 """Distributes pairs approximately evenly to ranks.""" 1237 (_, _, mkkvo) = _get_options(mopts,
False)
1238 cmopts =
_c_option(mopts, _enabled_options_of_distribute)
1240 kvo = (
KVS(self.
mr, keyty, valty)
if mkkvo
else None)
1241 ckvo = (kvo._ckvs
if (kvo
is not None)
else None)
1242 kmrso.kmr_distribute(ckvi, ckvo, cyclic, cmopts)
1243 if (cmopts.inspect == 0): self.
_consume()
1247 """Reorders key-value pairs in a single rank.""" 1251 (_, _, mkkvo) = _get_options(mopts,
False)
1252 cmopts =
_c_option(mopts, _enabled_options_of_sort_locally)
1254 kvo = (
KVS(self.
mr, keyty, valty)
if mkkvo
else None)
1255 ckvo = (kvo._ckvs
if (kvo
is not None)
else None)
1256 kmrso.kmr_sort_locally(ckvi, ckvo, shuffling, cmopts)
1257 if (cmopts.inspect == 0): self.
_consume()
1261 """Sorts a KVS globally.""" 1265 (_, _, mkkvo) = _get_options(mopts,
False)
1266 cmopts =
_c_option(mopts, _enabled_options_of_sort)
1268 kvo = (
KVS(self.
mr, keyty, valty)
if mkkvo
else None)
1269 ckvo = (kvo._ckvs
if (kvo
is not None)
else None)
1270 kmrso.kmr_sort(ckvi, ckvo, cmopts)
1271 if (cmopts.inspect == 0): self.
_consume()
1275 """Concatenates a number of KVS'es to one.""" 1279 siz = (len(morekvs) + 1)
1280 ckvsvec = (_c_kvs * siz)()
1281 ckvsvec[0] = self.
_ckvs 1282 for i
in range(0, len(morekvs)):
1283 ckvsvec[i + 1] = morekvs[i]._ckvs
1285 kvo =
KVS(self.
mr, keyty, valty)
1287 kmrso.kmr_concatenate_kvs(ckvsvec, cn, ckvo,
_c_option())
1294 """Reassembles files reading by ranks.""" 1298 kmrso.kmr_read_files_reassemble(
1299 self.
mr._ckmr, _encode(filename), color, offset, bytes_,
1300 ctypes.byref(buf), ctypes.byref(siz))
1302 ptr = (_c_ubyte * siz.value).from_address(addr)
1303 data = bytearray(ptr)
1304 kmrso.kmr_mfree(addr, siz.value)
1308 """Reads one file by segments and reassembles.""" 1312 kmrso.kmr_read_file_by_segments(
1313 self.
mr._ckmr, _encode(filename), color,
1314 ctypes.byref(buf), ctypes.byref(siz))
1316 ptr = (_c_ubyte * siz.value).from_address(addr)
1317 data = bytearray(ptr)
1318 kmrso.kmr_mfree(addr, siz.value)
1322 """Packs locally the contents of a KVS to a byte array.""" 1326 kmrso.kmr_save_kvs(self.
_ckvs, ctypes.byref(buf), ctypes.byref(siz),
1329 ptr = (_c_ubyte * siz.value).from_address(addr)
1330 data = bytearray(ptr)
1331 kmrso.kmr_mfree(addr, siz.value)
1335 """Unpacks locally the contents of a KVS from a byte array.""" 1337 kvo =
KVS(self.
mr,
"opaque",
"opaque")
1339 addr = (_c_ubyte * siz).from_buffer(data)
1340 kmrso.kmr_restore_kvs(kvo._ckvs, addr, siz,
_c_option())
1343 def _map_swf(self, fn, **xopts):
1345 (sopts, mopts) = _filter_spawn_options(xopts)
1346 (keyty, valty, mkkvo) = _get_options(mopts,
True)
1347 cmopts =
_c_option(mopts, _enabled_options_of_map)
1349 cfn = _wrap_mapfn(fn)
1351 kvo = (
KVS(self.
mr, keyty, valty)
if mkkvo
else None)
1352 ckvo = (kvo._ckvs
if (kvo
is not None)
else None)
1353 kmrso.kmr_map_swf(ckvi, ckvo, 0, csopts, cfn)
1358 """Finishes using KMR4PY.""" 1364 """Returns an array of LOCAL contents.""" 1366 a = kvs.local_element_count() * [
None]
1367 def f (kv, kvi, kvo, i, *_data):
1370 kvo = kvs.map(f, output=
False, inspect=
True)
1371 assert (kvo
is None)
def _stringify_options(o):
    """Returns, as a str, the text libkmr's kmr_stringify_options builds for a _c_option."""
    rendered = kmrso.kmr_stringify_options(o)
    return _decode(rendered)
def _stringify_file_options(o):
    """Returns, as a str, the text libkmr's kmr_stringify_file_options builds for a _c_file_option."""
    rendered = kmrso.kmr_stringify_file_options(o)
    return _decode(rendered)
def _stringify_spawn_options(o):
    """Returns, as a str, the text libkmr's kmr_stringify_spawn_options builds for a _c_spawn_option."""
    rendered = kmrso.kmr_stringify_spawn_options(o)
    return _decode(rendered)
def _check_ctypes_values():
    """Verifies that _c_null_pointer() recognizes _c_null_pointer_value.

    Raises an Exception when the module's null ctypes pointer is not
    reported as null, i.e. when the null check itself is broken.
    """
    null_is_detected = _c_null_pointer(_c_null_pointer_value)
    if not null_is_detected:
        raise Exception("BAD: C null pointer has a wrong value.")
1389 def _check_passing_options():
1390 """Checks if the options are passed properly from Python to C.""" 1392 for (option, stringify)
in [
1393 (_c_option, _stringify_options),
1394 (_c_file_option, _stringify_file_options),
1395 (_c_spawn_option, _stringify_spawn_options)]:
1396 for (o, _, _)
in option._fields_:
1397 if ((o ==
"gap16")
or (o ==
"gap32")):
1400 copts = option({o : 1})
1401 s = stringify(copts)
1403 raise Exception(
"BAD: %s != %s" % (str(o), str(s)))
def map_for_some(self, fn, mopts)
nothreading
Sets the options as dictionary passed.
def reduce_for_some(self, fn, mopts)
def _encode_content(self, o, key_or_value)
def get_spawner_communicator(self, index)
def add_kv(self, key, val)
def map_processes(self, nonmpi, fn, sopts)
separator_space
Sets the options as dictionary passed.
def map_via_spawn(self, fn, xopts)
def reduce_as_one(self, fn, mopts)
def concatenate(self, morekvs)
def create_kvs(self, opts)
def read_files_reassemble(self, filename, color, offset, bytes_)
def map_rank_by_rank(self, fn, mopts)
def map_on_rank_zero(self, fn, mopts)
def map_ms(self, fn, mopts)
def get_element_count(self)
each_rank
Sets the options as dictionary passed.
def reduce(self, fn, mopts)
def map_ms_commands(self, fn, xopts)
def local_element_count(self)
def sort_locally(self, shuffling, mopts)
def map_serial_processes(self, fn, sopts)
def distribute(self, cyclic, mopts)
def __init__(self)
NOTE: Defining init with some arguments makes c-callback fail to call initializers.
def replicate(self, mopts)
def set_option(self, k, v)
def get_field_type(self, key_or_value)
def map_once(self, rank_zero_only, fn, mopts)
def __init__(self, comm, info=None)
attributes: self._ckmr, self.nprocs, self.rank, self.emptykvs, self._dismissed.
def map_parallel_processes(self, fn, sopts)
def reply_to_spawner(self)
def __init__(self, kmr_or_ckvs, keyty="opaque", valty="opaque")
def read_file_by_segments(self, filename, color)
_ckvs
Do not free when KMR object is dismissed.
def send_kvs_to_spawner(self, kvs)