3609: Inherit --retries from _util. Be more specific about error being caught. ...
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28 import argparse
29 import json
30 import tempfile
31 import errno
32
33 parser = argparse.ArgumentParser()
34 parser.add_argument('--dry-run', action='store_true')
35 parser.add_argument('--script-parameters', type=str, default="{}")
36 args = parser.parse_args()
37
38 os.umask(0077)
39
40 if not args.dry_run:
41     api = arvados.api('v1')
42     t = arvados.current_task().tmpdir
43     os.chdir(arvados.current_task().tmpdir)
44     os.mkdir("tmpdir")
45     os.mkdir("output")
46
47     os.chdir("output")
48
49     outdir = os.getcwd()
50
51     taskp = None
52     jobp = arvados.current_job()['script_parameters']
53     if len(arvados.current_task()['parameters']) > 0:
54         taskp = arvados.current_task()['parameters']
55 else:
56     outdir = "/tmp"
57     jobp = json.loads(args.script_parameters)
58     os.environ['JOB_UUID'] = 'zzzzz-8i9sb-1234567890abcde'
59     os.environ['TASK_UUID'] = 'zzzzz-ot0gb-1234567890abcde'
60     os.environ['CRUNCH_SRC'] = '/tmp/crunche-src'
61     if 'TASK_KEEPMOUNT' not in os.environ:
62         os.environ['TASK_KEEPMOUNT'] = '/keep'
63
64 links = []
65
66 def sub_tmpdir(v):
67     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
68
69 def sub_outdir(v):
70     return outdir
71
72 def sub_cores(v):
73      return str(multiprocessing.cpu_count())
74
75 def sub_jobid(v):
76      return os.environ['JOB_UUID']
77
78 def sub_taskid(v):
79      return os.environ['TASK_UUID']
80
81 def sub_jobsrc(v):
82      return os.environ['CRUNCH_SRC']
83
84 subst.default_subs["task.tmpdir"] = sub_tmpdir
85 subst.default_subs["task.outdir"] = sub_outdir
86 subst.default_subs["job.srcdir"] = sub_jobsrc
87 subst.default_subs["node.cores"] = sub_cores
88 subst.default_subs["job.uuid"] = sub_jobid
89 subst.default_subs["task.uuid"] = sub_taskid
90
91 class SigHandler(object):
92     def __init__(self):
93         self.sig = None
94
95     def send_signal(self, subprocesses, signum):
96         for sp in subprocesses:
97             sp.send_signal(signum)
98         self.sig = signum
99
100 # http://rightfootin.blogspot.com/2006/09/more-on-python-flatten.html
101 def flatten(l, ltypes=(list, tuple)):
102     ltype = type(l)
103     l = list(l)
104     i = 0
105     while i < len(l):
106         while isinstance(l[i], ltypes):
107             if not l[i]:
108                 l.pop(i)
109                 i -= 1
110                 break
111             else:
112                 l[i:i + 1] = l[i]
113         i += 1
114     return ltype(l)
115
116 def add_to_group(gr, match):
117     m = match.groups()
118     if m not in gr:
119         gr[m] = []
120     gr[m].append(match.group(0))
121
122 # Return the name of variable ('var') that will take on each value in 'items'
123 # when performing an inner substitution
124 def var_items(p, c, key):
125     if "var" in c:
126         # Var specifies the variable name for inner parameter substitution
127         return (c["var"], get_items(p, c[key]))
128     else:
129         # The component function ('key') value is a list, so return the list
130         # directly with no parameter substition.
131         if isinstance(c[key], list):
132             return (None, get_items(p, c[key]))
133
134         # check if c[key] is a string that looks like a parameter
135         m = re.match("^\$\((.*)\)$", c[key])
136         if m and m.group(1) in p:
137             return (m.group(1), get_items(p, c[key]))
138         else:
139             # backwards compatible, foreach specifies bare parameter name to use
140             return (c[key], get_items(p, p[c[key]]))
141
142 # "p" is the parameter scope, "c" is the item to be expanded.
143 # If "c" is a dict, apply function expansion.
144 # If "c" is a list, recursively expand each item and return a new list.
145 # If "c" is a string, apply parameter substitution
146 def expand_item(p, c):
147     if isinstance(c, dict):
148         if "foreach" in c and "command" in c:
149             var, items = var_items(p, c, "foreach")
150             r = []
151             for i in items:
152                 params = copy.copy(p)
153                 params[var] = i
154                 r.append(expand_item(params, c["command"]))
155             return r
156         if "list" in c and "index" in c and "command" in c:
157             var, items = var_items(p, c, "list")
158             params = copy.copy(p)
159             params[var] = items[int(c["index"])]
160             return expand_item(params, c["command"])
161         if "regex" in c:
162             pattern = re.compile(c["regex"])
163             if "filter" in c:
164                 var, items = var_items(p, c, "filter")
165                 return [i for i in items if pattern.match(i)]
166             elif "group" in c:
167                 var, items = var_items(p, c, "group")
168                 groups = {}
169                 for i in items:
170                     match = pattern.match(i)
171                     if match:
172                         add_to_group(groups, match)
173                 return [groups[k] for k in groups]
174             elif "extract" in c:
175                 var, items = var_items(p, c, "extract")
176                 r = []
177                 for i in items:
178                     match = pattern.match(i)
179                     if match:
180                         r.append(list(match.groups()))
181                 return r
182         if "batch" in c and "size" in c:
183             var, items = var_items(p, c, "batch")
184             sz = int(c["size"])
185             r = []
186             for j in xrange(0, len(items), sz):
187                 r.append(items[j:j+sz])
188             return r
189     elif isinstance(c, list):
190         return [expand_item(p, arg) for arg in c]
191     elif isinstance(c, basestring):
192         m = re.match("^\$\((.*)\)$", c)
193         if m and m.group(1) in p:
194             return expand_item(p, p[m.group(1)])
195         else:
196             return subst.do_substitution(p, c)
197
198     raise Exception("expand_item() unexpected parameter type %s" % (type(c))
199
200 # Evaluate in a list context
201 # "p" is the parameter scope, "value" will be evaluated
202 # if "value" is a list after expansion, return that
203 # if "value" is a path to a directory, return a list consisting of each entry in the directory
204 # if "value" is a path to a file, return a list consisting of each line of the file
205 def get_items(p, value):
206     value = expand_item(p, value)
207     if isinstance(value, list):
208         return value
209     elif isinstance(value, basestring):
210         mode = os.stat(value).st_mode
211         prefix = value[len(os.environ['TASK_KEEPMOUNT'])+1:]
212         if mode is not None:
213             if stat.S_ISDIR(mode):
214                 items = [os.path.join(value, l) for l in os.listdir(value)]
215             elif stat.S_ISREG(mode):
216                 with open(value) as f:
217                     items = [line.rstrip("\r\n") for line in f]
218             return items
219     raise Exception("get_items did not yield a list")
220
221 stdoutname = None
222 stdoutfile = None
223 stdinname = None
224 stdinfile = None
225
226 # Construct the cross product of all values of each variable listed in fvars
227 def recursive_foreach(params, fvars):
228     var = fvars[0]
229     fvars = fvars[1:]
230     items = get_items(params, params[var])
231     logger.info("parallelizing on %s with items %s" % (var, items))
232     if items is not None:
233         for i in items:
234             params = copy.copy(params)
235             params[var] = i
236             if len(fvars) > 0:
237                 recursive_foreach(params, fvars)
238             else:
239                 if not args.dry_run:
240                     arvados.api().job_tasks().create(body={
241                         'job_uuid': arvados.current_job()['uuid'],
242                         'created_by_job_task_uuid': arvados.current_task()['uuid'],
243                         'sequence': 1,
244                         'parameters': params
245                     }).execute()
246                 else:
247                     if isinstance(params["command"][0], list):
248                         for c in params["command"]:
249                             logger.info(flatten(expand_item(params, c)))
250                     else:
251                         logger.info(flatten(expand_item(params, params["command"])))
252     else:
253         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
254         sys.exit(1)
255
256 try:
257     if "task.foreach" in jobp:
258         if args.dry_run or arvados.current_task()['sequence'] == 0:
259             # This is the first task to start the other tasks and exit
260             fvars = jobp["task.foreach"]
261             if isinstance(fvars, basestring):
262                 fvars = [fvars]
263             if not isinstance(fvars, list) or len(fvars) == 0:
264                 logger.error("value of task.foreach must be a string or non-empty list")
265                 sys.exit(1)
266             recursive_foreach(jobp, jobp["task.foreach"])
267             if not args.dry_run:
268                 if "task.vwd" in jobp:
269                     # Set output of the first task to the base vwd collection so it
270                     # will be merged with output fragments from the other tasks by
271                     # crunch.
272                     arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
273                 else:
274                     arvados.current_task().set_output(None)
275             sys.exit(0)
276     else:
277         # This is the only task so taskp/jobp are the same
278         taskp = jobp
279 except Exception as e:
280     logger.exception("caught exception")
281     logger.error("job parameters were:")
282     logger.error(pprint.pformat(jobp))
283     sys.exit(1)
284
285 try:
286     if not args.dry_run:
287         if "task.vwd" in taskp:
288             # Populate output directory with symlinks to files in collection
289             vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
290
291         if "task.cwd" in taskp:
292             os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
293
294     cmd = []
295     if isinstance(taskp["command"][0], list):
296         for c in taskp["command"]:
297             cmd.append(flatten(expand_item(taskp, c)))
298     else:
299         cmd.append(flatten(expand_item(taskp, taskp["command"])))
300
301     if "task.stdin" in taskp:
302         stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
303         if not args.dry_run:
304             stdinfile = open(stdinname, "rb")
305
306     if "task.stdout" in taskp:
307         stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
308         if not args.dry_run:
309             stdoutfile = open(stdoutname, "wb")
310
311     logger.info("{}{}{}".format(' | '.join([' '.join(c) for c in cmd]), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
312
313     if args.dry_run:
314         sys.exit(0)
315 except subst.SubstitutionError as e:
316     logger.error(str(e))
317     logger.error("task parameters were:")
318     logger.error(pprint.pformat(taskp))
319     sys.exit(1)
320 except Exception as e:
321     logger.exception("caught exception")
322     logger.error("task parameters were:")
323     logger.error(pprint.pformat(taskp))
324     sys.exit(1)
325
326 try:
327     subprocesses = []
328     close_streams = []
329     if stdinfile:
330         close_streams.append(stdinfile)
331     next_stdin = stdinfile
332
333     for i in xrange(len(cmd)):
334         if i == len(cmd)-1:
335             # this is the last command in the pipeline, so its stdout should go to stdoutfile
336             next_stdout = stdoutfile
337         else:
338             # this is an intermediate command in the pipeline, so its stdout should go to a pipe
339             next_stdout = subprocess.PIPE
340
341         sp = subprocess.Popen(cmd[i], shell=False, stdin=next_stdin, stdout=next_stdout)
342
343         # Need to close the FDs on our side so that subcommands will get SIGPIPE if the
344         # consuming process ends prematurely.
345         if sp.stdout:
346             close_streams.append(sp.stdout)
347
348         # Send this processes's stdout to to the next process's stdin
349         next_stdin = sp.stdout
350
351         subprocesses.append(sp)
352
353     # File descriptors have been handed off to the subprocesses, so close them here.
354     for s in close_streams:
355         s.close()
356
357     # Set up signal handling
358     sig = SigHandler()
359
360     # Forward terminate signals to the subprocesses.
361     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(subprocesses, signum))
362     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(subprocesses, signum))
363     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(subprocesses, signum))
364
365     active = 1
366     pids = set([s.pid for s in subprocesses])
367     rcode = {}
368     while len(pids) > 0:
369         (pid, status) = os.wait()
370         pids.discard(pid)
371         if not taskp.get("task.ignore_rcode"):
372             rcode[pid] = (status >> 8)
373         else:
374             rcode[pid] = 0
375
376     if sig.sig is not None:
377         logger.critical("terminating on signal %s" % sig.sig)
378         sys.exit(2)
379     else:
380         for i in xrange(len(cmd)):
381             r = rcode[subprocesses[i].pid]
382             logger.info("%s completed with exit code %i (%s)" % (cmd[i][0], r, "success" if r == 0 else "failed"))
383
384 except Exception as e:
385     logger.exception("caught exception")
386
387 # restore default signal handlers.
388 signal.signal(signal.SIGINT, signal.SIG_DFL)
389 signal.signal(signal.SIGTERM, signal.SIG_DFL)
390 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
391
392 for l in links:
393     os.unlink(l)
394
395 logger.info("the following output files will be saved to keep:")
396
397 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
398
399 logger.info("start writing output to keep")
400
401 if "task.vwd" in taskp:
402     if "task.foreach" in jobp:
403         # This is a subtask, so don't merge with the original collection, that will happen at the end
404         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
405     else:
406         # Just a single task, so do merge with the original collection
407         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
408 else:
409     outcollection = robust_put.upload(outdir, logger)
410
411 # Success if no non-zero return codes
412 success = not any([status != 0 for status in rcode.values()])
413
414 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
415                                      body={
416                                          'output': outcollection,
417                                          'success': success,
418                                          'progress':1.0
419                                      }).execute()
420
421 sys.exit(rcode)