2752: arv-put works when it can't write a cache file.
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 439504786ec033de6a3b07a32ea82206e7e0237a..01bae2feade3e7345c1e57c352eba713cafe4a81 100644
@@ -11,9 +11,12 @@ import fcntl
 import hashlib
 import json
 import os
+import signal
 import sys
 import tempfile
 
+CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
+
 def parse_arguments(arguments):
     parser = argparse.ArgumentParser(
         description='Copy data from the local filesystem to Keep.')
@@ -108,6 +111,16 @@ def parse_arguments(arguments):
     total data size).
     """)
 
+    group = parser.add_mutually_exclusive_group()
+    group.add_argument('--resume', action='store_true', default=True,
+                       help="""
+    Continue interrupted uploads from cached state (default).
+    """)
+    group.add_argument('--no-resume', action='store_false', dest='resume',
+                       help="""
+    Do not continue interrupted uploads from cached state.
+    """)
+
     args = parser.parse_args(arguments)
 
     if len(args.paths) == 0:
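The --resume/--no-resume pair added above relies on an argparse mutually exclusive group in which both flags write to the same dest, so they cannot be combined and plain invocations default to resuming. A minimal standalone sketch of that pattern (the parser here is a throwaway, not arv-put's real one):

    import argparse

    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group()
    # Both options share dest='resume'; store_true/store_false just flip it.
    group.add_argument('--resume', action='store_true', default=True)
    group.add_argument('--no-resume', action='store_false', dest='resume')

    assert parser.parse_args([]).resume is True
    assert parser.parse_args(['--no-resume']).resume is False
    # Passing both flags together makes argparse print a usage error and exit.
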
@@ -139,12 +152,18 @@ class ResumeCacheConflict(Exception):
 class ResumeCache(object):
     CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put')
 
-    def __init__(self, file_spec):
+    @classmethod
+    def setup_user_cache(cls):
         try:
-            self.cache_file = open(file_spec, 'a+')
-        except TypeError:
-            file_spec = self.make_path(file_spec)
-            self.cache_file = open(file_spec, 'a+')
+            os.makedirs(cls.CACHE_DIR)
+        except OSError as error:
+            if error.errno != errno.EEXIST:
+                raise
+        else:
+            os.chmod(cls.CACHE_DIR, 0o700)
+
+    def __init__(self, file_spec):
+        self.cache_file = open(file_spec, 'a+')
         self._lock_file(self.cache_file)
         self.filename = self.cache_file.name
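setup_user_cache() moves directory creation out of __init__ and makes it idempotent: EEXIST from os.makedirs() is ignored so repeated runs succeed, the 0700 chmod is applied only to a directory that was just created, and any other OSError propagates to the caller, where main() treats it as "no cache available" rather than a fatal error. A rough standalone version of the same pattern (the directory path is only an example):

    import errno
    import os

    def ensure_private_dir(path):
        try:
            os.makedirs(path)
        except OSError as error:
            if error.errno != errno.EEXIST:
                raise                    # permissions, read-only FS, etc.
        else:
            os.chmod(path, 0o700)        # tighten only a freshly made directory

    ensure_private_dir(os.path.expanduser('~/.cache/arvados/arv-put'))
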
 
@@ -153,7 +172,7 @@ class ResumeCache(object):
         md5 = hashlib.md5()
         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
         realpaths = sorted(os.path.realpath(path) for path in args.paths)
-        md5.update(''.join(realpaths))
+        md5.update('\0'.join(realpaths))
         if any(os.path.isdir(path) for path in realpaths):
             md5.update(str(max(args.max_manifest_depth, -1)))
         elif args.filename:
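Joining the sorted real paths with '\0' instead of '' keeps the cache key unambiguous: with plain concatenation, two different sets of input paths can hash to the same digest and wrongly share resume state. A small illustration of the collision (digest-only; the real key also mixes in the API host and other arguments):

    import hashlib

    def cache_key(paths, sep):
        return hashlib.md5(sep.join(sorted(paths)).encode()).hexdigest()

    # Two different uploads whose paths concatenate to the same string '/a/b/c':
    assert cache_key(['/a/b', '/c'], '') == cache_key(['/a', '/b/c'], '')
    # A NUL byte cannot appear in a path, so the keys stay distinct:
    assert cache_key(['/a/b', '/c'], '\0') != cache_key(['/a', '/b/c'], '\0')
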
@@ -204,30 +223,30 @@ class ResumeCache(object):
 
 
 class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
-    def __init__(self, cache=None, reporter=None, bytes_expected=None):
-        self.__init_locals__(cache, reporter, bytes_expected)
-        super(ArvPutCollectionWriter, self).__init__()
+    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
+                   ['bytes_written', '_seen_inputs'])
 
-    def __init_locals__(self, cache, reporter, bytes_expected):
-        self.cache = cache
-        self.report_func = reporter
+    def __init__(self, cache=None, reporter=None, bytes_expected=None):
         self.bytes_written = 0
+        self._seen_inputs = []
+        self.cache = cache
+        self.reporter = reporter
         self.bytes_expected = bytes_expected
+        super(ArvPutCollectionWriter, self).__init__()
 
     @classmethod
     def from_cache(cls, cache, reporter=None, bytes_expected=None):
         try:
             state = cache.load()
             state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
-            writer = cls.from_state(state)
+            writer = cls.from_state(state, cache, reporter, bytes_expected)
         except (TypeError, ValueError,
                 arvados.errors.StaleWriterStateError) as error:
             return cls(cache, reporter, bytes_expected)
         else:
-            writer.__init_locals__(cache, reporter, bytes_expected)
             return writer
 
-    def checkpoint_state(self):
+    def cache_state(self):
         if self.cache is None:
             return
         state = self.dump_state()
@@ -239,12 +258,38 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
                 state[attr] = list(value)
         self.cache.save(state)
 
+    def report_progress(self):
+        if self.reporter is not None:
+            self.reporter(self.bytes_written, self.bytes_expected)
+
     def flush_data(self):
-        bytes_buffered = self._data_buffer_len
+        start_buffer_len = self._data_buffer_len
+        start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
         super(ArvPutCollectionWriter, self).flush_data()
-        self.bytes_written += (bytes_buffered - self._data_buffer_len)
-        if self.report_func is not None:
-            self.report_func(self.bytes_written, self.bytes_expected)
+        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
+            self.bytes_written += (start_buffer_len - self._data_buffer_len)
+            self.report_progress()
+            if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
+                self.cache_state()
+
+    def _record_new_input(self, input_type, source_name, dest_name):
+        # The key needs to be a list because that's what we'll get back
+        # from JSON deserialization.
+        key = [input_type, source_name, dest_name]
+        if key in self._seen_inputs:
+            return False
+        self._seen_inputs.append(key)
+        return True
+
+    def write_file(self, source, filename=None):
+        if self._record_new_input('file', source, filename):
+            super(ArvPutCollectionWriter, self).write_file(source, filename)
+
+    def write_directory_tree(self,
+                             path, stream_name='.', max_manifest_depth=-1):
+        if self._record_new_input('directory', path, stream_name):
+            super(ArvPutCollectionWriter, self).write_directory_tree(
+                path, stream_name, max_manifest_depth)
 
 
 def expected_bytes_for(pathlist):
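The rewritten writer now owns its resume behaviour: STATE_PROPS adds bytes_written and _seen_inputs to the serialized state, flush_data() checkpoints via cache_state() only when the byte count crosses a KEEP_BLOCK_SIZE boundary (i.e. a full block actually reached Keep), and _record_new_input() skips files and directory trees that a resumed run already queued. A stripped-down sketch of the block-boundary checkpoint idea (an illustrative class, not the real ResumableCollectionWriter; the 64 MiB constant is an assumption):

    KEEP_BLOCK_SIZE = 64 * 1024 * 1024   # assumed Keep block size

    class CheckpointingWriter(object):
        def __init__(self, cache):
            self.cache = cache
            self.bytes_written = 0

        def note_flush(self, bytes_put):
            # Persist state only when this flush completed a new full block,
            # so the cache file is not rewritten on every small PUT.
            blocks_before = self.bytes_written // KEEP_BLOCK_SIZE
            self.bytes_written += bytes_put
            if self.bytes_written // KEEP_BLOCK_SIZE > blocks_before:
                self.cache.save({'bytes_written': self.bytes_written})
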
@@ -269,9 +314,9 @@ def machine_progress(bytes_written, bytes_expected):
 
 def human_progress(bytes_written, bytes_expected):
     if bytes_expected:
-        return "\r{}M / {}M {:.1f}% ".format(
+        return "\r{}M / {}M {:.1%} ".format(
             bytes_written >> 20, bytes_expected >> 20,
-            bytes_written / bytes_expected)
+            float(bytes_written) / bytes_expected)
     else:
         return "\r{} ".format(bytes_written)
 
@@ -280,6 +325,9 @@ def progress_writer(progress_func, outfile=sys.stderr):
         outfile.write(progress_func(bytes_written, bytes_expected))
     return write_progress
 
+def exit_signal_handler(sigcode, frame):
+    sys.exit(-sigcode)
+
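exit_signal_handler() converts SIGINT, SIGQUIT and SIGTERM into a SystemExit, so an interrupted arv-put unwinds normally (leaving the checkpointed cache behind for --resume) instead of dying mid-write; sys.exit(-sigcode) just encodes which signal triggered the exit. Outside of arv-put, the install-and-restore wiring looks roughly like this (a toy example, not the real main()):

    import signal
    import sys

    def exit_signal_handler(sigcode, frame):
        sys.exit(-sigcode)        # raise SystemExit so cleanup code still runs

    # signal.signal() returns the previous handler, so a dict comprehension
    # can install the new handler and remember the old one in a single pass.
    original = {sig: signal.signal(sig, exit_signal_handler)
                for sig in (signal.SIGINT, signal.SIGTERM)}
    try:
        pass                      # long-running work would go here
    finally:
        for sig, handler in original.items():
            signal.signal(sig, handler)   # restore the saved handlers
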
 def main(arguments=None):
     args = parse_arguments(arguments)
 
@@ -289,27 +337,55 @@ def main(arguments=None):
         reporter = progress_writer(machine_progress)
     else:
         reporter = None
-
-    writer = ArvPutCollectionWriter(
-        reporter=reporter, bytes_expected=expected_bytes_for(args.paths))
-
-    # Copy file data to Keep.
-    for path in args.paths:
+    bytes_expected = expected_bytes_for(args.paths)
+
+    try:
+        ResumeCache.setup_user_cache()
+        resume_cache = ResumeCache(ResumeCache.make_path(args))
+    except (IOError, OSError):
+        # Couldn't open cache directory/file.  Continue without it.
+        resume_cache = None
+        writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
+    except ResumeCacheConflict:
+        print "arv-put: Another process is already uploading this data."
+        sys.exit(1)
+    else:
+        if not args.resume:
+            resume_cache.restart()
+        writer = ArvPutCollectionWriter.from_cache(
+            resume_cache, reporter, bytes_expected)
+
+    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
+    # the originals.
+    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
+                            for sigcode in CAUGHT_SIGNALS}
+
+    if writer.bytes_written > 0:  # We're resuming a previous upload.
+        print >>sys.stderr, "\n".join([
+                "arv-put: Resuming previous upload from last checkpoint.",
+                "         Use the --no-resume option to start over."])
+        writer.report_progress()
+
+    writer.do_queued_work()  # Do work resumed from cache.
+    for path in args.paths:  # Copy file data to Keep.
         if os.path.isdir(path):
             writer.write_directory_tree(
                 path, max_manifest_depth=args.max_manifest_depth)
         else:
             writer.start_new_stream()
             writer.write_file(path, args.filename or os.path.basename(path))
+    writer.finish_current_stream()
+
+    if args.progress:  # Print newline to split stderr from stdout for humans.
+        print >>sys.stderr
 
     if args.stream:
         print writer.manifest_text(),
     elif args.raw:
-        writer.finish_current_stream()
         print ','.join(writer.data_locators())
     else:
         # Register the resulting collection in Arvados.
-        arvados.api().collections().create(
+        collection = arvados.api().collections().create(
             body={
                 'uuid': writer.finish(),
                 'manifest_text': writer.manifest_text(),
@@ -317,7 +393,13 @@ def main(arguments=None):
             ).execute()
 
         # Print the locator (uuid) of the new collection.
-        print writer.finish()
+        print collection['uuid']
+
+    for sigcode, orig_handler in orig_signal_handlers.items():
+        signal.signal(sigcode, orig_handler)
+
+    if resume_cache is not None:
+        resume_cache.destroy()
 
 if __name__ == '__main__':
     main()
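
Taken together, main() now does what the commit summary promises: failure to create or open the cache file (IOError/OSError) merely disables resumption, only a lock conflict with another arv-put run on the same inputs is fatal, and a successfully opened cache is honoured, restarted for --no-resume, and destroyed after a completed upload. A condensed sketch of that decision tree (names follow the diff; error reporting and later steps are abbreviated):

    try:
        ResumeCache.setup_user_cache()
        resume_cache = ResumeCache(ResumeCache.make_path(args))
    except (IOError, OSError):
        resume_cache = None                  # no cache file: upload anyway
        writer = ArvPutCollectionWriter(None, reporter, bytes_expected)
    except ResumeCacheConflict:
        sys.exit("arv-put: Another process is already uploading this data.")
    else:
        if not args.resume:
            resume_cache.restart()           # --no-resume discards old state
        writer = ArvPutCollectionWriter.from_cache(
            resume_cache, reporter, bytes_expected)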