closes #2896
[arvados.git] / sdk / python / arvados / collection.py
index 7b38ccd180444a92a8bd1c3684b1082b49568f4c..e7b26017d613ea8d0c792da9d228bb722ba9af44 100644 (file)
@@ -171,7 +171,7 @@ class CollectionWriter(object):
     def __exit__(self):
         self.finish()
 
-    def _do_queued_work(self):
+    def do_queued_work(self):
         # The work queue consists of three pieces:
         # * _queued_file: The file object we're currently writing to the
         #   Collection.
@@ -194,11 +194,6 @@ class CollectionWriter(object):
                 self._work_trees()
             else:
                 break
-            self.checkpoint_state()
-
-    def checkpoint_state(self):
-        # Subclasses can implement this method to, e.g., report or record state.
-        pass
 
     def _work_file(self):
         while True:
@@ -256,12 +251,12 @@ class CollectionWriter(object):
 
     def write_file(self, source, filename=None):
         self._queue_file(source, filename)
-        self._do_queued_work()
+        self.do_queued_work()
 
     def write_directory_tree(self,
                              path, stream_name='.', max_manifest_depth=-1):
         self._queue_tree(path, stream_name, max_manifest_depth)
-        self._do_queued_work()
+        self.do_queued_work()
 
     def write(self, newdata):
         if hasattr(newdata, '__iter__'):
@@ -280,7 +275,6 @@ class CollectionWriter(object):
             self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
             self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
             self._data_buffer_len = len(self._data_buffer[0])
-            self.checkpoint_state()
 
     def start_new_file(self, newfilename=None):
         self.finish_current_file()
@@ -347,8 +341,25 @@ class CollectionWriter(object):
         self._current_file_name = None
 
     def finish(self):
-        return Keep.put(self.manifest_text())
-
+        # Send the stripped manifest to Keep, to ensure that we use the
+        # same UUID regardless of what hints are used on the collection.
+        return Keep.put(self.stripped_manifest())
+
+    def stripped_manifest(self):
+        """
+        Return the manifest for the current collection with all permission
+        hints removed from the locators in the manifest.
+        """
+        raw = self.manifest_text()
+        clean = ''
+        for line in raw.split("\n"):
+            fields = line.split()
+            if len(fields) > 0:
+                locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
+                             for x in fields[1:-1] ]
+                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
+        return clean
+        
     def manifest_text(self):
         self.finish_current_stream()
         manifest = ''
@@ -385,8 +396,14 @@ class ResumableCollectionWriter(CollectionWriter):
         super(ResumableCollectionWriter, self).__init__()
 
     @classmethod
-    def from_state(cls, state):
-        writer = cls()
+    def from_state(cls, state, *init_args, **init_kwargs):
+        # Try to build a new writer from scratch with the given state.
+        # If the state is not suitable to resume (because files have changed,
+        # been deleted, aren't predictable, etc.), raise a
+        # StaleWriterStateError.  Otherwise, return the initialized writer.
+        # The caller is responsible for calling writer.do_queued_work()
+        # appropriately after it's returned.
+        writer = cls(*init_args, **init_kwargs)
         for attr_name in cls.STATE_PROPS:
             attr_value = state[attr_name]
             attr_class = getattr(writer, attr_name).__class__
@@ -409,7 +426,6 @@ class ResumableCollectionWriter(CollectionWriter):
             except IOError as error:
                 raise errors.StaleWriterStateError(
                     "failed to reopen active file {}: {}".format(path, error))
-        writer._do_queued_work()
         return writer
 
     def check_dependencies(self):
@@ -443,15 +459,22 @@ class ResumableCollectionWriter(CollectionWriter):
             raise errors.AssertionError("{} not a file path".format(source))
         try:
             path_stat = os.stat(src_path)
-        except OSError as error:
-            raise errors.AssertionError(
-                "could not stat {}: {}".format(source, error))
+        except OSError as stat_error:
+            path_stat = None
         super(ResumableCollectionWriter, self)._queue_file(source, filename)
         fd_stat = os.fstat(self._queued_file.fileno())
-        if path_stat.st_ino != fd_stat.st_ino:
+        if not S_ISREG(fd_stat.st_mode):
+            # We won't be able to resume from this cache anyway, so don't
+            # worry about further checks.
+            self._dependencies[source] = tuple(fd_stat)
+        elif path_stat is None:
+            raise errors.AssertionError(
+                "could not stat {}: {}".format(source, stat_error))
+        elif path_stat.st_ino != fd_stat.st_ino:
             raise errors.AssertionError(
                 "{} changed between open and stat calls".format(source))
-        self._dependencies[src_path] = tuple(fd_stat)
+        else:
+            self._dependencies[src_path] = tuple(fd_stat)
 
     def write(self, data):
         if self._queued_file is None: