X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4d84bf681a8a197e6e900dcfc9d82e7fe13bac5f..13345f2805097df138faf7fcd1f0ed31a0d948fb:/sdk/python/arvados/commands/put.py

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index a0dec2b05a..d67cf8494b 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -5,6 +5,7 @@
 
 import argparse
 import arvados
+import base64
 import errno
 import fcntl
 import hashlib
@@ -107,6 +108,16 @@
 total data size).
 """)
 
+    group = parser.add_mutually_exclusive_group()
+    group.add_argument('--resume', action='store_true', default=True,
+                       help="""
+Continue interrupted uploads from cached state (default).
+""")
+    group.add_argument('--no-resume', action='store_false', dest='resume',
+                       help="""
+Do not continue interrupted uploads from cached state.
+""")
+
     args = parser.parse_args(arguments)
 
     if len(args.paths) == 0:
@@ -138,12 +149,18 @@ class ResumeCacheConflict(Exception):
 class ResumeCache(object):
     CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put')
 
-    def __init__(self, file_spec):
+    @classmethod
+    def setup_user_cache(cls):
         try:
-            self.cache_file = open(file_spec, 'a+')
-        except TypeError:
-            file_spec = self.make_path(file_spec)
-            self.cache_file = open(file_spec, 'a+')
+            os.makedirs(cls.CACHE_DIR)
+        except OSError as error:
+            if error.errno != errno.EEXIST:
+                raise
+        else:
+            os.chmod(cls.CACHE_DIR, 0o700)
+
+    def __init__(self, file_spec):
+        self.cache_file = open(file_spec, 'a+')
         self._lock_file(self.cache_file)
         self.filename = self.cache_file.name
 
@@ -197,65 +214,111 @@ class ResumeCache(object):
                 raise
         self.close()
 
+    def restart(self):
+        self.destroy()
+        self.__init__(self.filename)
+
 
-class CollectionWriterWithProgress(arvados.CollectionWriter):
-    def flush_data(self, *args, **kwargs):
-        if not getattr(self, 'display_type', None):
-            return
-        if not hasattr(self, 'bytes_flushed'):
-            self.bytes_flushed = 0
-        self.bytes_flushed += self._data_buffer_len
-        super(CollectionWriterWithProgress, self).flush_data(*args, **kwargs)
-        self.bytes_flushed -= self._data_buffer_len
-        if self.display_type == 'machine':
-            sys.stderr.write('%s %d: %d written %d total\n' %
-                             (sys.argv[0],
-                              os.getpid(),
-                              self.bytes_flushed,
-                              getattr(self, 'bytes_expected', -1)))
-        elif getattr(self, 'bytes_expected', 0) > 0:
-            pct = 100.0 * self.bytes_flushed / self.bytes_expected
-            sys.stderr.write('\r%dM / %dM %.1f%% ' %
-                             (self.bytes_flushed >> 20,
-                              self.bytes_expected >> 20, pct))
-        else:
-            sys.stderr.write('\r%d ' % self.bytes_flushed)
-
-    def manifest_text(self, *args, **kwargs):
-        manifest_text = (super(CollectionWriterWithProgress, self)
-                         .manifest_text(*args, **kwargs))
-        if getattr(self, 'display_type', None):
-            if self.display_type == 'human':
-                sys.stderr.write('\n')
-            self.display_type = None
-        return manifest_text
+class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
+    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
+                   ['bytes_written'])
 
+    def __init__(self, cache=None, reporter=None, bytes_expected=None):
+        self.bytes_written = 0
+        self.cache = cache
+        self.report_func = reporter
+        self.bytes_expected = bytes_expected
+        super(ArvPutCollectionWriter, self).__init__()
 
-def main(arguments=None):
-    args = parse_arguments(arguments)
+    @classmethod
+    def from_cache(cls, cache, reporter=None, bytes_expected=None):
+        try:
+            state = cache.load()
+            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
+            writer = cls.from_state(state, cache, reporter, bytes_expected)
+        except (TypeError, ValueError,
+                arvados.errors.StaleWriterStateError) as error:
+            return cls(cache, reporter, bytes_expected)
+        else:
+            return writer
 
-    if args.progress:
-        writer = CollectionWriterWithProgress()
-        writer.display_type = 'human'
-    elif args.batch_progress:
-        writer = CollectionWriterWithProgress()
-        writer.display_type = 'machine'
-    else:
-        writer = arvados.CollectionWriter()
+    def preresume_hook(self):
+        print >>sys.stderr, "arv-put: Resuming previous upload. Bypass with the --no-resume option."
 
+    def checkpoint_state(self):
+        if self.cache is None:
+            return
+        state = self.dump_state()
+        # Transform attributes for serialization.
+        for attr, value in state.items():
+            if attr == '_data_buffer':
+                state[attr] = base64.encodestring(''.join(value))
+            elif hasattr(value, 'popleft'):
+                state[attr] = list(value)
+        self.cache.save(state)
+
+    def flush_data(self):
+        bytes_buffered = self._data_buffer_len
+        super(ArvPutCollectionWriter, self).flush_data()
+        self.bytes_written += (bytes_buffered - self._data_buffer_len)
+        if self.report_func is not None:
+            self.report_func(self.bytes_written, self.bytes_expected)
+
+
+def expected_bytes_for(pathlist):
     # Walk the given directory trees and stat files, adding up file sizes,
     # so we can display progress as percent
-    writer.bytes_expected = 0
-    for path in args.paths:
+    bytesum = 0
+    for path in pathlist:
         if os.path.isdir(path):
             for filename in arvados.util.listdir_recursive(path):
-                writer.bytes_expected += os.path.getsize(
-                    os.path.join(path, filename))
+                bytesum += os.path.getsize(os.path.join(path, filename))
         elif not os.path.isfile(path):
-            del writer.bytes_expected
-            break
+            return None
         else:
-            writer.bytes_expected += os.path.getsize(path)
+            bytesum += os.path.getsize(path)
+    return bytesum
+
+_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
+                                                            os.getpid())
+def machine_progress(bytes_written, bytes_expected):
+    return _machine_format.format(
+        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
+
+def human_progress(bytes_written, bytes_expected):
+    if bytes_expected:
+        return "\r{}M / {}M {:.1%} ".format(
+            bytes_written >> 20, bytes_expected >> 20,
+            float(bytes_written) / bytes_expected)
+    else:
+        return "\r{} ".format(bytes_written)
+
+def progress_writer(progress_func, outfile=sys.stderr):
+    def write_progress(bytes_written, bytes_expected):
+        outfile.write(progress_func(bytes_written, bytes_expected))
+    return write_progress
+
+def main(arguments=None):
+    ResumeCache.setup_user_cache()
+    args = parse_arguments(arguments)
+
+    if args.progress:
+        reporter = progress_writer(human_progress)
+    elif args.batch_progress:
+        reporter = progress_writer(machine_progress)
+    else:
+        reporter = None
+
+    try:
+        resume_cache = ResumeCache(ResumeCache.make_path(args))
+        if not args.resume:
+            resume_cache.restart()
+    except ResumeCacheConflict:
+        print "arv-put: Another process is already uploading this data."
+        sys.exit(1)
+
+    writer = ArvPutCollectionWriter.from_cache(
        resume_cache, reporter, expected_bytes_for(args.paths))
 
     # Copy file data to Keep.
     for path in args.paths:
@@ -282,6 +345,7 @@ def main(arguments=None):
 
     # Print the locator (uuid) of the new collection.
     print writer.finish()
+    resume_cache.destroy()
 
 if __name__ == '__main__':
     main()
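A minimal sketch of the state round-trip that checkpoint_state() and
from_cache() implement above, written in Python 2 to match the diff. The
writer's raw write buffer (_data_buffer, a list of byte strings) is not
JSON-serializable as-is, so it is base64-encoded into a single string on
save and decoded back into a one-element list on load; deque-valued
attributes (anything with a popleft method) are flattened to plain lists.
The example_queue key below is a hypothetical stand-in for those
attributes, not a real writer property.

    import base64
    from collections import deque

    def save_transform(state):
        # Same transform checkpoint_state() applies before cache.save().
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        return state

    def load_transform(state):
        # Same transform from_cache() applies after cache.load().
        state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
        return state

    state = {'_data_buffer': ['abc', 'def'], 'example_queue': deque([1, 2])}
    state = load_transform(save_transform(state))
    assert state['_data_buffer'] == ['abcdef']
    assert state['example_queue'] == [1, 2]

Note that the buffer collapses from ['abc', 'def'] to ['abcdef'] across the
round trip, which is harmless: the writer only depends on the concatenated
bytes, which is why from_cache() restores it as a one-element list.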
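In the same spirit, a quick illustration (also not part of the diff) of the
reporter plumbing: progress_writer() binds a pure formatting function to an
output stream, so ArvPutCollectionWriter.flush_data() only ever calls
report_func(bytes_written, bytes_expected) and never knows which format is
in use. Using the human_progress and progress_writer definitions added
above:

    reporter = progress_writer(human_progress)  # what main() builds for --progress
    reporter(32 << 20, 128 << 20)               # writes "\r32M / 128M 25.0% " to stderr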