3373: Switch run-command to use python logging instead of print.
crunch_scripts/run-command (arvados.git)
#!/usr/bin/env python

import arvados
import re
import os
import subprocess
import sys
import shutil
import subst
import time
import arvados.commands.put as put
import signal
import stat
import copy
import traceback
import pprint
import multiprocessing
import logging

os.umask(0077)
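# The log format below already prefixes every message with "run-command: ",
# so individual log calls should not repeat the prefix.  The level is set to
# INFO so the informational messages emitted by this script appear in the log.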
logging.basicConfig(format="run-command: %(message)s", level=logging.INFO)

t = arvados.current_task().tmpdir

api = arvados.api('v1')

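# Each task gets a scratch "tmpdir" and an "output" directory under its
# temporary directory; the rest of the script runs with "output" as the
# current working directory.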
os.chdir(arvados.current_task().tmpdir)
os.mkdir("tmpdir")
os.mkdir("output")

os.chdir("output")

outdir = os.getcwd()

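# jobp holds the job's script_parameters.  p holds this task's own parameters
# when the task was created by a "task.foreach" expansion; otherwise it is set
# to jobp before the command is expanded.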
p = None
jobp = arvados.current_job()['script_parameters']
if len(arvados.current_task()['parameters']) > 0:
    p = arvados.current_task()['parameters']

links = []

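# Handlers for the $(...) substitution keywords registered below.  Each one
# receives the text of the substitution (unused by most handlers) and returns
# the replacement string.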
def sub_link(v):
    r = os.path.join(outdir, os.path.basename(v))
    os.symlink(v, r)
    links.append(r)
    return r

def sub_tmpdir(v):
    return os.path.join(arvados.current_task().tmpdir, 'tmpdir')

def sub_outdir(v):
    return outdir

def sub_cores(v):
    return str(multiprocessing.cpu_count())

def sub_jobid(v):
    return os.environ['JOB_UUID']

def sub_taskid(v):
    return os.environ['TASK_UUID']

def sub_jobsrc(v):
    return os.environ['CRUNCH_SRC']

subst.default_subs["link "] = sub_link
subst.default_subs["task.tmpdir"] = sub_tmpdir
subst.default_subs["task.outdir"] = sub_outdir
subst.default_subs["job.srcdir"] = sub_jobsrc
subst.default_subs["node.cores"] = sub_cores
subst.default_subs["job.uuid"] = sub_jobid
subst.default_subs["task.uuid"] = sub_taskid

rcode = 1

def machine_progress(bytes_written, bytes_expected):
    return "run-command: wrote {} total {}\n".format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

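# Records the last signal forwarded to the child process so we can tell
# whether the command exited on its own or was killed by a forwarded signal.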
class SigHandler(object):
    def __init__(self):
        self.sig = None

    def send_signal(self, sp, signum):
        sp.send_signal(signum)
        self.sig = signum

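# Command templates are expanded recursively: dicts with "foreach"/"command"
# fan out over a list of items, lists are expanded element by element, and
# strings go through $(...) substitution.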
def expand_item(p, c):
    if isinstance(c, dict):
        if "foreach" in c and "command" in c:
            var = c["foreach"]
            items = get_items(p, p[var])
            r = []
            for i in items:
                params = copy.copy(p)
                params[var] = i
                r.extend(expand_list(params, c["command"]))
            return r
    elif isinstance(c, list):
        return expand_list(p, c)
    elif isinstance(c, str) or isinstance(c, unicode):
        return [subst.do_substitution(p, c)]

    return []

def expand_list(p, l):
    return [exp for arg in l for exp in expand_item(p, arg)]

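# A "foreach" source may be a literal list, a directory under the Keep mount
# (one "$(dir ...)" item per entry), or a regular file (one item per line).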
def get_items(p, value):
    if isinstance(value, list):
        return expand_list(p, value)

    fn = subst.do_substitution(p, value)
    mode = os.stat(fn).st_mode
    prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
    if mode is not None:
        if stat.S_ISDIR(mode):
            items = ["$(dir %s/%s/)" % (prefix, l) for l in os.listdir(fn)]
        elif stat.S_ISREG(mode):
            with open(fn) as f:
                items = [line for line in f]
        else:
            items = None
        return items
    else:
        return None

stdoutname = None
stdoutfile = None

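# On the first task (sequence 0) of a "task.foreach" job, queue one new task
# per item and exit; each queued task (sequence 1) re-runs this script with
# its own parameters and executes the expanded command below.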
try:
    if "task.foreach" in jobp:
        if arvados.current_task()['sequence'] == 0:
            var = jobp["task.foreach"]
            items = get_items(jobp, jobp[var])
            logging.info("parallelizing on %s with items %s", var, items)
            if items is not None:
                for i in items:
                    params = copy.copy(jobp)
                    params[var] = i
                    arvados.api().job_tasks().create(body={
                        'job_uuid': arvados.current_job()['uuid'],
                        'created_by_job_task_uuid': arvados.current_task()['uuid'],
                        'sequence': 1,
                        'parameters': params
                        }
                    ).execute()
                arvados.current_task().set_output(None)
                sys.exit(0)
            else:
                sys.exit(1)
    else:
        p = jobp

    cmd = expand_list(p, p["command"])

    if "save.stdout" in p:
        stdoutname = subst.do_substitution(p, p["save.stdout"])
        stdoutfile = open(stdoutname, "wb")

    logging.info("{}{}".format(' '.join(cmd), (" > " + stdoutname) if stdoutname is not None else ""))

except Exception as e:
    logging.exception("caught exception")
    logging.error("task parameters were:")
    logging.error(pprint.pformat(p))
    sys.exit(1)

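# Run the command, forwarding SIGINT/SIGTERM/SIGQUIT to the child while it
# runs, then report how it finished.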
try:
    sp = subprocess.Popen(cmd, shell=False, stdout=stdoutfile)
    sig = SigHandler()

    # forward signals to the process.
    signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
    signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
    signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))

    # wait for process to complete.
    rcode = sp.wait()

    if sig.sig is not None:
        logging.warning("terminating on signal %s", sig.sig)
        sys.exit(2)
    else:
        logging.info("completed with exit code %i (%s)", rcode, "success" if rcode == 0 else "failed")

except Exception as e:
    logging.exception("caught exception")

# restore default signal handlers.
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGQUIT, signal.SIG_DFL)

for l in links:
    os.unlink(l)

logging.info("the following output files will be saved to keep:")

subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"])

logging.info("start writing output to keep")

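# Upload the output directory to Keep, retrying until it succeeds.  The resume
# cache lets an interrupted upload pick up where it left off.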
done = False
resume_cache = put.ResumeCache(os.path.join(arvados.current_task().tmpdir, "upload-output-checkpoint"))
reporter = put.progress_writer(machine_progress)
bytes_expected = put.expected_bytes_for(".")
while not done:
    try:
        out = put.ArvPutCollectionWriter.from_cache(resume_cache, reporter, bytes_expected)
        out.do_queued_work()
        out.write_directory_tree(".", max_manifest_depth=0)
        outuuid = out.finish()
        api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                               body={
                                   'output': outuuid,
                                   'success': (rcode == 0),
                                   'progress': 1.0
                               }).execute()
        done = True
    except KeyboardInterrupt:
        logging.error("terminating on signal 2")
        sys.exit(2)
    except Exception as e:
        logging.exception("caught exception")
        time.sleep(5)

sys.exit(rcode)