2752: arv-put explains resumed uploads in more detail.
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 44f911e60b5ed354124b4f532feba157267859f7..d9e401dcbbdfedec60e573346859cb829cc51eea 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -11,9 +11,12 @@ import fcntl
 import hashlib
 import json
 import os
+import signal
 import sys
 import tempfile
 
+CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
+
 def parse_arguments(arguments):
     parser = argparse.ArgumentParser(
         description='Copy data from the local filesystem to Keep.')
@@ -169,7 +172,7 @@ class ResumeCache(object):
         md5 = hashlib.md5()
         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
         realpaths = sorted(os.path.realpath(path) for path in args.paths)
-        md5.update(''.join(realpaths))
+        md5.update('\0'.join(realpaths))
         if any(os.path.isdir(path) for path in realpaths):
             md5.update(str(max(args.max_manifest_depth, -1)))
         elif args.filename:
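
The switch to a NUL separator fixes a genuine cache-key collision: joined with the empty string, distinct path lists can concatenate to the same string and therefore hash to the same resume cache entry, whereas NUL cannot appear in a POSIX path. A standalone illustration (not the Arvados code itself):

    import hashlib

    def cache_key(paths, sep):
        # Mirrors the hunk above: hash the sorted paths joined by sep.
        return hashlib.md5(sep.join(sorted(paths)).encode()).hexdigest()

    a = ['/a/b', '/c']
    b = ['/a', '/b/c']
    assert cache_key(a, '') == cache_key(b, '')      # both join to '/a/b/c'
    assert cache_key(a, '\0') != cache_key(b, '\0')  # NUL keeps them distinct
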
@@ -227,10 +230,7 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
         self.bytes_written = 0
         self._seen_inputs = []
         self.cache = cache
-        if reporter is None:
-            self.report_progress = lambda bytes_w, bytes_e: None
-        else:
-            self.report_progress = reporter
+        self.reporter = reporter
         self.bytes_expected = bytes_expected
         super(ArvPutCollectionWriter, self).__init__()
 
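Storing the reporter unwrapped, with the None check deferred to the new report_progress method below, lets main() trigger a progress report directly when resuming. The reporter is just a two-argument callable; a sketch of one, assuming the (bytes_written, bytes_expected) signature used throughout this diff:

    import sys

    def human_progress(bytes_written, bytes_expected):
        # Invoked by the writer after each flush with the running byte
        # count and the precomputed total (which may be falsy when no
        # estimate was possible).
        if bytes_expected:
            sys.stderr.write('\r%d / %d bytes' % (bytes_written, bytes_expected))
        else:
            sys.stderr.write('\r%d bytes' % bytes_written)

    # writer = ArvPutCollectionWriter.from_cache(cache, human_progress, total)
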
@@ -246,11 +246,7 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
         else:
             return writer
 
-    def preresume_hook(self):
-        print >>sys.stderr, "arv-put: Resuming previous upload.  Bypass with the --no-resume option."
-        self.report_progress(self.bytes_written, self.bytes_expected)
-
-    def checkpoint_state(self):
+    def cache_state(self):
         if self.cache is None:
             return
         state = self.dump_state()
@@ -262,11 +258,19 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
                 state[attr] = list(value)
         self.cache.save(state)
 
+    def report_progress(self):
+        if self.reporter is not None:
+            self.reporter(self.bytes_written, self.bytes_expected)
+
     def flush_data(self):
-        bytes_buffered = self._data_buffer_len
+        start_buffer_len = self._data_buffer_len
+        start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
         super(ArvPutCollectionWriter, self).flush_data()
-        self.bytes_written += (bytes_buffered - self._data_buffer_len)
-        self.report_progress(self.bytes_written, self.bytes_expected)
+        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
+            self.bytes_written += (start_buffer_len - self._data_buffer_len)
+            self.report_progress()
+            if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
+                self.cache_state()
 
     def _record_new_input(self, input_type, source_name, dest_name):
         # The key needs to be a list because that's what we'll get back
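
flush_data now checkpoints the resume cache only when the count of completed Keep blocks grows, i.e. when at least one full block has actually been PUT since the last checkpoint, so the cached state never points at data that is not yet in Keep. (The bare / is Python 2 integer division; under Python 3 it would have to be //.) The boundary arithmetic as a standalone sketch, assuming Keep's 64 MiB block size:

    KEEP_BLOCK_SIZE = 64 * 1024 * 1024  # assumption: Keep's maximum block size

    def crossed_block_boundary(old_bytes, new_bytes):
        # Floor division counts whole blocks written; checkpoint only
        # when that count has grown since the previous flush.
        return new_bytes // KEEP_BLOCK_SIZE > old_bytes // KEEP_BLOCK_SIZE

    assert not crossed_block_boundary(0, KEEP_BLOCK_SIZE - 1)            # partial block
    assert crossed_block_boundary(KEEP_BLOCK_SIZE - 1, KEEP_BLOCK_SIZE)  # block completed
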
@@ -343,19 +347,40 @@ def main(arguments=None):
     writer = ArvPutCollectionWriter.from_cache(
         resume_cache, reporter, expected_bytes_for(args.paths))
 
-    # Copy file data to Keep.
-    for path in args.paths:
-        if os.path.isdir(path):
-            writer.write_directory_tree(
-                path, max_manifest_depth=args.max_manifest_depth)
-        else:
-            writer.start_new_stream()
-            writer.write_file(path, args.filename or os.path.basename(path))
+    def signal_handler(sigcode, frame):
+        writer.cache_state()
+        sys.exit(-sigcode)
+    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
+    # the originals.
+    orig_signal_handlers = {sigcode: signal.signal(sigcode, signal_handler)
+                            for sigcode in CAUGHT_SIGNALS}
+
+    if writer.bytes_written > 0:  # We're resuming a previous upload.
+        print >>sys.stderr, "\n".join([
+                "arv-put: Resuming previous upload from last checkpoint.",
+                "         Use the --no-resume option to start over."])
+        writer.report_progress()
+
+    try:
+        writer.do_queued_work()  # Do work resumed from cache.
+        for path in args.paths:  # Copy file data to Keep.
+            if os.path.isdir(path):
+                writer.write_directory_tree(
+                    path, max_manifest_depth=args.max_manifest_depth)
+            else:
+                writer.start_new_stream()
+                writer.write_file(path, args.filename or os.path.basename(path))
+        writer.finish_current_stream()
+    except Exception:
+        writer.cache_state()
+        raise
+
+    if args.progress:  # Print newline to split stderr from stdout for humans.
+        print >>sys.stderr
 
     if args.stream:
         print writer.manifest_text(),
     elif args.raw:
-        writer.finish_current_stream()
         print ','.join(writer.data_locators())
     else:
         # Register the resulting collection in Arvados.
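
main() now brackets the upload in signal handling: signal.signal() returns the handler it replaces, so one dict comprehension both installs the checkpoint-and-exit handler and remembers the originals, and the except clause caches state before letting the error propagate. The same pattern distilled into a self-contained wrapper (illustrative names, not the arv-put API):

    import signal
    import sys

    CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]

    def run_with_checkpoints(work, save_state):
        def handler(sigcode, frame):
            save_state()        # checkpoint before dying
            sys.exit(-sigcode)  # nonzero exit status derived from the signal
        # signal.signal() installs a handler and returns the previous one,
        # so this saves the originals as a side effect of installing ours.
        originals = {code: signal.signal(code, handler)
                     for code in CAUGHT_SIGNALS}
        try:
            work()
        except Exception:
            save_state()        # a failed run keeps its checkpoint for resume
            raise
        for code, orig_handler in originals.items():
            signal.signal(code, orig_handler)

As in the commit, the original handlers are restored only on the success path; an interrupted run exits through the handler itself.
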
@@ -368,6 +393,10 @@ def main(arguments=None):
 
         # Print the locator (uuid) of the new collection.
         print writer.finish()
+
+    for sigcode, orig_handler in orig_signal_handlers.items():
+        signal.signal(sigcode, orig_handler)
+
     resume_cache.destroy()
 
 if __name__ == '__main__':
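
Seen from the command line, the flow this commit enables looks roughly like this (illustrative session; the messages are the ones added above):

    $ arv-put my_data/
    ^C                    # SIGINT: the handler caches state and exits nonzero
    $ arv-put my_data/
    arv-put: Resuming previous upload from last checkpoint.
             Use the --no-resume option to start over.

Because resume_cache.destroy() is reached only after a fully successful run, an interrupted or failed invocation leaves its checkpoint on disk for the next attempt with the same arguments, which hash to the same cache key per the ResumeCache hunk above.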