2752: Don't duplicate arv-put work after resume.
author: Brett Smith <brett@curoverse.com>
Thu, 29 May 2014 17:02:14 +0000 (13:02 -0400)
committer: Brett Smith <brett@curoverse.com>
Thu, 29 May 2014 17:10:17 +0000 (13:10 -0400)
This change serializes the command-line arguments that we've actually
processed, versus those we haven't.  That allows us to safely iterate over them again
to upload any files that we hadn't started, while skipping the ones
we've already done.

sdk/python/arvados/commands/put.py

index 1ccf7861144ce4b61043df96914c944f2089f778..44f911e60b5ed354124b4f532feba157267859f7 100644 (file)
@@ -221,10 +221,11 @@ class ResumeCache(object):
 
 class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
     STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
-                   ['bytes_written'])
+                   ['bytes_written', '_seen_inputs'])
 
     def __init__(self, cache=None, reporter=None, bytes_expected=None):
         self.bytes_written = 0
+        self._seen_inputs = []
         self.cache = cache
         if reporter is None:
             self.report_progress = lambda bytes_w, bytes_e: None
@@ -267,6 +268,25 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
         self.bytes_written += (bytes_buffered - self._data_buffer_len)
         self.report_progress(self.bytes_written, self.bytes_expected)
 
+    def _record_new_input(self, input_type, source_name, dest_name):
+        # The key needs to be a list because that's what we'll get back
+        # from JSON deserialization.
+        key = [input_type, source_name, dest_name]
+        if key in self._seen_inputs:
+            return False
+        self._seen_inputs.append(key)
+        return True
+
+    def write_file(self, source, filename=None):
+        if self._record_new_input('file', source, filename):
+            super(ArvPutCollectionWriter, self).write_file(source, filename)
+
+    def write_directory_tree(self,
+                             path, stream_name='.', max_manifest_depth=-1):
+        if self._record_new_input('directory', path, stream_name):
+            super(ArvPutCollectionWriter, self).write_directory_tree(
+                path, stream_name, max_manifest_depth)
+
 
 def expected_bytes_for(pathlist):
     # Walk the given directory trees and stat files, adding up file sizes,