Merge branch 'master' into 3699-arv-copy
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28
29 os.umask(0077)
30
31 t = arvados.current_task().tmpdir
32
33 api = arvados.api('v1')
34
35 os.chdir(arvados.current_task().tmpdir)
36 os.mkdir("tmpdir")
37 os.mkdir("output")
38
39 os.chdir("output")
40
41 outdir = os.getcwd()
42
43 taskp = None
44 jobp = arvados.current_job()['script_parameters']
45 if len(arvados.current_task()['parameters']) > 0:
46     taskp = arvados.current_task()['parameters']
47
48 links = []
49
50 def sub_tmpdir(v):
51     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
52
53 def sub_outdir(v):
54     return outdir
55
56 def sub_cores(v):
57      return str(multiprocessing.cpu_count())
58
59 def sub_jobid(v):
60      return os.environ['JOB_UUID']
61
62 def sub_taskid(v):
63      return os.environ['TASK_UUID']
64
65 def sub_jobsrc(v):
66      return os.environ['CRUNCH_SRC']
67
68 subst.default_subs["task.tmpdir"] = sub_tmpdir
69 subst.default_subs["task.outdir"] = sub_outdir
70 subst.default_subs["job.srcdir"] = sub_jobsrc
71 subst.default_subs["node.cores"] = sub_cores
72 subst.default_subs["job.uuid"] = sub_jobid
73 subst.default_subs["task.uuid"] = sub_taskid
74
75 class SigHandler(object):
76     def __init__(self):
77         self.sig = None
78
79     def send_signal(self, sp, signum):
80         sp.send_signal(signum)
81         self.sig = signum
82
83 def expand_item(p, c):
84     if isinstance(c, dict):
85         if "foreach" in c and "command" in c:
86             var = c["foreach"]
87             items = get_items(p, p[var])
88             r = []
89             for i in items:
90                 params = copy.copy(p)
91                 params[var] = i
92                 r.extend(expand_list(params, c["command"]))
93             return r
94     elif isinstance(c, list):
95         return expand_list(p, c)
96     elif isinstance(c, str) or isinstance(c, unicode):
97         return [subst.do_substitution(p, c)]
98
99     return []
100
101 def expand_list(p, l):
102     return [exp for arg in l for exp in expand_item(p, arg)]
103
104 def get_items(p, value):
105     if isinstance(value, list):
106         return expand_list(p, value)
107
108     fn = subst.do_substitution(p, value)
109     mode = os.stat(fn).st_mode
110     prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
111     if mode is not None:
112         if stat.S_ISDIR(mode):
113             items = ["$(dir %s/%s/)" % (prefix, l) for l in os.listdir(fn)]
114         elif stat.S_ISREG(mode):
115             with open(fn) as f:
116                 items = [line for line in f]
117         return items
118     else:
119         return None
120
121 stdoutname = None
122 stdoutfile = None
123 stdinname = None
124 stdinfile = None
125 rcode = 1
126
127 try:
128     if "task.foreach" in jobp:
129         if arvados.current_task()['sequence'] == 0:
130             var = jobp["task.foreach"]
131             items = get_items(jobp, jobp[var])
132             logger.info("parallelizing on %s with items %s" % (var, items))
133             if items is not None:
134                 for i in items:
135                     params = copy.copy(jobp)
136                     params[var] = i
137                     arvados.api().job_tasks().create(body={
138                         'job_uuid': arvados.current_job()['uuid'],
139                         'created_by_job_task_uuid': arvados.current_task()['uuid'],
140                         'sequence': 1,
141                         'parameters': params
142                         }
143                     ).execute()
144                 if "task.vwd" in jobp:
145                     # Base vwd collection will be merged with output fragments from
146                     # the other tasks by crunch.
147                     arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
148                 else:
149                     arvados.current_task().set_output(None)
150                 sys.exit(0)
151             else:
152                 sys.exit(1)
153     else:
154         taskp = jobp
155
156     if "task.vwd" in taskp:
157         # Populate output directory with symlinks to files in collection
158         vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
159
160     if "task.cwd" in taskp:
161         os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
162
163     cmd = expand_list(taskp, taskp["command"])
164
165     if "task.stdin" in taskp:
166         stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
167         stdinfile = open(stdinname, "rb")
168
169     if "task.stdout" in taskp:
170         stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
171         stdoutfile = open(stdoutname, "wb")
172
173     logger.info("{}{}{}".format(' '.join(cmd), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
174 except subst.SubstitutionError as e:
175     logger.error(str(e))
176     logger.error("task parameters were:")
177     logger.error(pprint.pformat(taskp))
178     sys.exit(1)
179 except Exception as e:
180     logger.exception("caught exception")
181     logger.error("task parameters were:")
182     logger.error(pprint.pformat(taskp))
183     sys.exit(1)
184
185 try:
186     sp = subprocess.Popen(cmd, shell=False, stdin=stdinfile, stdout=stdoutfile)
187     sig = SigHandler()
188
189     # forward signals to the process.
190     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
191     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
192     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))
193
194     # wait for process to complete.
195     rcode = sp.wait()
196
197     if sig.sig is not None:
198         logger.critical("terminating on signal %s" % sig.sig)
199         sys.exit(2)
200     else:
201         logger.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
202
203 except Exception as e:
204     logger.exception("caught exception")
205
206 # restore default signal handlers.
207 signal.signal(signal.SIGINT, signal.SIG_DFL)
208 signal.signal(signal.SIGTERM, signal.SIG_DFL)
209 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
210
211 for l in links:
212     os.unlink(l)
213
214 logger.info("the following output files will be saved to keep:")
215
216 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
217
218 logger.info("start writing output to keep")
219
220 if "task.vwd" in taskp:
221     if "task.foreach" in jobp:
222         # This is a subtask, so don't merge with the original collection, that will happen at the end
223         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
224     else:
225         # Just a single task, so do merge with the original collection
226         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
227 else:
228     outcollection = robust_put.upload(outdir, logger)
229
230 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
231                                      body={
232                                          'output': outcollection,
233                                          'success': (rcode == 0),
234                                          'progress':1.0
235                                      }).execute()
236
237 sys.exit(rcode)