34419b4de42fb7d908f9a4726fb301a4d3439af9
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28 import argparse
29 import json
30 import tempfile
31
32 parser = argparse.ArgumentParser()
33 parser.add_argument('--dry-run', action='store_true')
34 parser.add_argument('--script-parameters', type=str, default="{}")
35 args = parser.parse_args()
36
37 os.umask(0077)
38
39 if not args.dry_run:
40     api = arvados.api('v1')
41     t = arvados.current_task().tmpdir
42     os.chdir(arvados.current_task().tmpdir)
43     os.mkdir("tmpdir")
44     os.mkdir("output")
45
46     os.chdir("output")
47
48     outdir = os.getcwd()
49
50     taskp = None
51     jobp = arvados.current_job()['script_parameters']
52     if len(arvados.current_task()['parameters']) > 0:
53         taskp = arvados.current_task()['parameters']
54 else:
55     outdir = "/tmp"
56     jobp = json.loads(args.script_parameters)
57     os.environ['JOB_UUID'] = 'zzzzz-8i9sb-1234567890abcde'
58     os.environ['TASK_UUID'] = 'zzzzz-ot0gb-1234567890abcde'
59     os.environ['CRUNCH_SRC'] = '/tmp/crunche-src'
60     if 'TASK_KEEPMOUNT' not in os.environ:
61         os.environ['TASK_KEEPMOUNT'] = '/keep'
62
63 links = []
64
65 def sub_tmpdir(v):
66     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
67
68 def sub_outdir(v):
69     return outdir
70
71 def sub_cores(v):
72      return str(multiprocessing.cpu_count())
73
74 def sub_jobid(v):
75      return os.environ['JOB_UUID']
76
77 def sub_taskid(v):
78      return os.environ['TASK_UUID']
79
80 def sub_jobsrc(v):
81      return os.environ['CRUNCH_SRC']
82
83 subst.default_subs["task.tmpdir"] = sub_tmpdir
84 subst.default_subs["task.outdir"] = sub_outdir
85 subst.default_subs["job.srcdir"] = sub_jobsrc
86 subst.default_subs["node.cores"] = sub_cores
87 subst.default_subs["job.uuid"] = sub_jobid
88 subst.default_subs["task.uuid"] = sub_taskid
89
90 class SigHandler(object):
91     def __init__(self):
92         self.sig = None
93
94     def send_signal(self, subprocesses, signum):
95         for sp in subprocesses:
96             sp.send_signal(signum)
97         self.sig = signum
98
99 def add_to_group(gr, match):
100     m = match.groups()
101     if m not in gr:
102         gr[m] = []
103     gr[m].append(match.group(0))
104
105 def expand_item(p, c):
106     if isinstance(c, dict):
107         if "foreach" in c and "command" in c:
108             var = c["foreach"]
109             items = get_items(p, p[var])
110             r = []
111             for i in items:
112                 params = copy.copy(p)
113                 params[var] = i
114                 r.extend(expand_list(params, c["command"]))
115             return r
116         if "list" in c and "index" in c and "command" in c:
117             var = c["list"]
118             items = get_items(p, p[var])
119             params = copy.copy(p)
120             params[var] = items[int(c["index"])]
121             return expand_list(params, c["command"])
122         if "regex" in c:
123             pattern = re.compile(c["regex"])
124             if "filter" in c:
125                 items = get_items(p, p[c["filter"]])
126                 return [i for i in items if pattern.match(i)]
127             elif "group" in c:
128                 items = get_items(p, p[c["group"]])
129                 groups = {}
130                 for i in items:
131                     match = pattern.match(i)
132                     if match:
133                         add_to_group(groups, match)
134                 return [groups[k] for k in groups]
135             elif "extract" in c:
136                 items = get_items(p, p[c["extract"]])
137                 r = []
138                 for i in items:
139                     match = pattern.match(i)
140                     if match:
141                         r.append(list(match.groups()))
142                 return r
143     elif isinstance(c, list):
144         return expand_list(p, c)
145     elif isinstance(c, basestring):
146         return [subst.do_substitution(p, c)]
147
148     return []
149
150 def expand_list(p, l):
151     if isinstance(l, basestring):
152         return expand_item(p, l)
153     else:
154         return [exp for arg in l for exp in expand_item(p, arg)]
155
156 def get_items(p, value):
157     if isinstance(value, dict):
158         return expand_item(p, value)
159
160     if isinstance(value, list):
161         return expand_list(p, value)
162
163     fn = subst.do_substitution(p, value)
164     mode = os.stat(fn).st_mode
165     prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
166     if mode is not None:
167         if stat.S_ISDIR(mode):
168             items = [os.path.join(fn, l) for l in os.listdir(fn)]
169         elif stat.S_ISREG(mode):
170             with open(fn) as f:
171                 items = [line.rstrip("\r\n") for line in f]
172         return items
173     else:
174         return None
175
176 stdoutname = None
177 stdoutfile = None
178 stdinname = None
179 stdinfile = None
180 rcode = 1
181
182 def recursive_foreach(params, fvars):
183     var = fvars[0]
184     fvars = fvars[1:]
185     items = get_items(params, params[var])
186     logger.info("parallelizing on %s with items %s" % (var, items))
187     if items is not None:
188         for i in items:
189             params = copy.copy(params)
190             params[var] = i
191             if len(fvars) > 0:
192                 recursive_foreach(params, fvars)
193             else:
194                 if not args.dry_run:
195                     arvados.api().job_tasks().create(body={
196                         'job_uuid': arvados.current_job()['uuid'],
197                         'created_by_job_task_uuid': arvados.current_task()['uuid'],
198                         'sequence': 1,
199                         'parameters': params
200                     }).execute()
201                 else:
202                     logger.info(expand_list(params, params["command"]))
203     else:
204         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
205         sys.exit(1)
206
207 try:
208     if "task.foreach" in jobp:
209         if args.dry_run or arvados.current_task()['sequence'] == 0:
210             # This is the first task to start the other tasks and exit
211             fvars = jobp["task.foreach"]
212             if isinstance(fvars, basestring):
213                 fvars = [fvars]
214             if not isinstance(fvars, list) or len(fvars) == 0:
215                 logger.error("value of task.foreach must be a string or non-empty list")
216                 sys.exit(1)
217             recursive_foreach(jobp, jobp["task.foreach"])
218             if not args.dry_run:
219                 if "task.vwd" in jobp:
220                     # Set output of the first task to the base vwd collection so it
221                     # will be merged with output fragments from the other tasks by
222                     # crunch.
223                     arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
224                 else:
225                     arvados.current_task().set_output(None)
226             sys.exit(0)
227     else:
228         # This is the only task so taskp/jobp are the same
229         taskp = jobp
230
231     if not args.dry_run:
232         if "task.vwd" in taskp:
233             # Populate output directory with symlinks to files in collection
234             vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
235
236         if "task.cwd" in taskp:
237             os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
238
239     cmd = []
240     if isinstance(taskp["command"][0], list):
241         for c in taskp["command"]:
242             cmd.append(expand_list(taskp, c))
243     else:
244         cmd.append(expand_list(taskp, taskp["command"]))
245
246     if "task.stdin" in taskp:
247         stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
248         if not args.dry_run:
249             stdinfile = open(stdinname, "rb")
250
251     if "task.stdout" in taskp:
252         stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
253         if not args.dry_run:
254             stdoutfile = open(stdoutname, "wb")
255
256     logger.info("{}{}{}".format(' | '.join([' '.join(c) for c in cmd]), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
257
258     if args.dry_run:
259         sys.exit(0)
260 except subst.SubstitutionError as e:
261     logger.error(str(e))
262     logger.error("task parameters were:")
263     logger.error(pprint.pformat(taskp))
264     sys.exit(1)
265 except Exception as e:
266     logger.exception("caught exception")
267     logger.error("task parameters were:")
268     logger.error(pprint.pformat(taskp))
269     sys.exit(1)
270
271 try:
272     subprocesses = []
273     close_streams = []
274     if stdinfile:
275         close_streams.append(stdinfile)
276     next_stdin = stdinfile
277
278     for i in xrange(len(cmd)):
279         if i == len(cmd)-1:
280             # this is the last command in the pipeline, so its stdout should go to stdoutfile
281             next_stdout = stdoutfile
282         else:
283             # this is an intermediate command in the pipeline, so its stdout should go to a pipe
284             next_stdout = subprocess.PIPE
285
286         sp = subprocess.Popen(cmd[i], shell=False, stdin=next_stdin, stdout=next_stdout)
287
288         # Need to close the FDs on our side so that subcommands will get SIGPIPE if the
289         # consuming process ends prematurely.
290         if sp.stdout:
291             close_streams.append(sp.stdout)
292
293         # Send this processes's stdout to to the next process's stdin
294         next_stdin = sp.stdout
295
296         subprocesses.append(sp)
297
298     # File descriptors have been handed off to the subprocesses, so close them here.
299     for s in close_streams:
300         s.close()
301
302     # Set up signal handling
303     sig = SigHandler()
304
305     # Forward terminate signals to the subprocesses.
306     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(subprocesses, signum))
307     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(subprocesses, signum))
308     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(subprocesses, signum))
309
310     active = 1
311     while active > 0:
312         os.waitpid(0, 0)
313         active = sum([1 if s.poll() is None else 0 for s in subprocesses])
314
315     # wait for process to complete.
316     rcode = subprocesses[len(subprocesses)-1].returncode
317
318     if sig.sig is not None:
319         logger.critical("terminating on signal %s" % sig.sig)
320         sys.exit(2)
321     else:
322         logger.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
323
324 except Exception as e:
325     logger.exception("caught exception")
326
327 # restore default signal handlers.
328 signal.signal(signal.SIGINT, signal.SIG_DFL)
329 signal.signal(signal.SIGTERM, signal.SIG_DFL)
330 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
331
332 for l in links:
333     os.unlink(l)
334
335 logger.info("the following output files will be saved to keep:")
336
337 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
338
339 logger.info("start writing output to keep")
340
341 if "task.vwd" in taskp:
342     if "task.foreach" in jobp:
343         # This is a subtask, so don't merge with the original collection, that will happen at the end
344         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
345     else:
346         # Just a single task, so do merge with the original collection
347         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
348 else:
349     outcollection = robust_put.upload(outdir, logger)
350
351 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
352                                      body={
353                                          'output': outcollection,
354                                          'success': (rcode == 0),
355                                          'progress':1.0
356                                      }).execute()
357
358 sys.exit(rcode)