#!/usr/bin/env python

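# run-command: run the shell command described by the job's script_parameters
# inside a crunch task, expanding $(...) substitutions, fanning out over
# "task.foreach" lists, and saving the output directory back to Keep.
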
import logging
logging.basicConfig(level=logging.INFO, format="run-command: %(message)s")

import arvados
import re
import os
import subprocess
import sys
import shutil
import crunchutil.subst as subst
import time
import arvados.commands.put as put
import signal
import stat
import copy
import traceback
import pprint
import multiprocessing
import crunchutil.robust_put as robust_put
import crunchutil.vwd as vwd

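# Restrict permissions on new files, then create per-task "tmpdir" and
# "output" directories under the task's temp dir and run from the output
# directory.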
os.umask(0077)

t = arvados.current_task().tmpdir

api = arvados.api('v1')

os.chdir(arvados.current_task().tmpdir)
os.mkdir("tmpdir")
os.mkdir("output")

os.chdir("output")

outdir = os.getcwd()

taskp = None
jobp = arvados.current_job()['script_parameters']
if len(arvados.current_task()['parameters']) > 0:
    taskp = arvados.current_task()['parameters']

links = []

def sub_tmpdir(v):
    return os.path.join(arvados.current_task().tmpdir, 'tmpdir')

def sub_outdir(v):
    return outdir

def sub_cores(v):
    return str(multiprocessing.cpu_count())

def sub_jobid(v):
    return os.environ['JOB_UUID']

def sub_taskid(v):
    return os.environ['TASK_UUID']

def sub_jobsrc(v):
    return os.environ['CRUNCH_SRC']

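# Register the built-in $(...) substitutions that command templates may use.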
subst.default_subs["task.tmpdir"] = sub_tmpdir
subst.default_subs["task.outdir"] = sub_outdir
subst.default_subs["job.srcdir"] = sub_jobsrc
subst.default_subs["node.cores"] = sub_cores
subst.default_subs["job.uuid"] = sub_jobid
subst.default_subs["task.uuid"] = sub_taskid

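# Remembers the last signal forwarded to the child process, so we can tell
# after wait() whether the command was interrupted rather than exiting on its own.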
class SigHandler(object):
    def __init__(self):
        self.sig = None

    def send_signal(self, sp, signum):
        sp.send_signal(signum)
        self.sig = signum

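# Recursively expand one element of a command template: a dict with "foreach"
# and "command" fans out over the items of the named parameter, a list is
# expanded element by element, and a string gets $(...) substitution.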
def expand_item(p, c):
    if isinstance(c, dict):
        if "foreach" in c and "command" in c:
            var = c["foreach"]
            items = get_items(p, p[var])
            r = []
            for i in items:
                params = copy.copy(p)
                params[var] = i
                r.extend(expand_list(params, c["command"]))
            return r
    elif isinstance(c, list):
        return expand_list(p, c)
    elif isinstance(c, str) or isinstance(c, unicode):
        return [subst.do_substitution(p, c)]

    return []

def expand_list(p, l):
    return [exp for arg in l for exp in expand_item(p, arg)]

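# Resolve a "foreach" parameter into a list of items: a literal list is
# expanded in place, a directory yields a $(dir ...) reference per entry,
# and a regular file yields one item per line.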
def get_items(p, value):
    if isinstance(value, list):
        return expand_list(p, value)

    fn = subst.do_substitution(p, value)
    mode = os.stat(fn).st_mode
    prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
    if mode is not None:
        if stat.S_ISDIR(mode):
            items = ["$(dir %s/%s/)" % (prefix, l) for l in os.listdir(fn)]
        elif stat.S_ISREG(mode):
            with open(fn) as f:
                items = [line.rstrip("\r\n") for line in f]
        else:
            items = None
        return items
    else:
        return None

stdoutname = None
stdoutfile = None
stdinname = None
stdinfile = None
rcode = 1

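# Build this task's command line. On the first task (sequence 0) of a
# "task.foreach" job, schedule one subtask per item and exit instead.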
try:
    if "task.foreach" in jobp:
        if arvados.current_task()['sequence'] == 0:
            var = jobp["task.foreach"]
            items = get_items(jobp, jobp[var])
            logging.info("parallelizing on %s with items %s" % (var, items))
            if items is not None:
                for i in items:
                    params = copy.copy(jobp)
                    params[var] = i
                    arvados.api().job_tasks().create(body={
                        'job_uuid': arvados.current_job()['uuid'],
                        'created_by_job_task_uuid': arvados.current_task()['uuid'],
                        'sequence': 1,
                        'parameters': params
                        }
                    ).execute()
                if "task.vwd" in jobp:
                    # Base vwd collection will be merged with output fragments from
                    # the other tasks by crunch.
                    arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
                else:
                    arvados.current_task().set_output(None)
                sys.exit(0)
            else:
                sys.exit(1)
    else:
        taskp = jobp

    if "task.vwd" in taskp:
        # Populate output directory with symlinks to files in collection
        vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)

    if "task.cwd" in taskp:
        os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))

    cmd = expand_list(taskp, taskp["command"])

    if "task.stdin" in taskp:
        stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
        stdinfile = open(stdinname, "rb")

    if "task.stdout" in taskp:
        stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
        stdoutfile = open(stdoutname, "wb")

    logging.info("{}{}{}".format(' '.join(cmd), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
except subst.SubstitutionError as e:
    logging.error(str(e))
    logging.error("task parameters were:")
    logging.error(pprint.pformat(taskp))
    sys.exit(1)
except Exception as e:
    logging.exception("caught exception")
    logging.error("task parameters were:")
    logging.error(pprint.pformat(taskp))
    sys.exit(1)

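# Run the command, forwarding SIGINT/SIGTERM/SIGQUIT to the child so that
# cancelling the task also stops the command.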
try:
    sp = subprocess.Popen(cmd, shell=False, stdin=stdinfile, stdout=stdoutfile)
    sig = SigHandler()

    # forward signals to the process.
    signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
    signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
    signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))

    # wait for process to complete.
    rcode = sp.wait()

    if sig.sig is not None:
        logging.critical("terminating on signal %s" % sig.sig)
        sys.exit(2)
    else:
        logging.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))

except Exception as e:
    logging.exception("caught exception")

# restore default signal handlers.
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGQUIT, signal.SIG_DFL)

for l in links:
    os.unlink(l)

logging.info("the following output files will be saved to keep:")

subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)

logging.info("start writing output to keep")

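# Save the output directory as a collection: check a virtual working directory
# back in (merging only for single-task jobs), or upload the directory directly.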
215 if "task.vwd" in taskp:
216     if "task.foreach" in jobp:
217         # This is a subtask, so don't merge with the original collection, that will happen at the end
218         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
219     else:
220         # Just a single task, so do merge with the original collection
221         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
222 else:
223     outcollection = robust_put.upload(outdir)
224
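# Record the output collection, success flag, and progress on this task.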
api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                       body={
                           'output': outcollection,
                           'success': (rcode == 0),
                           'progress': 1.0
                       }).execute()

sys.exit(rcode)