3505: Adjust order of initialization to reflect potential side effects (e.g. file
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import arvados
4 import re
5 import os
6 import subprocess
7 import sys
8 import shutil
9 import subst
10 import time
11 import arvados.commands.put as put
12 import signal
13 import stat
14 import copy
15 import traceback
16 import pprint
17 import multiprocessing
18 import logging
19 import robust_put
20 import vwd
21
# Mask out all group/other permission bits on files this process creates.
os.umask(0o077)
# The root logger defaults to WARNING, which would silently discard every
# logging.info() progress message emitted below, so enable INFO explicitly.
logging.basicConfig(format="run-command: %(message)s", level=logging.INFO)

# Look up the current task first; everything below works inside its
# temporary directory.
t = arvados.current_task().tmpdir

api = arvados.api('v1')

# Create the scratch ("tmpdir") and result ("output") directories under the
# task's temporary directory, then make "output" the working directory.
os.chdir(t)
os.mkdir("tmpdir")
os.mkdir("output")

os.chdir("output")

# Absolute path of the output directory (the working directory may change
# later, so remember it now).
outdir = os.getcwd()

# jobp: the job's script parameters.
# taskp: this task's own parameters when it has any; otherwise it stays None
# here and is assigned later by the main section.
taskp = None
jobp = arvados.current_job()['script_parameters']
if len(arvados.current_task()['parameters']) > 0:
    taskp = arvados.current_task()['parameters']

# Paths that are unlinked after the command has finished.
links = []
43
def sub_tmpdir(v):
    """Return the path of the "tmpdir" directory under the task's temporary directory.

    Registered as the "task.tmpdir" substitution. The argument is ignored.
    """
    task_root = arvados.current_task().tmpdir
    return os.path.join(task_root, 'tmpdir')
46
def sub_outdir(v):
    """Return the task's output directory (the module-level ``outdir``).

    Registered as the "task.outdir" substitution. The argument is ignored.
    """
    output_directory = outdir
    return output_directory
49
def sub_cores(v):
    """Return the CPU count reported by ``multiprocessing``, as a string.

    Registered as the "node.cores" substitution. The argument is ignored.
    """
    core_count = multiprocessing.cpu_count()
    return str(core_count)
52
def sub_jobid(v):
    """Return the value of the JOB_UUID environment variable.

    Registered as the "job.uuid" substitution. The argument is ignored.
    Raises KeyError when JOB_UUID is not set.
    """
    job_uuid = os.environ['JOB_UUID']
    return job_uuid
55
def sub_taskid(v):
    """Return the value of the TASK_UUID environment variable.

    Registered as the "task.uuid" substitution. The argument is ignored.
    Raises KeyError when TASK_UUID is not set.
    """
    task_uuid = os.environ['TASK_UUID']
    return task_uuid
58
def sub_jobsrc(v):
    """Return the value of the CRUNCH_SRC environment variable.

    Registered as the "job.srcdir" substitution. The argument is ignored.
    Raises KeyError when CRUNCH_SRC is not set.
    """
    source_dir = os.environ['CRUNCH_SRC']
    return source_dir
61
# Make the task/job/node values available to the substitution engine under
# their parameter names. Entries are installed in the order listed.
for _sub_name, _sub_func in (
        ("task.tmpdir", sub_tmpdir),
        ("task.outdir", sub_outdir),
        ("job.srcdir", sub_jobsrc),
        ("node.cores", sub_cores),
        ("job.uuid", sub_jobid),
        ("task.uuid", sub_taskid),
):
    subst.default_subs[_sub_name] = _sub_func
68
class SigHandler(object):
    """Relays a signal to a child process and remembers the last one relayed."""

    def __init__(self):
        # Number of the most recently relayed signal; None until one is sent.
        self.sig = None

    def send_signal(self, sp, signum):
        """Pass ``signum`` on to ``sp`` and record it in ``self.sig``.

        ``sp`` only needs a ``send_signal(signum)`` method. The signal is
        recorded after ``sp.send_signal`` returns, so nothing is recorded if
        that call raises.
        """
        sp.send_signal(signum)
        self.sig = signum
76
def expand_item(p, c):
    """Expand one element of a command template into a list of strings.

    p is the parameter dictionary used for substitution; c is the template
    element, handled by type:

    * dict containing both "foreach" and "command": the "command" list is
      expanded once per item of the parameter named by "foreach", each time
      with that parameter rebound to the item in a shallow copy of p, and
      the results are concatenated.
    * list: expanded with expand_list().
    * string: substituted, giving a one-element list.

    Anything else (including a dict lacking either key) expands to [].
    """
    if isinstance(c, dict):
        if "foreach" not in c or "command" not in c:
            return []
        loop_var = c["foreach"]
        expanded = []
        for item in get_items(p, p[loop_var]):
            # Copy so the binding is visible only to this iteration.
            scoped = copy.copy(p)
            scoped[loop_var] = item
            expanded.extend(expand_list(scoped, c["command"]))
        return expanded

    if isinstance(c, list):
        return expand_list(p, c)

    if isinstance(c, str) or isinstance(c, unicode):
        return [subst.do_substitution(p, c)]

    return []
94
def expand_list(p, l):
    """Expand every element of l with expand_item() and return one flat list.

    p is the parameter dictionary passed through to expand_item().
    """
    flattened = []
    for element in l:
        flattened.extend(expand_item(p, element))
    return flattened
97
def get_items(p, value):
    """Return the list of items that a "foreach" parameter ranges over.

    p is the parameter dictionary used for substitution. value is either:

    * a list, which is expanded with expand_list(); or
    * a string that, after substitution, names a path on disk:
      - a directory yields one "$(dir <prefix>/<entry>/)" item per directory
        entry, where <prefix> is the path with the TASK_KEEPMOUNT prefix and
        the following separator removed;
      - a regular file yields one item per line, without line terminators.

    Returns None when the path is neither a directory nor a regular file.
    Raises OSError if the path cannot be stat'ed, and KeyError if a
    directory is given while TASK_KEEPMOUNT is not set.
    """
    if isinstance(value, list):
        return expand_list(p, value)

    fn = subst.do_substitution(p, value)
    mode = os.stat(fn).st_mode

    if stat.S_ISDIR(mode):
        # Drop "<TASK_KEEPMOUNT>/" so the items are relative to the mount.
        prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
        return ["$(dir %s/%s/)" % (prefix, l) for l in os.listdir(fn)]

    if stat.S_ISREG(mode):
        with open(fn) as f:
            # Strip line endings; otherwise every item carries a trailing
            # newline into the parameters and command line built from it.
            return [line.rstrip("\r\n") for line in f]

    # Unsupported file type (device, socket, ...): report "no items" to the
    # caller instead of failing on an unbound local.
    return None
114
# Name and open handle of the file receiving the command's stdout, when
# "save.stdout" is requested; both stay None otherwise.
stdoutname = None
stdoutfile = None
# Exit status reported for the task; stays 1 unless the command runs and
# returns its own status.
rcode = 1

try:
    if "task.foreach" in jobp:
        if arvados.current_task()['sequence'] == 0:
            # First task of a "task.foreach" job: queue one sequence-1 task
            # per item, then exit without running the command here.
            var = jobp["task.foreach"]
            items = get_items(jobp, jobp[var])
            logging.info("parallelizing on %s with items %s" % (var, items))
            if items is None:
                sys.exit(1)

            job_uuid = arvados.current_job()['uuid']
            parent_task_uuid = arvados.current_task()['uuid']
            for i in items:
                # Each new task gets the job parameters with the foreach
                # variable bound to a single item.
                params = copy.copy(jobp)
                params[var] = i
                api.job_tasks().create(body={
                    'job_uuid': job_uuid,
                    'created_by_job_task_uuid': parent_task_uuid,
                    'sequence': 1,
                    'parameters': params
                    }
                ).execute()
            if "task.vwd" in jobp:
                # Base vwd collection will be merged with output fragments from
                # the other tasks by crunch.
                arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
            else:
                arvados.current_task().set_output(None)
            sys.exit(0)
    else:
        # No fan-out: the single task runs with the job's parameters.
        taskp = jobp

    if "task.vwd" in taskp:
        # Populate output directory with symlinks to files in collection
        vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)

    if "task.cwd" in taskp:
        os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))

    cmd = expand_list(taskp, taskp["command"])

    if "save.stdout" in taskp:
        stdoutname = subst.do_substitution(taskp, taskp["save.stdout"])
        stdoutfile = open(stdoutname, "wb")

    logging.info("{}{}".format(' '.join(cmd), (" > " + stdoutname) if stdoutname is not None else ""))

except Exception:
    # sys.exit() above raises SystemExit, which is not an Exception subclass,
    # so only genuine setup failures are reported here.
    logging.exception("caught exception")
    logging.error("task parameters was:")
    logging.error(pprint.pformat(taskp))
    sys.exit(1)
168
try:
    sp = subprocess.Popen(cmd, shell=False, stdout=stdoutfile)
    sig = SigHandler()

    # forward signals to the process.
    forward = lambda signum, frame: sig.send_signal(sp, signum)
    for forwarded_signal in (signal.SIGINT, signal.SIGTERM, signal.SIGQUIT):
        signal.signal(forwarded_signal, forward)

    # wait for process to complete.
    rcode = sp.wait()

    if sig.sig is not None:
        logging.critical("terminating on signal %s" % sig.sig)
        sys.exit(2)
    else:
        logging.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))

except Exception:
    # A failure to start or wait for the command is logged; rcode keeps its
    # previous value and the script carries on to save whatever output exists.
    logging.exception("caught exception")
finally:
    # Release our handle on the saved-stdout file once the command is done;
    # this process never writes to it itself.
    if stdoutfile is not None:
        stdoutfile.close()

# restore default signal handlers.
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGQUIT, signal.SIG_DFL)
194
# Remove any paths recorded in links before the output is saved.
for link_path in links:
    os.unlink(link_path)

logging.info("the following output files will be saved to keep:")

# List every regular file below the current directory on stderr, using the
# same "run-command: " prefix as the log messages.
find_cmd = ["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"]
subprocess.call(find_cmd, stdout=sys.stderr)

logging.info("start writing output to keep")

if "task.vwd" in taskp:
    # A "task.foreach" subtask must not merge with the original collection
    # (that happens at the end); a single task does merge with it.
    merge_with_base = "task.foreach" not in jobp
    base_collection = subst.do_substitution(taskp, taskp["task.vwd"])
    checked_in = vwd.checkin(base_collection, outdir, merge=merge_with_base)
    outcollection = checked_in.manifest_text()
else:
    outcollection = robust_put.upload(outdir)

# Record the output, success flag and completion on the task record.
task_update = {
    'output': outcollection,
    'success': (rcode == 0),
    'progress':1.0
}
api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                       body=task_update).execute()

sys.exit(rcode)
220
221 sys.exit(rcode)