From 1281ecab8f2396739ee9232c36796e46cd551426 Mon Sep 17 00:00:00 2001
From: Peter Amstutz
Date: Fri, 1 May 2015 10:32:01 -0400
Subject: [PATCH] 5787: run-command uploading uses new collection API and
 supports symlinks into Keep

---
 crunch_scripts/crunchutil/vwd.py | 63 ++++++++++++++++++++++++--------
 crunch_scripts/run-command       | 24 +++++-------
 2 files changed, 56 insertions(+), 31 deletions(-)

diff --git a/crunch_scripts/crunchutil/vwd.py b/crunch_scripts/crunchutil/vwd.py
index 3d54c9c2b3..9d5578a30e 100644
--- a/crunch_scripts/crunchutil/vwd.py
+++ b/crunch_scripts/crunchutil/vwd.py
@@ -2,6 +2,7 @@ import arvados
 import os
 import robust_put
 import stat
+import arvados.commands.run
 
 # Implements "Virtual Working Directory"
 # Provides a way of emulating a shared writable directory in Keep based
@@ -32,23 +33,53 @@ def checkout(source_collection, target_dir, keepmount=None):
         for f in files:
             os.symlink(os.path.join(root, f), os.path.join(target_dir, rel, f))
 
-# Delete all symlinks and check in any remaining normal files.
-# If merge == True, merge the manifest with source_collection and return a
-# CollectionReader for the combined collection.
-def checkin(source_collection, target_dir, merge=True):
-    # delete symlinks, commit directory, merge manifests and return combined
-    # collection.
+# Check in the contents of target_dir as a new collection.  Regular files
+# are written to the collection; symlinks that resolve into a Keep
+# collection are copied from that collection; any other symlink is skipped.
+def checkin(target_dir):
+    """Build a new collection from the files under target_dir.
+
+    Returns the arvados.collection.Collection that was built.  Nothing in
+    this function saves it; callers use manifest_text() on the result.
+    """
+
+    outputcollection = arvados.collection.Collection(num_retries=5)
+
+    # Ensure a trailing slash so that root[len(target_dir):] below is a
+    # path relative to target_dir with no leading slash.
+    if target_dir[-1:] != '/':
+        target_dir += '/'
+
+    # Source collections already loaded, keyed by the pdh that
+    # is_in_collection() reports, so each one is loaded only once.
+    collections = {}
+
     for root, dirs, files in os.walk(target_dir):
+        reldir = root[len(target_dir):]
         for f in files:
             s = os.lstat(os.path.join(root, f))
             if stat.S_ISLNK(s.st_mode):
-                os.unlink(os.path.join(root, f))
-
-    uuid = robust_put.upload(target_dir)
-    if merge:
-        cr1 = arvados.CollectionReader(source_collection)
-        cr2 = arvados.CollectionReader(uuid)
-        combined = arvados.CollectionReader(cr1.manifest_text() + cr2.manifest_text())
-        return combined
-    else:
-        return arvados.CollectionReader(uuid)
+                # 1. check if it is a link into a collection
+                real = os.path.split(os.path.realpath(os.path.join(root, f)))
+                (pdh, branch) = arvados.commands.run.is_in_collection(real[0], real[1])
+                if pdh is not None:
+                    # 2. load collection
+                    if pdh not in collections:
+                        collections[pdh] = arvados.collection.CollectionReader(
+                            pdh,
+                            api_client=outputcollection._my_api(),
+                            keep_client=outputcollection._my_keep(),
+                            num_retries=5)
+                    # 3. copy arvfile to new collection, at the path where
+                    # the symlink sits under target_dir
+                    outputcollection.copy(branch, os.path.join(reldir, f), source_collection=collections[pdh])
+
+            elif stat.S_ISREG(s.st_mode):
+                with outputcollection.open(os.path.join(reldir, f), "wb") as writer:
+                    with open(os.path.join(root, f), "rb") as reader:
+                        dat = reader.read(64*1024)
+                        while dat:
+                            writer.write(dat)
+                            dat = reader.read(64*1024)
+
+    return outputcollection
diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 1fcdb40dc4..ae2233e491 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -61,8 +61,6 @@ else:
 if 'TASK_KEEPMOUNT' not in os.environ:
     os.environ['TASK_KEEPMOUNT'] = '/keep'
 
-links = []
-
 def sub_tmpdir(v):
     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
 
@@ -415,24 +413,20 @@ signal.signal(signal.SIGINT, signal.SIG_DFL)
 signal.signal(signal.SIGTERM, signal.SIG_DFL)
 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
 
-for l in links:
-    os.unlink(l)
-
 logger.info("the following output files will be saved to keep:")
 
-subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr, cwd=outdir)
+subprocess.call(["find", ".", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr, cwd=outdir)
 
 logger.info("start writing output to keep")
 
-if "task.vwd" in taskp:
-    if "task.foreach" in jobp:
-        # This is a subtask, so don't merge with the original collection, that will happen at the end
-        outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
-    else:
-        # Just a single task, so do merge with the original collection
-        outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
-else:
-    outcollection = robust_put.upload(outdir, logger)
+if "task.vwd" in taskp and "task.foreach" in jobp:
+    for root, dirs, files in os.walk(outdir):
+        for f in files:
+            s = os.lstat(os.path.join(root, f))
+            if stat.S_ISLNK(s.st_mode):
+                os.unlink(os.path.join(root, f))
+
+outcollection = vwd.checkin(outdir).manifest_text()
 
 # Success if we ran any subprocess, and they all exited 0.
 success = rcode and all(status == 0 for status in rcode.itervalues())
-- 
2.30.2