Catch ECHILD from os.waitpid()
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28 import argparse
29 import json
30 import tempfile
31 import errno
32
33 parser = argparse.ArgumentParser()
34 parser.add_argument('--dry-run', action='store_true')
35 parser.add_argument('--script-parameters', type=str, default="{}")
36 args = parser.parse_args()
37
38 os.umask(0077)
39
40 if not args.dry_run:
41     api = arvados.api('v1')
42     t = arvados.current_task().tmpdir
43     os.chdir(arvados.current_task().tmpdir)
44     os.mkdir("tmpdir")
45     os.mkdir("output")
46
47     os.chdir("output")
48
49     outdir = os.getcwd()
50
51     taskp = None
52     jobp = arvados.current_job()['script_parameters']
53     if len(arvados.current_task()['parameters']) > 0:
54         taskp = arvados.current_task()['parameters']
55 else:
56     outdir = "/tmp"
57     jobp = json.loads(args.script_parameters)
58     os.environ['JOB_UUID'] = 'zzzzz-8i9sb-1234567890abcde'
59     os.environ['TASK_UUID'] = 'zzzzz-ot0gb-1234567890abcde'
60     os.environ['CRUNCH_SRC'] = '/tmp/crunche-src'
61     if 'TASK_KEEPMOUNT' not in os.environ:
62         os.environ['TASK_KEEPMOUNT'] = '/keep'
63
64 links = []
65
66 def sub_tmpdir(v):
67     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
68
69 def sub_outdir(v):
70     return outdir
71
72 def sub_cores(v):
73      return str(multiprocessing.cpu_count())
74
75 def sub_jobid(v):
76      return os.environ['JOB_UUID']
77
78 def sub_taskid(v):
79      return os.environ['TASK_UUID']
80
81 def sub_jobsrc(v):
82      return os.environ['CRUNCH_SRC']
83
84 subst.default_subs["task.tmpdir"] = sub_tmpdir
85 subst.default_subs["task.outdir"] = sub_outdir
86 subst.default_subs["job.srcdir"] = sub_jobsrc
87 subst.default_subs["node.cores"] = sub_cores
88 subst.default_subs["job.uuid"] = sub_jobid
89 subst.default_subs["task.uuid"] = sub_taskid
90
91 class SigHandler(object):
92     def __init__(self):
93         self.sig = None
94
95     def send_signal(self, subprocesses, signum):
96         for sp in subprocesses:
97             sp.send_signal(signum)
98         self.sig = signum
99
100 def add_to_group(gr, match):
101     m = match.groups()
102     if m not in gr:
103         gr[m] = []
104     gr[m].append(match.group(0))
105
106 def expand_item(p, c):
107     if isinstance(c, dict):
108         if "foreach" in c and "command" in c:
109             var = c["foreach"]
110             items = get_items(p, p[var])
111             r = []
112             for i in items:
113                 params = copy.copy(p)
114                 params[var] = i
115                 r.extend(expand_list(params, c["command"]))
116             return r
117         if "list" in c and "index" in c and "command" in c:
118             var = c["list"]
119             items = get_items(p, p[var])
120             params = copy.copy(p)
121             params[var] = items[int(c["index"])]
122             return expand_list(params, c["command"])
123         if "regex" in c:
124             pattern = re.compile(c["regex"])
125             if "filter" in c:
126                 items = get_items(p, p[c["filter"]])
127                 return [i for i in items if pattern.match(i)]
128             elif "group" in c:
129                 items = get_items(p, p[c["group"]])
130                 groups = {}
131                 for i in items:
132                     match = pattern.match(i)
133                     if match:
134                         add_to_group(groups, match)
135                 return [groups[k] for k in groups]
136             elif "extract" in c:
137                 items = get_items(p, p[c["extract"]])
138                 r = []
139                 for i in items:
140                     match = pattern.match(i)
141                     if match:
142                         r.append(list(match.groups()))
143                 return r
144     elif isinstance(c, list):
145         return expand_list(p, c)
146     elif isinstance(c, basestring):
147         return [subst.do_substitution(p, c)]
148
149     return []
150
151 def expand_list(p, l):
152     if isinstance(l, basestring):
153         return expand_item(p, l)
154     else:
155         return [exp for arg in l for exp in expand_item(p, arg)]
156
157 def get_items(p, value):
158     if isinstance(value, dict):
159         return expand_item(p, value)
160
161     if isinstance(value, list):
162         return expand_list(p, value)
163
164     fn = subst.do_substitution(p, value)
165     mode = os.stat(fn).st_mode
166     prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
167     if mode is not None:
168         if stat.S_ISDIR(mode):
169             items = [os.path.join(fn, l) for l in os.listdir(fn)]
170         elif stat.S_ISREG(mode):
171             with open(fn) as f:
172                 items = [line.rstrip("\r\n") for line in f]
173         return items
174     else:
175         return None
176
177 stdoutname = None
178 stdoutfile = None
179 stdinname = None
180 stdinfile = None
181 rcode = 1
182
183 def recursive_foreach(params, fvars):
184     var = fvars[0]
185     fvars = fvars[1:]
186     items = get_items(params, params[var])
187     logger.info("parallelizing on %s with items %s" % (var, items))
188     if items is not None:
189         for i in items:
190             params = copy.copy(params)
191             params[var] = i
192             if len(fvars) > 0:
193                 recursive_foreach(params, fvars)
194             else:
195                 if not args.dry_run:
196                     arvados.api().job_tasks().create(body={
197                         'job_uuid': arvados.current_job()['uuid'],
198                         'created_by_job_task_uuid': arvados.current_task()['uuid'],
199                         'sequence': 1,
200                         'parameters': params
201                     }).execute()
202                 else:
203                     logger.info(expand_list(params, params["command"]))
204     else:
205         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
206         sys.exit(1)
207
208 try:
209     if "task.foreach" in jobp:
210         if args.dry_run or arvados.current_task()['sequence'] == 0:
211             # This is the first task to start the other tasks and exit
212             fvars = jobp["task.foreach"]
213             if isinstance(fvars, basestring):
214                 fvars = [fvars]
215             if not isinstance(fvars, list) or len(fvars) == 0:
216                 logger.error("value of task.foreach must be a string or non-empty list")
217                 sys.exit(1)
218             recursive_foreach(jobp, jobp["task.foreach"])
219             if not args.dry_run:
220                 if "task.vwd" in jobp:
221                     # Set output of the first task to the base vwd collection so it
222                     # will be merged with output fragments from the other tasks by
223                     # crunch.
224                     arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
225                 else:
226                     arvados.current_task().set_output(None)
227             sys.exit(0)
228     else:
229         # This is the only task so taskp/jobp are the same
230         taskp = jobp
231
232     if not args.dry_run:
233         if "task.vwd" in taskp:
234             # Populate output directory with symlinks to files in collection
235             vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
236
237         if "task.cwd" in taskp:
238             os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
239
240     cmd = []
241     if isinstance(taskp["command"][0], list):
242         for c in taskp["command"]:
243             cmd.append(expand_list(taskp, c))
244     else:
245         cmd.append(expand_list(taskp, taskp["command"]))
246
247     if "task.stdin" in taskp:
248         stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
249         if not args.dry_run:
250             stdinfile = open(stdinname, "rb")
251
252     if "task.stdout" in taskp:
253         stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
254         if not args.dry_run:
255             stdoutfile = open(stdoutname, "wb")
256
257     logger.info("{}{}{}".format(' | '.join([' '.join(c) for c in cmd]), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
258
259     if args.dry_run:
260         sys.exit(0)
261 except subst.SubstitutionError as e:
262     logger.error(str(e))
263     logger.error("task parameters were:")
264     logger.error(pprint.pformat(taskp))
265     sys.exit(1)
266 except Exception as e:
267     logger.exception("caught exception")
268     logger.error("task parameters were:")
269     logger.error(pprint.pformat(taskp))
270     sys.exit(1)
271
272 try:
273     subprocesses = []
274     close_streams = []
275     if stdinfile:
276         close_streams.append(stdinfile)
277     next_stdin = stdinfile
278
279     for i in xrange(len(cmd)):
280         if i == len(cmd)-1:
281             # this is the last command in the pipeline, so its stdout should go to stdoutfile
282             next_stdout = stdoutfile
283         else:
284             # this is an intermediate command in the pipeline, so its stdout should go to a pipe
285             next_stdout = subprocess.PIPE
286
287         sp = subprocess.Popen(cmd[i], shell=False, stdin=next_stdin, stdout=next_stdout)
288
289         # Need to close the FDs on our side so that subcommands will get SIGPIPE if the
290         # consuming process ends prematurely.
291         if sp.stdout:
292             close_streams.append(sp.stdout)
293
294         # Send this processes's stdout to to the next process's stdin
295         next_stdin = sp.stdout
296
297         subprocesses.append(sp)
298
299     # File descriptors have been handed off to the subprocesses, so close them here.
300     for s in close_streams:
301         s.close()
302
303     # Set up signal handling
304     sig = SigHandler()
305
306     # Forward terminate signals to the subprocesses.
307     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(subprocesses, signum))
308     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(subprocesses, signum))
309     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(subprocesses, signum))
310
311     active = 1
312     while active > 0:
313         try:
314             os.waitpid(0, 0)
315         except OSError as e:
316             if e.errno == errno.ECHILD:
317                 # child already exited
318                 pass
319             else:
320                 raise
321         active = sum([1 if s.poll() is None else 0 for s in subprocesses])
322
323     # wait for process to complete.
324     rcode = subprocesses[len(subprocesses)-1].returncode
325
326     if sig.sig is not None:
327         logger.critical("terminating on signal %s" % sig.sig)
328         sys.exit(2)
329     else:
330         logger.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
331
332 except Exception as e:
333     logger.exception("caught exception")
334
335 # restore default signal handlers.
336 signal.signal(signal.SIGINT, signal.SIG_DFL)
337 signal.signal(signal.SIGTERM, signal.SIG_DFL)
338 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
339
340 for l in links:
341     os.unlink(l)
342
343 logger.info("the following output files will be saved to keep:")
344
345 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
346
347 logger.info("start writing output to keep")
348
349 if "task.vwd" in taskp:
350     if "task.foreach" in jobp:
351         # This is a subtask, so don't merge with the original collection, that will happen at the end
352         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
353     else:
354         # Just a single task, so do merge with the original collection
355         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
356 else:
357     outcollection = robust_put.upload(outdir, logger)
358
359 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
360                                      body={
361                                          'output': outcollection,
362                                          'success': (rcode == 0),
363                                          'progress':1.0
364                                      }).execute()
365
366 sys.exit(rcode)