9986: Move llfuse thread-shutdown check into a "finally" block so it actually runs.
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
index fdc93fb3beb37b7e4d4eda8400eb6f72ff82423a..3f2bcd5ec2464fb5351a42be205b17f1fe93e49c 100644 (file)
@@ -8,8 +8,9 @@ import functools
 import threading
 from apiclient import errors as apiclient_errors
 import errno
+import time
 
-from fusefile import StringFile, ObjectFile, FuseArvadosFile
+from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
 from fresh import FreshBase, convertTime, use_counter, check_update
 
 import arvados.collection
@@ -183,6 +184,9 @@ class Directory(FreshBase):
     def flush(self):
         pass
 
+    def want_event_subscribe(self):
+        raise NotImplementedError()
+
     def create(self, name):
         raise NotImplementedError()
 
@@ -316,6 +320,11 @@ class CollectionDirectoryBase(Directory):
         self.flush()
         src.flush()
 
+    def clear(self, force=False):
+        r = super(CollectionDirectoryBase, self).clear(force)
+        self.collection = None
+        return r
+
 
 class CollectionDirectory(CollectionDirectoryBase):
     """Represents the root of a directory tree representing a collection."""
@@ -326,6 +335,13 @@ class CollectionDirectory(CollectionDirectoryBase):
         self.num_retries = num_retries
         self.collection_record_file = None
         self.collection_record = None
+        self._poll = True
+        try:
+            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2)/2)
+        except:
+            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
+            self._poll_time = 60*60
+
         if isinstance(collection_record, dict):
             self.collection_locator = collection_record['uuid']
             self._mtime = convertTime(collection_record.get('modified_at'))
@@ -343,6 +359,9 @@ class CollectionDirectory(CollectionDirectoryBase):
     def writable(self):
         return self.collection.writable() if self.collection is not None else self._writable
 
+    def want_event_subscribe(self):
+        return (uuid_pattern.match(self.collection_locator) is not None)
+
     # Used by arv-web.py to switch the contents of the CollectionDirectory
     def change_collection(self, new_locator):
         """Switch the contents of the CollectionDirectory.
@@ -434,6 +453,7 @@ class CollectionDirectory(CollectionDirectoryBase):
             _logger.exception("arv-mount %s: error", self.collection_locator)
             if self.collection_record is not None and "manifest_text" in self.collection_record:
                 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
+        self.invalidate()
         return False
 
     @use_counter
@@ -474,6 +494,79 @@ class CollectionDirectory(CollectionDirectoryBase):
             self.collection.stop_threads()
 
 
+class TmpCollectionDirectory(CollectionDirectoryBase):
+    """A directory backed by an Arvados collection that never gets saved.
+
+    This supports using Keep as scratch space. A userspace program can
+    read the .arvados#collection file to get a current manifest in
+    order to save a snapshot of the scratch data or use it as a crunch
+    job output.
+    """
+
+    class UnsaveableCollection(arvados.collection.Collection):
+        def save(self):
+            pass
+        def save_new(self):
+            pass
+
+    def __init__(self, parent_inode, inodes, api_client, num_retries):
+        collection = self.UnsaveableCollection(
+            api_client=api_client,
+            keep_client=api_client.keep,
+            num_retries=num_retries)
+        super(TmpCollectionDirectory, self).__init__(
+            parent_inode, inodes, collection)
+        self.collection_record_file = None
+        self.populate(self.mtime())
+
+    def on_event(self, *args, **kwargs):
+        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
+        if self.collection_record_file:
+            with llfuse.lock:
+                self.collection_record_file.invalidate()
+            self.inodes.invalidate_inode(self.collection_record_file.inode)
+            _logger.debug("%s invalidated collection record", self)
+
+    def collection_record(self):
+        with llfuse.lock_released:
+            return {
+                "uuid": None,
+                "manifest_text": self.collection.manifest_text(),
+                "portable_data_hash": self.collection.portable_data_hash(),
+            }
+
+    def __contains__(self, k):
+        return (k == '.arvados#collection' or
+                super(TmpCollectionDirectory, self).__contains__(k))
+
+    @use_counter
+    def __getitem__(self, item):
+        if item == '.arvados#collection':
+            if self.collection_record_file is None:
+                self.collection_record_file = FuncToJSONFile(
+                    self.inode, self.collection_record)
+                self.inodes.add_entry(self.collection_record_file)
+            return self.collection_record_file
+        return super(TmpCollectionDirectory, self).__getitem__(item)
+
+    def persisted(self):
+        return False
+
+    def writable(self):
+        return True
+
+    def want_event_subscribe(self):
+        return False
+
+    def finalize(self):
+        self.collection.stop_threads()
+
+    def invalidate(self):
+        if self.collection_record_file:
+            self.collection_record_file.invalidate()
+        super(TmpCollectionDirectory, self).invalidate()
+
+
 class MagicDirectory(Directory):
     """A special directory that logically contains the set of all extant keep locators.
 
@@ -488,12 +581,13 @@ class MagicDirectory(Directory):
     README_TEXT = """
 This directory provides access to Arvados collections as subdirectories listed
 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
-the form '1234567890abcdefghijklmnopqrstuv+123').
+the form '1234567890abcdef0123456789abcdef+123').
 
 Note that this directory will appear empty until you attempt to access a
 specific collection subdirectory (such as trying to 'cd' into it), at which
 point the collection will actually be looked up on the server and the directory
 will appear if it exists.
+
 """.lstrip()
 
     def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
@@ -532,10 +626,11 @@ will appear if it exists.
                     self.inodes.del_entry(e)
                 return True
             else:
+                self.inodes.invalidate_entry(self.inode, k)
                 self.inodes.del_entry(e)
                 return False
-        except Exception as e:
-            _logger.debug('arv-mount exception keep %s', e)
+        except Exception as ex:
+            _logger.debug('arv-mount exception keep %s', ex)
             self.inodes.del_entry(e)
             return False
 
@@ -548,6 +643,9 @@ will appear if it exists.
     def clear(self, force=False):
         pass
 
+    def want_event_subscribe(self):
+        return not self.pdh_only
+
 
 class RecursiveInvalidateDirectory(Directory):
     def invalidate(self):
@@ -569,6 +667,9 @@ class TagsDirectory(RecursiveInvalidateDirectory):
         self._poll = True
         self._poll_time = poll_time
 
+    def want_event_subscribe(self):
+        return True
+
     @use_counter
     def update(self):
         with llfuse.lock_released:
@@ -597,6 +698,9 @@ class TagDirectory(Directory):
         self._poll = poll
         self._poll_time = poll_time
 
+    def want_event_subscribe(self):
+        return True
+
     @use_counter
     def update(self):
         with llfuse.lock_released:
@@ -628,6 +732,9 @@ class ProjectDirectory(Directory):
         self._updating_lock = threading.Lock()
         self._current_user = None
 
+    def want_event_subscribe(self):
+        return True
+
     def createDirectory(self, i):
         if collection_uuid_pattern.match(i['uuid']):
             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
@@ -848,3 +955,6 @@ class SharedDirectory(Directory):
                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
         except Exception:
             _logger.exception()
+
+    def want_event_subscribe(self):
+        return True