Merge branch '2800-python-global-state' into 2800-pgs
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4 logging.basicConfig(level=logging.INFO, format="run-command: %(message)s")
5
6 import arvados
7 import re
8 import os
9 import subprocess
10 import sys
11 import shutil
12 import crunchutil.subst as subst
13 import time
14 import arvados.commands.put as put
15 import signal
16 import stat
17 import copy
18 import traceback
19 import pprint
20 import multiprocessing
21 import crunchutil.robust_put as robust_put
22 import crunchutil.vwd as vwd
23
# Everything this script creates is private to the owning user: the mask
# clears all group/other permission bits. Written with the 0o prefix
# because the bare-leading-zero octal form (0077) is a syntax error on
# Python 3, while 0o077 means the same thing on Python 2.6 and later.
os.umask(0o077)

t = arvados.current_task().tmpdir

# API client shared by the rest of the script.
api = arvados.api('v1')

# Lay out the working area inside the task's temporary directory:
#   <task tmpdir>/tmpdir  -- what sub_tmpdir() reports as $(task.tmpdir)
#   <task tmpdir>/output  -- initial working directory; uploaded at the end
os.chdir(arvados.current_task().tmpdir)
os.mkdir("tmpdir")
os.mkdir("output")

os.chdir("output")

# Absolute path of the output directory, captured now because the working
# directory can be changed again later (see the "task.cwd" parameter).
outdir = os.getcwd()

# jobp: the job's script_parameters.
# taskp: this task's own parameters when it has any, otherwise None for now.
taskp = None
jobp = arvados.current_job()['script_parameters']
if len(arvados.current_task()['parameters']) > 0:
    taskp = arvados.current_task()['parameters']

# Paths that are unlinked again before the output is uploaded.
links = []
44
def sub_tmpdir(v):
    """Return the path of the 'tmpdir' directory inside the task's tmpdir.

    The argument v is accepted but not used.
    """
    task_root = arvados.current_task().tmpdir
    return os.path.join(task_root, 'tmpdir')
47
def sub_outdir(v):
    """Return the module-level output directory path (outdir).

    The argument v is accepted but not used.
    """
    output_path = outdir
    return output_path
50
def sub_cores(v):
    """Return the CPU count reported by multiprocessing, as a string.

    The argument v is accepted but not used.
    """
    core_count = multiprocessing.cpu_count()
    return "%d" % core_count
53
def sub_jobid(v):
    """Return the value of the JOB_UUID environment variable.

    The argument v is accepted but not used. Raises KeyError when the
    variable is not set.
    """
    job_uuid = os.environ['JOB_UUID']
    return job_uuid
56
def sub_taskid(v):
    """Return the value of the TASK_UUID environment variable.

    The argument v is accepted but not used. Raises KeyError when the
    variable is not set.
    """
    task_uuid = os.environ['TASK_UUID']
    return task_uuid
59
def sub_jobsrc(v):
    """Return the value of the CRUNCH_SRC environment variable.

    The argument v is accepted but not used. Raises KeyError when the
    variable is not set.
    """
    source_dir = os.environ['CRUNCH_SRC']
    return source_dir
62
# Register the substitution handlers defined above under the names used
# inside $(...) expressions.
for sub_name, sub_handler in (
        ("task.tmpdir", sub_tmpdir),
        ("task.outdir", sub_outdir),
        ("job.srcdir", sub_jobsrc),
        ("node.cores", sub_cores),
        ("job.uuid", sub_jobid),
        ("task.uuid", sub_taskid)):
    subst.default_subs[sub_name] = sub_handler
69
class SigHandler(object):
    """Forward signals to a child process and remember the last one sent."""

    def __init__(self):
        # Number of the most recently forwarded signal; None until
        # send_signal() has completed at least once.
        self.sig = None

    def send_signal(self, sp, signum):
        """Pass signum to sp.send_signal(), then record it in self.sig.

        sp -- object exposing send_signal(signum); the script passes the
              subprocess.Popen instance of the running command.
        signum -- signal number to forward.
        """
        deliver = sp.send_signal
        deliver(signum)
        self.sig = signum
77
def expand_item(p, c):
    """Expand one command-template element into a flat list of strings.

    p -- parameter dictionary used for substitution.
    c -- the element to expand:
         * a list is expanded element by element via expand_list();
         * a dict containing both "foreach" and "command" expands the
           "command" list once per item of the parameter named by
           "foreach", with that parameter bound to the current item;
         * a string is passed through subst.do_substitution().

    Returns a list. Anything else, including a dict lacking either key,
    expands to an empty list.
    """
    if isinstance(c, list):
        return expand_list(p, c)

    if isinstance(c, dict):
        if "foreach" not in c or "command" not in c:
            return []
        loop_var = c["foreach"]
        expanded = []
        for value in get_items(p, p[loop_var]):
            # Shallow copy so the binding does not leak into the caller's
            # parameters.
            scoped = copy.copy(p)
            scoped[loop_var] = value
            expanded += expand_list(scoped, c["command"])
        return expanded

    if isinstance(c, str) or isinstance(c, unicode):
        return [subst.do_substitution(p, c)]

    return []
95
def expand_list(p, l):
    """Expand every element of l with expand_item() and concatenate the results.

    p -- parameter dictionary handed on to expand_item().
    l -- iterable of command-template elements.

    Returns a new flat list, in the order of the input elements.
    """
    flattened = []
    for element in l:
        flattened.extend(expand_item(p, element))
    return flattened
98
def get_items(p, value):
    """Return the list of items described by a parameter value.

    p -- parameter dictionary used for substitution.
    value -- either a list, which is expanded with expand_list(), or a
             string that, after substitution, names a directory or a
             regular file.

    For a directory, returns one "$(dir <prefix>/<entry>/)" expression per
    directory entry, where <prefix> is the path with the TASK_KEEPMOUNT
    prefix and the following separator removed. For a regular file,
    returns its lines with the trailing line ending removed, so that the
    newline does not end up inside parameters or command arguments.
    Returns None when the path is neither a directory nor a regular file.

    Raises OSError when the path cannot be stat'ed, and KeyError when a
    directory is given but TASK_KEEPMOUNT is not set.
    """
    if isinstance(value, list):
        return expand_list(p, value)

    fn = subst.do_substitution(p, value)
    mode = os.stat(fn).st_mode

    if stat.S_ISDIR(mode):
        # Only directories need the keep-mount-relative prefix.
        prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
        return ["$(dir %s/%s/)" % (prefix, entry) for entry in os.listdir(fn)]

    if stat.S_ISREG(mode):
        with open(fn) as f:
            return [line.rstrip("\r\n") for line in f]

    # Any other file type (fifo, socket, device, ...): no items to offer.
    return None
115
# Optional redirections for the child process: the substituted file names
# and the file objects opened on them. None means "not redirected".
stdinname, stdinfile = None, None
stdoutname, stdoutfile = None, None

# Exit status reported for the task; stays 1 unless the command runs and
# its real exit code replaces it.
rcode = 1
121
# Work out what this task has to do: either fan out into one task per item
# ("task.foreach" on the sequence-0 task) or prepare the command line and
# redirections for the command this task runs itself.
try:
    if "task.foreach" in jobp:
        if arvados.current_task()['sequence'] == 0:
            # The sequence-0 task only schedules work: one sequence-1 task
            # per item, each with the foreach parameter bound to that item.
            var = jobp["task.foreach"]
            items = get_items(jobp, jobp[var])
            logging.info("parallelizing on %s with items %s", var, items)
            if items is not None:
                for i in items:
                    params = copy.copy(jobp)
                    params[var] = i
                    # Use the module-level client, like the final task
                    # update at the end of the script.
                    api.job_tasks().create(body={
                        'job_uuid': arvados.current_job()['uuid'],
                        'created_by_job_task_uuid': arvados.current_task()['uuid'],
                        'sequence': 1,
                        'parameters': params
                        }
                    ).execute()
                if "task.vwd" in jobp:
                    # Base vwd collection will be merged with output fragments from
                    # the other tasks by crunch.
                    arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
                else:
                    arvados.current_task().set_output(None)
                sys.exit(0)
            else:
                # Say why the task fails instead of exiting silently.
                logging.error("could not get items for task.foreach parameter %s", var)
                sys.exit(1)
    else:
        # No fan-out requested: the job parameters are the task parameters.
        taskp = jobp

    if "task.vwd" in taskp:
        # Populate output directory with symlinks to files in collection
        vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)

    if "task.cwd" in taskp:
        os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))

    cmd = expand_list(taskp, taskp["command"])

    if "task.stdin" in taskp:
        stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
        stdinfile = open(stdinname, "rb")

    if "task.stdout" in taskp:
        stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
        stdoutfile = open(stdoutname, "wb")

    # Log the command roughly the way it would be typed in a shell.
    logging.info("{}{}{}".format(
        ' '.join(cmd),
        (" < " + stdinname) if stdinname is not None else "",
        (" > " + stdoutname) if stdoutname is not None else ""))

except Exception:
    # sys.exit() above raises SystemExit, which is not an Exception
    # subclass and therefore passes through untouched.
    logging.exception("caught exception")
    logging.error("task parameters was:")
    logging.error(pprint.pformat(taskp))
    sys.exit(1)
175
# Run the command, relaying termination signals to it, and collect its
# exit code in rcode. A failure to start or wait is logged and leaves
# rcode at its previous value.
try:
    sp = subprocess.Popen(cmd, shell=False, stdin=stdinfile, stdout=stdoutfile)
    sig = SigHandler()

    # forward signals to the process.
    signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
    signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
    signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))

    # wait for process to complete.
    rcode = sp.wait()

    if sig.sig is not None:
        # A signal was relayed to the child: stop here with exit status 2.
        logging.critical("terminating on signal %s" % sig.sig)
        sys.exit(2)
    else:
        logging.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))

except Exception:
    logging.exception("caught exception")
finally:
    # The redirect files were opened only to be handed to the child; this
    # script never reads or writes them itself, so release them once the
    # child is no longer running (or could not be started).
    for redirect in (stdinfile, stdoutfile):
        if redirect is not None:
            redirect.close()
196
# restore default signal handlers.
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGQUIT, signal.SIG_DFL)

for l in links:
    os.unlink(l)

logging.info("the following output files will be saved to keep:")

# List the output directory itself. The working directory may have been
# changed by "task.cwd", so "." is only the output directory when find is
# started there explicitly.
subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr, cwd=outdir)

logging.info("start writing output to keep")

if "task.vwd" in taskp:
    # A subtask (job uses "task.foreach") must not merge with the original
    # collection, that will happen at the end; a single task does merge
    # with the original collection.
    merge_with_base = "task.foreach" not in jobp
    outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=merge_with_base).manifest_text()
else:
    outcollection = robust_put.upload(outdir)

# Record the output and the outcome on the task; it counts as successful
# only when the command exited with status 0.
api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                                     body={
                                         'output': outcollection,
                                         'success': (rcode == 0),
                                         'progress':1.0
                                     }).execute()

sys.exit(rcode)