3198: Enable support for event bus based updates in arv-mount.
author Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 18 May 2015 20:49:18 +0000 (16:49 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 18 May 2015 20:49:18 +0000 (16:49 -0400)
sdk/python/arvados/_ranges.py
services/fuse/arvados_fuse/__init__.py
services/fuse/arvados_fuse/fusedir.py
services/fuse/bin/arv-mount

index d4f20f00087507a89c3d363b29a400176e45d542..d5ff6ed1b581f1f1b2047c8fa9198e5371a5e917 100644 (file)
@@ -2,6 +2,9 @@ import logging
 
 _logger = logging.getLogger('arvados.ranges')
 
+# Custom log level just below logging.DEBUG (10), for very verbose range tracing.
+RANGES_SPAM = 9
+
 class Range(object):
     def __init__(self, locator, range_start, range_size, segment_offset=0):
         self.locator = locator
@@ -96,7 +99,7 @@ def locators_and_ranges(data_locators, range_start, range_size):
         block_start = dl.range_start
         block_size = dl.range_size
         block_end = block_start + block_size
-        _logger.debug(
+        _logger.log(RANGES_SPAM,
             "%s range_start %s block_start %s range_end %s block_end %s",
             dl.locator, range_start, block_start, range_end, block_end)
         if range_end <= block_start:
@@ -170,7 +173,7 @@ def replace_range(data_locators, new_range_start, new_range_size, new_locator, n
         dl = data_locators[i]
         old_segment_start = dl.range_start
         old_segment_end = old_segment_start + dl.range_size
-        _logger.debug(
+        _logger.log(RANGES_SPAM,
             "%s range_start %s segment_start %s range_end %s segment_end %s",
             dl, new_range_start, old_segment_start, new_range_end,
             old_segment_end)
index 913db4ccfea6fa12bde06b7a623f760f03f99374..46c5a1b06bd88fdd22ddbe779bd195e2006cfc0f 100644 (file)
@@ -302,7 +302,16 @@ class Operations(llfuse.Operations):
                 item = self.inodes.inode_cache.find(ev["object_uuid"])
                 if item is not None:
                     item.invalidate()
-                    item.update()
+                    if ev["object_kind"] == "arvados#collection":
+                        item.update(to_pdh=ev.get("properties", {}).get("new_attributes", {}).get("portable_data_hash"))
+                    else:
+                        item.update()
+
+                oldowner = ev.get("properties", {}).get("old_attributes", {}).get("owner_uuid")
+                olditemparent = self.inodes.inode_cache.find(oldowner)
+                if olditemparent is not None:
+                    olditemparent.invalidate()
+                    olditemparent.update()
 
                 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
                 if itemparent is not None:
index 2fca36eb0a5d62162ff831ee60054f55d4889246..85f4bca833f820835d72057875536a3b53944fdb 100644 (file)
@@ -371,7 +371,7 @@ class CollectionDirectory(CollectionDirectoryBase):
         return self.collection_locator
 
     @use_counter
-    def update(self):
+    def update(self, to_pdh=None):
         try:
             if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                 return True
@@ -388,7 +388,10 @@ class CollectionDirectory(CollectionDirectoryBase):
 
                     _logger.debug("Updating %s", self.collection_locator)
                     if self.collection:
-                        self.collection.update()
+                        if self.collection.portable_data_hash() == to_pdh:
+                            _logger.debug("%s is fresh at pdh '%s'", self.collection_locator, to_pdh)
+                        else:
+                            self.collection.update()
                     else:
                         if uuid_pattern.match(self.collection_locator):
                             coll_reader = arvados.collection.Collection(
index 76476da1cb488ec8f8019e57dc8ed54e9caf68e3..c3f4ab01ffb46abea8fb07419835a8eda91366dc 100755 (executable)
@@ -163,6 +163,9 @@ From here, the following directories are available:
         # Initialize the fuse connection
         llfuse.init(operations, args.mountpoint, opts)
 
+        # Subscribe to change events from API server
+        operations.listen_for_events(api)
+
         t = threading.Thread(None, lambda: llfuse.main())
         t.start()
 
@@ -199,6 +202,10 @@ From here, the following directories are available:
     else:
         try:
             llfuse.init(operations, args.mountpoint, opts)
+
+            # Subscribe to change events from API server
+            operations.listen_for_events(api)
+
             llfuse.main()
         except Exception as e:
             logger.exception('arv-mount: exception during mount')