11 from optparse
import OptionParser
# Check that `path` exists on the filesystem; when os.path.exists() reports
# that it does not, print an error message naming the path to stderr.
# NOTE(review): this listing jumps from embedded line 21 to 30, so any
# statement that follows the print (an exit or a return value, for example)
# is not visible here -- confirm against the complete file.
19 def _check_exist(path):
20 if not os.path.exists(path):
21 print(
'Error: file or dir "%s" is not exist.' % path, file=sys.stderr)
# Validate a command line and adapt it to the target scheduler.
#   cmdline: command path followed by its arguments, separated by whitespace.
#   sched:   scheduler name; compared case-insensitively against 'K'.
# The first token of `cmdline` is passed to _check_exist().  For the 'K'
# scheduler the first token is replaced by './' + its basename and the tokens
# are re-joined with single spaces, so runs of whitespace in the original
# command line are collapsed.
# NOTE(review): what is returned for schedulers other than 'K' is not visible
# in this listing (embedded lines 36-42 are missing).
30 def check_cmdline(cmdline, sched):
# cmdline.split()[0] fails for None (no .split) and for an empty or
# whitespace-only string (empty list), so a usable command line is expected.
31 _check_exist(cmdline.split()[0])
32 if sched.upper() ==
'K':
33 cmdlns = cmdline.split()
34 cmdlns[0] =
'./' + os.path.basename(cmdlns[0])
35 return ' '.join(cmdlns)
# Adapt the input directory name to the target scheduler.
#   dirname: input directory path.
#   sched:   scheduler name; compared case-insensitively against 'K'.
# For the 'K' scheduler, trailing whitespace and then trailing '/' characters
# are stripped and './' + the basename of the result is returned.
# NOTE(review): embedded line 44 and everything after line 47 are missing
# from this listing, so any existence check and the non-'K' return value
# cannot be confirmed from here.
43 def check_indir(dirname, sched):
45 if sched.upper() ==
'K':
# Strip the trailing '/' first: basename() of a path ending in '/' is ''.
46 _dirname = dirname.rstrip().rstrip(
'/')
47 return './' + os.path.basename(_dirname)
# Sanity-check the checkpoint directories before a restart.
#   restart_basename: prefix of the checkpoint directories, or None.
#   procstr:          process-count string, converted with k_node_to_int().
#   sched:            scheduler name; compared case-insensitively against 'K'.
# Visible behaviour:
#   * for 'K', returns immediately when restart_basename is None; otherwise
#     the directory prefix is restart_basename + '.'.  A second assignment
#     sets the prefix to 'ckptdir' (presumably the non-'K' branch; the
#     `else` line itself is not visible -- TODO confirm).
#   * entries of './' whose names match '^<prefix>\d+$' are examined.
#   * '<prefix>00000/nprocs' must exist; the text after its first '=' is the
#     process count recorded by the previous run.
#   * `count` must equal that recorded count, and the requested process count
#     must not exceed it; a "Reduction mode" notice is written to stderr.
# NOTE(review): many original lines are missing from this listing (the loop
# header, how `count` is updated, what happens after each error message, and
# the condition guarding the "Reduction mode" notice), so the description
# above is limited to the statements that are shown.
# NOTE(review): ckpt_prefix is interpolated into the regex unescaped, so the
# '.' appended for 'K' matches any character, and regex metacharacters in a
# user-supplied basename change the pattern.
# NOTE(review): open(nprocs_file).read() never closes the file explicitly.
58 def check_restart(restart_basename, procstr, sched):
59 if sched.upper() ==
'K':
60 if restart_basename
is None:
return 61 ckpt_prefix = restart_basename +
'.' 63 ckpt_prefix =
'ckptdir' 64 repatter = re.compile(
r'^%s\d+$' % ckpt_prefix)
# Candidate checkpoint directories are looked up in the current directory.
65 files = os.listdir(
'./')
68 if repatter.match(file_):
# Rank 0's directory holds the 'nprocs' file written by the previous run.
72 nprocs_file = ckpt_prefix +
'00000/nprocs' 73 if not os.path.exists(nprocs_file):
74 print(
'Error: Checkpoint nproc file %s not exit.\n' % nprocs_file,
77 preprocstr = open(nprocs_file).read()
78 preproc = preprocstr.split(
"=")[1]
79 if count != int(preproc):
80 print(
'Error: Do not match number of checkpoint file and ' \
81 'executed process. ***\n', file=sys.stderr)
# Restarting with more processes than the checkpoint was taken with is
# reported as unsupported.
83 proc = k_node_to_int(procstr)
84 if proc > int(preproc):
85 print(
'Error: On restart, increasing number of process is ' \
86 'not supported. ***\n', file=sys.stderr)
89 sys.stderr.write(
"*** Reduction mode. ***\n")
# Parse a node/process shape string.
#   shape_str: text matched from its start against up to three decimal
#              numbers separated by an optional 'x', optionally followed by
#              ':strict' (e.g. '8', '2x3', '2x3x4:strict').
# The first three regex groups (the numbers; unmatched ones are None) are
# iterated.
# NOTE(review): the loop body and the return statement are missing from this
# listing.  Callers use the result with '%d' and range(), so it is presumably
# an int derived from the dimensions -- TODO confirm.
# NOTE(review): re.match() returns None when shape_str does not start with a
# digit; m.groups() would then raise AttributeError unless a guard exists on
# the missing embedded line 97.
95 def k_node_to_int(shape_str):
96 m = re.match(
r"(\d+)x?(\d+)?x?(\d+)?(:strict)?", shape_str)
98 for mstr
in m.groups()[0:3]:
# Generate a job script for the 'K' scheduler from a template file.
#   name, queue, rsctime, node, shape, proc: values substituted into the
#       template under the keys NAME, QUEUE, RSCTIME, NODE, SHAPE and PROC.
#   kmrrun_path:      substituted under the key KMRRUN.
#   kmrrun_parameter: appended to the 'mpiexec ... ./kmrrun' command line.
#   template_path:    path of the template, filled with the '%' operator and
#                     a dict of named keys.
#   mapper, kvgen, reducer: command lines; the first token of each is staged
#                     in under its basename.
#   indir:            input directory; its contents are staged in under
#                     './' + basename.
#   ckpt, restart_basename: when either is truthy, stage-out directives for
#                     the per-rank 'ckptdirNNNNN' directories are generated.
# Returns the filled-in template text.
# NOTE(review): several original lines are missing from this listing (the
# initialisation of `stginstr`, the conditions guarding each stage-in section
# and any separators added between directives), so only the statements shown
# are described.
# NOTE(review): both open(...).read() calls leave the file to be closed
# implicitly.
121 def k_scheduler(name, queue, rsctime, node, kmrrun_path, kmrrun_parameter,
122 template_path, shape, proc, mapper, kvgen, reducer, indir,
123 ckpt, restart_basename):
# Stage in the mapper executable (first token of the command line).
127 mapper_cmd = mapper.split()[0]
128 mapper_cmd_base = os.path.basename(mapper_cmd)
129 stginstr +=
'#PJM --stgin "%s %s"' % (mapper_cmd, mapper_cmd_base)
# Stage in the key-value generator executable.
133 kvgen_cmd = kvgen.split()[0]
134 kvgen_cmd_base = os.path.basename(kvgen_cmd)
135 stginstr +=
'#PJM --stgin "%s %s"' % (kvgen_cmd, kvgen_cmd_base)
# Stage in the reducer executable.
139 reducer_cmd = reducer.split()[0]
140 reducer_cmd_base = os.path.basename(reducer_cmd)
141 stginstr +=
'#PJM --stgin "%s %s"' % (reducer_cmd, reducer_cmd_base)
# Stage in every file of the input directory.
144 indir_stgin =
'./' + os.path.basename(indir.rstrip().rstrip(
'/'))
145 stginstr +=
'#PJM --stgin "%s/* %s/"' % (indir, indir_stgin)
# Restart: read the previous process count from rank 0's 'nprocs' file and
# stage in one checkpoint directory per rank.
# NOTE(review): the file name is built from basename(restart_basename) while
# the directives use restart_basename unchanged -- confirm this is intended.
148 fname = os.path.basename(restart_basename) +
'.00000/nprocs' 149 nproc = int(open(fname).read().split(
'=')[1])
150 for rank
in range(nproc):
152 stginstr +=
'#PJM --stgin "./%s.%05d/* ./ckptdir%05d/"' \
153 % (restart_basename, rank, rank)
156 stgoutstr =
"#\n# !!WRITE STGOUT HERE!!\n#" 158 if ckpt
or restart_basename:
159 for rank
in range(k_node_to_int(proc)):
161 stgoutstr +=
'#PJM --stgout "./ckptdir%05d/* ' \
162 './ckptdir_%%j.%05d/"' % (rank, rank)
# The process count passed to mpiexec is derived from the `proc` string.
164 execstr =
'mpiexec -n %d ./kmrrun %s' % (k_node_to_int(proc), kmrrun_parameter)
# Fill the template; a key the template uses but this dict lacks raises
# KeyError.
166 template = open(template_path).read()
167 return template % {
'NAME': name,
'QUEUE': queue,
'NODE': node,
168 'RSCTIME': rsctime,
'KMRRUN': kmrrun_path,
169 'SHAPE': shape,
'PROC': proc,
'DATASTGIN': stginstr,
170 'DATASTGOUT': stgoutstr,
'EXEC': execstr}
def focus_scheduler(name, queue, rsctime, node, kmrrun_path, kmrrun_parameter,
                    template_path):
    """Generate a job script for the FOCUS scheduler from a template file.

    The template is filled with the ``%`` operator and a dict of named keys,
    so it may reference ``%(NAME)s``, ``%(QUEUE)s``, ``%(NODE)s``,
    ``%(RSCTIME)s``, ``%(KMRRUN)s`` and ``%(KMRRUN_PARAM)s``.

    Args:
        name: value substituted for the ``NAME`` key.
        queue: value substituted for the ``QUEUE`` key.
        rsctime: value substituted for the ``RSCTIME`` key.
        node: value substituted for the ``NODE`` key.
        kmrrun_path: value substituted for the ``KMRRUN`` key.
        kmrrun_parameter: value substituted for the ``KMRRUN_PARAM`` key.
        template_path: path of the template file to read.

    Returns:
        The template text with all keys substituted.

    Raises:
        OSError: if the template file cannot be opened or read.
        KeyError: if the template references a key not listed above.
    """
    # Read inside a context manager so the handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(template_path) as template_file:
        template = template_file.read()
    return template % {
        'NAME': name,
        'QUEUE': queue,
        'NODE': node,
        'RSCTIME': rsctime,
        'KMRRUN': kmrrun_path,
        'KMRRUN_PARAM': kmrrun_parameter,
    }
# Build the kmrrun command-line parameters, generate the job script for the
# selected scheduler and emit it.
#   opts:  parsed command-line options (attributes read here: mapper, kvgen,
#          reducer, taskproc, ckpt, restart, indir, shape, proc, scrfile).
#   sched: scheduler name; 'K' and 'FOCUS' are recognised case-insensitively,
#          anything else prints 'Unknown scheduler' to stderr.
# The kmrrun executable is looked up at <kmrhome>/lib/kmrrun and then at
# <kmrhome>/lib/../kmrrun/kmrrun; an error is printed when the second lookup
# also fails.  When opts.scrfile is given the script is written to that file.
# NOTE(review): many original lines are missing from this listing (the
# assignments of `name`, `queue` and `node`, the conditions guarding each
# kmrrun parameter, what follows each error message, and the branch taken
# when opts.scrfile is None), so only the statements shown are described.
# NOTE(review): the file opened for opts.scrfile is not closed in the lines
# shown.
194 def select_scheduler(opts, sched):
196 template_dir = kmrhome +
'/lib' 197 kmrrun_path = template_dir +
'/kmrrun' 198 if not os.path.exists(kmrrun_path):
202 kmrrun_path = template_dir +
'/../kmrrun/kmrrun' 203 if not os.path.exists(kmrrun_path):
205 print(
'Error: could not find kmrrun utility.', file=sys.stderr)
# NOTE(review): this reads the module-level `options` rather than the `opts`
# parameter; it only works because the __main__ section defines `options`
# globally before calling this function.
211 rsctime = options.rsctime
212 mapper = check_cmdline(opts.mapper, sched)
213 kvgen = check_cmdline(opts.kvgen, sched)
214 reducer = check_cmdline(opts.reducer, sched)
215 kmrrun_parameter =
'' 217 kmrrun_parameter +=
'-n %s ' % (opts.taskproc)
219 kmrrun_parameter +=
'-m "%s" ' % (mapper)
221 kmrrun_parameter +=
'-k "%s" ' % (kvgen)
223 kmrrun_parameter +=
'-r "%s" ' % (reducer)
224 if opts.ckpt
or opts.restart:
225 kmrrun_parameter +=
'--ckpt ' 226 kmrrun_parameter += check_indir(opts.indir, sched)
# Dispatch on the scheduler name.  k_scheduler receives the raw option values
# (opts.mapper, ...), not the adapted command lines computed above.
231 if sched.upper() ==
'K':
232 script = k_scheduler(name, queue, rsctime, node, kmrrun_path,
234 template_dir +
'/kmrrungenscript.template.k',
235 opts.shape, opts.proc, opts.mapper, opts.kvgen,
236 opts. reducer, opts.indir,
237 opts.ckpt, opts.restart)
238 elif sched.upper() ==
'FOCUS':
239 script = focus_scheduler(name, queue, rsctime, node, kmrrun_path,
241 template_dir +
'/kmrrungenscript.template.focus')
244 print(
'Unknown scheduler', file=sys.stderr)
# Emit the generated script.
248 if opts.scrfile
is None:
251 out = open(opts.scrfile,
"w")
252 print(script, file=out)
# Print a banner to stderr reminding the user to add stage-out directives for
# the MapReduce output files; when opts.ckpt or opts.restart is set, a
# sentence is included saying that the generated script stages out only
# checkpoint files.
#   opts: parsed command-line options (attributes read: sched, ckpt, restart).
# NOTE(review): opts.sched is compared with 'K' case-sensitively here, while
# other functions in this file use sched.upper(); a lower-case 'k' is treated
# differently.  What the `!= 'K'` branch does is on a line missing from this
# listing (presumably an early return -- TODO confirm).
# NOTE(review): the banner text below is only partly visible; it is runtime
# string content and must not be edited as if it were a comment.
259 def warn_stageout(opts):
260 if opts.sched !=
'K':
264 ######################################################################### 265 Don't forget to write stage-out directives for MapReduce output files. 267 if opts.ckpt
or opts.restart:
269 A job script generated by this program stages-out only checkpoint files. 273 ######################################################################### 275 print(message, file=sys.stderr)
# Script entry point: declare the command-line options with optparse, parse
# and validate them, check restart prerequisites, generate the job script and
# print the stage-out reminder.
# Options declared: -q queue, -t execution time, -e node count, -s process
# shape, -p process count, -d input directory, -n processes per
# mapper/reducer, -m mapper, -k kv generator, -r reducer, -C checkpointing,
# -R/--restart-filename checkpoint prefix, -S scheduler type,
# -w/--write-scriptfile output file name.
# NOTE(review): the long option names, `dest`, `default` and `action`
# arguments of most add_option() calls are on lines missing from this listing;
# the defaults quoted in the help strings cannot be verified from here.
# NOTE(review): `options` is left as a module-level name and is read directly
# by select_scheduler().
281 if __name__ ==
"__main__":
283 usage =
"usage: %prog [options] -m mapper [-k keygener -r reducer]" 284 parser = OptionParser(usage)
286 parser.add_option(
"-q",
290 help=
"queue to submit your job",
294 parser.add_option(
"-t",
298 help=
"job execution time (default is '00:10:00')",
302 parser.add_option(
"-e",
306 help=
"number of node (default is '12')",
310 parser.add_option(
"-s",
314 help=
"mpi process shape. " 315 "Valid only on K scheduler. (default is '1')",
319 parser.add_option(
"-p",
323 help=
"number of mpi processes. " 324 "Valid only on K scheduler. (default is '8')",
328 parser.add_option(
"-d",
332 help=
"input file directory. " 333 "When used on K computer, this directory should be one " 334 "located in K global storage that is staged-in. " 335 "(default is './input')",
339 parser.add_option(
"-n",
343 help=
"number of processes to run each mapper/reducer " 348 parser.add_option(
"-m",
352 help=
"mapper command path and its arguments",
355 parser.add_option(
"-k",
359 help=
"kv generator command path and its arguments",
362 parser.add_option(
"-r",
366 help=
"reducer command path and its arguments",
369 parser.add_option(
"-C",
373 help=
"enable Checkpoint/Restart (default is false)",
376 parser.add_option(
"-R",
377 "--restart-filename",
380 help=
"specify prefix of directories where checkpoint " 381 "files are located. " 382 "This option should be given when restarting on " 383 "a system that requires staging. " 384 "Valid only on K scheduler.",
387 parser.add_option(
"-S",
391 help=
"scheduler type. " 392 "Specify Scheduler 'K' or 'FOCUS'. " 393 "'K' supports K computer/FX10 and 'FOCUS' supports " 394 "Focus supercomputer. (default is 'K')",
398 parser.add_option(
"-w",
399 "--write-scriptfile",
402 help=
"output job script filename",
# Parse the command line; the condition guarding parser.error() below is on a
# line missing from this listing.
405 (options, args) = parser.parse_args()
409 parser.error(
"Error: Missing parameter")
# A mapper is mandatory, and a reducer requires a kv generator.
411 if not options.mapper:
412 print(
"Error: Mapper is not specified\n", file=sys.stderr)
414 if options.reducer
and not options.kvgen:
415 print(
"Error: Specify kv generator when reducer is specified\n",
# NOTE(review): this comparison with 'K' is case-sensitive, unlike the
# sched.upper() checks inside the functions.  For other schedulers the
# process string passed to check_restart() is fixed to '1'.
420 if options.sched ==
'K':
421 check_restart(options.restart, options.proc,
'K')
423 check_restart(options.restart,
'1', options.sched)
425 select_scheduler(options, options.sched)
426 warn_stageout(options)