Merge branch '5043-crunchstat-long-lines' closes #5043
[arvados.git] / services / fuse / arvados_fuse / __init__.py
index d5523e80838f46af3839941e984c3fee0f4edeb7..2d4f6c9dcde55eac714f3b2a0171fa3c836b59b6 100644 (file)
@@ -5,7 +5,6 @@
 import os
 import sys
 import llfuse
-from llfuse import FUSEError
 import errno
 import stat
 import threading
@@ -17,70 +16,42 @@ import apiclient
 import json
 import logging
 import time
+import _strptime
 import calendar
 import threading
+import itertools
+import ciso8601
+
 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
 
 _logger = logging.getLogger('arvados.arvados_fuse')
 
-class SafeApi(object):
-    '''Threadsafe wrapper for API object.  This stores and returns a different api
-    object per thread, because httplib2 which underlies apiclient is not
-    threadsafe.
-    '''
-
-    def __init__(self, config):
-        self.host = config.get('ARVADOS_API_HOST')
-        self.api_token = config.get('ARVADOS_API_TOKEN')
-        self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
-        self.local = threading.local()
-        self.block_cache = arvados.KeepBlockCache()
-
-    def localapi(self):
-        if 'api' not in self.local.__dict__:
-            self.local.api = arvados.api('v1', False, self.host,
-                                         self.api_token, self.insecure)
-        return self.local.api
-
-    def localkeep(self):
-        if 'keep' not in self.local.__dict__:
-            self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
-        return self.local.keep
-
-    def __getattr__(self, name):
-        # Proxy nonexistent attributes to the local API client.
-        try:
-            return getattr(self.localapi(), name)
-        except AttributeError:
-            return super(SafeApi, self).__getattr__(name)
-
+# Match any character which FUSE or Linux cannot accommodate as part
+# of a filename. (If present in a collection filename, they will
+# appear as underscores in the fuse mount.)
+_disallowed_filename_characters = re.compile('[\x00/]')
 
 def convertTime(t):
-    '''Parse Arvados timestamp to unix time.'''
+    """Parse Arvados timestamp to unix time."""
+    if not t:
+        return 0
     try:
-        return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
+        return calendar.timegm(ciso8601.parse_datetime_unaware(t).timetuple())
     except (TypeError, ValueError):
         return 0
 
 def sanitize_filename(dirty):
-    '''Remove troublesome characters from filenames.'''
-    # http://www.dwheeler.com/essays/fixing-unix-linux-filenames.html
+    '''Replace disallowed filename characters with harmless "_".'''
     if dirty is None:
         return None
-
-    fn = ""
-    for c in dirty:
-        if (c >= '\x00' and c <= '\x1f') or c == '\x7f' or c == '/':
-            # skip control characters and /
-            continue
-        fn += c
-
-    # strip leading - or ~ and leading/trailing whitespace
-    stripped = fn.lstrip("-~ ").rstrip()
-    if len(stripped) > 0:
-        return stripped
+    elif dirty == '':
+        return '_'
+    elif dirty == '.':
+        return '_'
+    elif dirty == '..':
+        return '__'
     else:
-        return None
+        return _disallowed_filename_characters.sub('_', dirty)
 
 
 class FreshBase(object):
@@ -280,6 +251,7 @@ class Directory(FreshBase):
                 n.clear()
             llfuse.invalidate_entry(self.inode, str(n))
             self.inodes.del_entry(oldentries[n])
+        llfuse.invalidate_inode(self.inode)
         self.invalidate()
 
     def mtime(self):
@@ -289,29 +261,40 @@ class Directory(FreshBase):
 class CollectionDirectory(Directory):
     '''Represents the root of a directory tree holding a collection.'''
 
-    def __init__(self, parent_inode, inodes, api, collection):
+    def __init__(self, parent_inode, inodes, api, num_retries, collection):
         super(CollectionDirectory, self).__init__(parent_inode)
         self.inodes = inodes
         self.api = api
+        self.num_retries = num_retries
         self.collection_object_file = None
         self.collection_object = None
         if isinstance(collection, dict):
             self.collection_locator = collection['uuid']
+            self._mtime = convertTime(collection.get('modified_at'))
         else:
             self.collection_locator = collection
+            self._mtime = 0
 
     def same(self, i):
         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
 
-    def new_collection(self, new_collection_object):
+    # Used by arv-web.py to switch the contents of the CollectionDirectory
+    def change_collection(self, new_locator):
+        """Switch the contents of the CollectionDirectory.  Must be called with llfuse.lock held."""
+        self.collection_locator = new_locator
+        self.collection_object = None
+        self.update()
+
+    def new_collection(self, new_collection_object, coll_reader):
         self.collection_object = new_collection_object
 
+        self._mtime = convertTime(self.collection_object.get('modified_at'))
+
         if self.collection_object_file is not None:
             self.collection_object_file.update(self.collection_object)
 
         self.clear()
-        collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api, self.api.localkeep())
-        for s in collection.all_streams():
+        for s in coll_reader.all_streams():
             cwd = self
             for part in s.name().split('/'):
                 if part != '' and part != '.':
@@ -327,32 +310,41 @@ class CollectionDirectory(Directory):
             if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
                 return True
 
+            if self.collection_locator is None:
+                self.fresh()
+                return True
+
             with llfuse.lock_released:
-                new_collection_object = self.api.collections().get(uuid=self.collection_locator).execute()
+                coll_reader = arvados.CollectionReader(
+                    self.collection_locator, self.api, self.api.keep,
+                    num_retries=self.num_retries)
+                new_collection_object = coll_reader.api_response() or {}
+                # If the Collection only exists in Keep, there will be no API
+                # response.  Fill in the fields we need.
+                if 'uuid' not in new_collection_object:
+                    new_collection_object['uuid'] = self.collection_locator
                 if "portable_data_hash" not in new_collection_object:
                     new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
+                if 'manifest_text' not in new_collection_object:
+                    new_collection_object['manifest_text'] = coll_reader.manifest_text()
+                coll_reader.normalize()
             # end with llfuse.lock_released, re-acquire lock
 
             if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
-                self.new_collection(new_collection_object)
+                self.new_collection(new_collection_object, coll_reader)
 
             self.fresh()
             return True
-        except apiclient.errors.HttpError as e:
-            if e.resp.status == 404:
-                _logger.warn("arv-mount %s: not found", self.collection_locator)
-            else:
-                _logger.error("arv-mount %s: error", self.collection_locator)
-                _logger.exception(detail)
+        except arvados.errors.NotFoundError:
+            _logger.exception("arv-mount %s: error", self.collection_locator)
         except arvados.errors.ArgumentError as detail:
             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
             if self.collection_object is not None and "manifest_text" in self.collection_object:
                 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
-        except Exception as detail:
-            _logger.error("arv-mount %s: error", self.collection_locator)
+        except Exception:
+            _logger.exception("arv-mount %s: error", self.collection_locator)
             if self.collection_object is not None and "manifest_text" in self.collection_object:
                 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
-            _logger.exception(detail)
         return False
 
     def __getitem__(self, item):
@@ -371,10 +363,6 @@ class CollectionDirectory(Directory):
         else:
             return super(CollectionDirectory, self).__contains__(k)
 
-    def mtime(self):
-        self.checkupdate()
-        return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
-
 
 class MagicDirectory(Directory):
     '''A special directory that logically contains the set of all extant keep
@@ -386,17 +374,8 @@ class MagicDirectory(Directory):
     to readdir().
     '''
 
-    def __init__(self, parent_inode, inodes, api):
-        super(MagicDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
-        self.api = api
-        # Have to defer creating readme_file because at this point we don't
-        # yet have an inode assigned.
-        self.readme_file = None
-
-    def create_readme(self):
-        if self.readme_file is None:
-            text = '''This directory provides access to Arvados collections as subdirectories listed
+    README_TEXT = '''
+This directory provides access to Arvados collections as subdirectories listed
 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
 the form '1234567890abcdefghijklmnopqrstuv+123').
 
@@ -404,13 +383,27 @@ Note that this directory will appear empty until you attempt to access a
 specific collection subdirectory (such as trying to 'cd' into it), at which
 point the collection will actually be looked up on the server and the directory
 will appear if it exists.
-'''
-            self.readme_file = self.inodes.add_entry(StringFile(self.inode, text, time.time()))
-            self._entries["README"] = self.readme_file
+'''.lstrip()
 
-    def __contains__(self, k):
-        self.create_readme()
+    def __init__(self, parent_inode, inodes, api, num_retries):
+        super(MagicDirectory, self).__init__(parent_inode)
+        self.inodes = inodes
+        self.api = api
+        self.num_retries = num_retries
+
+    def __setattr__(self, name, value):
+        super(MagicDirectory, self).__setattr__(name, value)
+        # When we're assigned an inode, add a README.
+        if ((name == 'inode') and (self.inode is not None) and
+              (not self._entries)):
+            self._entries['README'] = self.inodes.add_entry(
+                StringFile(self.inode, self.README_TEXT, time.time()))
+            # If we're the root directory, add an identical by_id subdirectory.
+            if self.inode == llfuse.ROOT_INODE:
+                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
+                        self.inode, self.inodes, self.api, self.num_retries))
 
+    def __contains__(self, k):
         if k in self._entries:
             return True
 
@@ -418,7 +411,8 @@ will appear if it exists.
             return False
 
         try:
-            e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, self.api, k))
+            e = self.inodes.add_entry(CollectionDirectory(
+                    self.inode, self.inodes, self.api, self.num_retries, k))
             if e.update():
                 self._entries[k] = e
                 return True
@@ -428,10 +422,6 @@ will appear if it exists.
             _logger.debug('arv-mount exception keep %s', e)
             return False
 
-    def items(self):
-        self.create_readme()
-        return self._entries.items()
-
     def __getitem__(self, item):
         if item in self:
             return self._entries[item]
@@ -447,8 +437,8 @@ class RecursiveInvalidateDirectory(Directory):
             super(RecursiveInvalidateDirectory, self).invalidate()
             for a in self._entries:
                 self._entries[a].invalidate()
-        except Exception as e:
-            _logger.exception(e)
+        except Exception:
+            _logger.exception("arv-mount: error invalidating directory entries")
         finally:
             if self.inode == llfuse.ROOT_INODE:
                 llfuse.lock.release()
@@ -457,21 +447,25 @@ class RecursiveInvalidateDirectory(Directory):
 class TagsDirectory(RecursiveInvalidateDirectory):
     '''A special directory that contains as subdirectories all tags visible to the user.'''
 
-    def __init__(self, parent_inode, inodes, api, poll_time=60):
+    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
         super(TagsDirectory, self).__init__(parent_inode)
         self.inodes = inodes
         self.api = api
+        self.num_retries = num_retries
         self._poll = True
         self._poll_time = poll_time
 
     def update(self):
         with llfuse.lock_released:
-            tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
+            tags = self.api.links().list(
+                filters=[['link_class', '=', 'tag']],
+                select=['name'], distinct=True
+                ).execute(num_retries=self.num_retries)
         if "items" in tags:
             self.merge(tags['items'],
-                       lambda i: i['name'] if 'name' in i else i['uuid'],
-                       lambda a, i: a.tag == i,
-                       lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
+                       lambda i: i['name'],
+                       lambda a, i: a.tag == i['name'],
+                       lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
 
 
 class TagDirectory(Directory):
@@ -479,45 +473,53 @@ class TagDirectory(Directory):
     to the user that are tagged with a particular tag.
     '''
 
-    def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
+    def __init__(self, parent_inode, inodes, api, num_retries, tag,
+                 poll=False, poll_time=60):
         super(TagDirectory, self).__init__(parent_inode)
         self.inodes = inodes
         self.api = api
+        self.num_retries = num_retries
         self.tag = tag
         self._poll = poll
         self._poll_time = poll_time
 
     def update(self):
         with llfuse.lock_released:
-            taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
-                                                   ['name', '=', self.tag],
-                                                   ['head_uuid', 'is_a', 'arvados#collection']],
-                                          select=['head_uuid']).execute()
+            taggedcollections = self.api.links().list(
+                filters=[['link_class', '=', 'tag'],
+                         ['name', '=', self.tag],
+                         ['head_uuid', 'is_a', 'arvados#collection']],
+                select=['head_uuid']
+                ).execute(num_retries=self.num_retries)
         self.merge(taggedcollections['items'],
                    lambda i: i['head_uuid'],
                    lambda a, i: a.collection_locator == i['head_uuid'],
-                   lambda i: CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid']))
+                   lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
 
 
 class ProjectDirectory(Directory):
     '''A special directory that contains the contents of a project.'''
 
-    def __init__(self, parent_inode, inodes, api, project_object, poll=False, poll_time=60):
+    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
+                 poll=False, poll_time=60):
         super(ProjectDirectory, self).__init__(parent_inode)
         self.inodes = inodes
         self.api = api
+        self.num_retries = num_retries
         self.project_object = project_object
         self.project_object_file = None
         self.uuid = project_object['uuid']
+        self._poll = poll
+        self._poll_time = poll_time
 
     def createDirectory(self, i):
         if collection_uuid_pattern.match(i['uuid']):
-            return CollectionDirectory(self.inode, self.inodes, self.api, i)
+            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
         elif group_uuid_pattern.match(i['uuid']):
-            return ProjectDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
+            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
         elif link_uuid_pattern.match(i['uuid']):
             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
-                return CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid'])
+                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
             else:
                 return None
         elif uuid_pattern.match(i['uuid']):
@@ -557,13 +559,19 @@ class ProjectDirectory(Directory):
 
         with llfuse.lock_released:
             if group_uuid_pattern.match(self.uuid):
-                self.project_object = self.api.groups().get(uuid=self.uuid).execute()
+                self.project_object = self.api.groups().get(
+                    uuid=self.uuid).execute(num_retries=self.num_retries)
             elif user_uuid_pattern.match(self.uuid):
-                self.project_object = self.api.users().get(uuid=self.uuid).execute()
+                self.project_object = self.api.users().get(
+                    uuid=self.uuid).execute(num_retries=self.num_retries)
 
-            contents = arvados.util.list_all(self.api.groups().contents, uuid=self.uuid)
+            contents = arvados.util.list_all(self.api.groups().contents,
+                                             self.num_retries, uuid=self.uuid)
             # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
-            contents += arvados.util.list_all(self.api.links().list, filters=[['tail_uuid', '=', self.uuid], ['link_class', '=', 'name']])
+            contents += arvados.util.list_all(
+                self.api.links().list, self.num_retries,
+                filters=[['tail_uuid', '=', self.uuid],
+                         ['link_class', '=', 'name']])
 
         # end with llfuse.lock_released, re-acquire lock
 
@@ -589,17 +597,21 @@ class ProjectDirectory(Directory):
 class SharedDirectory(Directory):
     '''A special directory that represents users or groups who have shared projects with me.'''
 
-    def __init__(self, parent_inode, inodes, api, exclude, poll=False, poll_time=60):
+    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
+                 poll=False, poll_time=60):
         super(SharedDirectory, self).__init__(parent_inode)
-        self.current_user = api.users().current().execute()
         self.inodes = inodes
         self.api = api
+        self.num_retries = num_retries
+        self.current_user = api.users().current().execute(num_retries=num_retries)
         self._poll = True
         self._poll_time = poll_time
 
     def update(self):
         with llfuse.lock_released:
-            all_projects = arvados.util.list_all(self.api.groups().list, filters=[['group_class','=','project']])
+            all_projects = arvados.util.list_all(
+                self.api.groups().list, self.num_retries,
+                filters=[['group_class','=','project']])
             objects = {}
             for ob in all_projects:
                 objects[ob['uuid']] = ob
@@ -611,8 +623,12 @@ class SharedDirectory(Directory):
                     roots.append(ob)
                     root_owners[ob['owner_uuid']] = True
 
-            lusers = arvados.util.list_all(self.api.users().list, filters=[['uuid','in', list(root_owners)]])
-            lgroups = arvados.util.list_all(self.api.groups().list, filters=[['uuid','in', list(root_owners)]])
+            lusers = arvados.util.list_all(
+                self.api.users().list, self.num_retries,
+                filters=[['uuid','in', list(root_owners)]])
+            lgroups = arvados.util.list_all(
+                self.api.groups().list, self.num_retries,
+                filters=[['uuid','in', list(root_owners)]])
 
             users = {}
             groups = {}
@@ -641,9 +657,9 @@ class SharedDirectory(Directory):
             self.merge(contents.items(),
                        lambda i: i[0],
                        lambda a, i: a.uuid == i[1]['uuid'],
-                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, i[1], poll=self._poll, poll_time=self._poll_time))
-        except Exception as e:
-            _logger.exception(e)
+                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
+        except Exception:
+            _logger.exception("arv-mount: error updating shared directory")
 
 
 class FileHandle(object):
@@ -661,7 +677,7 @@ class Inodes(object):
 
     def __init__(self):
         self._entries = {}
-        self._counter = llfuse.ROOT_INODE
+        self._counter = itertools.count(llfuse.ROOT_INODE)
 
     def __getitem__(self, item):
         return self._entries[item]
@@ -679,9 +695,8 @@ class Inodes(object):
         return k in self._entries
 
     def add_entry(self, entry):
-        entry.inode = self._counter
+        entry.inode = next(self._counter)
         self._entries[entry.inode] = entry
-        self._counter += 1
         return entry
 
     def del_entry(self, entry):
@@ -697,12 +712,13 @@ class Operations(llfuse.Operations):
     so request handlers do not run concurrently unless the lock is explicitly released
     using "with llfuse.lock_released:"'''
 
-    def __init__(self, uid, gid):
+    def __init__(self, uid, gid, encoding="utf-8"):
         super(Operations, self).__init__()
 
         self.inodes = Inodes()
         self.uid = uid
         self.gid = gid
+        self.encoding = encoding
 
         # dict of inode to filehandle
         self._filehandles = {}
@@ -735,6 +751,8 @@ class Operations(llfuse.Operations):
         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
         if isinstance(e, Directory):
             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
+        elif isinstance(e, StreamReaderFile):
+            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
         else:
             entry.st_mode |= stat.S_IFREG
 
@@ -754,6 +772,7 @@ class Operations(llfuse.Operations):
         return entry
 
     def lookup(self, parent_inode, name):
+        name = unicode(name, self.encoding)
         _logger.debug("arv-mount lookup: parent_inode %i name %s",
                       parent_inode, name)
         inode = None
@@ -806,8 +825,8 @@ class Operations(llfuse.Operations):
         except arvados.errors.NotFoundError as e:
             _logger.warning("Block not found: " + str(e))
             raise llfuse.FUSEError(errno.EIO)
-        except Exception as e:
-            _logger.exception(e)
+        except Exception:
+            _logger.exception("arv-mount: unexpected error while handling request")
             raise llfuse.FUSEError(errno.EIO)
 
     def release(self, fh):
@@ -851,7 +870,10 @@ class Operations(llfuse.Operations):
         e = off
         while e < len(handle.entry):
             if handle.entry[e][1].inode in self.inodes:
-                yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
+                try:
+                    yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
+                except UnicodeEncodeError:
+                    pass
             e += 1
 
     def releasedir(self, fh):
@@ -879,5 +901,5 @@ class Operations(llfuse.Operations):
     # arv-mount.
     # The workaround is to implement it with the proper number of parameters,
     # and then everything works out.
-    def create(self, p1, p2, p3, p4, p5):
+    def create(self, inode_parent, name, mode, flags, ctx):
         raise llfuse.FUSEError(errno.EROFS)