#
# SPDX-License-Identifier: AGPL-3.0
-from __future__ import absolute_import
-from __future__ import division
-from future.utils import viewitems
-from future.utils import itervalues
-from builtins import dict
import apiclient
import arvados
import errno
def in_use(self):
if super(Directory, self).in_use():
return True
- for v in itervalues(self._entries):
+ for v in self._entries.values():
if v.in_use():
return True
return False
def has_ref(self, only_children):
if super(Directory, self).has_ref(only_children):
return True
- for v in itervalues(self._entries):
+ for v in self._entries.values():
if v.has_ref(False):
return True
return False
# Find self on the parent in order to invalidate this path.
# Calling the public items() method might trigger a refresh,
# which we definitely don't want, so read the internal dict directly.
- for k,v in viewitems(parent._entries):
+ for k,v in parent._entries.items():
if v is self:
self.inodes.invalidate_entry(parent, k)
break
def on_event(self, event, collection, name, item):
if collection == self.collection:
name = self.sanitize_filename(name)
- _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
- with llfuse.lock:
- if event == arvados.collection.ADD:
- self.new_entry(name, item, self.mtime())
- elif event == arvados.collection.DEL:
- ent = self._entries[name]
- del self._entries[name]
- self.inodes.invalidate_entry(self, name)
- self.inodes.del_entry(ent)
- elif event == arvados.collection.MOD:
- if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
- self.inodes.invalidate_inode(item.fuse_entry)
- elif name in self._entries:
- self.inodes.invalidate_inode(self._entries[name])
+
+ #
+ # It's possible for another thread to have llfuse.lock and
+ # be waiting on collection.lock. Meanwhile, we released
+ # llfuse.lock earlier in the stack, but are still holding
+ # on to the collection lock, and now we need to re-acquire
+ # llfuse.lock. If we don't release the collection lock,
+ # we'll deadlock where we're holding the collection lock
+ # waiting for llfuse.lock and the other thread is holding
+ # llfuse.lock and waiting for the collection lock.
+ #
+ # The correct locking order here is to take llfuse.lock
+ # first, then the collection lock.
+ #
+ # Since collection.lock is an RLock, it might be locked
+ # multiple times, so we need to release it multiple times,
+ # keep a count, then re-lock it the correct number of
+ # times.
+ #
+ lockcount = 0
+ try:
+ while True:
+ self.collection.lock.release()
+ lockcount += 1
+ except RuntimeError:
+ pass
+
+ try:
+ with llfuse.lock:
+ with self.collection.lock:
+ if event == arvados.collection.ADD:
+ self.new_entry(name, item, self.mtime())
+ elif event == arvados.collection.DEL:
+ ent = self._entries[name]
+ del self._entries[name]
+ self.inodes.invalidate_entry(self, name)
+ self.inodes.del_entry(ent)
+ elif event == arvados.collection.MOD:
+ if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
+ self.inodes.invalidate_inode(item.fuse_entry)
+ elif name in self._entries:
+ self.inodes.invalidate_inode(self._entries[name])
+ finally:
+ while lockcount > 0:
+ self.collection.lock.acquire()
+ lockcount -= 1
def populate(self, mtime):
self._mtime = mtime
- self.collection.subscribe(self.on_event)
- for entry, item in viewitems(self.collection):
- self.new_entry(entry, item, self.mtime())
+ with self.collection.lock:
+ self.collection.subscribe(self.on_event)
+ for entry, item in self.collection.items():
+ self.new_entry(entry, item, self.mtime())
def writable(self):
return self.collection.writable()
return
_logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
+ new_collection_record = None
if self.collection is not None:
if self.collection.known_past_version(to_record_version):
_logger.debug("%s already processed %s", self.collection_locator, to_record_version)
new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
if 'manifest_text' not in new_collection_record:
new_collection_record['manifest_text'] = coll_reader.manifest_text()
+ if 'storage_classes_desired' not in new_collection_record:
+ new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
- if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
- self.new_collection(new_collection_record, coll_reader)
-
- self._manifest_size = len(coll_reader.manifest_text())
- _logger.debug("%s manifest_size %i", self, self._manifest_size)
# end with llfuse.lock_released, re-acquire lock
+ if (new_collection_record is not None and
+ (self.collection_record is None or
+ self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"))):
+ self.new_collection(new_collection_record, coll_reader)
+ self._manifest_size = len(coll_reader.manifest_text())
+ _logger.debug("%s manifest_size %i", self, self._manifest_size)
self.fresh()
return True
def save_new(self):
pass
- def __init__(self, parent_inode, inodes, api_client, num_retries):
+ def __init__(self, parent_inode, inodes, api_client, num_retries, storage_classes=None):
collection = self.UnsaveableCollection(
api_client=api_client,
keep_client=api_client.keep,
- num_retries=num_retries)
+ num_retries=num_retries,
+ storage_classes_desired=storage_classes)
super(TmpCollectionDirectory, self).__init__(
parent_inode, inodes, api_client.config, collection)
self.collection_record_file = None
def on_event(self, *args, **kwargs):
super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
if self.collection_record_file:
- with llfuse.lock:
- self.collection_record_file.invalidate()
- self.inodes.invalidate_inode(self.collection_record_file)
- _logger.debug("%s invalidated collection record", self)
+
+ # See discussion in CollectionDirectoryBase.on_event
+ lockcount = 0
+ try:
+ while True:
+ self.collection.lock.release()
+ lockcount += 1
+ except RuntimeError:
+ pass
+
+ try:
+ with llfuse.lock:
+ with self.collection.lock:
+ self.collection_record_file.invalidate()
+ self.inodes.invalidate_inode(self.collection_record_file)
+ _logger.debug("%s invalidated collection record", self)
+ finally:
+ while lockcount > 0:
+ self.collection.lock.acquire()
+ lockcount -= 1
def collection_record(self):
with llfuse.lock_released:
"uuid": None,
"manifest_text": self.collection.manifest_text(),
"portable_data_hash": self.collection.portable_data_hash(),
+ "storage_classes_desired": self.collection.storage_classes_desired(),
}
def __contains__(self, k):
""".lstrip()
- def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
+ def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False, storage_classes=None):
super(MagicDirectory, self).__init__(parent_inode, inodes, api.config)
self.api = api
self.num_retries = num_retries
self.pdh_only = pdh_only
+ self.storage_classes = storage_classes
def __setattr__(self, name, value):
super(MagicDirectory, self).__setattr__(name, value)
if project[u'items_available'] == 0:
return False
e = self.inodes.add_entry(ProjectDirectory(
- self.inode, self.inodes, self.api, self.num_retries, project[u'items'][0]))
+ self.inode, self.inodes, self.api, self.num_retries,
+ project[u'items'][0], storage_classes=self.storage_classes))
else:
e = self.inodes.add_entry(CollectionDirectory(
self.inode, self.inodes, self.api, self.num_retries, k))
"""A special directory that contains the contents of a project."""
def __init__(self, parent_inode, inodes, api, num_retries, project_object,
- poll=True, poll_time=3):
+ poll=True, poll_time=3, storage_classes=None):
super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config)
self.api = api
self.num_retries = num_retries
self._updating_lock = threading.Lock()
self._current_user = None
self._full_listing = False
+ self.storage_classes = storage_classes
def want_event_subscribe(self):
return True
if collection_uuid_pattern.match(i['uuid']):
return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
elif group_uuid_pattern.match(i['uuid']):
- return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
+ return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time, self.storage_classes)
elif link_uuid_pattern.match(i['uuid']):
if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
def mkdir(self, name):
try:
with llfuse.lock_released:
- self.api.collections().create(body={"owner_uuid": self.project_uuid,
- "name": name,
- "manifest_text": ""}).execute(num_retries=self.num_retries)
+ c = {
+ "owner_uuid": self.project_uuid,
+ "name": name,
+ "manifest_text": "" }
+ if self.storage_classes is not None:
+ c["storage_classes_desired"] = self.storage_classes
+ try:
+ self.api.collections().create(body=c).execute(num_retries=self.num_retries)
+ except Exception as e:
+ raise
self.invalidate()
except apiclient_errors.Error as error:
_logger.error(error)
"""A special directory that represents users or groups who have shared projects with me."""
def __init__(self, parent_inode, inodes, api, num_retries, exclude,
- poll=False, poll_time=60):
+ poll=False, poll_time=60, storage_classes=None):
super(SharedDirectory, self).__init__(parent_inode, inodes, api.config)
self.api = api
self.num_retries = num_retries
self._poll = True
self._poll_time = poll_time
self._updating_lock = threading.Lock()
+ self.storage_classes = storage_classes
@use_counter
def update(self):
obr = objects[r]
if obr.get("name"):
contents[obr["name"]] = obr
- #elif obr.get("username"):
- # contents[obr["username"]] = obr
elif "first_name" in obr:
contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
# end with llfuse.lock_released, re-acquire lock
- self.merge(viewitems(contents),
+ self.merge(contents.items(),
lambda i: i[0],
lambda a, i: a.uuid() == i[1]['uuid'],
- lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
+ lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
except Exception:
_logger.exception("arv-mount shared dir error")
finally: