self._stop_checkpointer = threading.Event()
self._checkpointer = threading.Thread(target=self._update_task)
self._update_task_time = update_time # How many seconds to wait between update runs
+ self._files_to_upload = []
self.logger = logging.getLogger('arvados.arv_put')
if not self.use_cache and self.resume:
dirs.sort()
files.sort()
for f in files:
- self._write_file(os.path.join(root, f),
+ self._check_file(os.path.join(root, f),
os.path.join(root[len(prefixdir):], f))
else:
- self._write_file(os.path.abspath(path),
+ self._check_file(os.path.abspath(path),
self.filename or os.path.basename(path))
+ # Update bytes_written from current local collection and
+ # report initial progress.
+ self._update()
+ # Actual file upload
+ self._upload_files()
finally:
# Stop the thread before doing anything else
self._stop_checkpointer.set()
self._cache_file.close()
if save_collection:
self.save_collection()
- # Correct the final written bytes count
- self.bytes_written -= self.bytes_skipped
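The upshot of this hunk: start() becomes a two-phase pipeline. Every input
path is first classified by _check_file(), which only queues work in
_files_to_upload, and the actual transfer happens afterwards in
_upload_files(), while the checkpointer thread keeps reporting progress
until the finally block stops it. A minimal standalone sketch of that
control flow (the class name, the print-based reporter, and the chunk size
are illustrative stand-ins, not the arv_put API):

    import os
    import threading

    class TwoPhaseUploader:
        """Sketch: classify files first, transfer second, with a
        background thread reporting progress until stopped."""
        def __init__(self, paths, update_time=1.0):
            self.paths = paths
            self.update_time = update_time
            self.files_to_upload = []   # (source, resume_offset) tuples
            self.bytes_written = 0
            self._stop = threading.Event()

        def _update_task(self):
            # Report every update_time seconds until the event is set.
            while not self._stop.wait(self.update_time):
                print('progress: {} bytes'.format(self.bytes_written))

        def _check_file(self, source):
            # Phase 1: decide whether/where to upload; no data I/O yet.
            self.files_to_upload.append((source, 0))

        def _upload_files(self):
            # Phase 2: the actual transfer.
            for source, offset in self.files_to_upload:
                with open(source, 'rb') as fd:
                    fd.seek(offset)
                    for chunk in iter(lambda: fd.read(65536), b''):
                        self.bytes_written += len(chunk)

        def start(self):
            checkpointer = threading.Thread(target=self._update_task)
            checkpointer.start()
            try:
                for path in self.paths:
                    self._check_file(os.path.abspath(path))
                self._upload_files()
            finally:
                # Stop the thread before doing anything else.
                self._stop.set()
                checkpointer.join()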
def save_collection(self):
if self.update:
self._write(sys.stdin, output)
output.close()
- def _write_file(self, source, filename):
+ def _check_file(self, source, filename):
+ """Check if this file needs to be uploaded"""
resume_offset = 0
should_upload = False
new_file_in_cache = False
-
# Record file path for updating the remote collection before exiting
self._file_paths.append(filename)
# Permission token expired, re-upload the file. This will change whenever
# we have an API for refreshing tokens.
should_upload = True
+ self._local_collection.remove(filename)
elif cached_file_data['size'] == file_in_local_collection.size():
# File already there, skip it.
self.bytes_skipped += cached_file_data['size']
elif cached_file_data['size'] > file_in_local_collection.size():
# File partially uploaded, resume!
resume_offset = file_in_local_collection.size()
+ self.bytes_skipped += resume_offset
should_upload = True
else:
# Inconsistent cache, re-upload the file
should_upload = True
+ self._local_collection.remove(filename)
self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
# Local file differs from cached data, re-upload it.
else:
+ if file_in_local_collection:
+ self._local_collection.remove(filename)
should_upload = True
if should_upload:
+ self._files_to_upload.append((source, resume_offset, filename))
+
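The branches above reduce to four outcomes per file: an expired permission
token or a changed local file forces a fresh upload; a cache entry matching
the local collection size means skip (all bytes counted as skipped); a
cache entry larger than the collection means resume from the partial offset
(with the already-uploaded prefix counted as skipped); anything else is an
inconsistent cache and restarts from scratch. As a self-contained decision
function (the arguments are simplified stand-ins for arv_put's cached
state, not its real fields):

    def classify_file(cached_size, collection_size,
                      token_expired, local_file_changed):
        """Return (should_upload, resume_offset, bytes_skipped) for one
        file. Sketch of the branch structure only, not arv_put itself."""
        if token_expired or local_file_changed:
            # Stale token or modified source: re-upload from scratch.
            return True, 0, 0
        if cached_size == collection_size:
            # Fully uploaded already: skip it entirely.
            return False, 0, cached_size
        if cached_size > collection_size:
            # Partial upload: resume, counting the done prefix as skipped.
            return True, collection_size, collection_size
        # Uploaded data is bigger than the cache says it should be:
        # inconsistent cache, re-upload from scratch.
        return True, 0, 0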
+ def _upload_files(self):
+ for source, resume_offset, filename in self._files_to_upload:
with open(source, 'r') as source_fd:
with self._state_lock:
self._state['files'][source]['mtime'] = os.path.getmtime(source)
# Start upload where we left off
output = self._local_collection.open(filename, 'a')
source_fd.seek(resume_offset)
- self.bytes_skipped += resume_offset
else:
# Start from scratch
output = self._local_collection.open(filename, 'w')
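Resuming in _upload_files() pairs a seek on the source with an append-mode
open on the destination, so only the missing suffix is transferred; a fresh
upload opens with 'w' instead. The same idea against a plain filesystem
(the paths and helper name are hypothetical; arv_put writes into the local
collection, not an ordinary file):

    import os
    import shutil

    def copy_with_resume(source, dest):
        # Whatever already reached the destination is the resume offset.
        resume_offset = os.path.getsize(dest) if os.path.exists(dest) else 0
        with open(source, 'rb') as src:
            if resume_offset > 0:
                # Start upload where we left off.
                src.seek(resume_offset)
                out = open(dest, 'ab')
            else:
                # Start from scratch.
                out = open(dest, 'wb')
            with out:
                shutil.copyfileobj(src, out)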
self._state = copy.deepcopy(self.EMPTY_STATE)
# Load the previous manifest so we can check if files were modified remotely.
self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
- # Load how many bytes were uploaded on previous run
- with self._collection_lock:
- self.bytes_written = self._collection_size(self._local_collection)
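With those lines gone, bytes_written is no longer pre-seeded from the
previous run; instead the checkpointer keeps it equal to the current local
collection size, while bytes_skipped separately accumulates everything that
was already uploaded, so the bytes actually sent during a run are always
the difference between the two. With hypothetical numbers:

    # 10 MiB file; 4 MiB had been uploaded before an interruption.
    bytes_skipped = 4 * 1024 * 1024   # resume offset, set in _check_file
    bytes_written = 10 * 1024 * 1024  # collection total when the run ends
    uploaded_this_run = bytes_written - bytes_skipped
    assert uploaded_this_run == 6 * 1024 * 1024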
def _lock_file(self, fileobj):
try:
f.flush()
cwriter = arv_put.ArvPutUploadJob([f.name])
cwriter.start(save_collection=False)
- self.assertEqual(3, cwriter.bytes_written)
+ self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
# Don't destroy the cache, and start another upload
cwriter_new = arv_put.ArvPutUploadJob([f.name])
cwriter_new.start(save_collection=False)
cwriter_new.destroy_cache()
- self.assertEqual(0, cwriter_new.bytes_written)
+ self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)
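These assertions pin down the new accounting: on a first run nothing is
skipped, and on a re-run over an unchanged cache everything is, so the
difference drops to zero. The same invariant with stand-in objects
(SimpleNamespace here only mimics the two counters, not the real writer):

    from types import SimpleNamespace

    file_size = 3
    # Run 1: empty cache, every byte is transferred.
    first = SimpleNamespace(bytes_written=file_size, bytes_skipped=0)
    assert first.bytes_written - first.bytes_skipped == file_size
    # Run 2: unchanged cache, every byte is skipped.
    second = SimpleNamespace(bytes_written=file_size, bytes_skipped=file_size)
    assert second.bytes_written - second.bytes_skipped == 0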
def make_progress_tester(self):
progression = []
replication_desired=1)
with self.assertRaises(SystemExit):
writer.start(save_collection=False)
- self.assertLess(writer.bytes_written,
- os.path.getsize(self.large_file_name))
+ self.assertGreater(writer.bytes_written, 0)
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
# Retry the upload
writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
replication_desired=1)
writer2.start(save_collection=False)
- self.assertEqual(writer.bytes_written + writer2.bytes_written,
+ self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
os.path.getsize(self.large_file_name))
writer2.destroy_cache()
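The corrected assertion encodes the resume invariant: writer2 skips exactly
what writer managed to commit before the simulated failure, so the two runs
together account for the whole file. Spelled out with hypothetical sizes:

    file_size = 3 * 1024 * 1024
    committed_before_crash = 1 * 1024 * 1024
    writer_bytes_written = committed_before_crash
    writer2_bytes_skipped = committed_before_crash  # found in the cache
    writer2_bytes_written = file_size               # full collection total
    assert (writer_bytes_written + writer2_bytes_written
            - writer2_bytes_skipped) == file_size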