Start working on adding local unix pipe support to run-command.
#!/usr/bin/env python

import logging

logger = logging.getLogger('run-command')
log_handler = logging.StreamHandler()
log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
logger.addHandler(log_handler)
logger.setLevel(logging.INFO)

import arvados
import re
import os
import subprocess
import sys
import shutil
import crunchutil.subst as subst
import time
import arvados.commands.put as put
import signal
import stat
import copy
import traceback
import pprint
import multiprocessing
import crunchutil.robust_put as robust_put
import crunchutil.vwd as vwd
import argparse
import json
import tempfile

parser = argparse.ArgumentParser()
parser.add_argument('--dry-run', action='store_true')
parser.add_argument('--script-parameters', type=str, default="{}")
args = parser.parse_args()

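# For local testing without a Crunch job context, e.g.:
#
#   run-command --dry-run --script-parameters '{"command": ["echo", "hello"]}'
#
# --dry-run logs the fully expanded command line(s) and exits instead of
# running anything.
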
os.umask(0077)

if not args.dry_run:
    api = arvados.api('v1')
    t = arvados.current_task().tmpdir
    os.chdir(arvados.current_task().tmpdir)
    os.mkdir("tmpdir")
    os.mkdir("output")

    os.chdir("output")

    outdir = os.getcwd()

    taskp = None
    jobp = arvados.current_job()['script_parameters']
    if len(arvados.current_task()['parameters']) > 0:
        taskp = arvados.current_task()['parameters']
else:
    outdir = "/tmp"
    jobp = json.loads(args.script_parameters)
    os.environ['JOB_UUID'] = 'zzzzz-8i9sb-1234567890abcde'
    os.environ['TASK_UUID'] = 'zzzzz-ot0gb-1234567890abcde'
    os.environ['CRUNCH_SRC'] = '/tmp/crunch-src'
    if 'TASK_KEEPMOUNT' not in os.environ:
        os.environ['TASK_KEEPMOUNT'] = '/keep'

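# In a real Crunch job the working directory is now the task's output
# directory; in --dry-run mode we stay in /tmp and fake the job-related
# environment variables above.
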
links = []

def sub_tmpdir(v):
    return os.path.join(arvados.current_task().tmpdir, 'tmpdir')

def sub_outdir(v):
    return outdir

def sub_cores(v):
    return str(multiprocessing.cpu_count())

def sub_jobid(v):
    return os.environ['JOB_UUID']

def sub_taskid(v):
    return os.environ['TASK_UUID']

def sub_jobsrc(v):
    return os.environ['CRUNCH_SRC']

subst.default_subs["task.tmpdir"] = sub_tmpdir
subst.default_subs["task.outdir"] = sub_outdir
subst.default_subs["job.srcdir"] = sub_jobsrc
subst.default_subs["node.cores"] = sub_cores
subst.default_subs["job.uuid"] = sub_jobid
subst.default_subs["task.uuid"] = sub_taskid

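# A sketch of how these substitutions are used, assuming the $(...)
# syntax implemented by crunchutil.subst: a script_parameters entry like
#
#   "command": ["my-tool", "--threads", "$(node.cores)", "--out", "$(task.outdir)"]
#
# would expand "$(node.cores)" to this node's CPU count and
# "$(task.outdir)" to the task output directory before the command runs.
# ("my-tool" is a hypothetical program name.)
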
class SigHandler(object):
    """Forwards a signal to a subprocess and records that it was sent."""
    def __init__(self):
        self.sig = None

    def send_signal(self, sp, signum):
        sp.send_signal(signum)
        self.sig = signum

def add_to_group(gr, match):
    # group matched items by the tuple of their regex capture groups
    m = match.groups()
    if m not in gr:
        gr[m] = []
    gr[m].append(match.group(0))

def expand_item(p, c):
    """Expand one element of a command template against parameters p.

    Dict elements implement a small expansion language:
      {"foreach": var, "command": [...]}  -- repeat command for each item of p[var]
      {"list": var, "index": i, "command": [...]}  -- expand command with p[var]
          bound to the i'th item
      {"regex": pat, "filter": var}   -- keep only items matching pat
      {"regex": pat, "group": var}    -- group items by their capture groups
      {"regex": pat, "extract": var}  -- replace each item with its capture groups
    Lists expand recursively; strings go through parameter substitution.
    """
    if isinstance(c, dict):
        if "foreach" in c and "command" in c:
            var = c["foreach"]
            items = get_items(p, p[var])
            r = []
            for i in items:
                params = copy.copy(p)
                params[var] = i
                r.extend(expand_list(params, c["command"]))
            return r
        if "list" in c and "index" in c and "command" in c:
            var = c["list"]
            items = get_items(p, p[var])
            params = copy.copy(p)
            params[var] = items[int(c["index"])]
            return expand_list(params, c["command"])
        if "regex" in c:
            pattern = re.compile(c["regex"])
            if "filter" in c:
                items = get_items(p, p[c["filter"]])
                return [i for i in items if pattern.match(i)]
            elif "group" in c:
                items = get_items(p, p[c["group"]])
                groups = {}
                for i in items:
                    match = pattern.match(i)
                    if match:
                        add_to_group(groups, match)
                return [groups[k] for k in groups]
            elif "extract" in c:
                items = get_items(p, p[c["extract"]])
                r = []
                for i in items:
                    match = pattern.match(i)
                    if match:
                        r.append(list(match.groups()))
                return r
    elif isinstance(c, list):
        return expand_list(p, c)
    elif isinstance(c, basestring):
        return [subst.do_substitution(p, c)]

    return []

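# Example of the "group" expansion above, with hypothetical values: given
#   p = {"inputs": ["a_R1.fastq", "a_R2.fastq"]}
#   c = {"regex": "(.*)_R[12]\\.fastq", "group": "inputs"}
# expand_item returns [["a_R1.fastq", "a_R2.fastq"]]: both files fall into
# one group keyed by the shared "a" prefix captured by the regex.
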
def expand_list(p, l):
    if isinstance(l, basestring):
        return expand_item(p, l)
    else:
        return [exp for arg in l for exp in expand_item(p, arg)]

def get_items(p, value):
    if isinstance(value, dict):
        return expand_item(p, value)

    if isinstance(value, list):
        return expand_list(p, value)

    fn = subst.do_substitution(p, value)
    mode = os.stat(fn).st_mode
    if stat.S_ISDIR(mode):
        # a directory yields one item per directory entry
        return [os.path.join(fn, l) for l in os.listdir(fn)]
    elif stat.S_ISREG(mode):
        # a regular file yields one item per line
        with open(fn) as f:
            return [line.rstrip("\r\n") for line in f]
    return None

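# get_items is what lets "task.foreach" iterate over files: a parameter
# naming a directory expands to its entries, and one naming a regular
# file expands to its lines.
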
stdoutname = None
stdoutfile = None
stdinname = None
stdinfile = None
rcode = 1

def recursive_foreach(params, fvars):
    """Queue one new job task for each combination of items of the
    parameters named in fvars."""
    var = fvars[0]
    fvars = fvars[1:]
    items = get_items(params, params[var])
    logger.info("parallelizing on %s with items %s" % (var, items))
    if items is not None:
        for i in items:
            params = copy.copy(params)
            params[var] = i
            if len(fvars) > 0:
                recursive_foreach(params, fvars)
            else:
                if not args.dry_run:
                    arvados.api().job_tasks().create(body={
                        'job_uuid': arvados.current_job()['uuid'],
                        'created_by_job_task_uuid': arvados.current_task()['uuid'],
                        'sequence': 1,
                        'parameters': params
                    }).execute()
                else:
                    logger.info(expand_list(params, params["command"]))
    else:
        logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
        sys.exit(1)

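# Example of a foreach job, with hypothetical values: script_parameters
# such as
#   {"task.foreach": "input",
#    "input": ["a.txt", "b.txt"],
#    "command": ["wc", "-l", "$(input)"]}
# make the sequence-0 task create one new task per list item, each with
# a single value bound to "input".
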
try:
    if "task.foreach" in jobp:
        if args.dry_run or arvados.current_task()['sequence'] == 0:
            # This is the first task to start the other tasks and exit
            fvars = jobp["task.foreach"]
            if isinstance(fvars, basestring):
                fvars = [fvars]
            if not isinstance(fvars, list) or len(fvars) == 0:
                logger.error("value of task.foreach must be a string or non-empty list")
                sys.exit(1)
            recursive_foreach(jobp, fvars)
            if not args.dry_run:
                if "task.vwd" in jobp:
                    # Set output of the first task to the base vwd collection so it
                    # will be merged with output fragments from the other tasks by
                    # crunch.
                    arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
                else:
                    arvados.current_task().set_output(None)
            sys.exit(0)
    else:
        # This is the only task so taskp/jobp are the same
        taskp = jobp

    if not args.dry_run:
        if "task.vwd" in taskp:
            # Populate output directory with symlinks to files in collection
            vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)

        if "task.cwd" in taskp:
            os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))

    # cmd is a list of command lines; entries after the first read their
    # stdin from the previous entry's stdout, like a shell pipeline.
    if "piped_commands" in taskp:
        cmd = []
        for c in taskp["piped_commands"]:
            cmd.append(expand_list(taskp, c))
    else:
        cmd = [expand_list(taskp, taskp["command"])]

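# A sketch of the new parameter this commit introduces, with hypothetical
# values: script_parameters like
#   {"piped_commands": [["zcat", "$(input)"], ["grep", "-c", "ACGT"]]}
# run the local unix pipeline "zcat <input> | grep -c ACGT".
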
    if not args.dry_run:
        if "task.stdin" in taskp:
            stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
            stdinfile = open(stdinname, "rb")

        if "task.stdout" in taskp:
            stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
            stdoutfile = open(stdoutname, "wb")

    logger.info("{}{}{}".format(" | ".join(" ".join(c) for c in cmd), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))

    if args.dry_run:
        sys.exit(0)
except subst.SubstitutionError as e:
    logger.error(str(e))
    logger.error("task parameters were:")
    logger.error(pprint.pformat(taskp))
    sys.exit(1)
except Exception as e:
    logger.exception("caught exception")
    logger.error("task parameters were:")
    logger.error(pprint.pformat(taskp))
    sys.exit(1)

try:
    subprocesses = []
    next_cmd_stdin = stdinfile
    for i in xrange(len(cmd)):
        if i == len(cmd)-1:
            # the last command in the pipeline writes to task.stdout (if any)
            next_cmd_stdout = stdoutfile
        else:
            # intermediate commands write into a pipe read by the next command
            next_cmd_stdout = subprocess.PIPE
        sp = subprocess.Popen(cmd[i], shell=False, stdin=next_cmd_stdin, stdout=next_cmd_stdout)
        if next_cmd_stdin is not None and next_cmd_stdin is not stdinfile:
            # close our copy of the upstream pipe's read end so the
            # upstream command gets SIGPIPE if this one exits early
            next_cmd_stdin.close()
        next_cmd_stdin = sp.stdout

        sig = SigHandler()

        # forward signals to the most recently started process.
        signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
        signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
        signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))

        subprocesses.append(sp)

    # wait for all processes in the pipeline to complete; the exit code
    # of the last command determines success or failure.
    for sp in subprocesses:
        rcode = sp.wait()

    if sig.sig is not None:
        logger.critical("terminating on signal %s" % sig.sig)
        sys.exit(2)
    else:
        logger.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))

except Exception as e:
    logger.exception("caught exception")

# restore default signal handlers.
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGQUIT, signal.SIG_DFL)

for l in links:
    os.unlink(l)

logger.info("the following output files will be saved to keep:")

subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)

logger.info("start writing output to keep")

if "task.vwd" in taskp:
    if "task.foreach" in jobp:
        # This is a subtask, so don't merge with the original collection; that will happen at the end
        outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
    else:
        # Just a single task, so do merge with the original collection
        outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
else:
    outcollection = robust_put.upload(outdir, logger)

api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                       body={
                           'output': outcollection,
                           'success': (rcode == 0),
                           'progress': 1.0
                       }).execute()

sys.exit(rcode)