import time
import calendar
import threading
+from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
_logger = logging.getLogger('arvados.arvados_fuse')
-portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
-uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
-collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
-group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
-user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
-link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
-
class SafeApi(object):
'''Threadsafe wrapper for API object. This stores and returns a different api
object per thread, because httplib2 which underlies apiclient is not
def __init__(self, config):
self.host = config.get('ARVADOS_API_HOST')
- self.token = config.get('ARVADOS_API_TOKEN')
+ self.api_token = config.get('ARVADOS_API_TOKEN')  # kept as api_token; the replaced line stored it as token
self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
self.local = threading.local()
+ self.block_cache = arvados.KeepBlockCache()  # one cache object per SafeApi, handed to every KeepClient that localkeep() builds
def localapi(self):
if 'api' not in self.local.__dict__:
- self.local.api = arvados.api('v1', False, self.host, self.token, self.insecure)
+ self.local.api = arvados.api('v1', False, self.host,
+ self.api_token, self.insecure)  # stored on self.local (a threading.local) and reused by later calls
return self.local.api
- def collections(self):
- return self.localapi().collections()
-
- def links(self):
- return self.localapi().links()
+ def localkeep(self):  # Return the KeepClient held on self.local, building it on first use.
+ if 'keep' not in self.local.__dict__:
+ self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)  # built from localapi()'s client and the block cache created in __init__
+ return self.local.keep
- def groups(self):
- return self.localapi().groups()
+    def __getattr__(self, name):
+        '''Proxy attributes that SafeApi itself lacks to this thread's API client.
+
+        Raises AttributeError if the client returned by localapi() has no
+        attribute called name.
+        '''
+        # object defines no __getattr__, so a super() fallback could only fail
+        # with a misleading "'super' object has no attribute '__getattr__'".
+        # Let the client's own AttributeError, which names the missing
+        # attribute, reach the caller instead.
+        return getattr(self.localapi(), name)
- def users(self):
- return self.localapi().users()
def convertTime(t):
'''Parse Arvados timestamp to unix time.'''
class CollectionDirectory(Directory):
'''Represents the root of a directory tree holding a collection.'''
- def __init__(self, parent_inode, inodes, api, collection):
+ def __init__(self, parent_inode, inodes, api, num_retries, collection):
super(CollectionDirectory, self).__init__(parent_inode)
self.inodes = inodes
self.api = api
+ self.num_retries = num_retries
self.collection_object_file = None
self.collection_object = None
if isinstance(collection, dict):
self.collection_object_file.update(self.collection_object)
self.clear()
- collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api)
+ collection = arvados.CollectionReader(
+ self.collection_object["manifest_text"], self.api,
+ self.api.localkeep(), num_retries=self.num_retries)
for s in collection.all_streams():
cwd = self
for part in s.name().split('/'):
return True
with llfuse.lock_released:
- new_collection_object = self.api.collections().get(uuid=self.collection_locator).execute()
+ new_collection_object = self.api.collections().get(
+ uuid=self.collection_locator
+ ).execute(num_retries=self.num_retries)
if "portable_data_hash" not in new_collection_object:
new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
# end with llfuse.lock_released, re-acquire lock
else:
_logger.error("arv-mount %s: error", self.collection_locator)
_logger.exception(detail)
+ except arvados.errors.ArgumentError as detail:
+ _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
+ if self.collection_object is not None and "manifest_text" in self.collection_object:
+ _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
except Exception as detail:
_logger.error("arv-mount %s: error", self.collection_locator)
if self.collection_object is not None and "manifest_text" in self.collection_object:
to readdir().
'''
- def __init__(self, parent_inode, inodes, api):
+ def __init__(self, parent_inode, inodes, api, num_retries):
super(MagicDirectory, self).__init__(parent_inode)
self.inodes = inodes
self.api = api
+ self.num_retries = num_retries
# Have to defer creating readme_file because at this point we don't
# yet have an inode assigned.
self.readme_file = None
return False
try:
- e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, self.api, k))
+ e = self.inodes.add_entry(CollectionDirectory(
+ self.inode, self.inodes, self.api, self.num_retries, k))
if e.update():
self._entries[k] = e
return True
class TagsDirectory(RecursiveInvalidateDirectory):
'''A special directory that contains as subdirectories all tags visible to the user.'''
- def __init__(self, parent_inode, inodes, api, poll_time=60):
+ def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
super(TagsDirectory, self).__init__(parent_inode)
self.inodes = inodes
self.api = api
+ self.num_retries = num_retries
self._poll = True
self._poll_time = poll_time
def update(self):
with llfuse.lock_released:
- tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
+ tags = self.api.links().list(
+ filters=[['link_class', '=', 'tag']],
+ select=['name'], distinct=True
+ ).execute(num_retries=self.num_retries)
if "items" in tags:
self.merge(tags['items'],
lambda i: i['name'] if 'name' in i else i['uuid'],
lambda a, i: a.tag == i,
- lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
+ lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
class TagDirectory(Directory):
to the user that are tagged with a particular tag.
'''
- def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
+ def __init__(self, parent_inode, inodes, api, num_retries, tag,
+ poll=False, poll_time=60):
super(TagDirectory, self).__init__(parent_inode)
self.inodes = inodes
self.api = api
+ self.num_retries = num_retries
self.tag = tag
self._poll = poll
self._poll_time = poll_time
def update(self):
with llfuse.lock_released:
- taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
- ['name', '=', self.tag],
- ['head_uuid', 'is_a', 'arvados#collection']],
- select=['head_uuid']).execute()
+ taggedcollections = self.api.links().list(
+ filters=[['link_class', '=', 'tag'],
+ ['name', '=', self.tag],
+ ['head_uuid', 'is_a', 'arvados#collection']],
+ select=['head_uuid']
+ ).execute(num_retries=self.num_retries)
self.merge(taggedcollections['items'],
lambda i: i['head_uuid'],
lambda a, i: a.collection_locator == i['head_uuid'],
- lambda i: CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid']))
+ lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
class ProjectDirectory(Directory):
'''A special directory that contains the contents of a project.'''
- def __init__(self, parent_inode, inodes, api, project_object, poll=False, poll_time=60):
+ def __init__(self, parent_inode, inodes, api, num_retries, project_object,
+ poll=False, poll_time=60):
super(ProjectDirectory, self).__init__(parent_inode)
self.inodes = inodes
self.api = api
+ self.num_retries = num_retries
self.project_object = project_object
self.project_object_file = None
self.uuid = project_object['uuid']
def createDirectory(self, i):
if collection_uuid_pattern.match(i['uuid']):
- return CollectionDirectory(self.inode, self.inodes, self.api, i)
+ return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
elif group_uuid_pattern.match(i['uuid']):
- return ProjectDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
+ return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
elif link_uuid_pattern.match(i['uuid']):
if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
- return CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid'])
+ return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
else:
return None
elif uuid_pattern.match(i['uuid']):
with llfuse.lock_released:
if group_uuid_pattern.match(self.uuid):
- self.project_object = self.api.groups().get(uuid=self.uuid).execute()
+ self.project_object = self.api.groups().get(
+ uuid=self.uuid).execute(num_retries=self.num_retries)
elif user_uuid_pattern.match(self.uuid):
- self.project_object = self.api.users().get(uuid=self.uuid).execute()
+ self.project_object = self.api.users().get(
+ uuid=self.uuid).execute(num_retries=self.num_retries)
- contents = arvados.util.list_all(self.api.groups().contents, uuid=self.uuid)
+ contents = arvados.util.list_all(self.api.groups().contents,
+ self.num_retries, uuid=self.uuid)
# Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
- contents += arvados.util.list_all(self.api.links().list, filters=[['tail_uuid', '=', self.uuid], ['link_class', '=', 'name']])
+ contents += arvados.util.list_all(
+ self.api.links().list, self.num_retries,
+ filters=[['tail_uuid', '=', self.uuid],
+ ['link_class', '=', 'name']])
# end with llfuse.lock_released, re-acquire lock
class SharedDirectory(Directory):
'''A special directory that represents users or groups who have shared projects with me.'''
- def __init__(self, parent_inode, inodes, api, exclude, poll=False, poll_time=60):
+ def __init__(self, parent_inode, inodes, api, num_retries, exclude,
+ poll=False, poll_time=60):
super(SharedDirectory, self).__init__(parent_inode)
- self.current_user = api.users().current().execute()
self.inodes = inodes
self.api = api
+ self.num_retries = num_retries
+ self.current_user = api.users().current().execute(num_retries=num_retries)
self._poll = True
self._poll_time = poll_time
def update(self):
with llfuse.lock_released:
- all_projects = arvados.util.list_all(self.api.groups().list, filters=[['group_class','=','project']])
+ all_projects = arvados.util.list_all(
+ self.api.groups().list, self.num_retries,
+ filters=[['group_class','=','project']])
objects = {}
for ob in all_projects:
objects[ob['uuid']] = ob
roots.append(ob)
root_owners[ob['owner_uuid']] = True
- lusers = arvados.util.list_all(self.api.users().list, filters=[['uuid','in', list(root_owners)]])
- lgroups = arvados.util.list_all(self.api.groups().list, filters=[['uuid','in', list(root_owners)]])
+ lusers = arvados.util.list_all(
+ self.api.users().list, self.num_retries,
+ filters=[['uuid','in', list(root_owners)]])
+ lgroups = arvados.util.list_all(
+ self.api.groups().list, self.num_retries,
+ filters=[['uuid','in', list(root_owners)]])
users = {}
groups = {}
self.merge(contents.items(),
lambda i: i[0],
lambda a, i: a.uuid == i[1]['uuid'],
- lambda i: ProjectDirectory(self.inode, self.inodes, self.api, i[1], poll=self._poll, poll_time=self._poll_time))
+ lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
except Exception as e:
_logger.exception(e)
so request handlers do not run concurrently unless the lock is explicitly released
using "with llfuse.lock_released:"'''
- def __init__(self, uid, gid):
+ def __init__(self, uid, gid, encoding="utf-8"):
super(Operations, self).__init__()
self.inodes = Inodes()
self.uid = uid
self.gid = gid
+ self.encoding = encoding
# dict of inode to filehandle
self._filehandles = {}
return entry
def lookup(self, parent_inode, name):
+ name = unicode(name, self.encoding)
_logger.debug("arv-mount lookup: parent_inode %i name %s",
parent_inode, name)
inode = None
try:
with llfuse.lock_released:
return handle.entry.readfrom(off, size)
- except:
+ except arvados.errors.NotFoundError as e:
+ _logger.warning("Block not found: " + str(e))
+ raise llfuse.FUSEError(errno.EIO)
+ except Exception as e:
+ _logger.exception(e)
raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
e = off
while e < len(handle.entry):
if handle.entry[e][1].inode in self.inodes:
- yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
+ try:
+ yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
+ except UnicodeEncodeError:
+ pass
e += 1
def releasedir(self, fh):