X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/06cf8e35c69540ae44890f8e96961e31cb7fbe66..6029fb64fd6372c577f9f143e6f3dcaded127ac3:/sdk/python/arvados/commands/put.py diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py index 3229125887..01bae2fead 100644 --- a/sdk/python/arvados/commands/put.py +++ b/sdk/python/arvados/commands/put.py @@ -172,7 +172,7 @@ class ResumeCache(object): md5 = hashlib.md5() md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost')) realpaths = sorted(os.path.realpath(path) for path in args.paths) - md5.update(''.join(realpaths)) + md5.update('\0'.join(realpaths)) if any(os.path.isdir(path) for path in realpaths): md5.update(str(max(args.max_manifest_depth, -1))) elif args.filename: @@ -325,8 +325,10 @@ def progress_writer(progress_func, outfile=sys.stderr): outfile.write(progress_func(bytes_written, bytes_expected)) return write_progress +def exit_signal_handler(sigcode, frame): + sys.exit(-sigcode) + def main(arguments=None): - ResumeCache.setup_user_cache() args = parse_arguments(arguments) if args.progress: @@ -335,51 +337,55 @@ def main(arguments=None): reporter = progress_writer(machine_progress) else: reporter = None + bytes_expected = expected_bytes_for(args.paths) try: + ResumeCache.setup_user_cache() resume_cache = ResumeCache(ResumeCache.make_path(args)) - if not args.resume: - resume_cache.restart() + except (IOError, OSError): + # Couldn't open cache directory/file. Continue without it. + resume_cache = None + writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected) except ResumeCacheConflict: print "arv-put: Another process is already uploading this data." sys.exit(1) + else: + if not args.resume: + resume_cache.restart() + writer = ArvPutCollectionWriter.from_cache( + resume_cache, reporter, bytes_expected) - writer = ArvPutCollectionWriter.from_cache( - resume_cache, reporter, expected_bytes_for(args.paths)) - - def signal_handler(sigcode, frame): - writer.cache_state() - sys.exit(-sigcode) # Install our signal handler for each code in CAUGHT_SIGNALS, and save # the originals. - orig_signal_handlers = {sigcode: signal.signal(sigcode, signal_handler) + orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler) for sigcode in CAUGHT_SIGNALS} if writer.bytes_written > 0: # We're resuming a previous upload. - print >>sys.stderr, "arv-put: Resuming previous upload. Bypass with the --no-resume option." + print >>sys.stderr, "\n".join([ + "arv-put: Resuming previous upload from last checkpoint.", + " Use the --no-resume option to start over."]) writer.report_progress() - try: - writer.do_queued_work() # Do work resumed from cache. - for path in args.paths: # Copy file data to Keep. - if os.path.isdir(path): - writer.write_directory_tree( - path, max_manifest_depth=args.max_manifest_depth) - else: - writer.start_new_stream() - writer.write_file(path, args.filename or os.path.basename(path)) - except Exception: - writer.cache_state() - raise + writer.do_queued_work() # Do work resumed from cache. + for path in args.paths: # Copy file data to Keep. + if os.path.isdir(path): + writer.write_directory_tree( + path, max_manifest_depth=args.max_manifest_depth) + else: + writer.start_new_stream() + writer.write_file(path, args.filename or os.path.basename(path)) + writer.finish_current_stream() + + if args.progress: # Print newline to split stderr from stdout for humans. + print >>sys.stderr if args.stream: print writer.manifest_text(), elif args.raw: - writer.finish_current_stream() print ','.join(writer.data_locators()) else: # Register the resulting collection in Arvados. - arvados.api().collections().create( + collection = arvados.api().collections().create( body={ 'uuid': writer.finish(), 'manifest_text': writer.manifest_text(), @@ -387,12 +393,13 @@ def main(arguments=None): ).execute() # Print the locator (uuid) of the new collection. - print writer.finish() + print collection['uuid'] for sigcode, orig_handler in orig_signal_handlers.items(): signal.signal(sigcode, orig_handler) - resume_cache.destroy() + if resume_cache is not None: + resume_cache.destroy() if __name__ == '__main__': main()