7751: Refactor TmpCollectionDirectory: generate .arvados#collection less often.
authorTom Clegg <tom@curoverse.com>
Tue, 24 Nov 2015 18:43:42 +0000 (13:43 -0500)
committerTom Clegg <tom@curoverse.com>
Wed, 25 Nov 2015 15:44:59 +0000 (10:44 -0500)
services/fuse/arvados_fuse/fusedir.py
services/fuse/arvados_fuse/fusefile.py

index 04c2d5064234b7e535dabebd7a164d1526be0381..a2135b3defc299e29b3f5a284f61653d24037fe7 100644 (file)
@@ -8,8 +8,9 @@ import functools
 import threading
 from apiclient import errors as apiclient_errors
 import errno
+import time
 
-from fusefile import StringFile, ObjectFile, FuseArvadosFile
+from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
 from fresh import FreshBase, convertTime, use_counter, check_update
 
 import arvados.collection
@@ -483,43 +484,36 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
     job output.
     """
 
+    class UnsaveableCollection(arvados.collection.Collection):
+        def save(self):
+            pass
+        def save_new(self):
+            pass
+
     def __init__(self, parent_inode, inodes, api_client, num_retries):
-        collection = arvados.collection.Collection(
+        collection = self.UnsaveableCollection(
             api_client=api_client,
             keep_client=api_client.keep)
-        collection.save = self._commit_collection
-        collection.save_new = self._commit_collection
         super(TmpCollectionDirectory, self).__init__(
             parent_inode, inodes, collection)
         self.collection_record_file = None
-        self._subscribed = False
-        self._update_collection_record()
-
-    def update(self, *args, **kwargs):
-        if not self._subscribed:
-            with llfuse.lock_released:
-                self.populate(self.mtime())
-            self._subscribed = True
-
-    @use_counter
-    def _commit_collection(self):
-        """Commit the data blocks, but don't save the collection to API.
+        self.populate(self.mtime())
 
-        Update the content of the special .arvados#collection file, if
-        it has been instantiated.
-        """
-        self.collection.flush()
-        self._update_collection_record()
-        if self.collection_record_file is not None:
-            self.collection_record_file.update(self.collection_record)
+    def on_event(self, *args, **kwargs):
+        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
+        if self.collection_record_file:
+            with llfuse.lock:
+                self.collection_record_file.invalidate()
             self.inodes.invalidate_inode(self.collection_record_file.inode)
+            _logger.debug("%s invalidated collection record", self)
 
-    def _update_collection_record(self):
-        self.collection_record = {
-            "uuid": None,
-            "manifest_text": self.collection.manifest_text(),
-            "portable_data_hash": self.collection.portable_data_hash(),
-        }
+    def collection_record(self):
+        with llfuse.lock_released:
+            return {
+                "uuid": None,
+                "manifest_text": self.collection.manifest_text(),
+                "portable_data_hash": self.collection.portable_data_hash(),
+            }
 
     def __contains__(self, k):
         return (k == '.arvados#collection' or
@@ -529,18 +523,26 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
     def __getitem__(self, item):
         if item == '.arvados#collection':
             if self.collection_record_file is None:
-                self.collection_record_file = ObjectFile(
+                self.collection_record_file = FuncToJSONFile(
                     self.inode, self.collection_record)
                 self.inodes.add_entry(self.collection_record_file)
             return self.collection_record_file
         return super(TmpCollectionDirectory, self).__getitem__(item)
 
+    def persisted(self):
+        return False
+
     def writable(self):
         return True
 
     def finalize(self):
         self.collection.stop_threads()
 
+    def invalidate(self):
+        if self.collection_record_file:
+            self.collection_record_file.invalidate()
+        super(TmpCollectionDirectory, self).invalidate()
+
 
 class MagicDirectory(Directory):
     """A special directory that logically contains the set of all extant keep locators.
index 4d472cff1cca38380d80afa63b9783027ad1db30..d4c075a40d292f4515611a34706b7c670f831e18 100644 (file)
@@ -1,7 +1,8 @@
-import logging
-import re
 import json
 import llfuse
+import logging
+import re
+import time
 
 from fresh import FreshBase, convertTime
 
@@ -99,3 +100,30 @@ class ObjectFile(StringFile):
 
     def persisted(self):
         return True
+
+
+class FuncToJSONFile(StringFile):
+    """File content is the return value of a given function, encoded as JSON.
+
+    The function is called at the time the file is read. The result is
+    cached until invalidate() is called.
+    """
+    def __init__(self, parent_inode, func):
+        super(FuncToJSONFile, self).__init__(parent_inode, "", 0)
+        self.func = func
+
+    def size(self):
+        self._update()
+        return super(FuncToJSONFile, self).size()
+
+    def readfrom(self, *args, **kwargs):
+        self._update()
+        return super(FuncToJSONFile, self).readfrom(*args, **kwargs)
+
+    def _update(self):
+        if not self.stale():
+            return
+        self._mtime = time.time()
+        obj = self.func()
+        self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
+        self.fresh()