kmr4py
Python Binding for KMR Map-Reduce Library. This provides straightforward wrappers to the C routines. See more about KMR at "https://github.com/RIKEN-RCCS/kmr". All key-value data is stored in C structures after encoding/decoding Python objects to byte arrays in C. The documentation in Python is minimal, so please refer to the documentation in C. It works with Python3 (possibly 3.4 and later), but not with 2.x.
## kmr4py.py
## Copyright (C) 2012-2018 RIKEN R-CCS

"""Python Binding for KMR Map-Reduce Library.  This provides
straightforward wrappers to the C routines.  See more about KMR at
"https://github.com/RIKEN-RCCS/kmr".  All key-value data is stored in C
structures after encoding/decoding Python objects to byte arrays in C.
The documentation in Python is minimal, so please refer to the
documentation in C.  It works with Python3 (possibly 3.4 and later),
but not with 2.x."""

## NOTE: Importing MPI module from mpi4py package initializes for MPI
## execution.  Import MPI and then import kmr4py in application codes.

from enum import Enum
import warnings
import ctypes
import pickle
import inspect
import traceback
import sys

## mpi4py is deliberately NOT imported here (importing mpi4py.MPI
## initializes MPI).  KMR() looks it up in sys.modules instead, so it
## is only used when the application has already imported it.

__version__ = "20201116"
kmrversion = "1.10"

kmrso = None

"""kmrso holds a libkmr.so library object."""

kmrso_name = "libkmr.so"

"""kmrso_name holds a libkmr.so name, which can be set before calling KMR()."""

_kmrso_version = None

"""_kmrso_version holds a version taken from a libkmr.so."""

_pickle_protocol = pickle.DEFAULT_PROTOCOL

warning_function = warnings.warn

"""warning_function specifies the function used to issue warnings."""

ignore_exceptions_in_map_fn = True

"""ignore_exceptions_in_map_fn=True makes exceptions ignored."""

print_backtrace_in_map_fn = True

"""print_backtrace_in_map_fn=True makes backtraces printed at
exceptions in mapper/reducer functions."""

force_null_terminate_in_cstring = True

"""force_null_terminate_in_cstring specifies to add a null-terminator
in C strings.  Do not change it while some KVS'es are live."""

_c_pointer = ctypes.POINTER(ctypes.c_char)
_c_kmr = _c_pointer
_c_kvs = _c_pointer
_c_kvsvec = ctypes.c_void_p
_c_boxvec = ctypes.c_void_p
_c_fnp = ctypes.c_void_p
_c_void_p = ctypes.c_void_p
_c_ubyte = ctypes.c_ubyte
_c_bool = ctypes.c_bool
_c_int = ctypes.c_int
_c_uint = ctypes.c_uint
_c_long = ctypes.c_long
_c_uint8 = ctypes.c_uint8
_c_uint32 = ctypes.c_uint32
_c_uint64 = ctypes.c_uint64
_c_double = ctypes.c_double
_c_size_t = ctypes.c_size_t
_c_string = ctypes.c_char_p

## _c_funcptr is ctypes._FuncPtr, but it is taken indirectly
## because it is hidden.  It is assigned in _load_kmrso().

## _c_null_pointer_value is a null value for ctypes.

_c_null_pointer_value = _c_pointer()

def _c_null_pointer(p):
    """Returns true if ctypes pointer is null (or is None)."""

    return (not bool(p))

## _name_coding is used to pass a string to C routines and back.
## ("latin-1" maps every byte value, so decoding never fails).

_name_coding = "latin-1"

def _encode(us):
    """Encodes a Python string to bytes to pass it to C routines."""

    return us.encode(_name_coding)

def _decode(bs):
    """Decodes bytes received from C routines to a Python string."""

    return bs.decode(_name_coding)

class _Slot(Enum):
    """Discrimination of a field indicated by key_or_value."""
    Key = 0
    Value = 1

## Loading C routines of libkmr.so.

def _setup_mpi_constants():
    """Imports values of some MPI constants.  Calling kmr_mpi_type_size
    and kmr_mpi_constant_value does not need MPI be initialized.  It
    sets the globals _c_mpi_comm, _c_mpi_info, _mpi_comm_world,
    _mpi_comm_self, and _mpi_info_null.  It raises an Exception when
    an MPI handle is neither 4 nor 8 bytes wide."""

    def c_type_by_size(siz):
        ## Chooses an unsigned integer type as wide as an MPI handle.
        if (siz == ctypes.sizeof(_c_uint64)):
            return _c_uint64
        elif (siz == ctypes.sizeof(_c_uint32)):
            return _c_uint32
        else:
            raise Exception("Bad type size unknown: %d" % siz)

    global _c_mpi_comm, _c_mpi_info
    global _mpi_comm_world, _mpi_comm_self, _mpi_info_null

    siz = kmrso.kmr_mpi_type_size(_encode("MPI_Comm"))
    _c_mpi_comm = c_type_by_size(siz)
    siz = kmrso.kmr_mpi_type_size(_encode("MPI_Info"))
    _c_mpi_info = c_type_by_size(siz)
    _mpi_comm_world = kmrso.kmr_mpi_constant_value(_encode("MPI_COMM_WORLD"))
    _mpi_comm_self = kmrso.kmr_mpi_constant_value(_encode("MPI_COMM_SELF"))
    _mpi_info_null = kmrso.kmr_mpi_constant_value(_encode("MPI_INFO_NULL"))
    return

def _load_kmrso(soname = "libkmr.so"):
    """Loads libkmr.so, and initializes some constants depending on the
    library.  It declares the argument/result types of the C routines
    used by this binding, and calls kmr_init_2() once.  A version
    mismatch with the library is reported by a RuntimeWarning."""

    global kmrso, _kmrso_version
    global _c_funcptr
    global _kv_bad, _kv_opaque, _kv_cstring, _kv_integer, _kv_float8
    global _field_name_type_map, _field_type_name_map
    global _MKMAPFN, _MKREDFN

    def declare(name, restype, *argtypes):
        ## Sets the prototype of a C routine.  Looking up a missing
        ## symbol raises AttributeError as plain attribute access does.
        fn = getattr(kmrso, name)
        fn.argtypes = list(argtypes)
        fn.restype = restype
        return fn

    ## Load "libkmr.so".

    kmrso = ctypes.CDLL(soname)

    _kmrso_version = ctypes.c_int.in_dll(kmrso, "kmr_version").value
    if (__version__ != str(_kmrso_version)):
        warning_function(("Version unmatch with libkmr.so;"
                          + " found=" + str(_kmrso_version)
                          + " required=" + __version__),
                         RuntimeWarning)

    declare("kmr_mpi_type_size", _c_size_t, _c_string)
    declare("kmr_mpi_constant_value", _c_uint64, _c_string)

    _setup_mpi_constants()

    _c_funcptr = type(kmrso.kmr_init_2)

    declare("kmr_init_2", ctypes.c_int, ctypes.c_int)

    ## Initializes KMR at this point.

    kmrso.kmr_init_2(0)

    ## Library dependent constants.

    _kv_bad = ctypes.c_int.in_dll(kmrso, "kmr_kv_field_bad").value
    _kv_opaque = ctypes.c_int.in_dll(kmrso, "kmr_kv_field_opaque").value
    _kv_cstring = ctypes.c_int.in_dll(kmrso, "kmr_kv_field_cstring").value
    _kv_integer = ctypes.c_int.in_dll(kmrso, "kmr_kv_field_integer").value
    _kv_float8 = ctypes.c_int.in_dll(kmrso, "kmr_kv_field_float8").value

    _field_name_type_map = {
        "opaque" : _kv_opaque, "cstring" : _kv_cstring,
        "integer" : _kv_integer, "float8" : _kv_float8}

    _field_type_name_map = {
        ty : name for (name, ty) in _field_name_type_map.items()}

    ## C-callable function factories.

    _MKMAPFN = ctypes.CFUNCTYPE(_c_int, _c_kvbox, _c_kvs, _c_kvs,
                                _c_void_p, _c_long)
    _MKREDFN = ctypes.CFUNCTYPE(_c_int, _c_boxvec, _c_long,
                                _c_kvs, _c_kvs, _c_void_p)

    comm_p = ctypes.POINTER(_c_mpi_comm)
    void_pp = ctypes.POINTER(_c_void_p)

    declare("kmr_fin", _c_int)
    declare("kmr_initialize_mpi", _c_int, _c_pointer, _c_pointer)
    declare("kmr_create_context", _c_pointer,
            _c_mpi_comm, _c_mpi_info, _c_string)
    declare("kmr_create_dummy_context", _c_pointer)
    declare("kmr_free_context", None, _c_kmr)
    declare("kmr_set_option_by_strings", None, _c_kmr, _c_string, _c_string)

    declare("kmr_create_kvs7", _c_kvs,
            _c_kmr, _c_int, _c_int, _c_option, _c_string, _c_int, _c_string)
    ## kmr_free_kvs is called by KVS.free(); the result type is the
    ## ctypes default (int).
    declare("kmr_free_kvs", _c_int, _c_kvs)
    declare("kmr_add_kv", None, _c_kvs, _c_kvbox)
    declare("kmr_add_kv_done", None, _c_kvs)
    ## The counters are returned via a "long *" output argument (see
    ## the callers, which pass byref of a c_long).
    declare("kmr_get_element_count", _c_long,
            _c_kvs, ctypes.POINTER(_c_long))
    declare("kmr_local_element_count", _c_long,
            _c_kvs, ctypes.POINTER(_c_long))

    declare("kmr_map9", None,
            _c_bool, _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp,
            _c_string, _c_int, _c_string)
    declare("kmr_map_once", None,
            _c_kvs, _c_void_p, _c_option, _c_bool, _c_fnp)
    declare("kmr_map_rank_by_rank", None,
            _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp)
    declare("kmr_map_for_some", None,
            _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp)
    declare("kmr_map_ms", _c_int,
            _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp)
    declare("kmr_map_ms_commands", _c_int,
            _c_kvs, _c_kvs, _c_void_p, _c_option, _c_spawn_option, _c_fnp)
    declare("kmr_map_via_spawn", None,
            _c_kvs, _c_kvs, _c_void_p, _c_mpi_info, _c_spawn_option, _c_fnp)
    declare("kmr_map_processes", None,
            _c_bool, _c_kvs, _c_kvs, _c_void_p, _c_mpi_info,
            _c_spawn_option, _c_fnp)

    declare("kmr_reduce9", None,
            _c_bool, _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp,
            _c_string, _c_int, _c_string)
    ## (It was misspelled as ".type", which left restype undeclared).
    declare("kmr_reduce_as_one", None,
            _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp)

    declare("kmr_shuffle", None, _c_kvs, _c_kvs, _c_option)
    declare("kmr_replicate", None, _c_kvs, _c_kvs, _c_option)
    declare("kmr_distribute", None, _c_kvs, _c_kvs, _c_bool, _c_option)
    declare("kmr_concatenate_kvs", None,
            _c_kvsvec, _c_int, _c_kvs, _c_option)
    declare("kmr_reverse", None, _c_kvs, _c_kvs, _c_option)
    declare("kmr_sort", None, _c_kvs, _c_kvs, _c_option)
    declare("kmr_sort_locally", None, _c_kvs, _c_kvs, _c_bool, _c_option)

    declare("kmr_reply_to_spawner", None, _c_kmr)
    declare("kmr_send_kvs_to_spawner", None, _c_kmr, _c_kvs)
    declare("kmr_get_spawner_communicator", comm_p, _c_void_p, _c_long)

    declare("kmr_read_files_reassemble", None,
            _c_kmr, _c_string, _c_int, _c_uint64, _c_uint64,
            void_pp, ctypes.POINTER(_c_uint64))
    declare("kmr_read_file_by_segments", None,
            _c_kmr, _c_string, _c_int,
            void_pp, ctypes.POINTER(_c_uint64))

    declare("kmr_save_kvs", None,
            _c_kvs, void_pp, ctypes.POINTER(_c_size_t), _c_option)
    declare("kmr_restore_kvs", None, _c_kvs, _c_void_p, _c_size_t, _c_option)
    declare("kmr_dump_kvs", None, _c_kvs, _c_int)

    declare("kmr_get_key_type_ff", _c_int, _c_kvs)
    declare("kmr_get_value_type_ff", _c_int, _c_kvs)
    declare("kmr_get_nprocs", _c_int, _c_kmr)
    declare("kmr_get_rank", _c_int, _c_kmr)
    declare("kmr_mfree", None, _c_void_p, _c_size_t)

    declare("kmr_stringify_options", _c_string, _c_option)
    declare("kmr_stringify_file_options", _c_string, _c_file_option)
    declare("kmr_stringify_spawn_options", _c_string, _c_spawn_option)

    declare("kmr_map_swf", None,
            _c_kvs, _c_kvs, _c_void_p, _c_spawn_option, _c_fnp)
    declare("kmr_init_swf", None, _c_kmr, comm_p, _c_int)
    declare("kmr_detach_swf_workers", None, _c_kmr)
    declare("kmr_stop_swf_workers", None, _c_kmr)
    declare("kmr_finish_swf", None, _c_kmr)
    declare("kmr_split_swf_lanes", None,
            _c_kmr, comm_p, _c_int, ctypes.POINTER(_c_string), _c_bool)
    declare("kmr_dump_swf_lanes", None, _c_kmr)
    ## KMR._set_swf_verbosity() passes a level as the 2nd argument.
    declare("kmr_set_swf_verbosity", None, _c_kmr, _c_int)

    return None

## _gap_fields names the padding bit-fields of the option structures;
## they are neither printed nor settable as options.

_gap_fields = ("gap16", "gap32")

def _string_of_options(o):
    """Returns a print string of options for _c_option,
    _c_file_option, and _c_spawn_option.  It lists the flags set to
    one, as "classname(flag=1,...)"."""

    ss = [f + "=1" for (f, _, _) in o.__class__._fields_
          if ((f not in _gap_fields) and getattr(o, f) == 1)]
    return (o.__class__.__name__ + "(" + (",".join(ss)) + ")")

def _apply_option_flags(cstruct, opts, enabledlist, binding_only):
    """Sets the flags of an option structure from a dictionary.
    Entries named in binding_only are skipped (they are meaningful to
    the Python binding only).  It raises an Exception for a name that
    is not a flag of the structure, or that is not in a non-empty
    enabledlist."""

    if opts is None: opts = {}
    if enabledlist is None: enabledlist = []
    flags = [f for (f, _, _) in cstruct._fields_ if (f not in _gap_fields)]
    for (o, v) in opts.items():
        if (o in binding_only):
            continue
        if ((enabledlist != [] and (o not in enabledlist))
            or (o not in flags)):
            raise Exception("Bad option: %s" % o)
        setattr(cstruct, o, v)
    return

class _c_option(ctypes.Structure):
    """kmr_option."""

    _fields_ = [
        ("nothreading", _c_uint8, 1),
        ("inspect", _c_uint8, 1),
        ("keep_open", _c_uint8, 1),
        ("key_as_rank", _c_uint8, 1),
        ("rank_zero", _c_uint8, 1),
        ("collapse", _c_uint8, 1),
        ("take_ckpt", _c_uint8, 1),
        ("gap16", _c_uint, 16),
        ("gap32", _c_uint, 32)]

    def __init__(self, opts=None, enabledlist=None):
        """Sets the options as dictionary passed.  A non-empty
        enabledlist restricts the acceptable option names."""

        super(_c_option, self).__init__()
        ## "key", "value", and "output" are Python binding only.
        _apply_option_flags(self, opts, enabledlist,
                            ("key", "value", "output"))
        return

    def __str__(self):
        return _string_of_options(self)

class _c_file_option(ctypes.Structure):
    """kmr_file_option."""

    _fields_ = [
        ("each_rank", _c_uint8, 1),
        ("subdirectories", _c_uint8, 1),
        ("list_file", _c_uint8, 1),
        ("shuffle_names", _c_uint8, 1),
        ("gap16", _c_uint, 16),
        ("gap32", _c_uint, 32)]

    def __init__(self, opts=None, enabledlist=None):
        """Sets the options as dictionary passed.  A non-empty
        enabledlist restricts the acceptable option names."""

        super(_c_file_option, self).__init__()
        ## "key" and "output" are Python binding only.
        _apply_option_flags(self, opts, enabledlist, ("key", "output"))
        return

    def __str__(self):
        return _string_of_options(self)

class _c_spawn_option(ctypes.Structure):
    """kmr_spawn_option."""

    _fields_ = [
        ("separator_space", _c_uint8, 1),
        ("reply_each", _c_uint8, 1),
        ("reply_root", _c_uint8, 1),
        ("no_set_infos", _c_uint8, 1),
        ("take_ckpt", _c_uint8, 1),
        ("gap16", _c_uint, 16),
        ("gap32", _c_uint, 32)]

    def __init__(self, opts=None, enabledlist=None):
        """Sets the options as dictionary passed.  A non-empty
        enabledlist restricts the acceptable option names."""

        super(_c_spawn_option, self).__init__()
        ## "key" and "output" are Python binding only.
        _apply_option_flags(self, opts, enabledlist, ("key", "output"))
        return

    def __str__(self):
        return _string_of_options(self)

_spawn_option_list = [_k for (_k, _, _) in _c_spawn_option._fields_]
_file_option_list = [_k for (_k, _, _) in _c_file_option._fields_]

class _c_unitsized(ctypes.Union):
    """kmr_unit_sized {const char *p; long i; double d;}."""

    _fields_ = [
        ("p", _c_string),
        ("i", _c_long),
        ("d", _c_double)]

class _c_kvbox(ctypes.Structure):
    """kmr_kv_box {int klen, vlen; kmr_unit_sized k, v;}."""

    _fields_ = [
        ("klen", _c_int),
        ("vlen", _c_int),
        ("k", _c_unitsized),
        ("v", _c_unitsized)]

    ## NOTE: Defining __init__ with some arguments makes c-callback
    ## fail to call initializers.

    def __init__(self):
        super(_c_kvbox, self).__init__()

    def set(self, klen, key, vlen, val):
        """Fills the lengths and the key/value unions, and returns
        self to allow chaining after the constructor."""

        self.klen = klen
        self.vlen = vlen
        self.k = key
        self.v = val
        return self

def _report_callback_exception():
    """Reports the exception being handled in a mapper/reducer
    callback, and returns the code to return to C: zero when
    ignore_exceptions_in_map_fn is set, or -1 otherwise."""

    warning_function(("Exception in Python callbacks: %s"
                      % str(sys.exc_info()[1])),
                     RuntimeWarning)
    if (print_backtrace_in_map_fn): traceback.print_exc()
    return (0 if ignore_exceptions_in_map_fn else -1)

def _wrap_mapfn(pyfn):
    """Returns a closure which calls a given Python map-function on
    the unmarshalled contents in KVS.  It returns zero (a null
    function pointer) for None, and returns a C function as it is.
    The closure returns zero to C on success."""

    if (pyfn is None):
        return 0
    elif (isinstance(pyfn, _c_funcptr)):
        return pyfn
    else:
        def applyfn(cbox, ckvi, ckvo, carg, cindex):
            kvi = KVS(ckvi)
            kvo = KVS(ckvo)
            key = kvi._decode_content(cbox.klen, cbox.k, _Slot.Key)
            val = kvi._decode_content(cbox.vlen, cbox.v, _Slot.Value)
            try:
                pyfn((key, val), kvi, kvo, cindex)
            except BaseException:
                ## Nothing can propagate through the C caller, so
                ## everything is caught and turned into a return code.
                return _report_callback_exception()
            return 0
        return _MKMAPFN(applyfn)

def _wrap_redfn(pyfn):
    """Returns a closure which calls a given Python reduce-function on
    the unmarshalled contents in KVS.  It returns zero (a null
    function pointer) for None, and returns a C function as it is.
    The closure returns zero to C on success."""

    if (pyfn is None):
        return 0
    elif (isinstance(pyfn, _c_funcptr)):
        return pyfn
    else:
        def applyfn(cboxvec, n, ckvi, ckvo, carg):
            kvi = KVS(ckvi)
            kvo = KVS(ckvo)
            kvvec = []
            boxsize = ctypes.sizeof(_c_kvbox)
            for i in range(0, n):
                ## cboxvec is the address of an array of kmr_kv_box.
                cbox = _c_kvbox.from_address(cboxvec + boxsize * i)
                key = kvi._decode_content(cbox.klen, cbox.k, _Slot.Key)
                val = kvi._decode_content(cbox.vlen, cbox.v, _Slot.Value)
                kvvec.append((key, val))
            try:
                pyfn(kvvec, kvi, kvo)
            except BaseException:
                ## Nothing can propagate through the C caller, so
                ## everything is caught and turned into a return code.
                return _report_callback_exception()
            return 0
        return _MKREDFN(applyfn)

def _get_options(opts, with_keyty_valty):
    """Returns a triple of the options: a key field type, a value
    field type, and a flag of needs of output generation.  The
    defaults are "opaque", "opaque", and True.  It raises an Exception
    when key= or value= is given but with_keyty_valty is false."""

    if ((not with_keyty_valty) and (("key" in opts) or ("value" in opts))):
        raise Exception("Bad option: key= or value= not allowed")
    keyty = opts.get("key", "opaque")
    valty = opts.get("value", "opaque")
    mkkvo = opts.get("output", True)
    return (keyty, valty, mkkvo)

def _make_frame_info(frame):
    """Returns a triple of a file name, a line number, and a function
    name of a frame, with the names encoded for C."""

    co = frame.f_code
    return (_encode(co.co_filename), frame.f_lineno, _encode(co.co_name))

def _filter_spawn_options(opts):
    """Returns a pair of dictionaries, the 1st holds options to spawn,
    and the 2nd holds the other options."""

    sopts = dict()
    mopts = dict()
    for (o, v) in opts.items():
        if (o in _spawn_option_list):
            sopts[o] = v
        else:
            mopts[o] = v
    return (sopts, mopts)

class KMR():
    """KMR context."""

    ## attributes: self._ckmr, self.nprocs, self.rank, self.emptykvs,
    ## self._dismissed.

    def __init__(self, comm, info=None):
        """Makes a KMR context with a given MPI communicator (comm),
        which is used in succeeding operations.  Info specifies its
        options by MPI_Info.  Arguments of comm/info are passed as a
        long integer (assuming either an integer (int) or a pointer in
        C).  It also accepts a communicator instance of mpi4py.MPI.Comm,
        a string "dummy" or "world" as a comm argument.  Other values
        are ignored with a warning (and MPI_COMM_WORLD/MPI_INFO_NULL
        is used instead).  It loads libkmr.so at the first call."""

        if (kmrso is None):
            _load_kmrso(kmrso_name)

        if (isinstance(info, int)):
            warninfo = False
            cinfo = info
        else:
            warninfo = (info is not None)
            cinfo = _mpi_info_null

        ## mpi4py is not imported by this module; a communicator of
        ## mpi4py can only exist when the application has imported it.
        mpi = sys.modules.get("mpi4py.MPI")

        if (isinstance(comm, int)):
            warncomm = False
            ccomm = comm
        elif ((mpi is not None) and isinstance(comm, mpi.Comm)):
            warncomm = False
            ## Read the MPI handle with the width reported by
            ## libkmr.so, after checking that mpi4py agrees on it.
            if (mpi._sizeof(mpi.Comm) != ctypes.sizeof(_c_mpi_comm)):
                raise Exception("MPI_Comm size mismatch between"
                                " mpi4py and libkmr.so")
            ccomm = _c_mpi_comm.from_address(mpi._addressof(comm)).value
        elif (comm == "dummy"):
            warncomm = False
            ccomm = _mpi_comm_self
        elif (comm == "world"):
            warncomm = False
            ccomm = _mpi_comm_world
        else:
            warncomm = True
            ccomm = _mpi_comm_world

        ## self._ckmr holds the C part of a KMR context.

        self._ckmr = kmrso.kmr_create_context(ccomm, cinfo, _encode(""))

        if (_c_null_pointer(self._ckmr)):
            raise Exception("kmr_create_context: failed")

        ## self._dismissed=True disables freeing KVS'es (by memory
        ## management) which remain unconsumed after dismissing a KMR
        ## context.  It is because freeing them causes referencing
        ## dangling pointers in C.

        self._dismissed = False

        ## self.emptykvs holds an empty KVS needed by map_once,
        ## map_on_rank_zero, read_files_reassemble, and
        ## read_file_by_segments.

        self.emptykvs = KVS(self).free()

        ## self.nprocs holds an nprocs of MPI.

        self.nprocs = kmrso.kmr_get_nprocs(self._ckmr)

        ## self.rank holds a rank of MPI.

        self.rank = kmrso.kmr_get_rank(self._ckmr)

        if (warncomm and (self.rank == 0)):
            warning_function("MPI comm ignored in KMR() constructor.", RuntimeWarning)
        if (warninfo and (self.rank == 0)):
            warning_function("MPI info ignored in KMR() constructor.", RuntimeWarning)
        return

    def __del__(self):
        self.dismiss()
        return

    def free(self):
        """Dismisses KMR (an alias of dismiss())."""

        self.dismiss()

    def dismiss(self):
        """Dismisses KMR.  It frees the C part of the context once;
        calling it again is harmless."""

        ## (_ckmr is missing when the constructor failed early, and
        ## this can still be called via __del__).
        ckmr = getattr(self, "_ckmr", None)
        if (not _c_null_pointer(ckmr)):
            kmrso.kmr_free_context(ckmr)
        self._ckmr = _c_null_pointer_value
        self._dismissed = True
        self.emptykvs = None
        self.nprocs = -1
        self.rank = -1
        return

    def create_kvs(self, **opts):
        """Makes a new KVS (an alias of make_kvs())."""

        return self.make_kvs(**opts)

    def make_kvs(self, **opts):
        """Makes a new KVS.  The keywords "key=" and "value=" specify
        the field types."""

        (keyty, valty, _) = _get_options(opts, True)
        return KVS(self, keyty, valty)

    def reply_to_spawner(self):
        """Sends a reply message from a spawned process."""

        kmrso.kmr_reply_to_spawner(self._ckmr)
        return

    def get_spawner_communicator(self, index):
        """Obtains a parent communicator of a spawned process.  C version
        returns a reference, but this returns an entity."""

        commref = kmrso.kmr_get_spawner_communicator(self._ckmr, index)
        return commref.contents.value

    def send_kvs_to_spawner(self, kvs):
        """Sends the KVS from a spawned process to the spawner."""

        kmrso.kmr_send_kvs_to_spawner(self._ckmr, kvs._ckvs)
        return

    def _init_swf(self, splitcomms, masterank):
        """Calls kmr_init_swf() with this context."""
        kmrso.kmr_init_swf(self._ckmr, splitcomms, masterank)
        return

    def _detach_swf_workers(self):
        """Calls kmr_detach_swf_workers() with this context."""
        kmrso.kmr_detach_swf_workers(self._ckmr)
        return

    def _stop_swf_workers(self):
        """Calls kmr_stop_swf_workers() with this context."""
        kmrso.kmr_stop_swf_workers(self._ckmr)
        return

    def _finish_swf(self):
        """Calls kmr_finish_swf() with this context."""
        kmrso.kmr_finish_swf(self._ckmr)
        return

    def _split_swf_lanes(self, masterrank, description, dump):
        """Calls kmr_split_swf_lanes() with this context, and returns
        an array of four communicators filled by it.  Description is
        passed as a null-terminated array of C strings."""
        ## NOTE(review): entries of description are stored in a
        ## c_char_p array, so they presumably need be bytes -- confirm
        ## against callers.
        comms = (_c_mpi_comm * 4)()
        desc = (ctypes.c_char_p * (len(description) + 1))()
        desc[:-1] = description
        desc[len(description)] = None
        kmrso.kmr_split_swf_lanes(self._ckmr, comms, masterrank, desc, dump)
        return comms

    def _dump_swf_lanes(self):
        """Calls kmr_dump_swf_lanes() with this context."""
        kmrso.kmr_dump_swf_lanes(self._ckmr)
        return

    def _set_swf_verbosity(self, level):
        """Calls kmr_set_swf_verbosity() with this context and a level."""
        kmrso.kmr_set_swf_verbosity(self._ckmr, level)
        return

    def set_option(self, k, v):
        """Sets KMR option, taking both arguments by strings."""

        kmrso.kmr_set_option_by_strings(self._ckmr, _encode(k), _encode(v))
        return

## Lists of the option names accepted by each operation.

_enabled_options_of_map = [
    "nothreading", "inspect", "keep_open", "take_ckpt"]

_enabled_options_of_map_once = [
    "nothreading", "keep_open", "take_ckpt"]

_enabled_options_of_map_ms = [
    "nothreading", "keep_open"]

_enabled_options_of_reduce = [
    "nothreading", "inspect", "take_ckpt"]

_enabled_options_of_reduce_as_one = [
    "inspect", "take_ckpt"]

_enabled_options_of_shuffle = [
    "inspect", "key_as_rank", "take_ckpt"]

821_enabled_options_of_replicate = [ 822 "inspect", "rank_zero", "take_ckpt"] 823 824_enabled_options_of_distribute = [ 825 "nothreading", "inspect", "keep_open"] 826 827_enabled_options_of_sort_locally = [ 828 "nothreading", "inspect", "key_as_rank"] 829 830_enabled_options_of_sort = [ 831 "inspect"] 832 833class KVS(): 834 """KVS. Note that there are dummy KVS'es which are temporarily 835 created to hold the C structure of the KVS passed to 836 mapper/reducer functions. A dummy KVS has None in its "mr" 837 attribute.""" 838 839 # attributes: self._ckvs, self.mr, self._frameinfo. 840 841 def __init__(self, kmr_or_ckvs, keyty="opaque", valty="opaque"): 842 """Makes a KVS for a given KMR. Do not call the KVS constructor 843 directly, but use KMR.make_kvs() instead. A KVS is created 844 with the datatypes stored in the key and the value, specified 845 by the keywords "key=" and "value=". The datatype name is a 846 string, one of "opaque", "cstring", "integer", and "float8". 847 Most mappers and reducers (precisely, the methods that accepts 848 a function argument) take keyword arguments for the types, 849 defaulting with key="opaque" and value="opaque". The 850 datatypes affects the sorting order. """ 851 852 self.mr = None 853 854 """mr attribute holds a KMR context object. Note that mr is 855 not accessible from mapping/reducing functions.""" 856 857 self._ckvs = None 858 859 """_ckvs attribute holds a kvs in C.""" 860 861 self._frameinfo = None 862 863 """_frameinfo protects caller line information from garbage 864 collected.""" 865 866 if isinstance(kmr_or_ckvs, KMR): 867 kf = _field_name_type_map[keyty] 868 vf = _field_name_type_map[valty] 869 top = inspect.currentframe().f_back 870 self.mr = kmr_or_ckvs 871 (f, l, n) = _make_frame_info(top) 872 self._ckvs = kmrso.kmr_create_kvs7( 873 self.mr._ckmr, kf, vf, _c_option(), f, l, n) 874 self._frameinfo = (f, l, n) 875 elif isinstance(kmr_or_ckvs, _c_pointer): 876 ## Return a dummy KVS. 
877 self.mr = None 878 self._ckvs = kmr_or_ckvs 879 else: 880 raise Exception("Bad call to kvs constructor") 881 882 def __del__(self): 883 if ((not self._is_dummy()) and (not _c_null_pointer(self._ckvs))): 884 self.free() 885 return 886 887 def free(self): 888 """Finishes the C part of a KVS.""" 889 890 if (self._is_dummy()): 891 raise Exception("Bad call to free_kvs on dummy KVS") 892 elif (_c_null_pointer(self._ckvs)): 893 raise Exception("Bad call to free_kvs on freed KVS") 894 elif ((not self.mr is None) and self.mr._dismissed): 895 ## Do not free when KMR object is dismissed. 896 pass 897 else: 898 kmrso.kmr_free_kvs(self._ckvs) 899 self._ckvs = _c_null_pointer_value 900 return self 901 902 def _is_dummy(self): 903 return (self.mr is None) 904 905 def _consume(self): 906 """Releases a now dangling C pointer.""" 907 908 self._ckvs = _c_null_pointer_value 909 910 def _encode_content(self, o, key_or_value): 911 """Marshalls an object with regard to the field type. It 912 retuns a 3-tuple, with length, value-union, and the 3nd to 913 keep a reference to a buffer.""" 914 915 kvty = self.get_field_type(key_or_value) 916 u = _c_unitsized() 917 if (kvty == "opaque"): 918 data = pickle.dumps(o, _pickle_protocol) 919 u.p = data 920 return (len(data), u, data) 921 elif (kvty == "cstring"): 922 if (not isinstance(o, str)): 923 raise Exception("Not 8-bit string for cstring: %s" % o) 924 ## (Add null for C string). 925 os = ((o + "\0") if force_null_terminate_in_cstring else o) 926 data = _encode(os) 927 u.p = data 928 return (len(data), u, data) 929 elif (kvty == "integer"): 930 u.i = o 931 return (ctypes.sizeof(_c_long), u, None) 932 elif (kvty == "float8"): 933 u.d = o 934 return (ctypes.sizeof(_c_double), u, None) 935 else: 936 raise Exception("Bad field type: %s" % kvty) 937 938 def _decode_content(self, siz, u, key_or_value): 939 """Unmarshalls an object with regard to the field type. 
It 940 returns integer 0 when the length is 0 (it is for a dummy 941 key-value used in kmr_map_once() etc).""" 942 943 if (siz == 0): 944 return 0 945 else: 946 kvty = self.get_field_type(key_or_value) 947 if (kvty == "opaque"): 948 data = ctypes.string_at(u.i, siz) 949 o = pickle.loads(data) 950 return o 951 elif (kvty == "cstring"): 952 ## (Delete null added for C string). 953 siz1 = ((siz - 1) if force_null_terminate_in_cstring else siz) 954 data = ctypes.string_at(u.i, siz1) 955 os = _decode(data) 956 return os 957 elif (kvty == "integer"): 958 return u.i 959 elif (kvty == "float8"): 960 return u.d 961 else: 962 raise Exception("Bad field type: %s" % kvty) 963 964 def get_field_type(self, key_or_value): 965 """Get a field type of a KVS.""" 966 967 if (_c_null_pointer(self._ckvs)): 968 raise Exception("Bad KVS (null C-object)") 969 if (key_or_value == _Slot.Key): 970 kvty = kmrso.kmr_get_key_type_ff(self._ckvs) 971 elif (key_or_value == _Slot.Value): 972 kvty = kmrso.kmr_get_value_type_ff(self._ckvs) 973 else: 974 raise Exception("Bad field %s" % key_or_value.name) 975 if (kvty == _kv_bad): 976 raise Exception("Bad field type value %d in KVS" % kvty) 977 else: 978 return _field_type_name_map[kvty] 979 980 def add(self, key, val): 981 """Adds a key-value pair.""" 982 983 self.add_kv(key, val) 984 return 985 986 def add_kv(self, key, val): 987 """Adds a key-value pair.""" 988 989 ## Note it keeps the created string until kmr_add_kv(), 990 ## because kvbox does not hold the references. 
991 (klen, k, ks) = self._encode_content(key, _Slot.Key) 992 (vlen, v, vs) = self._encode_content(val, _Slot.Value) 993 cbox = _c_kvbox().set(klen, k, vlen, v) 994 kmrso.kmr_add_kv(self._ckvs, cbox) 995 return 996 997 def add_kv_done(self): 998 """Finishes adding key-value pairs.""" 999 1000 kmrso.kmr_add_kv_done(self._ckvs) 1001 return 1002 1003 def get_element_count(self): 1004 """Gets the total number of key-value pairs.""" 1005 1006 c = _c_long(0) 1007 kmrso.kmr_get_element_count(self._ckvs, ctypes.byref(c)) 1008 return c.value 1009 1010 def local_element_count(self): 1011 """Gets the number of key-value pairs locally.""" 1012 1013 c = _c_long(0) 1014 kmrso.kmr_local_element_count(self._ckvs, ctypes.byref(c)) 1015 return c.value 1016 1017 def map(self, fn, **mopts): 1018 """Maps simply.""" 1019 1020 (keyty, valty, mkkvo) = _get_options(mopts, True) 1021 cmopts = _c_option(mopts, _enabled_options_of_map) 1022 cfn = _wrap_mapfn(fn) 1023 ckvi = self._ckvs 1024 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1025 ckvo = (kvo._ckvs if (kvo is not None) else None) 1026 (f, l, n) = _make_frame_info(inspect.currentframe().f_back) 1027 kmrso.kmr_map9(0, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n)) 1028 if (cmopts.inspect == 0): self._consume() 1029 return kvo 1030 1031 def map_once(self, rank_zero_only, fn, **mopts): 1032 """Maps once with a dummy key-value pair.""" 1033 1034 ## It needs dummy input; Never inspects. 1035 (keyty, valty, mkkvo) = _get_options(mopts, True) 1036 cmopts = _c_option(mopts, _enabled_options_of_map_once) 1037 cfn = _wrap_mapfn(fn) 1038 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1039 ckvo = (kvo._ckvs if (kvo is not None) else None) 1040 kmrso.kmr_map_once(ckvo, 0, cmopts, rank_zero_only, cfn) 1041 return kvo 1042 1043 def map_on_rank_zero(self, fn, **mopts): 1044 """Maps on rank0 only.""" 1045 1046 ## It needs dummy input. 
1047 return self.map_once(True, fn, *mopts) 1048 1049 def map_rank_by_rank(self, fn, **mopts): 1050 """Maps sequentially with rank by rank for debugging.""" 1051 1052 (keyty, valty, mkkvo) = _get_options(mopts, True) 1053 cmopts = _c_option(mopts, _enabled_options_of_map) 1054 cfn = _wrap_mapfn(fn) 1055 ckvi = self._ckvs 1056 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1057 ckvo = (kvo._ckvs if (kvo is not None) else None) 1058 kmrso.kmr_map_rank_by_rank(ckvi, ckvo, 0, cmopts, cfn) 1059 if (cmopts.inspect == 0): self._consume() 1060 return kvo 1061 1062 def map_for_some(self, fn, **mopts): 1063 """Maps until some key-value are added.""" 1064 1065 (keyty, valty, mkkvo) = _get_options(mopts, True) 1066 cmopts = _c_option(mopts, _enabled_options_of_map) 1067 ckvi = self._ckvs 1068 cfn = _wrap_mapfn(fn) 1069 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1070 ckvo = (kvo._ckvs if (kvo is not None) else None) 1071 kmrso.kmr_map_for_some(ckvi, ckvo, 0, cmopts, cfn) 1072 if (cmopts.inspect == 0): self._consume() 1073 return kvo 1074 1075 def map_ms(self, fn, **mopts): 1076 """Maps in master-worker mode.""" 1077 1078 ## Its call is repeated until True (assuming MPI_SUCCESS==0). 
1079 (keyty, valty, mkkvo) = _get_options(mopts, True) 1080 cmopts = _c_option(mopts, _enabled_options_of_map_ms) 1081 cfn = _wrap_mapfn(fn) 1082 ckvi = self._ckvs 1083 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1084 ckvo = (kvo._ckvs if (kvo is not None) else None) 1085 rr = 1 1086 while (rr != 0): 1087 rr = kmrso.kmr_map_ms(ckvi, ckvo, 0, cmopts, cfn) 1088 self._consume() 1089 return kvo 1090 1091 def map_ms_commands(self, fn, **xopts): 1092 """Maps in master-worker mode, and runs serial commands.""" 1093 1094 (sopts, mopts) = _filter_spawn_options(xopts) 1095 (keyty, valty, mkkvo) = _get_options(mopts, True) 1096 cmopts = _c_option(mopts, _enabled_options_of_map_ms) 1097 csopts = _c_spawn_option(sopts) 1098 cfn = _wrap_mapfn(fn) 1099 ckvi = self._ckvs 1100 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1101 ckvo = (kvo._ckvs if (kvo is not None) else None) 1102 rr = 1 1103 while (rr != 0): 1104 rr = kmrso.kmr_map_ms_commands(ckvi, ckvo, 0, cmopts, csopts, cfn) 1105 self._consume() 1106 return kvo 1107 1108 def map_via_spawn(self, fn, **xopts): 1109 """Maps on processes started by MPI_Comm_spawn().""" 1110 1111 (sopts, mopts) = _filter_spawn_options(xopts) 1112 (keyty, valty, mkkvo) = _get_options(mopts, True) 1113 cmopts = _c_option(mopts, _enabled_options_of_map) 1114 csopts = _c_spawn_option(sopts) 1115 cfn = _wrap_mapfn(fn) 1116 ckvi = self._ckvs 1117 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1118 ckvo = (kvo._ckvs if (kvo is not None) else None) 1119 kmrso.kmr_map_via_spawn(ckvi, ckvo, 0, _mpi_info_null, csopts, cfn) 1120 self._consume() 1121 return kvo 1122 1123 def map_processes(self, nonmpi, fn, **sopts): 1124 """Maps on processes started by MPI_Comm_spawn().""" 1125 1126 (keyty, valty, mkkvo) = _get_options(sopts, True) 1127 csopts = _c_spawn_option(sopts) 1128 cfn = _wrap_mapfn(fn) 1129 ckvi = self._ckvs 1130 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1131 ckvo = (kvo._ckvs if (kvo is not None) else None) 1132 
kmrso.kmr_map_processes(nonmpi, ckvi, ckvo, 0, _mpi_info_null, 1133 csopts, cfn) 1134 self._consume() 1135 return kvo 1136 1137 def map_parallel_processes(self, fn, **sopts): 1138 """Maps on processes started by MPI_Comm_spawn().""" 1139 1140 return self.map_processes(False, fn, **sopts) 1141 1142 def map_serial_processes(self, fn, **sopts): 1143 """Maps on processes started by MPI_Comm_spawn().""" 1144 1145 return self.map_processes(True, fn, **sopts) 1146 1147 def reduce(self, fn, **mopts): 1148 """Reduces key-value pairs.""" 1149 1150 (keyty, valty, mkkvo) = _get_options(mopts, True) 1151 cmopts = _c_option(mopts, _enabled_options_of_reduce) 1152 cfn = _wrap_redfn(fn) 1153 ckvi = self._ckvs 1154 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1155 ckvo = (kvo._ckvs if (kvo is not None) else None) 1156 (f, l, n) = _make_frame_info(inspect.currentframe().f_back) 1157 kmrso.kmr_reduce9(0, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n)) 1158 if (cmopts.inspect == 0): self._consume() 1159 return kvo 1160 1161 def reduce_as_one(self, fn, **mopts): 1162 """ Reduces once as if all pairs had the same key.""" 1163 1164 (keyty, valty, mkkvo) = _get_options(mopts, True) 1165 cmopts = _c_option(mopts, _enabled_options_of_reduce_as_one) 1166 cfn = _wrap_redfn(fn) 1167 ckvi = self._ckvs 1168 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1169 ckvo = (kvo._ckvs if (kvo is not None) else None) 1170 kmrso.kmr_reduce_as_one(ckvi, ckvo, 0, cmopts, cfn) 1171 if (cmopts.inspect == 0): self._consume() 1172 return kvo 1173 1174 def reduce_for_some(self, fn, **mopts): 1175 """Reduces until some key-value are added.""" 1176 1177 (keyty, valty, mkkvo) = _get_options(mopts, True) 1178 cmopts = _c_option(mopts, _enabled_options_of_reduce) 1179 cfn = _wrap_redfn(fn) 1180 ckvi = self._ckvs 1181 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1182 ckvo = (kvo._ckvs if (kvo is not None) else None) 1183 ## (NOTE: It passes a frame of reduce_for_some.) 
1184 (f, l, n) = _make_frame_info(inspect.currentframe()) 1185 kmrso.kmr_reduce9(1, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n)) 1186 if (cmopts.inspect == 0): self._consume() 1187 return kvo 1188 1189 def reverse(self, **mopts): 1190 """Makes a new pair by swapping the key and the value.""" 1191 1192 keyty = self.get_field_type(_Slot.Key) 1193 valty = self.get_field_type(_Slot.Value) 1194 (_, _, mkkvo) = _get_options(mopts, False) 1195 cmopts = _c_option(mopts, _enabled_options_of_map) 1196 assert (mkkvo is True) 1197 ckvi = self._ckvs 1198 kvo = (KVS(self.mr, valty, keyty) if mkkvo else None) 1199 ckvo = (kvo._ckvs if (kvo is not None) else None) 1200 kmrso.kmr_reverse(ckvi, ckvo, cmopts) 1201 if (cmopts.inspect == 0): self._consume() 1202 return kvo 1203 1204 def shuffle(self, **mopts): 1205 """Shuffles key-value pairs.""" 1206 1207 keyty = self.get_field_type(_Slot.Key) 1208 valty = self.get_field_type(_Slot.Value) 1209 (_, _, mkkvo) = _get_options(mopts, False) 1210 cmopts = _c_option(mopts, _enabled_options_of_shuffle) 1211 ckvi = self._ckvs 1212 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1213 ckvo = (kvo._ckvs if (kvo is not None) else None) 1214 kmrso.kmr_shuffle(ckvi, ckvo, cmopts) 1215 if (cmopts.inspect == 0): self._consume() 1216 return kvo 1217 1218 def replicate(self, **mopts): 1219 """Replicates key-value pairs to be visible on all ranks.""" 1220 1221 keyty = self.get_field_type(_Slot.Key) 1222 valty = self.get_field_type(_Slot.Value) 1223 (_, _, mkkvo) = _get_options(mopts, False) 1224 cmopts = _c_option(mopts, _enabled_options_of_replicate) 1225 ckvi = self._ckvs 1226 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1227 ckvo = (kvo._ckvs if (kvo is not None) else None) 1228 kmrso.kmr_replicate(ckvi, ckvo, cmopts) 1229 if (cmopts.inspect == 0): self._consume() 1230 return kvo 1231 1232 def distribute(self, cyclic, **mopts): 1233 """Distributes pairs approximately evenly to ranks.""" 1234 1235 keyty = self.get_field_type(_Slot.Key) 1236 
valty = self.get_field_type(_Slot.Value) 1237 (_, _, mkkvo) = _get_options(mopts, False) 1238 cmopts = _c_option(mopts, _enabled_options_of_distribute) 1239 ckvi = self._ckvs 1240 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1241 ckvo = (kvo._ckvs if (kvo is not None) else None) 1242 kmrso.kmr_distribute(ckvi, ckvo, cyclic, cmopts) 1243 if (cmopts.inspect == 0): self._consume() 1244 return kvo 1245 1246 def sort_locally(self, shuffling, **mopts): 1247 """Reorders key-value pairs in a single rank.""" 1248 1249 keyty = self.get_field_type(_Slot.Key) 1250 valty = self.get_field_type(_Slot.Value) 1251 (_, _, mkkvo) = _get_options(mopts, False) 1252 cmopts = _c_option(mopts, _enabled_options_of_sort_locally) 1253 ckvi = self._ckvs 1254 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1255 ckvo = (kvo._ckvs if (kvo is not None) else None) 1256 kmrso.kmr_sort_locally(ckvi, ckvo, shuffling, cmopts) 1257 if (cmopts.inspect == 0): self._consume() 1258 return kvo 1259 1260 def sort(self, **mopts): 1261 """Sorts a KVS globally.""" 1262 1263 keyty = self.get_field_type(_Slot.Key) 1264 valty = self.get_field_type(_Slot.Value) 1265 (_, _, mkkvo) = _get_options(mopts, False) 1266 cmopts = _c_option(mopts, _enabled_options_of_sort) 1267 ckvi = self._ckvs 1268 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1269 ckvo = (kvo._ckvs if (kvo is not None) else None) 1270 kmrso.kmr_sort(ckvi, ckvo, cmopts) 1271 if (cmopts.inspect == 0): self._consume() 1272 return kvo 1273 1274 def concatenate(self, *morekvs): 1275 """Concatenates a number of KVS'es to one.""" 1276 1277 keyty = self.get_field_type(_Slot.Key) 1278 valty = self.get_field_type(_Slot.Value) 1279 siz = (len(morekvs) + 1) 1280 ckvsvec = (_c_kvs * siz)() 1281 ckvsvec[0] = self._ckvs 1282 for i in range(0, len(morekvs)): 1283 ckvsvec[i + 1] = morekvs[i]._ckvs 1284 cn = _c_int(siz) 1285 kvo = KVS(self.mr, keyty, valty) 1286 ckvo = kvo._ckvs 1287 kmrso.kmr_concatenate_kvs(ckvsvec, cn, ckvo, _c_option()) 1288 for 
i in morekvs: 1289 i._consume() 1290 self._consume() 1291 return kvo 1292 1293 def read_files_reassemble(self, filename, color, offset, bytes_): 1294 """Reassembles files reading by ranks.""" 1295 1296 buf = _c_void_p() 1297 siz = _c_uint64(0) 1298 kmrso.kmr_read_files_reassemble( 1299 self.mr._ckmr, _encode(filename), color, offset, bytes_, 1300 ctypes.byref(buf), ctypes.byref(siz)) 1301 addr = buf.value 1302 ptr = (_c_ubyte * siz.value).from_address(addr) 1303 data = bytearray(ptr) 1304 kmrso.kmr_mfree(addr, siz.value) 1305 return data 1306 1307 def read_file_by_segments(self, filename, color): 1308 """Reads one file by segments and reassembles.""" 1309 1310 buf = _c_void_p() 1311 siz = _c_uint64(0) 1312 kmrso.kmr_read_file_by_segments( 1313 self.mr._ckmr, _encode(filename), color, 1314 ctypes.byref(buf), ctypes.byref(siz)) 1315 addr = buf.value 1316 ptr = (_c_ubyte * siz.value).from_address(addr) 1317 data = bytearray(ptr) 1318 kmrso.kmr_mfree(addr, siz.value) 1319 return data 1320 1321 def save(self): 1322 """Packs locally the contents of a KVS to a byte array.""" 1323 1324 buf = _c_void_p(0) 1325 siz = _c_size_t(0) 1326 kmrso.kmr_save_kvs(self._ckvs, ctypes.byref(buf), ctypes.byref(siz), 1327 _c_option()) 1328 addr = buf.value 1329 ptr = (_c_ubyte * siz.value).from_address(addr) 1330 data = bytearray(ptr) 1331 kmrso.kmr_mfree(addr, siz.value) 1332 return data 1333 1334 def restore(self, data): 1335 """Unpacks locally the contents of a KVS from a byte array.""" 1336 1337 kvo = KVS(self.mr, "opaque", "opaque") 1338 siz = len(data) 1339 addr = (_c_ubyte * siz).from_buffer(data) 1340 kmrso.kmr_restore_kvs(kvo._ckvs, addr, siz, _c_option()) 1341 return kvo 1342 1343 def _map_swf(self, fn, **xopts): 1344 """.""" 1345 (sopts, mopts) = _filter_spawn_options(xopts) 1346 (keyty, valty, mkkvo) = _get_options(mopts, True) 1347 cmopts = _c_option(mopts, _enabled_options_of_map) 1348 csopts = _c_spawn_option(sopts) 1349 cfn = _wrap_mapfn(fn) 1350 ckvi = self._ckvs 1351 kvo 
= (KVS(self.mr, keyty, valty) if mkkvo else None) 1352 ckvo = (kvo._ckvs if (kvo is not None) else None) 1353 kmrso.kmr_map_swf(ckvi, ckvo, 0, csopts, cfn) 1354 self._consume() 1355 return kvo 1356 1357def fin(): 1358 """Finishes using KMR4PY.""" 1359 1360 kmrso.kmr_fin() 1361 return 1362 1363def listify(kvs): 1364 """Returns an array of LOCAL contents.""" 1365 1366 a = kvs.local_element_count() * [None] 1367 def f (kv, kvi, kvo, i, *_data): 1368 a[i] = kv 1369 return 0 1370 kvo = kvs.map(f, output=False, inspect=True) 1371 assert (kvo is None) 1372 return a 1373 1374def _stringify_options(o): 1375 return _decode(kmrso.kmr_stringify_options(o)) 1376 1377def _stringify_file_options(o): 1378 return _decode(kmrso.kmr_stringify_file_options(o)) 1379 1380def _stringify_spawn_options(o): 1381 return _decode(kmrso.kmr_stringify_spawn_options(o)) 1382 1383def _check_ctypes_values(): 1384 """Checks if ctypes values are properly used.""" 1385 1386 if (not _c_null_pointer(_c_null_pointer_value)): 1387 raise Exception("BAD: C null pointer has a wrong value.") 1388 1389def _check_passing_options(): 1390 """Checks if the options are passed properly from Python to C.""" 1391 1392 for (option, stringify) in [ 1393 (_c_option, _stringify_options), 1394 (_c_file_option, _stringify_file_options), 1395 (_c_spawn_option, _stringify_spawn_options)]: 1396 for (o, _, _) in option._fields_: 1397 if ((o == "gap16") or (o == "gap32")): 1398 pass 1399 else: 1400 copts = option({o : 1}) 1401 s = stringify(copts) 1402 if (o != s): 1403 raise Exception("BAD: %s != %s" % (str(o), str(s))) 1404 1405## Document generator "pdoc" requires loading kmr4py.py without 1406## initializing MPI. 1407##if (sys.modules["mpi4py"].__name__ != "dummy_mpi4py"): 1408## _load_kmrso(kmrso_name) 1409 1410# Copyright (C) 2012-2018 RIKEN R-CCS 1411# This library is distributed WITHOUT ANY WARRANTY. This library can be 1412# redistributed and/or modified under the terms of the BSD 2-Clause License.
kmrso holds a libkmr.so library object.
kmrso_name holds a libkmr.so name, which can be set before calling KMR().
warning_function specifies the function used to issue warnings.
ignore_exceptions_in_map_fn=True makes exceptions ignored.
print_backtrace_in_map_fn=True makes backtraces be printed on exceptions in mapper/reducer functions.
force_null_terminate_in_cstring specifies whether to add a null terminator to C strings. Do not change it while some KVS'es are live.
class KMR():
    """KMR context.  It holds the C part of a KMR context made by
    kmr_create_context(), and it is the object given to the KVS
    constructor when making KVS'es."""

    ## attributes: self._ckmr, self.nprocs, self.rank, self.emptykvs,
    ## self._dismissed.

    def __init__(self, comm, info=None):
        """Makes a KMR context with a given MPI communicator (comm),
        which is used in succeeding operations.  Info specifies its
        options by MPI_Info.  Arguments of comm/info are passed as a
        long integer (assuming either an integer (int) or a pointer in
        C).  It also accepts a communicator instance of
        mpi4py.MPI.Comm, or a string "dummy" or "world", as a comm
        argument.  A comm or an info of an unsupported type is
        ignored with a warning issued on rank0 (comm then defaults to
        the world communicator).  It raises an Exception when
        kmr_create_context() returns a null pointer."""

        ## The shared library is loaded lazily at the first context.
        if (kmrso is None):
            _load_kmrso(kmrso_name)

        if (isinstance(info, int)):
            warninfo = False
            cinfo = info
        else:
            warninfo = (info is not None)
            cinfo = _mpi_info_null

        ## Look up mpi4py.MPI in sys.modules instead of referring to a
        ## module-level name "mpi4py", because its import is commented
        ## out at the top of this file.  Applications are expected to
        ## import MPI by themselves before importing kmr4py; a comm
        ## cannot be an MPI.Comm instance unless that has happened.
        mpi = sys.modules.get("mpi4py.MPI")

        if (isinstance(comm, int)):
            warncomm = False
            ccomm = comm
        elif ((mpi is not None) and isinstance(comm, mpi.Comm)):
            warncomm = False
            comm_ptr = mpi._addressof(comm)
            ## Choose a ctypes type by the size of an MPI_Comm handle.
            if (mpi._sizeof(mpi.Comm) == ctypes.sizeof(_c_uint64)):
                MPI_Comm = _c_uint64
            else:
                MPI_Comm = _c_void_p
            ccomm = MPI_Comm.from_address(comm_ptr)
        elif (comm == "dummy"):
            warncomm = False
            ccomm = _mpi_comm_self
        elif (comm == "world"):
            warncomm = False
            ccomm = _mpi_comm_world
        else:
            warncomm = True
            ccomm = _mpi_comm_world

        self._ckmr = kmrso.kmr_create_context(ccomm, cinfo, _encode(""))

        """self._ckmr holds the C part of a KMR context."""

        if (_c_null_pointer(self._ckmr)):
            raise Exception("kmr_create_context: failed")

        self._dismissed = False

        """self._dismissed=True disables freeing KVS'es (by memory
        management) which remain unconsumed after dismissing a KMR
        context.  It is because freeing them causes referencing
        dangling pointers in C."""

        ## (The KVS is freed at once; only the Python object is kept).
        self.emptykvs = KVS(self).free()

        """self.emptykvs holds an empty KVS needed by map_once,
        map_on_rank_zero, read_files_reassemble, and
        read_file_by_segments."""

        self.nprocs = kmrso.kmr_get_nprocs(self._ckmr)

        """self.nprocs holds an nprocs of MPI."""

        self.rank = kmrso.kmr_get_rank(self._ckmr)

        """self.rank holds a rank of MPI."""

        ## Warnings are delayed until the rank is known, to issue
        ## them only once on rank0.
        if (warncomm and (self.rank == 0)):
            warning_function("MPI comm ignored in KMR() constructor.", RuntimeWarning)
        if (warninfo and (self.rank == 0)):
            warning_function("MPI info ignored in KMR() constructor.", RuntimeWarning)
        return

    def __del__(self):
        self.dismiss()
        return

    def free(self):
        """Dismisses KMR (an alias of dismiss())."""

        self.dismiss()

    def dismiss(self):
        """Dismisses KMR.  It frees the C context unless it is already
        freed, and then resets the attributes.  Calling it again does
        not free the context twice, because the pointer is nulled."""

        if (not _c_null_pointer(self._ckmr)):
            kmrso.kmr_free_context(self._ckmr)
        self._ckmr = _c_null_pointer_value
        ## (Set _dismissed before dropping KVS'es; see KVS.free()).
        self._dismissed = True
        self.emptykvs = None
        self.nprocs = -1
        self.rank = -1
        return

    def create_kvs(self, **opts):
        """Makes a new KVS (an alias of make_kvs()).  It returns the
        KVS made by make_kvs()."""

        ## (It used to drop the result and return None).
        return self.make_kvs(**opts)

    def make_kvs(self, **opts):
        """Makes a new KVS.  The keyword arguments are interpreted by
        _get_options() to give the key and value field types."""

        (keyty, valty, _) = _get_options(opts, True)
        return KVS(self, keyty, valty)

    def reply_to_spawner(self):
        """Sends a reply message from a spawned process."""

        kmrso.kmr_reply_to_spawner(self._ckmr)
        return

    def get_spawner_communicator(self, index):
        """Obtains a parent communicator of a spawned process.  The C
        version returns a reference, but this returns an entity."""

        commref = kmrso.kmr_get_spawner_communicator(self._ckmr, index)
        return commref.contents.value

    def send_kvs_to_spawner(self, kvs):
        """Sends the KVS from a spawned process to the spawner."""

        kmrso.kmr_send_kvs_to_spawner(self._ckmr, kvs._ckvs)
        return

    def _init_swf(self, splitcomms, masterank):
        """Calls kmr_init_swf() in C with this context.  See the
        documentation in C."""
        kmrso.kmr_init_swf(self._ckmr, splitcomms, masterank)
        return

    def _detach_swf_workers(self):
        """Calls kmr_detach_swf_workers() in C with this context.  See
        the documentation in C."""
        kmrso.kmr_detach_swf_workers(self._ckmr)
        return

    def _stop_swf_workers(self):
        """Calls kmr_stop_swf_workers() in C with this context.  See
        the documentation in C."""
        kmrso.kmr_stop_swf_workers(self._ckmr)
        return

    def _finish_swf(self):
        """Calls kmr_finish_swf() in C with this context.  See the
        documentation in C."""
        kmrso.kmr_finish_swf(self._ckmr)
        return

    def _split_swf_lanes(self, masterrank, description, dump):
        """Calls kmr_split_swf_lanes() in C, and returns the
        four-element array of communicators passed to it.  The
        description is a sequence of byte strings, which is passed as
        an array of C strings terminated by a null pointer."""
        comms = (_c_mpi_comm * 4)()
        desc = (ctypes.c_char_p * (len(description) + 1))()
        desc[:-1] = description
        ## (Terminate the array with a null pointer).
        desc[len(description)] = None
        kmrso.kmr_split_swf_lanes(self._ckmr, comms, masterrank, desc, dump)
        return comms

    def _dump_swf_lanes(self):
        """Calls kmr_dump_swf_lanes() in C with this context.  See the
        documentation in C."""
        kmrso.kmr_dump_swf_lanes(self._ckmr)
        return

    def _set_swf_verbosity(self, level):
        """Calls kmr_set_swf_verbosity() in C with this context.  See
        the documentation in C."""
        kmrso.kmr_set_swf_verbosity(self._ckmr, level)
        return

    def set_option(self, k, v):
        """Sets KMR option, taking both arguments by strings."""

        kmrso.kmr_set_option_by_strings(self._ckmr, _encode(k), _encode(v))
        return
KMR context.
def __init__(self, comm, info=None):
    """Makes a KMR context with a given MPI communicator (comm),
    which is used in succeeding operations.  Info specifies its
    options by MPI_Info.  Arguments of comm/info are passed as a
    long integer (assuming either an integer (int) or a pointer in
    C).  It also accepts a communicator instance of
    mpi4py.MPI.Comm, or a string "dummy" or "world", as a comm
    argument.  A comm or an info of an unsupported type is ignored
    with a warning issued on rank0 (comm then defaults to the world
    communicator).  It raises an Exception when
    kmr_create_context() returns a null pointer."""

    ## The shared library is loaded lazily at the first context.
    if (kmrso is None):
        _load_kmrso(kmrso_name)

    if (isinstance(info, int)):
        warninfo = False
        cinfo = info
    else:
        warninfo = (info is not None)
        cinfo = _mpi_info_null

    ## Look up mpi4py.MPI in sys.modules instead of referring to a
    ## module-level name "mpi4py", because its import is commented
    ## out at the top of this file.  Applications are expected to
    ## import MPI by themselves before importing kmr4py; a comm
    ## cannot be an MPI.Comm instance unless that has happened.
    mpi = sys.modules.get("mpi4py.MPI")

    if (isinstance(comm, int)):
        warncomm = False
        ccomm = comm
    elif ((mpi is not None) and isinstance(comm, mpi.Comm)):
        warncomm = False
        comm_ptr = mpi._addressof(comm)
        ## Choose a ctypes type by the size of an MPI_Comm handle.
        if (mpi._sizeof(mpi.Comm) == ctypes.sizeof(_c_uint64)):
            MPI_Comm = _c_uint64
        else:
            MPI_Comm = _c_void_p
        ccomm = MPI_Comm.from_address(comm_ptr)
    elif (comm == "dummy"):
        warncomm = False
        ccomm = _mpi_comm_self
    elif (comm == "world"):
        warncomm = False
        ccomm = _mpi_comm_world
    else:
        warncomm = True
        ccomm = _mpi_comm_world

    self._ckmr = kmrso.kmr_create_context(ccomm, cinfo, _encode(""))

    """self._ckmr holds the C part of a KMR context."""

    if (_c_null_pointer(self._ckmr)):
        raise Exception("kmr_create_context: failed")

    self._dismissed = False

    """self._dismissed=True disables freeing KVS'es (by memory
    management) which remain unconsumed after dismissing a KMR
    context.  It is because freeing them causes referencing
    dangling pointers in C."""

    ## (The KVS is freed at once; only the Python object is kept).
    self.emptykvs = KVS(self).free()

    """self.emptykvs holds an empty KVS needed by map_once,
    map_on_rank_zero, read_files_reassemble, and
    read_file_by_segments."""

    self.nprocs = kmrso.kmr_get_nprocs(self._ckmr)

    """self.nprocs holds an nprocs of MPI."""

    self.rank = kmrso.kmr_get_rank(self._ckmr)

    """self.rank holds a rank of MPI."""

    ## Warnings are delayed until the rank is known, to issue them
    ## only once on rank0.
    if (warncomm and (self.rank == 0)):
        warning_function("MPI comm ignored in KMR() constructor.", RuntimeWarning)
    if (warninfo and (self.rank == 0)):
        warning_function("MPI info ignored in KMR() constructor.", RuntimeWarning)
    return
Makes a KMR context with a given MPI communicator (comm), which is used in succeeding operations. Info specifies its options by MPI_Info. Arguments of comm/info are passed as a long integer (assuming either an integer (int) or a pointer in C). It also accepts a communicator instance of mpi4py.MPI.Comm, or a string "dummy" or "world", as a comm argument.
self.emptykvs holds an empty KVS needed by map_once, map_on_rank_zero, read_files_reassemble, and read_file_by_segments.
def dismiss(self):
    """Dismisses KMR.  It frees the C context when its pointer is
    not null, and then resets the attributes of this object."""

    ckmr = self._ckmr
    still_alive = (not _c_null_pointer(ckmr))
    if still_alive:
        kmrso.kmr_free_context(ckmr)
    ## Null the pointer first, so that a repeated call does not free
    ## the C context again.
    self._ckmr = _c_null_pointer_value
    self._dismissed = True
    self.emptykvs = None
    (self.nprocs, self.rank) = (-1, -1)
Dismisses KMR.
def create_kvs(self, **opts):
    """Makes a new KVS (an alias of make_kvs()).  It takes the same
    keyword arguments as make_kvs() and returns the KVS made by
    it."""

    ## (It used to drop the result of make_kvs() and return None).
    return self.make_kvs(**opts)
Makes a new KVS (an alias of make_kvs()).
def make_kvs(self, **opts):
    """Makes a new KVS belonging to this KMR context.  The keyword
    arguments are interpreted by _get_options() to give the field
    types of the key and the value."""

    ## The third element of the options is not used for making a KVS.
    keyty, valty, _unused = _get_options(opts, True)
    kvs = KVS(self, keyty, valty)
    return kvs
Makes a new KVS.
def reply_to_spawner(self):
    """Sends a reply message from a spawned process, by calling
    kmr_reply_to_spawner() in C with this context."""

    ckmr = self._ckmr
    kmrso.kmr_reply_to_spawner(ckmr)
Sends a reply message from a spawned process.
def get_spawner_communicator(self, index):
    """Obtains a parent communicator of a spawned process.  The C
    version returns a reference, but this returns an entity (the
    value which the reference points to)."""

    ref = kmrso.kmr_get_spawner_communicator(self._ckmr, index)
    pointee = ref.contents
    return pointee.value
Obtains a parent communicator of a spawned process. The C version returns a reference, but this returns an entity.
834class KVS(): 835 """KVS. Note that there are dummy KVS'es which are temporarily 836 created to hold the C structure of the KVS passed to 837 mapper/reducer functions. A dummy KVS has None in its "mr" 838 attribute.""" 839 840 # attributes: self._ckvs, self.mr, self._frameinfo. 841 842 def __init__(self, kmr_or_ckvs, keyty="opaque", valty="opaque"): 843 """Makes a KVS for a given KMR. Do not call the KVS constructor 844 directly, but use KMR.make_kvs() instead. A KVS is created 845 with the datatypes stored in the key and the value, specified 846 by the keywords "key=" and "value=". The datatype name is a 847 string, one of "opaque", "cstring", "integer", and "float8". 848 Most mappers and reducers (precisely, the methods that accepts 849 a function argument) take keyword arguments for the types, 850 defaulting with key="opaque" and value="opaque". The 851 datatypes affects the sorting order. """ 852 853 self.mr = None 854 855 """mr attribute holds a KMR context object. Note that mr is 856 not accessible from mapping/reducing functions.""" 857 858 self._ckvs = None 859 860 """_ckvs attribute holds a kvs in C.""" 861 862 self._frameinfo = None 863 864 """_frameinfo protects caller line information from garbage 865 collected.""" 866 867 if isinstance(kmr_or_ckvs, KMR): 868 kf = _field_name_type_map[keyty] 869 vf = _field_name_type_map[valty] 870 top = inspect.currentframe().f_back 871 self.mr = kmr_or_ckvs 872 (f, l, n) = _make_frame_info(top) 873 self._ckvs = kmrso.kmr_create_kvs7( 874 self.mr._ckmr, kf, vf, _c_option(), f, l, n) 875 self._frameinfo = (f, l, n) 876 elif isinstance(kmr_or_ckvs, _c_pointer): 877 ## Return a dummy KVS. 
878 self.mr = None 879 self._ckvs = kmr_or_ckvs 880 else: 881 raise Exception("Bad call to kvs constructor") 882 883 def __del__(self): 884 if ((not self._is_dummy()) and (not _c_null_pointer(self._ckvs))): 885 self.free() 886 return 887 888 def free(self): 889 """Finishes the C part of a KVS.""" 890 891 if (self._is_dummy()): 892 raise Exception("Bad call to free_kvs on dummy KVS") 893 elif (_c_null_pointer(self._ckvs)): 894 raise Exception("Bad call to free_kvs on freed KVS") 895 elif ((not self.mr is None) and self.mr._dismissed): 896 ## Do not free when KMR object is dismissed. 897 pass 898 else: 899 kmrso.kmr_free_kvs(self._ckvs) 900 self._ckvs = _c_null_pointer_value 901 return self 902 903 def _is_dummy(self): 904 return (self.mr is None) 905 906 def _consume(self): 907 """Releases a now dangling C pointer.""" 908 909 self._ckvs = _c_null_pointer_value 910 911 def _encode_content(self, o, key_or_value): 912 """Marshalls an object with regard to the field type. It 913 retuns a 3-tuple, with length, value-union, and the 3nd to 914 keep a reference to a buffer.""" 915 916 kvty = self.get_field_type(key_or_value) 917 u = _c_unitsized() 918 if (kvty == "opaque"): 919 data = pickle.dumps(o, _pickle_protocol) 920 u.p = data 921 return (len(data), u, data) 922 elif (kvty == "cstring"): 923 if (not isinstance(o, str)): 924 raise Exception("Not 8-bit string for cstring: %s" % o) 925 ## (Add null for C string). 926 os = ((o + "\0") if force_null_terminate_in_cstring else o) 927 data = _encode(os) 928 u.p = data 929 return (len(data), u, data) 930 elif (kvty == "integer"): 931 u.i = o 932 return (ctypes.sizeof(_c_long), u, None) 933 elif (kvty == "float8"): 934 u.d = o 935 return (ctypes.sizeof(_c_double), u, None) 936 else: 937 raise Exception("Bad field type: %s" % kvty) 938 939 def _decode_content(self, siz, u, key_or_value): 940 """Unmarshalls an object with regard to the field type. 
It 941 returns integer 0 when the length is 0 (it is for a dummy 942 key-value used in kmr_map_once() etc).""" 943 944 if (siz == 0): 945 return 0 946 else: 947 kvty = self.get_field_type(key_or_value) 948 if (kvty == "opaque"): 949 data = ctypes.string_at(u.i, siz) 950 o = pickle.loads(data) 951 return o 952 elif (kvty == "cstring"): 953 ## (Delete null added for C string). 954 siz1 = ((siz - 1) if force_null_terminate_in_cstring else siz) 955 data = ctypes.string_at(u.i, siz1) 956 os = _decode(data) 957 return os 958 elif (kvty == "integer"): 959 return u.i 960 elif (kvty == "float8"): 961 return u.d 962 else: 963 raise Exception("Bad field type: %s" % kvty) 964 965 def get_field_type(self, key_or_value): 966 """Get a field type of a KVS.""" 967 968 if (_c_null_pointer(self._ckvs)): 969 raise Exception("Bad KVS (null C-object)") 970 if (key_or_value == _Slot.Key): 971 kvty = kmrso.kmr_get_key_type_ff(self._ckvs) 972 elif (key_or_value == _Slot.Value): 973 kvty = kmrso.kmr_get_value_type_ff(self._ckvs) 974 else: 975 raise Exception("Bad field %s" % key_or_value.name) 976 if (kvty == _kv_bad): 977 raise Exception("Bad field type value %d in KVS" % kvty) 978 else: 979 return _field_type_name_map[kvty] 980 981 def add(self, key, val): 982 """Adds a key-value pair.""" 983 984 self.add_kv(key, val) 985 return 986 987 def add_kv(self, key, val): 988 """Adds a key-value pair.""" 989 990 ## Note it keeps the created string until kmr_add_kv(), 991 ## because kvbox does not hold the references. 
992 (klen, k, ks) = self._encode_content(key, _Slot.Key) 993 (vlen, v, vs) = self._encode_content(val, _Slot.Value) 994 cbox = _c_kvbox().set(klen, k, vlen, v) 995 kmrso.kmr_add_kv(self._ckvs, cbox) 996 return 997 998 def add_kv_done(self): 999 """Finishes adding key-value pairs.""" 1000 1001 kmrso.kmr_add_kv_done(self._ckvs) 1002 return 1003 1004 def get_element_count(self): 1005 """Gets the total number of key-value pairs.""" 1006 1007 c = _c_long(0) 1008 kmrso.kmr_get_element_count(self._ckvs, ctypes.byref(c)) 1009 return c.value 1010 1011 def local_element_count(self): 1012 """Gets the number of key-value pairs locally.""" 1013 1014 c = _c_long(0) 1015 kmrso.kmr_local_element_count(self._ckvs, ctypes.byref(c)) 1016 return c.value 1017 1018 def map(self, fn, **mopts): 1019 """Maps simply.""" 1020 1021 (keyty, valty, mkkvo) = _get_options(mopts, True) 1022 cmopts = _c_option(mopts, _enabled_options_of_map) 1023 cfn = _wrap_mapfn(fn) 1024 ckvi = self._ckvs 1025 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1026 ckvo = (kvo._ckvs if (kvo is not None) else None) 1027 (f, l, n) = _make_frame_info(inspect.currentframe().f_back) 1028 kmrso.kmr_map9(0, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n)) 1029 if (cmopts.inspect == 0): self._consume() 1030 return kvo 1031 1032 def map_once(self, rank_zero_only, fn, **mopts): 1033 """Maps once with a dummy key-value pair.""" 1034 1035 ## It needs dummy input; Never inspects. 1036 (keyty, valty, mkkvo) = _get_options(mopts, True) 1037 cmopts = _c_option(mopts, _enabled_options_of_map_once) 1038 cfn = _wrap_mapfn(fn) 1039 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1040 ckvo = (kvo._ckvs if (kvo is not None) else None) 1041 kmrso.kmr_map_once(ckvo, 0, cmopts, rank_zero_only, cfn) 1042 return kvo 1043 1044 def map_on_rank_zero(self, fn, **mopts): 1045 """Maps on rank0 only.""" 1046 1047 ## It needs dummy input. 
1048 return self.map_once(True, fn, *mopts) 1049 1050 def map_rank_by_rank(self, fn, **mopts): 1051 """Maps sequentially with rank by rank for debugging.""" 1052 1053 (keyty, valty, mkkvo) = _get_options(mopts, True) 1054 cmopts = _c_option(mopts, _enabled_options_of_map) 1055 cfn = _wrap_mapfn(fn) 1056 ckvi = self._ckvs 1057 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1058 ckvo = (kvo._ckvs if (kvo is not None) else None) 1059 kmrso.kmr_map_rank_by_rank(ckvi, ckvo, 0, cmopts, cfn) 1060 if (cmopts.inspect == 0): self._consume() 1061 return kvo 1062 1063 def map_for_some(self, fn, **mopts): 1064 """Maps until some key-value are added.""" 1065 1066 (keyty, valty, mkkvo) = _get_options(mopts, True) 1067 cmopts = _c_option(mopts, _enabled_options_of_map) 1068 ckvi = self._ckvs 1069 cfn = _wrap_mapfn(fn) 1070 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1071 ckvo = (kvo._ckvs if (kvo is not None) else None) 1072 kmrso.kmr_map_for_some(ckvi, ckvo, 0, cmopts, cfn) 1073 if (cmopts.inspect == 0): self._consume() 1074 return kvo 1075 1076 def map_ms(self, fn, **mopts): 1077 """Maps in master-worker mode.""" 1078 1079 ## Its call is repeated until True (assuming MPI_SUCCESS==0). 
1080 (keyty, valty, mkkvo) = _get_options(mopts, True) 1081 cmopts = _c_option(mopts, _enabled_options_of_map_ms) 1082 cfn = _wrap_mapfn(fn) 1083 ckvi = self._ckvs 1084 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1085 ckvo = (kvo._ckvs if (kvo is not None) else None) 1086 rr = 1 1087 while (rr != 0): 1088 rr = kmrso.kmr_map_ms(ckvi, ckvo, 0, cmopts, cfn) 1089 self._consume() 1090 return kvo 1091 1092 def map_ms_commands(self, fn, **xopts): 1093 """Maps in master-worker mode, and runs serial commands.""" 1094 1095 (sopts, mopts) = _filter_spawn_options(xopts) 1096 (keyty, valty, mkkvo) = _get_options(mopts, True) 1097 cmopts = _c_option(mopts, _enabled_options_of_map_ms) 1098 csopts = _c_spawn_option(sopts) 1099 cfn = _wrap_mapfn(fn) 1100 ckvi = self._ckvs 1101 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1102 ckvo = (kvo._ckvs if (kvo is not None) else None) 1103 rr = 1 1104 while (rr != 0): 1105 rr = kmrso.kmr_map_ms_commands(ckvi, ckvo, 0, cmopts, csopts, cfn) 1106 self._consume() 1107 return kvo 1108 1109 def map_via_spawn(self, fn, **xopts): 1110 """Maps on processes started by MPI_Comm_spawn().""" 1111 1112 (sopts, mopts) = _filter_spawn_options(xopts) 1113 (keyty, valty, mkkvo) = _get_options(mopts, True) 1114 cmopts = _c_option(mopts, _enabled_options_of_map) 1115 csopts = _c_spawn_option(sopts) 1116 cfn = _wrap_mapfn(fn) 1117 ckvi = self._ckvs 1118 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1119 ckvo = (kvo._ckvs if (kvo is not None) else None) 1120 kmrso.kmr_map_via_spawn(ckvi, ckvo, 0, _mpi_info_null, csopts, cfn) 1121 self._consume() 1122 return kvo 1123 1124 def map_processes(self, nonmpi, fn, **sopts): 1125 """Maps on processes started by MPI_Comm_spawn().""" 1126 1127 (keyty, valty, mkkvo) = _get_options(sopts, True) 1128 csopts = _c_spawn_option(sopts) 1129 cfn = _wrap_mapfn(fn) 1130 ckvi = self._ckvs 1131 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1132 ckvo = (kvo._ckvs if (kvo is not None) else None) 1133 
kmrso.kmr_map_processes(nonmpi, ckvi, ckvo, 0, _mpi_info_null, 1134 csopts, cfn) 1135 self._consume() 1136 return kvo 1137 1138 def map_parallel_processes(self, fn, **sopts): 1139 """Maps on processes started by MPI_Comm_spawn().""" 1140 1141 return self.map_processes(False, fn, **sopts) 1142 1143 def map_serial_processes(self, fn, **sopts): 1144 """Maps on processes started by MPI_Comm_spawn().""" 1145 1146 return self.map_processes(True, fn, **sopts) 1147 1148 def reduce(self, fn, **mopts): 1149 """Reduces key-value pairs.""" 1150 1151 (keyty, valty, mkkvo) = _get_options(mopts, True) 1152 cmopts = _c_option(mopts, _enabled_options_of_reduce) 1153 cfn = _wrap_redfn(fn) 1154 ckvi = self._ckvs 1155 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1156 ckvo = (kvo._ckvs if (kvo is not None) else None) 1157 (f, l, n) = _make_frame_info(inspect.currentframe().f_back) 1158 kmrso.kmr_reduce9(0, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n)) 1159 if (cmopts.inspect == 0): self._consume() 1160 return kvo 1161 1162 def reduce_as_one(self, fn, **mopts): 1163 """ Reduces once as if all pairs had the same key.""" 1164 1165 (keyty, valty, mkkvo) = _get_options(mopts, True) 1166 cmopts = _c_option(mopts, _enabled_options_of_reduce_as_one) 1167 cfn = _wrap_redfn(fn) 1168 ckvi = self._ckvs 1169 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1170 ckvo = (kvo._ckvs if (kvo is not None) else None) 1171 kmrso.kmr_reduce_as_one(ckvi, ckvo, 0, cmopts, cfn) 1172 if (cmopts.inspect == 0): self._consume() 1173 return kvo 1174 1175 def reduce_for_some(self, fn, **mopts): 1176 """Reduces until some key-value are added.""" 1177 1178 (keyty, valty, mkkvo) = _get_options(mopts, True) 1179 cmopts = _c_option(mopts, _enabled_options_of_reduce) 1180 cfn = _wrap_redfn(fn) 1181 ckvi = self._ckvs 1182 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1183 ckvo = (kvo._ckvs if (kvo is not None) else None) 1184 ## (NOTE: It passes a frame of reduce_for_some.) 
1185 (f, l, n) = _make_frame_info(inspect.currentframe()) 1186 kmrso.kmr_reduce9(1, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n)) 1187 if (cmopts.inspect == 0): self._consume() 1188 return kvo 1189 1190 def reverse(self, **mopts): 1191 """Makes a new pair by swapping the key and the value.""" 1192 1193 keyty = self.get_field_type(_Slot.Key) 1194 valty = self.get_field_type(_Slot.Value) 1195 (_, _, mkkvo) = _get_options(mopts, False) 1196 cmopts = _c_option(mopts, _enabled_options_of_map) 1197 assert (mkkvo is True) 1198 ckvi = self._ckvs 1199 kvo = (KVS(self.mr, valty, keyty) if mkkvo else None) 1200 ckvo = (kvo._ckvs if (kvo is not None) else None) 1201 kmrso.kmr_reverse(ckvi, ckvo, cmopts) 1202 if (cmopts.inspect == 0): self._consume() 1203 return kvo 1204 1205 def shuffle(self, **mopts): 1206 """Shuffles key-value pairs.""" 1207 1208 keyty = self.get_field_type(_Slot.Key) 1209 valty = self.get_field_type(_Slot.Value) 1210 (_, _, mkkvo) = _get_options(mopts, False) 1211 cmopts = _c_option(mopts, _enabled_options_of_shuffle) 1212 ckvi = self._ckvs 1213 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1214 ckvo = (kvo._ckvs if (kvo is not None) else None) 1215 kmrso.kmr_shuffle(ckvi, ckvo, cmopts) 1216 if (cmopts.inspect == 0): self._consume() 1217 return kvo 1218 1219 def replicate(self, **mopts): 1220 """Replicates key-value pairs to be visible on all ranks.""" 1221 1222 keyty = self.get_field_type(_Slot.Key) 1223 valty = self.get_field_type(_Slot.Value) 1224 (_, _, mkkvo) = _get_options(mopts, False) 1225 cmopts = _c_option(mopts, _enabled_options_of_replicate) 1226 ckvi = self._ckvs 1227 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1228 ckvo = (kvo._ckvs if (kvo is not None) else None) 1229 kmrso.kmr_replicate(ckvi, ckvo, cmopts) 1230 if (cmopts.inspect == 0): self._consume() 1231 return kvo 1232 1233 def distribute(self, cyclic, **mopts): 1234 """Distributes pairs approximately evenly to ranks.""" 1235 1236 keyty = self.get_field_type(_Slot.Key) 1237 
valty = self.get_field_type(_Slot.Value) 1238 (_, _, mkkvo) = _get_options(mopts, False) 1239 cmopts = _c_option(mopts, _enabled_options_of_distribute) 1240 ckvi = self._ckvs 1241 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1242 ckvo = (kvo._ckvs if (kvo is not None) else None) 1243 kmrso.kmr_distribute(ckvi, ckvo, cyclic, cmopts) 1244 if (cmopts.inspect == 0): self._consume() 1245 return kvo 1246 1247 def sort_locally(self, shuffling, **mopts): 1248 """Reorders key-value pairs in a single rank.""" 1249 1250 keyty = self.get_field_type(_Slot.Key) 1251 valty = self.get_field_type(_Slot.Value) 1252 (_, _, mkkvo) = _get_options(mopts, False) 1253 cmopts = _c_option(mopts, _enabled_options_of_sort_locally) 1254 ckvi = self._ckvs 1255 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1256 ckvo = (kvo._ckvs if (kvo is not None) else None) 1257 kmrso.kmr_sort_locally(ckvi, ckvo, shuffling, cmopts) 1258 if (cmopts.inspect == 0): self._consume() 1259 return kvo 1260 1261 def sort(self, **mopts): 1262 """Sorts a KVS globally.""" 1263 1264 keyty = self.get_field_type(_Slot.Key) 1265 valty = self.get_field_type(_Slot.Value) 1266 (_, _, mkkvo) = _get_options(mopts, False) 1267 cmopts = _c_option(mopts, _enabled_options_of_sort) 1268 ckvi = self._ckvs 1269 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 1270 ckvo = (kvo._ckvs if (kvo is not None) else None) 1271 kmrso.kmr_sort(ckvi, ckvo, cmopts) 1272 if (cmopts.inspect == 0): self._consume() 1273 return kvo 1274 1275 def concatenate(self, *morekvs): 1276 """Concatenates a number of KVS'es to one.""" 1277 1278 keyty = self.get_field_type(_Slot.Key) 1279 valty = self.get_field_type(_Slot.Value) 1280 siz = (len(morekvs) + 1) 1281 ckvsvec = (_c_kvs * siz)() 1282 ckvsvec[0] = self._ckvs 1283 for i in range(0, len(morekvs)): 1284 ckvsvec[i + 1] = morekvs[i]._ckvs 1285 cn = _c_int(siz) 1286 kvo = KVS(self.mr, keyty, valty) 1287 ckvo = kvo._ckvs 1288 kmrso.kmr_concatenate_kvs(ckvsvec, cn, ckvo, _c_option()) 1289 for 
i in morekvs: 1290 i._consume() 1291 self._consume() 1292 return kvo 1293 1294 def read_files_reassemble(self, filename, color, offset, bytes_): 1295 """Reassembles files reading by ranks.""" 1296 1297 buf = _c_void_p() 1298 siz = _c_uint64(0) 1299 kmrso.kmr_read_files_reassemble( 1300 self.mr._ckmr, _encode(filename), color, offset, bytes_, 1301 ctypes.byref(buf), ctypes.byref(siz)) 1302 addr = buf.value 1303 ptr = (_c_ubyte * siz.value).from_address(addr) 1304 data = bytearray(ptr) 1305 kmrso.kmr_mfree(addr, siz.value) 1306 return data 1307 1308 def read_file_by_segments(self, filename, color): 1309 """Reads one file by segments and reassembles.""" 1310 1311 buf = _c_void_p() 1312 siz = _c_uint64(0) 1313 kmrso.kmr_read_file_by_segments( 1314 self.mr._ckmr, _encode(filename), color, 1315 ctypes.byref(buf), ctypes.byref(siz)) 1316 addr = buf.value 1317 ptr = (_c_ubyte * siz.value).from_address(addr) 1318 data = bytearray(ptr) 1319 kmrso.kmr_mfree(addr, siz.value) 1320 return data 1321 1322 def save(self): 1323 """Packs locally the contents of a KVS to a byte array.""" 1324 1325 buf = _c_void_p(0) 1326 siz = _c_size_t(0) 1327 kmrso.kmr_save_kvs(self._ckvs, ctypes.byref(buf), ctypes.byref(siz), 1328 _c_option()) 1329 addr = buf.value 1330 ptr = (_c_ubyte * siz.value).from_address(addr) 1331 data = bytearray(ptr) 1332 kmrso.kmr_mfree(addr, siz.value) 1333 return data 1334 1335 def restore(self, data): 1336 """Unpacks locally the contents of a KVS from a byte array.""" 1337 1338 kvo = KVS(self.mr, "opaque", "opaque") 1339 siz = len(data) 1340 addr = (_c_ubyte * siz).from_buffer(data) 1341 kmrso.kmr_restore_kvs(kvo._ckvs, addr, siz, _c_option()) 1342 return kvo 1343 1344 def _map_swf(self, fn, **xopts): 1345 """.""" 1346 (sopts, mopts) = _filter_spawn_options(xopts) 1347 (keyty, valty, mkkvo) = _get_options(mopts, True) 1348 cmopts = _c_option(mopts, _enabled_options_of_map) 1349 csopts = _c_spawn_option(sopts) 1350 cfn = _wrap_mapfn(fn) 1351 ckvi = self._ckvs 1352 kvo 
= (KVS(self.mr, keyty, valty) if mkkvo else None) 1353 ckvo = (kvo._ckvs if (kvo is not None) else None) 1354 kmrso.kmr_map_swf(ckvi, ckvo, 0, csopts, cfn) 1355 self._consume() 1356 return kvo
KVS. Note that there are dummy KVS'es which are temporarily created to hold the C structure of the KVS passed to mapper/reducer functions. A dummy KVS has None in its "mr" attribute.
def __init__(self, kmr_or_ckvs, keyty="opaque", valty="opaque"):
    """Makes a KVS for a given KMR.  Do not call the KVS constructor
    directly, but use KMR.make_kvs() instead.  A KVS is created
    with the datatypes stored in the key and the value, specified
    by the keywords "key=" and "value=".  The datatype name is a
    string, one of "opaque", "cstring", "integer", and "float8".
    Most mappers and reducers (precisely, the methods that accept
    a function argument) take keyword arguments for the types,
    defaulting to key="opaque" and value="opaque".  The datatypes
    affect the sorting order.  When a C pointer is passed instead
    of a KMR, a dummy KVS is made which only holds that pointer.
    Any other argument raises an Exception."""

    ## mr holds a KMR context object.  Note that mr is not
    ## accessible from mapping/reducing functions.
    self.mr = None

    ## _ckvs holds a kvs in C.
    self._ckvs = None

    ## _frameinfo protects the caller line information from being
    ## garbage collected.
    self._frameinfo = None

    if isinstance(kmr_or_ckvs, KMR):
        key_field = _field_name_type_map[keyty]
        value_field = _field_name_type_map[valty]
        ## The frame of the caller of the constructor is recorded.
        caller = inspect.currentframe().f_back
        self.mr = kmr_or_ckvs
        frameinfo = _make_frame_info(caller)
        (f, l, n) = frameinfo
        self._ckvs = kmrso.kmr_create_kvs7(
            self.mr._ckmr, key_field, value_field, _c_option(), f, l, n)
        self._frameinfo = (f, l, n)
        return

    if isinstance(kmr_or_ckvs, _c_pointer):
        ## Make a dummy KVS (mr stays None).
        self._ckvs = kmr_or_ckvs
        return

    raise Exception("Bad call to kvs constructor")
Makes a KVS for a given KMR. Do not call the KVS constructor directly, but use KMR.make_kvs() instead. A KVS is created with the datatypes stored in the key and the value, specified by the keywords "key=" and "value=". The datatype name is a string, one of "opaque", "cstring", "integer", and "float8". Most mappers and reducers (precisely, the methods that accept a function argument) take keyword arguments for the types, defaulting to key="opaque" and value="opaque". The datatypes affect the sorting order.
mr attribute holds a KMR context object. Note that mr is not accessible from mapping/reducing functions.
def free(self):
    """Finishes the C part of a KVS.  It raises an Exception when
    called on a dummy KVS or on an already freed KVS.  It returns
    the KVS itself."""

    if self._is_dummy():
        raise Exception("Bad call to free_kvs on dummy KVS")
    if _c_null_pointer(self._ckvs):
        raise Exception("Bad call to free_kvs on freed KVS")
    ## Do not free in C when the KMR object is dismissed.
    dismissed = ((self.mr is not None) and self.mr._dismissed)
    if not dismissed:
        kmrso.kmr_free_kvs(self._ckvs)
    ## The C pointer is cleared in either case.
    self._ckvs = _c_null_pointer_value
    return self
Finishes the C part of a KVS.
def get_field_type(self, key_or_value):
    """Gets a field type of a KVS as a datatype name.  The argument
    selects the key slot (_Slot.Key) or the value slot
    (_Slot.Value).  It raises an Exception for a null C-object,
    for another slot, or for a bad type value stored in the KVS."""

    ckvs = self._ckvs
    if _c_null_pointer(ckvs):
        raise Exception("Bad KVS (null C-object)")

    if key_or_value == _Slot.Key:
        typecode = kmrso.kmr_get_key_type_ff(self._ckvs)
    elif key_or_value == _Slot.Value:
        typecode = kmrso.kmr_get_value_type_ff(self._ckvs)
    else:
        raise Exception("Bad field %s" % key_or_value.name)

    if typecode == _kv_bad:
        raise Exception("Bad field type value %d in KVS" % typecode)
    return _field_type_name_map[typecode]
Get a field type of a KVS.
def add(self, key, val):
    """Adds a key-value pair.  It simply delegates to add_kv() and
    returns nothing."""

    self.add_kv(key, val)
Adds a key-value pair.
def add_kv(self, key, val):
    """Adds a key-value pair.  The key and the value are encoded by
    _encode_content() for their slots and are passed to
    kmr_add_kv() in a kvbox."""

    ## Note that the third elements of the encoded results are kept
    ## in local variables until kmr_add_kv() returns, because a
    ## kvbox does not hold the references.
    key_encoded = self._encode_content(key, _Slot.Key)
    (klen, kdata, _key_keeper) = key_encoded
    val_encoded = self._encode_content(val, _Slot.Value)
    (vlen, vdata, _val_keeper) = val_encoded
    box = _c_kvbox().set(klen, kdata, vlen, vdata)
    kmrso.kmr_add_kv(self._ckvs, box)
Adds a key-value pair.
def add_kv_done(self):
    """Finishes adding key-value pairs.  It calls kmr_add_kv_done()
    on the kvs in C and returns nothing."""

    ckvs = self._ckvs
    kmrso.kmr_add_kv_done(ckvs)
Finishes adding key-value pairs.
def get_element_count(self):
    """Gets the total number of key-value pairs.  The count is
    received from kmr_get_element_count() via a C long."""

    counter = _c_long(0)
    ## The count is stored by reference into the C long.
    counter_ref = ctypes.byref(counter)
    kmrso.kmr_get_element_count(self._ckvs, counter_ref)
    return counter.value
Gets the total number of key-value pairs.
def local_element_count(self):
    """Gets the number of key-value pairs locally.  The count is
    received from kmr_local_element_count() via a C long."""

    counter = _c_long(0)
    ## The count is stored by reference into the C long.
    counter_ref = ctypes.byref(counter)
    kmrso.kmr_local_element_count(self._ckvs, counter_ref)
    return counter.value
Gets the number of key-value pairs locally.
def map(self, fn, **mopts):
    """Maps simply.  It calls kmr_map9() with the wrapped function
    FN.  It returns an output KVS, or None when no output KVS is
    requested.  self._consume() is called on the input unless the
    inspect option is set."""

    (keyty, valty, mkkvo) = _get_options(mopts, True)
    cmopts = _c_option(mopts, _enabled_options_of_map)
    mapper = _wrap_mapfn(fn)
    src = self._ckvs
    if mkkvo:
        out = KVS(self.mr, keyty, valty)
    else:
        out = None
    dst = (None if (out is None) else out._ckvs)
    ## The line information of the caller of map() is passed to C.
    caller = inspect.currentframe().f_back
    (fname, lineno, func) = _make_frame_info(caller)
    kmrso.kmr_map9(0, src, dst, 0, cmopts, mapper, fname, lineno, func)
    if cmopts.inspect == 0:
        self._consume()
    return out
Maps simply.
def map_once(self, rank_zero_only, fn, **mopts):
    """Maps once with a dummy key-value pair.  It calls
    kmr_map_once() with the wrapped function FN, passing
    RANK_ZERO_ONLY as is.  It returns an output KVS, or None when
    no output KVS is requested."""

    ## It needs dummy input; Never inspects.  The kvs of self is
    ## not passed to C and self._consume() is not called.
    (keyty, valty, mkkvo) = _get_options(mopts, True)
    cmopts = _c_option(mopts, _enabled_options_of_map_once)
    mapper = _wrap_mapfn(fn)
    if mkkvo:
        out = KVS(self.mr, keyty, valty)
    else:
        out = None
    dst = (None if (out is None) else out._ckvs)
    kmrso.kmr_map_once(dst, 0, cmopts, rank_zero_only, mapper)
    return out
Maps once with a dummy key-value pair.
def map_on_rank_zero(self, fn, **mopts):
    """Maps on rank0 only.  It is map_once() with rank_zero_only
    being True.  The keyword options are forwarded to map_once()
    unchanged, and the result of map_once() is returned."""

    ## It needs dummy input.
    ## (The options are forwarded as keywords with "**".  Passing
    ## them with a single "*" would hand the option names over as
    ## extra positional arguments, which map_once() rejects.)
    return self.map_once(True, fn, **mopts)
Maps on rank0 only.
def map_rank_by_rank(self, fn, **mopts):
    """Maps sequentially, rank by rank, for debugging.  It calls
    kmr_map_rank_by_rank() with the wrapped function FN.  It
    returns an output KVS, or None when no output KVS is
    requested.  self._consume() is called on the input unless the
    inspect option is set."""

    (keyty, valty, mkkvo) = _get_options(mopts, True)
    cmopts = _c_option(mopts, _enabled_options_of_map)
    mapper = _wrap_mapfn(fn)
    src = self._ckvs
    if mkkvo:
        out = KVS(self.mr, keyty, valty)
    else:
        out = None
    dst = (None if (out is None) else out._ckvs)
    kmrso.kmr_map_rank_by_rank(src, dst, 0, cmopts, mapper)
    if cmopts.inspect == 0:
        self._consume()
    return out
Maps sequentially with rank by rank for debugging.
def map_for_some(self, fn, **mopts):
    """Maps until some key-value pairs are added.  It calls
    kmr_map_for_some() with the wrapped function FN.  It returns
    an output KVS, or None when no output KVS is requested.
    self._consume() is called on the input unless the inspect
    option is set."""

    (keyty, valty, mkkvo) = _get_options(mopts, True)
    cmopts = _c_option(mopts, _enabled_options_of_map)
    src = self._ckvs
    mapper = _wrap_mapfn(fn)
    if mkkvo:
        out = KVS(self.mr, keyty, valty)
    else:
        out = None
    dst = (None if (out is None) else out._ckvs)
    kmrso.kmr_map_for_some(src, dst, 0, cmopts, mapper)
    if cmopts.inspect == 0:
        self._consume()
    return out
Maps until some key-value pairs are added.
def map_ms(self, fn, **mopts):
    """Maps in master-worker mode.  It calls kmr_map_ms() with the
    wrapped function FN.  It returns an output KVS, or None when
    no output KVS is requested.  self._consume() is always called
    on the input."""

    (keyty, valty, mkkvo) = _get_options(mopts, True)
    cmopts = _c_option(mopts, _enabled_options_of_map_ms)
    mapper = _wrap_mapfn(fn)
    src = self._ckvs
    if mkkvo:
        out = KVS(self.mr, keyty, valty)
    else:
        out = None
    dst = (None if (out is None) else out._ckvs)
    ## The call is repeated until it returns zero (assuming
    ## MPI_SUCCESS==0).
    while True:
        rr = kmrso.kmr_map_ms(src, dst, 0, cmopts, mapper)
        if rr == 0:
            break
    self._consume()
    return out
Maps in master-worker mode.
def map_ms_commands(self, fn, **xopts):
    """Maps in master-worker mode, and runs serial commands.  The
    keyword options are split into spawn options and map options
    by _filter_spawn_options().  It calls kmr_map_ms_commands()
    with the wrapped function FN.  It returns an output KVS, or
    None when no output KVS is requested.  self._consume() is
    always called on the input."""

    (sopts, mopts) = _filter_spawn_options(xopts)
    (keyty, valty, mkkvo) = _get_options(mopts, True)
    cmopts = _c_option(mopts, _enabled_options_of_map_ms)
    csopts = _c_spawn_option(sopts)
    mapper = _wrap_mapfn(fn)
    src = self._ckvs
    if mkkvo:
        out = KVS(self.mr, keyty, valty)
    else:
        out = None
    dst = (None if (out is None) else out._ckvs)
    ## The call is repeated until it returns zero.
    while True:
        rr = kmrso.kmr_map_ms_commands(src, dst, 0, cmopts, csopts, mapper)
        if rr == 0:
            break
    self._consume()
    return out
Maps in master-worker mode, and runs serial commands.
def map_via_spawn(self, fn, **xopts):
    """Maps on processes started by MPI_Comm_spawn().  The keyword
    options are split into spawn options and map options by
    _filter_spawn_options().  It calls kmr_map_via_spawn() with
    the wrapped function FN.  It returns an output KVS, or None
    when no output KVS is requested.  self._consume() is always
    called on the input."""

    (sopts, mopts) = _filter_spawn_options(xopts)
    (keyty, valty, mkkvo) = _get_options(mopts, True)
    ## (NOTE: The map options are converted here, but the result is
    ## not passed to kmr_map_via_spawn().)
    cmopts = _c_option(mopts, _enabled_options_of_map)
    csopts = _c_spawn_option(sopts)
    mapper = _wrap_mapfn(fn)
    src = self._ckvs
    if mkkvo:
        out = KVS(self.mr, keyty, valty)
    else:
        out = None
    dst = (None if (out is None) else out._ckvs)
    kmrso.kmr_map_via_spawn(src, dst, 0, _mpi_info_null, csopts, mapper)
    self._consume()
    return out
Maps on processes started by MPI_Comm_spawn().
def map_processes(self, nonmpi, fn, **sopts):
    """Maps on processes started by MPI_Comm_spawn().  It calls
    kmr_map_processes() with the wrapped function FN, passing
    NONMPI as is.  It returns an output KVS, or None when no
    output KVS is requested.  self._consume() is always called on
    the input."""

    (keyty, valty, mkkvo) = _get_options(sopts, True)
    csopts = _c_spawn_option(sopts)
    mapper = _wrap_mapfn(fn)
    src = self._ckvs
    if mkkvo:
        out = KVS(self.mr, keyty, valty)
    else:
        out = None
    dst = (None if (out is None) else out._ckvs)
    kmrso.kmr_map_processes(
        nonmpi, src, dst, 0, _mpi_info_null, csopts, mapper)
    self._consume()
    return out
Maps on processes started by MPI_Comm_spawn().
def map_parallel_processes(self, fn, **sopts):
    """Maps on processes started by MPI_Comm_spawn().  It is
    map_processes() with nonmpi being False."""

    nonmpi = False
    result = self.map_processes(nonmpi, fn, **sopts)
    return result
Maps on processes started by MPI_Comm_spawn().
def map_serial_processes(self, fn, **sopts):
    """Maps on processes started by MPI_Comm_spawn().  It is
    map_processes() with nonmpi being True."""

    nonmpi = True
    result = self.map_processes(nonmpi, fn, **sopts)
    return result
Maps on processes started by MPI_Comm_spawn().
def reduce(self, fn, **mopts):
    """Reduces key-value pairs.  It calls kmr_reduce9() with the
    wrapped function FN.  It returns an output KVS, or None when
    no output KVS is requested.  self._consume() is called on the
    input unless the inspect option is set."""

    (keyty, valty, mkkvo) = _get_options(mopts, True)
    cmopts = _c_option(mopts, _enabled_options_of_reduce)
    reducer = _wrap_redfn(fn)
    src = self._ckvs
    if mkkvo:
        out = KVS(self.mr, keyty, valty)
    else:
        out = None
    dst = (None if (out is None) else out._ckvs)
    ## The line information of the caller of reduce() is passed to C.
    caller = inspect.currentframe().f_back
    (fname, lineno, func) = _make_frame_info(caller)
    kmrso.kmr_reduce9(0, src, dst, 0, cmopts, reducer, fname, lineno, func)
    if cmopts.inspect == 0:
        self._consume()
    return out
Reduces key-value pairs.
def reduce_as_one(self, fn, **mopts):
    """Reduces once as if all pairs had the same key.  It calls
    kmr_reduce_as_one() with the wrapped function FN.  It returns
    an output KVS, or None when no output KVS is requested.
    self._consume() is called on the input unless the inspect
    option is set."""

    (keyty, valty, mkkvo) = _get_options(mopts, True)
    cmopts = _c_option(mopts, _enabled_options_of_reduce_as_one)
    reducer = _wrap_redfn(fn)
    src = self._ckvs
    if mkkvo:
        out = KVS(self.mr, keyty, valty)
    else:
        out = None
    dst = (None if (out is None) else out._ckvs)
    kmrso.kmr_reduce_as_one(src, dst, 0, cmopts, reducer)
    if cmopts.inspect == 0:
        self._consume()
    return out
Reduces once as if all pairs had the same key.
def reduce_for_some(self, fn, **mopts):
    """Reduces until some key-value pairs are added.  It calls
    kmr_reduce9() with the wrapped function FN and the first
    argument being one.  It returns an output KVS, or None when
    no output KVS is requested.  self._consume() is called on the
    input unless the inspect option is set."""

    (keyty, valty, mkkvo) = _get_options(mopts, True)
    cmopts = _c_option(mopts, _enabled_options_of_reduce)
    reducer = _wrap_redfn(fn)
    src = self._ckvs
    if mkkvo:
        out = KVS(self.mr, keyty, valty)
    else:
        out = None
    dst = (None if (out is None) else out._ckvs)
    ## (NOTE: It passes a frame of reduce_for_some itself, not the
    ## one of its caller.)
    here = inspect.currentframe()
    (fname, lineno, func) = _make_frame_info(here)
    kmrso.kmr_reduce9(1, src, dst, 0, cmopts, reducer, fname, lineno, func)
    if cmopts.inspect == 0:
        self._consume()
    return out
Reduces until some key-value pairs are added.
def reverse(self, **mopts):
    """Makes a new pair by swapping the key and the value.  The
    output KVS is created with the field types of the input
    swapped.  It asserts that an output KVS is requested.
    self._consume() is called on the input unless the inspect
    option is set."""

    keyty = self.get_field_type(_Slot.Key)
    valty = self.get_field_type(_Slot.Value)
    ## The types returned by _get_options() are not used here.
    mkkvo = _get_options(mopts, False)[2]
    cmopts = _c_option(mopts, _enabled_options_of_map)
    assert (mkkvo is True)
    src = self._ckvs
    if mkkvo:
        ## Note the value type comes first for the output.
        out = KVS(self.mr, valty, keyty)
    else:
        out = None
    dst = (None if (out is None) else out._ckvs)
    kmrso.kmr_reverse(src, dst, cmopts)
    if cmopts.inspect == 0:
        self._consume()
    return out
Makes a new pair by swapping the key and the value.
def shuffle(self, **mopts):
    """Shuffles key-value pairs.  It calls kmr_shuffle().  The
    output KVS is created with the same field types as the input.
    It returns the output KVS, or None when no output KVS is
    requested.  self._consume() is called on the input unless the
    inspect option is set."""

    keyty = self.get_field_type(_Slot.Key)
    valty = self.get_field_type(_Slot.Value)
    ## The types returned by _get_options() are not used here.
    mkkvo = _get_options(mopts, False)[2]
    cmopts = _c_option(mopts, _enabled_options_of_shuffle)
    src = self._ckvs
    if mkkvo:
        out = KVS(self.mr, keyty, valty)
    else:
        out = None
    dst = (None if (out is None) else out._ckvs)
    kmrso.kmr_shuffle(src, dst, cmopts)
    if cmopts.inspect == 0:
        self._consume()
    return out
Shuffles key-value pairs.
def replicate(self, **mopts):
    """Replicates key-value pairs to be visible on all ranks.  It
    calls kmr_replicate().  The output KVS is created with the
    same field types as the input.  It returns the output KVS, or
    None when no output KVS is requested.  self._consume() is
    called on the input unless the inspect option is set."""

    keyty = self.get_field_type(_Slot.Key)
    valty = self.get_field_type(_Slot.Value)
    ## The types returned by _get_options() are not used here.
    mkkvo = _get_options(mopts, False)[2]
    cmopts = _c_option(mopts, _enabled_options_of_replicate)
    src = self._ckvs
    if mkkvo:
        out = KVS(self.mr, keyty, valty)
    else:
        out = None
    dst = (None if (out is None) else out._ckvs)
    kmrso.kmr_replicate(src, dst, cmopts)
    if cmopts.inspect == 0:
        self._consume()
    return out
Replicates key-value pairs to be visible on all ranks.
def distribute(self, cyclic, **mopts):
    """Distributes pairs approximately evenly to ranks.  It calls
    kmr_distribute(), passing CYCLIC as is.  The output KVS is
    created with the same field types as the input.  It returns
    the output KVS, or None when no output KVS is requested.
    self._consume() is called on the input unless the inspect
    option is set."""

    keyty = self.get_field_type(_Slot.Key)
    valty = self.get_field_type(_Slot.Value)
    ## The types returned by _get_options() are not used here.
    mkkvo = _get_options(mopts, False)[2]
    cmopts = _c_option(mopts, _enabled_options_of_distribute)
    src = self._ckvs
    if mkkvo:
        out = KVS(self.mr, keyty, valty)
    else:
        out = None
    dst = (None if (out is None) else out._ckvs)
    kmrso.kmr_distribute(src, dst, cyclic, cmopts)
    if cmopts.inspect == 0:
        self._consume()
    return out
Distributes pairs approximately evenly to ranks.
def sort_locally(self, shuffling, **mopts):
    """Reorders key-value pairs in a single rank.  It calls
    kmr_sort_locally(), passing SHUFFLING as is.  The output KVS
    is created with the same field types as the input.  It
    returns the output KVS, or None when no output KVS is
    requested.  self._consume() is called on the input unless the
    inspect option is set."""

    keyty = self.get_field_type(_Slot.Key)
    valty = self.get_field_type(_Slot.Value)
    ## The types returned by _get_options() are not used here.
    mkkvo = _get_options(mopts, False)[2]
    cmopts = _c_option(mopts, _enabled_options_of_sort_locally)
    src = self._ckvs
    if mkkvo:
        out = KVS(self.mr, keyty, valty)
    else:
        out = None
    dst = (None if (out is None) else out._ckvs)
    kmrso.kmr_sort_locally(src, dst, shuffling, cmopts)
    if cmopts.inspect == 0:
        self._consume()
    return out
Reorders key-value pairs in a single rank.
def sort(self, **mopts):
    """Sorts a KVS globally.  It calls kmr_sort().  The output KVS
    is created with the same field types as the input.  It
    returns the output KVS, or None when no output KVS is
    requested.  self._consume() is called on the input unless the
    inspect option is set."""

    keyty = self.get_field_type(_Slot.Key)
    valty = self.get_field_type(_Slot.Value)
    ## The types returned by _get_options() are not used here.
    mkkvo = _get_options(mopts, False)[2]
    cmopts = _c_option(mopts, _enabled_options_of_sort)
    src = self._ckvs
    if mkkvo:
        out = KVS(self.mr, keyty, valty)
    else:
        out = None
    dst = (None if (out is None) else out._ckvs)
    kmrso.kmr_sort(src, dst, cmopts)
    if cmopts.inspect == 0:
        self._consume()
    return out
Sorts a KVS globally.
def concatenate(self, *morekvs):
    """Concatenates a number of KVS'es to one.  The inputs are this
    KVS followed by the ones in MOREKVS.  The output KVS is
    created with the field types of this KVS.  _consume() is
    called on every input after the call of
    kmr_concatenate_kvs()."""

    keyty = self.get_field_type(_Slot.Key)
    valty = self.get_field_type(_Slot.Value)
    total = (1 + len(morekvs))
    ## Slot zero of the C vector is for this KVS, and the others
    ## follow in the given order.
    vec = (_c_kvs * total)()
    vec[0] = self._ckvs
    for (slot, other) in enumerate(morekvs, 1):
        vec[slot] = other._ckvs
    count = _c_int(total)
    out = KVS(self.mr, keyty, valty)
    kmrso.kmr_concatenate_kvs(vec, count, out._ckvs, _c_option())
    for other in morekvs:
        other._consume()
    self._consume()
    return out
Concatenates a number of KVS'es to one.
def read_files_reassemble(self, filename, color, offset, bytes_):
    """Reassembles files reading by ranks.  It calls
    kmr_read_files_reassemble(), which returns a buffer and its
    size.  The contents are copied to a bytearray, and the buffer
    in C is freed by kmr_mfree().  It returns the bytearray."""

    bufptr = _c_void_p()
    bufsize = _c_uint64(0)
    kmrso.kmr_read_files_reassemble(
        self.mr._ckmr, _encode(filename), color, offset, bytes_,
        ctypes.byref(bufptr), ctypes.byref(bufsize))
    address = bufptr.value
    length = bufsize.value
    ## Copy the contents before freeing the buffer in C.
    cbytes = (_c_ubyte * length).from_address(address)
    contents = bytearray(cbytes)
    kmrso.kmr_mfree(address, length)
    return contents
Reassembles files reading by ranks.
def read_file_by_segments(self, filename, color):
    """Reads one file by segments and reassembles.  It calls
    kmr_read_file_by_segments(), which returns a buffer and its
    size.  The contents are copied to a bytearray, and the buffer
    in C is freed by kmr_mfree().  It returns the bytearray."""

    bufptr = _c_void_p()
    bufsize = _c_uint64(0)
    kmrso.kmr_read_file_by_segments(
        self.mr._ckmr, _encode(filename), color,
        ctypes.byref(bufptr), ctypes.byref(bufsize))
    address = bufptr.value
    length = bufsize.value
    ## Copy the contents before freeing the buffer in C.
    cbytes = (_c_ubyte * length).from_address(address)
    contents = bytearray(cbytes)
    kmrso.kmr_mfree(address, length)
    return contents
Reads one file by segments and reassembles.
def save(self):
    """Packs locally the contents of a KVS to a byte array.  It
    calls kmr_save_kvs(), which returns a buffer and its size.
    The contents are copied to a bytearray, and the buffer in C
    is freed by kmr_mfree().  It returns the bytearray."""

    bufptr = _c_void_p(0)
    bufsize = _c_size_t(0)
    kmrso.kmr_save_kvs(
        self._ckvs, ctypes.byref(bufptr), ctypes.byref(bufsize),
        _c_option())
    address = bufptr.value
    length = bufsize.value
    ## Copy the contents before freeing the buffer in C.
    cbytes = (_c_ubyte * length).from_address(address)
    contents = bytearray(cbytes)
    kmrso.kmr_mfree(address, length)
    return contents
Packs locally the contents of a KVS to a byte array.
def restore(self, data):
    """Unpacks locally the contents of a KVS from a byte array.  It
    makes a new KVS on the KMR of this KVS, passes DATA to
    kmr_restore_kvs(), and returns the new KVS.  DATA is shared
    with C by from_buffer() of ctypes without copying."""

    out = KVS(self.mr, "opaque", "opaque")
    length = len(data)
    array_type = (_c_ubyte * length)
    cbytes = array_type.from_buffer(data)
    kmrso.kmr_restore_kvs(out._ckvs, cbytes, length, _c_option())
    return out
Unpacks locally the contents of a KVS from a byte array.
Finishes using KMR4PY.
def listify(kvs):
    """Returns an array of LOCAL contents.  It maps over the KVS
    with output=False and inspect=True, and stores each entry
    passed to the mapper at the position of its index."""

    count = kvs.local_element_count()
    contents = [None] * count

    def collect(kv, kvi, kvo, i, *_data):
        ## Record the entry at its index.
        contents[i] = kv
        return 0

    result = kvs.map(collect, output=False, inspect=True)
    assert (result is None)
    return contents
Returns an array of LOCAL contents.