Declare serialize / accept json fields, start working on states
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
index 04c2d5064234b7e535dabebd7a164d1526be0381..a2135b3defc299e29b3f5a284f61653d24037fe7 100644 (file)
@@ -8,8 +8,9 @@ import functools
 import threading
 from apiclient import errors as apiclient_errors
 import errno
+import time
 
-from fusefile import StringFile, ObjectFile, FuseArvadosFile
+from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
 from fresh import FreshBase, convertTime, use_counter, check_update
 
 import arvados.collection
@@ -483,43 +484,36 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
     job output.
     """
 
+    class UnsaveableCollection(arvados.collection.Collection):
+        def save(self):
+            pass
+        def save_new(self):
+            pass
+
     def __init__(self, parent_inode, inodes, api_client, num_retries):
-        collection = arvados.collection.Collection(
+        collection = self.UnsaveableCollection(
             api_client=api_client,
             keep_client=api_client.keep)
-        collection.save = self._commit_collection
-        collection.save_new = self._commit_collection
         super(TmpCollectionDirectory, self).__init__(
             parent_inode, inodes, collection)
         self.collection_record_file = None
-        self._subscribed = False
-        self._update_collection_record()
-
-    def update(self, *args, **kwargs):
-        if not self._subscribed:
-            with llfuse.lock_released:
-                self.populate(self.mtime())
-            self._subscribed = True
-
-    @use_counter
-    def _commit_collection(self):
-        """Commit the data blocks, but don't save the collection to API.
+        self.populate(self.mtime())
 
-        Update the content of the special .arvados#collection file, if
-        it has been instantiated.
-        """
-        self.collection.flush()
-        self._update_collection_record()
-        if self.collection_record_file is not None:
-            self.collection_record_file.update(self.collection_record)
+    def on_event(self, *args, **kwargs):
+        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
+        if self.collection_record_file:
+            with llfuse.lock:
+                self.collection_record_file.invalidate()
             self.inodes.invalidate_inode(self.collection_record_file.inode)
+            _logger.debug("%s invalidated collection record", self)
 
-    def _update_collection_record(self):
-        self.collection_record = {
-            "uuid": None,
-            "manifest_text": self.collection.manifest_text(),
-            "portable_data_hash": self.collection.portable_data_hash(),
-        }
+    def collection_record(self):
+        with llfuse.lock_released:
+            return {
+                "uuid": None,
+                "manifest_text": self.collection.manifest_text(),
+                "portable_data_hash": self.collection.portable_data_hash(),
+            }
 
     def __contains__(self, k):
         return (k == '.arvados#collection' or
@@ -529,18 +523,26 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
     def __getitem__(self, item):
         if item == '.arvados#collection':
             if self.collection_record_file is None:
-                self.collection_record_file = ObjectFile(
+                self.collection_record_file = FuncToJSONFile(
                     self.inode, self.collection_record)
                 self.inodes.add_entry(self.collection_record_file)
             return self.collection_record_file
         return super(TmpCollectionDirectory, self).__getitem__(item)
 
+    def persisted(self):
+        return False
+
     def writable(self):
         return True
 
     def finalize(self):
         self.collection.stop_threads()
 
+    def invalidate(self):
+        if self.collection_record_file:
+            self.collection_record_file.invalidate()
+        super(TmpCollectionDirectory, self).invalidate()
+
 
 class MagicDirectory(Directory):
     """A special directory that logically contains the set of all extant keep locators.