4042: Functions defined in get_items() now moved into expand_item()
[arvados.git] / crunch_scripts / run-command
#!/usr/bin/env python
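#
# run-command: general-purpose crunch script that builds and runs a
# user-specified command line.
#
# The command is described by the job's script_parameters: "command" is a
# template expanded through crunchutil.subst and the expand_* helpers below;
# "task.foreach" fans the work out into one subtask per input item;
# "task.stdin"/"task.stdout" redirect the child's standard streams;
# "task.cwd" sets its working directory; and "task.vwd" names a collection
# used as a virtual working directory.  When the command exits, the contents
# of the output directory are written to Keep as the task output.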

import logging

logger = logging.getLogger('run-command')
log_handler = logging.StreamHandler()
log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
logger.addHandler(log_handler)
logger.setLevel(logging.INFO)

import arvados
import re
import os
import subprocess
import sys
import shutil
import crunchutil.subst as subst
import time
import arvados.commands.put as put
import signal
import stat
import copy
import traceback
import pprint
import multiprocessing
import crunchutil.robust_put as robust_put
import crunchutil.vwd as vwd

os.umask(0077)

t = arvados.current_task().tmpdir

api = arvados.api('v1')

os.chdir(arvados.current_task().tmpdir)
os.mkdir("tmpdir")
os.mkdir("output")

os.chdir("output")

outdir = os.getcwd()

taskp = None
jobp = arvados.current_job()['script_parameters']
if len(arvados.current_task()['parameters']) > 0:
    taskp = arvados.current_task()['parameters']

links = []

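# Substitution callbacks registered with crunchutil.subst below; they supply
# the values for template keywords such as $(task.tmpdir) and $(node.cores)
# when command strings are expanded.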
def sub_tmpdir(v):
    return os.path.join(arvados.current_task().tmpdir, 'tmpdir')

def sub_outdir(v):
    return outdir

def sub_cores(v):
    return str(multiprocessing.cpu_count())

def sub_jobid(v):
    return os.environ['JOB_UUID']

def sub_taskid(v):
    return os.environ['TASK_UUID']

def sub_jobsrc(v):
    return os.environ['CRUNCH_SRC']

subst.default_subs["task.tmpdir"] = sub_tmpdir
subst.default_subs["task.outdir"] = sub_outdir
subst.default_subs["job.srcdir"] = sub_jobsrc
subst.default_subs["node.cores"] = sub_cores
subst.default_subs["job.uuid"] = sub_jobid
subst.default_subs["task.uuid"] = sub_taskid

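# Records the last signal forwarded to the child process so the code below
# can distinguish "killed by a signal" from a normal exit.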
class SigHandler(object):
    def __init__(self):
        self.sig = None

    def send_signal(self, sp, signum):
        sp.send_signal(signum)
        self.sig = signum

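# Expand one element of a command template.  A dict may be a "foreach" block
# (expand the nested command once per item), a "list"/"index" selection
# (expand with one item picked out of a list), or a "regex" block applied to
# another parameter via "filter", "group" or "extract".  Strings go through
# parameter substitution; nested lists are expanded element by element.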
def expand_item(p, c):
    if isinstance(c, dict):
        if "foreach" in c and "command" in c:
            var = c["foreach"]
            items = get_items(p, p[var])
            r = []
            for i in items:
                params = copy.copy(p)
                params[var] = i
                r.extend(expand_list(params, c["command"]))
            return r
        if "list" in c and "index" in c and "command" in c:
            var = c["list"]
            items = get_items(p, p[var])
            params = copy.copy(p)
            params[var] = items[int(c["index"])]
            return expand_list(params, c["command"])
        if "regex" in c:
            pattern = re.compile(c["regex"])
            if "filter" in c:
                items = get_items(p, c["filter"])
                return [i for i in items if pattern.match(i)]
            elif "group" in c:
                items = get_items(p, c["group"])
                groups = {}
                for i in items:
                    m = pattern.match(i)
                    if m:
                        add_to_group(groups, m)
                return [groups[k] for k in groups]
            elif "extract" in c:
                items = get_items(p, c["extract"])
                r = []
                for i in items:
                    m = pattern.match(i)
                    if m:
                        r.append(m.groups())
                return r
    elif isinstance(c, list):
        return expand_list(p, c)
    elif isinstance(c, basestring):
        return [subst.do_substitution(p, c)]

    return []

def expand_list(p, l):
    if isinstance(l, basestring):
        return expand_item(p, l)
    else:
        return [exp for arg in l for exp in expand_item(p, arg)]

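# Bucket a regex match by its capture groups: matches whose groups join to
# the same '^_^'-separated key are collected into the same list.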
def add_to_group(gr, match):
    m = ('^_^').join(match.groups())
    if m not in gr:
        gr[m] = []
    gr[m].append(match.group(0))

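# Resolve a parameter value into a list of items.  Dicts and lists are
# expanded recursively; a string is substituted and treated as a path under
# the Keep mount, where a directory yields one "$(dir .../)" entry per
# directory listing and a regular file yields one item per line.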
def get_items(p, value):
    if isinstance(value, dict):
        return expand_list(p, value)

    if isinstance(value, list):
        return expand_list(p, value)

    fn = subst.do_substitution(p, value)
    mode = os.stat(fn).st_mode
    prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
    if mode is not None:
        if stat.S_ISDIR(mode):
            items = ["$(dir %s/%s/)" % (prefix, l) for l in os.listdir(fn)]
        elif stat.S_ISREG(mode):
            with open(fn) as f:
                items = [line.rstrip("\r\n") for line in f]
        return items
    else:
        return None

stdoutname = None
stdoutfile = None
stdinname = None
stdinfile = None
rcode = 1

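# Fan out over the variables named in task.foreach: for each item of the
# first variable, either recurse on the remaining variables or create a new
# job task carrying that combination of parameters.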
def recursive_foreach(params, fvars):
    var = fvars[0]
    fvars = fvars[1:]
    items = get_items(params, params[var])
    logger.info("parallelizing on %s with items %s" % (var, items))
    if items is not None:
        for i in items:
            params = copy.copy(params)
            params[var] = i
            if len(fvars) > 0:
                recursive_foreach(params, fvars)
            else:
                arvados.api().job_tasks().create(body={
                    'job_uuid': arvados.current_job()['uuid'],
                    'created_by_job_task_uuid': arvados.current_task()['uuid'],
                    'sequence': 1,
                    'parameters': params
                    }
                ).execute()
    else:
        logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
        sys.exit(1)

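# Build the command for this task.  With task.foreach, the sequence-0 task
# only queues the per-item subtasks and exits; otherwise the command template
# is expanded and stdin/stdout redirection is prepared before running it
# below.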
try:
    if "task.foreach" in jobp:
        if arvados.current_task()['sequence'] == 0:
            # This is the first task: it queues the other tasks and exits
            fvars = jobp["task.foreach"]
            if isinstance(fvars, basestring):
                fvars = [fvars]
            if not isinstance(fvars, list) or len(fvars) == 0:
                logger.error("value of task.foreach must be a string or non-empty list")
                sys.exit(1)
            recursive_foreach(jobp, jobp["task.foreach"])
            if "task.vwd" in jobp:
                # Set output of the first task to the base vwd collection so it
                # will be merged with output fragments from the other tasks by
                # crunch.
                arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
            else:
                arvados.current_task().set_output(None)
            sys.exit(0)
    else:
        # This is the only task so taskp/jobp are the same
        taskp = jobp

    if "task.vwd" in taskp:
        # Populate output directory with symlinks to files in collection
        vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)

    if "task.cwd" in taskp:
        os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))

    cmd = expand_list(taskp, taskp["command"])

    if "task.stdin" in taskp:
        stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
        stdinfile = open(stdinname, "rb")

    if "task.stdout" in taskp:
        stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
        stdoutfile = open(stdoutname, "wb")

    logger.info("{}{}{}".format(' '.join(cmd), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
except subst.SubstitutionError as e:
    logger.error(str(e))
    logger.error("task parameters were:")
    logger.error(pprint.pformat(taskp))
    sys.exit(1)
except Exception as e:
    logger.exception("caught exception")
    logger.error("task parameters were:")
    logger.error(pprint.pformat(taskp))
    sys.exit(1)

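# Run the command, forwarding SIGINT/SIGTERM/SIGQUIT to the child so that
# cancelling the task also stops the process it started.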
try:
    sp = subprocess.Popen(cmd, shell=False, stdin=stdinfile, stdout=stdoutfile)
    sig = SigHandler()

    # forward signals to the process.
    signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
    signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
    signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))

    # wait for process to complete.
    rcode = sp.wait()

    if sig.sig is not None:
        logger.critical("terminating on signal %s" % sig.sig)
        sys.exit(2)
    else:
        logger.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))

except Exception as e:
    logger.exception("caught exception")

# restore default signal handlers.
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGQUIT, signal.SIG_DFL)

for l in links:
    os.unlink(l)

logger.info("the following output files will be saved to keep:")

subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)

logger.info("start writing output to keep")

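# Save the contents of the output directory as a collection: check the
# virtual working directory back in when task.vwd is used, otherwise upload
# outdir with robust_put.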
276 if "task.vwd" in taskp:
277     if "task.foreach" in jobp:
278         # This is a subtask, so don't merge with the original collection, that will happen at the end
279         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
280     else:
281         # Just a single task, so do merge with the original collection
282         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
283 else:
284     outcollection = robust_put.upload(outdir, logger)
285
286 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
287                                      body={
288                                          'output': outcollection,
289                                          'success': (rcode == 0),
290                                          'progress':1.0
291                                      }).execute()
292
293 sys.exit(rcode)