X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a46ef7496b83b2778de8db36e4948b55dddf3754..c340eecc7a03dd066792e5f046f087b8b3dfced6:/services/fuse/arvados_fuse/fusedir.py diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py index db5020cfef..2b963d9a68 100644 --- a/services/fuse/arvados_fuse/fusedir.py +++ b/services/fuse/arvados_fuse/fusedir.py @@ -298,20 +298,52 @@ class CollectionDirectoryBase(Directory): def on_event(self, event, collection, name, item): if collection == self.collection: name = self.sanitize_filename(name) - _logger.debug("collection notify %s %s %s %s", event, collection, name, item) - with llfuse.lock: - if event == arvados.collection.ADD: - self.new_entry(name, item, self.mtime()) - elif event == arvados.collection.DEL: - ent = self._entries[name] - del self._entries[name] - self.inodes.invalidate_entry(self, name) - self.inodes.del_entry(ent) - elif event == arvados.collection.MOD: - if hasattr(item, "fuse_entry") and item.fuse_entry is not None: - self.inodes.invalidate_inode(item.fuse_entry) - elif name in self._entries: - self.inodes.invalidate_inode(self._entries[name]) + + # + # It's possible for another thread to have llfuse.lock and + # be waiting on collection.lock. Meanwhile, we released + # llfuse.lock earlier in the stack, but are still holding + # on to the collection lock, and now we need to re-acquire + # llfuse.lock. If we don't release the collection lock, + # we'll deadlock where we're holding the collection lock + # waiting for llfuse.lock and the other thread is holding + # llfuse.lock and waiting for the collection lock. + # + # The correct locking order here is to take llfuse.lock + # first, then the collection lock. + # + # Since collection.lock is an RLock, it might be locked + # multiple times, so we need to release it multiple times, + # keep a count, then re-lock it the correct number of + # times. + # + lockcount = 0 + try: + while True: + self.collection.lock.release() + lockcount += 1 + except RuntimeError: + pass + + try: + with llfuse.lock: + with self.collection.lock: + if event == arvados.collection.ADD: + self.new_entry(name, item, self.mtime()) + elif event == arvados.collection.DEL: + ent = self._entries[name] + del self._entries[name] + self.inodes.invalidate_entry(self, name) + self.inodes.del_entry(ent) + elif event == arvados.collection.MOD: + if hasattr(item, "fuse_entry") and item.fuse_entry is not None: + self.inodes.invalidate_inode(item.fuse_entry) + elif name in self._entries: + self.inodes.invalidate_inode(self._entries[name]) + finally: + while lockcount > 0: + self.collection.lock.acquire() + lockcount -= 1 def populate(self, mtime): self._mtime = mtime @@ -487,6 +519,8 @@ class CollectionDirectory(CollectionDirectoryBase): new_collection_record["portable_data_hash"] = new_collection_record["uuid"] if 'manifest_text' not in new_collection_record: new_collection_record['manifest_text'] = coll_reader.manifest_text() + if 'storage_classes_desired' not in new_collection_record: + new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired() if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"): self.new_collection(new_collection_record, coll_reader) @@ -571,11 +605,12 @@ class TmpCollectionDirectory(CollectionDirectoryBase): def save_new(self): pass - def __init__(self, parent_inode, inodes, api_client, num_retries): + def __init__(self, parent_inode, inodes, api_client, num_retries, storage_classes=None): collection = self.UnsaveableCollection( api_client=api_client, keep_client=api_client.keep, - num_retries=num_retries) + num_retries=num_retries, + storage_classes_desired=storage_classes) super(TmpCollectionDirectory, self).__init__( parent_inode, inodes, api_client.config, collection) self.collection_record_file = None @@ -584,10 +619,26 @@ class TmpCollectionDirectory(CollectionDirectoryBase): def on_event(self, *args, **kwargs): super(TmpCollectionDirectory, self).on_event(*args, **kwargs) if self.collection_record_file: - with llfuse.lock: - self.collection_record_file.invalidate() - self.inodes.invalidate_inode(self.collection_record_file) - _logger.debug("%s invalidated collection record", self) + + # See discussion in CollectionDirectoryBase.on_event + lockcount = 0 + try: + while True: + self.collection.lock.release() + lockcount += 1 + except RuntimeError: + pass + + try: + with llfuse.lock: + with self.collection.lock: + self.collection_record_file.invalidate() + self.inodes.invalidate_inode(self.collection_record_file) + _logger.debug("%s invalidated collection record", self) + finally: + while lockcount > 0: + self.collection.lock.acquire() + lockcount -= 1 def collection_record(self): with llfuse.lock_released: @@ -595,6 +646,7 @@ class TmpCollectionDirectory(CollectionDirectoryBase): "uuid": None, "manifest_text": self.collection.manifest_text(), "portable_data_hash": self.collection.portable_data_hash(), + "storage_classes_desired": self.collection.storage_classes_desired(), } def __contains__(self, k): @@ -653,11 +705,12 @@ and the directory will appear if it exists. """.lstrip() - def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False): + def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False, storage_classes=None): super(MagicDirectory, self).__init__(parent_inode, inodes, api.config) self.api = api self.num_retries = num_retries self.pdh_only = pdh_only + self.storage_classes = storage_classes def __setattr__(self, name, value): super(MagicDirectory, self).__setattr__(name, value) @@ -683,11 +736,12 @@ and the directory will appear if it exists. if group_uuid_pattern.match(k): project = self.api.groups().list( - filters=[['group_class', '=', 'project'], ["uuid", "=", k]]).execute(num_retries=self.num_retries) + filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries) if project[u'items_available'] == 0: return False e = self.inodes.add_entry(ProjectDirectory( - self.inode, self.inodes, self.api, self.num_retries, project[u'items'][0])) + self.inode, self.inodes, self.api, self.num_retries, + project[u'items'][0], storage_classes=self.storage_classes)) else: e = self.inodes.add_entry(CollectionDirectory( self.inode, self.inodes, self.api, self.num_retries, k)) @@ -811,7 +865,7 @@ class ProjectDirectory(Directory): """A special directory that contains the contents of a project.""" def __init__(self, parent_inode, inodes, api, num_retries, project_object, - poll=False, poll_time=60): + poll=True, poll_time=3, storage_classes=None): super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config) self.api = api self.num_retries = num_retries @@ -823,6 +877,7 @@ class ProjectDirectory(Directory): self._updating_lock = threading.Lock() self._current_user = None self._full_listing = False + self.storage_classes = storage_classes def want_event_subscribe(self): return True @@ -831,7 +886,7 @@ class ProjectDirectory(Directory): if collection_uuid_pattern.match(i['uuid']): return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i) elif group_uuid_pattern.match(i['uuid']): - return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time) + return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time, self.storage_classes) elif link_uuid_pattern.match(i['uuid']): if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']): return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']) @@ -894,14 +949,18 @@ class ProjectDirectory(Directory): elif user_uuid_pattern.match(self.project_uuid): self.project_object = self.api.users().get( uuid=self.project_uuid).execute(num_retries=self.num_retries) - - contents = arvados.util.list_all(self.api.groups().list, - self.num_retries, - filters=[["owner_uuid", "=", self.project_uuid], - ["group_class", "=", "project"]]) - contents.extend(arvados.util.list_all(self.api.collections().list, - self.num_retries, - filters=[["owner_uuid", "=", self.project_uuid]])) + # do this in 2 steps until #17424 is fixed + contents = list(arvados.util.keyset_list_all(self.api.groups().contents, + order_key="uuid", + num_retries=self.num_retries, + uuid=self.project_uuid, + filters=[["uuid", "is_a", "arvados#group"], + ["groups.group_class", "in", ["project","filter"]]])) + contents.extend(arvados.util.keyset_list_all(self.api.groups().contents, + order_key="uuid", + num_retries=self.num_retries, + uuid=self.project_uuid, + filters=[["uuid", "is_a", "arvados#collection"]])) # end with llfuse.lock_released, re-acquire lock @@ -932,7 +991,7 @@ class ProjectDirectory(Directory): else: namefilter = ["name", "in", [k, k2]] contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid], - ["group_class", "=", "project"], + ["group_class", "in", ["project","filter"]], namefilter], limit=2).execute(num_retries=self.num_retries)["items"] if not contents: @@ -978,9 +1037,16 @@ class ProjectDirectory(Directory): def mkdir(self, name): try: with llfuse.lock_released: - self.api.collections().create(body={"owner_uuid": self.project_uuid, - "name": name, - "manifest_text": ""}).execute(num_retries=self.num_retries) + c = { + "owner_uuid": self.project_uuid, + "name": name, + "manifest_text": "" } + if self.storage_classes is not None: + c["storage_classes_desired"] = self.storage_classes + try: + self.api.collections().create(body=c).execute(num_retries=self.num_retries) + except Exception as e: + raise self.invalidate() except apiclient_errors.Error as error: _logger.error(error) @@ -1075,7 +1141,7 @@ class SharedDirectory(Directory): """A special directory that represents users or groups who have shared projects with me.""" def __init__(self, parent_inode, inodes, api, num_retries, exclude, - poll=False, poll_time=60): + poll=False, poll_time=60, storage_classes=None): super(SharedDirectory, self).__init__(parent_inode, inodes, api.config) self.api = api self.num_retries = num_retries @@ -1083,6 +1149,7 @@ class SharedDirectory(Directory): self._poll = True self._poll_time = poll_time self._updating_lock = threading.Lock() + self.storage_classes = storage_classes @use_counter def update(self): @@ -1101,7 +1168,7 @@ class SharedDirectory(Directory): if 'httpMethod' in methods.get('shared', {}): page = [] while True: - resp = self.api.groups().shared(filters=[['group_class', '=', 'project']]+page, + resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page, order="uuid", limit=10000, count="none", @@ -1116,10 +1183,12 @@ class SharedDirectory(Directory): objects[r["uuid"]] = r root_owners.add(r["uuid"]) else: - all_projects = arvados.util.list_all( - self.api.groups().list, self.num_retries, - filters=[['group_class','=','project']], - select=["uuid", "owner_uuid"]) + all_projects = list(arvados.util.keyset_list_all( + self.api.groups().list, + order_key="uuid", + num_retries=self.num_retries, + filters=[['group_class','in',['project','filter']]], + select=["uuid", "owner_uuid"])) for ob in all_projects: objects[ob['uuid']] = ob @@ -1129,11 +1198,15 @@ class SharedDirectory(Directory): roots.append(ob['uuid']) root_owners.add(ob['owner_uuid']) - lusers = arvados.util.list_all( - self.api.users().list, self.num_retries, + lusers = arvados.util.keyset_list_all( + self.api.users().list, + order_key="uuid", + num_retries=self.num_retries, filters=[['uuid','in', list(root_owners)]]) - lgroups = arvados.util.list_all( - self.api.groups().list, self.num_retries, + lgroups = arvados.util.keyset_list_all( + self.api.groups().list, + order_key="uuid", + num_retries=self.num_retries, filters=[['uuid','in', list(root_owners)+roots]]) for l in lusers: @@ -1146,8 +1219,6 @@ class SharedDirectory(Directory): obr = objects[r] if obr.get("name"): contents[obr["name"]] = obr - #elif obr.get("username"): - # contents[obr["username"]] = obr elif "first_name" in obr: contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr @@ -1162,7 +1233,7 @@ class SharedDirectory(Directory): self.merge(viewitems(contents), lambda i: i[0], lambda a, i: a.uuid() == i[1]['uuid'], - lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time)) + lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes)) except Exception: _logger.exception("arv-mount shared dir error") finally: