3609: Documentation improvements. More error checking on run-command list function...
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28 import argparse
29 import json
30 import tempfile
31 import errno
32
33 parser = argparse.ArgumentParser()
34 parser.add_argument('--dry-run', action='store_true')
35 parser.add_argument('--script-parameters', type=str, default="{}")
36 args = parser.parse_args()
37
38 os.umask(0077)
39
40 if not args.dry_run:
41     api = arvados.api('v1')
42     t = arvados.current_task().tmpdir
43     os.chdir(arvados.current_task().tmpdir)
44     os.mkdir("tmpdir")
45     os.mkdir("output")
46
47     os.chdir("output")
48
49     outdir = os.getcwd()
50
51     taskp = None
52     jobp = arvados.current_job()['script_parameters']
53     if len(arvados.current_task()['parameters']) > 0:
54         taskp = arvados.current_task()['parameters']
55 else:
56     outdir = "/tmp"
57     jobp = json.loads(args.script_parameters)
58     os.environ['JOB_UUID'] = 'zzzzz-8i9sb-1234567890abcde'
59     os.environ['TASK_UUID'] = 'zzzzz-ot0gb-1234567890abcde'
60     os.environ['CRUNCH_SRC'] = '/tmp/crunche-src'
61     if 'TASK_KEEPMOUNT' not in os.environ:
62         os.environ['TASK_KEEPMOUNT'] = '/keep'
63
64 links = []
65
66 def sub_tmpdir(v):
67     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
68
69 def sub_outdir(v):
70     return outdir
71
72 def sub_cores(v):
73      return str(multiprocessing.cpu_count())
74
75 def sub_jobid(v):
76      return os.environ['JOB_UUID']
77
78 def sub_taskid(v):
79      return os.environ['TASK_UUID']
80
81 def sub_jobsrc(v):
82      return os.environ['CRUNCH_SRC']
83
84 subst.default_subs["task.tmpdir"] = sub_tmpdir
85 subst.default_subs["task.outdir"] = sub_outdir
86 subst.default_subs["job.srcdir"] = sub_jobsrc
87 subst.default_subs["node.cores"] = sub_cores
88 subst.default_subs["job.uuid"] = sub_jobid
89 subst.default_subs["task.uuid"] = sub_taskid
90
91 class SigHandler(object):
92     def __init__(self):
93         self.sig = None
94
95     def send_signal(self, subprocesses, signum):
96         for sp in subprocesses:
97             sp.send_signal(signum)
98         self.sig = signum
99
100 # http://rightfootin.blogspot.com/2006/09/more-on-python-flatten.html
101 def flatten(l, ltypes=(list, tuple)):
102     ltype = type(l)
103     l = list(l)
104     i = 0
105     while i < len(l):
106         while isinstance(l[i], ltypes):
107             if not l[i]:
108                 l.pop(i)
109                 i -= 1
110                 break
111             else:
112                 l[i:i + 1] = l[i]
113         i += 1
114     return ltype(l)
115
116 def add_to_group(gr, match):
117     m = match.groups()
118     if m not in gr:
119         gr[m] = []
120     gr[m].append(match.group(0))
121
122 class EvaluationError(Exception):
123     pass
124
125 # Return the name of variable ('var') that will take on each value in 'items'
126 # when performing an inner substitution
127 def var_items(p, c, key):
128     if key not in c:
129         raise EvaluationError("'%s' was expected in 'p' but is missing" % key)
130
131     if "var" in c:
132         if not isinstance(c["var"], basestring):
133             raise EvaluationError("Value of 'var' must be a string")
134         # Var specifies the variable name for inner parameter substitution
135         return (c["var"], get_items(p, c[key]))
136     else:
137         # The component function ('key') value is a list, so return the list
138         # directly with no parameter selected.
139         if isinstance(c[key], list):
140             return (None, get_items(p, c[key]))
141         elif isinstance(c[key], basestring):
142             # check if c[key] is a string that looks like a parameter
143             m = re.match("^\$\((.*)\)$", c[key])
144             if m and m.group(1) in p:
145                 return (m.group(1), get_items(p, c[key]))
146             else:
147                 # backwards compatible, foreach specifies bare parameter name to use
148                 return (c[key], get_items(p, p[c[key]]))
149         else:
150             raise EvaluationError("Value of '%s' must be a string or list" % key)
151
152 # "p" is the parameter scope, "c" is the item to be expanded.
153 # If "c" is a dict, apply function expansion.
154 # If "c" is a list, recursively expand each item and return a new list.
155 # If "c" is a string, apply parameter substitution
156 def expand_item(p, c):
157     if isinstance(c, dict):
158         if "foreach" in c and "command" in c:
159             var, items = var_items(p, c, "foreach")
160             if var is None:
161                 raise EvaluationError("Must specify 'var' in foreach")
162             r = []
163             for i in items:
164                 params = copy.copy(p)
165                 params[var] = i
166                 r.append(expand_item(params, c["command"]))
167             return r
168         elif "list" in c and "index" in c and "command" in c:
169             var, items = var_items(p, c, "list")
170             if var is None:
171                 raise EvaluationError("Must specify 'var' in list")
172             params = copy.copy(p)
173             params[var] = items[int(c["index"])]
174             return expand_item(params, c["command"])
175         elif "regex" in c:
176             pattern = re.compile(c["regex"])
177             if "filter" in c:
178                 _, items = var_items(p, c, "filter")
179                 return [i for i in items if pattern.match(i)]
180             elif "group" in c:
181                 _, items = var_items(p, c, "group")
182                 groups = {}
183                 for i in items:
184                     match = pattern.match(i)
185                     if match:
186                         add_to_group(groups, match)
187                 return [groups[k] for k in groups]
188             elif "extract" in c:
189                 _, items = var_items(p, c, "extract")
190                 r = []
191                 for i in items:
192                     match = pattern.match(i)
193                     if match:
194                         r.append(list(match.groups()))
195                 return r
196         elif "batch" in c and "size" in c:
197             _, items = var_items(p, c, "batch")
198             sz = int(c["size"])
199             r = []
200             for j in xrange(0, len(items), sz):
201                 r.append(items[j:j+sz])
202             return r
203         raise EvaluationError("Missing valid list context function")
204     elif isinstance(c, list):
205         return [expand_item(p, arg) for arg in c]
206     elif isinstance(c, basestring):
207         m = re.match("^\$\((.*)\)$", c)
208         if m and m.group(1) in p:
209             return expand_item(p, p[m.group(1)])
210         else:
211             return subst.do_substitution(p, c)
212
213     raise EvaluationError("expand_item() unexpected parameter type %s" % (type(c))
214
215 # Evaluate in a list context
216 # "p" is the parameter scope, "value" will be evaluated
217 # if "value" is a list after expansion, return that
218 # if "value" is a path to a directory, return a list consisting of each entry in the directory
219 # if "value" is a path to a file, return a list consisting of each line of the file
220 def get_items(p, value):
221     value = expand_item(p, value)
222     if isinstance(value, list):
223         return value
224     elif isinstance(value, basestring):
225         mode = os.stat(value).st_mode
226         prefix = value[len(os.environ['TASK_KEEPMOUNT'])+1:]
227         if mode is not None:
228             if stat.S_ISDIR(mode):
229                 items = [os.path.join(value, l) for l in os.listdir(value)]
230             elif stat.S_ISREG(mode):
231                 with open(value) as f:
232                     items = [line.rstrip("\r\n") for line in f]
233             return items
234     raise EvaluationError("get_items did not yield a list")
235
236 stdoutname = None
237 stdoutfile = None
238 stdinname = None
239 stdinfile = None
240
241 # Construct the cross product of all values of each variable listed in fvars
242 def recursive_foreach(params, fvars):
243     var = fvars[0]
244     fvars = fvars[1:]
245     items = get_items(params, params[var])
246     logger.info("parallelizing on %s with items %s" % (var, items))
247     if items is not None:
248         for i in items:
249             params = copy.copy(params)
250             params[var] = i
251             if len(fvars) > 0:
252                 recursive_foreach(params, fvars)
253             else:
254                 if not args.dry_run:
255                     arvados.api().job_tasks().create(body={
256                         'job_uuid': arvados.current_job()['uuid'],
257                         'created_by_job_task_uuid': arvados.current_task()['uuid'],
258                         'sequence': 1,
259                         'parameters': params
260                     }).execute()
261                 else:
262                     if isinstance(params["command"][0], list):
263                         for c in params["command"]:
264                             logger.info(flatten(expand_item(params, c)))
265                     else:
266                         logger.info(flatten(expand_item(params, params["command"])))
267     else:
268         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
269         sys.exit(1)
270
271 try:
272     if "task.foreach" in jobp:
273         if args.dry_run or arvados.current_task()['sequence'] == 0:
274             # This is the first task to start the other tasks and exit
275             fvars = jobp["task.foreach"]
276             if isinstance(fvars, basestring):
277                 fvars = [fvars]
278             if not isinstance(fvars, list) or len(fvars) == 0:
279                 logger.error("value of task.foreach must be a string or non-empty list")
280                 sys.exit(1)
281             recursive_foreach(jobp, jobp["task.foreach"])
282             if not args.dry_run:
283                 if "task.vwd" in jobp:
284                     # Set output of the first task to the base vwd collection so it
285                     # will be merged with output fragments from the other tasks by
286                     # crunch.
287                     arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
288                 else:
289                     arvados.current_task().set_output(None)
290             sys.exit(0)
291     else:
292         # This is the only task so taskp/jobp are the same
293         taskp = jobp
294 except Exception as e:
295     logger.exception("caught exception")
296     logger.error("job parameters were:")
297     logger.error(pprint.pformat(jobp))
298     sys.exit(1)
299
300 try:
301     if not args.dry_run:
302         if "task.vwd" in taskp:
303             # Populate output directory with symlinks to files in collection
304             vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
305
306         if "task.cwd" in taskp:
307             os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
308
309     cmd = []
310     if isinstance(taskp["command"][0], list):
311         for c in taskp["command"]:
312             cmd.append(flatten(expand_item(taskp, c)))
313     else:
314         cmd.append(flatten(expand_item(taskp, taskp["command"])))
315
316     if "task.stdin" in taskp:
317         stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
318         if not args.dry_run:
319             stdinfile = open(stdinname, "rb")
320
321     if "task.stdout" in taskp:
322         stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
323         if not args.dry_run:
324             stdoutfile = open(stdoutname, "wb")
325
326     logger.info("{}{}{}".format(' | '.join([' '.join(c) for c in cmd]), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
327
328     if args.dry_run:
329         sys.exit(0)
330 except subst.SubstitutionError as e:
331     logger.error(str(e))
332     logger.error("task parameters were:")
333     logger.error(pprint.pformat(taskp))
334     sys.exit(1)
335 except Exception as e:
336     logger.exception("caught exception")
337     logger.error("task parameters were:")
338     logger.error(pprint.pformat(taskp))
339     sys.exit(1)
340
341 try:
342     subprocesses = []
343     close_streams = []
344     if stdinfile:
345         close_streams.append(stdinfile)
346     next_stdin = stdinfile
347
348     for i in xrange(len(cmd)):
349         if i == len(cmd)-1:
350             # this is the last command in the pipeline, so its stdout should go to stdoutfile
351             next_stdout = stdoutfile
352         else:
353             # this is an intermediate command in the pipeline, so its stdout should go to a pipe
354             next_stdout = subprocess.PIPE
355
356         sp = subprocess.Popen(cmd[i], shell=False, stdin=next_stdin, stdout=next_stdout)
357
358         # Need to close the FDs on our side so that subcommands will get SIGPIPE if the
359         # consuming process ends prematurely.
360         if sp.stdout:
361             close_streams.append(sp.stdout)
362
363         # Send this processes's stdout to to the next process's stdin
364         next_stdin = sp.stdout
365
366         subprocesses.append(sp)
367
368     # File descriptors have been handed off to the subprocesses, so close them here.
369     for s in close_streams:
370         s.close()
371
372     # Set up signal handling
373     sig = SigHandler()
374
375     # Forward terminate signals to the subprocesses.
376     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(subprocesses, signum))
377     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(subprocesses, signum))
378     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(subprocesses, signum))
379
380     active = 1
381     pids = set([s.pid for s in subprocesses])
382     rcode = {}
383     while len(pids) > 0:
384         (pid, status) = os.wait()
385         pids.discard(pid)
386         if not taskp.get("task.ignore_rcode"):
387             rcode[pid] = (status >> 8)
388         else:
389             rcode[pid] = 0
390
391     if sig.sig is not None:
392         logger.critical("terminating on signal %s" % sig.sig)
393         sys.exit(2)
394     else:
395         for i in xrange(len(cmd)):
396             r = rcode[subprocesses[i].pid]
397             logger.info("%s completed with exit code %i (%s)" % (cmd[i][0], r, "success" if r == 0 else "failed"))
398
399 except Exception as e:
400     logger.exception("caught exception")
401
402 # restore default signal handlers.
403 signal.signal(signal.SIGINT, signal.SIG_DFL)
404 signal.signal(signal.SIGTERM, signal.SIG_DFL)
405 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
406
407 for l in links:
408     os.unlink(l)
409
410 logger.info("the following output files will be saved to keep:")
411
412 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
413
414 logger.info("start writing output to keep")
415
416 if "task.vwd" in taskp:
417     if "task.foreach" in jobp:
418         # This is a subtask, so don't merge with the original collection, that will happen at the end
419         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
420     else:
421         # Just a single task, so do merge with the original collection
422         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
423 else:
424     outcollection = robust_put.upload(outdir, logger)
425
426 # Success if no non-zero return codes
427 success = not any([status != 0 for status in rcode.values()])
428
429 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
430                                      body={
431                                          'output': outcollection,
432                                          'success': success,
433                                          'progress':1.0
434                                      }).execute()
435
436 sys.exit(rcode)