From 6b4405e36bb59ebd4714690eeb8518c4a2fa019b Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Thu, 4 Sep 2014 10:59:03 -0400 Subject: [PATCH] 3644: Add threadsafe arvados api access. arv-mount now releases llfuse global lock when performing network requests. --- services/fuse/arvados_fuse/__init__.py | 172 ++++++++++++++++--------- services/fuse/bin/arv-mount | 2 +- 2 files changed, 111 insertions(+), 63 deletions(-) diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py index 6d55b3481e..adf6186dd4 100644 --- a/services/fuse/arvados_fuse/__init__.py +++ b/services/fuse/arvados_fuse/__init__.py @@ -18,13 +18,45 @@ import json import logging import time import calendar +import threading _logger = logging.getLogger('arvados.arvados_fuse') +class SafeApi(object): + '''Threadsafe wrapper for API object. This stores and returns a different api + object per thread, because httplib2 which underlies apiclient is not + threadsafe. + ''' + + def __init__(self, config): + self.host = config.get('ARVADOS_API_HOST') + self.token = config.get('ARVADOS_API_TOKEN') + self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE') + self.local = threading.local() + + def localapi(self): + if 'api' not in self.local.__dict__: + self.local.api = arvados.api('v1', False, self.host, self.token, self.insecure) + return self.local.api + + def collections(self): + return self.localapi().collections() + + def links(self): + return self.localapi().links() + + def groups(self): + return self.localapi().groups() + + def users(self): + return self.localapi().users() + def convertTime(t): + '''Parse Arvados timestamp to unix time.''' return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ")) def sanitize_filename(dirty): + '''Remove troublesome characters from filenames.''' # http://www.dwheeler.com/essays/fixing-unix-linux-filenames.html if dirty is None: return None @@ -259,10 +291,12 @@ class CollectionDirectory(Directory): try: if self.collection_object is not None and re.match(r'^[a-f0-9]{32}', self.collection_locator): return True - #with llfuse.lock_released: - new_collection_object = self.api.collections().get(uuid=self.collection_locator).execute() - if "portable_data_hash" not in new_collection_object: - new_collection_object["portable_data_hash"] = new_collection_object["uuid"] + + with llfuse.lock_released: + new_collection_object = self.api.collections().get(uuid=self.collection_locator).execute() + if "portable_data_hash" not in new_collection_object: + new_collection_object["portable_data_hash"] = new_collection_object["uuid"] + # end with llfuse.lock_released, re-acquire lock if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]: self.collection_object = new_collection_object @@ -290,19 +324,30 @@ class CollectionDirectory(Directory): cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.ctime(), self.mtime())) self.fresh() return True + except apiclient.errors.HttpError as e: + import pprint + pprint.pprint(self.resp.status) + if self.resp.status == 404: + _logger.warn("arv-mount %s: not found", self.collection_locator) + else: + _logger.error("arv-mount %s: error", self.collection_locator) + _logger.exception(detail) + return False except Exception as detail: _logger.error("arv-mount %s: error", self.collection_locator) - _logger.exception(detail) + if "manifest_text" in self.collection_object: + _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"]) + _logger.exception(detail) return False def __getitem__(self, item): self.checkupdate() - if item == '.manifest_text': + if item == '.arvados#collection.manifest_text': if self.manifest_text_file is None: self.manifest_text_file = StringFile(self.inode, self.collection_object["manifest_text"], self.ctime(), self.mtime()) self.inodes.add_entry(self.manifest_text_file) return self.manifest_text_file - elif item == '.portable_data_hash': + elif item == '.arvados#collection.portable_data_hash': if self.pdh_file is None: self.pdh_file = StringFile(self.inode, self.collection_object["portable_data_hash"], self.ctime(), self.mtime()) self.inodes.add_entry(self.pdh_file) @@ -311,7 +356,7 @@ class CollectionDirectory(Directory): return super(CollectionDirectory, self).__getitem__(item) def __contains__(self, k): - if k in ('.manifest_text', '.portable_data_hash'): + if k in ('.arvados#collection.manifest_text', '.arvados#collection.portable_data_hash'): return True else: return super(CollectionDirectory, self).__contains__(k) @@ -387,7 +432,8 @@ class TagsDirectory(RecursiveInvalidateDirectory): self._poll_time = poll_time def update(self): - tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute() + with llfuse.lock_released: + tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute() if "items" in tags: self.merge(tags['items'], lambda i: i['name'] if 'name' in i else i['uuid'], @@ -408,10 +454,11 @@ class TagDirectory(Directory): self._poll_time = poll_time def update(self): - taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'], - ['name', '=', self.tag], - ['head_uuid', 'is_a', 'arvados#collection']], - select=['head_uuid']).execute() + with llfuse.lock_released: + taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'], + ['name', '=', self.tag], + ['head_uuid', 'is_a', 'arvados#collection']], + select=['head_uuid']).execute() self.merge(taggedcollections['items'], lambda i: i['head_uuid'], lambda a, i: a.collection_locator == i['head_uuid'], @@ -468,17 +515,17 @@ class ProjectDirectory(RecursiveInvalidateDirectory): return a.uuid == i['uuid'] and not a.stale() return False - #with llfuse.lock_released: - if re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', self.uuid): - self.project_object = self.api.groups().get(uuid=self.uuid).execute() - elif re.match(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}', self.uuid): - self.project_object = self.api.users().get(uuid=self.uuid).execute() + with llfuse.lock_released: + if re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', self.uuid): + self.project_object = self.api.groups().get(uuid=self.uuid).execute() + elif re.match(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}', self.uuid): + self.project_object = self.api.users().get(uuid=self.uuid).execute() - contents = arvados.util.list_all(self.api.groups().contents, uuid=self.uuid) - # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use. - contents += arvados.util.list_all(self.api.links().list, filters=[['tail_uuid', '=', self.uuid], ['link_class', '=', 'name']]) - - #print contents + contents = arvados.util.list_all(self.api.groups().contents, uuid=self.uuid) + # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use. + contents += arvados.util.list_all(self.api.links().list, filters=[['tail_uuid', '=', self.uuid], ['link_class', '=', 'name']]) + + # end with llfuse.lock_released, re-acquire lock self.merge(contents, namefn, @@ -509,44 +556,45 @@ class SharedDirectory(RecursiveInvalidateDirectory): self._poll_time = poll_time def update(self): - #with llfuse.lock_released: - all_projects = arvados.util.list_all(self.api.groups().list, filters=[['group_class','=','project']]) - objects = {} - for ob in all_projects: - objects[ob['uuid']] = ob - - roots = [] - root_owners = {} - for ob in all_projects: - if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects: - roots.append(ob) - root_owners[ob['owner_uuid']] = True - - #with llfuse.lock_released: - lusers = arvados.util.list_all(self.api.users().list, filters=[['uuid','in', list(root_owners)]]) - lgroups = arvados.util.list_all(self.api.groups().list, filters=[['uuid','in', list(root_owners)]]) - - users = {} - groups = {} - - for l in lusers: - objects[l["uuid"]] = l - for l in lgroups: - objects[l["uuid"]] = l - - contents = {} - for r in root_owners: - if r in objects: - obr = objects[r] - if "name" in obr: - contents[obr["name"]] = obr - if "first_name" in obr: - contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr - - for r in roots: - if r['owner_uuid'] not in objects: - contents[r['name']] = r - + with llfuse.lock_released: + all_projects = arvados.util.list_all(self.api.groups().list, filters=[['group_class','=','project']]) + objects = {} + for ob in all_projects: + objects[ob['uuid']] = ob + + roots = [] + root_owners = {} + for ob in all_projects: + if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects: + roots.append(ob) + root_owners[ob['owner_uuid']] = True + + lusers = arvados.util.list_all(self.api.users().list, filters=[['uuid','in', list(root_owners)]]) + lgroups = arvados.util.list_all(self.api.groups().list, filters=[['uuid','in', list(root_owners)]]) + + users = {} + groups = {} + + for l in lusers: + objects[l["uuid"]] = l + for l in lgroups: + objects[l["uuid"]] = l + + contents = {} + for r in root_owners: + if r in objects: + obr = objects[r] + if "name" in obr: + contents[obr["name"]] = obr + if "first_name" in obr: + contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr + + for r in roots: + if r['owner_uuid'] not in objects: + contents[r['name']] = r + + # end with llfuse.lock_released, re-acquire lock + try: self.merge(contents.items(), lambda i: i[0], @@ -605,7 +653,7 @@ class Operations(llfuse.Operations): llfuse has its own global lock which is acquired before calling a request handler, so request handlers do not run concurrently unless the lock is explicitly released - with llfuse.lock_released.''' + using "with llfuse.lock_released:"''' def __init__(self, uid, gid): super(Operations, self).__init__() diff --git a/services/fuse/bin/arv-mount b/services/fuse/bin/arv-mount index 89acdd6af7..8fbb45eda0 100755 --- a/services/fuse/bin/arv-mount +++ b/services/fuse/bin/arv-mount @@ -86,7 +86,7 @@ with "--". try: # Create the request handler operations = Operations(os.getuid(), os.getgid()) - api = arvados.api('v1') + api = SafeApi(arvados.config) usr = api.users().current().execute() if args.by_hash: -- 2.30.2