3505: Move helper scripts into crunchutil module. In run-command, added
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import arvados
4 import re
5 import os
6 import subprocess
7 import sys
8 import shutil
9 import crunchutil.subst as subst
10 import time
11 import arvados.commands.put as put
12 import signal
13 import stat
14 import copy
15 import traceback
16 import pprint
17 import multiprocessing
18 import logging
19 import crunchutil.robust_put as robust_put
20 import crunchutil.vwd as vwd
21
# Make every file and directory created by this script private to its owner.
# 0o077 is the octal spelling accepted by both Python 2.6+ and Python 3.
os.umask(0o077)
# The root logger defaults to WARNING, which would silently discard all of the
# logging.info() progress messages emitted by this script; enable INFO.
logging.basicConfig(format="run-command: %(message)s", level=logging.INFO)

t = arvados.current_task().tmpdir

api = arvados.api('v1')

# Create two working directories under the task's temporary directory:
# "tmpdir" (returned by sub_tmpdir) and "output" (recorded below as outdir).
os.chdir(arvados.current_task().tmpdir)
os.mkdir("tmpdir")
os.mkdir("output")

os.chdir("output")

# Absolute path of the output directory; it is the initial working directory.
outdir = os.getcwd()

# Job-level parameters are always available; task-level parameters only
# replace the None default when the current task actually carries some.
taskp = None
jobp = arvados.current_job()['script_parameters']
if len(arvados.current_task()['parameters']) > 0:
    taskp = arvados.current_task()['parameters']

# Paths that are unlinked again once the command has finished.
links = []
43
def sub_tmpdir(v):
    """Return the path of the "tmpdir" directory inside the task's tmpdir.

    The argument `v` is accepted but not used.
    """
    task_root = arvados.current_task().tmpdir
    return os.path.join(task_root, 'tmpdir')
46
def sub_outdir(v):
    """Return the module-level `outdir` path; the argument `v` is not used."""
    return outdir
49
def sub_cores(v):
    """Return the CPU count reported by multiprocessing, as a decimal string.

    The argument `v` is accepted but not used.
    """
    core_count = multiprocessing.cpu_count()
    return "%d" % core_count
52
def sub_jobid(v):
    """Return the value of the JOB_UUID environment variable.

    The argument `v` is not used. Raises KeyError when JOB_UUID is unset.
    """
    environment = os.environ
    return environment['JOB_UUID']
55
def sub_taskid(v):
    """Return the value of the TASK_UUID environment variable.

    The argument `v` is not used. Raises KeyError when TASK_UUID is unset.
    """
    environment = os.environ
    return environment['TASK_UUID']
58
def sub_jobsrc(v):
    """Return the value of the CRUNCH_SRC environment variable.

    The argument `v` is not used. Raises KeyError when CRUNCH_SRC is unset.
    """
    environment = os.environ
    return environment['CRUNCH_SRC']
61
# Register this script's substitution handlers in subst.default_subs,
# keyed by the name each handler answers to.
for _sub_name, _sub_handler in (
        ("task.tmpdir", sub_tmpdir),
        ("task.outdir", sub_outdir),
        ("job.srcdir", sub_jobsrc),
        ("node.cores", sub_cores),
        ("job.uuid", sub_jobid),
        ("task.uuid", sub_taskid),
):
    subst.default_subs[_sub_name] = _sub_handler
68
class SigHandler(object):
    """Relays a signal to a process object and remembers which one was sent."""

    def __init__(self):
        # Number of the most recently relayed signal; None until one is sent.
        self.sig = None

    def send_signal(self, sp, signum):
        """Call sp.send_signal(signum), then record signum in self.sig.

        The signal is recorded only after the relay call has returned.
        """
        sp.send_signal(signum)
        self.sig = signum
76
def expand_item(p, c):
    """Expand one command element `c` into a flat list of strings.

    p -- parameter mapping handed to the substitution helpers.
    c -- the element to expand:
         * a list is expanded element by element through expand_list();
         * a dict holding both "foreach" and "command" expands its "command"
           once per item of get_items(p, p[c["foreach"]]), each time against
           a shallow copy of `p` in which that variable is bound to the item;
         * a string is passed through subst.do_substitution().
    Any other value, including a dict missing either key, yields [].
    """
    if isinstance(c, list):
        return expand_list(p, c)

    if isinstance(c, dict):
        if "foreach" not in c or "command" not in c:
            return []
        loop_var = c["foreach"]
        expanded = []
        for value in get_items(p, p[loop_var]):
            # Bind the loop variable in a copy so `p` itself is left untouched.
            scoped = copy.copy(p)
            scoped[loop_var] = value
            expanded.extend(expand_list(scoped, c["command"]))
        return expanded

    if isinstance(c, str) or isinstance(c, unicode):
        return [subst.do_substitution(p, c)]

    return []
94
def expand_list(p, l):
    """Expand every element of `l` with expand_item() and concatenate the results.

    p -- parameter mapping forwarded to expand_item().
    l -- iterable of command elements.
    Returns one flat list, in the order the elements were given.
    """
    flattened = []
    for element in l:
        flattened.extend(expand_item(p, element))
    return flattened
97
def get_items(p, value):
    """Return the list of items that a "foreach" iterates over.

    p     -- parameter mapping used for substitutions.
    value -- either a list, which is expanded with expand_list(), or a string
             that after substitution names a directory or a regular file.

    For a directory, one "$(dir <prefix>/<entry>/)" string is returned per
    directory entry, where <prefix> is the path with the first
    len(TASK_KEEPMOUNT) + 1 characters removed (presumably the path lives
    under the keep mount -- confirm against callers).  For a regular file,
    one item is returned per line, without the line terminator.  For any
    other kind of path, None is returned.

    Raises OSError if the path cannot be stat'ed, and KeyError if a directory
    is given while the TASK_KEEPMOUNT environment variable is unset.
    """
    if isinstance(value, list):
        return expand_list(p, value)

    fn = subst.do_substitution(p, value)
    mode = os.stat(fn).st_mode

    if stat.S_ISDIR(mode):
        # TASK_KEEPMOUNT is only needed for the directory form, so it is only
        # looked up here.
        prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
        return ["$(dir %s/%s/)" % (prefix, l) for l in os.listdir(fn)]

    if stat.S_ISREG(mode):
        with open(fn) as f:
            # Iterating a file yields lines with their terminator attached;
            # strip it so it does not end up inside command arguments.
            return [line.rstrip("\r\n") for line in f]

    # Neither a directory nor a regular file: there is nothing to iterate.
    return None
114
# Names and open file objects for the optional stdin/stdout redirections.
stdoutname = None
stdoutfile = None
stdinname = None
stdinfile = None
# Exit status reported for the task; stays 1 unless the command runs to
# completion and its own exit code replaces it.
rcode = 1

try:
    if "task.foreach" in jobp:
        if arvados.current_task()['sequence'] == 0:
            # The first task (sequence 0) only fans out: it queues one new
            # task per item, each with the foreach variable bound to that
            # item, and then exits without running the command itself.
            var = jobp["task.foreach"]
            items = get_items(jobp, jobp[var])
            logging.info("parallelizing on %s with items %s" % (var, items))
            if items is not None:
                for i in items:
                    params = copy.copy(jobp)
                    params[var] = i
                    arvados.api().job_tasks().create(body={
                        'job_uuid': arvados.current_job()['uuid'],
                        'created_by_job_task_uuid': arvados.current_task()['uuid'],
                        'sequence': 1,
                        'parameters': params
                        }
                    ).execute()
                if "task.vwd" in jobp:
                    # Base vwd collection will be merged with output fragments from
                    # the other tasks by crunch.
                    arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
                else:
                    arvados.current_task().set_output(None)
                sys.exit(0)
            else:
                sys.exit(1)
    else:
        # No fan-out requested: the job parameters are the task parameters.
        taskp = jobp

    if "task.vwd" in taskp:
        # Populate output directory with symlinks to files in collection
        vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)

    if "task.cwd" in taskp:
        os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))

    cmd = expand_list(taskp, taskp["command"])

    if "task.stdin" in taskp:
        stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
        stdinfile = open(stdinname, "rb")

    if "task.stdout" in taskp:
        stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
        stdoutfile = open(stdoutname, "wb")

    # Log the command in shell-like notation, including any redirections.
    # All three values are arguments of the single format() call.
    logging.info("{}{}{}".format(
        ' '.join(cmd),
        (" < " + stdinname) if stdinname is not None else "",
        (" > " + stdoutname) if stdoutname is not None else ""))

except Exception as e:
    # Any failure while preparing the task is fatal; log the parameters that
    # were in effect to help diagnose it.  (sys.exit() above raises
    # SystemExit, which is not an Exception and so is not caught here.)
    logging.exception("caught exception")
    logging.error("task parameters was:")
    logging.error(pprint.pformat(taskp))
    sys.exit(1)
174
try:
    sp = subprocess.Popen(cmd, shell=False, stdin=stdinfile, stdout=stdoutfile)
    sig = SigHandler()

    # Relay interrupt, terminate and quit requests to the child process.
    for _relayed in (signal.SIGINT, signal.SIGTERM, signal.SIGQUIT):
        signal.signal(_relayed, lambda signum, frame: sig.send_signal(sp, signum))

    # Block until the child exits and keep its exit status.
    rcode = sp.wait()

    if sig.sig is None:
        _outcome = "success" if rcode == 0 else "failed"
        logging.info("completed with exit code %i (%s)" % (rcode, _outcome))
    else:
        # A signal was relayed while the child was running: stop here.
        logging.critical("terminating on signal %s" % sig.sig)
        sys.exit(2)

except Exception:
    logging.exception("caught exception")
195
# Put the default handlers back for the signals that were being relayed.
for _restored in (signal.SIGINT, signal.SIGTERM, signal.SIGQUIT):
    signal.signal(_restored, signal.SIG_DFL)

# Remove every path recorded in links.
for link_path in links:
    os.unlink(link_path)
203
logging.info("the following output files will be saved to keep:")

# List the regular files below the current directory on stderr.
subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)

logging.info("start writing output to keep")

if "task.vwd" in taskp:
    # A foreach subtask must not merge with the original collection (that
    # happens at the end); a single task merges with it right away.
    _merge_now = "task.foreach" not in jobp
    _base = subst.do_substitution(taskp, taskp["task.vwd"])
    outcollection = vwd.checkin(_base, outdir, merge=_merge_now).manifest_text()
else:
    outcollection = robust_put.upload(outdir)

# Record the output and the success flag on the task, then mark it complete.
_task_update = {
    'output': outcollection,
    'success': (rcode == 0),
    'progress':1.0
}
api.job_tasks().update(uuid=arvados.current_task()['uuid'], body=_task_update).execute()

sys.exit(rcode)