Merge branch '3373-improve-gatk3-snv-pipeline' closes #3373
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import arvados
4 import re
5 import os
6 import subprocess
7 import sys
8 import shutil
9 import subst
10 import time
11 import arvados.commands.put as put
12 import signal
13 import stat
14 import copy
15 import traceback
16 import pprint
17 import multiprocessing
18 import logging
19
20 os.umask(0077)
21 logging.basicConfig(format="run-command: %(message)s")
22
23 t = arvados.current_task().tmpdir
24
25 api = arvados.api('v1')
26
27 os.chdir(arvados.current_task().tmpdir)
28 os.mkdir("tmpdir")
29 os.mkdir("output")
30
31 os.chdir("output")
32
33 outdir = os.getcwd()
34
35 taskp = None
36 jobp = arvados.current_job()['script_parameters']
37 if len(arvados.current_task()['parameters']) > 0:
38     taskp = arvados.current_task()['parameters']
39
40 links = []
41
42 def sub_link(v):
43     r = os.path.join(outdir, os.path.basename(v))
44     os.symlink(v, r)
45     links.append(r)
46     return r
47
48 def sub_tmpdir(v):
49     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
50
51 def sub_outdir(v):
52     return outdir
53
54 def sub_cores(v):
55      return str(multiprocessing.cpu_count())
56
57 def sub_jobid(v):
58      return os.environ['JOB_UUID']
59
60 def sub_taskid(v):
61      return os.environ['TASK_UUID']
62
63 def sub_jobsrc(v):
64      return os.environ['CRUNCH_SRC']
65
66 subst.default_subs["link "] = sub_link
67 subst.default_subs["task.tmpdir"] = sub_tmpdir
68 subst.default_subs["task.outdir"] = sub_outdir
69 subst.default_subs["job.srcdir"] = sub_jobsrc
70 subst.default_subs["node.cores"] = sub_cores
71 subst.default_subs["job.uuid"] = sub_jobid
72 subst.default_subs["task.uuid"] = sub_taskid
73
74 def machine_progress(bytes_written, bytes_expected):
75     return "run-command: wrote {} total {}\n".format(
76         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
77
78 class SigHandler(object):
79     def __init__(self):
80         self.sig = None
81
82     def send_signal(self, sp, signum):
83         sp.send_signal(signum)
84         self.sig = signum
85
86 def expand_item(p, c):
87     if isinstance(c, dict):
88         if "foreach" in c and "command" in c:
89             var = c["foreach"]
90             items = get_items(p, p[var])
91             r = []
92             for i in items:
93                 params = copy.copy(p)
94                 params[var] = i
95                 r.extend(expand_list(params, c["command"]))
96             return r
97     elif isinstance(c, list):
98         return expand_list(p, c)
99     elif isinstance(c, str) or isinstance(c, unicode):
100         return [subst.do_substitution(p, c)]
101
102     return []
103
104 def expand_list(p, l):
105     return [exp for arg in l for exp in expand_item(p, arg)]
106
107 def get_items(p, value):
108     if isinstance(value, list):
109         return expand_list(p, value)
110
111     fn = subst.do_substitution(p, value)
112     mode = os.stat(fn).st_mode
113     prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
114     if mode != None:
115         if stat.S_ISDIR(mode):
116             items = ["$(dir %s/%s/)" % (prefix, l) for l in os.listdir(fn)]
117         elif stat.S_ISREG(mode):
118             with open(fn) as f:
119                 items = [line for line in f]
120         return items
121     else:
122         return None
123
124 stdoutname = None
125 stdoutfile = None
126 rcode = 1
127
128 try:
129     if "task.foreach" in jobp:
130         if arvados.current_task()['sequence'] == 0:
131             var = jobp["task.foreach"]
132             items = get_items(jobp, jobp[var])
133             logging.info("parallelizing on %s with items %s" % (var, items))
134             if items != None:
135                 for i in items:
136                     params = copy.copy(jobp)
137                     params[var] = i
138                     arvados.api().job_tasks().create(body={
139                         'job_uuid': arvados.current_job()['uuid'],
140                         'created_by_job_task_uuid': arvados.current_task()['uuid'],
141                         'sequence': 1,
142                         'parameters': params
143                         }
144                     ).execute()
145                 arvados.current_task().set_output(None)
146                 sys.exit(0)
147             else:
148                 sys.exit(1)
149     else:
150         taskp = jobp
151
152     cmd = expand_list(taskp, taskp["command"])
153
154     if "save.stdout" in taskp:
155         stdoutname = subst.do_substitution(taskp, taskp["save.stdout"])
156         stdoutfile = open(stdoutname, "wb")
157
158     logging.info("{}{}".format(' '.join(cmd), (" > " + stdoutname) if stdoutname != None else ""))
159
160 except Exception as e:
161     logging.exception("caught exception")
162     logging.error("task parameters was:")
163     logging.error(pprint.pformat(taskp))
164     sys.exit(1)
165
166 try:
167     sp = subprocess.Popen(cmd, shell=False, stdout=stdoutfile)
168     sig = SigHandler()
169
170     # forward signals to the process.
171     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
172     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
173     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))
174
175     # wait for process to complete.
176     rcode = sp.wait()
177
178     if sig.sig != None:
179         logging.critical("terminating on signal %s" % sig.sig)
180         sys.exit(2)
181     else:
182         logging.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
183
184 except Exception as e:
185     logging.exception("caught exception")
186
187 # restore default signal handlers.
188 signal.signal(signal.SIGINT, signal.SIG_DFL)
189 signal.signal(signal.SIGTERM, signal.SIG_DFL)
190 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
191
192 for l in links:
193     os.unlink(l)
194
195 logging.info("the following output files will be saved to keep:")
196
197 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
198
199 logging.info("start writing output to keep")
200
201 done = False
202 resume_cache = put.ResumeCache(os.path.join(arvados.current_task().tmpdir, "upload-output-checkpoint"))
203 reporter = put.progress_writer(machine_progress)
204 bytes_expected = put.expected_bytes_for(".")
205 while not done:
206     try:
207         out = put.ArvPutCollectionWriter.from_cache(resume_cache, reporter, bytes_expected)
208         out.do_queued_work()
209         out.write_directory_tree(".", max_manifest_depth=0)
210         outuuid = out.finish()
211         api.job_tasks().update(uuid=arvados.current_task()['uuid'],
212                                              body={
213                                                  'output':outuuid,
214                                                  'success': (rcode == 0),
215                                                  'progress':1.0
216                                              }).execute()
217         done = True
218     except KeyboardInterrupt:
219         logging.critical("terminating on signal 2")
220         sys.exit(2)
221     except Exception as e:
222         logging.exception("caught exception:")
223         time.sleep(5)
224
225 sys.exit(rcode)