#
# SPDX-License-Identifier: AGPL-3.0
-from __future__ import absolute_import
-from __future__ import division
-from future.utils import viewitems
-from future.utils import itervalues
-from builtins import dict
import apiclient
import arvados
import errno
def in_use(self):
if super(Directory, self).in_use():
return True
- for v in itervalues(self._entries):
+ for v in self._entries.values():
if v.in_use():
return True
return False
def has_ref(self, only_children):
if super(Directory, self).has_ref(only_children):
return True
- for v in itervalues(self._entries):
+ for v in self._entries.values():
if v.has_ref(False):
return True
return False
# Find self on the parent in order to invalidate this path.
# Calling the public items() method might trigger a refresh,
# which we definitely don't want, so read the internal dict directly.
- for k,v in viewitems(parent._entries):
+ for k,v in parent._entries.items():
if v is self:
self.inodes.invalidate_entry(parent, k)
break
def on_event(self, event, collection, name, item):
if collection == self.collection:
name = self.sanitize_filename(name)
- _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
- with llfuse.lock:
- if event == arvados.collection.ADD:
- self.new_entry(name, item, self.mtime())
- elif event == arvados.collection.DEL:
- ent = self._entries[name]
- del self._entries[name]
- self.inodes.invalidate_entry(self, name)
- self.inodes.del_entry(ent)
- elif event == arvados.collection.MOD:
- if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
- self.inodes.invalidate_inode(item.fuse_entry)
- elif name in self._entries:
- self.inodes.invalidate_inode(self._entries[name])
+
+ #
+ # It's possible for another thread to have llfuse.lock and
+ # be waiting on collection.lock. Meanwhile, we released
+ # llfuse.lock earlier in the stack, but are still holding
+ # on to the collection lock, and now we need to re-acquire
+ # llfuse.lock. If we don't release the collection lock,
+ # we'll deadlock where we're holding the collection lock
+ # waiting for llfuse.lock and the other thread is holding
+ # llfuse.lock and waiting for the collection lock.
+ #
+ # The correct locking order here is to take llfuse.lock
+ # first, then the collection lock.
+ #
+ # Since collection.lock is an RLock, it might be locked
+ # multiple times, so we need to release it multiple times,
+ # keep a count, then re-lock it the correct number of
+ # times.
+ #
+ lockcount = 0
+ try:
+ while True:
+ self.collection.lock.release()
+ lockcount += 1
+ except RuntimeError:
+ pass
+
+ try:
+ with llfuse.lock:
+ with self.collection.lock:
+ if event == arvados.collection.ADD:
+ self.new_entry(name, item, self.mtime())
+ elif event == arvados.collection.DEL:
+ ent = self._entries[name]
+ del self._entries[name]
+ self.inodes.invalidate_entry(self, name)
+ self.inodes.del_entry(ent)
+ elif event == arvados.collection.MOD:
+ if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
+ self.inodes.invalidate_inode(item.fuse_entry)
+ elif name in self._entries:
+ self.inodes.invalidate_inode(self._entries[name])
+ finally:
+ while lockcount > 0:
+ self.collection.lock.acquire()
+ lockcount -= 1
def populate(self, mtime):
self._mtime = mtime
- self.collection.subscribe(self.on_event)
- for entry, item in viewitems(self.collection):
- self.new_entry(entry, item, self.mtime())
+ with self.collection.lock:
+ self.collection.subscribe(self.on_event)
+ for entry, item in self.collection.items():
+ self.new_entry(entry, item, self.mtime())
def writable(self):
return self.collection.writable()
return
_logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
+ new_collection_record = None
if self.collection is not None:
if self.collection.known_past_version(to_record_version):
_logger.debug("%s already processed %s", self.collection_locator, to_record_version)
new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
if 'manifest_text' not in new_collection_record:
new_collection_record['manifest_text'] = coll_reader.manifest_text()
+ if 'storage_classes_desired' not in new_collection_record:
+ new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
- if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
- self.new_collection(new_collection_record, coll_reader)
-
- self._manifest_size = len(coll_reader.manifest_text())
- _logger.debug("%s manifest_size %i", self, self._manifest_size)
# end with llfuse.lock_released, re-acquire lock
+ if (new_collection_record is not None and
+ (self.collection_record is None or
+ self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"))):
+ self.new_collection(new_collection_record, coll_reader)
+ self._manifest_size = len(coll_reader.manifest_text())
+ _logger.debug("%s manifest_size %i", self, self._manifest_size)
self.fresh()
return True
def on_event(self, *args, **kwargs):
super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
if self.collection_record_file:
- with llfuse.lock:
- self.collection_record_file.invalidate()
- self.inodes.invalidate_inode(self.collection_record_file)
- _logger.debug("%s invalidated collection record", self)
+
+ # See discussion in CollectionDirectoryBase.on_event
+ lockcount = 0
+ try:
+ while True:
+ self.collection.lock.release()
+ lockcount += 1
+ except RuntimeError:
+ pass
+
+ try:
+ with llfuse.lock:
+ with self.collection.lock:
+ self.collection_record_file.invalidate()
+ self.inodes.invalidate_inode(self.collection_record_file)
+ _logger.debug("%s invalidated collection record", self)
+ finally:
+ while lockcount > 0:
+ self.collection.lock.acquire()
+ lockcount -= 1
def collection_record(self):
with llfuse.lock_released:
"uuid": None,
"manifest_text": self.collection.manifest_text(),
"portable_data_hash": self.collection.portable_data_hash(),
+ "storage_classes_desired": self.collection.storage_classes_desired(),
}
def __contains__(self, k):
# end with llfuse.lock_released, re-acquire lock
- self.merge(viewitems(contents),
+ self.merge(contents.items(),
lambda i: i[0],
lambda a, i: a.uuid() == i[1]['uuid'],
lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))