4042: Add list index
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28
29 os.umask(0077)
30
31 t = arvados.current_task().tmpdir
32
33 api = arvados.api('v1')
34
35 os.chdir(arvados.current_task().tmpdir)
36 os.mkdir("tmpdir")
37 os.mkdir("output")
38
39 os.chdir("output")
40
41 outdir = os.getcwd()
42
43 taskp = None
44 jobp = arvados.current_job()['script_parameters']
45 if len(arvados.current_task()['parameters']) > 0:
46     taskp = arvados.current_task()['parameters']
47
48 links = []
49
50 def sub_tmpdir(v):
51     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
52
53 def sub_outdir(v):
54     return outdir
55
56 def sub_cores(v):
57      return str(multiprocessing.cpu_count())
58
59 def sub_jobid(v):
60      return os.environ['JOB_UUID']
61
62 def sub_taskid(v):
63      return os.environ['TASK_UUID']
64
65 def sub_jobsrc(v):
66      return os.environ['CRUNCH_SRC']
67
68 subst.default_subs["task.tmpdir"] = sub_tmpdir
69 subst.default_subs["task.outdir"] = sub_outdir
70 subst.default_subs["job.srcdir"] = sub_jobsrc
71 subst.default_subs["node.cores"] = sub_cores
72 subst.default_subs["job.uuid"] = sub_jobid
73 subst.default_subs["task.uuid"] = sub_taskid
74
75 class SigHandler(object):
76     def __init__(self):
77         self.sig = None
78
79     def send_signal(self, sp, signum):
80         sp.send_signal(signum)
81         self.sig = signum
82
83 def expand_item(p, c):
84     if isinstance(c, dict):
85         if "foreach" in c and "command" in c:
86             var = c["foreach"]
87             items = get_items(p, p[var])
88             r = []
89             for i in items:
90                 params = copy.copy(p)
91                 params[var] = i
92                 r.extend(expand_list(params, c["command"]))
93             return r
94         if "list" in c and "index" in c:
95             var = c["list"]
96             items = get_items(p, p[var])
97             return items[int(c["index"])]
98     elif isinstance(c, list):
99         return expand_list(p, c)
100     elif isinstance(c, str) or isinstance(c, unicode):
101         return [subst.do_substitution(p, c)]
102
103     return []
104
105 def expand_list(p, l):
106     return [exp for arg in l for exp in expand_item(p, arg)]
107
108 def add_to_group(gr, match):
109     m = ('^_^').join(match.groups())
110     if m not in gr:
111         gr[m] = []
112     gr[m].append(match.group(0))
113
114 def get_items(p, value):
115     if isinstance(value, dict):
116         if "filter" in value and "regex" in value:
117             pattern = re.compile(value["regex"])
118             items = get_items(p, value["group"])
119             return [i for i in items if pattern.match(i)]
120
121         if "group" in value and "regex" in value:
122             pattern = re.compile(value["regex"])
123             items = get_items(p, value["group"])
124             groups = {}
125             for i in items:
126                 p = pattern.match(i)
127                 if p:
128                     add_to_group(groups, p)
129             return [groups[k] for k in groups]
130
131         if "extract" in value and "regex" in value:
132             pattern = re.compile(value["regex"])
133             items = get_items(p, value["group"])
134             return [p.groups() for i in items if p = pattern.match(i)]
135
136     if isinstance(value, list):
137         return expand_list(p, value)
138
139     fn = subst.do_substitution(p, value)
140     mode = os.stat(fn).st_mode
141     prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
142     if mode is not None:
143         if stat.S_ISDIR(mode):
144             items = ["$(dir %s/%s/)" % (prefix, l) for l in os.listdir(fn)]
145         elif stat.S_ISREG(mode):
146             with open(fn) as f:
147                 items = [line.rstrip("\r\n") for line in f]
148         return items
149     else:
150         return None
151
152 stdoutname = None
153 stdoutfile = None
154 stdinname = None
155 stdinfile = None
156 rcode = 1
157
158 def recursive_foreach(params, fvars):
159     var = fvars[0]
160     fvars = fvars[1:]
161     items = get_items(params, params[var])
162     logger.info("parallelizing on %s with items %s" % (var, items))
163     if items is not None:
164         for i in items:
165             params = copy.copy(params)
166             params[var] = i
167             if len(fvars) > 0:
168                 recursive_foreach(params, fvars)
169             else:
170                 arvados.api().job_tasks().create(body={
171                     'job_uuid': arvados.current_job()['uuid'],
172                     'created_by_job_task_uuid': arvados.current_task()['uuid'],
173                     'sequence': 1,
174                     'parameters': params
175                     }
176                 ).execute()
177     else:
178         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
179         sys.exit(1)
180
181 try:
182     if "task.foreach" in jobp:
183         if arvados.current_task()['sequence'] == 0:
184             # This is the first task to start the other tasks and exit
185             fvars = jobp["task.foreach"]
186             if isinstance(fvars, basestring):
187                 fvars = [fvars]
188             if not isinstance(fvars, list) or len(fvars) == 0:
189                 logger.error("value of task.foreach must be a string or non-empty list")
190                 sys.exit(1)
191             recursive_foreach(jobp, jobp["task.foreach"])
192             if "task.vwd" in jobp:
193                 # Set output of the first task to the base vwd collection so it
194                 # will be merged with output fragments from the other tasks by
195                 # crunch.
196                 arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
197             else:
198                 arvados.current_task().set_output(None)
199             sys.exit(0)
200     else:
201         # This is the only task so taskp/jobp are the same
202         taskp = jobp
203
204     if "task.vwd" in taskp:
205         # Populate output directory with symlinks to files in collection
206         vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
207
208     if "task.cwd" in taskp:
209         os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
210
211     cmd = expand_list(taskp, taskp["command"])
212
213     if "task.stdin" in taskp:
214         stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
215         stdinfile = open(stdinname, "rb")
216
217     if "task.stdout" in taskp:
218         stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
219         stdoutfile = open(stdoutname, "wb")
220
221     logger.info("{}{}{}".format(' '.join(cmd), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
222 except subst.SubstitutionError as e:
223     logger.error(str(e))
224     logger.error("task parameters were:")
225     logger.error(pprint.pformat(taskp))
226     sys.exit(1)
227 except Exception as e:
228     logger.exception("caught exception")
229     logger.error("task parameters were:")
230     logger.error(pprint.pformat(taskp))
231     sys.exit(1)
232
233 try:
234     sp = subprocess.Popen(cmd, shell=False, stdin=stdinfile, stdout=stdoutfile)
235     sig = SigHandler()
236
237     # forward signals to the process.
238     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
239     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
240     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))
241
242     # wait for process to complete.
243     rcode = sp.wait()
244
245     if sig.sig is not None:
246         logger.critical("terminating on signal %s" % sig.sig)
247         sys.exit(2)
248     else:
249         logger.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
250
251 except Exception as e:
252     logger.exception("caught exception")
253
254 # restore default signal handlers.
255 signal.signal(signal.SIGINT, signal.SIG_DFL)
256 signal.signal(signal.SIGTERM, signal.SIG_DFL)
257 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
258
259 for l in links:
260     os.unlink(l)
261
262 logger.info("the following output files will be saved to keep:")
263
264 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
265
266 logger.info("start writing output to keep")
267
268 if "task.vwd" in taskp:
269     if "task.foreach" in jobp:
270         # This is a subtask, so don't merge with the original collection, that will happen at the end
271         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
272     else:
273         # Just a single task, so do merge with the original collection
274         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
275 else:
276     outcollection = robust_put.upload(outdir, logger)
277
278 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
279                                      body={
280                                          'output': outcollection,
281                                          'success': (rcode == 0),
282                                          'progress':1.0
283                                      }).execute()
284
285 sys.exit(rcode)