Renamed "job.src" to "job.srcdir", "job.id" to "job.uuid", and "task.id" to "task.uuid".
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import arvados
4 import re
5 import os
6 import subprocess
7 import sys
8 import shutil
9 import subst
10 import time
11 import arvados.commands.put as put
12 import signal
13 import stat
14 import copy
15 import traceback
16 import pprint
17 import multiprocessing
18
# Restrict all files created by this task to owner-only access.
os.umask(0077)

t = arvados.current_task().tmpdir

api = arvados.api('v1')

# Lay out the task workspace: a scratch dir for $(task.tmpdir) and an
# output dir that becomes the working directory for the command.
os.chdir(arvados.current_task().tmpdir)
os.mkdir("tmpdir")
os.mkdir("output")

os.chdir("output")

# Absolute path of the output directory; used by the $(task.outdir) and
# $(link ...) substitutions below.
outdir = os.getcwd()

# NOTE(review): taskp is never read again -- looks like a leftover from a
# rename; the effective parameter dict ends up in `p` instead.
taskp = None
jobp = arvados.current_job()['script_parameters']
# For subtasks created by the task.foreach fan-out, use the parameters
# stored on the task record.  NOTE(review): `p` stays undefined when this
# dict is empty; the non-foreach path below assigns p = jobp instead.
if len(arvados.current_task()['parameters']) > 0:
    p = arvados.current_task()['parameters']

# Symlinks created by the $(link ...) substitution; removed before upload.
links = []
def sub_link(v):
    """Substitution hook for $(link ...): symlink v into the output
    directory, remember the link for later cleanup, and return its path."""
    target = os.path.join(outdir, os.path.basename(v))
    os.symlink(v, target)
    links.append(target)
    return target
45
def sub_tmpdir(v):
    """Substitution hook for $(task.tmpdir): the task's scratch directory."""
    return os.path.join(arvados.current_task().tmpdir, "tmpdir")
48
def sub_outdir(v):
    """Substitution hook for $(task.outdir): the task's output directory."""
    return outdir
51
def sub_cores(v):
    """Substitution hook for $(node.cores): CPU core count of this node.

    Fixed the 5-space body indentation to the file's 4-space convention.
    """
    return str(multiprocessing.cpu_count())
54
def sub_jobid(v):
    """Substitution hook for $(job.uuid): the current job's UUID from the
    environment.

    Fixed the 5-space body indentation to the file's 4-space convention.
    """
    return os.environ['JOB_UUID']
57
def sub_taskid(v):
    """Substitution hook for $(task.uuid): the current task's UUID from the
    environment.

    Fixed the 5-space body indentation to the file's 4-space convention.
    """
    return os.environ['TASK_UUID']
60
def sub_jobsrc(v):
    """Substitution hook for $(job.srcdir): the job's source checkout path
    from the environment.

    Fixed the 5-space body indentation to the file's 4-space convention.
    """
    return os.environ['CRUNCH_SRC']
63
# Register the substitution hooks recognized inside command templates.
subst.default_subs["link "] = sub_link
subst.default_subs["task.tmpdir"] = sub_tmpdir
subst.default_subs["task.outdir"] = sub_outdir
subst.default_subs["job.srcdir"] = sub_jobsrc
subst.default_subs["node.cores"] = sub_cores
subst.default_subs["job.uuid"] = sub_jobid
subst.default_subs["task.uuid"] = sub_taskid

# Exit code of the child command; defaults to failure until the command runs.
rcode = 1
73
def machine_progress(bytes_written, bytes_expected):
    """Format a machine-readable upload-progress line.

    bytes_expected of None is reported as -1 (total unknown).
    """
    if bytes_expected is None:
        expected = -1
    else:
        expected = bytes_expected
    return "run-command: wrote {} total {}\n".format(bytes_written, expected)
77
class SigHandler(object):
    """Forwards a received signal to a child process and records that a
    signal was seen, so the task can exit with a distinct status later."""

    def __init__(self):
        # Last signal forwarded to the child, or None if none arrived.
        self.sig = None

    def send_signal(self, sp, signum):
        # Relay the signal to the subprocess, then remember it.
        sp.send_signal(signum)
        self.sig = signum
85
def expand_item(p, c):
    """Expand one element of a command template into a list of strings.

    A dict with "foreach" and "command" expands its command once per item
    of the named parameter; a list is expanded element-wise; a string goes
    through parameter substitution.  Anything else expands to [].
    """
    if isinstance(c, dict):
        if "foreach" in c and "command" in c:
            var = c["foreach"]
            items = get_items(p, p[var])
            expanded = []
            for item in items:
                # Each iteration sees a copy of p with the loop variable bound.
                params = copy.copy(p)
                params[var] = item
                expanded.extend(expand_list(params, c["command"]))
            return expanded
    elif isinstance(c, list):
        return expand_list(p, c)
    elif isinstance(c, (str, unicode)):
        return [subst.do_substitution(p, c)]

    return []
103
def expand_list(p, l):
    """Expand every element of command list l, flattening the results."""
    expanded = []
    for arg in l:
        expanded.extend(expand_item(p, arg))
    return expanded
106
def get_items(p, value):
    """Resolve a "task.foreach" parameter value into a list of items.

    value may be:
      * a list -- each element is substitution-expanded via expand_list();
      * a string naming a directory under $TASK_KEEPMOUNT -- returns one
        "$(dir .../entry/)" reference per directory entry;
      * a string naming a regular file -- returns the file's lines.

    Returns None when the path is neither a directory nor a regular file.
    (The previous version checked `mode != None`, which is dead code since
    os.stat() raises on failure, and left `items` unbound -- raising
    UnboundLocalError -- for other file types.)
    """
    if isinstance(value, list):
        return expand_list(p, value)

    fn = subst.do_substitution(p, value)
    mode = os.stat(fn).st_mode
    # Path relative to the keep mount, used to build $(dir ...) references.
    prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
    if stat.S_ISDIR(mode):
        return ["$(dir %s/%s/)" % (prefix, l) for l in os.listdir(fn)]
    elif stat.S_ISREG(mode):
        with open(fn) as f:
            return [line for line in f]
    # Neither a directory nor a regular file: callers treat None as an error.
    return None
123
# Name and open file object for redirected stdout (set via "save.stdout").
stdoutname = None
stdoutfile = None

try:
    if "task.foreach" in jobp:
        if arvados.current_task()['sequence'] == 0:
            # Fan-out step: create one sequence-1 subtask per item, then
            # finish this task without running the command itself.
            var = jobp["task.foreach"]
            items = get_items(jobp, jobp[var])
            print("run-command: parallelizing on %s with items %s" % (var, items))
            if items != None:
                for i in items:
                    # Each subtask gets a copy of the job parameters with
                    # the loop variable bound to one item.
                    params = copy.copy(jobp)
                    params[var] = i
                    arvados.api().job_tasks().create(body={
                        'job_uuid': arvados.current_job()['uuid'],
                        'created_by_job_task_uuid': arvados.current_task()['uuid'],
                        'sequence': 1,
                        'parameters': params
                        }
                    ).execute()
                arvados.current_task().set_output(None)
                sys.exit(0)
            else:
                # get_items() could not produce an item list; fail the task.
                sys.exit(1)
    else:
        # No fan-out: run the command with the job's own parameters.
        p = jobp

    # Expand the command template into the final argv list.
    cmd = expand_list(p, p["command"])

    if "save.stdout" in p:
        stdoutname = subst.do_substitution(p, p["save.stdout"])
        stdoutfile = open(stdoutname, "wb")

    print("run-command: {}{}".format(' '.join(cmd), (" > " + stdoutname) if stdoutname != None else ""))

except Exception as e:
    print("run-command: caught exception:")
    traceback.print_exc(file=sys.stdout)
    print("run-command: task parameters was:")
    pprint.pprint(p)
    sys.exit(1)
165
try:
    # Run the expanded command; stdout goes to the "save.stdout" file if
    # one was opened above, otherwise it is inherited.
    sp = subprocess.Popen(cmd, shell=False, stdout=stdoutfile)
    sig = SigHandler()

    # forward signals to the process.
    signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
    signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
    signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))

    # wait for process to complete.
    rcode = sp.wait()

    if sig.sig != None:
        # A forwarded signal interrupted the command; report and exit 2.
        print("run-command: terminating on signal %s" % sig.sig)
        sys.exit(2)
    else:
        print("run-command: completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))

except Exception as e:
    # Log the failure but fall through: output written so far is still
    # uploaded, and rcode keeps its failure default of 1.
    print("run-command: caught exception:")
    traceback.print_exc(file=sys.stdout)

# restore default signal handlers.
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGQUIT, signal.SIG_DFL)

# Remove symlinks created by $(link ...) so they are not uploaded.
for l in links:
    os.unlink(l)

print("run-command: the following output files will be saved to keep:")

# Human-readable listing (size and path) of everything in the output dir.
subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"])

print("run-command: start writing output to keep")
# Upload the output directory to Keep, retrying until it succeeds.  The
# resume cache lets an interrupted upload pick up where it left off.
done = False
resume_cache = put.ResumeCache(os.path.join(arvados.current_task().tmpdir, "upload-output-checkpoint"))
reporter = put.progress_writer(machine_progress)
bytes_expected = put.expected_bytes_for(".")
while not done:
    try:
        out = put.ArvPutCollectionWriter.from_cache(resume_cache, reporter, bytes_expected)
        out.do_queued_work()
        out.write_directory_tree(".", max_manifest_depth=0)
        outuuid = out.finish()
        # Record the output collection, success flag (command exit 0), and
        # final progress on the task record.
        api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                                             body={
                                                 'output':outuuid,
                                                 'success': (rcode == 0),
                                                 'progress':1.0
                                             }).execute()
        done = True
    except KeyboardInterrupt:
        print("run-command: terminating on signal 2")
        sys.exit(2)
    except Exception as e:
        # Best-effort retry: log the error, wait, and try the upload again.
        print("run-command: caught exception:")
        traceback.print_exc(file=sys.stdout)
        time.sleep(5)

# Propagate the command's exit code as the task's exit code.
sys.exit(rcode)