4042: fix bad syntax
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28
29 os.umask(0077)
30
31 t = arvados.current_task().tmpdir
32
33 api = arvados.api('v1')
34
35 os.chdir(arvados.current_task().tmpdir)
36 os.mkdir("tmpdir")
37 os.mkdir("output")
38
39 os.chdir("output")
40
41 outdir = os.getcwd()
42
43 taskp = None
44 jobp = arvados.current_job()['script_parameters']
45 if len(arvados.current_task()['parameters']) > 0:
46     taskp = arvados.current_task()['parameters']
47
48 links = []
49
50 def sub_tmpdir(v):
51     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
52
53 def sub_outdir(v):
54     return outdir
55
56 def sub_cores(v):
57      return str(multiprocessing.cpu_count())
58
59 def sub_jobid(v):
60      return os.environ['JOB_UUID']
61
62 def sub_taskid(v):
63      return os.environ['TASK_UUID']
64
65 def sub_jobsrc(v):
66      return os.environ['CRUNCH_SRC']
67
68 subst.default_subs["task.tmpdir"] = sub_tmpdir
69 subst.default_subs["task.outdir"] = sub_outdir
70 subst.default_subs["job.srcdir"] = sub_jobsrc
71 subst.default_subs["node.cores"] = sub_cores
72 subst.default_subs["job.uuid"] = sub_jobid
73 subst.default_subs["task.uuid"] = sub_taskid
74
75 class SigHandler(object):
76     def __init__(self):
77         self.sig = None
78
79     def send_signal(self, sp, signum):
80         sp.send_signal(signum)
81         self.sig = signum
82
83 def expand_item(p, c):
84     if isinstance(c, dict):
85         if "foreach" in c and "command" in c:
86             var = c["foreach"]
87             items = get_items(p, p[var])
88             r = []
89             for i in items:
90                 params = copy.copy(p)
91                 params[var] = i
92                 r.extend(expand_list(params, c["command"]))
93             return r
94         if "list" in c and "index" in c:
95             var = c["list"]
96             items = get_items(p, p[var])
97             return items[int(c["index"])]
98     elif isinstance(c, list):
99         return expand_list(p, c)
100     elif isinstance(c, str) or isinstance(c, unicode):
101         return [subst.do_substitution(p, c)]
102
103     return []
104
105 def expand_list(p, l):
106     return [exp for arg in l for exp in expand_item(p, arg)]
107
108 def add_to_group(gr, match):
109     m = ('^_^').join(match.groups())
110     if m not in gr:
111         gr[m] = []
112     gr[m].append(match.group(0))
113
114 def get_items(p, value):
115     if isinstance(value, dict):
116         if "filter" in value and "regex" in value:
117             pattern = re.compile(value["regex"])
118             items = get_items(p, value["group"])
119             return [i for i in items if pattern.match(i)]
120
121         if "group" in value and "regex" in value:
122             pattern = re.compile(value["regex"])
123             items = get_items(p, value["group"])
124             groups = {}
125             for i in items:
126                 p = pattern.match(i)
127                 if p:
128                     add_to_group(groups, p)
129             return [groups[k] for k in groups]
130
131         if "extract" in value and "regex" in value:
132             pattern = re.compile(value["regex"])
133             items = get_items(p, value["group"])
134             r = []
135             for i in items:
136                 p = pattern.match(i)
137                 if p:
138                     r.append(p.groups())
139             return r
140
141     if isinstance(value, list):
142         return expand_list(p, value)
143
144     fn = subst.do_substitution(p, value)
145     mode = os.stat(fn).st_mode
146     prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
147     if mode is not None:
148         if stat.S_ISDIR(mode):
149             items = ["$(dir %s/%s/)" % (prefix, l) for l in os.listdir(fn)]
150         elif stat.S_ISREG(mode):
151             with open(fn) as f:
152                 items = [line.rstrip("\r\n") for line in f]
153         return items
154     else:
155         return None
156
157 stdoutname = None
158 stdoutfile = None
159 stdinname = None
160 stdinfile = None
161 rcode = 1
162
163 def recursive_foreach(params, fvars):
164     var = fvars[0]
165     fvars = fvars[1:]
166     items = get_items(params, params[var])
167     logger.info("parallelizing on %s with items %s" % (var, items))
168     if items is not None:
169         for i in items:
170             params = copy.copy(params)
171             params[var] = i
172             if len(fvars) > 0:
173                 recursive_foreach(params, fvars)
174             else:
175                 arvados.api().job_tasks().create(body={
176                     'job_uuid': arvados.current_job()['uuid'],
177                     'created_by_job_task_uuid': arvados.current_task()['uuid'],
178                     'sequence': 1,
179                     'parameters': params
180                     }
181                 ).execute()
182     else:
183         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
184         sys.exit(1)
185
186 try:
187     if "task.foreach" in jobp:
188         if arvados.current_task()['sequence'] == 0:
189             # This is the first task to start the other tasks and exit
190             fvars = jobp["task.foreach"]
191             if isinstance(fvars, basestring):
192                 fvars = [fvars]
193             if not isinstance(fvars, list) or len(fvars) == 0:
194                 logger.error("value of task.foreach must be a string or non-empty list")
195                 sys.exit(1)
196             recursive_foreach(jobp, jobp["task.foreach"])
197             if "task.vwd" in jobp:
198                 # Set output of the first task to the base vwd collection so it
199                 # will be merged with output fragments from the other tasks by
200                 # crunch.
201                 arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
202             else:
203                 arvados.current_task().set_output(None)
204             sys.exit(0)
205     else:
206         # This is the only task so taskp/jobp are the same
207         taskp = jobp
208
209     if "task.vwd" in taskp:
210         # Populate output directory with symlinks to files in collection
211         vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
212
213     if "task.cwd" in taskp:
214         os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
215
216     cmd = expand_list(taskp, taskp["command"])
217
218     if "task.stdin" in taskp:
219         stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
220         stdinfile = open(stdinname, "rb")
221
222     if "task.stdout" in taskp:
223         stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
224         stdoutfile = open(stdoutname, "wb")
225
226     logger.info("{}{}{}".format(' '.join(cmd), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
227 except subst.SubstitutionError as e:
228     logger.error(str(e))
229     logger.error("task parameters were:")
230     logger.error(pprint.pformat(taskp))
231     sys.exit(1)
232 except Exception as e:
233     logger.exception("caught exception")
234     logger.error("task parameters were:")
235     logger.error(pprint.pformat(taskp))
236     sys.exit(1)
237
238 try:
239     sp = subprocess.Popen(cmd, shell=False, stdin=stdinfile, stdout=stdoutfile)
240     sig = SigHandler()
241
242     # forward signals to the process.
243     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
244     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
245     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))
246
247     # wait for process to complete.
248     rcode = sp.wait()
249
250     if sig.sig is not None:
251         logger.critical("terminating on signal %s" % sig.sig)
252         sys.exit(2)
253     else:
254         logger.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
255
256 except Exception as e:
257     logger.exception("caught exception")
258
259 # restore default signal handlers.
260 signal.signal(signal.SIGINT, signal.SIG_DFL)
261 signal.signal(signal.SIGTERM, signal.SIG_DFL)
262 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
263
264 for l in links:
265     os.unlink(l)
266
267 logger.info("the following output files will be saved to keep:")
268
269 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
270
271 logger.info("start writing output to keep")
272
273 if "task.vwd" in taskp:
274     if "task.foreach" in jobp:
275         # This is a subtask, so don't merge with the original collection, that will happen at the end
276         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
277     else:
278         # Just a single task, so do merge with the original collection
279         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
280 else:
281     outcollection = robust_put.upload(outdir, logger)
282
283 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
284                                      body={
285                                          'output': outcollection,
286                                          'success': (rcode == 0),
287                                          'progress':1.0
288                                      }).execute()
289
290 sys.exit(rcode)