+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
import arvados
import os
-import robust_put
import stat
-import arvados.command.run
+import arvados.commands.run
+import logging
# Implements "Virtual Working Directory"
# Provides a way of emulating a shared writable directory in Keep based
for f in files:
os.symlink(os.path.join(root, f), os.path.join(target_dir, rel, f))
-def is_collection(fn):
- if os.path.exists
-
-# Delete all symlinks and check in any remaining normal files.
-# If merge == True, merge the manifest with source_collection and return a
-# CollectionReader for the combined collection.
def checkin(target_dir):
    """Write files in `target_dir` to Keep.

    Regular files, and symlinks that do not point into the keep mount, are
    written to Keep as normal files (Keep does not support symlinks).

    Symlinks to files in the keep mount will result in files in the new
    collection which reference existing Keep blocks, no data copying necessary.

    An IOError/OSError raised while processing an individual file is logged
    and the walk continues with the next file.

    Returns a tuple ``(collection, last_error)``:

    * ``collection`` -- a new Collection object, with data flushed but the
      collection record not saved to the API.
    * ``last_error`` -- the last IOError/OSError encountered while processing
      files, or None if every file was handled without error.

    """
    import shutil

    outputcollection = arvados.collection.Collection(num_retries=5)
    # Source collections keyed by portable data hash, so each collection
    # referenced by one or more symlinks is only loaded once.
    collections = {}
    logger = logging.getLogger("arvados")

    last_error = None
    for root, _dirs, files in os.walk(target_dir):
        # Directory path relative to target_dir; used as the directory name
        # inside the output collection.
        reldir = root[len(target_dir):]
        for f in files:
            src_path = os.path.join(root, f)
            dest_path = os.path.join(reldir, f)
            try:
                s = os.lstat(src_path)

                write_it = False

                if stat.S_ISREG(s.st_mode):
                    write_it = True
                elif stat.S_ISLNK(s.st_mode):
                    # 1. check if it is a link into a collection
                    real = os.path.split(os.path.realpath(src_path))
                    (pdh, branch) = arvados.commands.run.is_in_collection(real[0], real[1])
                    if pdh is not None:
                        # 2. load collection
                        if pdh not in collections:
                            # 2.1 make sure it is flushed (see #5787 note 11)
                            fd = os.open(real[0], os.O_RDONLY)
                            try:
                                os.fsync(fd)
                            finally:
                                # Close even if fsync fails so the descriptor
                                # is not leaked.
                                os.close(fd)

                            # 2.2 get collection from API server
                            collections[pdh] = arvados.collection.CollectionReader(
                                pdh,
                                api_client=outputcollection._my_api(),
                                keep_client=outputcollection._my_keep(),
                                num_retries=5)
                        # 3. copy arvfile to new collection (references the
                        # existing blocks; no data is copied)
                        outputcollection.copy(branch, dest_path,
                                              source_collection=collections[pdh])
                    else:
                        # Symlink pointing outside the keep mount: store the
                        # data it points to as a normal file.
                        write_it = True

                if write_it:
                    with outputcollection.open(dest_path, "wb") as writer:
                        with open(src_path, "rb") as reader:
                            # Stream in 64 KiB chunks rather than reading the
                            # whole file into memory.
                            shutil.copyfileobj(reader, writer, 64*1024)
            except (IOError, OSError) as e:
                logger.error(e)
                last_error = e

    return (outputcollection, last_error)