Merge branch 'master' into 4383-easy-install
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28 import argparse
29 import json
30 import tempfile
31 import errno
32
33 parser = argparse.ArgumentParser()
34 parser.add_argument('--dry-run', action='store_true')
35 parser.add_argument('--script-parameters', type=str, default="{}")
36 args = parser.parse_args()
37
38 os.umask(0077)
39
40 if not args.dry_run:
41     api = arvados.api('v1')
42     t = arvados.current_task().tmpdir
43     os.chdir(arvados.current_task().tmpdir)
44     os.mkdir("tmpdir")
45     os.mkdir("output")
46
47     os.chdir("output")
48
49     outdir = os.getcwd()
50
51     taskp = None
52     jobp = arvados.current_job()['script_parameters']
53     if len(arvados.current_task()['parameters']) > 0:
54         taskp = arvados.current_task()['parameters']
55 else:
56     outdir = "/tmp"
57     jobp = json.loads(args.script_parameters)
58     os.environ['JOB_UUID'] = 'zzzzz-8i9sb-1234567890abcde'
59     os.environ['TASK_UUID'] = 'zzzzz-ot0gb-1234567890abcde'
60     os.environ['CRUNCH_SRC'] = '/tmp/crunche-src'
61     if 'TASK_KEEPMOUNT' not in os.environ:
62         os.environ['TASK_KEEPMOUNT'] = '/keep'
63
64 links = []
65
66 def sub_tmpdir(v):
67     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
68
69 def sub_outdir(v):
70     return outdir
71
72 def sub_cores(v):
73      return str(multiprocessing.cpu_count())
74
75 def sub_jobid(v):
76      return os.environ['JOB_UUID']
77
78 def sub_taskid(v):
79      return os.environ['TASK_UUID']
80
81 def sub_jobsrc(v):
82      return os.environ['CRUNCH_SRC']
83
84 subst.default_subs["task.tmpdir"] = sub_tmpdir
85 subst.default_subs["task.outdir"] = sub_outdir
86 subst.default_subs["job.srcdir"] = sub_jobsrc
87 subst.default_subs["node.cores"] = sub_cores
88 subst.default_subs["job.uuid"] = sub_jobid
89 subst.default_subs["task.uuid"] = sub_taskid
90
91 class SigHandler(object):
92     def __init__(self):
93         self.sig = None
94
95     def send_signal(self, subprocesses, signum):
96         for sp in subprocesses:
97             sp.send_signal(signum)
98         self.sig = signum
99
100 # http://rightfootin.blogspot.com/2006/09/more-on-python-flatten.html
101 def flatten(l, ltypes=(list, tuple)):
102     ltype = type(l)
103     l = list(l)
104     i = 0
105     while i < len(l):
106         while isinstance(l[i], ltypes):
107             if not l[i]:
108                 l.pop(i)
109                 i -= 1
110                 break
111             else:
112                 l[i:i + 1] = l[i]
113         i += 1
114     return ltype(l)
115
116 def add_to_group(gr, match):
117     m = match.groups()
118     if m not in gr:
119         gr[m] = []
120     gr[m].append(match.group(0))
121
122 class EvaluationError(Exception):
123     pass
124
125 # Return the name of variable ('var') that will take on each value in 'items'
126 # when performing an inner substitution
127 def var_items(p, c, key):
128     if key not in c:
129         raise EvaluationError("'%s' was expected in 'p' but is missing" % key)
130
131     if "var" in c:
132         if not isinstance(c["var"], basestring):
133             raise EvaluationError("Value of 'var' must be a string")
134         # Var specifies the variable name for inner parameter substitution
135         return (c["var"], get_items(p, c[key]))
136     else:
137         # The component function ('key') value is a list, so return the list
138         # directly with no parameter selected.
139         if isinstance(c[key], list):
140             return (None, get_items(p, c[key]))
141         elif isinstance(c[key], basestring):
142             # check if c[key] is a string that looks like a parameter
143             m = re.match("^\$\((.*)\)$", c[key])
144             if m and m.group(1) in p:
145                 return (m.group(1), get_items(p, c[key]))
146             else:
147                 # backwards compatible, foreach specifies bare parameter name to use
148                 return (c[key], get_items(p, p[c[key]]))
149         else:
150             raise EvaluationError("Value of '%s' must be a string or list" % key)
151
152 # "p" is the parameter scope, "c" is the item to be expanded.
153 # If "c" is a dict, apply function expansion.
154 # If "c" is a list, recursively expand each item and return a new list.
155 # If "c" is a string, apply parameter substitution
156 def expand_item(p, c):
157     if isinstance(c, dict):
158         if "foreach" in c and "command" in c:
159             # Expand a command template for each item in the specified user
160             # parameter
161             var, items = var_items(p, c, "foreach")
162             if var is None:
163                 raise EvaluationError("Must specify 'var' in foreach")
164             r = []
165             for i in items:
166                 params = copy.copy(p)
167                 params[var] = i
168                 r.append(expand_item(params, c["command"]))
169             return r
170         elif "list" in c and "index" in c and "command" in c:
171             # extract a single item from a list
172             var, items = var_items(p, c, "list")
173             if var is None:
174                 raise EvaluationError("Must specify 'var' in list")
175             params = copy.copy(p)
176             params[var] = items[int(c["index"])]
177             return expand_item(params, c["command"])
178         elif "regex" in c:
179             pattern = re.compile(c["regex"])
180             if "filter" in c:
181                 # filter list so that it only includes items that match a
182                 # regular expression
183                 _, items = var_items(p, c, "filter")
184                 return [i for i in items if pattern.match(i)]
185             elif "group" in c:
186                 # generate a list of lists, where items are grouped on common
187                 # subexpression match
188                 _, items = var_items(p, c, "group")
189                 groups = {}
190                 for i in items:
191                     match = pattern.match(i)
192                     if match:
193                         add_to_group(groups, match)
194                 return [groups[k] for k in groups]
195             elif "extract" in c:
196                 # generate a list of lists, where items are split by
197                 # subexpression match
198                 _, items = var_items(p, c, "extract")
199                 r = []
200                 for i in items:
201                     match = pattern.match(i)
202                     if match:
203                         r.append(list(match.groups()))
204                 return r
205         elif "batch" in c and "size" in c:
206             # generate a list of lists, where items are split into a batch size
207             _, items = var_items(p, c, "batch")
208             sz = int(c["size"])
209             r = []
210             for j in xrange(0, len(items), sz):
211                 r.append(items[j:j+sz])
212             return r
213         raise EvaluationError("Missing valid list context function")
214     elif isinstance(c, list):
215         return [expand_item(p, arg) for arg in c]
216     elif isinstance(c, basestring):
217         m = re.match("^\$\((.*)\)$", c)
218         if m and m.group(1) in p:
219             return expand_item(p, p[m.group(1)])
220         else:
221             return subst.do_substitution(p, c)
222     else:
223         raise EvaluationError("expand_item() unexpected parameter type %s" % type(c))
224
225 # Evaluate in a list context
226 # "p" is the parameter scope, "value" will be evaluated
227 # if "value" is a list after expansion, return that
228 # if "value" is a path to a directory, return a list consisting of each entry in the directory
229 # if "value" is a path to a file, return a list consisting of each line of the file
230 def get_items(p, value):
231     value = expand_item(p, value)
232     if isinstance(value, list):
233         return value
234     elif isinstance(value, basestring):
235         mode = os.stat(value).st_mode
236         prefix = value[len(os.environ['TASK_KEEPMOUNT'])+1:]
237         if mode is not None:
238             if stat.S_ISDIR(mode):
239                 items = [os.path.join(value, l) for l in os.listdir(value)]
240             elif stat.S_ISREG(mode):
241                 with open(value) as f:
242                     items = [line.rstrip("\r\n") for line in f]
243             return items
244     raise EvaluationError("get_items did not yield a list")
245
246 stdoutname = None
247 stdoutfile = None
248 stdinname = None
249 stdinfile = None
250
251 # Construct the cross product of all values of each variable listed in fvars
252 def recursive_foreach(params, fvars):
253     var = fvars[0]
254     fvars = fvars[1:]
255     items = get_items(params, params[var])
256     logger.info("parallelizing on %s with items %s" % (var, items))
257     if items is not None:
258         for i in items:
259             params = copy.copy(params)
260             params[var] = i
261             if len(fvars) > 0:
262                 recursive_foreach(params, fvars)
263             else:
264                 if not args.dry_run:
265                     arvados.api().job_tasks().create(body={
266                         'job_uuid': arvados.current_job()['uuid'],
267                         'created_by_job_task_uuid': arvados.current_task()['uuid'],
268                         'sequence': 1,
269                         'parameters': params
270                     }).execute()
271                 else:
272                     if isinstance(params["command"][0], list):
273                         for c in params["command"]:
274                             logger.info(flatten(expand_item(params, c)))
275                     else:
276                         logger.info(flatten(expand_item(params, params["command"])))
277     else:
278         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
279         sys.exit(1)
280
281 try:
282     if "task.foreach" in jobp:
283         if args.dry_run or arvados.current_task()['sequence'] == 0:
284             # This is the first task to start the other tasks and exit
285             fvars = jobp["task.foreach"]
286             if isinstance(fvars, basestring):
287                 fvars = [fvars]
288             if not isinstance(fvars, list) or len(fvars) == 0:
289                 logger.error("value of task.foreach must be a string or non-empty list")
290                 sys.exit(1)
291             recursive_foreach(jobp, jobp["task.foreach"])
292             if not args.dry_run:
293                 if "task.vwd" in jobp:
294                     # Set output of the first task to the base vwd collection so it
295                     # will be merged with output fragments from the other tasks by
296                     # crunch.
297                     arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
298                 else:
299                     arvados.current_task().set_output(None)
300             sys.exit(0)
301     else:
302         # This is the only task so taskp/jobp are the same
303         taskp = jobp
304 except Exception as e:
305     logger.exception("caught exception")
306     logger.error("job parameters were:")
307     logger.error(pprint.pformat(jobp))
308     sys.exit(1)
309
310 try:
311     if not args.dry_run:
312         if "task.vwd" in taskp:
313             # Populate output directory with symlinks to files in collection
314             vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
315
316         if "task.cwd" in taskp:
317             os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
318
319     cmd = []
320     if isinstance(taskp["command"][0], list):
321         for c in taskp["command"]:
322             cmd.append(flatten(expand_item(taskp, c)))
323     else:
324         cmd.append(flatten(expand_item(taskp, taskp["command"])))
325
326     if "task.stdin" in taskp:
327         stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
328         if not args.dry_run:
329             stdinfile = open(stdinname, "rb")
330
331     if "task.stdout" in taskp:
332         stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
333         if not args.dry_run:
334             stdoutfile = open(stdoutname, "wb")
335
336     logger.info("{}{}{}".format(' | '.join([' '.join(c) for c in cmd]), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
337
338     if args.dry_run:
339         sys.exit(0)
340 except subst.SubstitutionError as e:
341     logger.error(str(e))
342     logger.error("task parameters were:")
343     logger.error(pprint.pformat(taskp))
344     sys.exit(1)
345 except Exception as e:
346     logger.exception("caught exception")
347     logger.error("task parameters were:")
348     logger.error(pprint.pformat(taskp))
349     sys.exit(1)
350
351 try:
352     subprocesses = []
353     close_streams = []
354     if stdinfile:
355         close_streams.append(stdinfile)
356     next_stdin = stdinfile
357
358     for i in xrange(len(cmd)):
359         if i == len(cmd)-1:
360             # this is the last command in the pipeline, so its stdout should go to stdoutfile
361             next_stdout = stdoutfile
362         else:
363             # this is an intermediate command in the pipeline, so its stdout should go to a pipe
364             next_stdout = subprocess.PIPE
365
366         sp = subprocess.Popen(cmd[i], shell=False, stdin=next_stdin, stdout=next_stdout)
367
368         # Need to close the FDs on our side so that subcommands will get SIGPIPE if the
369         # consuming process ends prematurely.
370         if sp.stdout:
371             close_streams.append(sp.stdout)
372
373         # Send this processes's stdout to to the next process's stdin
374         next_stdin = sp.stdout
375
376         subprocesses.append(sp)
377
378     # File descriptors have been handed off to the subprocesses, so close them here.
379     for s in close_streams:
380         s.close()
381
382     # Set up signal handling
383     sig = SigHandler()
384
385     # Forward terminate signals to the subprocesses.
386     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(subprocesses, signum))
387     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(subprocesses, signum))
388     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(subprocesses, signum))
389
390     active = 1
391     pids = set([s.pid for s in subprocesses])
392     rcode = {}
393     while len(pids) > 0:
394         (pid, status) = os.wait()
395         pids.discard(pid)
396         if not taskp.get("task.ignore_rcode"):
397             rcode[pid] = (status >> 8)
398         else:
399             rcode[pid] = 0
400
401     if sig.sig is not None:
402         logger.critical("terminating on signal %s" % sig.sig)
403         sys.exit(2)
404     else:
405         for i in xrange(len(cmd)):
406             r = rcode[subprocesses[i].pid]
407             logger.info("%s completed with exit code %i (%s)" % (cmd[i][0], r, "success" if r == 0 else "failed"))
408
409 except Exception as e:
410     logger.exception("caught exception")
411
412 # restore default signal handlers.
413 signal.signal(signal.SIGINT, signal.SIG_DFL)
414 signal.signal(signal.SIGTERM, signal.SIG_DFL)
415 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
416
417 for l in links:
418     os.unlink(l)
419
420 logger.info("the following output files will be saved to keep:")
421
422 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
423
424 logger.info("start writing output to keep")
425
426 if "task.vwd" in taskp:
427     if "task.foreach" in jobp:
428         # This is a subtask, so don't merge with the original collection, that will happen at the end
429         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
430     else:
431         # Just a single task, so do merge with the original collection
432         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
433 else:
434     outcollection = robust_put.upload(outdir, logger)
435
436 # Success if no non-zero return codes
437 success = not any([status != 0 for status in rcode.values()])
438
439 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
440                                      body={
441                                          'output': outcollection,
442                                          'success': success,
443                                          'progress':1.0
444                                      }).execute()
445
446 sys.exit(rcode)