X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1281ecab8f2396739ee9232c36796e46cd551426..fb9730d1da1eab233e4e7ea01c1015cd70ba6cf7:/crunch_scripts/crunchutil/vwd.py diff --git a/crunch_scripts/crunchutil/vwd.py b/crunch_scripts/crunchutil/vwd.py index 9d5578a30e..0ae1c46209 100644 --- a/crunch_scripts/crunchutil/vwd.py +++ b/crunch_scripts/crunchutil/vwd.py @@ -1,8 +1,8 @@ import arvados import os -import robust_put import stat -import arvados.command.run +import arvados.commands.run +import logging # Implements "Virtual Working Directory" # Provides a way of emulating a shared writable directory in Keep based @@ -33,15 +33,19 @@ def checkout(source_collection, target_dir, keepmount=None): for f in files: os.symlink(os.path.join(root, f), os.path.join(target_dir, rel, f)) -def is_collection(fn): - if os.path.exists - -# Delete all symlinks and check in any remaining normal files. -# If merge == True, merge the manifest with source_collection and return a -# CollectionReader for the combined collection. def checkin(target_dir): - # delete symlinks, commit directory, merge manifests and return combined - # collection. + """Write files in `target_dir` to Keep. + + Regular files or symlinks to files outside the keep mount are written to + Keep as normal files (Keep does not support symlinks). + + Symlinks to files in the keep mount will result in files in the new + collection which reference existing Keep blocks, no data copying necessary. + + Returns a new Collection object, with data flushed but the collection record + not saved to the API. + + """ outputcollection = arvados.collection.Collection(num_retries=5) @@ -50,30 +54,50 @@ def checkin(target_dir): collections = {} + logger = logging.getLogger("arvados") + + last_error = None for root, dirs, files in os.walk(target_dir): for f in files: - s = os.lstat(os.path.join(root, f)) - if stat.S_ISLNK(s.st_mode): - # 1. check if it is a link into a collection - real = os.path.split(os.path.realpath(os.path.join(root, f))) - (pdh, branch) = arvados.command.run.is_in_collection(real[0], real[1]) - if pdh is not None: - # 2. load collection - if pdh not in collections: - collections[pdh] = arvados.collection.CollectionReader(pdh, - api_client=outputcollection._my_api(), - keep_client=outputcollection._my_keep(), - num_retries=5) - # 3. copy arvfile to new collection - outputcollection.copy(branch, branch, source_collection=collections[pdh]) - - elif stat.S_ISREG(s.st_mode): - reldir = root[len(target_dir):] - with outputcollection.open(os.path.join(reldir, f), "wb") as writer: - with open(os.path.join(root, f), "rb") as reader: - dat = reader.read(64*1024) - while dat: - writer.write(dat) + try: + s = os.lstat(os.path.join(root, f)) + + writeIt = False + + if stat.S_ISREG(s.st_mode): + writeIt = True + elif stat.S_ISLNK(s.st_mode): + # 1. check if it is a link into a collection + real = os.path.split(os.path.realpath(os.path.join(root, f))) + (pdh, branch) = arvados.commands.run.is_in_collection(real[0], real[1]) + if pdh is not None: + # 2. load collection + if pdh not in collections: + # 2.1 make sure it is flushed (see #5787 note 11) + fd = os.open(real[0], os.O_RDONLY) + os.fsync(fd) + os.close(fd) + + # 2.2 get collection from API server + collections[pdh] = arvados.collection.CollectionReader(pdh, + api_client=outputcollection._my_api(), + keep_client=outputcollection._my_keep(), + num_retries=5) + # 3. copy arvfile to new collection + outputcollection.copy(branch, os.path.join(root[len(target_dir):], f), source_collection=collections[pdh]) + else: + writeIt = True + + if writeIt: + reldir = root[len(target_dir):] + with outputcollection.open(os.path.join(reldir, f), "wb") as writer: + with open(os.path.join(root, f), "rb") as reader: dat = reader.read(64*1024) + while dat: + writer.write(dat) + dat = reader.read(64*1024) + except (IOError, OSError) as e: + logger.error(e) + last_error = e - return outputcollection.manifest_text() + return (outputcollection, last_error)