sleep so it doesn't go haywire
[arvados.git] / crunch_scripts / run-command
1 #!/usr/bin/env python
2
3 import logging
4
5 logger = logging.getLogger('run-command')
6 log_handler = logging.StreamHandler()
7 log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
8 logger.addHandler(log_handler)
9 logger.setLevel(logging.INFO)
10
11 import arvados
12 import re
13 import os
14 import subprocess
15 import sys
16 import shutil
17 import crunchutil.subst as subst
18 import time
19 import arvados.commands.put as put
20 import signal
21 import stat
22 import copy
23 import traceback
24 import pprint
25 import multiprocessing
26 import crunchutil.robust_put as robust_put
27 import crunchutil.vwd as vwd
28 import argparse
29 import json
30 import tempfile
31 import errno
32
33 parser = argparse.ArgumentParser()
34 parser.add_argument('--dry-run', action='store_true')
35 parser.add_argument('--script-parameters', type=str, default="{}")
36 args = parser.parse_args()
37
38 os.umask(0077)
39
40 if not args.dry_run:
41     api = arvados.api('v1')
42     t = arvados.current_task().tmpdir
43     os.chdir(arvados.current_task().tmpdir)
44     os.mkdir("tmpdir")
45     os.mkdir("output")
46
47     os.chdir("output")
48
49     outdir = os.getcwd()
50
51     taskp = None
52     jobp = arvados.current_job()['script_parameters']
53     if len(arvados.current_task()['parameters']) > 0:
54         taskp = arvados.current_task()['parameters']
55 else:
56     outdir = "/tmp"
57     jobp = json.loads(args.script_parameters)
58     os.environ['JOB_UUID'] = 'zzzzz-8i9sb-1234567890abcde'
59     os.environ['TASK_UUID'] = 'zzzzz-ot0gb-1234567890abcde'
60     os.environ['CRUNCH_SRC'] = '/tmp/crunche-src'
61     if 'TASK_KEEPMOUNT' not in os.environ:
62         os.environ['TASK_KEEPMOUNT'] = '/keep'
63
64 links = []
65
66 def sub_tmpdir(v):
67     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
68
69 def sub_outdir(v):
70     return outdir
71
72 def sub_cores(v):
73      return str(multiprocessing.cpu_count())
74
75 def sub_jobid(v):
76      return os.environ['JOB_UUID']
77
78 def sub_taskid(v):
79      return os.environ['TASK_UUID']
80
81 def sub_jobsrc(v):
82      return os.environ['CRUNCH_SRC']
83
84 subst.default_subs["task.tmpdir"] = sub_tmpdir
85 subst.default_subs["task.outdir"] = sub_outdir
86 subst.default_subs["job.srcdir"] = sub_jobsrc
87 subst.default_subs["node.cores"] = sub_cores
88 subst.default_subs["job.uuid"] = sub_jobid
89 subst.default_subs["task.uuid"] = sub_taskid
90
91 class SigHandler(object):
92     def __init__(self):
93         self.sig = None
94
95     def send_signal(self, subprocesses, signum):
96         for sp in subprocesses:
97             sp.send_signal(signum)
98         self.sig = signum
99
100 def add_to_group(gr, match):
101     m = match.groups()
102     if m not in gr:
103         gr[m] = []
104     gr[m].append(match.group(0))
105
106 def expand_item(p, c):
107     if isinstance(c, dict):
108         if "foreach" in c and "command" in c:
109             var = c["foreach"]
110             items = get_items(p, p[var])
111             r = []
112             for i in items:
113                 params = copy.copy(p)
114                 params[var] = i
115                 r.extend(expand_list(params, c["command"]))
116             return r
117         if "list" in c and "index" in c and "command" in c:
118             var = c["list"]
119             items = get_items(p, p[var])
120             params = copy.copy(p)
121             params[var] = items[int(c["index"])]
122             return expand_list(params, c["command"])
123         if "regex" in c:
124             pattern = re.compile(c["regex"])
125             if "filter" in c:
126                 items = get_items(p, p[c["filter"]])
127                 return [i for i in items if pattern.match(i)]
128             elif "group" in c:
129                 items = get_items(p, p[c["group"]])
130                 groups = {}
131                 for i in items:
132                     match = pattern.match(i)
133                     if match:
134                         add_to_group(groups, match)
135                 return [groups[k] for k in groups]
136             elif "extract" in c:
137                 items = get_items(p, p[c["extract"]])
138                 r = []
139                 for i in items:
140                     match = pattern.match(i)
141                     if match:
142                         r.append(list(match.groups()))
143                 return r
144     elif isinstance(c, list):
145         return expand_list(p, c)
146     elif isinstance(c, basestring):
147         return [subst.do_substitution(p, c)]
148
149     return []
150
151 def expand_list(p, l):
152     if isinstance(l, basestring):
153         return expand_item(p, l)
154     else:
155         return [exp for arg in l for exp in expand_item(p, arg)]
156
157 def get_items(p, value):
158     if isinstance(value, dict):
159         return expand_item(p, value)
160
161     if isinstance(value, list):
162         return expand_list(p, value)
163
164     fn = subst.do_substitution(p, value)
165     mode = os.stat(fn).st_mode
166     prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
167     if mode is not None:
168         if stat.S_ISDIR(mode):
169             items = [os.path.join(fn, l) for l in os.listdir(fn)]
170         elif stat.S_ISREG(mode):
171             with open(fn) as f:
172                 items = [line.rstrip("\r\n") for line in f]
173         return items
174     else:
175         return None
176
177 stdoutname = None
178 stdoutfile = None
179 stdinname = None
180 stdinfile = None
181 rcode = 1
182
183 def recursive_foreach(params, fvars):
184     var = fvars[0]
185     fvars = fvars[1:]
186     items = get_items(params, params[var])
187     logger.info("parallelizing on %s with items %s" % (var, items))
188     if items is not None:
189         for i in items:
190             params = copy.copy(params)
191             params[var] = i
192             if len(fvars) > 0:
193                 recursive_foreach(params, fvars)
194             else:
195                 if not args.dry_run:
196                     arvados.api().job_tasks().create(body={
197                         'job_uuid': arvados.current_job()['uuid'],
198                         'created_by_job_task_uuid': arvados.current_task()['uuid'],
199                         'sequence': 1,
200                         'parameters': params
201                     }).execute()
202                 else:
203                     logger.info(expand_list(params, params["command"]))
204     else:
205         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
206         sys.exit(1)
207
208 try:
209     if "task.foreach" in jobp:
210         if args.dry_run or arvados.current_task()['sequence'] == 0:
211             # This is the first task to start the other tasks and exit
212             fvars = jobp["task.foreach"]
213             if isinstance(fvars, basestring):
214                 fvars = [fvars]
215             if not isinstance(fvars, list) or len(fvars) == 0:
216                 logger.error("value of task.foreach must be a string or non-empty list")
217                 sys.exit(1)
218             recursive_foreach(jobp, jobp["task.foreach"])
219             if not args.dry_run:
220                 if "task.vwd" in jobp:
221                     # Set output of the first task to the base vwd collection so it
222                     # will be merged with output fragments from the other tasks by
223                     # crunch.
224                     arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
225                 else:
226                     arvados.current_task().set_output(None)
227             sys.exit(0)
228     else:
229         # This is the only task so taskp/jobp are the same
230         taskp = jobp
231
232     if not args.dry_run:
233         if "task.vwd" in taskp:
234             # Populate output directory with symlinks to files in collection
235             vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
236
237         if "task.cwd" in taskp:
238             os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
239
240     cmd = []
241     if isinstance(taskp["command"][0], list):
242         for c in taskp["command"]:
243             cmd.append(expand_list(taskp, c))
244     else:
245         cmd.append(expand_list(taskp, taskp["command"]))
246
247     if "task.stdin" in taskp:
248         stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
249         if not args.dry_run:
250             stdinfile = open(stdinname, "rb")
251
252     if "task.stdout" in taskp:
253         stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
254         if not args.dry_run:
255             stdoutfile = open(stdoutname, "wb")
256
257     logger.info("{}{}{}".format(' | '.join([' '.join(c) for c in cmd]), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
258
259     if args.dry_run:
260         sys.exit(0)
261 except subst.SubstitutionError as e:
262     logger.error(str(e))
263     logger.error("task parameters were:")
264     logger.error(pprint.pformat(taskp))
265     sys.exit(1)
266 except Exception as e:
267     logger.exception("caught exception")
268     logger.error("task parameters were:")
269     logger.error(pprint.pformat(taskp))
270     sys.exit(1)
271
272 try:
273     subprocesses = []
274     close_streams = []
275     if stdinfile:
276         close_streams.append(stdinfile)
277     next_stdin = stdinfile
278
279     for i in xrange(len(cmd)):
280         if i == len(cmd)-1:
281             # this is the last command in the pipeline, so its stdout should go to stdoutfile
282             next_stdout = stdoutfile
283         else:
284             # this is an intermediate command in the pipeline, so its stdout should go to a pipe
285             next_stdout = subprocess.PIPE
286
287         sp = subprocess.Popen(cmd[i], shell=False, stdin=next_stdin, stdout=next_stdout)
288
289         # Need to close the FDs on our side so that subcommands will get SIGPIPE if the
290         # consuming process ends prematurely.
291         if sp.stdout:
292             close_streams.append(sp.stdout)
293
294         # Send this processes's stdout to to the next process's stdin
295         next_stdin = sp.stdout
296
297         subprocesses.append(sp)
298
299     # File descriptors have been handed off to the subprocesses, so close them here.
300     for s in close_streams:
301         s.close()
302
303     # Set up signal handling
304     sig = SigHandler()
305
306     # Forward terminate signals to the subprocesses.
307     signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(subprocesses, signum))
308     signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(subprocesses, signum))
309     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(subprocesses, signum))
310
311     active = 1
312     while active > 0:
313         try:
314             os.waitpid(0, 0)
315         except OSError as e:
316             if e.errno == errno.ECHILD:
317                 # child already exited
318                 print "got ECHILD"
319                 time.sleep(1)
320                 for s in subprocesses:
321                     print s.poll()
322             else:
323                 raise
324         active = sum([1 if s.poll() is None else 0 for s in subprocesses])
325         print "active is %i" % active
326
327     # wait for process to complete.
328     rcode = subprocesses[len(subprocesses)-1].returncode
329
330     if sig.sig is not None:
331         logger.critical("terminating on signal %s" % sig.sig)
332         sys.exit(2)
333     else:
334         logger.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
335
336 except Exception as e:
337     logger.exception("caught exception")
338
339 # restore default signal handlers.
340 signal.signal(signal.SIGINT, signal.SIG_DFL)
341 signal.signal(signal.SIGTERM, signal.SIG_DFL)
342 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
343
344 for l in links:
345     os.unlink(l)
346
347 logger.info("the following output files will be saved to keep:")
348
349 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
350
351 logger.info("start writing output to keep")
352
353 if "task.vwd" in taskp:
354     if "task.foreach" in jobp:
355         # This is a subtask, so don't merge with the original collection, that will happen at the end
356         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
357     else:
358         # Just a single task, so do merge with the original collection
359         outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
360 else:
361     outcollection = robust_put.upload(outdir, logger)
362
363 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
364                                      body={
365                                          'output': outcollection,
366                                          'success': (rcode == 0),
367                                          'progress':1.0
368                                      }).execute()
369
370 sys.exit(rcode)