3609: Further documentation improvements, --local now runs pipeline runner instead...
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28 import argparse
29 import json
30 import tempfile
31 import errno
32
33 parser = argparse.ArgumentParser()
34 parser.add_argument('--dry-run', action='store_true')
35 parser.add_argument('--script-parameters', type=str, default="{}")
36 args = parser.parse_args()
37
38 os.umask(0077)
39
40 if not args.dry_run:
41     api = arvados.api('v1')
42     t = arvados.current_task().tmpdir
43     os.chdir(arvados.current_task().tmpdir)
44     os.mkdir("tmpdir")
45     os.mkdir("output")
46
47     os.chdir("output")
48
49     outdir = os.getcwd()
50
51     taskp = None
52     jobp = arvados.current_job()['script_parameters']
53     if len(arvados.current_task()['parameters']) > 0:
54         taskp = arvados.current_task()['parameters']
55 else:
56     outdir = "/tmp"
57     jobp = json.loads(args.script_parameters)
58     os.environ['JOB_UUID'] = 'zzzzz-8i9sb-1234567890abcde'
59     os.environ['TASK_UUID'] = 'zzzzz-ot0gb-1234567890abcde'
60     os.environ['CRUNCH_SRC'] = '/tmp/crunche-src'
61     if 'TASK_KEEPMOUNT' not in os.environ:
62         os.environ['TASK_KEEPMOUNT'] = '/keep'
63
64 links = []
65
66 def sub_tmpdir(v):
67     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
68
69 def sub_outdir(v):
70     return outdir
71
72 def sub_cores(v):
73      return str(multiprocessing.cpu_count())
74
75 def sub_jobid(v):
76      return os.environ['JOB_UUID']
77
78 def sub_taskid(v):
79      return os.environ['TASK_UUID']
80
81 def sub_jobsrc(v):
82      return os.environ['CRUNCH_SRC']
83
84 subst.default_subs["task.tmpdir"] = sub_tmpdir
85 subst.default_subs["task.outdir"] = sub_outdir
86 subst.default_subs["job.srcdir"] = sub_jobsrc
87 subst.default_subs["node.cores"] = sub_cores
88 subst.default_subs["job.uuid"] = sub_jobid
89 subst.default_subs["task.uuid"] = sub_taskid
90
91 class SigHandler(object):
92     def __init__(self):
93         self.sig = None
94
95     def send_signal(self, subprocesses, signum):
96         for sp in subprocesses:
97             sp.send_signal(signum)
98         self.sig = signum
99
100 # http://rightfootin.blogspot.com/2006/09/more-on-python-flatten.html
101 def flatten(l, ltypes=(list, tuple)):
102     ltype = type(l)
103     l = list(l)
104     i = 0
105     while i < len(l):
106         while isinstance(l[i], ltypes):
107             if not l[i]:
108                 l.pop(i)
109                 i -= 1
110                 break
111             else:
112                 l[i:i + 1] = l[i]
113         i += 1
114     return ltype(l)
115
116 def add_to_group(gr, match):
117     m = match.groups()
118     if m not in gr:
119         gr[m] = []
120     gr[m].append(match.group(0))
121
122 def var_items(p, c, key):
123     if "var" in c:
124         # Var specifies
125         return (c["var"], get_items(p, c[key]))
126     else:
127         if isinstance(c[key], list):
128             return (None, get_items(p, c[key]))
129         m = re.match("^\$\((.*)\)$", c[key])
130         if m and m.group(1) in p:
131             return (m.group(1), get_items(p, c[key]))
132         else:
133             # backwards compatible, foreach specifies bare parameter name to use
134             return (c[key], get_items(p, p[c[key]]))
135
136 def expand_item(p, c):
137     if isinstance(c, dict):
138         if "foreach" in c and "command" in c:
139             var, items = var_items(p, c, "foreach")
140             r = []
141             for i in items:
142                 params = copy.copy(p)
143                 params[var] = i
144                 r.append(expand_item(params, c["command"]))
145             return r
146         if "list" in c and "index" in c and "command" in c:
147             var, items = var_items(p, c, "list")
148             params = copy.copy(p)
149             params[var] = items[int(c["index"])]
150             return expand_item(params, c["command"])
151         if "regex" in c:
152             pattern = re.compile(c["regex"])
153             if "filter" in c:
154                 var, items = var_items(p, c, "filter")
155                 return [i for i in items if pattern.match(i)]
156             elif "group" in c:
157                 var, items = var_items(p, c, "group")
158                 groups = {}
159                 for i in items:
160                     match = pattern.match(i)
161                     if match:
162                         add_to_group(groups, match)
163                 return [groups[k] for k in groups]
164             elif "extract" in c:
165                 var, items = var_items(p, c, "extract")
166                 r = []
167                 for i in items:
168                     match = pattern.match(i)
169                     if match:
170                         r.append(list(match.groups()))
171                 return r
172         if "batch" in c and "size" in c:
173             var, items = var_items(p, c, "batch")
174             sz = int(c["size"])
175             r = []
176             for j in xrange(0, len(items), sz):
177                 r.append(items[j:j+sz])
178             return r
179     elif isinstance(c, list):
180         return [expand_item(p, arg) for arg in c]
181     elif isinstance(c, basestring):
182         m = re.match("^\$\((.*)\)$", c)
183         if m and m.group(1) in p:
184             return expand_item(p, p[m.group(1)])
185         else:
186             return subst.do_substitution(p, c)
187
188     return []
189
190 def get_items(p, value):
191     value = expand_item(p, value)
192     if isinstance(value, list):
193         return value
194     elif isinstance(value, basestring):
195         mode = os.stat(value).st_mode
196         prefix = value[len(os.environ['TASK_KEEPMOUNT'])+1:]
197         if mode is not None:
198             if stat.S_ISDIR(mode):
199                 items = [os.path.join(value, l) for l in os.listdir(value)]
200             elif stat.S_ISREG(mode):
201                 with open(value) as f:
202                     items = [line.rstrip("\r\n") for line in f]
203             return items
204     raise Exception("get_items did not yield a list")
205
206 stdoutname = None
207 stdoutfile = None
208 stdinname = None
209 stdinfile = None
210
211 def recursive_foreach(params, fvars):
212     var = fvars[0]
213     fvars = fvars[1:]
214     items = get_items(params, params[var])
215     logger.info("parallelizing on %s with items %s" % (var, items))
216     if items is not None:
217         for i in items:
218             params = copy.copy(params)
219             params[var] = i
220             if len(fvars) > 0:
221                 recursive_foreach(params, fvars)
222             else:
223                 if not args.dry_run:
224                     arvados.api().job_tasks().create(body={
225                         'job_uuid': arvados.current_job()['uuid'],
226                         'created_by_job_task_uuid': arvados.current_task()['uuid'],
227                         'sequence': 1,
228                         'parameters': params
229                     }).execute()
230                 else:
231                     if isinstance(params["command"][0], list):
232                         for c in params["command"]:
233                             logger.info(flatten(expand_item(params, c)))
234                     else:
235                         logger.info(flatten(expand_item(params, params["command"])))
236     else:
237         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
238         sys.exit(1)
239
240 try:
241     if "task.foreach" in jobp:
242         if args.dry_run or arvados.current_task()['sequence'] == 0:
243             # This is the first task to start the other tasks and exit
244             fvars = jobp["task.foreach"]
245             if isinstance(fvars, basestring):
246                 fvars = [fvars]
247             if not isinstance(fvars, list) or len(fvars) == 0:
248                 logger.error("value of task.foreach must be a string or non-empty list")
249                 sys.exit(1)
250             recursive_foreach(jobp, jobp["task.foreach"])
251             if not args.dry_run:
252                 if "task.vwd" in jobp:
253                     # Set output of the first task to the base vwd collection so it
254                     # will be merged with output fragments from the other tasks by
255                     # crunch.
256                     arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
257                 else:
258                     arvados.current_task().set_output(None)
259             sys.exit(0)
260     else:
261         # This is the only task so taskp/jobp are the same
262         taskp = jobp
263 except Exception as e:
264     logger.exception("caught exception")
265     logger.error("job parameters were:")
266     logger.error(pprint.pformat(jobp))
267     sys.exit(1)
268
269 try:
270     if not args.dry_run:
271         if "task.vwd" in taskp:
272             # Populate output directory with symlinks to files in collection
273             vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
274
275         if "task.cwd" in taskp:
276             os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
277
278     cmd = []
279     if isinstance(taskp["command"][0], list):
280         for c in taskp["command"]:
281             cmd.append(flatten(expand_item(taskp, c)))
282     else:
283         cmd.append(flatten(expand_item(taskp, taskp["command"])))
284
285     if "task.stdin" in taskp:
286         stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
287         if not args.dry_run:
288             stdinfile = open(stdinname, "rb")
289
290     if "task.stdout" in taskp:
291         stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
292         if not args.dry_run:
293             stdoutfile = open(stdoutname, "wb")
294
295     logger.info("{}{}{}".format(' | '.join([' '.join(c) for c in cmd]), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
296
297     if args.dry_run:
298         sys.exit(0)
299 except subst.SubstitutionError as e:
300     logger.error(str(e))
301     logger.error("task parameters were:")
302     logger.error(pprint.pformat(taskp))
303     sys.exit(1)
304 except Exception as e:
305     logger.exception("caught exception")
306     logger.error("task parameters were:")
307     logger.error(pprint.pformat(taskp))
308     sys.exit(1)
309
310 try:
311     subprocesses = []
312     close_streams = []
313     if stdinfile:
314         close_streams.append(stdinfile)
315     next_stdin = stdinfile
316
317     for i in xrange(len(cmd)):
318         if i == len(cmd)-1:
319             # this is the last command in the pipeline, so its stdout should go to stdoutfile
320             next_stdout = stdoutfile
321         else:
322             # this is an intermediate command in the pipeline, so its stdout should go to a pipe
323             next_stdout = subprocess.PIPE
324
325         sp = subprocess.Popen(cmd[i], shell=False, stdin=next_stdin, stdout=next_stdout)
326
327         # Need to close the FDs on our side so that subcommands will get SIGPIPE if the
328         # consuming process ends prematurely.
329         if sp.stdout:
330             close_streams.append(sp.stdout)
331
332         # Send this processes's stdout to to the next process's stdin
333         next_stdin = sp.stdout
334
335         subprocesses.append(sp)
336
337     # File descriptors have been handed off to the subprocesses, so close them here.
338     for s in close_streams:
339         s.close()
340
341     # Set up signal handling
342     sig = SigHandler()
343
344     # Forward terminate signals to the subprocesses.
345     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(subprocesses, signum))
346     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(subprocesses, signum))
347     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(subprocesses, signum))
348
349     active = 1
350     pids = set([s.pid for s in subprocesses])
351     rcode = {}
352     while len(pids) > 0:
353         (pid, status) = os.wait()
354         pids.discard(pid)
355         if not taskp.get("task.ignore_rcode"):
356             rcode[pid] = (status >> 8)
357         else:
358             rcode[pid] = 0
359
360     if sig.sig is not None:
361         logger.critical("terminating on signal %s" % sig.sig)
362         sys.exit(2)
363     else:
364         for i in xrange(len(cmd)):
365             r = rcode[subprocesses[i].pid]
366             logger.info("%s completed with exit code %i (%s)" % (cmd[i][0], r, "success" if r == 0 else "failed"))
367
368 except Exception as e:
369     logger.exception("caught exception")
370
371 # restore default signal handlers.
372 signal.signal(signal.SIGINT, signal.SIG_DFL)
373 signal.signal(signal.SIGTERM, signal.SIG_DFL)
374 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
375
376 for l in links:
377     os.unlink(l)
378
379 logger.info("the following output files will be saved to keep:")
380
381 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
382
383 logger.info("start writing output to keep")
384
385 if "task.vwd" in taskp:
386     if "task.foreach" in jobp:
387         # This is a subtask, so don't merge with the original collection, that will happen at the end
388         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
389     else:
390         # Just a single task, so do merge with the original collection
391         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
392 else:
393     outcollection = robust_put.upload(outdir, logger)
394
395 success = reduce(lambda x, y: x & (y == 0), [True]+rcode.values())
396
397 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
398                                      body={
399                                          'output': outcollection,
400                                          'success': success,
401                                          'progress':1.0
402                                      }).execute()
403
404 sys.exit(rcode)