2879: Move Python is_hex function to arvados.util.
[arvados.git] / sdk / python / arvados / collection.py
index f339381ba1a95a34b55fa402fa882d1e9a88386b..e7b26017d613ea8d0c792da9d228bb722ba9af44 100644 (file)
@@ -171,7 +171,7 @@ class CollectionWriter(object):
     def __exit__(self):
         self.finish()
 
-    def _do_queued_work(self):
+    def do_queued_work(self):
         # The work queue consists of three pieces:
         # * _queued_file: The file object we're currently writing to the
         #   Collection.
@@ -194,11 +194,6 @@ class CollectionWriter(object):
                 self._work_trees()
             else:
                 break
-            self.checkpoint_state()
-
-    def checkpoint_state(self):
-        # Subclasses can implement this method to, e.g., report or record state.
-        pass
 
     def _work_file(self):
         while True:
@@ -256,12 +251,12 @@ class CollectionWriter(object):
 
     def write_file(self, source, filename=None):
         self._queue_file(source, filename)
-        self._do_queued_work()
+        self.do_queued_work()
 
     def write_directory_tree(self,
                              path, stream_name='.', max_manifest_depth=-1):
         self._queue_tree(path, stream_name, max_manifest_depth)
-        self._do_queued_work()
+        self.do_queued_work()
 
     def write(self, newdata):
         if hasattr(newdata, '__iter__'):
@@ -280,7 +275,6 @@ class CollectionWriter(object):
             self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
             self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
             self._data_buffer_len = len(self._data_buffer[0])
-            self.checkpoint_state()
 
     def start_new_file(self, newfilename=None):
         self.finish_current_file()
@@ -347,8 +341,25 @@ class CollectionWriter(object):
         self._current_file_name = None
 
     def finish(self):
-        return Keep.put(self.manifest_text())
-
+        # Send the stripped manifest to Keep, to ensure that we use the
+        # same UUID regardless of what hints are used on the collection.
+        return Keep.put(self.stripped_manifest())
+
+    def stripped_manifest(self):
+        """
+        Return the manifest for the current collection with all permission
+        hints removed from the locators in the manifest.
+        """
+        raw = self.manifest_text()
+        clean = ''
+        for line in raw.split("\n"):
+            fields = line.split()
+            if len(fields) > 0:
+                locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
+                             for x in fields[1:-1] ]
+                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
+        return clean
+        
     def manifest_text(self):
         self.finish_current_stream()
         manifest = ''
@@ -386,6 +397,12 @@ class ResumableCollectionWriter(CollectionWriter):
 
     @classmethod
     def from_state(cls, state, *init_args, **init_kwargs):
+        # Try to build a new writer from scratch with the given state.
+        # If the state is not suitable to resume (because files have changed,
+        # been deleted, aren't predictable, etc.), raise a
+        # StaleWriterStateError.  Otherwise, return the initialized writer.
+        # The caller is responsible for calling writer.do_queued_work()
+        # appropriately after it's returned.
         writer = cls(*init_args, **init_kwargs)
         for attr_name in cls.STATE_PROPS:
             attr_value = state[attr_name]
@@ -409,13 +426,8 @@ class ResumableCollectionWriter(CollectionWriter):
             except IOError as error:
                 raise errors.StaleWriterStateError(
                     "failed to reopen active file {}: {}".format(path, error))
-        writer.preresume_hook()
-        writer._do_queued_work()
         return writer
 
-    def preresume_hook(self):
-        pass  # Subclasses can override this as desired.
-
     def check_dependencies(self):
         for path, orig_stat in self._dependencies.items():
             if not S_ISREG(orig_stat[ST_MODE]):