3198: Store filename/directory name in ArvadosFile/Subcollection object.
author: Peter Amstutz <peter.amstutz@curoverse.com>
Wed, 15 Apr 2015 19:39:00 +0000 (15:39 -0400)
committer: Peter Amstutz <peter.amstutz@curoverse.com>
Wed, 15 Apr 2015 19:39:00 +0000 (15:39 -0400)
Simplify callbacks to support a single callback per collection, and allow a
callback to be set on a Subcollection.  Send a MOD notification on file flush.

sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
sdk/python/tests/test_arvfile.py

index ce342b5a41a887ec90cf6a4a979af06cd669e5e7..53653c052984b8de16ffb93d37fbb4a4a27369ae 100644 (file)
@@ -16,6 +16,8 @@ from ._normalize_stream import normalize_stream
 from ._ranges import locators_and_ranges, replace_range, Range
 from .retry import retry_method
 
+MOD = "mod"
+
 def split(path):
     """split(path) -> streamname, filename
 
@@ -588,7 +590,7 @@ class ArvadosFile(object):
 
     """
 
-    def __init__(self, parent, stream=[], segments=[]):
+    def __init__(self, parent, name, stream=[], segments=[]):
         """
         ArvadosFile constructor.
 
@@ -605,6 +607,7 @@ class ArvadosFile(object):
         for s in segments:
             self._add_segment(stream, s.locator, s.range_size)
         self._current_bblock = None
+        self.name = name
 
     def writable(self):
         return self.parent.writable()
@@ -614,9 +617,9 @@ class ArvadosFile(object):
         return copy.copy(self._segments)
 
     @synchronized
-    def clone(self, new_parent):
+    def clone(self, new_parent, new_name):
         """Make a copy of this file."""
-        cp = ArvadosFile(new_parent)
+        cp = ArvadosFile(new_parent, new_name)
         cp.replace_contents(self)
         return cp
 
@@ -799,6 +802,7 @@ class ArvadosFile(object):
         if self._current_bblock:
             self._repack_writes()
             self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+        self.parent.notify(MOD, self.parent, self.name, (self, self))
 
     @must_be_writable
     @synchronized
@@ -853,8 +857,8 @@ class ArvadosFileReader(ArvadosFileReaderBase):
 
     """
 
-    def __init__(self, arvadosfile, name, mode="r", num_retries=None):
-        super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
+    def __init__(self, arvadosfile,  mode="r", num_retries=None):
+        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode, num_retries=num_retries)
         self.arvadosfile = arvadosfile
 
     def size(self):
@@ -889,8 +893,8 @@ class ArvadosFileWriter(ArvadosFileReader):
 
     """
 
-    def __init__(self, arvadosfile, name, mode, num_retries=None):
-        super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
+    def __init__(self, arvadosfile, mode, num_retries=None):
+        super(ArvadosFileWriter, self).__init__(arvadosfile, mode, num_retries=num_retries)
 
     @_FileLikeObjectBase._before_close
     @retry_method
index f03deedb18aece57b374a2e94c0e29d26b42319c..8091e8b461f121cef6f0de8629539e6edc1c4be8 100644 (file)
@@ -488,6 +488,7 @@ class RichCollectionBase(CollectionBase):
     def __init__(self, parent=None):
         self.parent = parent
         self._modified = True
+        self._callback = None
         self._items = {}
 
     def _my_api(self):
@@ -537,9 +538,9 @@ class RichCollectionBase(CollectionBase):
                 if item is None:
                     # create new file
                     if create_type == COLLECTION:
-                        item = Subcollection(self)
+                        item = Subcollection(self, pathcomponents[0])
                     else:
-                        item = ArvadosFile(self)
+                        item = ArvadosFile(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
                     self._modified = True
                     self.notify(ADD, self, pathcomponents[0], item)
@@ -547,7 +548,7 @@ class RichCollectionBase(CollectionBase):
             else:
                 if item is None:
                     # create new collection
-                    item = Subcollection(self)
+                    item = Subcollection(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
                     self._modified = True
                     self.notify(ADD, self, pathcomponents[0], item)
@@ -633,9 +634,9 @@ class RichCollectionBase(CollectionBase):
         name = os.path.basename(path)
 
         if mode == "r":
-            return ArvadosFileReader(arvfile, name, mode, num_retries=self.num_retries)
+            return ArvadosFileReader(arvfile, mode, num_retries=self.num_retries)
         else:
-            return ArvadosFileWriter(arvfile, name, mode, num_retries=self.num_retries)
+            return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
 
     @synchronized
     def modified(self):
@@ -733,7 +734,7 @@ class RichCollectionBase(CollectionBase):
 
     def _clonefrom(self, source):
         for k,v in source.items():
-            self._items[k] = v.clone(self)
+            self._items[k] = v.clone(self, k)
 
     def clone(self):
         raise NotImplementedError()
@@ -763,7 +764,7 @@ class RichCollectionBase(CollectionBase):
             modified_from = self[target_name]
 
         # Actually make the copy.
-        dup = source_obj.clone(self)
+        dup = source_obj.clone(self, target_name)
         self._items[target_name] = dup
         self._modified = True
 
@@ -881,15 +882,15 @@ class RichCollectionBase(CollectionBase):
             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
         for k in self:
             if k not in end_collection:
-               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
+               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
         for k in end_collection:
             if k in self:
                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                 elif end_collection[k] != self[k]:
-                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
+                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
             else:
-                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
+                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
         return changes
 
     @must_be_writable
@@ -944,6 +945,24 @@ class RichCollectionBase(CollectionBase):
         stripped = self.manifest_text(strip=True)
         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
 
+    @synchronized
+    def subscribe(self, callback):
+        if self._callback is None:
+            self._callback = callback
+        else:
+            raise errors.ArgumentError("A callback is already set on this collection.")
+
+    @synchronized
+    def unsubscribe(self):
+        if self._callback is not None:
+            self._callback = None
+
+    @synchronized
+    def notify(self, event, collection, name, item):
+        if self._callback:
+            self._callback(event, collection, name, item)
+        self.root_collection().notify(event, collection, name, item)
+
     @synchronized
     def __eq__(self, other):
         if other is self:
@@ -1046,7 +1065,6 @@ class Collection(RichCollectionBase):
         self._api_response = None
 
         self.lock = threading.RLock()
-        self.callbacks = []
         self.events = None
 
         if manifest_locator_or_text:
@@ -1200,7 +1218,7 @@ class Collection(RichCollectionBase):
         return self._manifest_locator
 
     @synchronized
-    def clone(self, new_parent=None, readonly=False, new_config=None):
+    def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
         if new_config is None:
             new_config = self._config
         if readonly:
@@ -1333,19 +1351,6 @@ class Collection(RichCollectionBase):
         self._manifest_text = text
         self.set_unmodified()
 
-    @synchronized
-    def subscribe(self, callback):
-        self.callbacks.append(callback)
-
-    @synchronized
-    def unsubscribe(self, callback):
-        self.callbacks.remove(callback)
-
-    @synchronized
-    def notify(self, event, collection, name, item):
-        for c in self.callbacks:
-            c(event, collection, name, item)
-
     @synchronized
     def _import_manifest(self, manifest_text):
         """Import a manifest into a `Collection`.
@@ -1408,6 +1413,11 @@ class Collection(RichCollectionBase):
 
         self.set_unmodified()
 
+    @synchronized
+    def notify(self, event, collection, name, item):
+        if self._callback:
+            self._callback(event, collection, name, item)
+
 
 class Subcollection(RichCollectionBase):
     """This is a subdirectory within a collection that doesn't have its own API
@@ -1417,10 +1427,11 @@ class Subcollection(RichCollectionBase):
 
     """
 
-    def __init__(self, parent):
+    def __init__(self, parent, name):
         super(Subcollection, self).__init__(parent)
         self.lock = self.root_collection().lock
         self._manifest_text = None
+        self.name = name
 
     def root_collection(self):
         return self.parent.root_collection()
@@ -1437,18 +1448,12 @@ class Subcollection(RichCollectionBase):
     def _my_block_manager(self):
         return self.root_collection()._my_block_manager()
 
-    def notify(self, event, collection, name, item):
-        return self.root_collection().notify(event, collection, name, item)
-
     def stream_name(self):
-        for k, v in self.parent.items():
-            if v is self:
-                return os.path.join(self.parent.stream_name(), k)
-        return '.'
+        return os.path.join(self.parent.stream_name(), self.name)
 
     @synchronized
-    def clone(self, new_parent):
-        c = Subcollection(new_parent)
+    def clone(self, new_parent, new_name):
+        c = Subcollection(new_parent, new_name)
         c._clonefrom(self)
         return c
 
index 825465cb4aae15e05876217c9fcbb74db3dce0c4..92c778e1c5076d2204dd3da435dc89841b53416c 100644 (file)
@@ -415,8 +415,8 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
             blocks[loc] = d
             stream.append(Range(loc, n, len(d)))
             n += len(d)
-        af = ArvadosFile(ArvadosFileReaderTestCase.MockParent(blocks, nocache), stream=stream, segments=[Range(1, 0, 3), Range(6, 3, 3), Range(11, 6, 3)])
-        return ArvadosFileReader(af, "count.txt")
+        af = ArvadosFile(ArvadosFileReaderTestCase.MockParent(blocks, nocache), "count.txt", stream=stream, segments=[Range(1, 0, 3), Range(6, 3, 3), Range(11, 6, 3)])
+        return ArvadosFileReader(af)
 
     def test_read_returns_first_block(self):
         # read() calls will be aligned on block boundaries - see #3663.
@@ -483,10 +483,10 @@ class ArvadosFileReadTestCase(unittest.TestCase, StreamRetryTestMixin):
         blockmanager = arvados.arvfile._BlockManager(self.keep_client())
         blockmanager.prefetch_enabled = False
         col = Collection(keep_client=self.keep_client(), block_manager=blockmanager)
-        af = ArvadosFile(col,
+        af = ArvadosFile(col, "test",
                          stream=stream,
                          segments=segments)
-        return ArvadosFileReader(af, "test", **kwargs)
+        return ArvadosFileReader(af, **kwargs)
 
     def read_for_test(self, reader, byte_count, **kwargs):
         return reader.read(byte_count, **kwargs)