From: Ward Vandewege
Date: Mon, 30 Jun 2014 19:12:40 +0000 (-0400)
Subject: Merge branch 'master' into 3118-docker-fixes
X-Git-Tag: 1.1.0~2504^2~2
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/87a7a3fbb7cf5231fdcdcd72bc3199dc15f95b71?hp=9b9ceec0908bdf769109d4df73b3afe1e693c631

Merge branch 'master' into 3118-docker-fixes
---
# Notes (free text between the `---` separator and the first `diff --git`
# line; not part of any hunk).
#
# What the patch contains
#
# crunch_scripts/collection-merge (rewritten, mode 100644 -> 100755)
#   * Every element of script_parameters["input"] is first passed through
#     subst.do_substitution().
#   * An entry without "/" is read as a whole collection. Otherwise the text
#     before the first "/" is the collection, the text between the first and
#     the last "/" is the stream name ("." when empty) and the text after the
#     last "/" is the file name; only that file's manifest is appended.
#     A stream or file that is not found is skipped without any message.
#   * The merged manifest is stored with collections().create(); its uuid is
#     "<md5 hex digest>+<length>" of manifest_text(strip=True).
#   * For every source collection a link (link_class "provenance", name
#     "provided") is created from the source to the new uuid, and the task
#     output is set to that uuid.
#
# crunch_scripts/run-command (new)
#   * Works inside "<task tmpdir>/output"; parameters come from the task when
#     it has any, otherwise from the job's script_parameters.
#   * Registers two extra substitutions in subst.default_subs: "link "
#     (symlinks the TASK_KEEPMOUNT path into the current directory and returns
#     the base name) and "tmpdir" (returns "<task tmpdir>/tmpdir").
#   * Builds argv from p["command"], optionally redirects stdout to the file
#     named by p["stdout"], and runs it with subprocess.call().
#   * In `finally` it removes the symlinks it created and saves the current
#     directory tree as the task output. When the return code is 0 it deletes
#     the "tmpdir" and "output" directories. It exits with the return code.
#
# crunch_scripts/subst.py (new)
#   * search(c) returns [start, end] for the first outermost "$(...)" span in
#     c, None when there is none, and raises Exception when parentheses are
#     still open at the end of the string.
#   * do_substitution(p, c, subs) repeatedly replaces that span. The inner
#     text is substituted recursively first; if it starts with one of the keys
#     of `subs` ("file ", "dir ", "basename ", "glob ") the matching helper is
#     called with the remainder, otherwise the text is looked up as p[v].
#
# NOTE(review) - points to confirm or follow up; the code below is unchanged
#
#   1. collection-merge: the removed code wrapped a non-list `input` in a
#      list. The new loop iterates p["input"] directly, so a plain string
#      value would be walked one character at a time.
#   2. collection-merge: `md5`, `subprocess` and `os` are imported but the new
#      script body only uses `arvados`, `subst` and `hashlib`. The results
#      bound to `collection` and `l` are never read.
#   3. subst.do_substitution: the recursive call does not pass `subs`, so a
#      caller-supplied table is ignored for nested "$(...)" expressions and
#      default_subs is used instead.
#   4. subst.sub_glob: `glob.glob(v)[0]` raises IndexError when the pattern
#      matches nothing.
#   5. subst.search: a backslash makes the scanner skip the following
#      character, but do_substitution never removes the backslash itself from
#      the resulting string - confirm this is intended.
#   6. run-command: `stdoutfile` is opened but never closed, and `t` is
#      assigned but never used. Any exception raised inside the `try` is only
#      printed, leaving rcode at its initial value 1.
#   7. `os.umask(0077)` and `import md5` are Python 2 only; presumably these
#      scripts are meant to run under Python 2 - confirm before porting.

diff --git a/crunch_scripts/collection-merge b/crunch_scripts/collection-merge
old mode 100644
new mode 100755
index 50e90f7f0a..f16d62466a
--- a/crunch_scripts/collection-merge
+++ b/crunch_scripts/collection-merge
@@ -1,13 +1,57 @@
 #!/usr/bin/env python
 
 import arvados
+import md5
+import subst
+import subprocess
+import os
+import hashlib
 
-inputs = arvados.current_job()['script_parameters']['input']
-if not isinstance(inputs, (list,tuple)):
-    inputs = [inputs]
+p = arvados.current_job()['script_parameters']
 
-out_manifest = ''
-for locator in inputs:
-    out_manifest += arvados.CollectionReader(locator).manifest_text()
+merged = ""
+src = []
+for c in p["input"]:
+    c = subst.do_substitution(p, c)
+    i = c.find('/')
+    if i == -1:
+        src.append(c)
+        merged += arvados.CollectionReader(c).manifest_text()
+    else:
+        src.append(c[0:i])
+        cr = arvados.CollectionReader(c[0:i])
+        j = c.rfind('/')
+        stream = c[i+1:j]
+        if stream == "":
+            stream = "."
+        fn = c[(j+1):]
+        for s in cr.all_streams():
+            if s.name() == stream:
+                if fn in s.files():
+                    merged += s.files()[fn].as_manifest()
 
-arvados.current_task().set_output(arvados.Keep.put(out_manifest))
+crm = arvados.CollectionReader(merged)
+
+combined = crm.manifest_text(strip=True)
+
+m = hashlib.new('md5')
+m.update(combined)
+
+uuid = "{}+{}".format(m.hexdigest(), len(combined))
+
+collection = arvados.api().collections().create(
+    body={
+        'uuid': uuid,
+        'manifest_text': crm.manifest_text(),
+    }).execute()
+
+for s in src:
+    l = arvados.api().links().create(body={
+        "link": {
+            "tail_uuid": s,
+            "head_uuid": uuid,
+            "link_class": "provenance",
+            "name": "provided"
+        }}).execute()
+
+arvados.current_task().set_output(uuid)
diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
new file mode 100755
index 0000000000..528baab4c1
--- /dev/null
+++ b/crunch_scripts/run-command
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+
+import arvados
+import re
+import os
+import subprocess
+import sys
+import shutil
+import subst
+
+os.umask(0077)
+
+t = arvados.current_task().tmpdir
+
+os.chdir(arvados.current_task().tmpdir)
+os.mkdir("tmpdir")
+os.mkdir("output")
+
+os.chdir("output")
+
+if len(arvados.current_task()['parameters']) > 0:
+    p = arvados.current_task()['parameters']
+else:
+    p = arvados.current_job()['script_parameters']
+
+links = []
+
+def sub_link(v):
+    r = os.path.basename(v)
+    os.symlink(os.path.join(os.environ['TASK_KEEPMOUNT'], v) , r)
+    links.append(r)
+    return r
+
+def sub_tmpdir(v):
+    return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
+
+subst.default_subs["link "] = sub_link
+subst.default_subs["tmpdir"] = sub_tmpdir
+
+rcode = 1
+
+try:
+    cmd = []
+    for c in p["command"]:
+        cmd.append(subst.do_substitution(p, c))
+
+    stdoutname = None
+    stdoutfile = None
+    if "stdout" in p:
+        stdoutname = subst.do_substitution(p, p["stdout"])
+        stdoutfile = open(stdoutname, "wb")
+
+    print("Running command: {}{}".format(' '.join(cmd), (" > " + stdoutname) if stdoutname != None else ""))
+
+    rcode = subprocess.call(cmd, stdout=stdoutfile)
+
+except Exception as e:
+    print("Caught exception {}".format(e))
+
+finally:
+    for l in links:
+        os.unlink(l)
+
+    out = arvados.CollectionWriter()
+    out.write_directory_tree(".", max_manifest_depth=0)
+    arvados.current_task().set_output(out.finish())
+
+if rcode == 0:
+    os.chdir("..")
+    shutil.rmtree("tmpdir")
+    shutil.rmtree("output")
+
+sys.exit(rcode)
diff --git a/crunch_scripts/subst.py b/crunch_scripts/subst.py
new file mode 100644
index 0000000000..2598e1cc94
--- /dev/null
+++ b/crunch_scripts/subst.py
@@ -0,0 +1,71 @@
+import os
+import glob
+
+def search(c):
+    DEFAULT = 0
+    DOLLAR = 1
+
+    i = 0
+    state = DEFAULT
+    start = None
+    depth = 0
+    while i < len(c):
+        if c[i] == '\\':
+            i += 1
+        elif state == DEFAULT:
+            if c[i] == '$':
+                state = DOLLAR
+                if depth == 0:
+                    start = i
+            elif c[i] == ')':
+                if depth == 1:
+                    return [start, i]
+                if depth > 0:
+                    depth -= 1
+        elif state == DOLLAR:
+            if c[i] == '(':
+                depth += 1
+            state = DEFAULT
+        i += 1
+    if depth != 0:
+        raise Exception("Substitution error, mismatched parentheses {}".format(c))
+    return None
+
+def sub_file(v):
+    return os.path.join(os.environ['TASK_KEEPMOUNT'], v)
+
+def sub_dir(v):
+    d = os.path.dirname(v)
+    if d == '':
+        d = v
+    return os.path.join(os.environ['TASK_KEEPMOUNT'], d)
+
+def sub_basename(v):
+    return os.path.splitext(os.path.basename(v))[0]
+
+def sub_glob(v):
+    return glob.glob(v)[0]
+
+default_subs = {"file ": sub_file,
+                "dir ": sub_dir,
+                "basename ": sub_basename,
+                "glob ": sub_glob}
+
+def do_substitution(p, c, subs=default_subs):
+    while True:
+        #print("c is", c)
+        m = search(c)
+        if m != None:
+            v = do_substitution(p, c[m[0]+2 : m[1]])
+            var = True
+            for sub in subs:
+                if v.startswith(sub):
+                    r = subs[sub](v[len(sub):])
+                    var = False
+                    break
+            if var:
+                r = p[v]
+
+            c = c[:m[0]] + r + c[m[1]+1:]
+        else:
+            return c