4042: task.foreach should now accept multiple parameters and generate tasks
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28
29 os.umask(0077)
30
31 t = arvados.current_task().tmpdir
32
33 api = arvados.api('v1')
34
35 os.chdir(arvados.current_task().tmpdir)
36 os.mkdir("tmpdir")
37 os.mkdir("output")
38
39 os.chdir("output")
40
41 outdir = os.getcwd()
42
43 taskp = None
44 jobp = arvados.current_job()['script_parameters']
45 if len(arvados.current_task()['parameters']) > 0:
46     taskp = arvados.current_task()['parameters']
47
48 links = []
49
50 def sub_tmpdir(v):
51     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
52
53 def sub_outdir(v):
54     return outdir
55
56 def sub_cores(v):
57      return str(multiprocessing.cpu_count())
58
59 def sub_jobid(v):
60      return os.environ['JOB_UUID']
61
62 def sub_taskid(v):
63      return os.environ['TASK_UUID']
64
65 def sub_jobsrc(v):
66      return os.environ['CRUNCH_SRC']
67
68 subst.default_subs["task.tmpdir"] = sub_tmpdir
69 subst.default_subs["task.outdir"] = sub_outdir
70 subst.default_subs["job.srcdir"] = sub_jobsrc
71 subst.default_subs["node.cores"] = sub_cores
72 subst.default_subs["job.uuid"] = sub_jobid
73 subst.default_subs["task.uuid"] = sub_taskid
74
75 class SigHandler(object):
76     def __init__(self):
77         self.sig = None
78
79     def send_signal(self, sp, signum):
80         sp.send_signal(signum)
81         self.sig = signum
82
83 def expand_item(p, c):
84     if isinstance(c, dict):
85         if "foreach" in c and "command" in c:
86             var = c["foreach"]
87             items = get_items(p, p[var])
88             r = []
89             for i in items:
90                 params = copy.copy(p)
91                 params[var] = i
92                 r.extend(expand_list(params, c["command"]))
93             return r
94     elif isinstance(c, list):
95         return expand_list(p, c)
96     elif isinstance(c, str) or isinstance(c, unicode):
97         return [subst.do_substitution(p, c)]
98
99     return []
100
101 def expand_list(p, l):
102     return [exp for arg in l for exp in expand_item(p, arg)]
103
104 def get_items(p, value):
105     if isinstance(value, list):
106         return expand_list(p, value)
107
108     fn = subst.do_substitution(p, value)
109     mode = os.stat(fn).st_mode
110     prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
111     if mode is not None:
112         if stat.S_ISDIR(mode):
113             items = ["$(dir %s/%s/)" % (prefix, l) for l in os.listdir(fn)]
114         elif stat.S_ISREG(mode):
115             with open(fn) as f:
116                 items = [line.rstrip("\r\n") for line in f]
117         return items
118     else:
119         return None
120
121 stdoutname = None
122 stdoutfile = None
123 stdinname = None
124 stdinfile = None
125 rcode = 1
126
127 def recursive_foreach(params, fvars):
128     var = fvars[0]
129     fvars = fvars[1:]
130     items = get_items(params, params[var])
131     logger.info("parallelizing on %s with items %s" % (var, items))
132     if items is not None:
133         for i in items:
134             params = copy.copy(params)
135             params[var] = i
136             if len(fvars) > 0:
137                 recursive_foreach(params, fvars)
138             else:
139                 arvados.api().job_tasks().create(body={
140                     'job_uuid': arvados.current_job()['uuid'],
141                     'created_by_job_task_uuid': arvados.current_task()['uuid'],
142                     'sequence': 1,
143                     'parameters': params
144                     }
145                 ).execute()
146     else:
147         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
148         sys.exit(1)
149
150 try:
151     if "task.foreach" in jobp:
152         if arvados.current_task()['sequence'] == 0:
153             # This is the first task to start the other tasks and exit
154             fvars = jobp["task.foreach"]
155             if isinstance(fvars, basestring):
156                 fvars = [fvars]
157             if not isinstance(fvars, list) or len(fvars) == 0:
158                 logger.error("value of task.foreach must be a string or non-empty list")
159                 sys.exit(1)
160             recursive_foreach(jobp, jobp["task.foreach"])
161             if "task.vwd" in jobp:
162                 # Set output of the first task to the base vwd collection so it
163                 # will be merged with output fragments from the other tasks by
164                 # crunch.
165                 arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
166             else:
167                 arvados.current_task().set_output(None)
168             sys.exit(0)
169     else:
170         # This is the only task so taskp/jobp are the same
171         taskp = jobp
172
173     if "task.vwd" in taskp:
174         # Populate output directory with symlinks to files in collection
175         vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
176
177     if "task.cwd" in taskp:
178         os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
179
180     cmd = expand_list(taskp, taskp["command"])
181
182     if "task.stdin" in taskp:
183         stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
184         stdinfile = open(stdinname, "rb")
185
186     if "task.stdout" in taskp:
187         stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
188         stdoutfile = open(stdoutname, "wb")
189
190     logger.info("{}{}{}".format(' '.join(cmd), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
191 except subst.SubstitutionError as e:
192     logger.error(str(e))
193     logger.error("task parameters were:")
194     logger.error(pprint.pformat(taskp))
195     sys.exit(1)
196 except Exception as e:
197     logger.exception("caught exception")
198     logger.error("task parameters were:")
199     logger.error(pprint.pformat(taskp))
200     sys.exit(1)
201
202 try:
203     sp = subprocess.Popen(cmd, shell=False, stdin=stdinfile, stdout=stdoutfile)
204     sig = SigHandler()
205
206     # forward signals to the process.
207     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
208     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
209     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))
210
211     # wait for process to complete.
212     rcode = sp.wait()
213
214     if sig.sig is not None:
215         logger.critical("terminating on signal %s" % sig.sig)
216         sys.exit(2)
217     else:
218         logger.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
219
220 except Exception as e:
221     logger.exception("caught exception")
222
223 # restore default signal handlers.
224 signal.signal(signal.SIGINT, signal.SIG_DFL)
225 signal.signal(signal.SIGTERM, signal.SIG_DFL)
226 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
227
228 for l in links:
229     os.unlink(l)
230
231 logger.info("the following output files will be saved to keep:")
232
233 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
234
235 logger.info("start writing output to keep")
236
237 if "task.vwd" in taskp:
238     if "task.foreach" in jobp:
239         # This is a subtask, so don't merge with the original collection, that will happen at the end
240         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
241     else:
242         # Just a single task, so do merge with the original collection
243         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
244 else:
245     outcollection = robust_put.upload(outdir, logger)
246
247 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
248                                      body={
249                                          'output': outcollection,
250                                          'success': (rcode == 0),
251                                          'progress':1.0
252                                      }).execute()
253
254 sys.exit(rcode)