2752: Don't duplicate arv-put work after resume.
[arvados.git] / sdk / python / arvados / commands / put.py
index 4568565d11acd1f09abc427cc7ed26ff996d0b73..44f911e60b5ed354124b4f532feba157267859f7 100644 (file)
@@ -160,11 +160,7 @@ class ResumeCache(object):
             os.chmod(cls.CACHE_DIR, 0o700)
 
     def __init__(self, file_spec):
-        try:
-            self.cache_file = open(file_spec, 'a+')
-        except TypeError:
-            file_spec = self.make_path(file_spec)
-            self.cache_file = open(file_spec, 'a+')
+        self.cache_file = open(file_spec, 'a+')
         self._lock_file(self.cache_file)
         self.filename = self.cache_file.name
 
@@ -224,29 +220,36 @@ class ResumeCache(object):
 
 
 class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
-    def __init__(self, cache=None, reporter=None, bytes_expected=None):
-        self.__init_locals__(cache, reporter, bytes_expected)
-        super(ArvPutCollectionWriter, self).__init__()
+    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
+                   ['bytes_written', '_seen_inputs'])
 
-    def __init_locals__(self, cache, reporter, bytes_expected):
-        self.cache = cache
-        self.report_func = reporter
+    def __init__(self, cache=None, reporter=None, bytes_expected=None):
         self.bytes_written = 0
+        self._seen_inputs = []
+        self.cache = cache
+        if reporter is None:
+            self.report_progress = lambda bytes_w, bytes_e: None
+        else:
+            self.report_progress = reporter
         self.bytes_expected = bytes_expected
+        super(ArvPutCollectionWriter, self).__init__()
 
     @classmethod
     def from_cache(cls, cache, reporter=None, bytes_expected=None):
         try:
             state = cache.load()
             state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
-            writer = cls.from_state(state)
+            writer = cls.from_state(state, cache, reporter, bytes_expected)
         except (TypeError, ValueError,
                 arvados.errors.StaleWriterStateError) as error:
             return cls(cache, reporter, bytes_expected)
         else:
-            writer.__init_locals__(cache, reporter, bytes_expected)
             return writer
 
+    def preresume_hook(self):
+        print >>sys.stderr, "arv-put: Resuming previous upload.  Bypass with the --no-resume option."
+        self.report_progress(self.bytes_written, self.bytes_expected)
+
     def checkpoint_state(self):
         if self.cache is None:
             return
@@ -263,8 +266,26 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
         bytes_buffered = self._data_buffer_len
         super(ArvPutCollectionWriter, self).flush_data()
         self.bytes_written += (bytes_buffered - self._data_buffer_len)
-        if self.report_func is not None:
-            self.report_func(self.bytes_written, self.bytes_expected)
+        self.report_progress(self.bytes_written, self.bytes_expected)
+
+    def _record_new_input(self, input_type, source_name, dest_name):
+        # The key needs to be a list because that's what we'll get back
+        # from JSON deserialization.
+        key = [input_type, source_name, dest_name]
+        if key in self._seen_inputs:
+            return False
+        self._seen_inputs.append(key)
+        return True
+
+    def write_file(self, source, filename=None):
+        if self._record_new_input('file', source, filename):
+            super(ArvPutCollectionWriter, self).write_file(source, filename)
+
+    def write_directory_tree(self,
+                             path, stream_name='.', max_manifest_depth=-1):
+        if self._record_new_input('directory', path, stream_name):
+            super(ArvPutCollectionWriter, self).write_directory_tree(
+                path, stream_name, max_manifest_depth)
 
 
 def expected_bytes_for(pathlist):
@@ -289,9 +310,9 @@ def machine_progress(bytes_written, bytes_expected):
 
 def human_progress(bytes_written, bytes_expected):
     if bytes_expected:
-        return "\r{}M / {}M {:.1f}% ".format(
+        return "\r{}M / {}M {:.1%} ".format(
             bytes_written >> 20, bytes_expected >> 20,
-            bytes_written / bytes_expected)
+            float(bytes_written) / bytes_expected)
     else:
         return "\r{} ".format(bytes_written)
 
@@ -312,7 +333,7 @@ def main(arguments=None):
         reporter = None
 
     try:
-        resume_cache = ResumeCache(args)
+        resume_cache = ResumeCache(ResumeCache.make_path(args))
         if not args.resume:
             resume_cache.restart()
     except ResumeCacheConflict: