4042: expand list fix
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28
29 os.umask(0077)
30
31 t = arvados.current_task().tmpdir
32
33 api = arvados.api('v1')
34
35 os.chdir(arvados.current_task().tmpdir)
36 os.mkdir("tmpdir")
37 os.mkdir("output")
38
39 os.chdir("output")
40
41 outdir = os.getcwd()
42
43 taskp = None
44 jobp = arvados.current_job()['script_parameters']
45 if len(arvados.current_task()['parameters']) > 0:
46     taskp = arvados.current_task()['parameters']
47
48 links = []
49
50 def sub_tmpdir(v):
51     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
52
53 def sub_outdir(v):
54     return outdir
55
56 def sub_cores(v):
57      return str(multiprocessing.cpu_count())
58
59 def sub_jobid(v):
60      return os.environ['JOB_UUID']
61
62 def sub_taskid(v):
63      return os.environ['TASK_UUID']
64
65 def sub_jobsrc(v):
66      return os.environ['CRUNCH_SRC']
67
68 subst.default_subs["task.tmpdir"] = sub_tmpdir
69 subst.default_subs["task.outdir"] = sub_outdir
70 subst.default_subs["job.srcdir"] = sub_jobsrc
71 subst.default_subs["node.cores"] = sub_cores
72 subst.default_subs["job.uuid"] = sub_jobid
73 subst.default_subs["task.uuid"] = sub_taskid
74
75 class SigHandler(object):
76     def __init__(self):
77         self.sig = None
78
79     def send_signal(self, sp, signum):
80         sp.send_signal(signum)
81         self.sig = signum
82
83 def expand_item(p, c):
84     if isinstance(c, dict):
85         if "foreach" in c and "command" in c:
86             var = c["foreach"]
87             items = get_items(p, p[var])
88             r = []
89             for i in items:
90                 params = copy.copy(p)
91                 params[var] = i
92                 r.extend(expand_list(params, c["command"]))
93             return r
94         if "list" in c and "index" in c and "command" in c:
95             var = c["list"]
96             items = get_items(p, p[var])
97             params = copy.copy(p)
98             params[var] = items[int(c["index"])]
99             return expand_list(params, c["command"])
100     elif isinstance(c, list):
101         return expand_list(p, c)
102     elif isinstance(c, str) or isinstance(c, unicode):
103         return [subst.do_substitution(p, c)]
104
105     return []
106
107 def expand_list(p, l):
108     if isinstance(l, basestring):
109         return [expand_item(p, l)]
110     else:
111         return [exp for arg in l for exp in expand_item(p, arg)]
112
113 def add_to_group(gr, match):
114     m = ('^_^').join(match.groups())
115     if m not in gr:
116         gr[m] = []
117     gr[m].append(match.group(0))
118
119 def get_items(p, value):
120     if isinstance(value, dict):
121         if "regex" in value:
122             pattern = re.compile(value["regex"])
123             if "filter" in value:
124                 items = get_items(p, value["filter"])
125                 return [i for i in items if pattern.match(i)]
126             elif "group" in value:
127                 items = get_items(p, value["group"])
128                 groups = {}
129                 for i in items:
130                     p = pattern.match(i)
131                     if p:
132                         add_to_group(groups, p)
133                 return [groups[k] for k in groups]
134             elif "extract" in value:
135                 items = get_items(p, value["extract"])
136                 r = []
137                 for i in items:
138                     p = pattern.match(i)
139                     if p:
140                         r.append(p.groups())
141                 return r
142
143     if isinstance(value, list):
144         return expand_list(p, value)
145
146     fn = subst.do_substitution(p, value)
147     mode = os.stat(fn).st_mode
148     prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
149     if mode is not None:
150         if stat.S_ISDIR(mode):
151             items = ["$(dir %s/%s/)" % (prefix, l) for l in os.listdir(fn)]
152         elif stat.S_ISREG(mode):
153             with open(fn) as f:
154                 items = [line.rstrip("\r\n") for line in f]
155         return items
156     else:
157         return None
158
159 stdoutname = None
160 stdoutfile = None
161 stdinname = None
162 stdinfile = None
163 rcode = 1
164
165 def recursive_foreach(params, fvars):
166     var = fvars[0]
167     fvars = fvars[1:]
168     items = get_items(params, params[var])
169     logger.info("parallelizing on %s with items %s" % (var, items))
170     if items is not None:
171         for i in items:
172             params = copy.copy(params)
173             params[var] = i
174             if len(fvars) > 0:
175                 recursive_foreach(params, fvars)
176             else:
177                 arvados.api().job_tasks().create(body={
178                     'job_uuid': arvados.current_job()['uuid'],
179                     'created_by_job_task_uuid': arvados.current_task()['uuid'],
180                     'sequence': 1,
181                     'parameters': params
182                     }
183                 ).execute()
184     else:
185         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
186         sys.exit(1)
187
188 try:
189     if "task.foreach" in jobp:
190         if arvados.current_task()['sequence'] == 0:
191             # This is the first task to start the other tasks and exit
192             fvars = jobp["task.foreach"]
193             if isinstance(fvars, basestring):
194                 fvars = [fvars]
195             if not isinstance(fvars, list) or len(fvars) == 0:
196                 logger.error("value of task.foreach must be a string or non-empty list")
197                 sys.exit(1)
198             recursive_foreach(jobp, jobp["task.foreach"])
199             if "task.vwd" in jobp:
200                 # Set output of the first task to the base vwd collection so it
201                 # will be merged with output fragments from the other tasks by
202                 # crunch.
203                 arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
204             else:
205                 arvados.current_task().set_output(None)
206             sys.exit(0)
207     else:
208         # This is the only task so taskp/jobp are the same
209         taskp = jobp
210
211     if "task.vwd" in taskp:
212         # Populate output directory with symlinks to files in collection
213         vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
214
215     if "task.cwd" in taskp:
216         os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
217
218     cmd = expand_list(taskp, taskp["command"])
219
220     if "task.stdin" in taskp:
221         stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
222         stdinfile = open(stdinname, "rb")
223
224     if "task.stdout" in taskp:
225         stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
226         stdoutfile = open(stdoutname, "wb")
227
228     logger.info("{}{}{}".format(' '.join(cmd), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
229 except subst.SubstitutionError as e:
230     logger.error(str(e))
231     logger.error("task parameters were:")
232     logger.error(pprint.pformat(taskp))
233     sys.exit(1)
234 except Exception as e:
235     logger.exception("caught exception")
236     logger.error("task parameters were:")
237     logger.error(pprint.pformat(taskp))
238     sys.exit(1)
239
240 try:
241     sp = subprocess.Popen(cmd, shell=False, stdin=stdinfile, stdout=stdoutfile)
242     sig = SigHandler()
243
244     # forward signals to the process.
245     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
246     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
247     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))
248
249     # wait for process to complete.
250     rcode = sp.wait()
251
252     if sig.sig is not None:
253         logger.critical("terminating on signal %s" % sig.sig)
254         sys.exit(2)
255     else:
256         logger.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
257
258 except Exception as e:
259     logger.exception("caught exception")
260
261 # restore default signal handlers.
262 signal.signal(signal.SIGINT, signal.SIG_DFL)
263 signal.signal(signal.SIGTERM, signal.SIG_DFL)
264 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
265
266 for l in links:
267     os.unlink(l)
268
269 logger.info("the following output files will be saved to keep:")
270
271 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
272
273 logger.info("start writing output to keep")
274
275 if "task.vwd" in taskp:
276     if "task.foreach" in jobp:
277         # This is a subtask, so don't merge with the original collection, that will happen at the end
278         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
279     else:
280         # Just a single task, so do merge with the original collection
281         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
282 else:
283     outcollection = robust_put.upload(outdir, logger)
284
285 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
286                                      body={
287                                          'output': outcollection,
288                                          'success': (rcode == 0),
289                                          'progress':1.0
290                                      }).execute()
291
292 sys.exit(rcode)