def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
name=None, owner_uuid=None, ensure_unique_name=False,
- num_retries=None, write_copies=None, replication=None,
+ num_retries=None, replication_desired=None,
filename=None, update_time=60.0):
self.paths = paths
self.resume = resume
self.owner_uuid = owner_uuid
self.ensure_unique_name = ensure_unique_name
self.num_retries = num_retries
- self.write_copies = write_copies
- self.replication = replication
+ self.replication_desired = replication_desired
self.filename = filename
self._state_lock = threading.Lock()
self._state = None # Previous run state (file list & manifest)
"""
Start supporting thread & file uploading
"""
+ self._checkpointer.daemon = True
self._checkpointer.start()
try:
for path in self.paths:
# Stop the thread before doing anything else
self._stop_checkpointer.set()
self._checkpointer.join()
- # Successful upload, one last _update()
- self._update()
- if self.resume:
- self._cache_file.close()
- # Correct the final written bytes count
- self.bytes_written -= self.bytes_skipped
+ # Commit all & one last _update()
+ self.manifest_text()
+ self._update()
+ if self.resume:
+ self._cache_file.close()
+ # Correct the final written bytes count
+ self.bytes_written -= self.bytes_skipped
def save_collection(self):
with self._collection_lock:
self._my_collection().save_new(
- name=self.name, owner_uuid=self.owner_uuid,
- ensure_unique_name=self.ensure_unique_name,
- num_retries=self.num_retries,
- replication_desired=self.replication)
+ name=self.name, owner_uuid=self.owner_uuid,
+ ensure_unique_name=self.ensure_unique_name,
+ num_retries=self.num_retries)
def destroy_cache(self):
if self.resume:
try:
os.unlink(self._cache_filename)
except OSError as error:
- if error.errno != errno.ENOENT: # That's what we wanted anyway.
+ # That's what we wanted anyway.
+ if error.errno != errno.ENOENT:
raise
self._cache_file.close()
# Update cache, if resume enabled
if self.resume:
with self._state_lock:
- self._state['manifest'] = self._my_collection().manifest_text()
+                # Get the manifest text without committing pending blocks
+ self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
if self.resume:
self._save_state()
# Call the reporter, if any
'mtime': os.path.getmtime(source),
'size' : os.path.getsize(source)
}
- cached_file_data = self._state['files'][source]
+ with self._state_lock:
+ cached_file_data = self._state['files'][source]
# See if this file was already uploaded at least partially
if file_in_collection:
if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
output.close()
def _write(self, source_fd, output):
+ first_read = True
while True:
data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
- if not data:
+ # Allow an empty file to be written
+ if not data and not first_read:
break
+ if first_read:
+ first_read = False
output.write(data)
def _my_collection(self):
if self.resume and manifest is not None:
# Create collection from saved state
self._collection = arvados.collection.Collection(
- manifest,
- num_write_copies=self.write_copies)
+ manifest,
+ replication_desired=self.replication_desired)
else:
# Create new collection
self._collection = arvados.collection.Collection(
- num_write_copies=self.write_copies)
+ replication_desired=self.replication_desired)
return self._collection
def _setup_state(self):
through subcollections
"""
if isinstance(item, arvados.arvfile.ArvadosFile):
- locators = []
- for segment in item.segments():
- loc = segment.locator
- locators.append(loc)
- return locators
+ if item.size() == 0:
+ # Empty file locator
+ return ["d41d8cd98f00b204e9800998ecf8427e+0"]
+ else:
+ locators = []
+ for segment in item.segments():
+ loc = segment.locator
+ locators.append(loc)
+ return locators
elif isinstance(item, arvados.collection.Collection):
l = [self._datablocks_on_item(x) for x in item.values()]
# Fast list flattener method taken from:
print >>stderr, error
sys.exit(1)
- # write_copies diverges from args.replication here.
- # args.replication is how many copies we will instruct Arvados to
- # maintain (by passing it in collections().create()) after all
- # data is written -- and if None was given, we'll use None there.
- # Meanwhile, write_copies is how many copies of each data block we
- # write to Keep, which has to be a number.
- #
- # If we simply changed args.replication from None to a default
- # here, we'd end up erroneously passing the default replication
- # level (instead of None) to collections().create().
- write_copies = (args.replication or
- api_client._rootDesc.get('defaultCollectionReplication', 2))
-
if args.progress:
reporter = progress_writer(human_progress)
elif args.batch_progress:
bytes_expected = expected_bytes_for(args.paths)
try:
writer = ArvPutUploadJob(paths = args.paths,
- resume = args.resume,
- reporter = reporter,
- bytes_expected = bytes_expected,
- num_retries = args.retries,
- write_copies = write_copies,
- replication = args.replication,
- name = collection_name,
- owner_uuid = project_uuid,
- ensure_unique_name = True)
+ resume = args.resume,
+ filename = args.filename,
+ reporter = reporter,
+ bytes_expected = bytes_expected,
+ num_retries = args.retries,
+ replication_desired = args.replication,
+ name = collection_name,
+ owner_uuid = project_uuid,
+ ensure_unique_name = True)
except ResumeCacheConflict:
print >>stderr, "\n".join([
"arv-put: Another process is already uploading this data.",
if status != 0:
sys.exit(status)
- else:
- writer.destroy_cache()
+ # Success!
+ writer.destroy_cache()
return output