md5 = hashlib.md5()
md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
realpaths = sorted(os.path.realpath(path) for path in args.paths)
- md5.update(''.join(realpaths))
+ md5.update('\0'.join(realpaths))
if any(os.path.isdir(path) for path in realpaths):
md5.update(str(max(args.max_manifest_depth, -1)))
elif args.filename:
return writer
def cache_state(self):
+ if self.cache is None:
+ return
state = self.dump_state()
# Transform attributes for serialization.
for attr, value in state.items():
self.reporter(self.bytes_written, self.bytes_expected)
def flush_data(self):
    """Flush buffered data, then report progress and checkpoint state.

    The flush itself is delegated to the parent class.  If the data
    buffer is shorter afterwards (i.e. data was actually PUT), the
    difference is added to ``self.bytes_written`` and progress is
    reported.  The resume state is cached only when that write moved
    ``self.bytes_written`` across a ``KEEP_BLOCK_SIZE`` boundary, so a
    checkpoint is taken at most once per block rather than on every
    flush.
    """
    start_buffer_len = self._data_buffer_len
    # Explicit floor division: we want the whole number of blocks written
    # so far.  With true division the quotient would grow on every PUT and
    # the boundary test below would always succeed.
    start_block_count = self.bytes_written // self.KEEP_BLOCK_SIZE
    super(ArvPutCollectionWriter, self).flush_data()
    if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
        self.bytes_written += (start_buffer_len - self._data_buffer_len)
        self.report_progress()
        # Checkpoint only when this write completed at least one more block.
        if (self.bytes_written // self.KEEP_BLOCK_SIZE) > start_block_count:
            self.cache_state()
def _record_new_input(self, input_type, source_name, dest_name):
# The key needs to be a list because that's what we'll get back
outfile.write(progress_func(bytes_written, bytes_expected))
return write_progress
def exit_signal_handler(sigcode, frame):
    """Signal handler that terminates the process via ``sys.exit``.

    Takes the two arguments passed to handlers installed with
    ``signal.signal``; ``frame`` is not used.  The exit status handed to
    ``sys.exit`` is the negated signal number, so this always raises
    ``SystemExit``.
    """
    exit_status = -sigcode
    sys.exit(exit_status)
+
# main(arguments=None): command-line entry point.
# Parses the arguments, picks a progress reporter, builds an
# ArvPutCollectionWriter (from a ResumeCache when one can be opened), writes
# every path in args.paths, and then prints the manifest text, the data
# locators, or the uuid of a newly created collection depending on
# args.stream / args.raw.  Finally it restores the original signal handlers
# and destroys the resume cache if there is one.
# NOTE(review): this listing is a flattened diff ("+"/"-" prefixes, no
# indentation) and some unchanged lines appear to be missing -- e.g. the
# dict opened by `body={` is never closed below.  Confirm against the full
# file before relying on the exact nesting.
def main(arguments=None):
- ResumeCache.setup_user_cache()
args = parse_arguments(arguments)
if args.progress:
reporter = progress_writer(machine_progress)
else:
reporter = None
# Computed once so both writer constructions below receive the same value.
+ bytes_expected = expected_bytes_for(args.paths)
# Cache setup now lives inside the try, so an IOError/OSError raised while
# setting up or opening the cache is handled by continuing without a cache.
try:
+ ResumeCache.setup_user_cache()
resume_cache = ResumeCache(ResumeCache.make_path(args))
- if not args.resume:
- resume_cache.restart()
+ except (IOError, OSError):
+ # Couldn't open cache directory/file. Continue without it.
+ resume_cache = None
+ writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
except ResumeCacheConflict:
print "arv-put: Another process is already uploading this data."
sys.exit(1)
# The else branch runs only when the cache was opened without raising.
+ else:
+ if not args.resume:
+ resume_cache.restart()
+ writer = ArvPutCollectionWriter.from_cache(
+ resume_cache, reporter, bytes_expected)
- writer = ArvPutCollectionWriter.from_cache(
- resume_cache, reporter, expected_bytes_for(args.paths))
-
- def signal_handler(sigcode, frame):
- writer.cache_state()
- sys.exit(-sigcode)
# Install our signal handler for each code in CAUGHT_SIGNALS, and save
# the originals.
# Unlike the removed nested signal_handler, exit_signal_handler only exits;
# it does not call writer.cache_state().
- orig_signal_handlers = {sigcode: signal.signal(sigcode, signal_handler)
+ orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
for sigcode in CAUGHT_SIGNALS}
if writer.bytes_written > 0: # We're resuming a previous upload.
- print >>sys.stderr, "arv-put: Resuming previous upload. Bypass with the --no-resume option."
+ print >>sys.stderr, "\n".join([
+ "arv-put: Resuming previous upload from last checkpoint.",
+ " Use the --no-resume option to start over."])
writer.report_progress()
# The added version of this loop is no longer wrapped in the try/except
# that called writer.cache_state() before re-raising.
- try:
- writer.do_queued_work() # Do work resumed from cache.
- for path in args.paths: # Copy file data to Keep.
- if os.path.isdir(path):
- writer.write_directory_tree(
- path, max_manifest_depth=args.max_manifest_depth)
- else:
- writer.start_new_stream()
- writer.write_file(path, args.filename or os.path.basename(path))
- except Exception:
- writer.cache_state()
- raise
+ writer.do_queued_work() # Do work resumed from cache.
+ for path in args.paths: # Copy file data to Keep.
+ if os.path.isdir(path):
+ writer.write_directory_tree(
+ path, max_manifest_depth=args.max_manifest_depth)
+ else:
+ writer.start_new_stream()
+ writer.write_file(path, args.filename or os.path.basename(path))
+ writer.finish_current_stream()
+
+ if args.progress: # Print newline to split stderr from stdout for humans.
+ print >>sys.stderr
if args.stream:
print writer.manifest_text(),
elif args.raw:
- writer.finish_current_stream()
print ','.join(writer.data_locators())
else:
# Register the resulting collection in Arvados.
- arvados.api().collections().create(
+ collection = arvados.api().collections().create(
body={
'uuid': writer.finish(),
'manifest_text': writer.manifest_text(),
).execute()
# Print the locator (uuid) of the new collection.
# The uuid now comes from the API response instead of a second
# writer.finish() call.
- print writer.finish()
+ print collection['uuid']
# Put back the handlers that were saved when ours were installed.
for sigcode, orig_handler in orig_signal_handlers.items():
signal.signal(sigcode, orig_handler)
# resume_cache is None when the cache could not be opened (see the
# IOError/OSError handler above), so there is nothing to destroy then.
- resume_cache.destroy()
+ if resume_cache is not None:
+ resume_cache.destroy()
# Call main() only when this file is run as a script.
if __name__ == '__main__':
main()