4042: add "extract"
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28
29 os.umask(0077)
30
31 t = arvados.current_task().tmpdir
32
33 api = arvados.api('v1')
34
35 os.chdir(arvados.current_task().tmpdir)
36 os.mkdir("tmpdir")
37 os.mkdir("output")
38
39 os.chdir("output")
40
41 outdir = os.getcwd()
42
43 taskp = None
44 jobp = arvados.current_job()['script_parameters']
45 if len(arvados.current_task()['parameters']) > 0:
46     taskp = arvados.current_task()['parameters']
47
48 links = []
49
50 def sub_tmpdir(v):
51     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
52
53 def sub_outdir(v):
54     return outdir
55
56 def sub_cores(v):
57      return str(multiprocessing.cpu_count())
58
59 def sub_jobid(v):
60      return os.environ['JOB_UUID']
61
62 def sub_taskid(v):
63      return os.environ['TASK_UUID']
64
65 def sub_jobsrc(v):
66      return os.environ['CRUNCH_SRC']
67
68 subst.default_subs["task.tmpdir"] = sub_tmpdir
69 subst.default_subs["task.outdir"] = sub_outdir
70 subst.default_subs["job.srcdir"] = sub_jobsrc
71 subst.default_subs["node.cores"] = sub_cores
72 subst.default_subs["job.uuid"] = sub_jobid
73 subst.default_subs["task.uuid"] = sub_taskid
74
75 class SigHandler(object):
76     def __init__(self):
77         self.sig = None
78
79     def send_signal(self, sp, signum):
80         sp.send_signal(signum)
81         self.sig = signum
82
83 def expand_item(p, c):
84     if isinstance(c, dict):
85         if "foreach" in c and "command" in c:
86             var = c["foreach"]
87             items = get_items(p, p[var])
88             r = []
89             for i in items:
90                 params = copy.copy(p)
91                 params[var] = i
92                 r.extend(expand_list(params, c["command"]))
93             return r
94     elif isinstance(c, list):
95         return expand_list(p, c)
96     elif isinstance(c, str) or isinstance(c, unicode):
97         return [subst.do_substitution(p, c)]
98
99     return []
100
101 def expand_list(p, l):
102     return [exp for arg in l for exp in expand_item(p, arg)]
103
104 def add_to_group(gr, match):
105     m = ('^_^').join(match.groups())
106     if m not in gr:
107         gr[m] = []
108     gr[m].append(match.group(0))
109
110 def get_items(p, value):
111     if isinstance(value, dict):
112         if "filter" in value and "regex" in value:
113             pattern = re.compile(value["regex"])
114             items = get_items(p, value["group"])
115             return [i for i in items if pattern.match(i)]
116
117         if "group" in value and "regex" in value:
118             pattern = re.compile(value["regex"])
119             items = get_items(p, value["group"])
120             groups = {}
121             for i in items:
122                 p = pattern.match(i)
123                 if p:
124                     add_to_group(groups, p)
125             return [groups[k] for k in groups]
126
127         if "extract" in value and "regex" in value:
128             pattern = re.compile(value["regex"])
129             items = get_items(p, value["group"])
130             return [p.groups() for i in items if p = pattern.match(i)]
131
132     if isinstance(value, list):
133         return expand_list(p, value)
134
135     fn = subst.do_substitution(p, value)
136     mode = os.stat(fn).st_mode
137     prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
138     if mode is not None:
139         if stat.S_ISDIR(mode):
140             items = ["$(dir %s/%s/)" % (prefix, l) for l in os.listdir(fn)]
141         elif stat.S_ISREG(mode):
142             with open(fn) as f:
143                 items = [line.rstrip("\r\n") for line in f]
144         return items
145     else:
146         return None
147
148 stdoutname = None
149 stdoutfile = None
150 stdinname = None
151 stdinfile = None
152 rcode = 1
153
154 def recursive_foreach(params, fvars):
155     var = fvars[0]
156     fvars = fvars[1:]
157     items = get_items(params, params[var])
158     logger.info("parallelizing on %s with items %s" % (var, items))
159     if items is not None:
160         for i in items:
161             params = copy.copy(params)
162             params[var] = i
163             if len(fvars) > 0:
164                 recursive_foreach(params, fvars)
165             else:
166                 arvados.api().job_tasks().create(body={
167                     'job_uuid': arvados.current_job()['uuid'],
168                     'created_by_job_task_uuid': arvados.current_task()['uuid'],
169                     'sequence': 1,
170                     'parameters': params
171                     }
172                 ).execute()
173     else:
174         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
175         sys.exit(1)
176
177 try:
178     if "task.foreach" in jobp:
179         if arvados.current_task()['sequence'] == 0:
180             # This is the first task to start the other tasks and exit
181             fvars = jobp["task.foreach"]
182             if isinstance(fvars, basestring):
183                 fvars = [fvars]
184             if not isinstance(fvars, list) or len(fvars) == 0:
185                 logger.error("value of task.foreach must be a string or non-empty list")
186                 sys.exit(1)
187             recursive_foreach(jobp, jobp["task.foreach"])
188             if "task.vwd" in jobp:
189                 # Set output of the first task to the base vwd collection so it
190                 # will be merged with output fragments from the other tasks by
191                 # crunch.
192                 arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
193             else:
194                 arvados.current_task().set_output(None)
195             sys.exit(0)
196     else:
197         # This is the only task so taskp/jobp are the same
198         taskp = jobp
199
200     if "task.vwd" in taskp:
201         # Populate output directory with symlinks to files in collection
202         vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
203
204     if "task.cwd" in taskp:
205         os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
206
207     cmd = expand_list(taskp, taskp["command"])
208
209     if "task.stdin" in taskp:
210         stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
211         stdinfile = open(stdinname, "rb")
212
213     if "task.stdout" in taskp:
214         stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
215         stdoutfile = open(stdoutname, "wb")
216
217     logger.info("{}{}{}".format(' '.join(cmd), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
218 except subst.SubstitutionError as e:
219     logger.error(str(e))
220     logger.error("task parameters were:")
221     logger.error(pprint.pformat(taskp))
222     sys.exit(1)
223 except Exception as e:
224     logger.exception("caught exception")
225     logger.error("task parameters were:")
226     logger.error(pprint.pformat(taskp))
227     sys.exit(1)
228
229 try:
230     sp = subprocess.Popen(cmd, shell=False, stdin=stdinfile, stdout=stdoutfile)
231     sig = SigHandler()
232
233     # forward signals to the process.
234     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
235     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
236     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))
237
238     # wait for process to complete.
239     rcode = sp.wait()
240
241     if sig.sig is not None:
242         logger.critical("terminating on signal %s" % sig.sig)
243         sys.exit(2)
244     else:
245         logger.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
246
247 except Exception as e:
248     logger.exception("caught exception")
249
250 # restore default signal handlers.
251 signal.signal(signal.SIGINT, signal.SIG_DFL)
252 signal.signal(signal.SIGTERM, signal.SIG_DFL)
253 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
254
255 for l in links:
256     os.unlink(l)
257
258 logger.info("the following output files will be saved to keep:")
259
260 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
261
262 logger.info("start writing output to keep")
263
264 if "task.vwd" in taskp:
265     if "task.foreach" in jobp:
266         # This is a subtask, so don't merge with the original collection, that will happen at the end
267         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
268     else:
269         # Just a single task, so do merge with the original collection
270         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
271 else:
272     outcollection = robust_put.upload(outdir, logger)
273
274 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
275                                      body={
276                                          'output': outcollection,
277                                          'success': (rcode == 0),
278                                          'progress':1.0
279                                      }).execute()
280
281 sys.exit(rcode)