4042: list handling bugfix
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28
29 os.umask(0077)
30
31 t = arvados.current_task().tmpdir
32
33 api = arvados.api('v1')
34
35 os.chdir(arvados.current_task().tmpdir)
36 os.mkdir("tmpdir")
37 os.mkdir("output")
38
39 os.chdir("output")
40
41 outdir = os.getcwd()
42
43 taskp = None
44 jobp = arvados.current_job()['script_parameters']
45 if len(arvados.current_task()['parameters']) > 0:
46     taskp = arvados.current_task()['parameters']
47
48 links = []
49
50 def sub_tmpdir(v):
51     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
52
53 def sub_outdir(v):
54     return outdir
55
56 def sub_cores(v):
57      return str(multiprocessing.cpu_count())
58
59 def sub_jobid(v):
60      return os.environ['JOB_UUID']
61
62 def sub_taskid(v):
63      return os.environ['TASK_UUID']
64
65 def sub_jobsrc(v):
66      return os.environ['CRUNCH_SRC']
67
68 subst.default_subs["task.tmpdir"] = sub_tmpdir
69 subst.default_subs["task.outdir"] = sub_outdir
70 subst.default_subs["job.srcdir"] = sub_jobsrc
71 subst.default_subs["node.cores"] = sub_cores
72 subst.default_subs["job.uuid"] = sub_jobid
73 subst.default_subs["task.uuid"] = sub_taskid
74
75 class SigHandler(object):
76     def __init__(self):
77         self.sig = None
78
79     def send_signal(self, sp, signum):
80         sp.send_signal(signum)
81         self.sig = signum
82
83 def expand_item(p, c):
84     if isinstance(c, dict):
85         if "foreach" in c and "command" in c:
86             var = c["foreach"]
87             items = get_items(p, p[var])
88             r = []
89             for i in items:
90                 params = copy.copy(p)
91                 params[var] = i
92                 r.extend(expand_list(params, c["command"]))
93             return r
94     elif isinstance(c, list):
95         return expand_list(p, c)
96     elif isinstance(c, str) or isinstance(c, unicode):
97         return [subst.do_substitution(p, c)]
98
99     return []
100
101 def expand_list(p, l):
102     return [exp for arg in l for exp in expand_item(p, arg)]
103
104 def add_to_group(gr, match):
105     m = ('^_^').join(match.groups())
106     if m not in gr:
107         gr[m] = []
108     gr[m].append(match.group(0))
109
110 def get_items(p, value):
111     if isinstance(value, dict):
112         if "filter" in value and "regex" in value:
113             pattern = re.compile(value["regex"])
114             items = get_items(p, value["group"])
115             return [i for i in items if pattern.match(i)]
116
117         if "group" in value and "regex" in value:
118             pattern = re.compile(value["regex"])
119             items = get_items(p, value["group"])
120             groups = {}
121             for i in items:
122                 p = pattern.match(i)
123                 if p:
124                     add_to_group(groups, p)
125             print groups
126             return [groups[k] for k in groups]
127
128     if isinstance(value, list):
129         return expand_list(p, value)
130
131     fn = subst.do_substitution(p, value)
132     mode = os.stat(fn).st_mode
133     prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
134     if mode is not None:
135         if stat.S_ISDIR(mode):
136             items = ["$(dir %s/%s/)" % (prefix, l) for l in os.listdir(fn)]
137         elif stat.S_ISREG(mode):
138             with open(fn) as f:
139                 items = [line.rstrip("\r\n") for line in f]
140         return items
141     else:
142         return None
143
144 stdoutname = None
145 stdoutfile = None
146 stdinname = None
147 stdinfile = None
148 rcode = 1
149
150 def recursive_foreach(params, fvars):
151     var = fvars[0]
152     fvars = fvars[1:]
153     items = get_items(params, params[var])
154     logger.info("parallelizing on %s with items %s" % (var, items))
155     if items is not None:
156         for i in items:
157             params = copy.copy(params)
158             params[var] = i
159             if len(fvars) > 0:
160                 recursive_foreach(params, fvars)
161             else:
162                 arvados.api().job_tasks().create(body={
163                     'job_uuid': arvados.current_job()['uuid'],
164                     'created_by_job_task_uuid': arvados.current_task()['uuid'],
165                     'sequence': 1,
166                     'parameters': params
167                     }
168                 ).execute()
169     else:
170         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
171         sys.exit(1)
172
173 try:
174     if "task.foreach" in jobp:
175         if arvados.current_task()['sequence'] == 0:
176             # This is the first task to start the other tasks and exit
177             fvars = jobp["task.foreach"]
178             if isinstance(fvars, basestring):
179                 fvars = [fvars]
180             if not isinstance(fvars, list) or len(fvars) == 0:
181                 logger.error("value of task.foreach must be a string or non-empty list")
182                 sys.exit(1)
183             recursive_foreach(jobp, jobp["task.foreach"])
184             if "task.vwd" in jobp:
185                 # Set output of the first task to the base vwd collection so it
186                 # will be merged with output fragments from the other tasks by
187                 # crunch.
188                 arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
189             else:
190                 arvados.current_task().set_output(None)
191             sys.exit(0)
192     else:
193         # This is the only task so taskp/jobp are the same
194         taskp = jobp
195
196     if "task.vwd" in taskp:
197         # Populate output directory with symlinks to files in collection
198         vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
199
200     if "task.cwd" in taskp:
201         os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
202
203     cmd = expand_list(taskp, taskp["command"])
204
205     if "task.stdin" in taskp:
206         stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
207         stdinfile = open(stdinname, "rb")
208
209     if "task.stdout" in taskp:
210         stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
211         stdoutfile = open(stdoutname, "wb")
212
213     logger.info("{}{}{}".format(' '.join(cmd), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
214 except subst.SubstitutionError as e:
215     logger.error(str(e))
216     logger.error("task parameters were:")
217     logger.error(pprint.pformat(taskp))
218     sys.exit(1)
219 except Exception as e:
220     logger.exception("caught exception")
221     logger.error("task parameters were:")
222     logger.error(pprint.pformat(taskp))
223     sys.exit(1)
224
225 try:
226     sp = subprocess.Popen(cmd, shell=False, stdin=stdinfile, stdout=stdoutfile)
227     sig = SigHandler()
228
229     # forward signals to the process.
230     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
231     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
232     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))
233
234     # wait for process to complete.
235     rcode = sp.wait()
236
237     if sig.sig is not None:
238         logger.critical("terminating on signal %s" % sig.sig)
239         sys.exit(2)
240     else:
241         logger.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
242
243 except Exception as e:
244     logger.exception("caught exception")
245
246 # restore default signal handlers.
247 signal.signal(signal.SIGINT, signal.SIG_DFL)
248 signal.signal(signal.SIGTERM, signal.SIG_DFL)
249 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
250
251 for l in links:
252     os.unlink(l)
253
254 logger.info("the following output files will be saved to keep:")
255
256 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
257
258 logger.info("start writing output to keep")
259
260 if "task.vwd" in taskp:
261     if "task.foreach" in jobp:
262         # This is a subtask, so don't merge with the original collection, that will happen at the end
263         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
264     else:
265         # Just a single task, so do merge with the original collection
266         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
267 else:
268     outcollection = robust_put.upload(outdir, logger)
269
270 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
271                                      body={
272                                          'output': outcollection,
273                                          'success': (rcode == 0),
274                                          'progress':1.0
275                                      }).execute()
276
277 sys.exit(rcode)