""")
_group.add_argument('--update-collection', type=str, default=None,
- dest='update_collection', help="""
+ dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
-UUID or manifest block locator. All new local files will be uploaded.
+UUID. All new local files will be uploaded.
""")
upload_opts.add_argument('--use-filename', type=str, default=None,
pass
+class ResumeCacheInvalid(Exception):
+ pass
+
+
class ResumeCacheConflict(Exception):
pass
CACHE_DIR = '.cache/arvados/arv-put'
EMPTY_STATE = {
'manifest' : None, # Last saved manifest checkpoint
- 'files' : {} # Previous run file list: {path : {size, mtime}}
+ 'files' : {}, # Previous run file list: {path : {size, mtime}}
+ 'collection_uuid': None, # Saved collection's UUID
}
def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
filename=None, update_time=1.0, update_collection=None):
self.paths = paths
self.resume = resume
- self.update_collection = False
+ self.update = False
self.reporter = reporter
self.bytes_expected = bytes_expected
self.bytes_written = 0
self._cache_file = None
self._collection = None
self._collection_lock = threading.Lock()
+ self._local_collection = None # Previous collection manifest, used in update mode.
self._stop_checkpointer = threading.Event()
self._checkpointer = threading.Thread(target=self._update_task)
self._update_task_time = update_time # How many seconds wait between update runs
self.logger = logging.getLogger('arvados.arv_put')
+
# Load an already existing collection for update
- if update_collection:
- if re.match(arvados.util.keep_locator_pattern, update_collection) or re.match(arvados.util.collection_uuid_pattern, update_collection):
- try:
- self._collection = arvados.collection.Collection(update_collection)
- except arvados.errors.ApiError as error:
- raise CollectionUpdateError("Cannot update collection {} ({})".format(update_collection, error))
- else:
- self.update_collection = True
- self.resume = True
+ if update_collection and re.match(arvados.util.collection_uuid_pattern,
+ update_collection):
+ try:
+ self._collection = arvados.collection.Collection(update_collection)
+ except arvados.errors.ApiError as error:
+ raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
else:
- # Collection locator provided, but unknown format
- raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
+ self.update = True
+ self.resume = True
+ elif update_collection:
+ # Collection locator provided, but unknown format
+ raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
# Load cached data if any and if needed
- self._setup_state(update_collection)
-
- def start(self):
+ self._setup_state()
+ if self.update:
+ # Check that the cached data belongs to the collection being updated
+ if self._state['collection_uuid'] is None:
+ raise CollectionUpdateError("An initial upload is needed before being able to update the collection {}.".format(update_collection))
+ elif self._state['collection_uuid'] != update_collection:
+ raise CollectionUpdateError("Cached data doesn't belong to collection {}".format(update_collection))
+ elif self.resume:
+ # Check that cache data doesn't belong to an already created collection
+ if self._state['collection_uuid'] is not None:
+ raise ResumeCacheInvalid("Resume cache file '{}' belongs to existing collection {}".format(self._cache_filename, self._state['collection_uuid']))
+
+ def start(self, save_collection):
"""
Start supporting thread & file uploading
"""
self._checkpointer.join()
# Commit all & one last _update()
self.manifest_text()
+ if save_collection:
+ self.save_collection()
+ with self._state_lock:
+ self._state['collection_uuid'] = self._my_collection().manifest_locator()
self._update()
if self.resume:
self._cache_file.close()
def save_collection(self):
with self._collection_lock:
- if self.update_collection:
+ if self.update:
self._my_collection().save(num_retries = self.num_retries)
else:
self._my_collection().save_new(
# if stream_name == '.':
# stream_name = os.path.join('.', os.path.basename(path))
for item in os.listdir(path):
+ item_col_path = os.path.join(stream_name, item)
if os.path.isdir(os.path.join(path, item)):
+ self._my_collection().find_or_create(item_col_path,
+ arvados.collection.COLLECTION)
self._write_directory_tree(os.path.join(path, item),
- os.path.join(stream_name, item))
+ item_col_path)
else:
self._write_file(os.path.join(path, item),
- os.path.join(stream_name, item))
+ item_col_path)
def _write_stdin(self, filename):
with self._collection_lock:
def _write_file(self, source, filename):
resume_offset = 0
+ should_upload = True
+
if self.resume:
- # Check if file was already uploaded (at least partially)
- with self._collection_lock:
- try:
- file_in_collection = self._my_collection().find(filename)
- except IOError:
- # Not found
- file_in_collection = None
with self._state_lock:
# If no previous cached data on this file, store it for an eventual
# repeated run.
'size' : os.path.getsize(source)
}
cached_file_data = self._state['files'][source]
- # See if this file was already uploaded at least partially
- if file_in_collection:
- # If file already exist and we are updating the collection, ignore it
- # even if it's different from the local one.
- if self.update_collection:
- self.bytes_skipped += file_in_collection.size()
- return
-
- if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
- if cached_file_data['size'] == file_in_collection.size():
- # File already there, skip it.
- self.bytes_skipped += cached_file_data['size']
- return
- elif cached_file_data['size'] > file_in_collection.size():
- # File partially uploaded, resume!
- resume_offset = file_in_collection.size()
- else:
- # Inconsistent cache, re-upload the file
- self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+ # Check if file was already uploaded (at least partially)
+ with self._collection_lock:
+ file_in_collection = self._my_collection().find(filename)
+ if self.update:
+ file_in_local_collection = self._local_collection.find(filename)
+ # Decide what to do with this file.
+ should_upload = False
+ if not file_in_collection:
+ should_upload = True
+ elif self.update and file_in_local_collection != file_in_collection:
+ # File remotely modified.
+ should_upload = True
+ # From here, we are certain that the remote file is the same as last uploaded.
+ elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
+ # Local file didn't change from last run.
+ if cached_file_data['size'] == file_in_collection.size():
+ # File already there, skip it.
+ self.bytes_skipped += cached_file_data['size']
+ elif cached_file_data['size'] > file_in_collection.size():
+ # File partially uploaded, resume!
+ resume_offset = file_in_collection.size()
+ should_upload = True
else:
- # Local file differs from cached data, re-upload it
- pass
+ # Inconsistent cache, re-upload the file
+ should_upload = True
+ self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+ elif cached_file_data['mtime'] < os.path.getmtime(source) and cached_file_data['size'] < os.path.getsize(source):
+ # File with appended data since last run
+ resume_offset = file_in_collection.size()
+ should_upload = True
+ else:
+ # Local file differs from cached data, re-upload it
+ should_upload = True
+
+ if should_upload is False:
+ return
+
with open(source, 'r') as source_fd:
+ if self.resume:
+ with self._state_lock:
+ self._state['files'][source]['mtime'] = os.path.getmtime(source)
+ self._state['files'][source]['size'] = os.path.getsize(source)
if resume_offset > 0:
# Start upload where we left off
with self._collection_lock:
replication_desired=self.replication_desired)
return self._collection
- def _setup_state(self, update_collection=None):
+ def _setup_state(self):
"""
Create a new cache file or load a previously existing one.
"""
if self.resume:
md5 = hashlib.md5()
md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
- if update_collection:
- md5.update(update_collection)
realpaths = sorted(os.path.realpath(path) for path in self.paths)
md5.update('\0'.join(realpaths))
if self.filename:
with self._state_lock:
try:
self._state = json.load(self._cache_file)
- if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+ if not set(['manifest', 'files', 'collection_uuid']).issubset(set(self._state.keys())):
# Cache at least partially incomplete, set up new cache
self._state = copy.deepcopy(self.EMPTY_STATE)
except ValueError:
# Cache file empty, set up new cache
self._state = copy.deepcopy(self.EMPTY_STATE)
+
+ # In update mode, load the previous manifest so we can check if files
+ # were modified remotely.
+ if self.update:
+ self._local_collection = arvados.collection.Collection(self._state['manifest'])
# Load how many bytes were uploaded on previous run
with self._collection_lock:
self.bytes_written = self._collection_size(self._my_collection())
reporter = None
bytes_expected = expected_bytes_for(args.paths)
+
try:
writer = ArvPutUploadJob(paths = args.paths,
resume = args.resume,
"arv-put: Another process is already uploading this data.",
" Use --no-resume if this is really what you want."])
sys.exit(1)
+ except ResumeCacheInvalid as error:
+ print >>stderr, "\n".join([
+ "arv-put: %s" % str(error),
+ " Use --no-resume or delete/move the cache file to upload to a new collection.",
+ " Use --update-collection otherwise."])
+ sys.exit(1)
except CollectionUpdateError as error:
print >>stderr, "\n".join([
"arv-put: %s" % str(error)])
writer.report_progress()
output = None
- writer.start()
+ try:
+ writer.start(save_collection=not(args.stream or args.raw))
+ except arvados.errors.ApiError as error:
+ print >>stderr, "\n".join([
+ "arv-put: %s" % str(error)])
+ sys.exit(1)
+
if args.progress: # Print newline to split stderr from stdout for humans.
print >>stderr
output = ','.join(writer.data_locators())
else:
try:
- writer.save_collection()
if args.update_collection:
print >>stderr, "Collection updated: '{}'".format(writer.collection_name())
else:
sys.exit(status)
# Success!
- writer.destroy_cache()
+ #writer.destroy_cache()
return output