8567: Use Docker image repo+tag name instead of PDH so that API server can select...
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28 import argparse
29 import json
30 import tempfile
31 import errno
32
33 parser = argparse.ArgumentParser()
34 parser.add_argument('--dry-run', action='store_true')
35 parser.add_argument('--script-parameters', type=str, default="{}")
36 args = parser.parse_args()
37
38 os.umask(0077)
39
40 if not args.dry_run:
41     api = arvados.api('v1')
42     t = arvados.current_task().tmpdir
43     os.chdir(arvados.current_task().tmpdir)
44     os.mkdir("tmpdir")
45     os.mkdir("output")
46
47     os.chdir("output")
48
49     outdir = os.getcwd()
50
51     taskp = None
52     jobp = arvados.current_job()['script_parameters']
53     if len(arvados.current_task()['parameters']) > 0:
54         taskp = arvados.current_task()['parameters']
55 else:
56     outdir = "/tmp"
57     jobp = json.loads(args.script_parameters)
58     os.environ['JOB_UUID'] = 'zzzzz-8i9sb-1234567890abcde'
59     os.environ['TASK_UUID'] = 'zzzzz-ot0gb-1234567890abcde'
60     os.environ['CRUNCH_SRC'] = '/tmp/crunch-src'
61     if 'TASK_KEEPMOUNT' not in os.environ:
62         os.environ['TASK_KEEPMOUNT'] = '/keep'
63
64 def sub_tmpdir(v):
65     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
66
67 def sub_outdir(v):
68     return outdir
69
70 def sub_cores(v):
71      return str(multiprocessing.cpu_count())
72
73 def sub_jobid(v):
74      return os.environ['JOB_UUID']
75
76 def sub_taskid(v):
77      return os.environ['TASK_UUID']
78
79 def sub_jobsrc(v):
80      return os.environ['CRUNCH_SRC']
81
82 subst.default_subs["task.tmpdir"] = sub_tmpdir
83 subst.default_subs["task.outdir"] = sub_outdir
84 subst.default_subs["job.srcdir"] = sub_jobsrc
85 subst.default_subs["node.cores"] = sub_cores
86 subst.default_subs["job.uuid"] = sub_jobid
87 subst.default_subs["task.uuid"] = sub_taskid
88
89 class SigHandler(object):
90     def __init__(self):
91         self.sig = None
92
93     def send_signal(self, subprocesses, signum):
94         for sp in subprocesses:
95             sp.send_signal(signum)
96         self.sig = signum
97
98 # http://rightfootin.blogspot.com/2006/09/more-on-python-flatten.html
99 def flatten(l, ltypes=(list, tuple)):
100     ltype = type(l)
101     l = list(l)
102     i = 0
103     while i < len(l):
104         while isinstance(l[i], ltypes):
105             if not l[i]:
106                 l.pop(i)
107                 i -= 1
108                 break
109             else:
110                 l[i:i + 1] = l[i]
111         i += 1
112     return ltype(l)
113
114 def add_to_group(gr, match):
115     m = match.groups()
116     if m not in gr:
117         gr[m] = []
118     gr[m].append(match.group(0))
119
120 class EvaluationError(Exception):
121     pass
122
123 # Return the name of variable ('var') that will take on each value in 'items'
124 # when performing an inner substitution
125 def var_items(p, c, key):
126     if key not in c:
127         raise EvaluationError("'%s' was expected in 'p' but is missing" % key)
128
129     if "var" in c:
130         if not isinstance(c["var"], basestring):
131             raise EvaluationError("Value of 'var' must be a string")
132         # Var specifies the variable name for inner parameter substitution
133         return (c["var"], get_items(p, c[key]))
134     else:
135         # The component function ('key') value is a list, so return the list
136         # directly with no parameter selected.
137         if isinstance(c[key], list):
138             return (None, get_items(p, c[key]))
139         elif isinstance(c[key], basestring):
140             # check if c[key] is a string that looks like a parameter
141             m = re.match("^\$\((.*)\)$", c[key])
142             if m and m.group(1) in p:
143                 return (m.group(1), get_items(p, c[key]))
144             else:
145                 # backwards compatible, foreach specifies bare parameter name to use
146                 return (c[key], get_items(p, p[c[key]]))
147         else:
148             raise EvaluationError("Value of '%s' must be a string or list" % key)
149
150 # "p" is the parameter scope, "c" is the item to be expanded.
151 # If "c" is a dict, apply function expansion.
152 # If "c" is a list, recursively expand each item and return a new list.
153 # If "c" is a string, apply parameter substitution
154 def expand_item(p, c):
155     if isinstance(c, dict):
156         if "foreach" in c and "command" in c:
157             # Expand a command template for each item in the specified user
158             # parameter
159             var, items = var_items(p, c, "foreach")
160             if var is None:
161                 raise EvaluationError("Must specify 'var' in foreach")
162             r = []
163             for i in items:
164                 params = copy.copy(p)
165                 params[var] = i
166                 r.append(expand_item(params, c["command"]))
167             return r
168         elif "list" in c and "index" in c and "command" in c:
169             # extract a single item from a list
170             var, items = var_items(p, c, "list")
171             if var is None:
172                 raise EvaluationError("Must specify 'var' in list")
173             params = copy.copy(p)
174             params[var] = items[int(c["index"])]
175             return expand_item(params, c["command"])
176         elif "regex" in c:
177             pattern = re.compile(c["regex"])
178             if "filter" in c:
179                 # filter list so that it only includes items that match a
180                 # regular expression
181                 _, items = var_items(p, c, "filter")
182                 return [i for i in items if pattern.match(i)]
183             elif "group" in c:
184                 # generate a list of lists, where items are grouped on common
185                 # subexpression match
186                 _, items = var_items(p, c, "group")
187                 groups = {}
188                 for i in items:
189                     match = pattern.match(i)
190                     if match:
191                         add_to_group(groups, match)
192                 return [groups[k] for k in groups]
193             elif "extract" in c:
194                 # generate a list of lists, where items are split by
195                 # subexpression match
196                 _, items = var_items(p, c, "extract")
197                 r = []
198                 for i in items:
199                     match = pattern.match(i)
200                     if match:
201                         r.append(list(match.groups()))
202                 return r
203         elif "batch" in c and "size" in c:
204             # generate a list of lists, where items are split into a batch size
205             _, items = var_items(p, c, "batch")
206             sz = int(c["size"])
207             r = []
208             for j in xrange(0, len(items), sz):
209                 r.append(items[j:j+sz])
210             return r
211         raise EvaluationError("Missing valid list context function")
212     elif isinstance(c, list):
213         return [expand_item(p, arg) for arg in c]
214     elif isinstance(c, basestring):
215         m = re.match("^\$\((.*)\)$", c)
216         if m and m.group(1) in p:
217             return expand_item(p, p[m.group(1)])
218         else:
219             return subst.do_substitution(p, c)
220     else:
221         raise EvaluationError("expand_item() unexpected parameter type %s" % type(c))
222
223 # Evaluate in a list context
224 # "p" is the parameter scope, "value" will be evaluated
225 # if "value" is a list after expansion, return that
226 # if "value" is a path to a directory, return a list consisting of each entry in the directory
227 # if "value" is a path to a file, return a list consisting of each line of the file
228 def get_items(p, value):
229     value = expand_item(p, value)
230     if isinstance(value, list):
231         return value
232     elif isinstance(value, basestring):
233         mode = os.stat(value).st_mode
234         prefix = value[len(os.environ['TASK_KEEPMOUNT'])+1:]
235         if mode is not None:
236             if stat.S_ISDIR(mode):
237                 items = [os.path.join(value, l) for l in os.listdir(value)]
238             elif stat.S_ISREG(mode):
239                 with open(value) as f:
240                     items = [line.rstrip("\r\n") for line in f]
241             return items
242     raise EvaluationError("get_items did not yield a list")
243
244 stdoutname = None
245 stdoutfile = None
246 stdinname = None
247 stdinfile = None
248
249 # Construct the cross product of all values of each variable listed in fvars
250 def recursive_foreach(params, fvars):
251     var = fvars[0]
252     fvars = fvars[1:]
253     items = get_items(params, params[var])
254     logger.info("parallelizing on %s with items %s" % (var, items))
255     if items is not None:
256         for i in items:
257             params = copy.copy(params)
258             params[var] = i
259             if len(fvars) > 0:
260                 recursive_foreach(params, fvars)
261             else:
262                 if not args.dry_run:
263                     arvados.api().job_tasks().create(body={
264                         'job_uuid': arvados.current_job()['uuid'],
265                         'created_by_job_task_uuid': arvados.current_task()['uuid'],
266                         'sequence': 1,
267                         'parameters': params
268                     }).execute()
269                 else:
270                     if isinstance(params["command"][0], list):
271                         for c in params["command"]:
272                             logger.info(flatten(expand_item(params, c)))
273                     else:
274                         logger.info(flatten(expand_item(params, params["command"])))
275     else:
276         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
277         sys.exit(1)
278
279 try:
280     if "task.foreach" in jobp:
281         if args.dry_run or arvados.current_task()['sequence'] == 0:
282             # This is the first task to start the other tasks and exit
283             fvars = jobp["task.foreach"]
284             if isinstance(fvars, basestring):
285                 fvars = [fvars]
286             if not isinstance(fvars, list) or len(fvars) == 0:
287                 logger.error("value of task.foreach must be a string or non-empty list")
288                 sys.exit(1)
289             recursive_foreach(jobp, jobp["task.foreach"])
290             if not args.dry_run:
291                 if "task.vwd" in jobp:
292                     # Set output of the first task to the base vwd collection so it
293                     # will be merged with output fragments from the other tasks by
294                     # crunch.
295                     arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
296                 else:
297                     arvados.current_task().set_output(None)
298             sys.exit(0)
299     else:
300         # This is the only task so taskp/jobp are the same
301         taskp = jobp
302 except Exception as e:
303     logger.exception("caught exception")
304     logger.error("job parameters were:")
305     logger.error(pprint.pformat(jobp))
306     sys.exit(1)
307
308 try:
309     if not args.dry_run:
310         if "task.vwd" in taskp:
311             # Populate output directory with symlinks to files in collection
312             vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
313
314         if "task.cwd" in taskp:
315             os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
316
317     cmd = []
318     if isinstance(taskp["command"][0], list):
319         for c in taskp["command"]:
320             cmd.append(flatten(expand_item(taskp, c)))
321     else:
322         cmd.append(flatten(expand_item(taskp, taskp["command"])))
323
324     if "task.stdin" in taskp:
325         stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
326         if not args.dry_run:
327             stdinfile = open(stdinname, "rb")
328
329     if "task.stdout" in taskp:
330         stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
331         if not args.dry_run:
332             stdoutfile = open(stdoutname, "wb")
333
334     if "task.env" in taskp:
335         env = copy.copy(os.environ)
336         for k,v in taskp["task.env"].items():
337             env[k] = subst.do_substitution(taskp, v)
338     else:
339         env = None
340
341     logger.info("{}{}{}".format(' | '.join([' '.join(c) for c in cmd]), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
342
343     if args.dry_run:
344         sys.exit(0)
345 except subst.SubstitutionError as e:
346     logger.error(str(e))
347     logger.error("task parameters were:")
348     logger.error(pprint.pformat(taskp))
349     sys.exit(1)
350 except Exception as e:
351     logger.exception("caught exception")
352     logger.error("task parameters were:")
353     logger.error(pprint.pformat(taskp))
354     sys.exit(1)
355
356 # rcode holds the return codes produced by each subprocess
357 rcode = {}
358 try:
359     subprocesses = []
360     close_streams = []
361     if stdinfile:
362         close_streams.append(stdinfile)
363     next_stdin = stdinfile
364
365     for i in xrange(len(cmd)):
366         if i == len(cmd)-1:
367             # this is the last command in the pipeline, so its stdout should go to stdoutfile
368             next_stdout = stdoutfile
369         else:
370             # this is an intermediate command in the pipeline, so its stdout should go to a pipe
371             next_stdout = subprocess.PIPE
372
373         sp = subprocess.Popen(cmd[i], shell=False, stdin=next_stdin, stdout=next_stdout, env=env)
374
375         # Need to close the FDs on our side so that subcommands will get SIGPIPE if the
376         # consuming process ends prematurely.
377         if sp.stdout:
378             close_streams.append(sp.stdout)
379
380         # Send this processes's stdout to to the next process's stdin
381         next_stdin = sp.stdout
382
383         subprocesses.append(sp)
384
385     # File descriptors have been handed off to the subprocesses, so close them here.
386     for s in close_streams:
387         s.close()
388
389     # Set up signal handling
390     sig = SigHandler()
391
392     # Forward terminate signals to the subprocesses.
393     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(subprocesses, signum))
394     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(subprocesses, signum))
395     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(subprocesses, signum))
396
397     active = 1
398     pids = set([s.pid for s in subprocesses])
399     while len(pids) > 0:
400         try:
401             (pid, status) = os.wait()
402         except OSError as e:
403             if e.errno == errno.EINTR:
404                 pass
405             else:
406                 raise
407         else:
408             pids.discard(pid)
409             if not taskp.get("task.ignore_rcode"):
410                 rcode[pid] = (status >> 8)
411             else:
412                 rcode[pid] = 0
413
414     if sig.sig is not None:
415         logger.critical("terminating on signal %s" % sig.sig)
416         sys.exit(2)
417     else:
418         for i in xrange(len(cmd)):
419             r = rcode[subprocesses[i].pid]
420             logger.info("%s completed with exit code %i (%s)" % (cmd[i][0], r, "success" if r == 0 else "failed"))
421
422 except Exception as e:
423     logger.exception("caught exception")
424
425 # restore default signal handlers.
426 signal.signal(signal.SIGINT, signal.SIG_DFL)
427 signal.signal(signal.SIGTERM, signal.SIG_DFL)
428 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
429
430 logger.info("the following output files will be saved to keep:")
431
432 subprocess.call(["find", "-L", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr, cwd=outdir)
433
434 logger.info("start writing output to keep")
435
436 if "task.vwd" in taskp and "task.foreach" in jobp:
437     for root, dirs, files in os.walk(outdir):
438         for f in files:
439             s = os.lstat(os.path.join(root, f))
440             if stat.S_ISLNK(s.st_mode):
441                 os.unlink(os.path.join(root, f))
442
443 (outcollection, checkin_error) = vwd.checkin(outdir)
444
445 # Success if we ran any subprocess, and they all exited 0.
446 success = rcode and all(status == 0 for status in rcode.itervalues()) and not checkin_error
447
448 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
449                                      body={
450                                          'output': outcollection.manifest_text(),
451                                          'success': success,
452                                          'progress':1.0
453                                      }).execute()
454
455 sys.exit(0 if success else 1)