New wait logic, report all exit codes.
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28 import argparse
29 import json
30 import tempfile
31 import errno
32
33 parser = argparse.ArgumentParser()
34 parser.add_argument('--dry-run', action='store_true')
35 parser.add_argument('--script-parameters', type=str, default="{}")
36 args = parser.parse_args()
37
38 os.umask(0077)
39
40 if not args.dry_run:
41     api = arvados.api('v1')
42     t = arvados.current_task().tmpdir
43     os.chdir(arvados.current_task().tmpdir)
44     os.mkdir("tmpdir")
45     os.mkdir("output")
46
47     os.chdir("output")
48
49     outdir = os.getcwd()
50
51     taskp = None
52     jobp = arvados.current_job()['script_parameters']
53     if len(arvados.current_task()['parameters']) > 0:
54         taskp = arvados.current_task()['parameters']
55 else:
56     outdir = "/tmp"
57     jobp = json.loads(args.script_parameters)
58     os.environ['JOB_UUID'] = 'zzzzz-8i9sb-1234567890abcde'
59     os.environ['TASK_UUID'] = 'zzzzz-ot0gb-1234567890abcde'
60     os.environ['CRUNCH_SRC'] = '/tmp/crunche-src'
61     if 'TASK_KEEPMOUNT' not in os.environ:
62         os.environ['TASK_KEEPMOUNT'] = '/keep'
63
64 links = []
65
66 def sub_tmpdir(v):
67     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
68
69 def sub_outdir(v):
70     return outdir
71
72 def sub_cores(v):
73      return str(multiprocessing.cpu_count())
74
75 def sub_jobid(v):
76      return os.environ['JOB_UUID']
77
78 def sub_taskid(v):
79      return os.environ['TASK_UUID']
80
81 def sub_jobsrc(v):
82      return os.environ['CRUNCH_SRC']
83
84 subst.default_subs["task.tmpdir"] = sub_tmpdir
85 subst.default_subs["task.outdir"] = sub_outdir
86 subst.default_subs["job.srcdir"] = sub_jobsrc
87 subst.default_subs["node.cores"] = sub_cores
88 subst.default_subs["job.uuid"] = sub_jobid
89 subst.default_subs["task.uuid"] = sub_taskid
90
91 class SigHandler(object):
92     def __init__(self):
93         self.sig = None
94
95     def send_signal(self, subprocesses, signum):
96         for sp in subprocesses:
97             sp.send_signal(signum)
98         self.sig = signum
99
100 def add_to_group(gr, match):
101     m = match.groups()
102     if m not in gr:
103         gr[m] = []
104     gr[m].append(match.group(0))
105
106 def expand_item(p, c):
107     if isinstance(c, dict):
108         if "foreach" in c and "command" in c:
109             var = c["foreach"]
110             items = get_items(p, p[var])
111             r = []
112             for i in items:
113                 params = copy.copy(p)
114                 params[var] = i
115                 r.extend(expand_list(params, c["command"]))
116             return r
117         if "list" in c and "index" in c and "command" in c:
118             var = c["list"]
119             items = get_items(p, p[var])
120             params = copy.copy(p)
121             params[var] = items[int(c["index"])]
122             return expand_list(params, c["command"])
123         if "regex" in c:
124             pattern = re.compile(c["regex"])
125             if "filter" in c:
126                 items = get_items(p, p[c["filter"]])
127                 return [i for i in items if pattern.match(i)]
128             elif "group" in c:
129                 items = get_items(p, p[c["group"]])
130                 groups = {}
131                 for i in items:
132                     match = pattern.match(i)
133                     if match:
134                         add_to_group(groups, match)
135                 return [groups[k] for k in groups]
136             elif "extract" in c:
137                 items = get_items(p, p[c["extract"]])
138                 r = []
139                 for i in items:
140                     match = pattern.match(i)
141                     if match:
142                         r.append(list(match.groups()))
143                 return r
144     elif isinstance(c, list):
145         return expand_list(p, c)
146     elif isinstance(c, basestring):
147         return [subst.do_substitution(p, c)]
148
149     return []
150
151 def expand_list(p, l):
152     if isinstance(l, basestring):
153         return expand_item(p, l)
154     else:
155         return [exp for arg in l for exp in expand_item(p, arg)]
156
157 def get_items(p, value):
158     if isinstance(value, dict):
159         return expand_item(p, value)
160
161     if isinstance(value, list):
162         return expand_list(p, value)
163
164     fn = subst.do_substitution(p, value)
165     mode = os.stat(fn).st_mode
166     prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
167     if mode is not None:
168         if stat.S_ISDIR(mode):
169             items = [os.path.join(fn, l) for l in os.listdir(fn)]
170         elif stat.S_ISREG(mode):
171             with open(fn) as f:
172                 items = [line.rstrip("\r\n") for line in f]
173         return items
174     else:
175         return None
176
177 stdoutname = None
178 stdoutfile = None
179 stdinname = None
180 stdinfile = None
181
182 def recursive_foreach(params, fvars):
183     var = fvars[0]
184     fvars = fvars[1:]
185     items = get_items(params, params[var])
186     logger.info("parallelizing on %s with items %s" % (var, items))
187     if items is not None:
188         for i in items:
189             params = copy.copy(params)
190             params[var] = i
191             if len(fvars) > 0:
192                 recursive_foreach(params, fvars)
193             else:
194                 if not args.dry_run:
195                     arvados.api().job_tasks().create(body={
196                         'job_uuid': arvados.current_job()['uuid'],
197                         'created_by_job_task_uuid': arvados.current_task()['uuid'],
198                         'sequence': 1,
199                         'parameters': params
200                     }).execute()
201                 else:
202                     logger.info(expand_list(params, params["command"]))
203     else:
204         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
205         sys.exit(1)
206
207 try:
208     if "task.foreach" in jobp:
209         if args.dry_run or arvados.current_task()['sequence'] == 0:
210             # This is the first task to start the other tasks and exit
211             fvars = jobp["task.foreach"]
212             if isinstance(fvars, basestring):
213                 fvars = [fvars]
214             if not isinstance(fvars, list) or len(fvars) == 0:
215                 logger.error("value of task.foreach must be a string or non-empty list")
216                 sys.exit(1)
217             recursive_foreach(jobp, jobp["task.foreach"])
218             if not args.dry_run:
219                 if "task.vwd" in jobp:
220                     # Set output of the first task to the base vwd collection so it
221                     # will be merged with output fragments from the other tasks by
222                     # crunch.
223                     arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
224                 else:
225                     arvados.current_task().set_output(None)
226             sys.exit(0)
227     else:
228         # This is the only task so taskp/jobp are the same
229         taskp = jobp
230
231     if not args.dry_run:
232         if "task.vwd" in taskp:
233             # Populate output directory with symlinks to files in collection
234             vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
235
236         if "task.cwd" in taskp:
237             os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
238
239     cmd = []
240     if isinstance(taskp["command"][0], list):
241         for c in taskp["command"]:
242             cmd.append(expand_list(taskp, c))
243     else:
244         cmd.append(expand_list(taskp, taskp["command"]))
245
246     if "task.stdin" in taskp:
247         stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
248         if not args.dry_run:
249             stdinfile = open(stdinname, "rb")
250
251     if "task.stdout" in taskp:
252         stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
253         if not args.dry_run:
254             stdoutfile = open(stdoutname, "wb")
255
256     logger.info("{}{}{}".format(' | '.join([' '.join(c) for c in cmd]), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
257
258     if args.dry_run:
259         sys.exit(0)
260 except subst.SubstitutionError as e:
261     logger.error(str(e))
262     logger.error("task parameters were:")
263     logger.error(pprint.pformat(taskp))
264     sys.exit(1)
265 except Exception as e:
266     logger.exception("caught exception")
267     logger.error("task parameters were:")
268     logger.error(pprint.pformat(taskp))
269     sys.exit(1)
270
271 try:
272     subprocesses = []
273     close_streams = []
274     if stdinfile:
275         close_streams.append(stdinfile)
276     next_stdin = stdinfile
277
278     for i in xrange(len(cmd)):
279         if i == len(cmd)-1:
280             # this is the last command in the pipeline, so its stdout should go to stdoutfile
281             next_stdout = stdoutfile
282         else:
283             # this is an intermediate command in the pipeline, so its stdout should go to a pipe
284             next_stdout = subprocess.PIPE
285
286         sp = subprocess.Popen(cmd[i], shell=False, stdin=next_stdin, stdout=next_stdout)
287
288         # Need to close the FDs on our side so that subcommands will get SIGPIPE if the
289         # consuming process ends prematurely.
290         if sp.stdout:
291             close_streams.append(sp.stdout)
292
293         # Send this processes's stdout to to the next process's stdin
294         next_stdin = sp.stdout
295
296         subprocesses.append(sp)
297
298     # File descriptors have been handed off to the subprocesses, so close them here.
299     for s in close_streams:
300         s.close()
301
302     # Set up signal handling
303     sig = SigHandler()
304
305     # Forward terminate signals to the subprocesses.
306     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(subprocesses, signum))
307     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(subprocesses, signum))
308     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(subprocesses, signum))
309
310     active = 1
311     pids = set([s.pid for s in subprocesses])
312     rcode = {}
313     while len(pids) > 0:
314         (pid, status) = os.wait()
315         pids.discard(pid)
316         rcode[pid] = (status >> 8)
317
318     if sig.sig is not None:
319         logger.critical("terminating on signal %s" % sig.sig)
320         sys.exit(2)
321     else:
322         for i in xrange(len(cmd)):
323             r = rcode[subprocesses[i].pid]
324             logger.info("%s completed with exit code %i (%s)" % (cmd[i][0], r, "success" if r == 0 else "failed"))
325
326 except Exception as e:
327     logger.exception("caught exception")
328
329 # restore default signal handlers.
330 signal.signal(signal.SIGINT, signal.SIG_DFL)
331 signal.signal(signal.SIGTERM, signal.SIG_DFL)
332 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
333
334 for l in links:
335     os.unlink(l)
336
337 logger.info("the following output files will be saved to keep:")
338
339 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
340
341 logger.info("start writing output to keep")
342
343 if "task.vwd" in taskp:
344     if "task.foreach" in jobp:
345         # This is a subtask, so don't merge with the original collection, that will happen at the end
346         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
347     else:
348         # Just a single task, so do merge with the original collection
349         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
350 else:
351     outcollection = robust_put.upload(outdir, logger)
352
353 success = reduce(lambda x, y: x & (y == 0), [True]+rcode.values())
354
355 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
356                                      body={
357                                          'output': outcollection,
358                                          'success': success,
359                                          'progress':1.0
360                                      }).execute()
361
362 sys.exit(rcode)