X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/91e7c9058bf1f38ad50008a6fd2397c1e15d33eb..6029fb64fd6372c577f9f143e6f3dcaded127ac3:/sdk/python/arvados/commands/put.py

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 705dcfd8f3..01bae2fead 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -11,9 +11,12 @@ import fcntl
 import hashlib
 import json
 import os
+import signal
 import sys
 import tempfile
 
+CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
+
 def parse_arguments(arguments):
     parser = argparse.ArgumentParser(
         description='Copy data from the local filesystem to Keep.')
@@ -108,6 +111,16 @@ def parse_arguments(arguments):
 total data size).
 """)
 
+    group = parser.add_mutually_exclusive_group()
+    group.add_argument('--resume', action='store_true', default=True,
+                       help="""
+Continue interrupted uploads from cached state (default).
+""")
+    group.add_argument('--no-resume', action='store_false', dest='resume',
+                       help="""
+Do not continue interrupted uploads from cached state.
+""")
+
     args = parser.parse_args(arguments)
 
     if len(args.paths) == 0:
@@ -139,12 +152,18 @@ class ResumeCacheConflict(Exception):
 class ResumeCache(object):
     CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put')
 
-    def __init__(self, file_spec):
+    @classmethod
+    def setup_user_cache(cls):
         try:
-            self.cache_file = open(file_spec, 'a+')
-        except TypeError:
-            file_spec = self.make_path(file_spec)
-            self.cache_file = open(file_spec, 'a+')
+            os.makedirs(cls.CACHE_DIR)
+        except OSError as error:
+            if error.errno != errno.EEXIST:
+                raise
+        else:
+            os.chmod(cls.CACHE_DIR, 0o700)
+
+    def __init__(self, file_spec):
+        self.cache_file = open(file_spec, 'a+')
         self._lock_file(self.cache_file)
         self.filename = self.cache_file.name
 
@@ -153,7 +172,7 @@ class ResumeCache(object):
         md5 = hashlib.md5()
         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
         realpaths = sorted(os.path.realpath(path) for path in args.paths)
-        md5.update(''.join(realpaths))
+        md5.update('\0'.join(realpaths))
         if any(os.path.isdir(path) for path in realpaths):
             md5.update(str(max(args.max_manifest_depth, -1)))
         elif args.filename:
@@ -198,26 +217,36 @@ class ResumeCache(object):
             raise
         self.close()
 
+    def restart(self):
+        self.destroy()
+        self.__init__(self.filename)
+
+
+class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
+    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
+                   ['bytes_written', '_seen_inputs'])
 
-class ResumeCacheCollectionWriter(arvados.ResumableCollectionWriter):
-    def __init__(self, cache=None):
+    def __init__(self, cache=None, reporter=None, bytes_expected=None):
+        self.bytes_written = 0
+        self._seen_inputs = []
         self.cache = cache
-        super(ResumeCacheCollectionWriter, self).__init__()
+        self.reporter = reporter
+        self.bytes_expected = bytes_expected
+        super(ArvPutCollectionWriter, self).__init__()
 
     @classmethod
-    def from_cache(cls, cache):
+    def from_cache(cls, cache, reporter=None, bytes_expected=None):
         try:
             state = cache.load()
             state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
-            writer = cls.from_state(state)
+            writer = cls.from_state(state, cache, reporter, bytes_expected)
         except (TypeError, ValueError,
                 arvados.errors.StaleWriterStateError) as error:
-            return cls(cache)
+            return cls(cache, reporter, bytes_expected)
         else:
-            writer.cache = cache
             return writer
 
-    def checkpoint_state(self):
+    def cache_state(self):
         if self.cache is None:
             return
         state = self.dump_state()
@@ -229,41 +258,43 @@ class ResumeCacheCollectionWriter(arvados.ResumableCollectionWriter):
                 state[attr] = list(value)
         self.cache.save(state)
 
-
-class CollectionWriterWithProgress(arvados.CollectionWriter):
-    def flush_data(self, *args, **kwargs):
-        if not getattr(self, 'display_type', None):
-            return
-        if not hasattr(self, 'bytes_flushed'):
-            self.bytes_flushed = 0
-        self.bytes_flushed += self._data_buffer_len
-        super(CollectionWriterWithProgress, self).flush_data(*args, **kwargs)
-        self.bytes_flushed -= self._data_buffer_len
-        if self.display_type == 'machine':
-            sys.stderr.write('%s %d: %d written %d total\n' %
-                             (sys.argv[0],
-                              os.getpid(),
-                              self.bytes_flushed,
-                              getattr(self, 'bytes_expected', -1)))
-        elif getattr(self, 'bytes_expected', 0) > 0:
-            pct = 100.0 * self.bytes_flushed / self.bytes_expected
-            sys.stderr.write('\r%dM / %dM %.1f%% ' %
-                             (self.bytes_flushed >> 20,
-                              self.bytes_expected >> 20, pct))
-        else:
-            sys.stderr.write('\r%d ' % self.bytes_flushed)
-
-    def manifest_text(self, *args, **kwargs):
-        manifest_text = (super(CollectionWriterWithProgress, self)
-                         .manifest_text(*args, **kwargs))
-        if getattr(self, 'display_type', None):
-            if self.display_type == 'human':
-                sys.stderr.write('\n')
-            self.display_type = None
-        return manifest_text
+    def report_progress(self):
+        if self.reporter is not None:
+            self.reporter(self.bytes_written, self.bytes_expected)
+
+    def flush_data(self):
+        start_buffer_len = self._data_buffer_len
+        start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
+        super(ArvPutCollectionWriter, self).flush_data()
+        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
+            self.bytes_written += (start_buffer_len - self._data_buffer_len)
+            self.report_progress()
+            if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
+                self.cache_state()
+
+    def _record_new_input(self, input_type, source_name, dest_name):
+        # The key needs to be a list because that's what we'll get back
+        # from JSON deserialization.
+        key = [input_type, source_name, dest_name]
+        if key in self._seen_inputs:
+            return False
+        self._seen_inputs.append(key)
+        return True
+
+    def write_file(self, source, filename=None):
+        if self._record_new_input('file', source, filename):
+            super(ArvPutCollectionWriter, self).write_file(source, filename)
+
+    def write_directory_tree(self,
+                             path, stream_name='.', max_manifest_depth=-1):
+        if self._record_new_input('directory', path, stream_name):
+            super(ArvPutCollectionWriter, self).write_directory_tree(
+                path, stream_name, max_manifest_depth)
 
 
 def expected_bytes_for(pathlist):
+    # Walk the given directory trees and stat files, adding up file sizes,
+    # so we can display progress as percent
     bytesum = 0
     for path in pathlist:
         if os.path.isdir(path):
@@ -283,47 +314,78 @@ def machine_progress(bytes_written, bytes_expected):
 
 def human_progress(bytes_written, bytes_expected):
     if bytes_expected:
-        return "\r{}M / {}M {:.1f}% ".format(
+        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
-            bytes_written / bytes_expected)
+            float(bytes_written) / bytes_expected)
     else:
         return "\r{} ".format(bytes_written)
 
+def progress_writer(progress_func, outfile=sys.stderr):
+    def write_progress(bytes_written, bytes_expected):
+        outfile.write(progress_func(bytes_written, bytes_expected))
+    return write_progress
+
+def exit_signal_handler(sigcode, frame):
+    sys.exit(-sigcode)
+
 def main(arguments=None):
     args = parse_arguments(arguments)
 
     if args.progress:
-        writer = CollectionWriterWithProgress()
-        writer.display_type = 'human'
+        reporter = progress_writer(human_progress)
     elif args.batch_progress:
-        writer = CollectionWriterWithProgress()
-        writer.display_type = 'machine'
+        reporter = progress_writer(machine_progress)
     else:
-        writer = arvados.CollectionWriter()
-
-    # Walk the given directory trees and stat files, adding up file sizes,
-    # so we can display progress as percent
-    writer.bytes_expected = expected_bytes_for(args.paths)
-    if writer.bytes_expected is None:
-        del writer.bytes_expected
-
-    # Copy file data to Keep.
-    for path in args.paths:
+        reporter = None
+    bytes_expected = expected_bytes_for(args.paths)
+
+    try:
+        ResumeCache.setup_user_cache()
+        resume_cache = ResumeCache(ResumeCache.make_path(args))
+    except (IOError, OSError):
+        # Couldn't open cache directory/file. Continue without it.
+        resume_cache = None
+        writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
+    except ResumeCacheConflict:
+        print "arv-put: Another process is already uploading this data."
+        sys.exit(1)
+    else:
+        if not args.resume:
+            resume_cache.restart()
+        writer = ArvPutCollectionWriter.from_cache(
+            resume_cache, reporter, bytes_expected)
+
+    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
+    # the originals.
+    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
+                            for sigcode in CAUGHT_SIGNALS}
+
+    if writer.bytes_written > 0:  # We're resuming a previous upload.
+        print >>sys.stderr, "\n".join([
+                "arv-put: Resuming previous upload from last checkpoint.",
+                "         Use the --no-resume option to start over."])
+        writer.report_progress()
+
+    writer.do_queued_work()  # Do work resumed from cache.
+    for path in args.paths:  # Copy file data to Keep.
         if os.path.isdir(path):
             writer.write_directory_tree(
                 path, max_manifest_depth=args.max_manifest_depth)
         else:
             writer.start_new_stream()
             writer.write_file(path, args.filename or os.path.basename(path))
+    writer.finish_current_stream()
+
+    if args.progress:  # Print newline to split stderr from stdout for humans.
+        print >>sys.stderr
 
     if args.stream:
         print writer.manifest_text(),
     elif args.raw:
-        writer.finish_current_stream()
         print ','.join(writer.data_locators())
     else:
         # Register the resulting collection in Arvados.
-        arvados.api().collections().create(
+        collection = arvados.api().collections().create(
            body={
                 'uuid': writer.finish(),
                 'manifest_text': writer.manifest_text(),
@@ -331,7 +393,13 @@ def main(arguments=None):
            ).execute()
 
     # Print the locator (uuid) of the new collection.
-    print writer.finish()
+    print collection['uuid']
+
+    for sigcode, orig_handler in orig_signal_handlers.items():
+        signal.signal(sigcode, orig_handler)
+
+    if resume_cache is not None:
+        resume_cache.destroy()
 
 if __name__ == '__main__':
     main()
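
Note on the cache keying changed above: ResumeCache.make_path derives the resume-cache
filename from the API host plus the sorted, NUL-joined real paths (and the manifest
depth or --filename value), which is why the hunk replaces ''.join(realpaths) with
'\0'.join(realpaths): the NUL separator keeps two different path lists from hashing to
the same cache file. Below is a minimal standalone sketch of that scheme; the helper
name cache_path_for and its argument handling are illustrative simplifications, not
the shipped implementation (Python 2, as in the diff):

    import hashlib
    import os

    CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put')

    def cache_path_for(api_host, paths, max_manifest_depth=-1, filename=None):
        # Hypothetical mirror of ResumeCache.make_path: hash the API host and
        # the NUL-joined real paths so distinct uploads get distinct cache files.
        md5 = hashlib.md5()
        md5.update(api_host or '!nohost')
        realpaths = sorted(os.path.realpath(path) for path in paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(max_manifest_depth, -1)))
        elif filename:
            md5.update(filename)
        return os.path.join(CACHE_DIR, md5.hexdigest())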