4 logging.basicConfig(level=logging.INFO, format="run-command: %(message)s")
12 import crunchutil.subst as subst
14 import arvados.commands.put as put
20 import multiprocessing
21 import crunchutil.robust_put as robust_put
22 import crunchutil.vwd as vwd
26 t = arvados.current_task().tmpdir
28 api = arvados.api('v1')
30 os.chdir(arvados.current_task().tmpdir)
39 jobp = arvados.current_job()['script_parameters']
40 if len(arvados.current_task()['parameters']) > 0:
41 taskp = arvados.current_task()['parameters']
46 return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
52 return str(multiprocessing.cpu_count())
55 return os.environ['JOB_UUID']
58 return os.environ['TASK_UUID']
61 return os.environ['CRUNCH_SRC']
# Register this script's handlers in crunchutil.subst's default_subs table,
# keyed by substitution name ("task.tmpdir", "job.uuid", ...).
# NOTE(review): the sub_* callables are defined earlier in this file; how
# subst invokes them is not visible from here -- confirm in crunchutil.subst.
63 subst.default_subs["task.tmpdir"] = sub_tmpdir
64 subst.default_subs["task.outdir"] = sub_outdir
65 subst.default_subs["job.srcdir"] = sub_jobsrc
66 subst.default_subs["node.cores"] = sub_cores
67 subst.default_subs["job.uuid"] = sub_jobid
68 subst.default_subs["task.uuid"] = sub_taskid
70 class SigHandler(object):
74 def send_signal(self, sp, signum):
75 sp.send_signal(signum)
78 def expand_item(p, c):
79 if isinstance(c, dict):
80 if "foreach" in c and "command" in c:
82 items = get_items(p, p[var])
87 r.extend(expand_list(params, c["command"]))
89 elif isinstance(c, list):
90 return expand_list(p, c)
91 elif isinstance(c, str) or isinstance(c, unicode):
92 return [subst.do_substitution(p, c)]
def expand_list(p, l):
    """Expand every entry of ``l`` and flatten the results into one list.

    Each entry is handed to ``expand_item`` together with ``p``; the
    sequences returned by those calls are concatenated in input order.

    :param p: parameter object passed through unchanged to ``expand_item``.
    :param l: iterable of command entries to expand.
    :returns: a new flat list holding every expanded element.
    """
    flattened = []
    for entry in l:
        # expand_item yields zero or more elements per entry; splice them in.
        flattened.extend(expand_item(p, entry))
    return flattened
99 def get_items(p, value):
100 if isinstance(value, list):
101 return expand_list(p, value)
103 fn = subst.do_substitution(p, value)
104 mode = os.stat(fn).st_mode
105 prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
107 if stat.S_ISDIR(mode):
108 items = ["$(dir %s/%s/)" % (prefix, l) for l in os.listdir(fn)]
109 elif stat.S_ISREG(mode):
111 items = [line for line in f]
123 if "task.foreach" in jobp:
124 if arvados.current_task()['sequence'] == 0:
125 var = jobp["task.foreach"]
126 items = get_items(jobp, jobp[var])
127 logging.info("parallelizing on %s with items %s" % (var, items))
128 if items is not None:
130 params = copy.copy(jobp)
132 arvados.api().job_tasks().create(body={
133 'job_uuid': arvados.current_job()['uuid'],
134 'created_by_job_task_uuid': arvados.current_task()['uuid'],
139 if "task.vwd" in jobp:
140 # Base vwd collection will be merged with output fragments from
141 # the other tasks by crunch.
142 arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
144 arvados.current_task().set_output(None)
151 if "task.vwd" in taskp:
152 # Populate output directory with symlinks to files in collection
153 vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
155 if "task.cwd" in taskp:
156 os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
158 cmd = expand_list(taskp, taskp["command"])
160 if "task.stdin" in taskp:
161 stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
162 stdinfile = open(stdinname, "rb")
164 if "task.stdout" in taskp:
165 stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
166 stdoutfile = open(stdoutname, "wb")
168 logging.info("{}{}{}".format(' '.join(cmd), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
170 except Exception as e:
171 logging.exception("caught exception")
172 logging.error("task parameters was:")
173 logging.error(pprint.pformat(taskp))
177 sp = subprocess.Popen(cmd, shell=False, stdin=stdinfile, stdout=stdoutfile)
180 # forward signals to the process.
181 signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
182 signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
183 signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))
185 # wait for process to complete.
188 if sig.sig is not None:
189 logging.critical("terminating on signal %s" % sig.sig)
192 logging.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
194 except Exception as e:
195 logging.exception("caught exception")
197 # restore default signal handlers.
198 signal.signal(signal.SIGINT, signal.SIG_DFL)
199 signal.signal(signal.SIGTERM, signal.SIG_DFL)
200 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
205 logging.info("the following output files will be saved to keep:")
207 subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)
209 logging.info("start writing output to keep")
211 if "task.vwd" in taskp:
212 if "task.foreach" in jobp:
213 # This is a subtask, so don't merge with the original collection, that will happen at the end
214 outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
216 # Just a single task, so do merge with the original collection
217 outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
219 outcollection = robust_put.upload(outdir)
221 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
223 'output': outcollection,
224 'success': (rcode == 0),