6087: Use app-configured key by default for blob signing and verification.
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28 import argparse
29 import json
30 import tempfile
31 import errno
32
33 parser = argparse.ArgumentParser()
34 parser.add_argument('--dry-run', action='store_true')
35 parser.add_argument('--script-parameters', type=str, default="{}")
36 args = parser.parse_args()
37
38 os.umask(0077)
39
40 if not args.dry_run:
41     api = arvados.api('v1')
42     t = arvados.current_task().tmpdir
43     os.chdir(arvados.current_task().tmpdir)
44     os.mkdir("tmpdir")
45     os.mkdir("output")
46
47     os.chdir("output")
48
49     outdir = os.getcwd()
50
51     taskp = None
52     jobp = arvados.current_job()['script_parameters']
53     if len(arvados.current_task()['parameters']) > 0:
54         taskp = arvados.current_task()['parameters']
55 else:
56     outdir = "/tmp"
57     jobp = json.loads(args.script_parameters)
58     os.environ['JOB_UUID'] = 'zzzzz-8i9sb-1234567890abcde'
59     os.environ['TASK_UUID'] = 'zzzzz-ot0gb-1234567890abcde'
60     os.environ['CRUNCH_SRC'] = '/tmp/crunch-src'
61     if 'TASK_KEEPMOUNT' not in os.environ:
62         os.environ['TASK_KEEPMOUNT'] = '/keep'
63
64 def sub_tmpdir(v):
65     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
66
67 def sub_outdir(v):
68     return outdir
69
70 def sub_cores(v):
71      return str(multiprocessing.cpu_count())
72
73 def sub_jobid(v):
74      return os.environ['JOB_UUID']
75
76 def sub_taskid(v):
77      return os.environ['TASK_UUID']
78
79 def sub_jobsrc(v):
80      return os.environ['CRUNCH_SRC']
81
82 subst.default_subs["task.tmpdir"] = sub_tmpdir
83 subst.default_subs["task.outdir"] = sub_outdir
84 subst.default_subs["job.srcdir"] = sub_jobsrc
85 subst.default_subs["node.cores"] = sub_cores
86 subst.default_subs["job.uuid"] = sub_jobid
87 subst.default_subs["task.uuid"] = sub_taskid
88
89 class SigHandler(object):
90     def __init__(self):
91         self.sig = None
92
93     def send_signal(self, subprocesses, signum):
94         for sp in subprocesses:
95             sp.send_signal(signum)
96         self.sig = signum
97
98 # http://rightfootin.blogspot.com/2006/09/more-on-python-flatten.html
99 def flatten(l, ltypes=(list, tuple)):
100     ltype = type(l)
101     l = list(l)
102     i = 0
103     while i < len(l):
104         while isinstance(l[i], ltypes):
105             if not l[i]:
106                 l.pop(i)
107                 i -= 1
108                 break
109             else:
110                 l[i:i + 1] = l[i]
111         i += 1
112     return ltype(l)
113
114 def add_to_group(gr, match):
115     m = match.groups()
116     if m not in gr:
117         gr[m] = []
118     gr[m].append(match.group(0))
119
120 class EvaluationError(Exception):
121     pass
122
123 # Return the name of variable ('var') that will take on each value in 'items'
124 # when performing an inner substitution
125 def var_items(p, c, key):
126     if key not in c:
127         raise EvaluationError("'%s' was expected in 'p' but is missing" % key)
128
129     if "var" in c:
130         if not isinstance(c["var"], basestring):
131             raise EvaluationError("Value of 'var' must be a string")
132         # Var specifies the variable name for inner parameter substitution
133         return (c["var"], get_items(p, c[key]))
134     else:
135         # The component function ('key') value is a list, so return the list
136         # directly with no parameter selected.
137         if isinstance(c[key], list):
138             return (None, get_items(p, c[key]))
139         elif isinstance(c[key], basestring):
140             # check if c[key] is a string that looks like a parameter
141             m = re.match("^\$\((.*)\)$", c[key])
142             if m and m.group(1) in p:
143                 return (m.group(1), get_items(p, c[key]))
144             else:
145                 # backwards compatible, foreach specifies bare parameter name to use
146                 return (c[key], get_items(p, p[c[key]]))
147         else:
148             raise EvaluationError("Value of '%s' must be a string or list" % key)
149
150 # "p" is the parameter scope, "c" is the item to be expanded.
151 # If "c" is a dict, apply function expansion.
152 # If "c" is a list, recursively expand each item and return a new list.
153 # If "c" is a string, apply parameter substitution
154 def expand_item(p, c):
155     if isinstance(c, dict):
156         if "foreach" in c and "command" in c:
157             # Expand a command template for each item in the specified user
158             # parameter
159             var, items = var_items(p, c, "foreach")
160             if var is None:
161                 raise EvaluationError("Must specify 'var' in foreach")
162             r = []
163             for i in items:
164                 params = copy.copy(p)
165                 params[var] = i
166                 r.append(expand_item(params, c["command"]))
167             return r
168         elif "list" in c and "index" in c and "command" in c:
169             # extract a single item from a list
170             var, items = var_items(p, c, "list")
171             if var is None:
172                 raise EvaluationError("Must specify 'var' in list")
173             params = copy.copy(p)
174             params[var] = items[int(c["index"])]
175             return expand_item(params, c["command"])
176         elif "regex" in c:
177             pattern = re.compile(c["regex"])
178             if "filter" in c:
179                 # filter list so that it only includes items that match a
180                 # regular expression
181                 _, items = var_items(p, c, "filter")
182                 return [i for i in items if pattern.match(i)]
183             elif "group" in c:
184                 # generate a list of lists, where items are grouped on common
185                 # subexpression match
186                 _, items = var_items(p, c, "group")
187                 groups = {}
188                 for i in items:
189                     match = pattern.match(i)
190                     if match:
191                         add_to_group(groups, match)
192                 return [groups[k] for k in groups]
193             elif "extract" in c:
194                 # generate a list of lists, where items are split by
195                 # subexpression match
196                 _, items = var_items(p, c, "extract")
197                 r = []
198                 for i in items:
199                     match = pattern.match(i)
200                     if match:
201                         r.append(list(match.groups()))
202                 return r
203         elif "batch" in c and "size" in c:
204             # generate a list of lists, where items are split into a batch size
205             _, items = var_items(p, c, "batch")
206             sz = int(c["size"])
207             r = []
208             for j in xrange(0, len(items), sz):
209                 r.append(items[j:j+sz])
210             return r
211         raise EvaluationError("Missing valid list context function")
212     elif isinstance(c, list):
213         return [expand_item(p, arg) for arg in c]
214     elif isinstance(c, basestring):
215         m = re.match("^\$\((.*)\)$", c)
216         if m and m.group(1) in p:
217             return expand_item(p, p[m.group(1)])
218         else:
219             return subst.do_substitution(p, c)
220     else:
221         raise EvaluationError("expand_item() unexpected parameter type %s" % type(c))
222
223 # Evaluate in a list context
224 # "p" is the parameter scope, "value" will be evaluated
225 # if "value" is a list after expansion, return that
226 # if "value" is a path to a directory, return a list consisting of each entry in the directory
227 # if "value" is a path to a file, return a list consisting of each line of the file
228 def get_items(p, value):
229     value = expand_item(p, value)
230     if isinstance(value, list):
231         return value
232     elif isinstance(value, basestring):
233         mode = os.stat(value).st_mode
234         prefix = value[len(os.environ['TASK_KEEPMOUNT'])+1:]
235         if mode is not None:
236             if stat.S_ISDIR(mode):
237                 items = [os.path.join(value, l) for l in os.listdir(value)]
238             elif stat.S_ISREG(mode):
239                 with open(value) as f:
240                     items = [line.rstrip("\r\n") for line in f]
241             return items
242     raise EvaluationError("get_items did not yield a list")
243
244 stdoutname = None
245 stdoutfile = None
246 stdinname = None
247 stdinfile = None
248
249 # Construct the cross product of all values of each variable listed in fvars
250 def recursive_foreach(params, fvars):
251     var = fvars[0]
252     fvars = fvars[1:]
253     items = get_items(params, params[var])
254     logger.info("parallelizing on %s with items %s" % (var, items))
255     if items is not None:
256         for i in items:
257             params = copy.copy(params)
258             params[var] = i
259             if len(fvars) > 0:
260                 recursive_foreach(params, fvars)
261             else:
262                 if not args.dry_run:
263                     arvados.api().job_tasks().create(body={
264                         'job_uuid': arvados.current_job()['uuid'],
265                         'created_by_job_task_uuid': arvados.current_task()['uuid'],
266                         'sequence': 1,
267                         'parameters': params
268                     }).execute()
269                 else:
270                     if isinstance(params["command"][0], list):
271                         for c in params["command"]:
272                             logger.info(flatten(expand_item(params, c)))
273                     else:
274                         logger.info(flatten(expand_item(params, params["command"])))
275     else:
276         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
277         sys.exit(1)
278
279 try:
280     if "task.foreach" in jobp:
281         if args.dry_run or arvados.current_task()['sequence'] == 0:
282             # This is the first task to start the other tasks and exit
283             fvars = jobp["task.foreach"]
284             if isinstance(fvars, basestring):
285                 fvars = [fvars]
286             if not isinstance(fvars, list) or len(fvars) == 0:
287                 logger.error("value of task.foreach must be a string or non-empty list")
288                 sys.exit(1)
289             recursive_foreach(jobp, jobp["task.foreach"])
290             if not args.dry_run:
291                 if "task.vwd" in jobp:
292                     # Set output of the first task to the base vwd collection so it
293                     # will be merged with output fragments from the other tasks by
294                     # crunch.
295                     arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
296                 else:
297                     arvados.current_task().set_output(None)
298             sys.exit(0)
299     else:
300         # This is the only task so taskp/jobp are the same
301         taskp = jobp
302 except Exception as e:
303     logger.exception("caught exception")
304     logger.error("job parameters were:")
305     logger.error(pprint.pformat(jobp))
306     sys.exit(1)
307
308 try:
309     if not args.dry_run:
310         if "task.vwd" in taskp:
311             # Populate output directory with symlinks to files in collection
312             vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
313
314         if "task.cwd" in taskp:
315             os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
316
317     cmd = []
318     if isinstance(taskp["command"][0], list):
319         for c in taskp["command"]:
320             cmd.append(flatten(expand_item(taskp, c)))
321     else:
322         cmd.append(flatten(expand_item(taskp, taskp["command"])))
323
324     if "task.stdin" in taskp:
325         stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
326         if not args.dry_run:
327             stdinfile = open(stdinname, "rb")
328
329     if "task.stdout" in taskp:
330         stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
331         if not args.dry_run:
332             stdoutfile = open(stdoutname, "wb")
333
334     logger.info("{}{}{}".format(' | '.join([' '.join(c) for c in cmd]), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
335
336     if args.dry_run:
337         sys.exit(0)
338 except subst.SubstitutionError as e:
339     logger.error(str(e))
340     logger.error("task parameters were:")
341     logger.error(pprint.pformat(taskp))
342     sys.exit(1)
343 except Exception as e:
344     logger.exception("caught exception")
345     logger.error("task parameters were:")
346     logger.error(pprint.pformat(taskp))
347     sys.exit(1)
348
349 # rcode holds the return codes produced by each subprocess
350 rcode = {}
351 try:
352     subprocesses = []
353     close_streams = []
354     if stdinfile:
355         close_streams.append(stdinfile)
356     next_stdin = stdinfile
357
358     for i in xrange(len(cmd)):
359         if i == len(cmd)-1:
360             # this is the last command in the pipeline, so its stdout should go to stdoutfile
361             next_stdout = stdoutfile
362         else:
363             # this is an intermediate command in the pipeline, so its stdout should go to a pipe
364             next_stdout = subprocess.PIPE
365
366         sp = subprocess.Popen(cmd[i], shell=False, stdin=next_stdin, stdout=next_stdout)
367
368         # Need to close the FDs on our side so that subcommands will get SIGPIPE if the
369         # consuming process ends prematurely.
370         if sp.stdout:
371             close_streams.append(sp.stdout)
372
373         # Send this processes's stdout to to the next process's stdin
374         next_stdin = sp.stdout
375
376         subprocesses.append(sp)
377
378     # File descriptors have been handed off to the subprocesses, so close them here.
379     for s in close_streams:
380         s.close()
381
382     # Set up signal handling
383     sig = SigHandler()
384
385     # Forward terminate signals to the subprocesses.
386     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(subprocesses, signum))
387     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(subprocesses, signum))
388     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(subprocesses, signum))
389
390     active = 1
391     pids = set([s.pid for s in subprocesses])
392     while len(pids) > 0:
393         (pid, status) = os.wait()
394         pids.discard(pid)
395         if not taskp.get("task.ignore_rcode"):
396             rcode[pid] = (status >> 8)
397         else:
398             rcode[pid] = 0
399
400     if sig.sig is not None:
401         logger.critical("terminating on signal %s" % sig.sig)
402         sys.exit(2)
403     else:
404         for i in xrange(len(cmd)):
405             r = rcode[subprocesses[i].pid]
406             logger.info("%s completed with exit code %i (%s)" % (cmd[i][0], r, "success" if r == 0 else "failed"))
407
408 except Exception as e:
409     logger.exception("caught exception")
410
411 # restore default signal handlers.
412 signal.signal(signal.SIGINT, signal.SIG_DFL)
413 signal.signal(signal.SIGTERM, signal.SIG_DFL)
414 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
415
416 logger.info("the following output files will be saved to keep:")
417
418 subprocess.call(["find", "-L", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr, cwd=outdir)
419
420 logger.info("start writing output to keep")
421
422 if "task.vwd" in taskp and "task.foreach" in jobp:
423     for root, dirs, files in os.walk(outdir):
424         for f in files:
425             s = os.lstat(os.path.join(root, f))
426             if stat.S_ISLNK(s.st_mode):
427                 os.unlink(os.path.join(root, f))
428
429 (outcollection, checkin_error) = vwd.checkin(outdir)
430
431 # Success if we ran any subprocess, and they all exited 0.
432 success = rcode and all(status == 0 for status in rcode.itervalues()) and not checkin_error
433
434 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
435                                      body={
436                                          'output': outcollection.manifest_text(),
437                                          'success': success,
438                                          'progress':1.0
439                                      }).execute()
440
441 sys.exit(0 if success else 1)