5414: Control initializer load order using require_relative instead of alphabetical...
[arvados.git] / services / fuse / arvados_fuse / __init__.py
index b2a3b2e7a1bd3059eaf1b265a90edced0ef69e13..2d4f6c9dcde55eac714f3b2a0171fa3c836b59b6 100644 (file)
@@ -20,6 +20,7 @@ import _strptime
 import calendar
 import threading
 import itertools
+import ciso8601
 
 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
 
@@ -30,42 +31,12 @@ _logger = logging.getLogger('arvados.arvados_fuse')
 # appear as underscores in the fuse mount.)
 _disallowed_filename_characters = re.compile('[\x00/]')
 
-class SafeApi(object):
-    '''Threadsafe wrapper for API object.  This stores and returns a different api
-    object per thread, because httplib2 which underlies apiclient is not
-    threadsafe.
-    '''
-
-    def __init__(self, config):
-        self.host = config.get('ARVADOS_API_HOST')
-        self.api_token = config.get('ARVADOS_API_TOKEN')
-        self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
-        self.local = threading.local()
-        self.block_cache = arvados.KeepBlockCache()
-
-    def localapi(self):
-        if 'api' not in self.local.__dict__:
-            self.local.api = arvados.api('v1', False, self.host,
-                                         self.api_token, self.insecure)
-        return self.local.api
-
-    def localkeep(self):
-        if 'keep' not in self.local.__dict__:
-            self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
-        return self.local.keep
-
-    def __getattr__(self, name):
-        # Proxy nonexistent attributes to the local API client.
-        try:
-            return getattr(self.localapi(), name)
-        except AttributeError:
-            return super(SafeApi, self).__getattr__(name)
-
-
 def convertTime(t):
-    '''Parse Arvados timestamp to unix time.'''
+    """Parse Arvados timestamp to unix time."""
+    if not t:
+        return 0
     try:
-        return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
+        return calendar.timegm(ciso8601.parse_datetime_unaware(t).timetuple())
     except (TypeError, ValueError):
         return 0
 
@@ -280,6 +251,7 @@ class Directory(FreshBase):
                 n.clear()
             llfuse.invalidate_entry(self.inode, str(n))
             self.inodes.del_entry(oldentries[n])
+        llfuse.invalidate_inode(self.inode)
         self.invalidate()
 
     def mtime(self):
@@ -298,24 +270,31 @@ class CollectionDirectory(Directory):
         self.collection_object = None
         if isinstance(collection, dict):
             self.collection_locator = collection['uuid']
+            self._mtime = convertTime(collection.get('modified_at'))
         else:
             self.collection_locator = collection
+            self._mtime = 0
 
     def same(self, i):
         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
 
-    def new_collection(self, new_collection_object):
+    # Used by arv-web.py to switch the contents of the CollectionDirectory
+    def change_collection(self, new_locator):
+        """Switch the contents of the CollectionDirectory.  Must be called with llfuse.lock held."""
+        self.collection_locator = new_locator
+        self.collection_object = None
+        self.update()
+
+    def new_collection(self, new_collection_object, coll_reader):
         self.collection_object = new_collection_object
 
+        self._mtime = convertTime(self.collection_object.get('modified_at'))
+
         if self.collection_object_file is not None:
             self.collection_object_file.update(self.collection_object)
 
         self.clear()
-        collection = arvados.CollectionReader(
-            self.collection_object["manifest_text"], self.api,
-            self.api.localkeep(), num_retries=self.num_retries)
-        collection.normalize()
-        for s in collection.all_streams():
+        for s in coll_reader.all_streams():
             cwd = self
             for part in s.name().split('/'):
                 if part != '' and part != '.':
@@ -331,34 +310,41 @@ class CollectionDirectory(Directory):
             if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
                 return True
 
+            if self.collection_locator is None:
+                self.fresh()
+                return True
+
             with llfuse.lock_released:
-                new_collection_object = self.api.collections().get(
-                    uuid=self.collection_locator
-                    ).execute(num_retries=self.num_retries)
+                coll_reader = arvados.CollectionReader(
+                    self.collection_locator, self.api, self.api.keep,
+                    num_retries=self.num_retries)
+                new_collection_object = coll_reader.api_response() or {}
+                # If the Collection only exists in Keep, there will be no API
+                # response.  Fill in the fields we need.
+                if 'uuid' not in new_collection_object:
+                    new_collection_object['uuid'] = self.collection_locator
                 if "portable_data_hash" not in new_collection_object:
                     new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
+                if 'manifest_text' not in new_collection_object:
+                    new_collection_object['manifest_text'] = coll_reader.manifest_text()
+                coll_reader.normalize()
             # end with llfuse.lock_released, re-acquire lock
 
             if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
-                self.new_collection(new_collection_object)
+                self.new_collection(new_collection_object, coll_reader)
 
             self.fresh()
             return True
-        except apiclient.errors.HttpError as e:
-            if e.resp.status == 404:
-                _logger.warn("arv-mount %s: not found", self.collection_locator)
-            else:
-                _logger.error("arv-mount %s: error", self.collection_locator)
-                _logger.exception(detail)
+        except arvados.errors.NotFoundError:
+            _logger.exception("arv-mount %s: error", self.collection_locator)
         except arvados.errors.ArgumentError as detail:
             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
             if self.collection_object is not None and "manifest_text" in self.collection_object:
                 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
-        except Exception as detail:
-            _logger.error("arv-mount %s: error", self.collection_locator)
+        except Exception:
+            _logger.exception("arv-mount %s: error", self.collection_locator)
             if self.collection_object is not None and "manifest_text" in self.collection_object:
                 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
-            _logger.exception(detail)
         return False
 
     def __getitem__(self, item):
@@ -377,10 +363,6 @@ class CollectionDirectory(Directory):
         else:
             return super(CollectionDirectory, self).__contains__(k)
 
-    def mtime(self):
-        self.checkupdate()
-        return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
-
 
 class MagicDirectory(Directory):
     '''A special directory that logically contains the set of all extant keep
@@ -455,8 +437,8 @@ class RecursiveInvalidateDirectory(Directory):
             super(RecursiveInvalidateDirectory, self).invalidate()
             for a in self._entries:
                 self._entries[a].invalidate()
-        except Exception as e:
-            _logger.exception(e)
+        except Exception:
+            _logger.exception()
         finally:
             if self.inode == llfuse.ROOT_INODE:
                 llfuse.lock.release()
@@ -481,8 +463,8 @@ class TagsDirectory(RecursiveInvalidateDirectory):
                 ).execute(num_retries=self.num_retries)
         if "items" in tags:
             self.merge(tags['items'],
-                       lambda i: i['name'] if 'name' in i else i['uuid'],
-                       lambda a, i: a.tag == i,
+                       lambda i: i['name'],
+                       lambda a, i: a.tag == i['name'],
                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
 
 
@@ -527,6 +509,8 @@ class ProjectDirectory(Directory):
         self.project_object = project_object
         self.project_object_file = None
         self.uuid = project_object['uuid']
+        self._poll = poll
+        self._poll_time = poll_time
 
     def createDirectory(self, i):
         if collection_uuid_pattern.match(i['uuid']):
@@ -674,8 +658,8 @@ class SharedDirectory(Directory):
                        lambda i: i[0],
                        lambda a, i: a.uuid == i[1]['uuid'],
                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
-        except Exception as e:
-            _logger.exception(e)
+        except Exception:
+            _logger.exception()
 
 
 class FileHandle(object):
@@ -767,6 +751,8 @@ class Operations(llfuse.Operations):
         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
         if isinstance(e, Directory):
             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
+        elif isinstance(e, StreamReaderFile):
+            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
         else:
             entry.st_mode |= stat.S_IFREG
 
@@ -839,8 +825,8 @@ class Operations(llfuse.Operations):
         except arvados.errors.NotFoundError as e:
             _logger.warning("Block not found: " + str(e))
             raise llfuse.FUSEError(errno.EIO)
-        except Exception as e:
-            _logger.exception(e)
+        except Exception:
+            _logger.exception()
             raise llfuse.FUSEError(errno.EIO)
 
     def release(self, fh):
@@ -915,5 +901,5 @@ class Operations(llfuse.Operations):
     # arv-mount.
     # The workaround is to implement it with the proper number of parameters,
     # and then everything works out.
-    def create(self, p1, p2, p3, p4, p5):
+    def create(self, inode_parent, name, mode, flags, ctx):
         raise llfuse.FUSEError(errno.EROFS)