4042: Add support for filter and group of lists
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28
29 os.umask(0077)
30
31 t = arvados.current_task().tmpdir
32
33 api = arvados.api('v1')
34
35 os.chdir(arvados.current_task().tmpdir)
36 os.mkdir("tmpdir")
37 os.mkdir("output")
38
39 os.chdir("output")
40
41 outdir = os.getcwd()
42
43 taskp = None
44 jobp = arvados.current_job()['script_parameters']
45 if len(arvados.current_task()['parameters']) > 0:
46     taskp = arvados.current_task()['parameters']
47
48 links = []
49
50 def sub_tmpdir(v):
51     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
52
53 def sub_outdir(v):
54     return outdir
55
56 def sub_cores(v):
57      return str(multiprocessing.cpu_count())
58
59 def sub_jobid(v):
60      return os.environ['JOB_UUID']
61
62 def sub_taskid(v):
63      return os.environ['TASK_UUID']
64
65 def sub_jobsrc(v):
66      return os.environ['CRUNCH_SRC']
67
68 subst.default_subs["task.tmpdir"] = sub_tmpdir
69 subst.default_subs["task.outdir"] = sub_outdir
70 subst.default_subs["job.srcdir"] = sub_jobsrc
71 subst.default_subs["node.cores"] = sub_cores
72 subst.default_subs["job.uuid"] = sub_jobid
73 subst.default_subs["task.uuid"] = sub_taskid
74
75 class SigHandler(object):
76     def __init__(self):
77         self.sig = None
78
79     def send_signal(self, sp, signum):
80         sp.send_signal(signum)
81         self.sig = signum
82
83 def expand_item(p, c):
84     if isinstance(c, dict):
85         if "foreach" in c and "command" in c:
86             var = c["foreach"]
87             items = get_items(p, p[var])
88             r = []
89             for i in items:
90                 params = copy.copy(p)
91                 params[var] = i
92                 r.extend(expand_list(params, c["command"]))
93             return r
94     elif isinstance(c, list):
95         return expand_list(p, c)
96     elif isinstance(c, str) or isinstance(c, unicode):
97         return [subst.do_substitution(p, c)]
98
99     return []
100
101 def expand_list(p, l):
102     return [exp for arg in l for exp in expand_item(p, arg)]
103
104 def add_to_group(gr, match):
105     m = ('^_^').join(match.groups()[1:])
106     if m not in gr:
107         gr[m] = []
108     gr[m].extend(match.group(0))
109
110 def get_items(p, value):
111     if isinstance(value, dict):
112         if "filter" in value and "regex" in value:
113             pattern = re.compile(value["regex"])
114             items = get_items(p, value["group"])
115             return [i for i in items if pattern.match(i)]
116
117         if "group" in value and "regex" in value:
118             pattern = re.compile(value["regex"])
119             items = get_items(p, value["group"])
120             groups = {}
121             for i in items:
122                 p = pattern.match(i)
123                 if p:
124                     add_to_group(groups, p)
125             return [r[k] for k in r]
126     if isinstance(value, list):
127         return expand_list(p, value)
128
129     fn = subst.do_substitution(p, value)
130     mode = os.stat(fn).st_mode
131     prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
132     if mode is not None:
133         if stat.S_ISDIR(mode):
134             items = ["$(dir %s/%s/)" % (prefix, l) for l in os.listdir(fn)]
135         elif stat.S_ISREG(mode):
136             with open(fn) as f:
137                 items = [line.rstrip("\r\n") for line in f]
138         return items
139     else:
140         return None
141
142 stdoutname = None
143 stdoutfile = None
144 stdinname = None
145 stdinfile = None
146 rcode = 1
147
148 def recursive_foreach(params, fvars):
149     var = fvars[0]
150     fvars = fvars[1:]
151     items = get_items(params, params[var])
152     logger.info("parallelizing on %s with items %s" % (var, items))
153     if items is not None:
154         for i in items:
155             params = copy.copy(params)
156             params[var] = i
157             if len(fvars) > 0:
158                 recursive_foreach(params, fvars)
159             else:
160                 arvados.api().job_tasks().create(body={
161                     'job_uuid': arvados.current_job()['uuid'],
162                     'created_by_job_task_uuid': arvados.current_task()['uuid'],
163                     'sequence': 1,
164                     'parameters': params
165                     }
166                 ).execute()
167     else:
168         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
169         sys.exit(1)
170
171 try:
172     if "task.foreach" in jobp:
173         if arvados.current_task()['sequence'] == 0:
174             # This is the first task to start the other tasks and exit
175             fvars = jobp["task.foreach"]
176             if isinstance(fvars, basestring):
177                 fvars = [fvars]
178             if not isinstance(fvars, list) or len(fvars) == 0:
179                 logger.error("value of task.foreach must be a string or non-empty list")
180                 sys.exit(1)
181             recursive_foreach(jobp, jobp["task.foreach"])
182             if "task.vwd" in jobp:
183                 # Set output of the first task to the base vwd collection so it
184                 # will be merged with output fragments from the other tasks by
185                 # crunch.
186                 arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
187             else:
188                 arvados.current_task().set_output(None)
189             sys.exit(0)
190     else:
191         # This is the only task so taskp/jobp are the same
192         taskp = jobp
193
194     if "task.vwd" in taskp:
195         # Populate output directory with symlinks to files in collection
196         vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
197
198     if "task.cwd" in taskp:
199         os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
200
201     cmd = expand_list(taskp, taskp["command"])
202
203     if "task.stdin" in taskp:
204         stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
205         stdinfile = open(stdinname, "rb")
206
207     if "task.stdout" in taskp:
208         stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
209         stdoutfile = open(stdoutname, "wb")
210
211     logger.info("{}{}{}".format(' '.join(cmd), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
212 except subst.SubstitutionError as e:
213     logger.error(str(e))
214     logger.error("task parameters were:")
215     logger.error(pprint.pformat(taskp))
216     sys.exit(1)
217 except Exception as e:
218     logger.exception("caught exception")
219     logger.error("task parameters were:")
220     logger.error(pprint.pformat(taskp))
221     sys.exit(1)
222
223 try:
224     sp = subprocess.Popen(cmd, shell=False, stdin=stdinfile, stdout=stdoutfile)
225     sig = SigHandler()
226
227     # forward signals to the process.
228     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
229     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
230     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))
231
232     # wait for process to complete.
233     rcode = sp.wait()
234
235     if sig.sig is not None:
236         logger.critical("terminating on signal %s" % sig.sig)
237         sys.exit(2)
238     else:
239         logger.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
240
241 except Exception as e:
242     logger.exception("caught exception")
243
244 # restore default signal handlers.
245 signal.signal(signal.SIGINT, signal.SIG_DFL)
246 signal.signal(signal.SIGTERM, signal.SIG_DFL)
247 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
248
249 for l in links:
250     os.unlink(l)
251
252 logger.info("the following output files will be saved to keep:")
253
254 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
255
256 logger.info("start writing output to keep")
257
258 if "task.vwd" in taskp:
259     if "task.foreach" in jobp:
260         # This is a subtask, so don't merge with the original collection, that will happen at the end
261         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
262     else:
263         # Just a single task, so do merge with the original collection
264         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
265 else:
266     outcollection = robust_put.upload(outdir, logger)
267
268 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
269                                      body={
270                                          'output': outcollection,
271                                          'success': (rcode == 0),
272                                          'progress':1.0
273                                      }).execute()
274
275 sys.exit(rcode)