Merge branch '5562-pycurl-upload' closes #5562
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28 import argparse
29 import json
30 import tempfile
31 import errno
32
33 parser = argparse.ArgumentParser()
34 parser.add_argument('--dry-run', action='store_true')
35 parser.add_argument('--script-parameters', type=str, default="{}")
36 args = parser.parse_args()
37
38 os.umask(0077)
39
40 if not args.dry_run:
41     api = arvados.api('v1')
42     t = arvados.current_task().tmpdir
43     os.chdir(arvados.current_task().tmpdir)
44     os.mkdir("tmpdir")
45     os.mkdir("output")
46
47     os.chdir("output")
48
49     outdir = os.getcwd()
50
51     taskp = None
52     jobp = arvados.current_job()['script_parameters']
53     if len(arvados.current_task()['parameters']) > 0:
54         taskp = arvados.current_task()['parameters']
55 else:
56     outdir = "/tmp"
57     jobp = json.loads(args.script_parameters)
58     os.environ['JOB_UUID'] = 'zzzzz-8i9sb-1234567890abcde'
59     os.environ['TASK_UUID'] = 'zzzzz-ot0gb-1234567890abcde'
60     os.environ['CRUNCH_SRC'] = '/tmp/crunch-src'
61     if 'TASK_KEEPMOUNT' not in os.environ:
62         os.environ['TASK_KEEPMOUNT'] = '/keep'
63
64 links = []
65
66 def sub_tmpdir(v):
67     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
68
69 def sub_outdir(v):
70     return outdir
71
72 def sub_cores(v):
73      return str(multiprocessing.cpu_count())
74
75 def sub_jobid(v):
76      return os.environ['JOB_UUID']
77
78 def sub_taskid(v):
79      return os.environ['TASK_UUID']
80
81 def sub_jobsrc(v):
82      return os.environ['CRUNCH_SRC']
83
84 subst.default_subs["task.tmpdir"] = sub_tmpdir
85 subst.default_subs["task.outdir"] = sub_outdir
86 subst.default_subs["job.srcdir"] = sub_jobsrc
87 subst.default_subs["node.cores"] = sub_cores
88 subst.default_subs["job.uuid"] = sub_jobid
89 subst.default_subs["task.uuid"] = sub_taskid
90
91 class SigHandler(object):
92     def __init__(self):
93         self.sig = None
94
95     def send_signal(self, subprocesses, signum):
96         for sp in subprocesses:
97             sp.send_signal(signum)
98         self.sig = signum
99
100 # http://rightfootin.blogspot.com/2006/09/more-on-python-flatten.html
101 def flatten(l, ltypes=(list, tuple)):
102     ltype = type(l)
103     l = list(l)
104     i = 0
105     while i < len(l):
106         while isinstance(l[i], ltypes):
107             if not l[i]:
108                 l.pop(i)
109                 i -= 1
110                 break
111             else:
112                 l[i:i + 1] = l[i]
113         i += 1
114     return ltype(l)
115
116 def add_to_group(gr, match):
117     m = match.groups()
118     if m not in gr:
119         gr[m] = []
120     gr[m].append(match.group(0))
121
122 class EvaluationError(Exception):
123     pass
124
125 # Return the name of variable ('var') that will take on each value in 'items'
126 # when performing an inner substitution
127 def var_items(p, c, key):
128     if key not in c:
129         raise EvaluationError("'%s' was expected in 'p' but is missing" % key)
130
131     if "var" in c:
132         if not isinstance(c["var"], basestring):
133             raise EvaluationError("Value of 'var' must be a string")
134         # Var specifies the variable name for inner parameter substitution
135         return (c["var"], get_items(p, c[key]))
136     else:
137         # The component function ('key') value is a list, so return the list
138         # directly with no parameter selected.
139         if isinstance(c[key], list):
140             return (None, get_items(p, c[key]))
141         elif isinstance(c[key], basestring):
142             # check if c[key] is a string that looks like a parameter
143             m = re.match("^\$\((.*)\)$", c[key])
144             if m and m.group(1) in p:
145                 return (m.group(1), get_items(p, c[key]))
146             else:
147                 # backwards compatible, foreach specifies bare parameter name to use
148                 return (c[key], get_items(p, p[c[key]]))
149         else:
150             raise EvaluationError("Value of '%s' must be a string or list" % key)
151
152 # "p" is the parameter scope, "c" is the item to be expanded.
153 # If "c" is a dict, apply function expansion.
154 # If "c" is a list, recursively expand each item and return a new list.
155 # If "c" is a string, apply parameter substitution
156 def expand_item(p, c):
157     if isinstance(c, dict):
158         if "foreach" in c and "command" in c:
159             # Expand a command template for each item in the specified user
160             # parameter
161             var, items = var_items(p, c, "foreach")
162             if var is None:
163                 raise EvaluationError("Must specify 'var' in foreach")
164             r = []
165             for i in items:
166                 params = copy.copy(p)
167                 params[var] = i
168                 r.append(expand_item(params, c["command"]))
169             return r
170         elif "list" in c and "index" in c and "command" in c:
171             # extract a single item from a list
172             var, items = var_items(p, c, "list")
173             if var is None:
174                 raise EvaluationError("Must specify 'var' in list")
175             params = copy.copy(p)
176             params[var] = items[int(c["index"])]
177             return expand_item(params, c["command"])
178         elif "regex" in c:
179             pattern = re.compile(c["regex"])
180             if "filter" in c:
181                 # filter list so that it only includes items that match a
182                 # regular expression
183                 _, items = var_items(p, c, "filter")
184                 return [i for i in items if pattern.match(i)]
185             elif "group" in c:
186                 # generate a list of lists, where items are grouped on common
187                 # subexpression match
188                 _, items = var_items(p, c, "group")
189                 groups = {}
190                 for i in items:
191                     match = pattern.match(i)
192                     if match:
193                         add_to_group(groups, match)
194                 return [groups[k] for k in groups]
195             elif "extract" in c:
196                 # generate a list of lists, where items are split by
197                 # subexpression match
198                 _, items = var_items(p, c, "extract")
199                 r = []
200                 for i in items:
201                     match = pattern.match(i)
202                     if match:
203                         r.append(list(match.groups()))
204                 return r
205         elif "batch" in c and "size" in c:
206             # generate a list of lists, where items are split into a batch size
207             _, items = var_items(p, c, "batch")
208             sz = int(c["size"])
209             r = []
210             for j in xrange(0, len(items), sz):
211                 r.append(items[j:j+sz])
212             return r
213         raise EvaluationError("Missing valid list context function")
214     elif isinstance(c, list):
215         return [expand_item(p, arg) for arg in c]
216     elif isinstance(c, basestring):
217         m = re.match("^\$\((.*)\)$", c)
218         if m and m.group(1) in p:
219             return expand_item(p, p[m.group(1)])
220         else:
221             return subst.do_substitution(p, c)
222     else:
223         raise EvaluationError("expand_item() unexpected parameter type %s" % type(c))
224
225 # Evaluate in a list context
226 # "p" is the parameter scope, "value" will be evaluated
227 # if "value" is a list after expansion, return that
228 # if "value" is a path to a directory, return a list consisting of each entry in the directory
229 # if "value" is a path to a file, return a list consisting of each line of the file
230 def get_items(p, value):
231     value = expand_item(p, value)
232     if isinstance(value, list):
233         return value
234     elif isinstance(value, basestring):
235         mode = os.stat(value).st_mode
236         prefix = value[len(os.environ['TASK_KEEPMOUNT'])+1:]
237         if mode is not None:
238             if stat.S_ISDIR(mode):
239                 items = [os.path.join(value, l) for l in os.listdir(value)]
240             elif stat.S_ISREG(mode):
241                 with open(value) as f:
242                     items = [line.rstrip("\r\n") for line in f]
243             return items
244     raise EvaluationError("get_items did not yield a list")
245
246 stdoutname = None
247 stdoutfile = None
248 stdinname = None
249 stdinfile = None
250
251 # Construct the cross product of all values of each variable listed in fvars
252 def recursive_foreach(params, fvars):
253     var = fvars[0]
254     fvars = fvars[1:]
255     items = get_items(params, params[var])
256     logger.info("parallelizing on %s with items %s" % (var, items))
257     if items is not None:
258         for i in items:
259             params = copy.copy(params)
260             params[var] = i
261             if len(fvars) > 0:
262                 recursive_foreach(params, fvars)
263             else:
264                 if not args.dry_run:
265                     arvados.api().job_tasks().create(body={
266                         'job_uuid': arvados.current_job()['uuid'],
267                         'created_by_job_task_uuid': arvados.current_task()['uuid'],
268                         'sequence': 1,
269                         'parameters': params
270                     }).execute()
271                 else:
272                     if isinstance(params["command"][0], list):
273                         for c in params["command"]:
274                             logger.info(flatten(expand_item(params, c)))
275                     else:
276                         logger.info(flatten(expand_item(params, params["command"])))
277     else:
278         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
279         sys.exit(1)
280
281 try:
282     if "task.foreach" in jobp:
283         if args.dry_run or arvados.current_task()['sequence'] == 0:
284             # This is the first task to start the other tasks and exit
285             fvars = jobp["task.foreach"]
286             if isinstance(fvars, basestring):
287                 fvars = [fvars]
288             if not isinstance(fvars, list) or len(fvars) == 0:
289                 logger.error("value of task.foreach must be a string or non-empty list")
290                 sys.exit(1)
291             recursive_foreach(jobp, jobp["task.foreach"])
292             if not args.dry_run:
293                 if "task.vwd" in jobp:
294                     # Set output of the first task to the base vwd collection so it
295                     # will be merged with output fragments from the other tasks by
296                     # crunch.
297                     arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
298                 else:
299                     arvados.current_task().set_output(None)
300             sys.exit(0)
301     else:
302         # This is the only task so taskp/jobp are the same
303         taskp = jobp
304 except Exception as e:
305     logger.exception("caught exception")
306     logger.error("job parameters were:")
307     logger.error(pprint.pformat(jobp))
308     sys.exit(1)
309
310 try:
311     if not args.dry_run:
312         if "task.vwd" in taskp:
313             # Populate output directory with symlinks to files in collection
314             vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
315
316         if "task.cwd" in taskp:
317             os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
318
319     cmd = []
320     if isinstance(taskp["command"][0], list):
321         for c in taskp["command"]:
322             cmd.append(flatten(expand_item(taskp, c)))
323     else:
324         cmd.append(flatten(expand_item(taskp, taskp["command"])))
325
326     if "task.stdin" in taskp:
327         stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
328         if not args.dry_run:
329             stdinfile = open(stdinname, "rb")
330
331     if "task.stdout" in taskp:
332         stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
333         if not args.dry_run:
334             stdoutfile = open(stdoutname, "wb")
335
336     logger.info("{}{}{}".format(' | '.join([' '.join(c) for c in cmd]), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
337
338     if args.dry_run:
339         sys.exit(0)
340 except subst.SubstitutionError as e:
341     logger.error(str(e))
342     logger.error("task parameters were:")
343     logger.error(pprint.pformat(taskp))
344     sys.exit(1)
345 except Exception as e:
346     logger.exception("caught exception")
347     logger.error("task parameters were:")
348     logger.error(pprint.pformat(taskp))
349     sys.exit(1)
350
351 # rcode holds the return codes produced by each subprocess
352 rcode = {}
353 try:
354     subprocesses = []
355     close_streams = []
356     if stdinfile:
357         close_streams.append(stdinfile)
358     next_stdin = stdinfile
359
360     for i in xrange(len(cmd)):
361         if i == len(cmd)-1:
362             # this is the last command in the pipeline, so its stdout should go to stdoutfile
363             next_stdout = stdoutfile
364         else:
365             # this is an intermediate command in the pipeline, so its stdout should go to a pipe
366             next_stdout = subprocess.PIPE
367
368         sp = subprocess.Popen(cmd[i], shell=False, stdin=next_stdin, stdout=next_stdout)
369
370         # Need to close the FDs on our side so that subcommands will get SIGPIPE if the
371         # consuming process ends prematurely.
372         if sp.stdout:
373             close_streams.append(sp.stdout)
374
375         # Send this processes's stdout to to the next process's stdin
376         next_stdin = sp.stdout
377
378         subprocesses.append(sp)
379
380     # File descriptors have been handed off to the subprocesses, so close them here.
381     for s in close_streams:
382         s.close()
383
384     # Set up signal handling
385     sig = SigHandler()
386
387     # Forward terminate signals to the subprocesses.
388     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(subprocesses, signum))
389     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(subprocesses, signum))
390     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(subprocesses, signum))
391
392     active = 1
393     pids = set([s.pid for s in subprocesses])
394     while len(pids) > 0:
395         (pid, status) = os.wait()
396         pids.discard(pid)
397         if not taskp.get("task.ignore_rcode"):
398             rcode[pid] = (status >> 8)
399         else:
400             rcode[pid] = 0
401
402     if sig.sig is not None:
403         logger.critical("terminating on signal %s" % sig.sig)
404         sys.exit(2)
405     else:
406         for i in xrange(len(cmd)):
407             r = rcode[subprocesses[i].pid]
408             logger.info("%s completed with exit code %i (%s)" % (cmd[i][0], r, "success" if r == 0 else "failed"))
409
410 except Exception as e:
411     logger.exception("caught exception")
412
413 # restore default signal handlers.
414 signal.signal(signal.SIGINT, signal.SIG_DFL)
415 signal.signal(signal.SIGTERM, signal.SIG_DFL)
416 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
417
418 for l in links:
419     os.unlink(l)
420
421 logger.info("the following output files will be saved to keep:")
422
423 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr, cwd=outdir)
424
425 logger.info("start writing output to keep")
426
427 if "task.vwd" in taskp:
428     if "task.foreach" in jobp:
429         # This is a subtask, so don't merge with the original collection, that will happen at the end
430         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
431     else:
432         # Just a single task, so do merge with the original collection
433         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
434 else:
435     outcollection = robust_put.upload(outdir, logger)
436
437 # Success if we ran any subprocess, and they all exited 0.
438 success = rcode and all(status == 0 for status in rcode.itervalues())
439
440 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
441                                      body={
442                                          'output': outcollection,
443                                          'success': success,
444                                          'progress':1.0
445                                      }).execute()
446
447 sys.exit(0 if success else 1)