KMR
Classes | Macros | Enumerations | Functions | Variables
kmrmapms.c File Reference

Master-Worker Mapping on Key-Value Stream. More...

#include <mpi.h>
#include <stddef.h>
#include <stdlib.h>
#include <unistd.h>
#include <limits.h>
#include <poll.h>
#include <netdb.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <sys/param.h>
#include <arpa/inet.h>
#include <sys/mman.h>
#include <assert.h>
#include "kmr.h"
#include "kmrimpl.h"

Go to the source code of this file.

Classes

struct  kmr_map_ms_commands_argument
 
struct  kmr_spawn_state
 State of each Spawning. More...
 
struct  kmr_spawning
 State of Spawner. More...
 

Macros

#define KMR_RPC_ID_FIN   -2
 
#define KMR_RPC_ID_NONE   -1
 
#define MAX(a, b)   (((a)>(b))?(a):(b))
 
#define MIN(a, b)   (((a)<(b))?(a):(b))
 

Enumerations

enum  { KMR_RPC_NONE, KMR_RPC_GOON, KMR_RPC_DONE }
 
enum  kmr_spawn_mode { KMR_SPAWN_INTERACT, KMR_SPAWN_SERIAL, KMR_SPAWN_PARALLEL }
 

Functions

static int kmr_accept_on_watch (KMR *mr, struct kmr_spawning *spw, int index)
 
static void kmr_assert_peer_tag (int tag)
 
int kmr_check_exec__ (KMR *mr)
 
KMR * kmr_create_dummy_context (void)
 
static int kmr_exec_command (const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
 Runs commands in kmr_map_ms_commands(). More...
 
static int kmr_exec_command_e (_Bool use_exec, const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
 
static int kmr_free_comm_with_tracing (KMR *mr, struct kmr_spawning *spw, struct kmr_spawn_state *s)
 
MPI_Comm * kmr_get_spawner_communicator (KMR *mr, long index)
 Obtains (a reference to) a parent inter-communicator of a spawned process. More...
 
int kmr_get_spawner_communicator_ff (KMR *mr, long ii, int *comm)
 
static int kmr_list_spawns (struct kmr_spawning *spw, KMR_KVS *kvi, MPI_Info info, struct kmr_spawn_option opt)
 
static int kmr_listen_to_watch (KMR *mr, struct kmr_spawning *spw, int index)
 
static int kmr_map_master (KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
 Delivers key-value pairs as requested. More...
 
int kmr_map_ms (KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
 Maps in master-worker mode. More...
 
int kmr_map_ms_commands (KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, struct kmr_spawn_option sopt, kmr_mapfn_t m)
 Maps in the master-worker mode, specialized to run serial commands. More...
 
int kmr_map_parallel_processes (KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
 Maps on processes started by MPI_Comm_spawn() to run independent MPI processes, which will not communicate with the parent. More...
 
int kmr_map_processes (_Bool nonmpi, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
 Maps on processes started by MPI_Comm_spawn() to run independent processes. More...
 
int kmr_map_serial_processes (KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
 Maps on processes started by MPI_Comm_spawn() to run serial processes. More...
 
static int kmr_map_spawned_processes (enum kmr_spawn_mode mode, char *name, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
 
int kmr_map_via_spawn (KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
 Maps on processes started by MPI_Comm_spawn(). More...
 
int kmr_map_via_spawn_ff (KMR_KVS *kvi, KMR_KVS *kvo, void *arg, int finfo, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
 
static int kmr_map_worker (KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
 Asks the master for a task, then calls a map-function. More...
 
static int kmr_receive_for_reply (KMR *mr, struct kmr_spawning *spw, int w, _Bool replyeach, _Bool replyroot)
 
int kmr_receive_kvs_from_spawned_fn (const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
 Collects key-value pairs generated by spawned processes. More...
 
int kmr_reply_to_spawner (KMR *mr)
 Sends a reply message from the spawned process, which tells the spawner that the process is ready to finish and may have some data to send to the spawner in kmr_map_via_spawn(). More...
 
int kmr_send_kvs_to_spawner (KMR *mr, KMR_KVS *kvs)
 Sends the KVS from a spawned process to the map-function of the spawner. More...
 
static void kmr_spawn_info_get (struct kmr_spawn_info *info, struct kmr_spawn_state *s)
 
static void kmr_spawn_info_put (struct kmr_spawn_info *info, struct kmr_spawn_state *s, struct kmr_spawn_option opt, void *arg)
 
static int kmr_sum_on_all_ranks (KMR *mr, int v, int *sum)
 
static int kmr_wait_for_reply (KMR *mr, struct kmr_spawning *spw, struct kmr_spawn_option opt)
 
static int kmr_wait_for_watch (KMR *mr, struct kmr_spawning *spw, struct kmr_spawn_option _)
 
static int kmr_wait_then_map (KMR *mr, struct kmr_spawning *spw, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_spawn_option opt, kmr_mapfn_t m)
 

Variables

static const int kmr_kv_buffer_slack_size = 1024
 

Detailed Description

Master-Worker Mapping on Key-Value Stream.

Definition in file kmrmapms.c.

Function Documentation

◆ kmr_map_master()

static int kmr_map_master ( KMR_KVS *  kvi,
KMR_KVS *  kvo,
void *  arg,
struct kmr_option  opt,
kmr_mapfn_t  m 
)
static

Delivers key-value pairs as requested.

It returns MPI_SUCCESS if all done, or MPI_ERR_ROOT otherwise. It finishes the tasks when all nodes have made contact and all worker threads are done. Protocol: (1) Receive an RPC request (KMR_TAG_REQ). A request consists of a triple of integers (task-ID, peer-tag, result-size) ("int req[3]"). The task-ID encodes some special values. (2) Receive a result if a worker has one. (3) Return a new task if available. A reply consists of a tuple of integers (task-ID, argument-size) ("int ack[2]"). (4) Otherwise, return a "no-tasks" indicator by ID=KMR_RPC_ID_NONE. (5) Count "done" messages by ID=KMR_RPC_ID_FIN, which indicates that all worker threads on the worker node have finished. The task-ID in an RPC request is KMR_RPC_ID_NONE for the first request (meaning that the request has no result). Peer-tags are used in subsequent messages to direct reply messages to the requesting thread.

Definition at line 72 of file kmrmapms.c.

◆ kmr_map_worker()

static int kmr_map_worker ( KMR_KVS *  kvi,
KMR_KVS *  kvo,
void *  arg,
struct kmr_option  opt,
kmr_mapfn_t  m 
)
static

Asks the master for a task, then calls a map-function.

With threading, each thread works independently, asking the master for a task. It simply protects MPI send/recv calls with OMP critical sections, although their grain sizes are too large to be well suited to OMP critical sections.

Definition at line 222 of file kmrmapms.c.

◆ kmr_map_ms()

int kmr_map_ms ( KMR_KVS *  kvi,
KMR_KVS *  kvo,
void *  arg,
struct kmr_option  opt,
kmr_mapfn_t  m 
)

Maps in master-worker mode.

The input key-value stream should be empty except on rank 0, where the master is running (the contents on the worker ranks are ignored). It consumes the input key-value stream. The master only performs delivery. The master returns frequently to give a chance for check-pointing, etc.: it returns prematurely each time one pair is delivered, and those returns are marked by MPI_ERR_ROOT, indicating that more tasks remain. In contrast, workers return only after all tasks are done. The state that needs to be kept for check-pointing during kmr_map_ms() is held in the key-value streams KVI and KVO on the master. Note that this totally diverges from bulk-synchronous execution. It does not accept the key-value field types KMR_KV_POINTER_OWNED or KMR_KV_POINTER_UNMANAGED. Effective-options: NOTHREADING, KEEP_OPEN. See struct kmr_option.

Definition at line 344 of file kmrmapms.c.

◆ kmr_reply_to_spawner()

int kmr_reply_to_spawner ( KMR *  mr)

Sends a reply message from the spawned process, which tells the spawner that the process is ready to finish and may have some data to send to the spawner in kmr_map_via_spawn().

Definition at line 1893 of file kmrmapms.c.

◆ kmr_get_spawner_communicator()

MPI_Comm* kmr_get_spawner_communicator ( KMR *  mr,
long  index 
)

Obtains (a reference to) a parent inter-communicator of a spawned process.

It is used inside a map-function of kmr_map_via_spawn(); pass as INDEX the same index argument that the map-function received. It returns a reference so that the map-function can free the communicator (as a side effect) through it.

Definition at line 1916 of file kmrmapms.c.

◆ kmr_map_via_spawn()

int kmr_map_via_spawn ( KMR_KVS *  kvi,
KMR_KVS *  kvo,
void *  arg,
MPI_Info  info,
struct kmr_spawn_option  opt,
kmr_mapfn_t  mapfn 
)

Maps on processes started by MPI_Comm_spawn().

It is intended to run custom MPI programs which will return a reply as MPI messages. Consider other variations to run independent processes, when the spawned processes will not interact with the parent: kmr_map_serial_processes(), kmr_map_parallel_processes(), or kmr_map_ms_commands().
A spawner (parent) spawns processes specified by key-value pairs. The key part is ignored, and the value part is a list of null-separated strings which constitute a command and its arguments. The option SEPARATOR_SPACE changes the separator character to whitespace. If the first string is "maxprocs=n", then the number of processes is taken from this string. Otherwise, an MPI_Info entry "maxprocs" in INFO is used, in which case "maxprocs" is common to all spawns. It is an error if neither is specified. A spawner tries to limit the number of simultaneously running processes to the number of processes in the universe. When multiple spawners are active (more than one rank has entries to spawn), they divide the universe evenly among them.
The option REPLY_EACH or REPLY_ROOT lets a spawner wait for reply messages from the spawned processes before the spawner calls a map-function. A reply message has the tag KMR_TAG_SPAWN_REPLY=500 and zero length, and kmr_reply_to_spawner() can be used to send this reply. When neither REPLY_EACH nor REPLY_ROOT is specified, the spawner immediately calls a map-function one-by-one in FIFO order (before the spawned processes finish). In that case, no load-balancing is performed. Thus, the map-function should wait for the spawned processes to finish; otherwise, a spawner starts new spawns continuously and runs out of processes, which causes the MPI runtime to signal an error.
Communication between the spawned processes and a map-function of a spawner is through an inter-communicator. The parent inter-communicator of the spawned processes can be taken by MPI_Comm_get_parent() as usual. The inter-communicator at the spawner side can be obtained by calling kmr_get_spawner_communicator() inside a map-function.
The INFO argument is passed to MPI_Comm_spawn() after inserting the entries which appear in the command line, when the command line has prefixes of the form "key=value". Insertion of the prefixes can be terminated by an empty entry "=". Use of INFO is discouraged, because it is not portable and may contradict the implicit assumptions of the KMR implementation.
NOTE: Neither the MPI specification nor MPI implementations provide a way to check the availability of processes for spawning, and the MPI runtime signals errors when it runs out of processes. Thus, it puts a sleep (1 sec) between MPI_Comm_spawn() calls to allow clean-ups in the MPI runtime and to avoid timing issues.
INTERFACE CHANGE: Set mr->spawn_pass_intercomm_in_argument=1 to enable the old interface, where the map-function MAPFN is called with the kmr_spawn_state structure as the general argument. The argument ARG passed to the mapper is stored in the MAPARG slot in the kmr_spawn_state structure. When the TAKE_CKPT option is specified, a checkpoint data file of the output key-value stream is saved if both the CKPT_ENABLE and CKPT_SELECTIVE global options are set.

Definition at line 1992 of file kmrmapms.c.

◆ kmr_map_parallel_processes()

int kmr_map_parallel_processes ( KMR_KVS *  kvi,
KMR_KVS *  kvo,
void *  arg,
MPI_Info  info,
struct kmr_spawn_option  opt,
kmr_mapfn_t  mapfn 
)

Maps on processes started by MPI_Comm_spawn() to run independent MPI processes, which will not communicate to the parent.

The programs need to be MPI. It is a variation of kmr_map_via_spawn(), and refer to the comments on it for the basic usage. Since the spawned program does not know the parent, there is no way to communicate from the spawner. The map-function is called after the processes have exited, so that the map-function can check the result files created by the spawned processes.
This function detects the end of spawned processes using a watch-program "kmrwatch0", by detecting the closure of a socket to which "kmrwatch0" has connected.
NOTE THAT THIS OPERATION WILL BLOCK INDEFINITELY AND FAIL, DEPENDING ON THE BEHAVIOR OF AN MPI IMPLEMENTATION. It has been checked to work with Open MPI (1.6) and MPICH2 (1.5), but not with Intel MPI (4.1) and YAMPI2 (GridMPI 2.1). It depends on the behavior that MPI_Comm_free() on the parent and MPI_Finalize() on the child do not synchronize. The standard (MPI 2.x) says: "Though collective, MPI_Comm_free is anticipated that this operation will normally be implemented to be local, ..." The blocking situation can be checked by enabling tracing around calls to MPI_Comm_free() (by setting mr->trace_map_spawn=1).
NOTE (on MPI spawn implementations): Open MPI (1.6) allows spawning non-MPI processes by passing a special MPI_Info. MPICH2 (1.5) does not allow spawning non-MPI processes, because MPI_Comm_spawn() of the parent and MPI_Init() of the child synchronize. In Intel MPI (4.1) and YAMPI2 (GridMPI), the calls of MPI_Comm_free() on the parent and MPI_Finalize() or MPI_Comm_free() on the child synchronize, and thus, they require calling MPI_Comm_free() at an appropriate time on the parent.
Options REPLY_ROOT and REPLY_EACH have no effect. When TAKE_CKPT option is specified, a checkpoint data file of the output key-value stream is saved if both CKPT_ENABLE and CKPT_SELECTIVE global options are set.

Definition at line 2037 of file kmrmapms.c.

◆ kmr_map_serial_processes()

int kmr_map_serial_processes ( KMR_KVS *  kvi,
KMR_KVS *  kvo,
void *  arg,
MPI_Info  info,
struct kmr_spawn_option  opt,
kmr_mapfn_t  mapfn 
)

Maps on processes started by MPI_Comm_spawn() to run serial processes.

This should NOT be used; use kmr_map_ms_commands() instead. Fork-execing in kmr_map_ms_commands() is simpler than spawning. See also the comments on kmr_map_via_spawn() and kmr_map_parallel_processes(). The map-function is called after the processes have exited; thus, there is no way to communicate from the map-function. Instead, the map-function can check the result files created by the spawned processes.
This function detects the end of spawned processes using a watch-program "kmrwatch0" which sends a reply to the parent in place of the serial program. Options REPLY_ROOT and REPLY_EACH have no effect. When TAKE_CKPT option is specified, a checkpoint data file of the output key-value stream is saved if both CKPT_ENABLE and CKPT_SELECTIVE global options are set.

Definition at line 2067 of file kmrmapms.c.

◆ kmr_map_processes()

int kmr_map_processes ( _Bool  nonmpi,
KMR_KVS *  kvi,
KMR_KVS *  kvo,
void *  arg,
MPI_Info  info,
struct kmr_spawn_option  opt,
kmr_mapfn_t  mapfn 
)

Maps on processes started by MPI_Comm_spawn() to run independent processes.

It calls either kmr_map_parallel_processes() or kmr_map_serial_processes(), depending on the NONMPI argument. See the comments on kmr_map_parallel_processes() and kmr_map_serial_processes().

Definition at line 2087 of file kmrmapms.c.

◆ kmr_send_kvs_to_spawner()

int kmr_send_kvs_to_spawner ( KMR *  mr,
KMR_KVS *  kvs 
)

Sends the KVS from a spawned process to the map-function of the spawner.

It is paired with kmr_receive_kvs_from_spawned_fn().

Definition at line 2127 of file kmrmapms.c.

◆ kmr_receive_kvs_from_spawned_fn()

int kmr_receive_kvs_from_spawned_fn ( const struct kmr_kv_box  kv,
const KMR_KVS *  kvi,
KMR_KVS *  kvo,
void *  arg,
const long  index 
)

Collects key-value pairs generated by spawned processes.

It is a map-function to be used with kmr_map_via_spawn() with the REPLY_EACH option. The spawned processes call kmr_send_kvs_to_spawner() to send generated key-value pairs, and this function receives them and puts them into KVO. PROTOCOL: The reply consists of one or two messages with the tag KMR_TAG_SPAWN_REPLY1=501. The first carries the data size, and it is followed by a marshaled key-value stream when the data size is non-zero.

Definition at line 2161 of file kmrmapms.c.

◆ kmr_exec_command()

static int kmr_exec_command ( const struct kmr_kv_box  kv,
const KMR_KVS *  kvi,
KMR_KVS *  kvo,
void *  arg,
const long  index 
)
static

Runs commands in kmr_map_ms_commands().

It has system(3C) and fork-exec variants.

Definition at line 2394 of file kmrmapms.c.

◆ kmr_map_ms_commands()

int kmr_map_ms_commands ( KMR_KVS *  kvi,
KMR_KVS *  kvo,
void *  arg,
struct kmr_option  opt,
struct kmr_spawn_option  sopt,
kmr_mapfn_t  m 
)

Maps in the master-worker mode, specialized to run serial commands.

It executes a command specified by a key-value pair, then calls a map-function when the command finishes. It takes the commands in the same way as kmr_map_via_spawn(). The commands must not be MPI programs. It uses system(3C) or fork-exec, switching to fork-exec when the SEPARATOR_SPACE option is specified, when a command string includes null characters, or when the MAP_MS_USE_EXEC option to KMR is specified. It is implemented with kmr_map_ms(); see the comments on kmr_map_ms().

Definition at line 2432 of file kmrmapms.c.