import stat
# Source: arvados.git, crunch_scripts/run-command
1 #!/usr/bin/env python
2
import copy
import os
import re
import shutil
import signal
import stat
import subprocess
import sys
import time

import arvados
import arvados.commands.put as put

import subst
14
# Make every file this task creates readable/writable by its owner only.
# (0o077 is the octal form accepted by both Python 2.6+ and Python 3.)
os.umask(0o077)

api = arvados.api('v1')

# Work inside the task's scratch area: "tmpdir" is the directory handed out by
# the $(task.tmpdir) substitution, and "output" becomes the working directory
# of the command; whatever is left in it is uploaded to Keep at the end.
os.chdir(arvados.current_task().tmpdir)
os.mkdir("tmpdir")
os.mkdir("output")

os.chdir("output")

jobp = arvados.current_job()['script_parameters']
# Tasks created by the task.foreach fan-out carry their own parameter set
# (a copy of the job parameters with the foreach variable bound); use it
# when present.
if len(arvados.current_task()['parameters']) > 0:
    p = arvados.current_task()['parameters']

# Names of the symlinks created in the working directory by $(link ...)
# substitutions; they are removed again before the output is uploaded.
links = []
33
def sub_link(v):
    """Substitution for ``$(link <path>)``.

    Creates a symlink in the current directory, named after the last path
    component of *v*, that points at *v* under the directory given by the
    TASK_KEEPMOUNT environment variable. The link name is appended to the
    module-level ``links`` list and returned as the substituted text.
    """
    link_name = os.path.basename(v)
    target = os.path.join(os.environ['TASK_KEEPMOUNT'], v)
    os.symlink(target, link_name)
    links.append(link_name)
    return link_name
39
def sub_tmpdir(v):
    """Substitution for ``$(task.tmpdir)``: the task's "tmpdir" scratch path.

    *v* is ignored.
    """
    task_tmp = arvados.current_task().tmpdir
    return os.path.join(task_tmp, 'tmpdir')
42
def sub_cores(v):
    """Substitution for ``$(node.cores)``: the CRUNCH_NODE_SLOTS env value.

    *v* is ignored. Raises KeyError if the variable is not set.
    """
    slots = os.environ['CRUNCH_NODE_SLOTS']
    return slots
45
def sub_jobid(v):
    """Substitution for ``$(job.id)``: the JOB_UUID env value.

    *v* is ignored. Raises KeyError if the variable is not set.
    """
    job_uuid = os.environ['JOB_UUID']
    return job_uuid
48
def sub_taskid(v):
    """Substitution for ``$(task.id)``: the TASK_UUID env value.

    *v* is ignored. Raises KeyError if the variable is not set.
    """
    task_uuid = os.environ['TASK_UUID']
    return task_uuid
51
# Register this script's $(...) substitutions with the subst module, in the
# same order as before. NOTE(review): the "link " key keeps its trailing
# space; presumably subst matches it as a prefix of "link <path>" -- confirm
# against subst.py.
for _sub_name, _sub_fn in (("link ", sub_link),
                           ("task.tmpdir", sub_tmpdir),
                           ("node.cores", sub_cores),
                           ("job.id", sub_jobid),
                           ("task.id", sub_taskid)):
    subst.default_subs[_sub_name] = _sub_fn
del _sub_name, _sub_fn

# Exit status reported for the task; it stays 1 (failure) unless the command
# runs and its own exit status is stored here.
rcode = 1
59
def machine_progress(bytes_written, bytes_expected):
    """Format one machine-readable upload progress line.

    Used as the formatter passed to ``put.progress_writer``. An unknown
    expected size (None) is reported as -1.
    """
    if bytes_expected is None:
        total = -1
    else:
        total = bytes_expected
    return "run-command: wrote {} total {}\n".format(bytes_written, total)
63
class SigHandler(object):
    """Forward signals to a child process and remember the last one sent.

    ``sig`` is None until ``send_signal`` has been called successfully.
    """

    def __init__(self):
        # Most recent signal number forwarded to the child, or None.
        self.sig = None

    def send_signal(self, sp, signum):
        """Deliver *signum* to the subprocess *sp*, then record it."""
        forward = sp.send_signal
        forward(signum)
        # Recorded only after delivery; if forwarding raises, sig is unchanged.
        self.sig = signum
71
try:
    # Python 2: strings decoded from the API's JSON are ``unicode``, which is
    # not a subclass of ``str``; ``basestring`` covers both.
    _string_types = basestring
except NameError:
    # Python 3: all text strings are ``str``.
    _string_types = str


def expand_item(p, c):
    """Expand one element of a command template into a list of arguments.

    * A string is run through ``subst.do_substitution`` with parameters *p*
      and yields exactly one argument.
    * A list is expanded element by element via ``expand_list``.
    * A dict with both "foreach" and "command" keys expands ``c["command"]``
      once per item returned by ``get_items`` for the parameter named by
      ``c["foreach"]``, with that parameter bound to the current item.
    * Anything else (numbers, None, other dicts) contributes no arguments.

    Always returns a list. *p* itself is not modified; each foreach
    iteration works on a shallow copy.
    """
    if isinstance(c, dict):
        if "foreach" in c and "command" in c:
            var = c["foreach"]
            items = get_items(p, p[var])
            r = []
            for i in items:
                params = copy.copy(p)
                params[var] = i
                r.extend(expand_list(params, c["command"]))
            return r
    elif isinstance(c, list):
        return expand_list(p, c)
    elif isinstance(c, _string_types):
        # Checking plain ``str`` here would silently drop unicode arguments
        # under Python 2, producing an empty command.
        return [subst.do_substitution(p, c)]

    return []
89
def expand_list(p, l):
    """Expand every element of the template list *l* and concatenate the results.

    Each element is expanded with ``expand_item`` using parameters *p*.
    """
    expanded = []
    for arg in l:
        expanded.extend(expand_item(p, arg))
    return expanded
92
def get_items(p, value):
    """Return the list of values a foreach parameter iterates over.

    *value* is either:

    * a list, which is expanded with ``expand_list`` like a command template;
    * a string, which is substituted with parameters *p* and then names a
      path on the local filesystem:

      - a directory yields one ``"$(dir <prefix>/<entry>)"`` string per
        entry. <prefix> is the path with the leading $TASK_KEEPMOUNT
        directory stripped. NOTE(review): this assumes the path lies inside
        the Keep mount; confirm.
      - a regular file yields one item per line, without the line ending.

    Returns None when the path is neither a directory nor a regular file.
    Raises OSError if the path does not exist.
    """
    if isinstance(value, list):
        return expand_list(p, value)

    fn = subst.do_substitution(p, value)
    # os.stat() returns a stat_result; the S_IS* predicates need its st_mode.
    mode = os.stat(fn).st_mode

    if stat.S_ISDIR(mode):
        # Only directory entries need the Keep-relative prefix, so the
        # TASK_KEEPMOUNT lookup is not required for plain files.
        prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
        return ["$(dir %s/%s)" % (prefix, l) for l in os.listdir(fn)]

    if stat.S_ISREG(mode):
        with open(fn) as f:
            # Strip line endings; otherwise each item (and every argument
            # substituted from it) would end in "\n".
            return [line.rstrip("\r\n") for line in f]

    # Sockets, FIFOs, devices, etc. have no items.
    return None
109
if "task.foreach" in jobp:
    if arvados.current_task()['sequence'] == 0:
        # Initial (sequence 0) task: create one new sequence-1 task per item
        # of the parameter named by "task.foreach", then exit without
        # running the command itself.
        var = jobp["task.foreach"]
        items = get_items(jobp, jobp[var])
        if items is None:
            sys.exit(1)

        print("run-command: parallelizing on %s with items %s" % (var, items))

        # Reuse the module-level API client and fetch the invariant UUIDs
        # once, rather than building a new client for every created task.
        job_uuid = arvados.current_job()['uuid']
        parent_task_uuid = arvados.current_task()['uuid']
        for i in items:
            params = copy.copy(jobp)
            params[var] = i
            api.job_tasks().create(body={
                'job_uuid': job_uuid,
                'created_by_job_task_uuid': parent_task_uuid,
                'sequence': 1,
                'parameters': params
                }
            ).execute()
        sys.exit(0)
    # Otherwise this is one of the fanned-out tasks: ``p`` holds the task's
    # own parameters (set at startup when the task record has any).
else:
    p = jobp
132
try:
    cmd = expand_list(p, p["command"])

    # Optionally capture the command's stdout in a file. A relative name
    # lands in the current directory, i.e. the output directory.
    stdoutname = None
    stdoutfile = None
    if "save.stdout" in p:
        stdoutname = subst.do_substitution(p, p["save.stdout"])
        stdoutfile = open(stdoutname, "wb")

    try:
        print("run-command: {}{}".format(' '.join(cmd), (" > " + stdoutname) if stdoutname is not None else ""))

        sp = subprocess.Popen(cmd, shell=False, stdout=stdoutfile)
        sig = SigHandler()

        # forward signals to the process.
        signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
        signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
        signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))

        # wait for process to complete.
        rcode = sp.wait()
    finally:
        # The child holds its own copy of the descriptor. Close ours so it is
        # not leaked, including when we exit early on a signal below.
        if stdoutfile is not None:
            stdoutfile.close()

    if sig.sig is not None:
        print("run-command: terminating on signal %s" % sig.sig)
        sys.exit(2)
    else:
        print("run-command: completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))

except Exception as e:
    # Report and fall through: whatever output exists is still uploaded, and
    # rcode keeps its failure value if the command never finished.
    print("run-command: caught exception: {}".format(e))
163
# Restore default signal handling now that the command has finished.
# SIGINT goes back to Python's own default handler rather than SIG_DFL:
# with SIG_DFL an interrupt kills the process outright, so the
# KeyboardInterrupt handler around the upload below could never run.
signal.signal(signal.SIGINT, signal.default_int_handler)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGQUIT, signal.SIG_DFL)

# Remove the $(link ...) symlinks so they are not saved as task output.
# Only entries that are still symlinks are removed. The command may have
# deleted a link (unlink would then fail and abort the upload) or replaced
# it with a real output file of the same name (which must be kept).
for l in links:
    if os.path.islink(l):
        os.unlink(l)

print("run-command: the following output files will be saved to keep:")

subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"])

print("run-command: start writing output to keep")

resume_cache = put.ResumeCache(os.path.join(arvados.current_task().tmpdir, "upload-output-checkpoint"))
reporter = put.progress_writer(machine_progress)
bytes_expected = put.expected_bytes_for(".")

# Retry the upload every 5 seconds until it succeeds; there is no retry
# limit. Each attempt rebuilds the writer from the checkpoint cache,
# presumably so a failed attempt's progress is resumed rather than redone.
# The KeyboardInterrupt handler wraps the whole loop so an interrupt during
# the back-off sleep is reported too.
try:
    while True:
        try:
            out = put.ArvPutCollectionWriter.from_cache(resume_cache, reporter, bytes_expected)
            out.do_queued_work()
            out.write_directory_tree(".", max_manifest_depth=0)
            outuuid = out.finish()
            api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                                   body={
                                       'output': outuuid,
                                       'success': (rcode == 0),
                                       'progress': 1.0
                                   }).execute()
            break
        except Exception as e:
            print("run-command: caught exception: {}".format(e))
            time.sleep(5)
except KeyboardInterrupt:
    print("run-command: terminating on signal 2")
    sys.exit(2)

sys.exit(rcode)