import threading
from apiclient import errors as apiclient_errors
import errno
+import time
-from fusefile import StringFile, ObjectFile, FuseArvadosFile
+from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
from fresh import FreshBase, convertTime, use_counter, check_update
import arvados.collection
# appear as underscores in the fuse mount.)
_disallowed_filename_characters = re.compile('[\x00/]')
+# '.' and '..' are not reachable if API server is newer than #6277
def sanitize_filename(dirty):
"""Replace disallowed filename characters with harmless "_"."""
if dirty is None:
# delete any other directory entries that were not found in 'items'
for i in oldentries:
- _logger.debug("Forgetting about entry '%s' on inode %i", str(i), self.inode)
- llfuse.invalidate_entry(self.inode, str(i))
+ _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
+ self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
self.inodes.del_entry(oldentries[i])
changed = True
if changed:
- llfuse.invalidate_inode(self.inode)
+ self.inodes.invalidate_inode(self.inode)
self._mtime = time.time()
self.fresh()
- def clear(self, force=False):
- """Delete all entries"""
+ def in_use(self):
+ # Report whether this directory, or anything it contains, is in use.
+ # The superclass check covers this object itself; the loop then asks
+ # each value in self._entries and stops at the first positive answer.
+ if super(Directory, self).in_use():
+ return True
+ for v in self._entries.itervalues():
+ if v.in_use():
+ return True
+ return False
- if not self.in_use() or force:
- oldentries = self._entries
- self._entries = {}
- for n in oldentries:
- if not oldentries[n].clear(force):
- self._entries = oldentries
- return False
- for n in oldentries:
- llfuse.invalidate_entry(self.inode, str(n))
- self.inodes.del_entry(oldentries[n])
- llfuse.invalidate_inode(self.inode)
- self.invalidate()
+ def has_ref(self, only_children):
+ if super(Directory, self).has_ref(only_children):
return True
- else:
- return False
+ for v in self._entries.itervalues():
+ if v.has_ref(False):
+ return True
+ return False
+
+ def clear(self):
+ """Delete all entries"""
+ # Swap in an empty table before walking the old one, so self._entries
+ # is already empty while the former children are being cleared.
+ oldentries = self._entries
+ self._entries = {}
+ for n in oldentries:
+ # Clear the child, invalidate its name under this directory's inode
+ # (name encoded with self.inodes.encoding), then remove the child
+ # from self.inodes.
+ oldentries[n].clear()
+ self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
+ self.inodes.del_entry(oldentries[n])
+ # Finally invalidate this directory's own inode and call invalidate().
+ self.inodes.invalidate_inode(self.inode)
+ self.invalidate()
+
+ def kernel_invalidate(self):
+ # Walk every (name, entry) pair: invalidate the name under this
+ # directory's inode and let the entry run its own kernel_invalidate().
+ # Unlike clear(), nothing is removed from self._entries here.
+ # NOTE(review): presumably this drops the kernel's cached view of the
+ # directory -- confirm against the self.inodes implementation.
+ for n, e in self._entries.iteritems():
+ self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
+ e.kernel_invalidate()
+ # Invalidate this directory's own inode last.
+ self.inodes.invalidate_inode(self.inode)
def mtime(self):
return self._mtime
def flush(self):
pass
+ def want_event_subscribe(self):
+ # No default answer is provided here: calling this version always
+ # raises NotImplementedError.
+ # NOTE(review): presumably subclasses override this to say whether they
+ # want server event notifications -- confirm against the caller.
+ raise NotImplementedError()
+
def create(self, name):
raise NotImplementedError()
def on_event(self, event, collection, name, item):
if collection == self.collection:
- _logger.debug("%s %s %s %s", event, collection, name, item)
+ name = sanitize_filename(name)
+ _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
with llfuse.lock:
if event == arvados.collection.ADD:
self.new_entry(name, item, self.mtime())
elif event == arvados.collection.DEL:
ent = self._entries[name]
del self._entries[name]
- llfuse.invalidate_entry(self.inode, name)
+ self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
self.inodes.del_entry(ent)
elif event == arvados.collection.MOD:
if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
- llfuse.invalidate_inode(item.fuse_entry.inode)
+ self.inodes.invalidate_inode(item.fuse_entry.inode)
elif name in self._entries:
- llfuse.invalidate_inode(self._entries[name].inode)
+ self.inodes.invalidate_inode(self._entries[name].inode)
def populate(self, mtime):
self._mtime = mtime
self.flush()
src.flush()
+ def clear(self):
+ # Clear the entries through the superclass, then drop this object's
+ # reference to the backing collection.
+ super(CollectionDirectoryBase, self).clear()
+ self.collection = None
+
class CollectionDirectory(CollectionDirectoryBase):
"""Represents the root of a directory tree representing a collection."""
self.num_retries = num_retries
self.collection_record_file = None
self.collection_record = None
+ self._poll = True
+ try:
+ self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2)/2)
+ except:
+ _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
+ self._poll_time = 60*60
+
if isinstance(collection_record, dict):
self.collection_locator = collection_record['uuid']
self._mtime = convertTime(collection_record.get('modified_at'))
def writable(self):
return self.collection.writable() if self.collection is not None else self._writable
+ def want_event_subscribe(self):
+ # True only when self.collection_locator matches uuid_pattern; any
+ # other locator form yields False.
+ return (uuid_pattern.match(self.collection_locator) is not None)
+
# Used by arv-web.py to switch the contents of the CollectionDirectory
def change_collection(self, new_locator):
"""Switch the contents of the CollectionDirectory.
def new_collection(self, new_collection_record, coll_reader):
if self.inode:
- self.clear(force=True)
+ self.clear()
self.collection_record = new_collection_record
return self.collection_locator
@use_counter
- def update(self, to_pdh=None):
+ def update(self, to_record_version=None):
try:
if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
return True
if not self.stale():
return
- _logger.debug("Updating %s", self.collection_locator)
- if self.collection:
- if self.collection.portable_data_hash() == to_pdh:
- _logger.debug("%s is fresh at pdh '%s'", self.collection_locator, to_pdh)
+ _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
+ if self.collection is not None:
+ if self.collection.known_past_version(to_record_version):
+ _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
else:
self.collection.update()
else:
return True
finally:
self._updating_lock.release()
- except arvados.errors.NotFoundError:
- _logger.exception("arv-mount %s: error", self.collection_locator)
+ except arvados.errors.NotFoundError as e:
+ _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
except arvados.errors.ArgumentError as detail:
_logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
if self.collection_record is not None and "manifest_text" in self.collection_record:
_logger.exception("arv-mount %s: error", self.collection_locator)
if self.collection_record is not None and "manifest_text" in self.collection_record:
_logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
+ self.invalidate()
return False
@use_counter
# footprint directly would be more accurate, but also more complicated.
return self._manifest_size * 128
+ def finalize(self):
+ # Does nothing when no collection is loaded (self.collection is None).
+ # Otherwise the collection is saved only if this directory is writable,
+ # and its threads are stopped in either case.
+ if self.collection is not None:
+ if self.writable():
+ self.collection.save()
+ self.collection.stop_threads()
+
+ def clear(self):
+ # Clear through the superclass, then reset the recorded manifest size
+ # to zero.
+ super(CollectionDirectory, self).clear()
+ self._manifest_size = 0
+
+
+class TmpCollectionDirectory(CollectionDirectoryBase):
+ """A directory backed by an Arvados collection that never gets saved.
+
+ This supports using Keep as scratch space. A userspace program can
+ read the .arvados#collection file to get a current manifest in
+ order to save a snapshot of the scratch data or use it as a crunch
+ job output.
+ """
+
+ # Collection subclass whose save() and save_new() are no-ops.
+ class UnsaveableCollection(arvados.collection.Collection):
+ def save(self):
+ pass
+ def save_new(self):
+ pass
+
+ def __init__(self, parent_inode, inodes, api_client, num_retries):
+ # Build a fresh UnsaveableCollection that uses api_client and its
+ # keep client, and hand it to the base class as the backing collection.
+ collection = self.UnsaveableCollection(
+ api_client=api_client,
+ keep_client=api_client.keep,
+ num_retries=num_retries)
+ super(TmpCollectionDirectory, self).__init__(
+ parent_inode, inodes, collection)
+ # The '.arvados#collection' file object is created lazily in
+ # __getitem__; None means it has not been requested yet.
+ self.collection_record_file = None
+ self.populate(self.mtime())
+
+ def on_event(self, *args, **kwargs):
+ # Let the base class handle the event first, then invalidate the
+ # record file (if one exists) and its inode while holding llfuse.lock.
+ super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
+ if self.collection_record_file:
+ with llfuse.lock:
+ self.collection_record_file.invalidate()
+ self.inodes.invalidate_inode(self.collection_record_file.inode)
+ _logger.debug("%s invalidated collection record", self)
+
+ def collection_record(self):
+ # Build the record dict with llfuse.lock released. "uuid" is always
+ # None; the manifest text and portable data hash are read from the
+ # collection at call time.
+ with llfuse.lock_released:
+ return {
+ "uuid": None,
+ "manifest_text": self.collection.manifest_text(),
+ "portable_data_hash": self.collection.portable_data_hash(),
+ }
+
+ def __contains__(self, k):
+ # '.arvados#collection' is always reported as present; every other
+ # name is delegated to the base class.
+ return (k == '.arvados#collection' or
+ super(TmpCollectionDirectory, self).__contains__(k))
+
+ @use_counter
+ def __getitem__(self, item):
+ if item == '.arvados#collection':
+ # Create the record file on first access (a FuncToJSONFile wrapping
+ # self.collection_record), register it with self.inodes, and reuse
+ # the same object on later lookups.
+ if self.collection_record_file is None:
+ self.collection_record_file = FuncToJSONFile(
+ self.inode, self.collection_record)
+ self.inodes.add_entry(self.collection_record_file)
+ return self.collection_record_file
+ return super(TmpCollectionDirectory, self).__getitem__(item)
+
+ def persisted(self):
+ # Always False.
+ return False
+
+ def writable(self):
+ # Always True.
+ return True
+
+ def want_event_subscribe(self):
+ # Always False.
+ return False
+
+ def finalize(self):
+ # Only stops the collection's threads; no save() call is made here.
+ self.collection.stop_threads()
+
+ def invalidate(self):
+ # Invalidate the record file first (if it has been created), then
+ # delegate to the base class.
+ if self.collection_record_file:
+ self.collection_record_file.invalidate()
+ super(TmpCollectionDirectory, self).invalidate()
+
class MagicDirectory(Directory):
"""A special directory that logically contains the set of all extant keep locators.
README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
-the form '1234567890abcdefghijklmnopqrstuv+123').
+the form '1234567890abcdef0123456789abcdef+123').
Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.
+
""".lstrip()
- def __init__(self, parent_inode, inodes, api, num_retries):
+ def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
super(MagicDirectory, self).__init__(parent_inode, inodes)
self.api = api
self.num_retries = num_retries
+ self.pdh_only = pdh_only
def __setattr__(self, name, value):
super(MagicDirectory, self).__setattr__(name, value)
# If we're the root directory, add an identical by_id subdirectory.
if self.inode == llfuse.ROOT_INODE:
self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
- self.inode, self.inodes, self.api, self.num_retries))
+ self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))
def __contains__(self, k):
if k in self._entries:
return True
- if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
+ if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
return False
try:
self.inode, self.inodes, self.api, self.num_retries, k))
if e.update():
- self._entries[k] = e
+ if k not in self._entries:
+ self._entries[k] = e
+ else:
+ self.inodes.del_entry(e)
return True
else:
+ self.inodes.invalidate_entry(self.inode, k)
+ self.inodes.del_entry(e)
return False
- except Exception as e:
- _logger.debug('arv-mount exception keep %s', e)
+ except Exception as ex:
+ _logger.debug('arv-mount exception keep %s', ex)
+ self.inodes.del_entry(e)
return False
def __getitem__(self, item):
else:
raise KeyError("No collection with id " + item)
- def clear(self, force=False):
+ def clear(self):
pass
-
-class RecursiveInvalidateDirectory(Directory):
- def invalidate(self):
- try:
- super(RecursiveInvalidateDirectory, self).invalidate()
- for a in self._entries:
- self._entries[a].invalidate()
- except Exception:
- _logger.exception()
+ def want_event_subscribe(self):
+ # True unless this directory was constructed with pdh_only set.
+ return not self.pdh_only
-class TagsDirectory(RecursiveInvalidateDirectory):
+class TagsDirectory(Directory):
"""A special directory that contains as subdirectories all tags visible to the user."""
def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
self._poll = True
self._poll_time = poll_time
+ def want_event_subscribe(self):
+ # Always True for this directory type.
+ # NOTE(review): presumably tells the caller that this directory wants
+ # server event notifications -- confirm against the caller.
+ return True
+
@use_counter
def update(self):
with llfuse.lock_released:
self._poll = poll
self._poll_time = poll_time
+ def want_event_subscribe(self):
+ # Always True for this directory type.
+ # NOTE(review): presumably tells the caller that this directory wants
+ # server event notifications -- confirm against the caller.
+ return True
+
@use_counter
def update(self):
with llfuse.lock_released:
self._updating_lock = threading.Lock()
self._current_user = None
+ def want_event_subscribe(self):
+ # Always True for this directory type.
+ # NOTE(review): presumably tells the caller that this directory wants
+ # server event notifications -- confirm against the caller.
+ return True
+
def createDirectory(self, i):
if collection_uuid_pattern.match(i['uuid']):
return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
# Actually move the entry from source directory to this directory.
del src._entries[name_old]
self._entries[name_new] = ent
- llfuse.invalidate_entry(src.inode, name_old)
+ self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
class SharedDirectory(Directory):
lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
except Exception:
_logger.exception()
+
+ def want_event_subscribe(self):
+ # Always True for this directory type.
+ # NOTE(review): presumably tells the caller that this directory wants
+ # server event notifications -- confirm against the caller.
+ return True