11308: Fix bytes vs. str problems.
[arvados.git] / sdk / python / arvados / collection.py
index f735b9ed37c8f92288e05b21d7b7cdbff5fd77fe..0d88084340dbe227d81bef0a2537618ed8fd66c2 100644 (file)
@@ -1,3 +1,7 @@
+from __future__ import absolute_import
+from builtins import str
+from past.builtins import basestring
+from builtins import object
 import functools
 import logging
 import os
@@ -11,15 +15,15 @@ from collections import deque
 from stat import *
 
 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
-from keep import KeepLocator, KeepClient
+from .keep import KeepLocator, KeepClient
 from .stream import StreamReader
 from ._normalize_stream import normalize_stream
 from ._ranges import Range, LocatorAndRange
 from .safeapi import ThreadSafeApiCache
-import config
-import errors
-import util
-import events
+import arvados.config as config
+import arvados.errors as errors
+import arvados.util
+import arvados.events as events
 from arvados.retry import retry_method
 
 _logger = logging.getLogger('arvados.collection')
@@ -51,7 +55,7 @@ class CollectionBase(object):
             if fields:
                 clean_fields = fields[:1] + [
                     (re.sub(r'\+[^\d][^\+]*', '', x)
-                     if re.match(util.keep_locator_pattern, x)
+                     if re.match(arvados.util.keep_locator_pattern, x)
                      else x)
                     for x in fields[1:]]
                 clean += [' '.join(clean_fields), "\n"]
@@ -180,7 +184,7 @@ class CollectionWriter(CollectionBase):
 
     def _work_trees(self):
         path, stream_name, max_manifest_depth = self._queued_trees[0]
-        d = util.listdir_recursive(
+        d = arvados.util.listdir_recursive(
             path, max_depth = (None if max_manifest_depth == 0 else 0))
         if d:
             self._queue_dirents(stream_name, d)
@@ -418,7 +422,7 @@ class ResumableCollectionWriter(CollectionWriter):
         return writer
 
     def check_dependencies(self):
-        for path, orig_stat in self._dependencies.items():
+        for path, orig_stat in list(self._dependencies.items()):
             if not S_ISREG(orig_stat[ST_MODE]):
                 raise errors.StaleWriterStateError("{} not file".format(path))
             try:
@@ -544,7 +548,7 @@ class RichCollectionBase(CollectionBase):
                     else:
                         item = ArvadosFile(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
-                    self._committed = False
+                    self.set_committed(False)
                     self.notify(ADD, self, pathcomponents[0], item)
                 return item
             else:
@@ -552,7 +556,7 @@ class RichCollectionBase(CollectionBase):
                     # create new collection
                     item = Subcollection(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
-                    self._committed = False
+                    self.set_committed(False)
                     self.notify(ADD, self, pathcomponents[0], item)
                 if isinstance(item, RichCollectionBase):
                     return item.find_or_create(pathcomponents[1], create_type)
@@ -659,25 +663,31 @@ class RichCollectionBase(CollectionBase):
     @synchronized
     def committed(self):
         """Determine if the collection has been committed to the API server."""
-
-        if self._committed is False:
-            return False
-        for v in self._items.values():
-            if v.committed() is False:
-                return False
-        return True
+        return self._committed
 
     @synchronized
-    def set_committed(self):
-        """Recursively set committed flag to True."""
-        self._committed = True
-        for k,v in self._items.items():
-            v.set_committed()
+    def set_committed(self, value=True):
+        """Recursively set committed flag.
+
+        If value is True, set committed to be True for this and all children.
+
+        If value is False, set committed to be False for this and all parents.
+        """
+        if value == self._committed:
+            return
+        if value:
+            for k,v in list(self._items.items()):
+                v.set_committed(True)
+            self._committed = True
+        else:
+            self._committed = False
+            if self.parent is not None:
+                self.parent.set_committed(False)
 
     @synchronized
     def __iter__(self):
         """Iterate over names of files and collections contained in this collection."""
-        return iter(self._items.keys())
+        return iter(list(self._items.keys()))
 
     @synchronized
     def __getitem__(self, k):
@@ -703,23 +713,23 @@ class RichCollectionBase(CollectionBase):
     def __delitem__(self, p):
         """Delete an item by name which is directly contained by this collection."""
         del self._items[p]
-        self._committed = False
+        self.set_committed(False)
         self.notify(DEL, self, p, None)
 
     @synchronized
     def keys(self):
         """Get a list of names of files and collections directly contained in this collection."""
-        return self._items.keys()
+        return list(self._items.keys())
 
     @synchronized
     def values(self):
         """Get a list of files and collection objects directly contained in this collection."""
-        return self._items.values()
+        return list(self._items.values())
 
     @synchronized
     def items(self):
         """Get a list of (name, object) tuples directly contained in this collection."""
-        return self._items.items()
+        return list(self._items.items())
 
     def exists(self, path):
         """Test if there is a file or collection at `path`."""
@@ -746,13 +756,13 @@ class RichCollectionBase(CollectionBase):
                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
             deleteditem = self._items[pathcomponents[0]]
             del self._items[pathcomponents[0]]
-            self._committed = False
+            self.set_committed(False)
             self.notify(DEL, self, pathcomponents[0], deleteditem)
         else:
             item.remove(pathcomponents[1])
 
     def _clonefrom(self, source):
-        for k,v in source.items():
+        for k,v in list(source.items()):
             self._items[k] = v.clone(self, k)
 
     def clone(self):
@@ -795,7 +805,7 @@ class RichCollectionBase(CollectionBase):
             item = source_obj.clone(self, target_name)
 
         self._items[target_name] = item
-        self._committed = False
+        self.set_committed(False)
 
         if modified_from:
             self.notify(MOD, self, target_name, (modified_from, item))
@@ -1022,7 +1032,7 @@ class RichCollectionBase(CollectionBase):
 
         """
         if changes:
-            self._committed = False
+            self.set_committed(False)
         for change in changes:
             event_type = change[0]
             path = change[1]
@@ -1110,7 +1120,7 @@ class RichCollectionBase(CollectionBase):
     @synchronized
     def flush(self):
         """Flush bufferblocks to Keep."""
-        for e in self.values():
+        for e in list(self.values()):
             e.flush()
 
 
@@ -1216,11 +1226,11 @@ class Collection(RichCollectionBase):
         self.events = None
 
         if manifest_locator_or_text:
-            if re.match(util.keep_locator_pattern, manifest_locator_or_text):
+            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
                 self._manifest_locator = manifest_locator_or_text
-            elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
+            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
                 self._manifest_locator = manifest_locator_or_text
-            elif re.match(util.manifest_pattern, manifest_locator_or_text):
+            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
                 self._manifest_text = manifest_locator_or_text
             else:
                 raise errors.ArgumentError(
@@ -1310,6 +1320,7 @@ class Collection(RichCollectionBase):
                 uuid=self._manifest_locator).execute(
                     num_retries=self.num_retries))
             self._manifest_text = self._api_response['manifest_text']
+            self._portable_data_hash = self._api_response['portable_data_hash']
             # If not overriden via kwargs, we should try to load the
             # replication_desired from the API server
             if self.replication_desired is None:
@@ -1335,10 +1346,10 @@ class Collection(RichCollectionBase):
         error_via_api = None
         error_via_keep = None
         should_try_keep = ((self._manifest_text is None) and
-                           util.keep_locator_pattern.match(
+                           arvados.util.keep_locator_pattern.match(
                                self._manifest_locator))
         if ((self._manifest_text is None) and
-            util.signed_locator_pattern.match(self._manifest_locator)):
+            arvados.util.signed_locator_pattern.match(self._manifest_locator)):
             error_via_keep = self._populate_from_keep()
         if self._manifest_text is None:
             error_via_api = self._populate_from_api_server()
@@ -1364,7 +1375,7 @@ class Collection(RichCollectionBase):
 
 
     def _has_collection_uuid(self):
-        return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
+        return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
 
     def __enter__(self):
         return self
@@ -1476,7 +1487,7 @@ class Collection(RichCollectionBase):
                     num_retries=num_retries))
             self._manifest_text = self._api_response["manifest_text"]
             self._portable_data_hash = self._api_response["portable_data_hash"]
-            self.set_committed()
+            self.set_committed(True)
 
         return self._manifest_text
 
@@ -1537,7 +1548,7 @@ class Collection(RichCollectionBase):
             self._portable_data_hash = self._api_response["portable_data_hash"]
 
             self._manifest_text = text
-            self.set_committed()
+            self.set_committed(True)
 
         return text
 
@@ -1568,7 +1579,7 @@ class Collection(RichCollectionBase):
                 stream_name = tok.replace('\\040', ' ')
                 blocks = []
                 segments = []
-                streamoffset = 0L
+                streamoffset = 0
                 state = BLOCKS
                 self.find_or_create(stream_name, COLLECTION)
                 continue
@@ -1576,7 +1587,7 @@ class Collection(RichCollectionBase):
             if state == BLOCKS:
                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
                 if block_locator:
-                    blocksize = long(block_locator.group(1))
+                    blocksize = int(block_locator.group(1))
                     blocks.append(Range(tok, streamoffset, blocksize, 0))
                     streamoffset += blocksize
                 else:
@@ -1585,8 +1596,8 @@ class Collection(RichCollectionBase):
             if state == SEGMENTS:
                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
                 if file_segment:
-                    pos = long(file_segment.group(1))
-                    size = long(file_segment.group(2))
+                    pos = int(file_segment.group(1))
+                    size = int(file_segment.group(2))
                     name = file_segment.group(3).replace('\\040', ' ')
                     filepath = os.path.join(stream_name, name)
                     afile = self.find_or_create(filepath, FILE)
@@ -1602,7 +1613,7 @@ class Collection(RichCollectionBase):
                 stream_name = None
                 state = STREAM_NAME
 
-        self.set_committed()
+        self.set_committed(True)
 
     @synchronized
     def notify(self, event, collection, name, item):
@@ -1652,7 +1663,7 @@ class Subcollection(RichCollectionBase):
     @must_be_writable
     @synchronized
     def _reparent(self, newparent, newname):
-        self._committed = False
+        self.set_committed(False)
         self.flush()
         self.parent.remove(self.name, recursive=True)
         self.parent = newparent