Merge branch '4042-run-command-MxN' closes #4042
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28 import argparse
29 import json
30 import tempfile
31
32 parser = argparse.ArgumentParser()
33 parser.add_argument('--dry-run', action='store_true')
34 parser.add_argument('--script-parameters', type=str, default="{}")
35 args = parser.parse_args()
36
37 os.umask(0077)
38
39 if not args.dry_run:
40     api = arvados.api('v1')
41     t = arvados.current_task().tmpdir
42     os.chdir(arvados.current_task().tmpdir)
43     os.mkdir("tmpdir")
44     os.mkdir("output")
45
46     os.chdir("output")
47
48     outdir = os.getcwd()
49
50     taskp = None
51     jobp = arvados.current_job()['script_parameters']
52     if len(arvados.current_task()['parameters']) > 0:
53         taskp = arvados.current_task()['parameters']
54 else:
55     outdir = "/tmp"
56     jobp = json.loads(args.job_parameters)
57     os.environ['JOB_UUID'] = 'zzzzz-8i9sb-1234567890abcde'
58     os.environ['TASK_UUID'] = 'zzzzz-ot0gb-1234567890abcde'
59     os.environ['CRUNCH_SRC'] = '/tmp/crunche-src'
60     if 'TASK_KEEPMOUNT' not in os.environ:
61         os.environ['TASK_KEEPMOUNT'] = '/keep'
62
63 links = []
64
65 def sub_tmpdir(v):
66     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
67
68 def sub_outdir(v):
69     return outdir
70
71 def sub_cores(v):
72      return str(multiprocessing.cpu_count())
73
74 def sub_jobid(v):
75      return os.environ['JOB_UUID']
76
77 def sub_taskid(v):
78      return os.environ['TASK_UUID']
79
80 def sub_jobsrc(v):
81      return os.environ['CRUNCH_SRC']
82
83 subst.default_subs["task.tmpdir"] = sub_tmpdir
84 subst.default_subs["task.outdir"] = sub_outdir
85 subst.default_subs["job.srcdir"] = sub_jobsrc
86 subst.default_subs["node.cores"] = sub_cores
87 subst.default_subs["job.uuid"] = sub_jobid
88 subst.default_subs["task.uuid"] = sub_taskid
89
90 class SigHandler(object):
91     def __init__(self):
92         self.sig = None
93
94     def send_signal(self, sp, signum):
95         sp.send_signal(signum)
96         self.sig = signum
97
98 def add_to_group(gr, match):
99     m = match.groups()
100     if m not in gr:
101         gr[m] = []
102     gr[m].append(match.group(0))
103
104 def expand_item(p, c):
105     if isinstance(c, dict):
106         if "foreach" in c and "command" in c:
107             var = c["foreach"]
108             items = get_items(p, p[var])
109             r = []
110             for i in items:
111                 params = copy.copy(p)
112                 params[var] = i
113                 r.extend(expand_list(params, c["command"]))
114             return r
115         if "list" in c and "index" in c and "command" in c:
116             var = c["list"]
117             items = get_items(p, p[var])
118             params = copy.copy(p)
119             params[var] = items[int(c["index"])]
120             return expand_list(params, c["command"])
121         if "regex" in c:
122             pattern = re.compile(c["regex"])
123             if "filter" in c:
124                 items = get_items(p, p[c["filter"]])
125                 return [i for i in items if pattern.match(i)]
126             elif "group" in c:
127                 items = get_items(p, p[c["group"]])
128                 groups = {}
129                 for i in items:
130                     match = pattern.match(i)
131                     if match:
132                         add_to_group(groups, match)
133                 return [groups[k] for k in groups]
134             elif "extract" in c:
135                 items = get_items(p, p[c["extract"]])
136                 r = []
137                 for i in items:
138                     match = pattern.match(i)
139                     if match:
140                         r.append(list(match.groups()))
141                 return r
142     elif isinstance(c, list):
143         return expand_list(p, c)
144     elif isinstance(c, basestring):
145         return [subst.do_substitution(p, c)]
146
147     return []
148
149 def expand_list(p, l):
150     if isinstance(l, basestring):
151         return expand_item(p, l)
152     else:
153         return [exp for arg in l for exp in expand_item(p, arg)]
154
155 def get_items(p, value):
156     if isinstance(value, dict):
157         return expand_item(p, value)
158
159     if isinstance(value, list):
160         return expand_list(p, value)
161
162     fn = subst.do_substitution(p, value)
163     mode = os.stat(fn).st_mode
164     prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
165     if mode is not None:
166         if stat.S_ISDIR(mode):
167             items = [os.path.join(fn, l) for l in os.listdir(fn)]
168         elif stat.S_ISREG(mode):
169             with open(fn) as f:
170                 items = [line.rstrip("\r\n") for line in f]
171         return items
172     else:
173         return None
174
175 stdoutname = None
176 stdoutfile = None
177 stdinname = None
178 stdinfile = None
179 rcode = 1
180
181 def recursive_foreach(params, fvars):
182     var = fvars[0]
183     fvars = fvars[1:]
184     items = get_items(params, params[var])
185     logger.info("parallelizing on %s with items %s" % (var, items))
186     if items is not None:
187         for i in items:
188             params = copy.copy(params)
189             params[var] = i
190             if len(fvars) > 0:
191                 recursive_foreach(params, fvars)
192             else:
193                 if not args.dry_run:
194                     arvados.api().job_tasks().create(body={
195                         'job_uuid': arvados.current_job()['uuid'],
196                         'created_by_job_task_uuid': arvados.current_task()['uuid'],
197                         'sequence': 1,
198                         'parameters': params
199                     }).execute()
200                 else:
201                     logger.info(expand_list(params, params["command"]))
202     else:
203         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
204         sys.exit(1)
205
206 try:
207     if "task.foreach" in jobp:
208         if args.dry_run or arvados.current_task()['sequence'] == 0:
209             # This is the first task to start the other tasks and exit
210             fvars = jobp["task.foreach"]
211             if isinstance(fvars, basestring):
212                 fvars = [fvars]
213             if not isinstance(fvars, list) or len(fvars) == 0:
214                 logger.error("value of task.foreach must be a string or non-empty list")
215                 sys.exit(1)
216             recursive_foreach(jobp, jobp["task.foreach"])
217             if not args.dry_run:
218                 if "task.vwd" in jobp:
219                     # Set output of the first task to the base vwd collection so it
220                     # will be merged with output fragments from the other tasks by
221                     # crunch.
222                     arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
223                 else:
224                     arvados.current_task().set_output(None)
225             sys.exit(0)
226     else:
227         # This is the only task so taskp/jobp are the same
228         taskp = jobp
229
230     if not args.dry_run:
231         if "task.vwd" in taskp:
232             # Populate output directory with symlinks to files in collection
233             vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
234
235         if "task.cwd" in taskp:
236             os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
237
238     cmd = expand_list(taskp, taskp["command"])
239
240     if not args.dry_run:
241         if "task.stdin" in taskp:
242             stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
243             stdinfile = open(stdinname, "rb")
244
245         if "task.stdout" in taskp:
246             stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
247             stdoutfile = open(stdoutname, "wb")
248
249     logger.info("{}{}{}".format(' '.join(cmd), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
250
251     if args.dry_run:
252         sys.exit(0)
253 except subst.SubstitutionError as e:
254     logger.error(str(e))
255     logger.error("task parameters were:")
256     logger.error(pprint.pformat(taskp))
257     sys.exit(1)
258 except Exception as e:
259     logger.exception("caught exception")
260     logger.error("task parameters were:")
261     logger.error(pprint.pformat(taskp))
262     sys.exit(1)
263
264 try:
265     sp = subprocess.Popen(cmd, shell=False, stdin=stdinfile, stdout=stdoutfile)
266     sig = SigHandler()
267
268     # forward signals to the process.
269     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
270     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
271     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))
272
273     # wait for process to complete.
274     rcode = sp.wait()
275
276     if sig.sig is not None:
277         logger.critical("terminating on signal %s" % sig.sig)
278         sys.exit(2)
279     else:
280         logger.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
281
282 except Exception as e:
283     logger.exception("caught exception")
284
285 # restore default signal handlers.
286 signal.signal(signal.SIGINT, signal.SIG_DFL)
287 signal.signal(signal.SIGTERM, signal.SIG_DFL)
288 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
289
290 for l in links:
291     os.unlink(l)
292
293 logger.info("the following output files will be saved to keep:")
294
295 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
296
297 logger.info("start writing output to keep")
298
299 if "task.vwd" in taskp:
300     if "task.foreach" in jobp:
301         # This is a subtask, so don't merge with the original collection, that will happen at the end
302         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
303     else:
304         # Just a single task, so do merge with the original collection
305         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
306 else:
307     outcollection = robust_put.upload(outdir, logger)
308
309 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
310                                      body={
311                                          'output': outcollection,
312                                          'success': (rcode == 0),
313                                          'progress':1.0
314                                      }).execute()
315
316 sys.exit(rcode)