class SafeApi(object):
    """Threadsafe wrapper for API object.

    This stores and returns a different api object per thread, because
    httplib2 which underlies apiclient is not threadsafe.  The per-thread
    Keep clients all share one block cache.
    """

    # Attributes assigned by __init__.  __getattr__ must never try to proxy
    # these: resolving them through localapi() would read them again and
    # recurse without end.
    _own_attrs = ('host', 'api_token', 'insecure', 'local', 'block_cache')

    def __init__(self, config):
        """Capture connection settings from `config`.

        config: object providing get() and flag_is_true(); the settings
        read are ARVADOS_API_HOST, ARVADOS_API_TOKEN and
        ARVADOS_API_HOST_INSECURE.
        """
        self.host = config.get('ARVADOS_API_HOST')
        self.api_token = config.get('ARVADOS_API_TOKEN')
        self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        self.local = threading.local()
        self.block_cache = arvados.KeepBlockCache()

    def localapi(self):
        """Return the calling thread's API client, creating it on first use."""
        if 'api' not in self.local.__dict__:
            self.local.api = arvados.api(
                version='v1',
                host=self.host, token=self.api_token, insecure=self.insecure)
        return self.local.api

    def localkeep(self):
        """Return the calling thread's Keep client, creating it on first use.

        The client is built on this thread's API client and the shared
        block cache.
        """
        if 'keep' not in self.local.__dict__:
            self.local.keep = arvados.KeepClient(
                api_client=self.localapi(), block_cache=self.block_cache)
        return self.local.keep

    def __getattr__(self, name):
        """Proxy attributes SafeApi does not have to the thread's API client.

        Raises AttributeError naming `name` when the API client does not
        provide it either, or when `name` is one of SafeApi's own
        attributes that has not been assigned yet.
        """
        # __getattr__ only runs after normal lookup has failed, so reaching
        # here for one of our own attributes means it was never set.
        if name in self._own_attrs:
            raise AttributeError(
                "'%s' object has no attribute '%s'" % (type(self).__name__, name))
        try:
            return getattr(self.localapi(), name)
        except AttributeError:
            # object defines no __getattr__ to defer to; report the
            # attribute the caller actually asked for.
            raise AttributeError(
                "'%s' object has no attribute '%s'" % (type(self).__name__, name))
-
-
def convertTime(t):
    """Parse Arvados timestamp to unix time.

    Accepts "YYYY-MM-DDTHH:MM:SSZ", optionally with a fractional-seconds
    part before the "Z"; the fraction is discarded.  Returns whole seconds
    since the epoch (UTC), or 0 when `t` is empty, is not a string, or is
    not in that format.
    """
    if not t:
        return 0
    try:
        whole, dot, fraction = t.partition('.')
        if dot:
            # The strptime format below has no fractional-seconds field, so
            # validate the fraction by hand and parse only the whole part.
            if not (fraction.endswith('Z') and fraction[:-1].isdigit()):
                return 0
            whole += 'Z'
        return calendar.timegm(time.strptime(whole, "%Y-%m-%dT%H:%M:%SZ"))
    except (AttributeError, TypeError, ValueError):
        # AttributeError covers truthy non-string input lacking partition().
        return 0
-
def sanitize_filename(dirty):
    '''Return `dirty` made safe for use as a directory entry name.

    None is passed through unchanged.  The empty string and "." become
    "_", ".." becomes "__", and in any other name every match of
    _disallowed_filename_characters is replaced with "_".
    '''
    if dirty is None:
        return None
    if dirty in ('', '.'):
        return '_'
    if dirty == '..':
        return '__'
    return _disallowed_filename_characters.sub('_', dirty)
-
-
class FreshBase(object):
    '''Base class that tracks fresh/stale state so owners know when to update.'''

    def __init__(self):
        self._poll = False
        self._poll_time = 60
        self._stale = True
        self._last_update = time.time()
        self._atime = time.time()

    def invalidate(self):
        '''Mark the value as stale.'''
        self._stale = True

    def stale(self):
        '''Report whether the value needs to be refreshed.

        Always true once invalidated.  Otherwise, when polling is enabled,
        true if the access time is more than _poll_time past the last
        update; false in every other case.
        '''
        if self._stale:
            return True
        if not self._poll:
            return False
        expires = self._last_update + self._poll_time
        return expires < self._atime

    def fresh(self):
        '''Mark the value as up to date and record the time of the update.'''
        self._stale = False
        self._last_update = time.time()

    def atime(self):
        '''Return the stored access time.'''
        return self._atime
-
class File(FreshBase):
    '''Common base for file objects; by itself an empty file.'''

    def __init__(self, parent_inode, _mtime=0):
        '''Record the parent inode and modification time.

        The file's own inode starts out as None.
        '''
        super(File, self).__init__()
        self.parent_inode = parent_inode
        self.inode = None
        self._mtime = _mtime

    def size(self):
        '''Return the file size; the base implementation reports 0.'''
        return 0

    def readfrom(self, off, size):
        '''Return file data for the given offset and size.

        The base implementation ignores both and returns an empty string.
        '''
        return ''

    def mtime(self):
        '''Return the stored modification time.'''
        return self._mtime
-
-
class StreamReaderFile(File):
    '''File whose size and contents are delegated to a StreamFileReader.'''

    def __init__(self, parent_inode, reader, _mtime):
        '''Keep `reader` as the backing object for size() and readfrom().'''
        super(StreamReaderFile, self).__init__(parent_inode, _mtime)
        self.reader = reader

    def size(self):
        '''Return the size reported by the wrapped reader.'''
        reader = self.reader
        return reader.size()

    def readfrom(self, off, size):
        '''Return what the wrapped reader gives for `off` and `size`.'''
        reader = self.reader
        return reader.readfrom(off, size)

    def stale(self):
        '''Always report fresh, overriding the inherited staleness check.'''
        return False
-
-
class StringFile(File):
    '''File whose contents are a string held in memory.'''

    def __init__(self, parent_inode, contents, _mtime):
        '''Keep `contents` as the file data.'''
        super(StringFile, self).__init__(parent_inode, _mtime)
        self.contents = contents

    def size(self):
        '''Return the length of the stored contents.'''
        return len(self.contents)

    def readfrom(self, off, size):
        '''Return the slice of the contents from `off` up to `off + size`.'''
        end = off + size
        return self.contents[off:end]
-
-
class ObjectFile(StringFile):
    '''File showing a dict serialized as indented, key-sorted JSON.'''

    def __init__(self, parent_inode, obj):
        '''Remember the uuid of `obj` and render it as the file contents.'''
        super(ObjectFile, self).__init__(parent_inode, "", 0)
        self.uuid = obj['uuid']
        self.update(obj)

    def update(self, obj):
        '''Re-render the file from `obj`.

        The modification time comes from obj['modified_at'] when that key
        is present, and is 0 otherwise.
        '''
        if 'modified_at' in obj:
            self._mtime = convertTime(obj['modified_at'])
        else:
            self._mtime = 0
        serialized = json.dumps(obj, indent=4, sort_keys=True)
        self.contents = serialized + "\n"
-
-
class Directory(FreshBase):
    '''Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.

    merge() and clear() use self.inodes (an object providing add_entry()
    and del_entry()).  This class does not assign it; a subclass that
    calls those methods has to.
    '''

    def __init__(self, parent_inode):
        '''Create an empty directory.

        parent_inode: the integer inode number of the parent.

        Raises Exception if parent_inode is not an int.
        '''
        super(Directory, self).__init__()
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self._entries = {}
        self._mtime = time.time()

    def update(self):
        '''Refresh the entries dict when the directory is stale.

        Does nothing here; overridden by subclasses.
        '''
        pass

    def size(self):
        '''Return the disk footprint of the directory (stub: always 0).'''
        return 0

    def checkupdate(self):
        '''Call update() if the directory is stale.

        An HttpError raised by update() is logged at debug level and
        otherwise ignored, so the existing entries stay in place.
        '''
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                _logger.debug(e)

    def __getitem__(self, item):
        '''Return the entry named `item`, refreshing first if stale.'''
        self.checkupdate()
        return self._entries[item]

    def items(self):
        '''Return the (name, entry) pairs, refreshing first if stale.'''
        self.checkupdate()
        return self._entries.items()

    def __iter__(self):
        '''Iterate over the entry names, refreshing first if stale.'''
        self.checkupdate()
        return iter(self._entries)

    def __contains__(self, k):
        '''Tell whether an entry named `k` exists, refreshing first if stale.'''
        self.checkupdate()
        return k in self._entries

    def merge(self, items, fn, same, new_entry):
        '''Helper method for updating the contents of the directory.  Takes a
        list describing the new contents of the directory, reuses entries that
        are the same in both the old and new lists, creates new entries, and
        deletes old entries missing from the new list.

        items: iterable with new directory contents

        fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        The modification time is bumped if anything was added or removed,
        and the directory is marked fresh afterwards.
        '''
        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if not name:
                continue
            if name in oldentries and same(oldentries[name], i):
                # move existing directory entry over
                self._entries[name] = oldentries.pop(name)
            else:
                # create new directory entry
                ent = new_entry(i)
                if ent is not None:
                    self._entries[name] = self.inodes.add_entry(ent)
                    changed = True

        # delete any other directory entries that were not found in 'items'
        for name, ent in oldentries.items():
            llfuse.invalidate_entry(self.inode, str(name))
            self.inodes.del_entry(ent)
            changed = True

        if changed:
            self._mtime = time.time()

        self.fresh()

    def clear(self):
        '''Delete all entries, including those of nested directories.'''
        self._discard_entries(self.inodes)

    def _discard_entries(self, inodes):
        '''Drop every entry of this directory and, recursively, of its
        subdirectories, releasing each one from `inodes`.

        The inode table is passed down explicitly because a plain Directory
        is not given an `inodes` attribute of its own.
        '''
        oldentries = self._entries
        self._entries = {}
        for name, ent in oldentries.items():
            # Test the entry object, not its name: the name is a string and
            # can never be a Directory.
            if isinstance(ent, Directory):
                ent._discard_entries(inodes)
            llfuse.invalidate_entry(self.inode, str(name))
            inodes.del_entry(ent)
        llfuse.invalidate_inode(self.inode)
        self.invalidate()

    def mtime(self):
        '''Return the stored modification time.'''
        return self._mtime
-
-
-class CollectionDirectory(Directory):
- '''Represents the root of a directory tree holding a collection.'''
-
- def __init__(self, parent_inode, inodes, api, num_retries, collection):
- super(CollectionDirectory, self).__init__(parent_inode)
- self.inodes = inodes
- self.api = api
- self.num_retries = num_retries
- self.collection_object_file = None
- self.collection_object = None
- if isinstance(collection, dict):
- self.collection_locator = collection['uuid']
- self._mtime = convertTime(collection.get('modified_at'))
- else:
- self.collection_locator = collection
-
- def same(self, i):
- return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
-
- # Used by arv-web.py to switch the contents of the CollectionDirectory
- def change_collection(self, new_locator):
- """Switch the contents of the CollectionDirectory. Must be called with llfuse.lock held."""
- self.collection_locator = new_locator
- self.collection_object = None
- self.update()
-
- def new_collection(self, new_collection_object, coll_reader):
- self.collection_object = new_collection_object
-
- self._mtime = convertTime(self.collection_object.get('modified_at'))
-
- if self.collection_object_file is not None:
- self.collection_object_file.update(self.collection_object)
-
- self.clear()
- for s in coll_reader.all_streams():
- cwd = self
- for part in s.name().split('/'):
- if part != '' and part != '.':
- partname = sanitize_filename(part)
- if partname not in cwd._entries:
- cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
- cwd = cwd._entries[partname]
- for k, v in s.files().items():
- cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
-
- def update(self):
- try:
- if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
- return True