4042: When listing directory, return list of absolute paths
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28 import argparse
29 import json
30 import tempfile
31
32 parser = argparse.ArgumentParser()
33 parser.add_argument('--dry-run', action='store_true')
34 parser.add_argument('--job-parameters', type=str, default="{}")
35 args = parser.parse_args()
36
37 os.umask(0077)
38
39 if not args.dry_run:
40     api = arvados.api('v1')
41     t = arvados.current_task().tmpdir
42     os.chdir(arvados.current_task().tmpdir)
43     os.mkdir("tmpdir")
44     os.mkdir("output")
45
46     os.chdir("output")
47
48     outdir = os.getcwd()
49
50     taskp = None
51     jobp = arvados.current_job()['script_parameters']
52     if len(arvados.current_task()['parameters']) > 0:
53         taskp = arvados.current_task()['parameters']
54 else:
55     outdir = "/tmp"
56     jobp = json.loads(args.job_parameters)
57     os.environ['JOB_UUID'] = 'zzzzz-8i9sb-1234567890abcde'
58     os.environ['TASK_UUID'] = 'zzzzz-ot0gb-1234567890abcde'
59     os.environ['CRUNCH_SRC'] = '/tmp/crunch-src'
60     os.environ['TASK_KEEPMOUNT'] = '/keep'
61
62 links = []
63
64 def sub_tmpdir(v):
65     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
66
67 def sub_outdir(v):
68     return outdir
69
70 def sub_cores(v):
71      return str(multiprocessing.cpu_count())
72
73 def sub_jobid(v):
74      return os.environ['JOB_UUID']
75
76 def sub_taskid(v):
77      return os.environ['TASK_UUID']
78
79 def sub_jobsrc(v):
80      return os.environ['CRUNCH_SRC']
81
82 subst.default_subs["task.tmpdir"] = sub_tmpdir
83 subst.default_subs["task.outdir"] = sub_outdir
84 subst.default_subs["job.srcdir"] = sub_jobsrc
85 subst.default_subs["node.cores"] = sub_cores
86 subst.default_subs["job.uuid"] = sub_jobid
87 subst.default_subs["task.uuid"] = sub_taskid
88
89 class SigHandler(object):
90     def __init__(self):
91         self.sig = None
92
93     def send_signal(self, sp, signum):
94         sp.send_signal(signum)
95         self.sig = signum
96
97 def add_to_group(gr, match):
98     m = ('^_^').join(match.groups())
99     if m not in gr:
100         gr[m] = []
101     gr[m].append(match.group(0))
102
103 def expand_item(p, c):
104     if isinstance(c, dict):
105         if "foreach" in c and "command" in c:
106             var = c["foreach"]
107             items = get_items(p, p[var])
108             r = []
109             for i in items:
110                 params = copy.copy(p)
111                 params[var] = i
112                 r.extend(expand_list(params, c["command"]))
113             return r
114         if "list" in c and "index" in c and "command" in c:
115             var = c["list"]
116             items = get_items(p, p[var])
117             params = copy.copy(p)
118             params[var] = items[int(c["index"])]
119             return expand_list(params, c["command"])
120         if "regex" in c:
121             pattern = re.compile(c["regex"])
122             if "filter" in c:
123                 items = get_items(p, p[c["filter"]])
124                 return [i for i in items if pattern.match(i)]
125             elif "group" in c:
126                 items = get_items(p, p[c["group"]])
127                 groups = {}
128                 for i in items:
129                     p = pattern.match(i)
130                     if p:
131                         add_to_group(groups, p)
132                 return [groups[k] for k in groups]
133             elif "extract" in c:
134                 items = get_items(p, p[c["extract"]])
135                 r = []
136                 for i in items:
137                     p = pattern.match(i)
138                     if p:
139                         r.append(list(p.groups()))
140                 return r
141     elif isinstance(c, list):
142         return expand_list(p, c)
143     elif isinstance(c, basestring):
144         return [subst.do_substitution(p, c)]
145
146     return []
147
148 def expand_list(p, l):
149     if isinstance(l, basestring):
150         return expand_item(p, l)
151     else:
152         return [exp for arg in l for exp in expand_item(p, arg)]
153
154 def get_items(p, value):
155     if isinstance(value, dict):
156         return expand_item(p, value)
157
158     if isinstance(value, list):
159         return expand_list(p, value)
160
161     fn = subst.do_substitution(p, value)
162     mode = os.stat(fn).st_mode
163     prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
164     if mode is not None:
165         if stat.S_ISDIR(mode):
166             items = [os.path.join(fn, l) for l in os.listdir(fn)]
167         elif stat.S_ISREG(mode):
168             with open(fn) as f:
169                 items = [line.rstrip("\r\n") for line in f]
170         return items
171     else:
172         return None
173
174 stdoutname = None
175 stdoutfile = None
176 stdinname = None
177 stdinfile = None
178 rcode = 1
179
180 def recursive_foreach(params, fvars):
181     var = fvars[0]
182     fvars = fvars[1:]
183     items = get_items(params, params[var])
184     logger.info("parallelizing on %s with items %s" % (var, items))
185     if items is not None:
186         for i in items:
187             params = copy.copy(params)
188             params[var] = i
189             if len(fvars) > 0:
190                 recursive_foreach(params, fvars)
191             else:
192                 if not args.dry_run:
193                     arvados.api().job_tasks().create(body={
194                         'job_uuid': arvados.current_job()['uuid'],
195                         'created_by_job_task_uuid': arvados.current_task()['uuid'],
196                         'sequence': 1,
197                         'parameters': params
198                     }).execute()
199                 else:
200                     logger.info(expand_list(params, params["command"]))
201     else:
202         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
203         sys.exit(1)
204
205 try:
206     if "task.foreach" in jobp:
207         if args.dry_run or arvados.current_task()['sequence'] == 0:
208             # This is the first task to start the other tasks and exit
209             fvars = jobp["task.foreach"]
210             if isinstance(fvars, basestring):
211                 fvars = [fvars]
212             if not isinstance(fvars, list) or len(fvars) == 0:
213                 logger.error("value of task.foreach must be a string or non-empty list")
214                 sys.exit(1)
215             recursive_foreach(jobp, jobp["task.foreach"])
216             if not args.dry_run:
217                 if "task.vwd" in jobp:
218                     # Set output of the first task to the base vwd collection so it
219                     # will be merged with output fragments from the other tasks by
220                     # crunch.
221                     arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
222                 else:
223                     arvados.current_task().set_output(None)
224             sys.exit(0)
225     else:
226         # This is the only task so taskp/jobp are the same
227         taskp = jobp
228
229     if not args.dry_run:
230         if "task.vwd" in taskp:
231             # Populate output directory with symlinks to files in collection
232             vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
233
234         if "task.cwd" in taskp:
235             os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
236
237     cmd = expand_list(taskp, taskp["command"])
238
239     if not args.dry_run:
240         if "task.stdin" in taskp:
241             stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
242             stdinfile = open(stdinname, "rb")
243
244         if "task.stdout" in taskp:
245             stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
246             stdoutfile = open(stdoutname, "wb")
247
248     logger.info("{}{}{}".format(' '.join(cmd), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
249
250     if args.dry_run:
251         sys.exit(0)
252 except subst.SubstitutionError as e:
253     logger.error(str(e))
254     logger.error("task parameters were:")
255     logger.error(pprint.pformat(taskp))
256     sys.exit(1)
257 except Exception as e:
258     logger.exception("caught exception")
259     logger.error("task parameters were:")
260     logger.error(pprint.pformat(taskp))
261     sys.exit(1)
262
263 try:
264     sp = subprocess.Popen(cmd, shell=False, stdin=stdinfile, stdout=stdoutfile)
265     sig = SigHandler()
266
267     # forward signals to the process.
268     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
269     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
270     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))
271
272     # wait for process to complete.
273     rcode = sp.wait()
274
275     if sig.sig is not None:
276         logger.critical("terminating on signal %s" % sig.sig)
277         sys.exit(2)
278     else:
279         logger.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
280
281 except Exception as e:
282     logger.exception("caught exception")
283
284 # restore default signal handlers.
285 signal.signal(signal.SIGINT, signal.SIG_DFL)
286 signal.signal(signal.SIGTERM, signal.SIG_DFL)
287 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
288
289 for l in links:
290     os.unlink(l)
291
292 logger.info("the following output files will be saved to keep:")
293
294 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
295
296 logger.info("start writing output to keep")
297
298 if "task.vwd" in taskp:
299     if "task.foreach" in jobp:
300         # This is a subtask, so don't merge with the original collection, that will happen at the end
301         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
302     else:
303         # Just a single task, so do merge with the original collection
304         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
305 else:
306     outcollection = robust_put.upload(outdir, logger)
307
308 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
309                                      body={
310                                          'output': outcollection,
311                                          'success': (rcode == 0),
312                                          'progress':1.0
313                                      }).execute()
314
315 sys.exit(rcode)