class SafeApi(object):
    """Threadsafe wrapper for the API object.

    This stores and returns a different API object (and Keep client) per
    thread, because httplib2, which underlies apiclient, is not threadsafe.
    The Keep block cache is a single object shared by every per-thread Keep
    client.

    Attributes that are not found on the wrapper itself are looked up on the
    calling thread's API client.
    """

    # Attributes that localapi() reads from the wrapper itself. They must
    # never be proxied: resolving them through __getattr__ would call
    # localapi() again and recurse until the interpreter gives up.
    _unproxied_attributes = ('local', 'host', 'api_token', 'insecure')

    def __init__(self, config):
        """Capture connection settings from config.

        config must provide get(key) and flag_is_true(key); the keys read
        are ARVADOS_API_HOST, ARVADOS_API_TOKEN and
        ARVADOS_API_HOST_INSECURE.
        """
        self.host = config.get('ARVADOS_API_HOST')
        self.api_token = config.get('ARVADOS_API_TOKEN')
        self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        self.local = threading.local()
        self.block_cache = arvados.KeepBlockCache()

    def localapi(self):
        """Return the calling thread's API client, creating it on first use."""
        if 'api' not in self.local.__dict__:
            self.local.api = arvados.api(
                version='v1',
                host=self.host, token=self.api_token, insecure=self.insecure)
        return self.local.api

    def localkeep(self):
        """Return the calling thread's Keep client, creating it on first use."""
        if 'keep' not in self.local.__dict__:
            self.local.keep = arvados.KeepClient(
                api_client=self.localapi(), block_cache=self.block_cache)
        return self.local.keep

    def __getattr__(self, name):
        """Proxy nonexistent attributes to the local API client.

        Raises AttributeError if neither the wrapper nor the thread's API
        client has the attribute.
        """
        # __getattr__ only runs after normal lookup has failed, so reaching
        # here for one of our own attributes means it was never set (for
        # example the instance was created without running __init__).
        if name in SafeApi._unproxied_attributes:
            raise AttributeError(
                "%r object has no attribute %r" % (type(self).__name__, name))
        try:
            return getattr(self.localapi(), name)
        except AttributeError:
            # object defines no __getattr__ to delegate to, so report the
            # missing attribute directly.
            raise AttributeError(
                "%r object has no attribute %r" % (type(self).__name__, name))
-
-
def convertTime(t):
    """Parse Arvados timestamp to unix time.

    Returns 0 when t is empty/None or cannot be parsed. The parsed value's
    time tuple is interpreted as UTC (calendar.timegm).
    """
    if not t:
        return 0
    try:
        parsed = ciso8601.parse_datetime_unaware(t)
        if parsed is None:
            # NOTE(review): ciso8601 releases that provide
            # parse_datetime_unaware appear to report a parse failure by
            # returning None instead of raising; without this check the
            # .timetuple() call below would raise an uncaught AttributeError.
            return 0
        return calendar.timegm(parsed.timetuple())
    except (TypeError, ValueError):
        return 0
-
def sanitize_filename(dirty):
    '''Make a name usable as a single directory entry.

    None is passed through unchanged. The empty string and "." become "_",
    ".." becomes "__", and in any other name every match of the module's
    disallowed-character pattern is replaced with "_".
    '''
    if dirty is None:
        return None
    if dirty == '' or dirty == '.':
        return '_'
    if dirty == '..':
        return '__'
    return _disallowed_filename_characters.sub('_', dirty)
-
-
class FreshBase(object):
    '''Fresh/stale bookkeeping used to decide when an object needs updating.

    A new object starts out stale. When polling is enabled (_poll), a fresh
    object is also reported stale once its access time (_atime) is more than
    _poll_time seconds past the last update.
    '''

    def __init__(self):
        self._poll_time = 60
        self._poll = False
        self._stale = True
        self._last_update = time.time()
        self._atime = time.time()

    def invalidate(self):
        '''Mark the value as stale.'''
        self._stale = True

    def stale(self):
        '''Return True if the value should be refreshed.'''
        if self._stale:
            return True
        if not self._poll:
            return False
        expires_at = self._last_update + self._poll_time
        return expires_at < self._atime

    def fresh(self):
        '''Mark the value as up to date and record the update time.'''
        self._stale = False
        self._last_update = time.time()

    def atime(self):
        '''Return the recorded access time.'''
        return self._atime
-
class File(FreshBase):
    '''Base for file objects: an empty file with a fixed modification time.'''

    def __init__(self, parent_inode, _mtime=0):
        super(File, self).__init__()
        self._mtime = _mtime
        self.parent_inode = parent_inode
        # No inode is assigned at construction time.
        self.inode = None

    def size(self):
        '''Return the file size; the base file is empty.'''
        return 0

    def readfrom(self, off, size):
        '''Return the data at [off, off+size); the base file has none.'''
        return ''

    def mtime(self):
        '''Return the modification time given at construction.'''
        return self._mtime
-
-
class StreamReaderFile(File):
    '''File whose size and contents are delegated to a StreamFileReader.'''

    def __init__(self, parent_inode, reader, _mtime):
        super(StreamReaderFile, self).__init__(parent_inode, _mtime)
        self.reader = reader

    def size(self):
        '''Return the size reported by the wrapped reader.'''
        reader = self.reader
        return reader.size()

    def readfrom(self, off, size):
        '''Read size bytes starting at offset off from the wrapped reader.'''
        reader = self.reader
        return reader.readfrom(off, size)

    def stale(self):
        '''Always False: this file is never reported as stale.'''
        return False
-
-
class StringFile(File):
    '''File whose contents are an in-memory string.'''

    def __init__(self, parent_inode, contents, _mtime):
        super(StringFile, self).__init__(parent_inode, _mtime)
        self.contents = contents

    def size(self):
        '''Return the length of the contents.'''
        return len(self.contents)

    def readfrom(self, off, size):
        '''Return up to size items of the contents starting at offset off.'''
        end = off + size
        return self.contents[off:end]
-
-
class ObjectFile(StringFile):
    '''File exposing a dict as pretty-printed JSON text.'''

    def __init__(self, parent_inode, obj):
        # Start out empty; update() below fills in contents and mtime.
        super(ObjectFile, self).__init__(parent_inode, "", 0)
        self.uuid = obj['uuid']
        self.update(obj)

    def update(self, obj):
        '''Re-serialize obj and take the mtime from its 'modified_at' field.

        The mtime is 0 when obj has no 'modified_at' key.
        '''
        if 'modified_at' in obj:
            self._mtime = convertTime(obj['modified_at'])
        else:
            self._mtime = 0
        serialized = json.dumps(obj, indent=4, sort_keys=True)
        self.contents = serialized + "\n"
-
-
class Directory(FreshBase):
    '''Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.

    merge() and clear() add and remove entries through self.inodes, which
    this base class does not assign; it is expected to be set by a subclass.
    clear() additionally hands its own self.inodes to a child Directory that
    lacks one before clearing it.
    '''

    def __init__(self, parent_inode):
        '''parent_inode is the integer inode number.

        Raises Exception if parent_inode is not an int.
        '''
        super(Directory, self).__init__()

        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self._entries = {}
        self._mtime = time.time()

    def update(self):
        '''Overridden by subclasses to implement logic to update the entries
        dict when the directory is stale. Does nothing here.
        '''
        pass

    def size(self):
        '''Only used when computing the size of the disk footprint of the
        directory (stub).
        '''
        return 0

    def checkupdate(self):
        '''Call update() if the directory is stale.

        An apiclient HttpError raised by update() is logged at debug level
        and otherwise ignored, leaving the current entries in place.
        '''
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                _logger.debug(e)

    def __getitem__(self, item):
        self.checkupdate()
        return self._entries[item]

    def items(self):
        self.checkupdate()
        return self._entries.items()

    def __iter__(self):
        self.checkupdate()
        # iter(dict) yields the keys on both Python 2 and Python 3, unlike
        # the Python 2-only dict.iterkeys().
        return iter(self._entries)

    def __contains__(self, k):
        self.checkupdate()
        return k in self._entries

    def merge(self, items, fn, same, new_entry):
        '''Helper method for updating the contents of the directory. Takes a
        list describing the new contents of the directory, reuses entries
        that are the same in both the old and new lists, creates new entries,
        and deletes old entries missing from the new list.

        items: iterable with new directory contents

        fn: function to take an entry in 'items' and return the desired file
        or directory name, or None if this entry should be skipped

        same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        new_entry: function to create a new directory entry (File or
        Directory object) from an entry in the items list.

        The directory mtime is bumped when an entry was added or removed, and
        the directory is marked fresh afterwards.
        '''

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if not name:
                continue
            if name in oldentries and same(oldentries[name], i):
                # move existing directory entry over
                self._entries[name] = oldentries.pop(name)
            else:
                # create new directory entry
                ent = new_entry(i)
                if ent is not None:
                    self._entries[name] = self.inodes.add_entry(ent)
                    changed = True

        # delete any other directory entries that were not found in 'items'
        for name, ent in oldentries.items():
            llfuse.invalidate_entry(self.inode, str(name))
            self.inodes.del_entry(ent)
            changed = True

        if changed:
            self._mtime = time.time()

        self.fresh()

    def clear(self):
        '''Delete all entries, recursing into subdirectories, and mark this
        directory stale.
        '''
        oldentries = self._entries
        self._entries = {}
        for name, ent in oldentries.items():
            # The type test has to look at the entry object; the dict key is
            # only the filename and is never a Directory.
            if isinstance(ent, Directory):
                if not hasattr(ent, 'inodes'):
                    # Plain Directory objects are built without an inode
                    # table; lend them ours so they can drop their children.
                    ent.inodes = self.inodes
                ent.clear()
            llfuse.invalidate_entry(self.inode, str(name))
            self.inodes.del_entry(ent)
        llfuse.invalidate_inode(self.inode)
        self.invalidate()

    def mtime(self):
        '''Return the time the set of entries last changed.'''
        return self._mtime
-
-
class CollectionDirectory(Directory):
    '''Root of a directory tree that exposes the contents of one collection.

    The collection is identified by collection_locator. The tree is built
    from the streams and files of a CollectionReader, and the collection
    record is available as the special entry '.arvados#collection'.
    '''

    def __init__(self, parent_inode, inodes, api, num_retries, collection):
        '''collection is either a collection record (a dict with 'uuid' and
        optionally 'modified_at') or a bare locator.
        '''
        super(CollectionDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.num_retries = num_retries
        self.collection_object_file = None
        self.collection_object = None
        if not isinstance(collection, dict):
            self.collection_locator = collection
            self._mtime = 0
        else:
            self.collection_locator = collection['uuid']
            self._mtime = convertTime(collection.get('modified_at'))

    def same(self, i):
        '''Return True if record i has this directory's locator as its uuid
        or as its portable data hash.
        '''
        locator = self.collection_locator
        return i['uuid'] == locator or i['portable_data_hash'] == locator

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory. Must be called with llfuse.lock held."""
        self.collection_locator = new_locator
        # Dropping the cached record makes update() fetch and rebuild.
        self.collection_object = None
        self.update()

    def new_collection(self, new_collection_object, coll_reader):
        '''Replace the directory tree with the contents of coll_reader and
        adopt new_collection_object as the current collection record.
        '''
        self.collection_object = new_collection_object

        self._mtime = convertTime(self.collection_object.get('modified_at'))

        record_file = self.collection_object_file
        if record_file is not None:
            record_file.update(self.collection_object)

        self.clear()
        for stream in coll_reader.all_streams():
            # Walk down from the root, creating a directory for each
            # component of the stream name that does not exist yet.
            cwd = self
            for part in stream.name().split('/'):
                if part == '' or part == '.':
                    continue
                partname = sanitize_filename(part)
                if partname not in cwd._entries:
                    cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
                cwd = cwd._entries[partname]
            for filename, reader in stream.files().items():
                entry = StreamReaderFile(cwd.inode, reader, self.mtime())
                cwd._entries[sanitize_filename(filename)] = self.inodes.add_entry(entry)

    def update(self):
        '''Fetch the collection and rebuild the tree if its content changed.

        Returns True on success (including when there was nothing to do) and
        False if an error occurred; errors are logged, not raised.
        '''
        try:
            # Nothing to do when a record is already loaded and the locator
            # matches the portable data hash pattern.
            if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            with llfuse.lock_released:
                coll_reader = arvados.CollectionReader(
                    self.collection_locator, self.api, self.api.localkeep(),
                    num_retries=self.num_retries)
                record = coll_reader.api_response() or {}
                # If the Collection only exists in Keep, there will be no API
                # response. Fill in the fields we need.
                if 'uuid' not in record:
                    record['uuid'] = self.collection_locator
                if "portable_data_hash" not in record:
                    record["portable_data_hash"] = record["uuid"]
                if 'manifest_text' not in record:
                    record['manifest_text'] = coll_reader.manifest_text()
                coll_reader.normalize()
            # end with llfuse.lock_released, re-acquire lock

            current = self.collection_object
            if current is None or current["portable_data_hash"] != record["portable_data_hash"]:
                self.new_collection(record, coll_reader)

            self.fresh()
            return True
        except arvados.errors.NotFoundError:
            _logger.exception("arv-mount %s: error", self.collection_locator)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_object is not None and "manifest_text" in self.collection_object:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_object is not None and "manifest_text" in self.collection_object:
                _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
        return False

    def __getitem__(self, item):
        '''Look up an entry; '.arvados#collection' yields the collection
        record as a file, created and registered on first access.
        '''
        self.checkupdate()
        if item != '.arvados#collection':
            return super(CollectionDirectory, self).__getitem__(item)
        if self.collection_object_file is None:
            self.collection_object_file = ObjectFile(self.inode, self.collection_object)
            self.inodes.add_entry(self.collection_object_file)
        return self.collection_object_file

    def __contains__(self, k):
        '''The special '.arvados#collection' entry is always present.'''
        if k == '.arvados#collection':
            return True
        return super(CollectionDirectory, self).__contains__(k)
-
-
-class MagicDirectory(Directory):
- '''A special directory that logically contains the set of all extant keep
- locators. When a file is referenced by lookup(), it is tested to see if it
- is a valid keep locator to a manifest, and if so, loads the manifest
- contents as a subdirectory of this directory with the locator as the
- directory name. Since querying a list of all extant keep locators is
- impractical, only collections that have already been accessed are visible
- to readdir().
- '''
-
- README_TEXT = '''
-This directory provides access to Arvados collections as subdirectories listed
-by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
-the form '1234567890abcdefghijklmnopqrstuv+123').
-
-Note that this directory will appear empty until you attempt to access a
-specific collection subdirectory (such as trying to 'cd' into it), at which
-point the collection will actually be looked up on the server and the directory
-will appear if it exists.
-'''.lstrip()
-
- def __init__(self, parent_inode, inodes, api, num_retries):
- super(MagicDirectory, self).__init__(parent_inode)
- self.inodes = inodes
- self.api = api
- self.num_retries = num_retries