kmr4py

Python binding for the KMR map-reduce library. It provides straightforward wrappers for the C routines. See more about KMR at "https://github.com/RIKEN-RCCS/kmr". All key-value data is stored in C structures, after Python objects are encoded to (and decoded from) byte arrays in C. The documentation on the Python side is minimal, so please refer to the documentation of the C library. It works with Python 3 (probably 3.4 and later), but not with 2.x.

   1## kmr4py.py
   2## Copyright (C) 2012-2018 RIKEN R-CCS
   3
   4"""Python Binding for KMR Map-Reduce Library.  This provides
   5straightforward wrappers to the C routines.  See more about KMR at
   6"https://github.com/RIKEN-RCCS/kmr".  All key-value data is stored in C
   7structures after encoding/decoding Python objects to byte arrays in C.
   8The documentation in Python is minimum, so please refer to the
   9documentation in C.  It works with Python3 (possibly 3.4 and later),
  10but not with 2.x."""
  11
  12## NOTE: Importing MPI module from mpi4py package initializes for MPI
  13## execution.  Import MPI and then import kmr4py in application codes.
  14
  15from enum import Enum
  16import warnings
  17import ctypes
  18import pickle
  19import inspect
  20import traceback
  21import sys
  22##import mpi4py
  23
## __version__ is compared with the kmr_version value exported by
## libkmr.so when the library is loaded; a mismatch only issues a
## warning.

__version__ = "20201116"
kmrversion = "1.10"

kmrso = None

"""kmrso holds a libkmr.so library object."""

kmrso_name = "libkmr.so"

"""kmrso_name holds a libkmr.so name, which can be set before calling KMR()."""

_kmrso_version = None

"""_kmrso_version holds a version taken from a libkmr.so."""

## NOTE(review): _pickle_protocol is presumably the protocol used to
## marshal opaque Python objects; its use is not in this part of the
## file -- confirm.

_pickle_protocol = pickle.DEFAULT_PROTOCOL

warning_function = warnings.warn

"""warning_function specifies the function used to issue warnings."""

ignore_exceptions_in_map_fn = True

"""ignore_exceptions_in_map_fn=True makes exceptions ignored."""

print_backtrace_in_map_fn = True

"""print_backtrace_in_map_fn=True makes backtraces are printed at
exceptions in mapper/reducer functions."""

force_null_terminate_in_cstring = True

"""force_null_terminate_in_cstring specifies to add a null-terminator
in C strings.  Do not change it while some KVS'es are live."""

## Short names of the ctypes types used in the signatures of the C
## routines.  _c_kmr and _c_kvs are the handles of a KMR context and a
## KVS in C; both are plain char pointers on the Python side.

_c_pointer = ctypes.POINTER(ctypes.c_char)
_c_kmr = _c_pointer
_c_kvs = _c_pointer
_c_kvsvec = ctypes.c_void_p
_c_boxvec = ctypes.c_void_p
_c_fnp = ctypes.c_void_p
_c_void_p = ctypes.c_void_p
_c_ubyte = ctypes.c_ubyte
_c_bool = ctypes.c_bool
_c_int = ctypes.c_int
_c_uint = ctypes.c_uint
_c_long = ctypes.c_long
_c_uint8 = ctypes.c_uint8
_c_uint32 = ctypes.c_uint32
_c_uint64 = ctypes.c_uint64
_c_double = ctypes.c_double
_c_size_t = ctypes.c_size_t
_c_string = ctypes.c_char_p

## _c_funcptr is ctypes._FuncPtr, but it is taken indirectly
## because it is hidden.  (It is set when libkmr.so is loaded.)

##_c_funcptr = type(kmrso.kmr_init_2)

## _c_null_pointer_value is a null value for ctypes.

_c_null_pointer_value = _c_pointer()
  86
  87def _c_null_pointer(p):
  88    """Returns true if ctypes pointer is null."""
  89
  90    return (not bool(p))
  91
## _name_coding is used to pass a string to C routines and back.
## It is the codec applied by _encode() and _decode().
## NOTE(review): "latin-1" is presumably chosen because it maps every
## byte value to a character, so that decoding never fails -- confirm.

##_name_coding = "utf-8"
_name_coding = "latin-1"
  96
  97def _encode(us):
  98    return us.encode(_name_coding)
  99
 100def _decode(bs):
 101    return bs.decode(_name_coding)
 102
 103class _Slot(Enum):
 104    """Discrimination of a field indicated by key_or_value."""
 105    Key = 0
 106    Value = 1
 107
 108## Loading C routines of libkmr.so.
 109
 110def _setup_mpi_constants():
 111    """Imports values of some MPI constants.  Calling kmr_mpi_type_size
 112    and kmr_mpi_constant_value dose not need MPI be initialized."""
 113
 114    def c_type_by_size(siz):
 115        if (siz == ctypes.sizeof(_c_uint64)):
 116            return _c_uint64
 117        elif (siz == ctypes.sizeof(_c_uint32)):
 118            return _c_uint32
 119        else:
 120            raise Exception("Bad type size unknown: %d" % siz)
 121        return None
 122
 123    global _c_mpi_comm, _c_mpi_info
 124    global _mpi_comm_world, _mpi_comm_self, _mpi_info_null
 125
 126    siz = kmrso.kmr_mpi_type_size(_encode("MPI_Comm"))
 127    _c_mpi_comm = c_type_by_size(siz)
 128    siz = kmrso.kmr_mpi_type_size(_encode("MPI_Info"))
 129    _c_mpi_info = c_type_by_size(siz)
 130    _mpi_comm_world = kmrso.kmr_mpi_constant_value(_encode("MPI_COMM_WORLD"))
 131    _mpi_comm_self = kmrso.kmr_mpi_constant_value(_encode("MPI_COMM_SELF"))
 132    _mpi_info_null = kmrso.kmr_mpi_constant_value(_encode("MPI_INFO_NULL"))
 133    return
 134
 135def _load_kmrso(soname = "libkmr.so"):
 136    """Loads libkmr.so, and initializes some constants depending on the
 137    library."""
 138
 139    global kmrso, _kmrso_version
 140    global _c_funcptr
 141    global _kv_bad, _kv_opaque, _kv_cstring, _kv_integer, _kv_float8
 142    global _field_name_type_map, _field_type_name_map
 143    global _MKMAPFN, _MKREDFN
 144
 145    ## Load "libkmr.so".
 146
 147    kmrso = ctypes.CDLL(soname)
 148
 149    _kmrso_version = ctypes.c_int.in_dll(kmrso, "kmr_version").value
 150    if (__version__ != str(_kmrso_version)):
 151        warnings.warn(("Version unmatch with libkmr.so;"
 152                       + " found=" + str(_kmrso_version)
 153                       + " required=" + __version__),
 154                      RuntimeWarning)
 155
 156    kmrso.kmr_mpi_type_size.argtypes = [_c_string]
 157    kmrso.kmr_mpi_type_size.restype = _c_size_t
 158
 159    kmrso.kmr_mpi_constant_value.argtypes = [_c_string]
 160    kmrso.kmr_mpi_constant_value.restype = _c_uint64
 161
 162    _setup_mpi_constants()
 163
 164    _c_funcptr = type(kmrso.kmr_init_2)
 165
 166    kmrso.kmr_init_2.argtypes = [ctypes.c_int]
 167    kmrso.kmr_init_2.restype = ctypes.c_int
 168
 169    ## Initializes KMR at this point.
 170
 171    kmrso.kmr_init_2(0)
 172
 173    ## Library dependent constants.
 174
 175    _kv_bad = ctypes.c_int.in_dll(kmrso, "kmr_kv_field_bad").value
 176    _kv_opaque = ctypes.c_int.in_dll(kmrso, "kmr_kv_field_opaque").value
 177    _kv_cstring = ctypes.c_int.in_dll(kmrso, "kmr_kv_field_cstring").value
 178    _kv_integer = ctypes.c_int.in_dll(kmrso, "kmr_kv_field_integer").value
 179    _kv_float8 = ctypes.c_int.in_dll(kmrso, "kmr_kv_field_float8").value
 180
 181    _field_name_type_map = {
 182        "opaque" : _kv_opaque, "cstring" : _kv_cstring,
 183        "integer" : _kv_integer, "float8" : _kv_float8}
 184
 185    _field_type_name_map = dict(
 186        map(tuple, map(reversed, _field_name_type_map.items())))
 187
 188    ## C-callable function factories.
 189
 190    _MKMAPFN = ctypes.CFUNCTYPE(_c_int, _c_kvbox, _c_kvs, _c_kvs,
 191                                _c_void_p, _c_long)
 192    _MKREDFN = ctypes.CFUNCTYPE(_c_int, _c_boxvec, _c_long,
 193                                _c_kvs, _c_kvs, _c_void_p)
 194
 195    kmrso.kmr_fin.argtypes = []
 196    kmrso.kmr_fin.restype = _c_int
 197
 198    kmrso.kmr_initialize_mpi.argtypes = [_c_pointer, _c_pointer]
 199    kmrso.kmr_initialize_mpi.restype = _c_int
 200
 201    kmrso.kmr_create_context.argtypes = [_c_mpi_comm, _c_mpi_info, _c_string]
 202    kmrso.kmr_create_context.restype = _c_pointer
 203
 204    kmrso.kmr_create_dummy_context.argtypes = []
 205    kmrso.kmr_create_dummy_context.restype = _c_pointer
 206
 207    kmrso.kmr_free_context.argtypes = [_c_kmr]
 208    kmrso.kmr_free_context.restype = None
 209
 210    kmrso.kmr_set_option_by_strings.argtypes = [_c_kmr, _c_string, _c_string]
 211    kmrso.kmr_set_option_by_strings.restype = None
 212
 213    kmrso.kmr_create_kvs7.argtypes = [
 214        _c_kmr, _c_int, _c_int, _c_option, _c_string, _c_int, _c_string]
 215    kmrso.kmr_create_kvs7.restype = _c_kvs
 216
 217    kmrso.kmr_add_kv.argtypes = [_c_kvs, _c_kvbox]
 218    kmrso.kmr_add_kv.restype = None
 219
 220    kmrso.kmr_add_kv_done.argtypes = [_c_kvs]
 221    kmrso.kmr_add_kv_done.restype = None
 222
 223    kmrso.kmr_get_element_count.argtypes = [_c_kvs]
 224    kmrso.kmr_get_element_count.restype = _c_long
 225
 226    kmrso.kmr_local_element_count.argtypes = [_c_kvs]
 227    kmrso.kmr_local_element_count.restype = _c_long
 228
 229    kmrso.kmr_map9.argtypes = [
 230        _c_bool, _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp,
 231        _c_string, _c_int, _c_string]
 232    kmrso.kmr_map9.restype = None
 233
 234    kmrso.kmr_map_once.argtypes = [_c_kvs, _c_void_p, _c_option,
 235                                   _c_bool, _c_fnp]
 236    kmrso.kmr_map_once.restype = None
 237
 238    kmrso.kmr_map_rank_by_rank.argtypes = [
 239        _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp]
 240    kmrso.kmr_map_rank_by_rank.restype = None
 241
 242    kmrso.kmr_map_for_some.argtypes = [
 243        _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp]
 244    kmrso.kmr_map_for_some.restype = None
 245
 246    kmrso.kmr_map_ms.argtypes = [_c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp]
 247    kmrso.kmr_map_ms.restype = _c_int
 248
 249    kmrso.kmr_map_ms_commands.argtypes = [
 250        _c_kvs, _c_kvs, _c_void_p, _c_option, _c_spawn_option, _c_fnp]
 251    kmrso.kmr_map_ms_commands.restype = _c_int
 252
 253    kmrso.kmr_map_via_spawn.argtypes = [
 254        _c_kvs, _c_kvs, _c_void_p, _c_mpi_info, _c_spawn_option, _c_fnp]
 255    kmrso.kmr_map_via_spawn.restype = None
 256
 257    kmrso.kmr_map_processes.argtypes = [
 258        _c_bool, _c_kvs, _c_kvs, _c_void_p, _c_mpi_info,
 259        _c_spawn_option, _c_fnp]
 260    kmrso.kmr_map_processes.restype = None
 261
 262    kmrso.kmr_reduce9.argtypes = [
 263        _c_bool, _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp,
 264        _c_string, _c_int, _c_string]
 265    kmrso.kmr_reduce9.restype = None
 266
 267    kmrso.kmr_reduce_as_one.argtypes = [
 268        _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp]
 269    kmrso.kmr_reduce_as_one.type = None
 270
 271    kmrso.kmr_shuffle.argtypes = [_c_kvs, _c_kvs, _c_option]
 272    kmrso.kmr_shuffle.restype = None
 273
 274    kmrso.kmr_replicate.argtypes = [_c_kvs, _c_kvs, _c_option]
 275    kmrso.kmr_replicate.restype = None
 276
 277    kmrso.kmr_distribute.argtypes = [_c_kvs, _c_kvs, _c_bool, _c_option]
 278    kmrso.kmr_distribute.restype = None
 279
 280    kmrso.kmr_concatenate_kvs.argtypes = [_c_kvsvec, _c_int, _c_kvs, _c_option]
 281    kmrso.kmr_concatenate_kvs.restype = None
 282
 283    kmrso.kmr_reverse.argtypes = [_c_kvs, _c_kvs, _c_option]
 284    kmrso.kmr_reverse.restype = None
 285
 286    kmrso.kmr_sort.argtypes = [_c_kvs, _c_kvs, _c_option]
 287    kmrso.kmr_sort.restype = None
 288
 289    kmrso.kmr_sort_locally.argtypes = [_c_kvs, _c_kvs, _c_bool, _c_option]
 290    kmrso.kmr_sort_locally.restype = None
 291
 292    kmrso.kmr_reply_to_spawner.argtypes = [_c_kmr]
 293    kmrso.kmr_reply_to_spawner.restype = None
 294
 295    kmrso.kmr_send_kvs_to_spawner.argtypes = [_c_kmr, _c_kvs]
 296    kmrso.kmr_send_kvs_to_spawner.restype = None
 297
 298    kmrso.kmr_get_spawner_communicator.argtypes = [_c_void_p, _c_long]
 299    kmrso.kmr_get_spawner_communicator.restype = ctypes.POINTER(_c_mpi_comm)
 300
 301    kmrso.kmr_read_files_reassemble.argtypes = [
 302        _c_kmr, _c_string, _c_int, _c_uint64, _c_uint64,
 303        ctypes.POINTER(_c_void_p), ctypes.POINTER(_c_uint64)]
 304    kmrso.kmr_read_files_reassemble.restype = None
 305
 306    kmrso.kmr_read_file_by_segments.argtypes = [
 307        _c_kmr, _c_string, _c_int,
 308        ctypes.POINTER(_c_void_p), ctypes.POINTER(_c_uint64)]
 309    kmrso.kmr_read_file_by_segments.restype = None
 310
 311    kmrso.kmr_save_kvs.argtypes = [
 312        _c_kvs, ctypes.POINTER(_c_void_p), ctypes.POINTER(_c_size_t),
 313        _c_option]
 314    kmrso.kmr_save_kvs.restype = None
 315
 316    kmrso.kmr_restore_kvs.argtypes = [
 317        _c_kvs, _c_void_p, _c_size_t, _c_option]
 318    kmrso.kmr_restore_kvs.restype = None
 319
 320    kmrso.kmr_dump_kvs.argtypes = [_c_kvs, _c_int]
 321    kmrso.kmr_dump_kvs.restype = None
 322
 323    kmrso.kmr_get_key_type_ff.argtypes = [_c_kvs]
 324    kmrso.kmr_get_key_type_ff.restype = _c_int
 325
 326    kmrso.kmr_get_value_type_ff.argtypes = [_c_kvs]
 327    kmrso.kmr_get_value_type_ff.restype = _c_int
 328
 329    kmrso.kmr_get_nprocs.argtypes = [_c_kmr]
 330    kmrso.kmr_get_nprocs.restype = _c_int
 331
 332    kmrso.kmr_get_rank.argtypes = [_c_kmr]
 333    kmrso.kmr_get_rank.restype = _c_int
 334
 335    kmrso.kmr_mfree.argtypes = [_c_void_p, _c_size_t]
 336    kmrso.kmr_mfree.restype = None
 337
 338    kmrso.kmr_stringify_options.argtypes = [_c_option]
 339    kmrso.kmr_stringify_options.restype = _c_string
 340
 341    kmrso.kmr_stringify_file_options.argtypes = [_c_file_option]
 342    kmrso.kmr_stringify_file_options.restype = _c_string
 343
 344    kmrso.kmr_stringify_spawn_options.argtypes = [_c_spawn_option]
 345    kmrso.kmr_stringify_spawn_options.restype = _c_string
 346
 347    kmrso.kmr_map_swf.argtypes = [
 348        _c_kvs, _c_kvs, _c_void_p, _c_spawn_option, _c_fnp]
 349    kmrso.kmr_map_swf.restype = None
 350
 351    kmrso.kmr_init_swf.argtypes = [
 352        _c_kmr, ctypes.POINTER(_c_mpi_comm), _c_int]
 353    kmrso.kmr_init_swf.restype = None
 354
 355    kmrso.kmr_detach_swf_workers.argtypes = [_c_kmr]
 356    kmrso.kmr_detach_swf_workers.restype = None
 357
 358    kmrso.kmr_stop_swf_workers.argtypes = [_c_kmr]
 359    kmrso.kmr_stop_swf_workers.restype = None
 360
 361    kmrso.kmr_finish_swf.argtypes = [_c_kmr]
 362    kmrso.kmr_finish_swf.restype = None
 363
 364    kmrso.kmr_split_swf_lanes.argtypes = [
 365        _c_kmr, ctypes.POINTER(_c_mpi_comm), _c_int,
 366        ctypes.POINTER(_c_string), _c_bool]
 367    kmrso.kmr_split_swf_lanes.restype = None
 368
 369    kmrso.kmr_dump_swf_lanes.argtypes = [_c_kmr]
 370    kmrso.kmr_dump_swf_lanes.restype = None
 371
 372    kmrso.kmr_set_swf_verbosity.argtypes = [_c_kmr]
 373    kmrso.kmr_set_swf_verbosity.restype = None
 374
 375    #receive_kvs_from_spawned_fn = kmrso.kmr_receive_kvs_from_spawned_fn
 376
 377    return None
 378
 379def _string_of_options(o):
 380    """Returns a print string of options for _c_option,
 381    _c_file_option, and _c_spawn_option."""
 382
 383    prefix = o.__class__.__name__
 384    attrs = o.__class__._fields_
 385    ss = []
 386    for (f, _, _) in attrs:
 387        if ((f not in ["gap16", "gap32"]) and getattr(o, f) == 1):
 388            ss.append(f + "=1")
 389    return (prefix + "(" + (",".join(ss)) + ")")
 390
 391class _c_option(ctypes.Structure):
 392    """kmr_option."""
 393
 394    _fields_ = [
 395        ("nothreading", _c_uint8, 1),
 396        ("inspect", _c_uint8, 1),
 397        ("keep_open", _c_uint8, 1),
 398        ("key_as_rank", _c_uint8, 1),
 399        ("rank_zero", _c_uint8, 1),
 400        ("collapse", _c_uint8, 1),
 401        ("take_ckpt", _c_uint8, 1),
 402        ("gap16", _c_uint, 16),
 403        ("gap32", _c_uint, 32)]
 404
 405    def __init__(self, opts=None, enabledlist=None):
 406        super(_c_option, self).__init__()
 407        if opts is None: opts = {}
 408        if enabledlist is None: enabledlist = []
 409        ## Sets the options as dictionary passed.
 410        for o, v in iter(opts.items()):
 411            if (o in ["key", "value", "output"]):
 412                ## "key", "value", and "output" are Python binding only.
 413                pass
 414            elif (enabledlist != [] and (o not in enabledlist)):
 415                raise Exception("Bad option: %s" % o)
 416            elif (o == "nothreading"):
 417                self.nothreading = v
 418            elif (o == "inspect"):
 419                self.inspect = v
 420            elif (o == "keep_open"):
 421                self.keep_open = v
 422            elif (o == "key_as_rank"):
 423                self.key_as_rank = v
 424            elif (o == "rank_zero"):
 425                self.rank_zero = v
 426            elif (o == "collapse"):
 427                self.collapse = v
 428            elif (o == "take_ckpt"):
 429                self.take_ckpt = v
 430            else:
 431                raise Exception("Bad option: %s" % o)
 432        return
 433
 434    def __str__(self):
 435        return _string_of_options(self)
 436
 437class _c_file_option(ctypes.Structure):
 438    """kmr_file_option."""
 439
 440    _fields_ = [
 441        ("each_rank", _c_uint8, 1),
 442        ("subdirectories", _c_uint8, 1),
 443        ("list_file", _c_uint8, 1),
 444        ("shuffle_names", _c_uint8, 1),
 445        ("gap16", _c_uint, 16),
 446        ("gap32", _c_uint, 32)]
 447
 448    def __init__(self, opts=None, enabledlist=None):
 449        super(_c_file_option, self).__init__()
 450        if opts is None: opts = {}
 451        if enabledlist is None: enabledlist = []
 452        ## Sets the options as dictionary passed.
 453        for o, v in iter(opts.items()):
 454            if (o == "key" or o == "output"):
 455                ## "key" and "output" are Python binding only.
 456                pass
 457            elif (enabledlist != [] and (o not in enabledlist)):
 458                raise Exception("Bad option: %s" % o)
 459            elif (o == "each_rank"):
 460                self.each_rank = v
 461            elif (o == "subdirectories"):
 462                self.subdirectories = v
 463            elif (o == "list_file"):
 464                self.list_file = v
 465            elif (o == "shuffle_names"):
 466                self.shuffle_names = v
 467            else:
 468                raise Exception("Bad option: %s" % o)
 469            return
 470
 471    def __str__(self):
 472        return _string_of_options(self)
 473
 474class _c_spawn_option(ctypes.Structure):
 475    """kmr_spawn_option."""
 476
 477    _fields_ = [
 478        ("separator_space", _c_uint8, 1),
 479        ("reply_each", _c_uint8, 1),
 480        ("reply_root", _c_uint8, 1),
 481        ("no_set_infos", _c_uint8, 1),
 482        ("take_ckpt", _c_uint8, 1),
 483        ("gap16", _c_uint, 16),
 484        ("gap32", _c_uint, 32)]
 485
 486    def __init__(self, opts=None, enabledlist=None):
 487        super(_c_spawn_option, self).__init__()
 488        if opts is None: opts = {}
 489        if enabledlist is None: enabledlist = []
 490        ## Sets the options as dictionary passed.
 491        for o, v in iter(opts.items()):
 492            if (o == "key" or o == "output"):
 493                ## "key" and "output" are Python binding only.
 494                pass
 495            elif (enabledlist != [] and (o not in enabledlist)):
 496                raise Exception("Bad option: %s" % o)
 497            elif (o == "separator_space"):
 498                self.separator_space = v
 499            elif (o == "reply_each"):
 500                self.reply_each = v
 501            elif (o == "reply_root"):
 502                self.reply_root = v
 503            elif (o == "no_set_infos"):
 504                self.no_set_infos = v
 505            elif (o == "take_ckpt"):
 506                self.take_ckpt = v
 507            else:
 508                raise Exception("Bad option: %s" % o)
 509        return
 510
 511    def __str__(self):
 512        return _string_of_options(self)
 513
 514_spawn_option_list = [_k for (_k, _, _) in _c_spawn_option._fields_]
 515_file_option_list = [_k for (_k, _, _) in _c_file_option._fields_]
 516
 517class _c_unitsized(ctypes.Union):
 518    """kmr_unit_sized {const char *p; long i; double d;}."""
 519
 520    _fields_ = [
 521        ("p", _c_string),
 522        ("i", _c_long),
 523        ("d", _c_double)]
 524
 525class _c_kvbox(ctypes.Structure):
 526    """kmr_kv_box {int klen, vlen; kmr_unit_sized k, v;}."""
 527
 528    _fields_ = [
 529        ("klen", _c_int),
 530        ("vlen", _c_int),
 531        ("k", _c_unitsized),
 532        ("v", _c_unitsized)]
 533
 534    ## NOTE: Defining __init__ with some arguments makes c-callback
 535    ## fail to call initializers.
 536
 537    def __init__(self):
 538        super(_c_kvbox, self).__init__()
 539
 540    def set(self, klen, key, vlen, val):
 541        self.klen = klen
 542        self.vlen = vlen
 543        self.k = key
 544        self.v = val
 545        return self
 546
 547def _wrap_mapfn(pyfn):
 548    """Returns a closure which calls a given Python map-function on
 549    the unmarshalled contents in KVS."""
 550
 551    if (pyfn is None):
 552        return 0
 553    elif (isinstance(pyfn, _c_funcptr)):
 554        return pyfn
 555    else:
 556        def applyfn(cbox, ckvi, ckvo, carg, cindex):
 557            kvi = KVS(ckvi)
 558            kvo = KVS(ckvo)
 559            key = kvi._decode_content(cbox.klen, cbox.k, _Slot.Key)
 560            val = kvi._decode_content(cbox.vlen, cbox.v, _Slot.Value)
 561            try:
 562                pyfn((key, val), kvi, kvo, cindex)
 563            except:
 564                warning_function(("Exception in Python callbacks: %s"
 565                                  % str(sys.exc_info()[1])),
 566                                 RuntimeWarning)
 567                if (print_backtrace_in_map_fn): traceback.print_exc()
 568            return (0 if ignore_exceptions_in_map_fn else -1)
 569        return _MKMAPFN(applyfn)
 570
 571def _wrap_redfn(pyfn):
 572    """Returns a closure which calls a given Python reduce-function on
 573    the unmarshalled contents in KVS."""
 574
 575    if (pyfn is None):
 576        return 0
 577    elif (isinstance(pyfn, _c_funcptr)):
 578        return pyfn
 579    else:
 580        def applyfn(cboxvec, n, ckvi, ckvo, carg):
 581            kvi = KVS(ckvi)
 582            kvo = KVS(ckvo)
 583            kvvec = []
 584            for i in range(0, n):
 585                pos = (cboxvec + ctypes.sizeof(_c_kvbox) * i)
 586                cbox = _c_kvbox.from_address(pos)
 587                key = kvi._decode_content(cbox.klen, cbox.k, _Slot.Key)
 588                val = kvi._decode_content(cbox.vlen, cbox.v, _Slot.Value)
 589                kvvec.append((key, val))
 590            try:
 591                pyfn(kvvec, kvi, kvo)
 592            except:
 593                warning_function(("Exception in Python callbacks: %s"
 594                                  % str(sys.exc_info()[1])),
 595                                 RuntimeWarning)
 596                if (print_backtrace_in_map_fn): traceback.print_exc()
 597            return (0 if ignore_exceptions_in_map_fn else -1)
 598        return _MKREDFN(applyfn)
 599
 600def _get_options(opts, with_keyty_valty):
 601    """Returns a triple of the options: a key field type, a value
 602    field type, and a flag of needs of output generation."""
 603
 604    if ((not with_keyty_valty) and (("key" in opts) or ("value" in opts))):
 605        raise Exception("Bad option: key= or value= not allowed")
 606    keyty = opts.get("key", "opaque")
 607    valty = opts.get("value", "opaque")
 608    mkkvo = opts.get("output", True)
 609    return (keyty, valty, mkkvo)
 610
 611def _make_frame_info(frame):
 612    sp = frame
 613    co = sp.f_code
 614    return (_encode(co.co_filename), sp.f_lineno, _encode(co.co_name))
 615
 616def _filter_spawn_options(opts):
 617    """Returns a pair of dictionaries, the 1st holds options to spawn,
 618    and the 2nd holds the other options."""
 619
 620    sopts = dict()
 621    mopts = dict()
 622    for o, v in iter(opts.items()):
 623        if (o in _spawn_option_list):
 624            sopts[o] = v
 625        else:
 626            mopts[o] = v
 627    return (sopts, mopts)
 628
 629class KMR():
 630    """KMR context."""
 631
 632    ## attributes: self._ckmr, self.nprocs, self.rank, self.emptykvs,
 633    ## self._dismissed.
 634
 635    def __init__(self, comm, info=None):
 636        """Makes a KMR context with a given MPI communicator (comm),
 637        which is used in succeeding operations.  Info specifies its
 638        options by MPI_Info.  Arguments of comm/info are passed as a
 639        long integer (assuming either an integer (int) or a pointer in
 640        C).  It also accepts an communicator instance of mpi4py.MPI.Comm,
 641        a string "dummy" or "world" as a comm argument."""
 642
 643        if (kmrso == None):
 644            _load_kmrso(kmrso_name)
 645
 646        if (isinstance(info, (int))):
 647            warninfo = False
 648            cinfo = info
 649        else:
 650            warninfo = (info != None)
 651            cinfo = _mpi_info_null
 652        if (isinstance(comm, (int))):
 653            warncomm = False
 654            ccomm = comm
 655        elif (isinstance(comm, mpi4py.MPI.Comm)):
 656            warncomm = False
 657            comm_ptr = mpi4py.MPI._addressof(comm)
 658            if (mpi4py.MPI._sizeof(mpi4py.MPI.Comm) == ctypes.sizeof(_c_uint64)):
 659                MPI_Comm = _c_uint64
 660            else:
 661                MPI_Comm = _c_void_p
 662            ccomm = MPI_Comm.from_address(comm_ptr)
 663        elif (comm == "dummy"):
 664            warncomm = False
 665            ccomm = _mpi_comm_self
 666        elif (comm == "world"):
 667            warncomm = False
 668            ccomm = _mpi_comm_world
 669        else:
 670            warncomm = True
 671            ccomm = _mpi_comm_world
 672
 673        self._ckmr = kmrso.kmr_create_context(ccomm, cinfo, _encode(""))
 674
 675        """self._ckmr holds the C part of a KMR context."""
 676
 677        if (_c_null_pointer(self._ckmr)):
 678            raise Exception("kmr_create_context: failed")
 679
 680        self._dismissed = False
 681
 682        """self._dismissed=True disables freeing KVS'es (by memory
 683        management) which remain unconsumed after dismissing a KMR
 684        context.  It is because freeing them causes referencing
 685        dangling pointers in C."""
 686
 687        self.emptykvs = KVS(self).free()
 688
 689        """self.emptykvs holds an empty KVS needed by map_once,
 690        map_on_rank_zero, read_files_reassemble, and
 691        read_file_by_segments."""
 692
 693        self.nprocs = kmrso.kmr_get_nprocs(self._ckmr)
 694
 695        """self.nprocs holds an nprocs of MPI."""
 696
 697        self.rank = kmrso.kmr_get_rank(self._ckmr)
 698
 699        """self.rank holds a rank of MPI."""
 700
 701        if (warncomm and (self.rank == 0)):
 702            warning_function("MPI comm ignored in KMR() constructor.", RuntimeWarning)
 703        if (warninfo and (self.rank == 0)):
 704            warning_function("MPI info ignored in KMR() constructor.", RuntimeWarning)
 705        return
 706
 707    def __del__(self):
 708        self.dismiss()
 709        return
 710
 711    def free(self):
 712        """Dismisses KMR (an alias of dismiss())."""
 713
 714        self.dismiss()
 715
 716    def dismiss(self):
 717        """Dismisses KMR."""
 718
 719        if (not _c_null_pointer(self._ckmr)):
 720            kmrso.kmr_free_context(self._ckmr)
 721        self._ckmr = _c_null_pointer_value
 722        self._dismissed = True
 723        self.emptykvs = None
 724        self.nprocs = -1
 725        self.rank = -1
 726        return
 727
 728    def create_kvs(self, **opts):
 729        """Makes a new KVS (an alias of make_kvs())."""
 730
 731        self.make_kvs(**opts)
 732
 733    def make_kvs(self, **opts):
 734        """Makes a new KVS."""
 735
 736        (keyty, valty, _) = _get_options(opts, True)
 737        return KVS(self, keyty, valty)
 738
 739    def reply_to_spawner(self):
 740        """Sends a reply message from a spawned process."""
 741
 742        kmrso.kmr_reply_to_spawner(self._ckmr)
 743        return
 744
 745    def get_spawner_communicator(self, index):
 746        """Obtains a parent communicator of a spawned process.  C version
 747        returns a reference, but this returns an entity"""
 748
 749        commref = kmrso.kmr_get_spawner_communicator(self._ckmr, index)
 750        return commref.contents.value
 751
 752    def send_kvs_to_spawner(self, kvs):
 753        """Sends the KVS from a spawned process to the spawner."""
 754
 755        kmrso.kmr_send_kvs_to_spawner(self._ckmr, kvs._ckvs)
 756        return
 757
 758    def _init_swf(self, splitcomms, masterank):
 759        """."""
 760        kmrso.kmr_init_swf(self._ckmr, splitcomms, masterank)
 761        return
 762
 763    def _detach_swf_workers(self):
 764        """."""
 765        kmrso.kmr_detach_swf_workers(self._ckmr)
 766        return
 767
 768    def _stop_swf_workers(self):
 769        """."""
 770        kmrso.kmr_stop_swf_workers(self._ckmr)
 771        return
 772
 773    def _finish_swf(self):
 774        """."""
 775        kmrso.kmr_finish_swf(self._ckmr)
 776        return
 777
 778    def _split_swf_lanes(self, masterrank, description, dump):
 779        """."""
 780        comms = (_c_mpi_comm * 4)()
 781        desc = (ctypes.c_char_p * (len(description) + 1))()
 782        desc[:-1] = description
 783        desc[len(description)] = None
 784        kmrso.kmr_split_swf_lanes(self._ckmr, comms, masterrank, desc, dump)
 785        return comms
 786
 787    def _dump_swf_lanes(self):
 788        """."""
 789        kmrso.kmr_dump_swf_lanes(self._ckmr)
 790        return
 791
 792    def _set_swf_verbosity(self, level):
 793        """."""
 794        kmrso.kmr_set_swf_verbosity(self._ckmr, level)
 795        return
 796
 797    def set_option(self, k, v):
 798        """Sets KMR option, taking both arguments by strings."""
 799
 800        kmrso.kmr_set_option_by_strings(self._ckmr, _encode(k), _encode(v))
 801        return
 802
 803_enabled_options_of_map = [
 804    "nothreading", "inspect", "keep_open", "take_ckpt"]
 805
 806_enabled_options_of_map_once = [
 807    "nothreading", "keep_open", "take_ckpt"]
 808
 809_enabled_options_of_map_ms = [
 810    "nothreading", "keep_open"]
 811
 812_enabled_options_of_reduce = [
 813    "nothreading", "inspect", "take_ckpt"]
 814
 815_enabled_options_of_reduce_as_one = [
 816    "inspect", "take_ckpt"]
 817
 818_enabled_options_of_shuffle = [
 819    "inspect", "key_as_rank", "take_ckpt"]
 820
 821_enabled_options_of_replicate = [
 822    "inspect", "rank_zero", "take_ckpt"]
 823
 824_enabled_options_of_distribute = [
 825    "nothreading", "inspect", "keep_open"]
 826
 827_enabled_options_of_sort_locally = [
 828    "nothreading", "inspect", "key_as_rank"]
 829
 830_enabled_options_of_sort = [
 831    "inspect"]
 832
 833class KVS():
 834    """KVS.  Note that there are dummy KVS'es which are temporarily
 835    created to hold the C structure of the KVS passed to
 836    mapper/reducer functions.  A dummy KVS has None in its "mr"
 837    attribute."""
 838
 839    # attributes: self._ckvs, self.mr, self._frameinfo.
 840
 841    def __init__(self, kmr_or_ckvs, keyty="opaque", valty="opaque"):
 842        """Makes a KVS for a given KMR.  Do not call the KVS constructor
 843        directly, but use KMR.make_kvs() instead.  A KVS is created
 844        with the datatypes stored in the key and the value, specified
 845        by the keywords "key=" and "value=".  The datatype name is a
 846        string, one of "opaque", "cstring", "integer", and "float8".
 847        Most mappers and reducers (precisely, the methods that accepts
 848        a function argument) take keyword arguments for the types,
 849        defaulting with key="opaque" and value="opaque".  The
 850        datatypes affects the sorting order.  """
 851
 852        self.mr = None
 853
 854        """mr attribute holds a KMR context object.  Note that mr is
 855        not accessible from mapping/reducing functions."""
 856
 857        self._ckvs = None
 858
 859        """_ckvs attribute holds a kvs in C."""
 860
 861        self._frameinfo = None
 862
 863        """_frameinfo protects caller line information from garbage
 864        collected."""
 865
 866        if isinstance(kmr_or_ckvs, KMR):
 867            kf = _field_name_type_map[keyty]
 868            vf = _field_name_type_map[valty]
 869            top = inspect.currentframe().f_back
 870            self.mr = kmr_or_ckvs
 871            (f, l, n) = _make_frame_info(top)
 872            self._ckvs = kmrso.kmr_create_kvs7(
 873                self.mr._ckmr, kf, vf, _c_option(), f, l, n)
 874            self._frameinfo = (f, l, n)
 875        elif isinstance(kmr_or_ckvs, _c_pointer):
 876            ## Return a dummy KVS.
 877            self.mr = None
 878            self._ckvs = kmr_or_ckvs
 879        else:
 880            raise Exception("Bad call to kvs constructor")
 881
 882    def __del__(self):
 883        if ((not self._is_dummy()) and (not _c_null_pointer(self._ckvs))):
 884            self.free()
 885        return
 886
 887    def free(self):
 888        """Finishes the C part of a KVS."""
 889
 890        if (self._is_dummy()):
 891            raise Exception("Bad call to free_kvs on dummy KVS")
 892        elif (_c_null_pointer(self._ckvs)):
 893            raise Exception("Bad call to free_kvs on freed KVS")
 894        elif ((not self.mr is None) and self.mr._dismissed):
 895            ## Do not free when KMR object is dismissed.
 896            pass
 897        else:
 898            kmrso.kmr_free_kvs(self._ckvs)
 899            self._ckvs = _c_null_pointer_value
 900            return self
 901
 902    def _is_dummy(self):
 903        return (self.mr is None)
 904
 905    def _consume(self):
 906        """Releases a now dangling C pointer."""
 907
 908        self._ckvs = _c_null_pointer_value
 909
 910    def _encode_content(self, o, key_or_value):
 911        """Marshalls an object with regard to the field type.  It
 912        retuns a 3-tuple, with length, value-union, and the 3nd to
 913        keep a reference to a buffer."""
 914
 915        kvty = self.get_field_type(key_or_value)
 916        u = _c_unitsized()
 917        if (kvty == "opaque"):
 918            data = pickle.dumps(o, _pickle_protocol)
 919            u.p = data
 920            return (len(data), u, data)
 921        elif (kvty == "cstring"):
 922            if (not isinstance(o, str)):
 923                raise Exception("Not 8-bit string for cstring: %s" % o)
 924            ## (Add null for C string).
 925            os = ((o + "\0") if force_null_terminate_in_cstring else o)
 926            data = _encode(os)
 927            u.p = data
 928            return (len(data), u, data)
 929        elif (kvty == "integer"):
 930            u.i = o
 931            return (ctypes.sizeof(_c_long), u, None)
 932        elif (kvty == "float8"):
 933            u.d = o
 934            return (ctypes.sizeof(_c_double), u, None)
 935        else:
 936            raise Exception("Bad field type: %s" % kvty)
 937
 938    def _decode_content(self, siz, u, key_or_value):
 939        """Unmarshalls an object with regard to the field type.  It
 940        returns integer 0 when the length is 0 (it is for a dummy
 941        key-value used in kmr_map_once() etc)."""
 942
 943        if (siz == 0):
 944            return 0
 945        else:
 946            kvty = self.get_field_type(key_or_value)
 947            if (kvty == "opaque"):
 948                data = ctypes.string_at(u.i, siz)
 949                o = pickle.loads(data)
 950                return o
 951            elif (kvty == "cstring"):
 952                ## (Delete null added for C string).
 953                siz1 = ((siz - 1) if force_null_terminate_in_cstring else siz)
 954                data = ctypes.string_at(u.i, siz1)
 955                os = _decode(data)
 956                return os
 957            elif (kvty == "integer"):
 958                return u.i
 959            elif (kvty == "float8"):
 960                return u.d
 961            else:
 962                raise Exception("Bad field type: %s" % kvty)
 963
 964    def get_field_type(self, key_or_value):
 965        """Get a field type of a KVS."""
 966
 967        if (_c_null_pointer(self._ckvs)):
 968            raise Exception("Bad KVS (null C-object)")
 969        if (key_or_value == _Slot.Key):
 970            kvty = kmrso.kmr_get_key_type_ff(self._ckvs)
 971        elif (key_or_value == _Slot.Value):
 972            kvty = kmrso.kmr_get_value_type_ff(self._ckvs)
 973        else:
 974            raise Exception("Bad field %s" % key_or_value.name)
 975        if (kvty == _kv_bad):
 976            raise Exception("Bad field type value %d in KVS" % kvty)
 977        else:
 978            return _field_type_name_map[kvty]
 979
 980    def add(self, key, val):
 981        """Adds a key-value pair."""
 982
 983        self.add_kv(key, val)
 984        return
 985
 986    def add_kv(self, key, val):
 987        """Adds a key-value pair."""
 988
 989        ## Note it keeps the created string until kmr_add_kv(),
 990        ## because kvbox does not hold the references.
 991        (klen, k, ks) = self._encode_content(key, _Slot.Key)
 992        (vlen, v, vs) = self._encode_content(val, _Slot.Value)
 993        cbox = _c_kvbox().set(klen, k, vlen, v)
 994        kmrso.kmr_add_kv(self._ckvs, cbox)
 995        return
 996
 997    def add_kv_done(self):
 998        """Finishes adding key-value pairs."""
 999
1000        kmrso.kmr_add_kv_done(self._ckvs)
1001        return
1002
1003    def get_element_count(self):
1004        """Gets the total number of key-value pairs."""
1005
1006        c = _c_long(0)
1007        kmrso.kmr_get_element_count(self._ckvs, ctypes.byref(c))
1008        return c.value
1009
1010    def local_element_count(self):
1011        """Gets the number of key-value pairs locally."""
1012
1013        c = _c_long(0)
1014        kmrso.kmr_local_element_count(self._ckvs, ctypes.byref(c))
1015        return c.value
1016
1017    def map(self, fn, **mopts):
1018        """Maps simply."""
1019
1020        (keyty, valty, mkkvo) = _get_options(mopts, True)
1021        cmopts = _c_option(mopts, _enabled_options_of_map)
1022        cfn = _wrap_mapfn(fn)
1023        ckvi = self._ckvs
1024        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1025        ckvo = (kvo._ckvs if (kvo is not None) else None)
1026        (f, l, n) = _make_frame_info(inspect.currentframe().f_back)
1027        kmrso.kmr_map9(0, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n))
1028        if (cmopts.inspect == 0): self._consume()
1029        return kvo
1030
1031    def map_once(self, rank_zero_only, fn, **mopts):
1032        """Maps once with a dummy key-value pair."""
1033
1034        ## It needs dummy input; Never inspects.
1035        (keyty, valty, mkkvo) = _get_options(mopts, True)
1036        cmopts = _c_option(mopts, _enabled_options_of_map_once)
1037        cfn = _wrap_mapfn(fn)
1038        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1039        ckvo = (kvo._ckvs if (kvo is not None) else None)
1040        kmrso.kmr_map_once(ckvo, 0, cmopts, rank_zero_only, cfn)
1041        return kvo
1042
1043    def map_on_rank_zero(self, fn, **mopts):
1044        """Maps on rank0 only."""
1045
1046        ## It needs dummy input.
1047        return self.map_once(True, fn, *mopts)
1048
1049    def map_rank_by_rank(self, fn, **mopts):
1050        """Maps sequentially with rank by rank for debugging."""
1051
1052        (keyty, valty, mkkvo) = _get_options(mopts, True)
1053        cmopts = _c_option(mopts, _enabled_options_of_map)
1054        cfn = _wrap_mapfn(fn)
1055        ckvi = self._ckvs
1056        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1057        ckvo = (kvo._ckvs if (kvo is not None) else None)
1058        kmrso.kmr_map_rank_by_rank(ckvi, ckvo, 0, cmopts, cfn)
1059        if (cmopts.inspect == 0): self._consume()
1060        return kvo
1061
1062    def map_for_some(self, fn, **mopts):
1063        """Maps until some key-value are added."""
1064
1065        (keyty, valty, mkkvo) = _get_options(mopts, True)
1066        cmopts = _c_option(mopts, _enabled_options_of_map)
1067        ckvi = self._ckvs
1068        cfn = _wrap_mapfn(fn)
1069        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1070        ckvo = (kvo._ckvs if (kvo is not None) else None)
1071        kmrso.kmr_map_for_some(ckvi, ckvo, 0, cmopts, cfn)
1072        if (cmopts.inspect == 0): self._consume()
1073        return kvo
1074
1075    def map_ms(self, fn, **mopts):
1076        """Maps in master-worker mode."""
1077
1078        ## Its call is repeated until True (assuming MPI_SUCCESS==0).
1079        (keyty, valty, mkkvo) = _get_options(mopts, True)
1080        cmopts = _c_option(mopts, _enabled_options_of_map_ms)
1081        cfn = _wrap_mapfn(fn)
1082        ckvi = self._ckvs
1083        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1084        ckvo = (kvo._ckvs if (kvo is not None) else None)
1085        rr = 1
1086        while (rr != 0):
1087            rr = kmrso.kmr_map_ms(ckvi, ckvo, 0, cmopts, cfn)
1088        self._consume()
1089        return kvo
1090
1091    def map_ms_commands(self, fn, **xopts):
1092        """Maps in master-worker mode, and runs serial commands."""
1093
1094        (sopts, mopts) = _filter_spawn_options(xopts)
1095        (keyty, valty, mkkvo) = _get_options(mopts, True)
1096        cmopts = _c_option(mopts, _enabled_options_of_map_ms)
1097        csopts = _c_spawn_option(sopts)
1098        cfn = _wrap_mapfn(fn)
1099        ckvi = self._ckvs
1100        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1101        ckvo = (kvo._ckvs if (kvo is not None) else None)
1102        rr = 1
1103        while (rr != 0):
1104            rr = kmrso.kmr_map_ms_commands(ckvi, ckvo, 0, cmopts, csopts, cfn)
1105        self._consume()
1106        return kvo
1107
1108    def map_via_spawn(self, fn, **xopts):
1109        """Maps on processes started by MPI_Comm_spawn()."""
1110
1111        (sopts, mopts) = _filter_spawn_options(xopts)
1112        (keyty, valty, mkkvo) = _get_options(mopts, True)
1113        cmopts = _c_option(mopts, _enabled_options_of_map)
1114        csopts = _c_spawn_option(sopts)
1115        cfn = _wrap_mapfn(fn)
1116        ckvi = self._ckvs
1117        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1118        ckvo = (kvo._ckvs if (kvo is not None) else None)
1119        kmrso.kmr_map_via_spawn(ckvi, ckvo, 0, _mpi_info_null, csopts, cfn)
1120        self._consume()
1121        return kvo
1122
1123    def map_processes(self, nonmpi, fn, **sopts):
1124        """Maps on processes started by MPI_Comm_spawn()."""
1125
1126        (keyty, valty, mkkvo) = _get_options(sopts, True)
1127        csopts = _c_spawn_option(sopts)
1128        cfn = _wrap_mapfn(fn)
1129        ckvi = self._ckvs
1130        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1131        ckvo = (kvo._ckvs if (kvo is not None) else None)
1132        kmrso.kmr_map_processes(nonmpi, ckvi, ckvo, 0, _mpi_info_null,
1133                                csopts, cfn)
1134        self._consume()
1135        return kvo
1136
1137    def map_parallel_processes(self, fn, **sopts):
1138        """Maps on processes started by MPI_Comm_spawn()."""
1139
1140        return self.map_processes(False, fn, **sopts)
1141
1142    def map_serial_processes(self, fn, **sopts):
1143        """Maps on processes started by MPI_Comm_spawn()."""
1144
1145        return self.map_processes(True, fn, **sopts)
1146
1147    def reduce(self, fn, **mopts):
1148        """Reduces key-value pairs."""
1149
1150        (keyty, valty, mkkvo) = _get_options(mopts, True)
1151        cmopts = _c_option(mopts, _enabled_options_of_reduce)
1152        cfn = _wrap_redfn(fn)
1153        ckvi = self._ckvs
1154        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1155        ckvo = (kvo._ckvs if (kvo is not None) else None)
1156        (f, l, n) = _make_frame_info(inspect.currentframe().f_back)
1157        kmrso.kmr_reduce9(0, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n))
1158        if (cmopts.inspect == 0): self._consume()
1159        return kvo
1160
1161    def reduce_as_one(self, fn, **mopts):
1162        """ Reduces once as if all pairs had the same key."""
1163
1164        (keyty, valty, mkkvo) = _get_options(mopts, True)
1165        cmopts = _c_option(mopts, _enabled_options_of_reduce_as_one)
1166        cfn = _wrap_redfn(fn)
1167        ckvi = self._ckvs
1168        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1169        ckvo = (kvo._ckvs if (kvo is not None) else None)
1170        kmrso.kmr_reduce_as_one(ckvi, ckvo, 0, cmopts, cfn)
1171        if (cmopts.inspect == 0): self._consume()
1172        return kvo
1173
1174    def reduce_for_some(self, fn, **mopts):
1175        """Reduces until some key-value are added."""
1176
1177        (keyty, valty, mkkvo) = _get_options(mopts, True)
1178        cmopts = _c_option(mopts, _enabled_options_of_reduce)
1179        cfn = _wrap_redfn(fn)
1180        ckvi = self._ckvs
1181        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1182        ckvo = (kvo._ckvs if (kvo is not None) else None)
1183        ## (NOTE: It passes a frame of reduce_for_some.)
1184        (f, l, n) = _make_frame_info(inspect.currentframe())
1185        kmrso.kmr_reduce9(1, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n))
1186        if (cmopts.inspect == 0): self._consume()
1187        return kvo
1188
1189    def reverse(self, **mopts):
1190        """Makes a new pair by swapping the key and the value."""
1191
1192        keyty = self.get_field_type(_Slot.Key)
1193        valty = self.get_field_type(_Slot.Value)
1194        (_, _, mkkvo) = _get_options(mopts, False)
1195        cmopts = _c_option(mopts, _enabled_options_of_map)
1196        assert (mkkvo is True)
1197        ckvi = self._ckvs
1198        kvo = (KVS(self.mr, valty, keyty) if mkkvo else None)
1199        ckvo = (kvo._ckvs if (kvo is not None) else None)
1200        kmrso.kmr_reverse(ckvi, ckvo, cmopts)
1201        if (cmopts.inspect == 0): self._consume()
1202        return kvo
1203
1204    def shuffle(self, **mopts):
1205        """Shuffles key-value pairs."""
1206
1207        keyty = self.get_field_type(_Slot.Key)
1208        valty = self.get_field_type(_Slot.Value)
1209        (_, _, mkkvo) = _get_options(mopts, False)
1210        cmopts = _c_option(mopts, _enabled_options_of_shuffle)
1211        ckvi = self._ckvs
1212        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1213        ckvo = (kvo._ckvs if (kvo is not None) else None)
1214        kmrso.kmr_shuffle(ckvi, ckvo, cmopts)
1215        if (cmopts.inspect == 0): self._consume()
1216        return kvo
1217
1218    def replicate(self, **mopts):
1219        """Replicates key-value pairs to be visible on all ranks."""
1220
1221        keyty = self.get_field_type(_Slot.Key)
1222        valty = self.get_field_type(_Slot.Value)
1223        (_, _, mkkvo) = _get_options(mopts, False)
1224        cmopts = _c_option(mopts, _enabled_options_of_replicate)
1225        ckvi = self._ckvs
1226        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1227        ckvo = (kvo._ckvs if (kvo is not None) else None)
1228        kmrso.kmr_replicate(ckvi, ckvo, cmopts)
1229        if (cmopts.inspect == 0): self._consume()
1230        return kvo
1231
1232    def distribute(self, cyclic, **mopts):
1233        """Distributes pairs approximately evenly to ranks."""
1234
1235        keyty = self.get_field_type(_Slot.Key)
1236        valty = self.get_field_type(_Slot.Value)
1237        (_, _, mkkvo) = _get_options(mopts, False)
1238        cmopts = _c_option(mopts, _enabled_options_of_distribute)
1239        ckvi = self._ckvs
1240        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1241        ckvo = (kvo._ckvs if (kvo is not None) else None)
1242        kmrso.kmr_distribute(ckvi, ckvo, cyclic, cmopts)
1243        if (cmopts.inspect == 0): self._consume()
1244        return kvo
1245
1246    def sort_locally(self, shuffling, **mopts):
1247        """Reorders key-value pairs in a single rank."""
1248
1249        keyty = self.get_field_type(_Slot.Key)
1250        valty = self.get_field_type(_Slot.Value)
1251        (_, _, mkkvo) = _get_options(mopts, False)
1252        cmopts = _c_option(mopts, _enabled_options_of_sort_locally)
1253        ckvi = self._ckvs
1254        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1255        ckvo = (kvo._ckvs if (kvo is not None) else None)
1256        kmrso.kmr_sort_locally(ckvi, ckvo, shuffling, cmopts)
1257        if (cmopts.inspect == 0): self._consume()
1258        return kvo
1259
1260    def sort(self, **mopts):
1261        """Sorts a KVS globally."""
1262
1263        keyty = self.get_field_type(_Slot.Key)
1264        valty = self.get_field_type(_Slot.Value)
1265        (_, _, mkkvo) = _get_options(mopts, False)
1266        cmopts = _c_option(mopts, _enabled_options_of_sort)
1267        ckvi = self._ckvs
1268        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1269        ckvo = (kvo._ckvs if (kvo is not None) else None)
1270        kmrso.kmr_sort(ckvi, ckvo, cmopts)
1271        if (cmopts.inspect == 0): self._consume()
1272        return kvo
1273
1274    def concatenate(self, *morekvs):
1275        """Concatenates a number of KVS'es to one."""
1276
1277        keyty = self.get_field_type(_Slot.Key)
1278        valty = self.get_field_type(_Slot.Value)
1279        siz = (len(morekvs) + 1)
1280        ckvsvec = (_c_kvs * siz)()
1281        ckvsvec[0] = self._ckvs
1282        for i in range(0, len(morekvs)):
1283            ckvsvec[i + 1] = morekvs[i]._ckvs
1284        cn = _c_int(siz)
1285        kvo = KVS(self.mr, keyty, valty)
1286        ckvo = kvo._ckvs
1287        kmrso.kmr_concatenate_kvs(ckvsvec, cn, ckvo, _c_option())
1288        for i in morekvs:
1289            i._consume()
1290        self._consume()
1291        return kvo
1292
1293    def read_files_reassemble(self, filename, color, offset, bytes_):
1294        """Reassembles files reading by ranks."""
1295
1296        buf = _c_void_p()
1297        siz = _c_uint64(0)
1298        kmrso.kmr_read_files_reassemble(
1299            self.mr._ckmr, _encode(filename), color, offset, bytes_,
1300            ctypes.byref(buf), ctypes.byref(siz))
1301        addr = buf.value
1302        ptr = (_c_ubyte * siz.value).from_address(addr)
1303        data = bytearray(ptr)
1304        kmrso.kmr_mfree(addr, siz.value)
1305        return data
1306
1307    def read_file_by_segments(self, filename, color):
1308        """Reads one file by segments and reassembles."""
1309
1310        buf = _c_void_p()
1311        siz = _c_uint64(0)
1312        kmrso.kmr_read_file_by_segments(
1313            self.mr._ckmr, _encode(filename), color,
1314            ctypes.byref(buf), ctypes.byref(siz))
1315        addr = buf.value
1316        ptr = (_c_ubyte * siz.value).from_address(addr)
1317        data = bytearray(ptr)
1318        kmrso.kmr_mfree(addr, siz.value)
1319        return data
1320
1321    def save(self):
1322        """Packs locally the contents of a KVS to a byte array."""
1323
1324        buf = _c_void_p(0)
1325        siz = _c_size_t(0)
1326        kmrso.kmr_save_kvs(self._ckvs, ctypes.byref(buf), ctypes.byref(siz),
1327                           _c_option())
1328        addr = buf.value
1329        ptr = (_c_ubyte * siz.value).from_address(addr)
1330        data = bytearray(ptr)
1331        kmrso.kmr_mfree(addr, siz.value)
1332        return data
1333
1334    def restore(self, data):
1335        """Unpacks locally the contents of a KVS from a byte array."""
1336
1337        kvo = KVS(self.mr, "opaque", "opaque")
1338        siz = len(data)
1339        addr = (_c_ubyte * siz).from_buffer(data)
1340        kmrso.kmr_restore_kvs(kvo._ckvs, addr, siz, _c_option())
1341        return kvo
1342
1343    def _map_swf(self, fn, **xopts):
1344        """."""
1345        (sopts, mopts) = _filter_spawn_options(xopts)
1346        (keyty, valty, mkkvo) = _get_options(mopts, True)
1347        cmopts = _c_option(mopts, _enabled_options_of_map)
1348        csopts = _c_spawn_option(sopts)
1349        cfn = _wrap_mapfn(fn)
1350        ckvi = self._ckvs
1351        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1352        ckvo = (kvo._ckvs if (kvo is not None) else None)
1353        kmrso.kmr_map_swf(ckvi, ckvo, 0, csopts, cfn)
1354        self._consume()
1355        return kvo
1356
def fin():
    """Finishes the use of KMR4PY, by calling kmr_fin() in the
    loaded library."""

    kmrso.kmr_fin()
1362
def listify(kvs):
    """Collects the LOCAL contents of a KVS to a list.  It maps over
    the KVS with inspect=True and output=False, and stores each
    entry at the index passed to the mapper."""

    count = kvs.local_element_count()
    pairs = [None] * count

    def collect(kv, kvi, kvo, index, *_data):
        pairs[index] = kv
        return 0

    result = kvs.map(collect, output=False, inspect=True)
    assert (result is None)
    return pairs
1373
def _stringify_options(o):
    """Returns the decoded result of kmr_stringify_options() on O."""

    encoded = kmrso.kmr_stringify_options(o)
    return _decode(encoded)
1376
def _stringify_file_options(o):
    """Returns the decoded result of kmr_stringify_file_options() on O."""

    encoded = kmrso.kmr_stringify_file_options(o)
    return _decode(encoded)
1379
def _stringify_spawn_options(o):
    """Returns the decoded result of kmr_stringify_spawn_options() on O."""

    encoded = kmrso.kmr_stringify_spawn_options(o)
    return _decode(encoded)
1382
def _check_ctypes_values():
    """Checks if ctypes values are properly used.  It raises an
    Exception when the null pointer value is not recognized as a
    null pointer."""

    null_is_null = _c_null_pointer(_c_null_pointer_value)
    if (null_is_null):
        return
    raise Exception("BAD: C null pointer has a wrong value.")
1388
def _check_passing_options():
    """Checks if the options are passed properly from Python to C.
    Each field of the option structures is set to one in turn, and
    the string returned from C is compared with the field name.  It
    raises an Exception on a mismatch."""

    checkers = [
        (_c_option, _stringify_options),
        (_c_file_option, _stringify_file_options),
        (_c_spawn_option, _stringify_spawn_options)]
    padding = ("gap16", "gap32")
    for (option, stringify) in checkers:
        for field in option._fields_:
            (name, _, _) = field
            ## (The gap fields are skipped).
            if (name in padding):
                continue
            text = stringify(option({name : 1}))
            if (name != text):
                raise Exception("BAD: %s != %s" % (str(name), str(text)))
1404
1405## Document generator "pdoc" requires loading kmr4py.py without
1406## initializing MPI.
1407##if (sys.modules["mpi4py"].__name__ != "dummy_mpi4py"):
1408##    _load_kmrso(kmrso_name)
1409
1410# Copyright (C) 2012-2018 RIKEN R-CCS
1411# This library is distributed WITHOUT ANY WARRANTY.  This library can be
1412# redistributed and/or modified under the terms of the BSD 2-Clause License.
kmrversion = '1.10'
kmrso = None

kmrso holds a libkmr.so library object.

kmrso_name = 'libkmr.so'

kmrso_name holds a libkmr.so name, which can be set before calling KMR().

def warning_function(message, category=None, stacklevel=1, source=None):

warning_function specifies the function used to issue warnings.

ignore_exceptions_in_map_fn = True

ignore_exceptions_in_map_fn=True makes exceptions ignored.

force_null_terminate_in_cstring = True

force_null_terminate_in_cstring specifies to add a null-terminator in C strings. Do not change it while some KVS'es are live.

class KMR:
class KMR():
    """KMR context."""

    ## attributes: self._ckmr, self.nprocs, self.rank, self.emptykvs,
    ## self._dismissed.

    def __init__(self, comm, info=None):
        """Makes a KMR context with a given MPI communicator (comm),
        which is used in succeeding operations.  Info specifies its
        options by MPI_Info.  Arguments of comm/info are passed as a
        long integer (assuming either an integer (int) or a pointer in
        C).  It also accepts a communicator instance of mpi4py.MPI.Comm,
        a string "dummy" or "world" as a comm argument.  An argument
        of another kind is ignored with a warning issued on rank 0.
        It raises an Exception when making a context fails in C."""

        if (kmrso is None):
            _load_kmrso(kmrso_name)

        if (isinstance(info, int)):
            warninfo = False
            cinfo = info
        else:
            warninfo = (info is not None)
            cinfo = _mpi_info_null
        if (isinstance(comm, int)):
            warncomm = False
            ccomm = comm
        elif (isinstance(comm, mpi4py.MPI.Comm)):
            warncomm = False
            comm_ptr = mpi4py.MPI._addressof(comm)
            ## (The handle is read as an integer or as a pointer,
            ## selected by its size).
            if (mpi4py.MPI._sizeof(mpi4py.MPI.Comm) == ctypes.sizeof(_c_uint64)):
                MPI_Comm = _c_uint64
            else:
                MPI_Comm = _c_void_p
            ccomm = MPI_Comm.from_address(comm_ptr)
        elif (comm == "dummy"):
            warncomm = False
            ccomm = _mpi_comm_self
        elif (comm == "world"):
            warncomm = False
            ccomm = _mpi_comm_world
        else:
            warncomm = True
            ccomm = _mpi_comm_world

        self._ckmr = kmrso.kmr_create_context(ccomm, cinfo, _encode(""))

        """self._ckmr holds the C part of a KMR context."""

        if (_c_null_pointer(self._ckmr)):
            raise Exception("kmr_create_context: failed")

        self._dismissed = False

        """self._dismissed=True disables freeing KVS'es (by memory
        management) which remain unconsumed after dismissing a KMR
        context.  It is because freeing them causes referencing
        dangling pointers in C."""

        self.emptykvs = KVS(self).free()

        """self.emptykvs holds an empty KVS needed by map_once,
        map_on_rank_zero, read_files_reassemble, and
        read_file_by_segments."""

        self.nprocs = kmrso.kmr_get_nprocs(self._ckmr)

        """self.nprocs holds an nprocs of MPI."""

        self.rank = kmrso.kmr_get_rank(self._ckmr)

        """self.rank holds a rank of MPI."""

        if (warncomm and (self.rank == 0)):
            warning_function("MPI comm ignored in KMR() constructor.", RuntimeWarning)
        if (warninfo and (self.rank == 0)):
            warning_function("MPI info ignored in KMR() constructor.", RuntimeWarning)
        return

    def __del__(self):
        """Dismisses KMR at deletion.  It does nothing when the
        constructor failed before the C part was set."""

        ## (_ckmr is missing when __init__ raised early, e.g. on a
        ## failure in loading the library).
        if (hasattr(self, "_ckmr")):
            self.dismiss()
        return

    def free(self):
        """Dismisses KMR (an alias of dismiss())."""

        self.dismiss()

    def dismiss(self):
        """Dismisses KMR.  It frees the C part if it is not yet
        freed, and resets the attributes (nprocs and rank to -1)."""

        if (not _c_null_pointer(self._ckmr)):
            kmrso.kmr_free_context(self._ckmr)
        self._ckmr = _c_null_pointer_value
        self._dismissed = True
        self.emptykvs = None
        self.nprocs = -1
        self.rank = -1
        return

    def create_kvs(self, **opts):
        """Makes a new KVS (an alias of make_kvs())."""

        ## (Return the KVS; dropping it made this alias useless).
        return self.make_kvs(**opts)

    def make_kvs(self, **opts):
        """Makes a new KVS.  The key and value types are taken from
        the keyword arguments by _get_options()."""

        (keyty, valty, _) = _get_options(opts, True)
        return KVS(self, keyty, valty)

    def reply_to_spawner(self):
        """Sends a reply message from a spawned process."""

        kmrso.kmr_reply_to_spawner(self._ckmr)
        return

    def get_spawner_communicator(self, index):
        """Obtains a parent communicator of a spawned process.  C version
        returns a reference, but this returns an entity."""

        commref = kmrso.kmr_get_spawner_communicator(self._ckmr, index)
        return commref.contents.value

    def send_kvs_to_spawner(self, kvs):
        """Sends the KVS from a spawned process to the spawner."""

        kmrso.kmr_send_kvs_to_spawner(self._ckmr, kvs._ckvs)
        return

    def _init_swf(self, splitcomms, masterank):
        """Calls kmr_init_swf() with the given arguments."""
        kmrso.kmr_init_swf(self._ckmr, splitcomms, masterank)
        return

    def _detach_swf_workers(self):
        """Calls kmr_detach_swf_workers()."""
        kmrso.kmr_detach_swf_workers(self._ckmr)
        return

    def _stop_swf_workers(self):
        """Calls kmr_stop_swf_workers()."""
        kmrso.kmr_stop_swf_workers(self._ckmr)
        return

    def _finish_swf(self):
        """Calls kmr_finish_swf()."""
        kmrso.kmr_finish_swf(self._ckmr)
        return

    def _split_swf_lanes(self, masterrank, description, dump):
        """Calls kmr_split_swf_lanes() and returns an array of four
        communicators filled by it.  The description is passed as a
        null-terminated array of C strings."""
        comms = (_c_mpi_comm * 4)()
        desc = (ctypes.c_char_p * (len(description) + 1))()
        desc[:-1] = description
        ## (The last entry terminates the array).
        desc[len(description)] = None
        kmrso.kmr_split_swf_lanes(self._ckmr, comms, masterrank, desc, dump)
        return comms

    def _dump_swf_lanes(self):
        """Calls kmr_dump_swf_lanes()."""
        kmrso.kmr_dump_swf_lanes(self._ckmr)
        return

    def _set_swf_verbosity(self, level):
        """Calls kmr_set_swf_verbosity() with the given level."""
        kmrso.kmr_set_swf_verbosity(self._ckmr, level)
        return

    def set_option(self, k, v):
        """Sets KMR option, taking both arguments by strings."""

        kmrso.kmr_set_option_by_strings(self._ckmr, _encode(k), _encode(v))
        return

KMR context.

KMR(comm, info=None)
636    def __init__(self, comm, info=None):
637        """Makes a KMR context with a given MPI communicator (comm),
638        which is used in succeeding operations.  Info specifies its
639        options by MPI_Info.  Arguments of comm/info are passed as a
640        long integer (assuming either an integer (int) or a pointer in
641        C).  It also accepts an communicator instance of mpi4py.MPI.Comm,
642        a string "dummy" or "world" as a comm argument."""
643
644        if (kmrso == None):
645            _load_kmrso(kmrso_name)
646
647        if (isinstance(info, (int))):
648            warninfo = False
649            cinfo = info
650        else:
651            warninfo = (info != None)
652            cinfo = _mpi_info_null
653        if (isinstance(comm, (int))):
654            warncomm = False
655            ccomm = comm
656        elif (isinstance(comm, mpi4py.MPI.Comm)):
657            warncomm = False
658            comm_ptr = mpi4py.MPI._addressof(comm)
659            if (mpi4py.MPI._sizeof(mpi4py.MPI.Comm) == ctypes.sizeof(_c_uint64)):
660                MPI_Comm = _c_uint64
661            else:
662                MPI_Comm = _c_void_p
663            ccomm = MPI_Comm.from_address(comm_ptr)
664        elif (comm == "dummy"):
665            warncomm = False
666            ccomm = _mpi_comm_self
667        elif (comm == "world"):
668            warncomm = False
669            ccomm = _mpi_comm_world
670        else:
671            warncomm = True
672            ccomm = _mpi_comm_world
673
674        self._ckmr = kmrso.kmr_create_context(ccomm, cinfo, _encode(""))
675
676        """self._ckmr holds the C part of a KMR context."""
677
678        if (_c_null_pointer(self._ckmr)):
679            raise Exception("kmr_create_context: failed")
680
681        self._dismissed = False
682
683        """self._dismissed=True disables freeing KVS'es (by memory
684        management) which remain unconsumed after dismissing a KMR
685        context.  It is because freeing them causes referencing
686        dangling pointers in C."""
687
688        self.emptykvs = KVS(self).free()
689
690        """self.emptykvs holds an empty KVS needed by map_once,
691        map_on_rank_zero, read_files_reassemble, and
692        read_file_by_segments."""
693
694        self.nprocs = kmrso.kmr_get_nprocs(self._ckmr)
695
696        """self.nprocs holds an nprocs of MPI."""
697
698        self.rank = kmrso.kmr_get_rank(self._ckmr)
699
700        """self.rank holds a rank of MPI."""
701
702        if (warncomm and (self.rank == 0)):
703            warning_function("MPI comm ignored in KMR() constructor.", RuntimeWarning)
704        if (warninfo and (self.rank == 0)):
705            warning_function("MPI info ignored in KMR() constructor.", RuntimeWarning)
706        return

Makes a KMR context with a given MPI communicator (comm), which is used in succeeding operations. Info specifies its options by MPI_Info. Arguments of comm/info are passed as a long integer (assuming either an integer (int) or a pointer in C). It also accepts a communicator instance of mpi4py.MPI.Comm, a string "dummy" or "world" as a comm argument.

emptykvs

self.emptykvs holds an empty KVS needed by map_once, map_on_rank_zero, read_files_reassemble, and read_file_by_segments.

nprocs

self.nprocs holds an nprocs of MPI.

rank

self.rank holds a rank of MPI.

def free(self):
712    def free(self):
713        """Dismisses KMR (an alias of dismiss())."""
714
715        self.dismiss()

Dismisses KMR (an alias of dismiss()).

def dismiss(self):
717    def dismiss(self):
718        """Dismisses KMR."""
719
720        if (not _c_null_pointer(self._ckmr)):
721            kmrso.kmr_free_context(self._ckmr)
722        self._ckmr = _c_null_pointer_value
723        self._dismissed = True
724        self.emptykvs = None
725        self.nprocs = -1
726        self.rank = -1
727        return

Dismisses KMR.

def create_kvs(self, **opts):
729    def create_kvs(self, **opts):
730        """Makes a new KVS (an alias of make_kvs())."""
731
732        self.make_kvs(**opts)

Makes a new KVS (an alias of make_kvs()).

def make_kvs(self, **opts):
734    def make_kvs(self, **opts):
735        """Makes a new KVS."""
736
737        (keyty, valty, _) = _get_options(opts, True)
738        return KVS(self, keyty, valty)

Makes a new KVS.

def reply_to_spawner(self):
740    def reply_to_spawner(self):
741        """Sends a reply message from a spawned process."""
742
743        kmrso.kmr_reply_to_spawner(self._ckmr)
744        return

Sends a reply message from a spawned process.

def get_spawner_communicator(self, index):
746    def get_spawner_communicator(self, index):
747        """Obtains a parent communicator of a spawned process.  C version
748        returns a reference, but this returns an entity"""
749
750        commref = kmrso.kmr_get_spawner_communicator(self._ckmr, index)
751        return commref.contents.value

Obtains a parent communicator of a spawned process. The C version returns a reference, but this returns the entity itself.

def send_kvs_to_spawner(self, kvs):
753    def send_kvs_to_spawner(self, kvs):
754        """Sends the KVS from a spawned process to the spawner."""
755
756        kmrso.kmr_send_kvs_to_spawner(self._ckmr, kvs._ckvs)
757        return

Sends the KVS from a spawned process to the spawner.

def set_option(self, k, v):
798    def set_option(self, k, v):
799        """Sets KMR option, taking both arguments by strings."""
800
801        kmrso.kmr_set_option_by_strings(self._ckmr, _encode(k), _encode(v))
802        return

Sets KMR option, taking both arguments by strings.

class KVS:
 834class KVS():
 835    """KVS.  Note that there are dummy KVS'es which are temporarily
 836    created to hold the C structure of the KVS passed to
 837    mapper/reducer functions.  A dummy KVS has None in its "mr"
 838    attribute."""
 839
 840    # attributes: self._ckvs, self.mr, self._frameinfo.
 841
 842    def __init__(self, kmr_or_ckvs, keyty="opaque", valty="opaque"):
 843        """Makes a KVS for a given KMR.  Do not call the KVS constructor
 844        directly, but use KMR.make_kvs() instead.  A KVS is created
 845        with the datatypes stored in the key and the value, specified
 846        by the keywords "key=" and "value=".  The datatype name is a
 847        string, one of "opaque", "cstring", "integer", and "float8".
 848        Most mappers and reducers (precisely, the methods that accepts
 849        a function argument) take keyword arguments for the types,
 850        defaulting with key="opaque" and value="opaque".  The
 851        datatypes affects the sorting order.  """
 852
 853        self.mr = None
 854
 855        """mr attribute holds a KMR context object.  Note that mr is
 856        not accessible from mapping/reducing functions."""
 857
 858        self._ckvs = None
 859
 860        """_ckvs attribute holds a kvs in C."""
 861
 862        self._frameinfo = None
 863
 864        """_frameinfo protects caller line information from garbage
 865        collected."""
 866
 867        if isinstance(kmr_or_ckvs, KMR):
 868            kf = _field_name_type_map[keyty]
 869            vf = _field_name_type_map[valty]
 870            top = inspect.currentframe().f_back
 871            self.mr = kmr_or_ckvs
 872            (f, l, n) = _make_frame_info(top)
 873            self._ckvs = kmrso.kmr_create_kvs7(
 874                self.mr._ckmr, kf, vf, _c_option(), f, l, n)
 875            self._frameinfo = (f, l, n)
 876        elif isinstance(kmr_or_ckvs, _c_pointer):
 877            ## Return a dummy KVS.
 878            self.mr = None
 879            self._ckvs = kmr_or_ckvs
 880        else:
 881            raise Exception("Bad call to kvs constructor")
 882
 883    def __del__(self):
 884        if ((not self._is_dummy()) and (not _c_null_pointer(self._ckvs))):
 885            self.free()
 886        return
 887
 888    def free(self):
 889        """Finishes the C part of a KVS."""
 890
 891        if (self._is_dummy()):
 892            raise Exception("Bad call to free_kvs on dummy KVS")
 893        elif (_c_null_pointer(self._ckvs)):
 894            raise Exception("Bad call to free_kvs on freed KVS")
 895        elif ((not self.mr is None) and self.mr._dismissed):
 896            ## Do not free when KMR object is dismissed.
 897            pass
 898        else:
 899            kmrso.kmr_free_kvs(self._ckvs)
 900            self._ckvs = _c_null_pointer_value
 901            return self
 902
 903    def _is_dummy(self):
 904        return (self.mr is None)
 905
 906    def _consume(self):
 907        """Releases a now dangling C pointer."""
 908
 909        self._ckvs = _c_null_pointer_value
 910
 911    def _encode_content(self, o, key_or_value):
 912        """Marshalls an object with regard to the field type.  It
 913        retuns a 3-tuple, with length, value-union, and the 3nd to
 914        keep a reference to a buffer."""
 915
 916        kvty = self.get_field_type(key_or_value)
 917        u = _c_unitsized()
 918        if (kvty == "opaque"):
 919            data = pickle.dumps(o, _pickle_protocol)
 920            u.p = data
 921            return (len(data), u, data)
 922        elif (kvty == "cstring"):
 923            if (not isinstance(o, str)):
 924                raise Exception("Not 8-bit string for cstring: %s" % o)
 925            ## (Add null for C string).
 926            os = ((o + "\0") if force_null_terminate_in_cstring else o)
 927            data = _encode(os)
 928            u.p = data
 929            return (len(data), u, data)
 930        elif (kvty == "integer"):
 931            u.i = o
 932            return (ctypes.sizeof(_c_long), u, None)
 933        elif (kvty == "float8"):
 934            u.d = o
 935            return (ctypes.sizeof(_c_double), u, None)
 936        else:
 937            raise Exception("Bad field type: %s" % kvty)
 938
 939    def _decode_content(self, siz, u, key_or_value):
 940        """Unmarshalls an object with regard to the field type.  It
 941        returns integer 0 when the length is 0 (it is for a dummy
 942        key-value used in kmr_map_once() etc)."""
 943
 944        if (siz == 0):
 945            return 0
 946        else:
 947            kvty = self.get_field_type(key_or_value)
 948            if (kvty == "opaque"):
 949                data = ctypes.string_at(u.i, siz)
 950                o = pickle.loads(data)
 951                return o
 952            elif (kvty == "cstring"):
 953                ## (Delete null added for C string).
 954                siz1 = ((siz - 1) if force_null_terminate_in_cstring else siz)
 955                data = ctypes.string_at(u.i, siz1)
 956                os = _decode(data)
 957                return os
 958            elif (kvty == "integer"):
 959                return u.i
 960            elif (kvty == "float8"):
 961                return u.d
 962            else:
 963                raise Exception("Bad field type: %s" % kvty)
 964
 965    def get_field_type(self, key_or_value):
 966        """Get a field type of a KVS."""
 967
 968        if (_c_null_pointer(self._ckvs)):
 969            raise Exception("Bad KVS (null C-object)")
 970        if (key_or_value == _Slot.Key):
 971            kvty = kmrso.kmr_get_key_type_ff(self._ckvs)
 972        elif (key_or_value == _Slot.Value):
 973            kvty = kmrso.kmr_get_value_type_ff(self._ckvs)
 974        else:
 975            raise Exception("Bad field %s" % key_or_value.name)
 976        if (kvty == _kv_bad):
 977            raise Exception("Bad field type value %d in KVS" % kvty)
 978        else:
 979            return _field_type_name_map[kvty]
 980
 981    def add(self, key, val):
 982        """Adds a key-value pair."""
 983
 984        self.add_kv(key, val)
 985        return
 986
 987    def add_kv(self, key, val):
 988        """Adds a key-value pair."""
 989
 990        ## Note it keeps the created string until kmr_add_kv(),
 991        ## because kvbox does not hold the references.
 992        (klen, k, ks) = self._encode_content(key, _Slot.Key)
 993        (vlen, v, vs) = self._encode_content(val, _Slot.Value)
 994        cbox = _c_kvbox().set(klen, k, vlen, v)
 995        kmrso.kmr_add_kv(self._ckvs, cbox)
 996        return
 997
 998    def add_kv_done(self):
 999        """Finishes adding key-value pairs."""
1000
1001        kmrso.kmr_add_kv_done(self._ckvs)
1002        return
1003
1004    def get_element_count(self):
1005        """Gets the total number of key-value pairs."""
1006
1007        c = _c_long(0)
1008        kmrso.kmr_get_element_count(self._ckvs, ctypes.byref(c))
1009        return c.value
1010
1011    def local_element_count(self):
1012        """Gets the number of key-value pairs locally."""
1013
1014        c = _c_long(0)
1015        kmrso.kmr_local_element_count(self._ckvs, ctypes.byref(c))
1016        return c.value
1017
1018    def map(self, fn, **mopts):
1019        """Maps simply."""
1020
1021        (keyty, valty, mkkvo) = _get_options(mopts, True)
1022        cmopts = _c_option(mopts, _enabled_options_of_map)
1023        cfn = _wrap_mapfn(fn)
1024        ckvi = self._ckvs
1025        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1026        ckvo = (kvo._ckvs if (kvo is not None) else None)
1027        (f, l, n) = _make_frame_info(inspect.currentframe().f_back)
1028        kmrso.kmr_map9(0, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n))
1029        if (cmopts.inspect == 0): self._consume()
1030        return kvo
1031
1032    def map_once(self, rank_zero_only, fn, **mopts):
1033        """Maps once with a dummy key-value pair."""
1034
1035        ## It needs dummy input; Never inspects.
1036        (keyty, valty, mkkvo) = _get_options(mopts, True)
1037        cmopts = _c_option(mopts, _enabled_options_of_map_once)
1038        cfn = _wrap_mapfn(fn)
1039        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1040        ckvo = (kvo._ckvs if (kvo is not None) else None)
1041        kmrso.kmr_map_once(ckvo, 0, cmopts, rank_zero_only, cfn)
1042        return kvo
1043
1044    def map_on_rank_zero(self, fn, **mopts):
1045        """Maps on rank0 only."""
1046
1047        ## It needs dummy input.
1048        return self.map_once(True, fn, *mopts)
1049
1050    def map_rank_by_rank(self, fn, **mopts):
1051        """Maps sequentially with rank by rank for debugging."""
1052
1053        (keyty, valty, mkkvo) = _get_options(mopts, True)
1054        cmopts = _c_option(mopts, _enabled_options_of_map)
1055        cfn = _wrap_mapfn(fn)
1056        ckvi = self._ckvs
1057        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1058        ckvo = (kvo._ckvs if (kvo is not None) else None)
1059        kmrso.kmr_map_rank_by_rank(ckvi, ckvo, 0, cmopts, cfn)
1060        if (cmopts.inspect == 0): self._consume()
1061        return kvo
1062
1063    def map_for_some(self, fn, **mopts):
1064        """Maps until some key-value are added."""
1065
1066        (keyty, valty, mkkvo) = _get_options(mopts, True)
1067        cmopts = _c_option(mopts, _enabled_options_of_map)
1068        ckvi = self._ckvs
1069        cfn = _wrap_mapfn(fn)
1070        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1071        ckvo = (kvo._ckvs if (kvo is not None) else None)
1072        kmrso.kmr_map_for_some(ckvi, ckvo, 0, cmopts, cfn)
1073        if (cmopts.inspect == 0): self._consume()
1074        return kvo
1075
1076    def map_ms(self, fn, **mopts):
1077        """Maps in master-worker mode."""
1078
1079        ## Its call is repeated until True (assuming MPI_SUCCESS==0).
1080        (keyty, valty, mkkvo) = _get_options(mopts, True)
1081        cmopts = _c_option(mopts, _enabled_options_of_map_ms)
1082        cfn = _wrap_mapfn(fn)
1083        ckvi = self._ckvs
1084        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1085        ckvo = (kvo._ckvs if (kvo is not None) else None)
1086        rr = 1
1087        while (rr != 0):
1088            rr = kmrso.kmr_map_ms(ckvi, ckvo, 0, cmopts, cfn)
1089        self._consume()
1090        return kvo
1091
1092    def map_ms_commands(self, fn, **xopts):
1093        """Maps in master-worker mode, and runs serial commands."""
1094
1095        (sopts, mopts) = _filter_spawn_options(xopts)
1096        (keyty, valty, mkkvo) = _get_options(mopts, True)
1097        cmopts = _c_option(mopts, _enabled_options_of_map_ms)
1098        csopts = _c_spawn_option(sopts)
1099        cfn = _wrap_mapfn(fn)
1100        ckvi = self._ckvs
1101        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1102        ckvo = (kvo._ckvs if (kvo is not None) else None)
1103        rr = 1
1104        while (rr != 0):
1105            rr = kmrso.kmr_map_ms_commands(ckvi, ckvo, 0, cmopts, csopts, cfn)
1106        self._consume()
1107        return kvo
1108
1109    def map_via_spawn(self, fn, **xopts):
1110        """Maps on processes started by MPI_Comm_spawn()."""
1111
1112        (sopts, mopts) = _filter_spawn_options(xopts)
1113        (keyty, valty, mkkvo) = _get_options(mopts, True)
1114        cmopts = _c_option(mopts, _enabled_options_of_map)
1115        csopts = _c_spawn_option(sopts)
1116        cfn = _wrap_mapfn(fn)
1117        ckvi = self._ckvs
1118        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1119        ckvo = (kvo._ckvs if (kvo is not None) else None)
1120        kmrso.kmr_map_via_spawn(ckvi, ckvo, 0, _mpi_info_null, csopts, cfn)
1121        self._consume()
1122        return kvo
1123
1124    def map_processes(self, nonmpi, fn, **sopts):
1125        """Maps on processes started by MPI_Comm_spawn()."""
1126
1127        (keyty, valty, mkkvo) = _get_options(sopts, True)
1128        csopts = _c_spawn_option(sopts)
1129        cfn = _wrap_mapfn(fn)
1130        ckvi = self._ckvs
1131        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1132        ckvo = (kvo._ckvs if (kvo is not None) else None)
1133        kmrso.kmr_map_processes(nonmpi, ckvi, ckvo, 0, _mpi_info_null,
1134                                csopts, cfn)
1135        self._consume()
1136        return kvo
1137
1138    def map_parallel_processes(self, fn, **sopts):
1139        """Maps on processes started by MPI_Comm_spawn()."""
1140
1141        return self.map_processes(False, fn, **sopts)
1142
1143    def map_serial_processes(self, fn, **sopts):
1144        """Maps on processes started by MPI_Comm_spawn()."""
1145
1146        return self.map_processes(True, fn, **sopts)
1147
1148    def reduce(self, fn, **mopts):
1149        """Reduces key-value pairs."""
1150
1151        (keyty, valty, mkkvo) = _get_options(mopts, True)
1152        cmopts = _c_option(mopts, _enabled_options_of_reduce)
1153        cfn = _wrap_redfn(fn)
1154        ckvi = self._ckvs
1155        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1156        ckvo = (kvo._ckvs if (kvo is not None) else None)
1157        (f, l, n) = _make_frame_info(inspect.currentframe().f_back)
1158        kmrso.kmr_reduce9(0, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n))
1159        if (cmopts.inspect == 0): self._consume()
1160        return kvo
1161
1162    def reduce_as_one(self, fn, **mopts):
1163        """ Reduces once as if all pairs had the same key."""
1164
1165        (keyty, valty, mkkvo) = _get_options(mopts, True)
1166        cmopts = _c_option(mopts, _enabled_options_of_reduce_as_one)
1167        cfn = _wrap_redfn(fn)
1168        ckvi = self._ckvs
1169        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1170        ckvo = (kvo._ckvs if (kvo is not None) else None)
1171        kmrso.kmr_reduce_as_one(ckvi, ckvo, 0, cmopts, cfn)
1172        if (cmopts.inspect == 0): self._consume()
1173        return kvo
1174
1175    def reduce_for_some(self, fn, **mopts):
1176        """Reduces until some key-value are added."""
1177
1178        (keyty, valty, mkkvo) = _get_options(mopts, True)
1179        cmopts = _c_option(mopts, _enabled_options_of_reduce)
1180        cfn = _wrap_redfn(fn)
1181        ckvi = self._ckvs
1182        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1183        ckvo = (kvo._ckvs if (kvo is not None) else None)
1184        ## (NOTE: It passes a frame of reduce_for_some.)
1185        (f, l, n) = _make_frame_info(inspect.currentframe())
1186        kmrso.kmr_reduce9(1, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n))
1187        if (cmopts.inspect == 0): self._consume()
1188        return kvo
1189
1190    def reverse(self, **mopts):
1191        """Makes a new pair by swapping the key and the value."""
1192
1193        keyty = self.get_field_type(_Slot.Key)
1194        valty = self.get_field_type(_Slot.Value)
1195        (_, _, mkkvo) = _get_options(mopts, False)
1196        cmopts = _c_option(mopts, _enabled_options_of_map)
1197        assert (mkkvo is True)
1198        ckvi = self._ckvs
1199        kvo = (KVS(self.mr, valty, keyty) if mkkvo else None)
1200        ckvo = (kvo._ckvs if (kvo is not None) else None)
1201        kmrso.kmr_reverse(ckvi, ckvo, cmopts)
1202        if (cmopts.inspect == 0): self._consume()
1203        return kvo
1204
1205    def shuffle(self, **mopts):
1206        """Shuffles key-value pairs."""
1207
1208        keyty = self.get_field_type(_Slot.Key)
1209        valty = self.get_field_type(_Slot.Value)
1210        (_, _, mkkvo) = _get_options(mopts, False)
1211        cmopts = _c_option(mopts, _enabled_options_of_shuffle)
1212        ckvi = self._ckvs
1213        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1214        ckvo = (kvo._ckvs if (kvo is not None) else None)
1215        kmrso.kmr_shuffle(ckvi, ckvo, cmopts)
1216        if (cmopts.inspect == 0): self._consume()
1217        return kvo
1218
1219    def replicate(self, **mopts):
1220        """Replicates key-value pairs to be visible on all ranks."""
1221
1222        keyty = self.get_field_type(_Slot.Key)
1223        valty = self.get_field_type(_Slot.Value)
1224        (_, _, mkkvo) = _get_options(mopts, False)
1225        cmopts = _c_option(mopts, _enabled_options_of_replicate)
1226        ckvi = self._ckvs
1227        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1228        ckvo = (kvo._ckvs if (kvo is not None) else None)
1229        kmrso.kmr_replicate(ckvi, ckvo, cmopts)
1230        if (cmopts.inspect == 0): self._consume()
1231        return kvo
1232
1233    def distribute(self, cyclic, **mopts):
1234        """Distributes pairs approximately evenly to ranks."""
1235
1236        keyty = self.get_field_type(_Slot.Key)
1237        valty = self.get_field_type(_Slot.Value)
1238        (_, _, mkkvo) = _get_options(mopts, False)
1239        cmopts = _c_option(mopts, _enabled_options_of_distribute)
1240        ckvi = self._ckvs
1241        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1242        ckvo = (kvo._ckvs if (kvo is not None) else None)
1243        kmrso.kmr_distribute(ckvi, ckvo, cyclic, cmopts)
1244        if (cmopts.inspect == 0): self._consume()
1245        return kvo
1246
1247    def sort_locally(self, shuffling, **mopts):
1248        """Reorders key-value pairs in a single rank."""
1249
1250        keyty = self.get_field_type(_Slot.Key)
1251        valty = self.get_field_type(_Slot.Value)
1252        (_, _, mkkvo) = _get_options(mopts, False)
1253        cmopts = _c_option(mopts, _enabled_options_of_sort_locally)
1254        ckvi = self._ckvs
1255        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1256        ckvo = (kvo._ckvs if (kvo is not None) else None)
1257        kmrso.kmr_sort_locally(ckvi, ckvo, shuffling, cmopts)
1258        if (cmopts.inspect == 0): self._consume()
1259        return kvo
1260
1261    def sort(self, **mopts):
1262        """Sorts a KVS globally."""
1263
1264        keyty = self.get_field_type(_Slot.Key)
1265        valty = self.get_field_type(_Slot.Value)
1266        (_, _, mkkvo) = _get_options(mopts, False)
1267        cmopts = _c_option(mopts, _enabled_options_of_sort)
1268        ckvi = self._ckvs
1269        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1270        ckvo = (kvo._ckvs if (kvo is not None) else None)
1271        kmrso.kmr_sort(ckvi, ckvo, cmopts)
1272        if (cmopts.inspect == 0): self._consume()
1273        return kvo
1274
1275    def concatenate(self, *morekvs):
1276        """Concatenates a number of KVS'es to one."""
1277
1278        keyty = self.get_field_type(_Slot.Key)
1279        valty = self.get_field_type(_Slot.Value)
1280        siz = (len(morekvs) + 1)
1281        ckvsvec = (_c_kvs * siz)()
1282        ckvsvec[0] = self._ckvs
1283        for i in range(0, len(morekvs)):
1284            ckvsvec[i + 1] = morekvs[i]._ckvs
1285        cn = _c_int(siz)
1286        kvo = KVS(self.mr, keyty, valty)
1287        ckvo = kvo._ckvs
1288        kmrso.kmr_concatenate_kvs(ckvsvec, cn, ckvo, _c_option())
1289        for i in morekvs:
1290            i._consume()
1291        self._consume()
1292        return kvo
1293
1294    def read_files_reassemble(self, filename, color, offset, bytes_):
1295        """Reassembles files reading by ranks."""
1296
1297        buf = _c_void_p()
1298        siz = _c_uint64(0)
1299        kmrso.kmr_read_files_reassemble(
1300            self.mr._ckmr, _encode(filename), color, offset, bytes_,
1301            ctypes.byref(buf), ctypes.byref(siz))
1302        addr = buf.value
1303        ptr = (_c_ubyte * siz.value).from_address(addr)
1304        data = bytearray(ptr)
1305        kmrso.kmr_mfree(addr, siz.value)
1306        return data
1307
1308    def read_file_by_segments(self, filename, color):
1309        """Reads one file by segments and reassembles."""
1310
1311        buf = _c_void_p()
1312        siz = _c_uint64(0)
1313        kmrso.kmr_read_file_by_segments(
1314            self.mr._ckmr, _encode(filename), color,
1315            ctypes.byref(buf), ctypes.byref(siz))
1316        addr = buf.value
1317        ptr = (_c_ubyte * siz.value).from_address(addr)
1318        data = bytearray(ptr)
1319        kmrso.kmr_mfree(addr, siz.value)
1320        return data
1321
1322    def save(self):
1323        """Packs locally the contents of a KVS to a byte array."""
1324
1325        buf = _c_void_p(0)
1326        siz = _c_size_t(0)
1327        kmrso.kmr_save_kvs(self._ckvs, ctypes.byref(buf), ctypes.byref(siz),
1328                           _c_option())
1329        addr = buf.value
1330        ptr = (_c_ubyte * siz.value).from_address(addr)
1331        data = bytearray(ptr)
1332        kmrso.kmr_mfree(addr, siz.value)
1333        return data
1334
1335    def restore(self, data):
1336        """Unpacks locally the contents of a KVS from a byte array."""
1337
1338        kvo = KVS(self.mr, "opaque", "opaque")
1339        siz = len(data)
1340        addr = (_c_ubyte * siz).from_buffer(data)
1341        kmrso.kmr_restore_kvs(kvo._ckvs, addr, siz, _c_option())
1342        return kvo
1343
1344    def _map_swf(self, fn, **xopts):
1345        """."""
1346        (sopts, mopts) = _filter_spawn_options(xopts)
1347        (keyty, valty, mkkvo) = _get_options(mopts, True)
1348        cmopts = _c_option(mopts, _enabled_options_of_map)
1349        csopts = _c_spawn_option(sopts)
1350        cfn = _wrap_mapfn(fn)
1351        ckvi = self._ckvs
1352        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1353        ckvo = (kvo._ckvs if (kvo is not None) else None)
1354        kmrso.kmr_map_swf(ckvi, ckvo, 0, csopts, cfn)
1355        self._consume()
1356        return kvo

KVS. Note that there are dummy KVS'es which are temporarily created to hold the C structure of the KVS passed to mapper/reducer functions. A dummy KVS has None in its "mr" attribute.

KVS(kmr_or_ckvs, keyty='opaque', valty='opaque')
    def __init__(self, kmr_or_ckvs, keyty="opaque", valty="opaque"):
        """Makes a KVS for a given KMR.  Do not call the KVS constructor
        directly, but use KMR.make_kvs() instead.  A KVS is created
        with the datatypes stored in the key and the value, specified
        by the keywords "key=" and "value=".  The datatype name is a
        string, one of "opaque", "cstring", "integer", and "float8".
        Most mappers and reducers (precisely, the methods that accept
        a function argument) take keyword arguments for the types,
        defaulting with key="opaque" and value="opaque".  The
        datatypes affect the sorting order.  Passing a C pointer
        instead of a KMR makes a dummy KVS wrapping that pointer.  It
        raises an Exception for any other argument."""

        self.mr = None

        """mr attribute holds a KMR context object.  Note that mr is
        not accessible from mapping/reducing functions."""

        self._ckvs = None

        """_ckvs attribute holds a kvs in C."""

        self._frameinfo = None

        """_frameinfo protects caller line information from garbage
        collected."""

        if isinstance(kmr_or_ckvs, KMR):
            ## Look up the field types first; self.mr is still None
            ## when a bad type name raises KeyError here.
            kf = _field_name_type_map[keyty]
            vf = _field_name_type_map[valty]
            ## Record the frame of the caller of this constructor.
            top = inspect.currentframe().f_back
            self.mr = kmr_or_ckvs
            (f, l, n) = _make_frame_info(top)
            self._ckvs = kmrso.kmr_create_kvs7(
                self.mr._ckmr, kf, vf, _c_option(), f, l, n)
            ## Keep the frame information referenced from this object.
            self._frameinfo = (f, l, n)
        elif isinstance(kmr_or_ckvs, _c_pointer):
            ## Return a dummy KVS.
            self.mr = None
            self._ckvs = kmr_or_ckvs
        else:
            raise Exception("Bad call to kvs constructor")

Makes a KVS for a given KMR. Do not call the KVS constructor directly, but use KMR.make_kvs() instead. A KVS is created with the datatypes stored in the key and the value, specified by the keywords "key=" and "value=". The datatype name is a string, one of "opaque", "cstring", "integer", and "float8". Most mappers and reducers (precisely, the methods that accept a function argument) take keyword arguments for the types, defaulting with key="opaque" and value="opaque". The datatypes affect the sorting order.

mr

mr attribute holds a KMR context object. Note that mr is not accessible from mapping/reducing functions.

def free(self):
888    def free(self):
889        """Finishes the C part of a KVS."""
890
891        if (self._is_dummy()):
892            raise Exception("Bad call to free_kvs on dummy KVS")
893        elif (_c_null_pointer(self._ckvs)):
894            raise Exception("Bad call to free_kvs on freed KVS")
895        elif ((not self.mr is None) and self.mr._dismissed):
896            ## Do not free when KMR object is dismissed.
897            pass
898        else:
899            kmrso.kmr_free_kvs(self._ckvs)
900            self._ckvs = _c_null_pointer_value
901            return self

Finishes the C part of a KVS.

def get_field_type(self, key_or_value):
965    def get_field_type(self, key_or_value):
966        """Get a field type of a KVS."""
967
968        if (_c_null_pointer(self._ckvs)):
969            raise Exception("Bad KVS (null C-object)")
970        if (key_or_value == _Slot.Key):
971            kvty = kmrso.kmr_get_key_type_ff(self._ckvs)
972        elif (key_or_value == _Slot.Value):
973            kvty = kmrso.kmr_get_value_type_ff(self._ckvs)
974        else:
975            raise Exception("Bad field %s" % key_or_value.name)
976        if (kvty == _kv_bad):
977            raise Exception("Bad field type value %d in KVS" % kvty)
978        else:
979            return _field_type_name_map[kvty]

Get a field type of a KVS.

def add(self, key, val):
981    def add(self, key, val):
982        """Adds a key-value pair."""
983
984        self.add_kv(key, val)
985        return

Adds a key-value pair.

def add_kv(self, key, val):
987    def add_kv(self, key, val):
988        """Adds a key-value pair."""
989
990        ## Note it keeps the created string until kmr_add_kv(),
991        ## because kvbox does not hold the references.
992        (klen, k, ks) = self._encode_content(key, _Slot.Key)
993        (vlen, v, vs) = self._encode_content(val, _Slot.Value)
994        cbox = _c_kvbox().set(klen, k, vlen, v)
995        kmrso.kmr_add_kv(self._ckvs, cbox)
996        return

Adds a key-value pair.

def add_kv_done(self):
 998    def add_kv_done(self):
 999        """Finishes adding key-value pairs."""
1000
1001        kmrso.kmr_add_kv_done(self._ckvs)
1002        return

Finishes adding key-value pairs.

def get_element_count(self):
1004    def get_element_count(self):
1005        """Gets the total number of key-value pairs."""
1006
1007        c = _c_long(0)
1008        kmrso.kmr_get_element_count(self._ckvs, ctypes.byref(c))
1009        return c.value

Gets the total number of key-value pairs.

def local_element_count(self):
1011    def local_element_count(self):
1012        """Gets the number of key-value pairs locally."""
1013
1014        c = _c_long(0)
1015        kmrso.kmr_local_element_count(self._ckvs, ctypes.byref(c))
1016        return c.value

Gets the number of key-value pairs locally.

def map(self, fn, **mopts):
1018    def map(self, fn, **mopts):
1019        """Maps simply."""
1020
1021        (keyty, valty, mkkvo) = _get_options(mopts, True)
1022        cmopts = _c_option(mopts, _enabled_options_of_map)
1023        cfn = _wrap_mapfn(fn)
1024        ckvi = self._ckvs
1025        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1026        ckvo = (kvo._ckvs if (kvo is not None) else None)
1027        (f, l, n) = _make_frame_info(inspect.currentframe().f_back)
1028        kmrso.kmr_map9(0, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n))
1029        if (cmopts.inspect == 0): self._consume()
1030        return kvo

Maps simply.

def map_once(self, rank_zero_only, fn, **mopts):
1032    def map_once(self, rank_zero_only, fn, **mopts):
1033        """Maps once with a dummy key-value pair."""
1034
1035        ## It needs dummy input; Never inspects.
1036        (keyty, valty, mkkvo) = _get_options(mopts, True)
1037        cmopts = _c_option(mopts, _enabled_options_of_map_once)
1038        cfn = _wrap_mapfn(fn)
1039        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1040        ckvo = (kvo._ckvs if (kvo is not None) else None)
1041        kmrso.kmr_map_once(ckvo, 0, cmopts, rank_zero_only, cfn)
1042        return kvo

Maps once with a dummy key-value pair.

def map_on_rank_zero(self, fn, **mopts):
1044    def map_on_rank_zero(self, fn, **mopts):
1045        """Maps on rank0 only."""
1046
1047        ## It needs dummy input.
1048        return self.map_once(True, fn, *mopts)

Maps on rank0 only.

def map_rank_by_rank(self, fn, **mopts):
1050    def map_rank_by_rank(self, fn, **mopts):
1051        """Maps sequentially with rank by rank for debugging."""
1052
1053        (keyty, valty, mkkvo) = _get_options(mopts, True)
1054        cmopts = _c_option(mopts, _enabled_options_of_map)
1055        cfn = _wrap_mapfn(fn)
1056        ckvi = self._ckvs
1057        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1058        ckvo = (kvo._ckvs if (kvo is not None) else None)
1059        kmrso.kmr_map_rank_by_rank(ckvi, ckvo, 0, cmopts, cfn)
1060        if (cmopts.inspect == 0): self._consume()
1061        return kvo

Maps sequentially with rank by rank for debugging.

def map_for_some(self, fn, **mopts):
1063    def map_for_some(self, fn, **mopts):
1064        """Maps until some key-value are added."""
1065
1066        (keyty, valty, mkkvo) = _get_options(mopts, True)
1067        cmopts = _c_option(mopts, _enabled_options_of_map)
1068        ckvi = self._ckvs
1069        cfn = _wrap_mapfn(fn)
1070        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1071        ckvo = (kvo._ckvs if (kvo is not None) else None)
1072        kmrso.kmr_map_for_some(ckvi, ckvo, 0, cmopts, cfn)
1073        if (cmopts.inspect == 0): self._consume()
1074        return kvo

Maps until some key-value pairs are added.

def map_ms(self, fn, **mopts):
1076    def map_ms(self, fn, **mopts):
1077        """Maps in master-worker mode."""
1078
1079        ## Its call is repeated until True (assuming MPI_SUCCESS==0).
1080        (keyty, valty, mkkvo) = _get_options(mopts, True)
1081        cmopts = _c_option(mopts, _enabled_options_of_map_ms)
1082        cfn = _wrap_mapfn(fn)
1083        ckvi = self._ckvs
1084        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1085        ckvo = (kvo._ckvs if (kvo is not None) else None)
1086        rr = 1
1087        while (rr != 0):
1088            rr = kmrso.kmr_map_ms(ckvi, ckvo, 0, cmopts, cfn)
1089        self._consume()
1090        return kvo

Maps in master-worker mode.

def map_ms_commands(self, fn, **xopts):
1092    def map_ms_commands(self, fn, **xopts):
1093        """Maps in master-worker mode, and runs serial commands."""
1094
1095        (sopts, mopts) = _filter_spawn_options(xopts)
1096        (keyty, valty, mkkvo) = _get_options(mopts, True)
1097        cmopts = _c_option(mopts, _enabled_options_of_map_ms)
1098        csopts = _c_spawn_option(sopts)
1099        cfn = _wrap_mapfn(fn)
1100        ckvi = self._ckvs
1101        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1102        ckvo = (kvo._ckvs if (kvo is not None) else None)
1103        rr = 1
1104        while (rr != 0):
1105            rr = kmrso.kmr_map_ms_commands(ckvi, ckvo, 0, cmopts, csopts, cfn)
1106        self._consume()
1107        return kvo

Maps in master-worker mode, and runs serial commands.

def map_via_spawn(self, fn, **xopts):
1109    def map_via_spawn(self, fn, **xopts):
1110        """Maps on processes started by MPI_Comm_spawn()."""
1111
1112        (sopts, mopts) = _filter_spawn_options(xopts)
1113        (keyty, valty, mkkvo) = _get_options(mopts, True)
1114        cmopts = _c_option(mopts, _enabled_options_of_map)
1115        csopts = _c_spawn_option(sopts)
1116        cfn = _wrap_mapfn(fn)
1117        ckvi = self._ckvs
1118        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1119        ckvo = (kvo._ckvs if (kvo is not None) else None)
1120        kmrso.kmr_map_via_spawn(ckvi, ckvo, 0, _mpi_info_null, csopts, cfn)
1121        self._consume()
1122        return kvo

Maps on processes started by MPI_Comm_spawn().

def map_processes(self, nonmpi, fn, **sopts):
1124    def map_processes(self, nonmpi, fn, **sopts):
1125        """Maps on processes started by MPI_Comm_spawn()."""
1126
1127        (keyty, valty, mkkvo) = _get_options(sopts, True)
1128        csopts = _c_spawn_option(sopts)
1129        cfn = _wrap_mapfn(fn)
1130        ckvi = self._ckvs
1131        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1132        ckvo = (kvo._ckvs if (kvo is not None) else None)
1133        kmrso.kmr_map_processes(nonmpi, ckvi, ckvo, 0, _mpi_info_null,
1134                                csopts, cfn)
1135        self._consume()
1136        return kvo

Maps on processes started by MPI_Comm_spawn().

def map_parallel_processes(self, fn, **sopts):
1138    def map_parallel_processes(self, fn, **sopts):
1139        """Maps on processes started by MPI_Comm_spawn()."""
1140
1141        return self.map_processes(False, fn, **sopts)

Maps on processes started by MPI_Comm_spawn(). It calls map_processes() with nonmpi=False.

def map_serial_processes(self, fn, **sopts):
1143    def map_serial_processes(self, fn, **sopts):
1144        """Maps on processes started by MPI_Comm_spawn()."""
1145
1146        return self.map_processes(True, fn, **sopts)

Maps on processes started by MPI_Comm_spawn(). It calls map_processes() with nonmpi=True.

def reduce(self, fn, **mopts):
1148    def reduce(self, fn, **mopts):
1149        """Reduces key-value pairs."""
1150
1151        (keyty, valty, mkkvo) = _get_options(mopts, True)
1152        cmopts = _c_option(mopts, _enabled_options_of_reduce)
1153        cfn = _wrap_redfn(fn)
1154        ckvi = self._ckvs
1155        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1156        ckvo = (kvo._ckvs if (kvo is not None) else None)
1157        (f, l, n) = _make_frame_info(inspect.currentframe().f_back)
1158        kmrso.kmr_reduce9(0, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n))
1159        if (cmopts.inspect == 0): self._consume()
1160        return kvo

Reduces key-value pairs.

def reduce_as_one(self, fn, **mopts):
1162    def reduce_as_one(self, fn, **mopts):
1163        """ Reduces once as if all pairs had the same key."""
1164
1165        (keyty, valty, mkkvo) = _get_options(mopts, True)
1166        cmopts = _c_option(mopts, _enabled_options_of_reduce_as_one)
1167        cfn = _wrap_redfn(fn)
1168        ckvi = self._ckvs
1169        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1170        ckvo = (kvo._ckvs if (kvo is not None) else None)
1171        kmrso.kmr_reduce_as_one(ckvi, ckvo, 0, cmopts, cfn)
1172        if (cmopts.inspect == 0): self._consume()
1173        return kvo

Reduces once as if all pairs had the same key.

def reduce_for_some(self, fn, **mopts):
1175    def reduce_for_some(self, fn, **mopts):
1176        """Reduces until some key-value are added."""
1177
1178        (keyty, valty, mkkvo) = _get_options(mopts, True)
1179        cmopts = _c_option(mopts, _enabled_options_of_reduce)
1180        cfn = _wrap_redfn(fn)
1181        ckvi = self._ckvs
1182        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1183        ckvo = (kvo._ckvs if (kvo is not None) else None)
1184        ## (NOTE: It passes a frame of reduce_for_some.)
1185        (f, l, n) = _make_frame_info(inspect.currentframe())
1186        kmrso.kmr_reduce9(1, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n))
1187        if (cmopts.inspect == 0): self._consume()
1188        return kvo

Reduces until some key-value pairs are added.

def reverse(self, **mopts):
1190    def reverse(self, **mopts):
1191        """Makes a new pair by swapping the key and the value."""
1192
1193        keyty = self.get_field_type(_Slot.Key)
1194        valty = self.get_field_type(_Slot.Value)
1195        (_, _, mkkvo) = _get_options(mopts, False)
1196        cmopts = _c_option(mopts, _enabled_options_of_map)
1197        assert (mkkvo is True)
1198        ckvi = self._ckvs
1199        kvo = (KVS(self.mr, valty, keyty) if mkkvo else None)
1200        ckvo = (kvo._ckvs if (kvo is not None) else None)
1201        kmrso.kmr_reverse(ckvi, ckvo, cmopts)
1202        if (cmopts.inspect == 0): self._consume()
1203        return kvo

Makes a new pair by swapping the key and the value.

def shuffle(self, **mopts):
1205    def shuffle(self, **mopts):
1206        """Shuffles key-value pairs."""
1207
1208        keyty = self.get_field_type(_Slot.Key)
1209        valty = self.get_field_type(_Slot.Value)
1210        (_, _, mkkvo) = _get_options(mopts, False)
1211        cmopts = _c_option(mopts, _enabled_options_of_shuffle)
1212        ckvi = self._ckvs
1213        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1214        ckvo = (kvo._ckvs if (kvo is not None) else None)
1215        kmrso.kmr_shuffle(ckvi, ckvo, cmopts)
1216        if (cmopts.inspect == 0): self._consume()
1217        return kvo

Shuffles key-value pairs.

def replicate(self, **mopts):
1219    def replicate(self, **mopts):
1220        """Replicates key-value pairs to be visible on all ranks."""
1221
1222        keyty = self.get_field_type(_Slot.Key)
1223        valty = self.get_field_type(_Slot.Value)
1224        (_, _, mkkvo) = _get_options(mopts, False)
1225        cmopts = _c_option(mopts, _enabled_options_of_replicate)
1226        ckvi = self._ckvs
1227        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1228        ckvo = (kvo._ckvs if (kvo is not None) else None)
1229        kmrso.kmr_replicate(ckvi, ckvo, cmopts)
1230        if (cmopts.inspect == 0): self._consume()
1231        return kvo

Replicates key-value pairs to be visible on all ranks.

def distribute(self, cyclic, **mopts):
1233    def distribute(self, cyclic, **mopts):
1234        """Distributes pairs approximately evenly to ranks."""
1235
1236        keyty = self.get_field_type(_Slot.Key)
1237        valty = self.get_field_type(_Slot.Value)
1238        (_, _, mkkvo) = _get_options(mopts, False)
1239        cmopts = _c_option(mopts, _enabled_options_of_distribute)
1240        ckvi = self._ckvs
1241        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1242        ckvo = (kvo._ckvs if (kvo is not None) else None)
1243        kmrso.kmr_distribute(ckvi, ckvo, cyclic, cmopts)
1244        if (cmopts.inspect == 0): self._consume()
1245        return kvo

Distributes pairs approximately evenly to ranks.

def sort_locally(self, shuffling, **mopts):
1247    def sort_locally(self, shuffling, **mopts):
1248        """Reorders key-value pairs in a single rank."""
1249
1250        keyty = self.get_field_type(_Slot.Key)
1251        valty = self.get_field_type(_Slot.Value)
1252        (_, _, mkkvo) = _get_options(mopts, False)
1253        cmopts = _c_option(mopts, _enabled_options_of_sort_locally)
1254        ckvi = self._ckvs
1255        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1256        ckvo = (kvo._ckvs if (kvo is not None) else None)
1257        kmrso.kmr_sort_locally(ckvi, ckvo, shuffling, cmopts)
1258        if (cmopts.inspect == 0): self._consume()
1259        return kvo

Reorders key-value pairs in a single rank.

def sort(self, **mopts):
1261    def sort(self, **mopts):
1262        """Sorts a KVS globally."""
1263
1264        keyty = self.get_field_type(_Slot.Key)
1265        valty = self.get_field_type(_Slot.Value)
1266        (_, _, mkkvo) = _get_options(mopts, False)
1267        cmopts = _c_option(mopts, _enabled_options_of_sort)
1268        ckvi = self._ckvs
1269        kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1270        ckvo = (kvo._ckvs if (kvo is not None) else None)
1271        kmrso.kmr_sort(ckvi, ckvo, cmopts)
1272        if (cmopts.inspect == 0): self._consume()
1273        return kvo

Sorts a KVS globally.

def concatenate(self, *morekvs):
1275    def concatenate(self, *morekvs):
1276        """Concatenates a number of KVS'es to one."""
1277
1278        keyty = self.get_field_type(_Slot.Key)
1279        valty = self.get_field_type(_Slot.Value)
1280        siz = (len(morekvs) + 1)
1281        ckvsvec = (_c_kvs * siz)()
1282        ckvsvec[0] = self._ckvs
1283        for i in range(0, len(morekvs)):
1284            ckvsvec[i + 1] = morekvs[i]._ckvs
1285        cn = _c_int(siz)
1286        kvo = KVS(self.mr, keyty, valty)
1287        ckvo = kvo._ckvs
1288        kmrso.kmr_concatenate_kvs(ckvsvec, cn, ckvo, _c_option())
1289        for i in morekvs:
1290            i._consume()
1291        self._consume()
1292        return kvo

Concatenates a number of KVS'es to one.

def read_files_reassemble(self, filename, color, offset, bytes_):
1294    def read_files_reassemble(self, filename, color, offset, bytes_):
1295        """Reassembles files reading by ranks."""
1296
1297        buf = _c_void_p()
1298        siz = _c_uint64(0)
1299        kmrso.kmr_read_files_reassemble(
1300            self.mr._ckmr, _encode(filename), color, offset, bytes_,
1301            ctypes.byref(buf), ctypes.byref(siz))
1302        addr = buf.value
1303        ptr = (_c_ubyte * siz.value).from_address(addr)
1304        data = bytearray(ptr)
1305        kmrso.kmr_mfree(addr, siz.value)
1306        return data

Reassembles files reading by ranks.

def read_file_by_segments(self, filename, color):
1308    def read_file_by_segments(self, filename, color):
1309        """Reads one file by segments and reassembles."""
1310
1311        buf = _c_void_p()
1312        siz = _c_uint64(0)
1313        kmrso.kmr_read_file_by_segments(
1314            self.mr._ckmr, _encode(filename), color,
1315            ctypes.byref(buf), ctypes.byref(siz))
1316        addr = buf.value
1317        ptr = (_c_ubyte * siz.value).from_address(addr)
1318        data = bytearray(ptr)
1319        kmrso.kmr_mfree(addr, siz.value)
1320        return data

Reads one file by segments and reassembles.

def save(self):
1322    def save(self):
1323        """Packs locally the contents of a KVS to a byte array."""
1324
1325        buf = _c_void_p(0)
1326        siz = _c_size_t(0)
1327        kmrso.kmr_save_kvs(self._ckvs, ctypes.byref(buf), ctypes.byref(siz),
1328                           _c_option())
1329        addr = buf.value
1330        ptr = (_c_ubyte * siz.value).from_address(addr)
1331        data = bytearray(ptr)
1332        kmrso.kmr_mfree(addr, siz.value)
1333        return data

Packs locally the contents of a KVS to a byte array.

def restore(self, data):
1335    def restore(self, data):
1336        """Unpacks locally the contents of a KVS from a byte array."""
1337
1338        kvo = KVS(self.mr, "opaque", "opaque")
1339        siz = len(data)
1340        addr = (_c_ubyte * siz).from_buffer(data)
1341        kmrso.kmr_restore_kvs(kvo._ckvs, addr, siz, _c_option())
1342        return kvo

Unpacks locally the contents of a KVS from a byte array.

def fin():
def fin():
    """Finishes using KMR4PY.  It calls kmr_fin() in the library held
    in kmrso, and returns nothing."""

    kmrso.kmr_fin()

Finishes using KMR4PY.

def listify(kvs):
def listify(kvs):
    """Returns an array of LOCAL contents.  It maps over KVS with the
    options output=False and inspect=True, and stores each key-value
    pair in a list at the index given to the map-function."""

    ## The list is filled by index, so it is allocated beforehand.
    contents = [None] * kvs.local_element_count()

    def store(kv, kvi, kvo, i, *_data):
        contents[i] = kv
        return 0

    result = kvs.map(store, output=False, inspect=True)
    assert (result is None)
    return contents

Returns an array of LOCAL contents.