KMR
kmrrungenscript.in.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 # -*-coding: utf-8;-*-
3 
4 ## Copyright (C) 2012-2018 RIKEN R-CCS
5 
6 ## \file kmrrungenscript.in.py KMRRUN Job-Script Generator.
7 
8 import sys
9 import os
10 import re
11 from optparse import OptionParser
12 
# Root directory of the KMR installation; select_scheduler() looks for the
# kmrrun binary and the job-script templates under kmrhome + '/lib'.
# NOTE(review): '@KMRHOME@' looks like a placeholder substituted when this
# '.in.py' template is configured/installed -- confirm against the build.
kmrhome = '@KMRHOME@'
14 
15 ## Checks file existence.
16 # If file does not exist, it prints an error message and exit.
17 # @param path file path for check.
18 
19 def _check_exist(path):
20  if not os.path.exists(path):
21  print('Error: file or dir "%s" is not exist.' % path, file=sys.stderr)
22  sys.exit()
23  return path
24 
25 
## Checks that the command named in a command line exists.
# The first whitespace-separated word of the command line is taken as the
# command and checked with _check_exist(), which terminates the program
# when it is missing.  For the K scheduler the command is rewritten to
# './<basename>' in the returned command line.
# @param cmdline a command line to be executed (may be None or blank)
# @param sched string that represents scheduler type
# @return the command line to use; cmdline itself is returned unchanged
#         when it is None or blank, or when the scheduler is not K.

def check_cmdline(cmdline, sched):
    # Optional commands (kv generator, reducer) are None when their
    # options are omitted; there is nothing to check in that case.
    words = cmdline.split() if cmdline else []
    if not words:
        return cmdline
    _check_exist(words[0])
    if sched.upper() != 'K':
        return cmdline
    words[0] = './' + os.path.basename(words[0])
    return ' '.join(words)
38 
## Checks that the input directory exists.
# The existence check is done by _check_exist(), which terminates the
# program when the directory is missing.
# @param dirname name of directory where input files are located
# @param sched string that represents scheduler type
# @return './<basename of dirname>' for the K scheduler (trailing
#         whitespace and slashes are stripped first), dirname otherwise.

def check_indir(dirname, sched):
    _check_exist(dirname)
    if sched.upper() != 'K':
        return dirname
    trimmed = dirname.rstrip().rstrip('/')
    return './' + os.path.basename(trimmed)
50 
51 
## Checks restart mode.
# Counts the checkpoint directories in the current directory and compares
# the count with the process count recorded in '<prefix>00000/nprocs'.
# It is an error when the two counts differ or when the requested number
# of processes is larger than the recorded one; in both cases a message is
# printed to stderr and the program exits with status 1.  Nothing is
# checked when no checkpoint directory is found, or when the scheduler is
# K and no restart basename is given.
# @param restart_basename prefix of checkpoint directory name
# @param procstr string that represents process number
# @param sched string that represents scheduler type

def check_restart(restart_basename, procstr, sched):
    if sched.upper() == 'K':
        if restart_basename is None:
            return
        ckpt_prefix = restart_basename + '.'
    else:
        ckpt_prefix = 'ckptdir'

    # The prefix is literal text (and ends with '.' on K): escape it so
    # that regex metacharacters in it cannot match unrelated entries.
    pattern = re.compile(r'^%s\d+$' % re.escape(ckpt_prefix))
    count = sum(1 for entry in os.listdir('./') if pattern.match(entry))
    if count == 0:
        return

    nprocs_file = ckpt_prefix + '00000/nprocs'
    if not os.path.exists(nprocs_file):
        print('Error: Checkpoint nproc file %s does not exist.\n'
              % nprocs_file, file=sys.stderr)
        sys.exit(1)
    # The file holds '<key>=<number of processes>'.
    with open(nprocs_file) as nprocs:
        preproc = int(nprocs.read().split("=")[1])
    if count != preproc:
        print('Error: Do not match number of checkpoint file and '
              'executed process. ***\n', file=sys.stderr)
        sys.exit(1)
    proc = k_node_to_int(procstr)
    if proc > preproc:
        print('Error: On restart, increasing number of process is '
              'not supported. ***\n', file=sys.stderr)
        sys.exit(1)
    if count > proc:
        sys.stderr.write("*** Reduction mode. ***\n")
90 
91 
## Parses a K node declaration into an integer.
# The declaration is up to three decimal numbers separated by 'x' with an
# optional ':strict' suffix (e.g. '8', '2x3', '2x3x4:strict'); the result
# is the product of the numbers.
# @param shape_str string that represents K node shape
# @return product of the dimensions in shape_str
# @exception ValueError shape_str does not start with a decimal number

def k_node_to_int(shape_str):
    m = re.match(r"(\d+)x?(\d+)?x?(\d+)?(:strict)?", shape_str)
    if m is None:
        # Without this guard the failure is an opaque AttributeError on None.
        raise ValueError('invalid node shape: %r' % (shape_str,))
    product = 1
    for dim in m.groups()[0:3]:
        if dim:
            product *= int(dim)
    return product
102 
103 
## Generates job-script for K.
# Builds the stage-in directives (commands, input directory and, on
# restart, the saved checkpoint directories), the stage-out directives
# (checkpoint directories only) and the mpiexec line, then fills them into
# the template with %-style named substitution.
# @param name name of the job
# @param queue queue to submit job
# @param rsctime resource time limit
# @param node number of node to execute.
# @param kmrrun_path path to kmrrun command
# @param kmrrun_parameter parameter for kmrrun
# @param template_path path for template file
# @param shape mpi process shape
# @param proc number of execute proc
# @param mapper mapper command line
# @param kvgen kv generator command line
# @param reducer reducer command line
# @param indir directory where inputs are located(staged-in)
# @param ckpt enable checkpoint
# @param restart_basename prefix of checkpoint directory name
# @return text of the generated job script

def k_scheduler(name, queue, rsctime, node, kmrrun_path, kmrrun_parameter,
                template_path, shape, proc, mapper, kvgen, reducer, indir,
                ckpt, restart_basename):
    # Stage in section: each given command is staged in under its basename.
    stgin_lines = []
    for cmdline in (mapper, kvgen, reducer):
        if cmdline:
            cmd = cmdline.split()[0]
            stgin_lines.append('#PJM --stgin "%s %s"'
                               % (cmd, os.path.basename(cmd)))
    indir_stgin = './' + os.path.basename(indir.rstrip().rstrip('/'))
    stgin_lines.append('#PJM --stgin "%s/* %s/"' % (indir, indir_stgin))
    # Stage in ckpt files: one directory per rank of the previous run,
    # whose process count is recorded as '<key>=<count>' in the nprocs file.
    if restart_basename:
        fname = os.path.basename(restart_basename) + '.00000/nprocs'
        with open(fname) as nprocs:
            saved_nproc = int(nprocs.read().split('=')[1])
        for rank in range(saved_nproc):
            stgin_lines.append('#PJM --stgin "./%s.%05d/* ./ckptdir%05d/"'
                               % (restart_basename, rank, rank))
    stginstr = '\n'.join(stgin_lines)

    nprocs_run = k_node_to_int(proc)

    # Stage out section: output files are left for the user to fill in.
    stgout_lines = ["#\n# !!WRITE STGOUT HERE!!\n#"]
    # Stage out ckpt files
    if ckpt or restart_basename:
        for rank in range(nprocs_run):
            stgout_lines.append('#PJM --stgout "./ckptdir%05d/* '
                                './ckptdir_%%j.%05d/"' % (rank, rank))
    stgoutstr = '\n'.join(stgout_lines)

    execstr = 'mpiexec -n %d ./kmrrun %s' % (nprocs_run, kmrrun_parameter)

    with open(template_path) as template_file:
        template = template_file.read()
    return template % {'NAME': name, 'QUEUE': queue, 'NODE': node,
                       'RSCTIME': rsctime, 'KMRRUN': kmrrun_path,
                       'SHAPE': shape, 'PROC': proc, 'DATASTGIN': stginstr,
                       'DATASTGOUT': stgoutstr, 'EXEC': execstr}
171 
172 
## Generates job-script for FOCUS supercomputer
# Reads the template file and fills it with %-style named substitution
# using the keys NAME, QUEUE, NODE, RSCTIME, KMRRUN and KMRRUN_PARAM.
# @param name name of the job
# @param queue queue to submit job
# @param rsctime resource time limit
# @param node number of MPI processes to use
# @param kmrrun_path path to kmrrun command
# @param kmrrun_parameter parameter for kmrrun
# @param template_path path for template file
# @return text of the generated job script

def focus_scheduler(name, queue, rsctime, node, kmrrun_path, kmrrun_parameter,
                    template_path):
    # Close the template promptly instead of leaking the file handle.
    with open(template_path) as template_file:
        template = template_file.read()
    return template % {'NAME': name, 'QUEUE': queue, 'NODE': node,
                       'RSCTIME': rsctime, 'KMRRUN': kmrrun_path,
                       'KMRRUN_PARAM': kmrrun_parameter}
188 
189 
## Selects job-scheduler.
# Locates kmrrun and the job-script templates, assembles the kmrrun
# parameters from the options, generates the job script for the requested
# scheduler and writes it to opts.scrfile, or to stdout when no script
# file is given.  On error (kmrrun not found, unknown scheduler) a message
# is printed to stderr and the program exits with status 1.
# @param opts Options to the generator
# @param sched scheduler

def select_scheduler(opts, sched):
    # find kmrrun and its job-scheduler templates
    template_dir = kmrhome + '/lib'
    kmrrun_path = template_dir + '/kmrrun'
    if not os.path.exists(kmrrun_path):
        # kmrrun does not exist in the install directory.  In this case,
        # We assume that we are working in KMRSRC/cmd directory.
        template_dir = '.'
        kmrrun_path = template_dir + '/../kmrrun/kmrrun'
        if not os.path.exists(kmrrun_path):
            # error exit
            print('Error: could not find kmrrun utility.', file=sys.stderr)
            sys.exit(1)

    # set parameters
    queue = opts.queue
    node = opts.node
    # Read from the opts argument, not from the script-level 'options'
    # global, so the function also works when called from other code.
    rsctime = opts.rsctime
    # The kv generator and reducer are optional; only validate the
    # command lines that were actually given.
    mapper = check_cmdline(opts.mapper, sched) if opts.mapper else None
    kvgen = check_cmdline(opts.kvgen, sched) if opts.kvgen else None
    reducer = check_cmdline(opts.reducer, sched) if opts.reducer else None
    kmrrun_parameter = ''
    if opts.taskproc:
        kmrrun_parameter += '-n %s ' % (opts.taskproc)
    if opts.mapper:
        kmrrun_parameter += '-m "%s" ' % (mapper)
    if opts.kvgen:
        kmrrun_parameter += '-k "%s" ' % (kvgen)
    if opts.reducer:
        kmrrun_parameter += '-r "%s" ' % (reducer)
    if opts.ckpt or opts.restart:
        kmrrun_parameter += '--ckpt '
    kmrrun_parameter += check_indir(opts.indir, sched)
    name = 'kmrrun_job'
    if opts.scrfile:
        name = opts.scrfile

    sched_name = sched.upper()
    if sched_name == 'K':
        script = k_scheduler(name, queue, rsctime, node, kmrrun_path,
                             kmrrun_parameter,
                             template_dir + '/kmrrungenscript.template.k',
                             opts.shape, opts.proc, opts.mapper, opts.kvgen,
                             opts.reducer, opts.indir,
                             opts.ckpt, opts.restart)
    elif sched_name == 'FOCUS':
        script = focus_scheduler(name, queue, rsctime, node, kmrrun_path,
                                 kmrrun_parameter,
                                 template_dir
                                 + '/kmrrungenscript.template.focus')
    # for other schedulers...
    else:
        print('Unknown scheduler', file=sys.stderr)
        sys.exit(1)

    # output script
    if opts.scrfile is None:
        print(script)
    else:
        with open(opts.scrfile, "w") as out:
            print(script, file=out)
254 
255 
## Warn to write Stage-out section.
# Prints a reminder to stderr when the job script is generated for the K
# scheduler; nothing is printed for other schedulers.  The scheduler name
# is compared case-insensitively, like everywhere else in this file.
# @param opts Options to the generator

def warn_stageout(opts):
    if opts.sched.upper() != 'K':
        return

    rule = ("####################################"
            "#####################################")
    lines = [rule,
             "Don't forget to write stage-out directives for MapReduce "
             "output files."]
    if opts.ckpt or opts.restart:
        lines.append("A job script generated by this program stages-out "
                     "only checkpoint files.")
    lines.append(rule)
    # The message itself ends with a newline; print() adds one more, which
    # leaves a blank line after the banner.
    print('\n'.join(lines) + '\n', file=sys.stderr)
276 
277 
## kmrgenscript main routine.
# It requires Python 3 (print() is used with the file= keyword).
# Parses the command line, validates the option combination, checks the
# checkpoint state when checkpoint/restart is requested, generates the job
# script and finally prints the stage-out reminder.  Invalid usage is
# reported on stderr and ends the program with a non-zero exit status.

if __name__ == "__main__":

    usage = "usage: %prog [options] -m mapper [-k keygener -r reducer]"
    parser = OptionParser(usage)

    parser.add_option("-q",
                      "--queue",
                      dest="queue",
                      type="string",
                      help="queue to submit your job",
                      metavar="'string'",
                      default='None')

    parser.add_option("-t",
                      "--resource-time",
                      dest="rsctime",
                      type="string",
                      help="job execution time (default is '00:10:00')",
                      metavar="'string'",
                      default='00:10:00')

    parser.add_option("-e",
                      "--number-of-node",
                      dest="node",
                      type="string",
                      help="number of node (default is '12')",
                      metavar="'string'",
                      default='12')

    parser.add_option("-s",
                      "--shape",
                      dest="shape",
                      type="string",
                      help="mpi process shape. "
                      "Valid only on K scheduler. (default is '1')",
                      metavar="'string'",
                      default='1')

    parser.add_option("-p",
                      "--proc",
                      dest="proc",
                      type="string",
                      help="number of mpi processes. "
                      "Valid only on K scheduler. (default is '8')",
                      metavar="'string'",
                      default='8')

    parser.add_option("-d",
                      "--inputdir",
                      dest="indir",
                      type="string",
                      help="input file directory. "
                      "When used on K computer, this directory should be one "
                      "located in K global storage that is staged-in. "
                      "(default is './input')",
                      metavar="'string'",
                      default='./input')

    parser.add_option("-n",
                      "--task-proc",
                      dest="taskproc",
                      type="string",
                      help="number of processes to run each mapper/reducer "
                      "(default is 1)",
                      metavar="number",
                      default=1)

    parser.add_option("-m",
                      "--mapper",
                      dest="mapper",
                      type="string",
                      help="mapper command path and its arguments",
                      metavar="'string'")

    parser.add_option("-k",
                      "--kvgen",
                      dest="kvgen",
                      type="string",
                      help="kv generator command path and its arguments",
                      metavar="'string'")

    parser.add_option("-r",
                      "--reducer",
                      dest="reducer",
                      type="string",
                      help="reducer command path and its arguments",
                      metavar="'string'")

    parser.add_option("-C",
                      "--ckpt",
                      dest="ckpt",
                      action="store_true",
                      help="enable Checkpoint/Restart (default is false)",
                      default=False)

    parser.add_option("-R",
                      "--restart-filename",
                      dest="restart",
                      type="string",
                      help="specify prefix of directories where checkpoint "
                      "files are located. "
                      "This option should be given when restarting on "
                      "a system that requires staging. "
                      "Valid only on K scheduler.",
                      metavar="'string'")

    parser.add_option("-S",
                      "--scheduler",
                      dest="sched",
                      type="string",
                      help="scheduler type. "
                      "Specify Scheduler 'K' or 'FOCUS'. "
                      "'K' supports K computer/FX10 and 'FOCUS' supports "
                      "Focus supercomputer. (default is 'K')",
                      metavar="'string'",
                      default='K')

    parser.add_option("-w",
                      "--write-scriptfile",
                      dest="scrfile",
                      type="string",
                      help="output job script filename",
                      metavar="'string'")

    # 'options' stays a module-level name: other code in this file may
    # still refer to it.
    (options, args) = parser.parse_args()

    # check parameters.
    if args:
        # Everything is passed through options, so leftover positional
        # arguments are a usage error.  parser.error() prints the usage
        # and terminates the program itself.
        parser.error("unexpected positional arguments: %s" % ' '.join(args))
    if not options.mapper:
        print("Error: Mapper is not specified\n", file=sys.stderr)
        sys.exit(1)
    if options.reducer and not options.kvgen:
        print("Error: Specify kv generator when reducer is specified\n",
              file=sys.stderr)
        sys.exit(1)

    # Checkpointing is in effect when either -C or -R is given (the
    # generator adds '--ckpt' to kmrrun in both cases), so validate the
    # existing checkpoint state in both cases too.
    if options.ckpt or options.restart:
        # Scheduler names are matched case-insensitively everywhere else.
        if options.sched.upper() == 'K':
            check_restart(options.restart, options.proc, 'K')
        else:
            check_restart(options.restart, '1', options.sched)

    select_scheduler(options, options.sched)
    warn_stageout(options)
427 
428 
429 # Copyright (C) 2012-2018 RIKEN R-CCS
430 # This library is distributed WITHOUT ANY WARRANTY. This library can be
431 # redistributed and/or modified under the terms of the BSD 2-Clause License.