Fix $(node.cores)
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import arvados
4 import re
5 import os
6 import subprocess
7 import sys
8 import shutil
9 import subst
10 import time
11 import arvados.commands.put as put
12 import signal
13 import stat
14 import copy
15 import traceback
16 import pprint
17 import multiprocessing
18
# Restrictive umask: task outputs should not be group/world writable.
os.umask(0077)

t = arvados.current_task().tmpdir

api = arvados.api('v1')

# Work inside the task's scratch space: a "tmpdir" for command scratch
# files and an "output" directory that will be uploaded to Keep at the end.
os.chdir(arvados.current_task().tmpdir)
os.mkdir("tmpdir")
os.mkdir("output")

# The command runs with "output" as its working directory.
os.chdir("output")

# NOTE(review): taskp is assigned but never read below — looks vestigial.
taskp = None
jobp = arvados.current_job()['script_parameters']
# For subtasks created by the "task.foreach" fan-out, the per-task
# parameters override the job-level ones.  Note p is only bound here when
# the task actually has parameters; otherwise it is set later (p = jobp).
if len(arvados.current_task()['parameters']) > 0:
    p = arvados.current_task()['parameters']

# Symlinks created by $(link ...) substitutions, removed after the run.
links = []
37
def sub_link(v):
    # Substitution handler for "$(link path)": symlink the keep-mounted
    # path into the current (output) directory under its basename, track
    # it in the module-level `links` list so it is removed after the run,
    # and substitute the local name into the command line.
    target = os.path.join(os.environ['TASK_KEEPMOUNT'], v)
    name = os.path.basename(v)
    os.symlink(target, name)
    links.append(name)
    return name
43
def sub_tmpdir(v):
    # Substitution handler for "$(task.tmpdir)": the per-task scratch
    # directory created during startup.
    base = arvados.current_task().tmpdir
    return os.path.join(base, 'tmpdir')
46
def sub_cores(v):
    # Substitution handler for "$(node.cores)": the CPU count of this
    # node, returned as a string for splicing into the command line.
    return str(multiprocessing.cpu_count())
49
def sub_jobid(v):
    # Substitution handler for "$(job.id)": the job UUID, provided by
    # crunch through the environment.
    return os.environ['JOB_UUID']
52
def sub_taskid(v):
    # Substitution handler for "$(task.id)": the task UUID, provided by
    # crunch through the environment.
    return os.environ['TASK_UUID']
55
# Register the handlers above with the substitution engine; keys are the
# tokens recognized inside $(...) in script parameters.  Note the trailing
# space in "link " — that token takes an argument.
subst.default_subs["link "] = sub_link
subst.default_subs["task.tmpdir"] = sub_tmpdir
subst.default_subs["node.cores"] = sub_cores
subst.default_subs["job.id"] = sub_jobid
subst.default_subs["task.id"] = sub_taskid

# Child exit code; pessimistic default in case the command never runs.
rcode = 1
63
def machine_progress(bytes_written, bytes_expected):
    # Machine-readable progress line for the Keep uploader; -1 stands in
    # for "total size unknown".
    total = bytes_expected
    if total is None:
        total = -1
    return "run-command: wrote {} total {}\n".format(bytes_written, total)
67
class SigHandler(object):
    """Forward signals to a child process, remembering the last one sent.

    The recorded signal lets the main flow distinguish a signal-triggered
    exit of the child from a normal completion.
    """

    def __init__(self):
        # No signal has been relayed yet.
        self.sig = None

    def send_signal(self, sp, signum):
        # Deliver signum to the subprocess first, then record it.
        sp.send_signal(signum)
        self.sig = signum
75
def expand_item(p, c):
    # Expand one command-list entry `c` under parameter dict `p` into a
    # list of argument strings.
    #
    # - dict with "foreach"+"command": nested fan-out — re-expand the inner
    #   command once per item of p[var], with params[var] rebound each time.
    # - list: expand each element (via expand_list).
    # - str/unicode: perform $(...) substitution, yielding one argument.
    # - anything else (including a dict without both keys): silently
    #   expands to nothing.
    if isinstance(c, dict):
        if "foreach" in c and "command" in c:
            var = c["foreach"]
            items = get_items(p, p[var])
            r = []
            for i in items:
                params = copy.copy(p)
                params[var] = i
                r.extend(expand_list(params, c["command"]))
            return r
    elif isinstance(c, list):
        return expand_list(p, c)
    elif isinstance(c, str) or isinstance(c, unicode):
        return [subst.do_substitution(p, c)]

    return []
93
def expand_list(p, l):
    # Expand every element of `l` and concatenate the resulting argument
    # lists in order.
    out = []
    for arg in l:
        out.extend(expand_item(p, arg))
    return out
96
def get_items(p, value):
    """Resolve a foreach source `value` into a list of items, or None.

    A list is expanded element-wise.  A string is $()-substituted and
    treated as a path under the Keep mount: a directory yields one
    "$(dir ...)" reference per child, a regular file yields one item per
    line (newlines included).  Any other file type yields None, which
    callers treat as "nothing to iterate".
    """
    if isinstance(value, list):
        return expand_list(p, value)

    fn = subst.do_substitution(p, value)
    mode = os.stat(fn).st_mode
    # Path relative to the Keep mount root, used to build $(dir) tokens.
    prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
    if stat.S_ISDIR(mode):
        return ["$(dir %s/%s/)" % (prefix, l) for l in os.listdir(fn)]
    elif stat.S_ISREG(mode):
        with open(fn) as f:
            return [line for line in f]
    # Fix: the original tested `mode != None` (always true — os.stat
    # raises on failure rather than returning None) and then returned a
    # never-assigned `items` for non-dir, non-regular paths (e.g. a
    # fifo), raising UnboundLocalError.  Return None explicitly instead.
    return None
113
# Name and handle for redirecting the command's stdout, when requested
# via the "save.stdout" parameter.
stdoutname = None
stdoutfile = None

try:
    if "task.foreach" in jobp:
        # Sequence 0 is the dispatcher task: instead of running the
        # command itself, it creates one sequence-1 subtask per item and
        # exits.  The subtasks re-enter this script with their own
        # parameters and fall through to the command run below.
        if arvados.current_task()['sequence'] == 0:
            var = jobp["task.foreach"]
            items = get_items(jobp, jobp[var])
            print("run-command: parallelizing on %s with items %s" % (var, items))
            if items != None:
                for i in items:
                    # Each subtask gets the job parameters with the
                    # foreach variable rebound to a single item.
                    params = copy.copy(jobp)
                    params[var] = i
                    arvados.api().job_tasks().create(body={
                        'job_uuid': arvados.current_job()['uuid'],
                        'created_by_job_task_uuid': arvados.current_task()['uuid'],
                        'sequence': 1,
                        'parameters': params
                        }
                    ).execute()
                # The dispatcher task itself produces no output.
                arvados.current_task().set_output(None)
                sys.exit(0)
            else:
                # Foreach source could not be resolved into items.
                sys.exit(1)
    else:
        # Not a foreach job: run directly from the job parameters.
        p = jobp

    # Expand the command template into the final argv.
    cmd = expand_list(p, p["command"])

    if "save.stdout" in p:
        stdoutname = subst.do_substitution(p, p["save.stdout"])
        stdoutfile = open(stdoutname, "wb")

    print("run-command: {}{}".format(' '.join(cmd), (" > " + stdoutname) if stdoutname != None else ""))

except Exception as e:
    # Any failure during setup is fatal; dump the traceback and the
    # parameters that were in play for debugging.
    # NOTE(review): if the exception fires before p is bound (foreach
    # subtask with empty parameters), pprint(p) itself raises NameError.
    print("run-command: caught exception:")
    traceback.print_exc(file=sys.stdout)
    print("run-command: task parameters was:")
    pprint.pprint(p)
    sys.exit(1)
155
try:
    # Run the expanded command; stdout goes to the save.stdout file when
    # one was opened, otherwise it is inherited.
    sp = subprocess.Popen(cmd, shell=False, stdout=stdoutfile)
    sig = SigHandler()

    # forward signals to the process.
    signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
    signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
    signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))

    # wait for process to complete.
    rcode = sp.wait()

    # A forwarded signal means we were asked to stop: exit immediately
    # (note this skips the cleanup/upload below via SystemExit).
    if sig.sig != None:
        print("run-command: terminating on signal %s" % sig.sig)
        sys.exit(2)
    else:
        print("run-command: completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))

except Exception as e:
    # Failure to launch (or wait on) the command is logged but not fatal
    # here; rcode keeps its pessimistic default of 1 and the output
    # directory is still uploaded below.
    print("run-command: caught exception:")
    traceback.print_exc(file=sys.stdout)

# restore default signal handlers.
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGQUIT, signal.SIG_DFL)

# Remove the $(link ...) symlinks so they are not captured in the output.
for l in links:
    os.unlink(l)
185
print("run-command: the following output files will be saved to keep:")

# List the files about to be uploaded (size + path) via find(1).
subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"])

print("run-command: start writing output to keep")

# Upload the output directory to Keep, retrying forever on transient
# errors.  A resume cache in the task tmpdir lets an interrupted upload
# pick up where it left off instead of re-sending everything.
done = False
resume_cache = put.ResumeCache(os.path.join(arvados.current_task().tmpdir, "upload-output-checkpoint"))
reporter = put.progress_writer(machine_progress)
bytes_expected = put.expected_bytes_for(".")
while not done:
    try:
        out = put.ArvPutCollectionWriter.from_cache(resume_cache, reporter, bytes_expected)
        out.do_queued_work()
        out.write_directory_tree(".", max_manifest_depth=0)
        outuuid = out.finish()
        # Record the output collection and final status on the task.
        api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                                             body={
                                                 'output':outuuid,
                                                 'success': (rcode == 0),
                                                 'progress':1.0
                                             }).execute()
        done = True
    except KeyboardInterrupt:
        # Ctrl-C / SIGINT during upload: give up rather than retry.
        print("run-command: terminating on signal 2")
        sys.exit(2)
    except Exception as e:
        # Transient upload failure: log, back off briefly, retry.
        print("run-command: caught exception:")
        traceback.print_exc(file=sys.stdout)
        time.sleep(5)

# Propagate the command's exit code as the task's exit code.
sys.exit(rcode)