#
# SPDX-License-Identifier: AGPL-3.0
-import logging
-import re
-import time
-import llfuse
-import arvados
+from __future__ import absolute_import
+from __future__ import division
+from future.utils import viewitems
+from future.utils import itervalues
+from builtins import dict
import apiclient
+import arvados
+import errno
import functools
+import llfuse
+import logging
+import re
+import sys
import threading
-from apiclient import errors as apiclient_errors
-import errno
import time
+from apiclient import errors as apiclient_errors
-from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
-from fresh import FreshBase, convertTime, use_counter, check_update
+from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
+from .fresh import FreshBase, convertTime, use_counter, check_update
import arvados.collection
from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
# appear as underscores in the fuse mount.)
_disallowed_filename_characters = re.compile('[\x00/]')
-# '.' and '..' are not reachable if API server is newer than #6277
-def sanitize_filename(dirty):
- """Replace disallowed filename characters with harmless "_"."""
- if dirty is None:
- return None
- elif dirty == '':
- return '_'
- elif dirty == '.':
- return '_'
- elif dirty == '..':
- return '__'
- else:
- return _disallowed_filename_characters.sub('_', dirty)
-
class Directory(FreshBase):
"""Generic directory object, backed by a dict.
and the value referencing a File or Directory object.
"""
- def __init__(self, parent_inode, inodes):
+ def __init__(self, parent_inode, inodes, apiconfig):
"""parent_inode is the integer inode number"""
super(Directory, self).__init__()
raise Exception("parent_inode should be an int")
self.parent_inode = parent_inode
self.inodes = inodes
+ self.apiconfig = apiconfig
self._entries = {}
self._mtime = time.time()
- # Overriden by subclasses to implement logic to update the entries dict
- # when the directory is stale
+ def forward_slash_subst(self):
+ if not hasattr(self, '_fsns'):
+ self._fsns = None
+ config = self.apiconfig()
+ try:
+ self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
+ except KeyError:
+ # old API server with no FSNS config
+ self._fsns = '_'
+ else:
+ if self._fsns == '' or self._fsns == '/':
+ self._fsns = None
+ return self._fsns
+
+ def unsanitize_filename(self, incoming):
+ """Replace ForwardSlashNameSubstitution value with /"""
+ fsns = self.forward_slash_subst()
+ if isinstance(fsns, str):
+ return incoming.replace(fsns, '/')
+ else:
+ return incoming
+
+ def sanitize_filename(self, dirty):
+ """Replace disallowed filename characters according to
+ ForwardSlashNameSubstitution in self.api_config."""
+ # '.' and '..' are not reachable if API server is newer than #6277
+ if dirty is None:
+ return None
+ elif dirty == '':
+ return '_'
+ elif dirty == '.':
+ return '_'
+ elif dirty == '..':
+ return '__'
+ else:
+ fsns = self.forward_slash_subst()
+ if isinstance(fsns, str):
+ dirty = dirty.replace('/', fsns)
+ return _disallowed_filename_characters.sub('_', dirty)
+
+
+ # Overridden by subclasses to implement logic to update the
+ # entries dict when the directory is stale
@use_counter
def update(self):
pass
self._entries = {}
changed = False
for i in items:
- name = sanitize_filename(fn(i))
+ name = self.sanitize_filename(fn(i))
if name:
if name in oldentries and same(oldentries[name], i):
# move existing directory entry over
def in_use(self):
if super(Directory, self).in_use():
return True
- for v in self._entries.itervalues():
+ for v in itervalues(self._entries):
if v.in_use():
return True
return False
def has_ref(self, only_children):
if super(Directory, self).has_ref(only_children):
return True
- for v in self._entries.itervalues():
+ for v in itervalues(self._entries):
if v.has_ref(False):
return True
return False
# Find self on the parent in order to invalidate this path.
# Calling the public items() method might trigger a refresh,
# which we definitely don't want, so read the internal dict directly.
- for k,v in parent._entries.items():
+ for k,v in viewitems(parent._entries):
if v is self:
self.inodes.invalidate_entry(parent, k)
break
"""
- def __init__(self, parent_inode, inodes, collection):
- super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
+ def __init__(self, parent_inode, inodes, apiconfig, collection):
+ super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig)
+ self.apiconfig = apiconfig
self.collection = collection
def new_entry(self, name, item, mtime):
- name = sanitize_filename(name)
+ name = self.sanitize_filename(name)
if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
if item.fuse_entry.dead is not True:
raise Exception("Can only reparent dead inode entry")
item.fuse_entry.dead = False
self._entries[name] = item.fuse_entry
elif isinstance(item, arvados.collection.RichCollectionBase):
- self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
+ self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, item))
self._entries[name].populate(mtime)
else:
self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
def on_event(self, event, collection, name, item):
if collection == self.collection:
- name = sanitize_filename(name)
+ name = self.sanitize_filename(name)
_logger.debug("collection notify %s %s %s %s", event, collection, name, item)
with llfuse.lock:
if event == arvados.collection.ADD:
def populate(self, mtime):
self._mtime = mtime
self.collection.subscribe(self.on_event)
- for entry, item in self.collection.items():
+ for entry, item in viewitems(self.collection):
self.new_entry(entry, item, self.mtime())
def writable(self):
"""Represents the root of a directory tree representing a collection."""
def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
- super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
+ super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, None)
self.api = api
self.num_retries = num_retries
self.collection_record_file = None
self.collection_record = None
self._poll = True
try:
- self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2)/2)
+ self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
except:
_logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
self._poll_time = 60*60
new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
if 'manifest_text' not in new_collection_record:
new_collection_record['manifest_text'] = coll_reader.manifest_text()
+ if 'storage_classes_desired' not in new_collection_record:
+ new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
self.new_collection(new_collection_record, coll_reader)
def save_new(self):
pass
- def __init__(self, parent_inode, inodes, api_client, num_retries):
+ def __init__(self, parent_inode, inodes, api_client, num_retries, storage_classes=None):
collection = self.UnsaveableCollection(
api_client=api_client,
keep_client=api_client.keep,
- num_retries=num_retries)
+ num_retries=num_retries,
+ storage_classes_desired=storage_classes)
super(TmpCollectionDirectory, self).__init__(
- parent_inode, inodes, collection)
+ parent_inode, inodes, api_client.config, collection)
self.collection_record_file = None
self.populate(self.mtime())
"uuid": None,
"manifest_text": self.collection.manifest_text(),
"portable_data_hash": self.collection.portable_data_hash(),
+ "storage_classes_desired": self.collection.storage_classes_desired(),
}
def __contains__(self, k):
""".lstrip()
- def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
- super(MagicDirectory, self).__init__(parent_inode, inodes)
+ def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False, storage_classes=None):
+ super(MagicDirectory, self).__init__(parent_inode, inodes, api.config)
self.api = api
self.num_retries = num_retries
self.pdh_only = pdh_only
+ self.storage_classes = storage_classes
def __setattr__(self, name, value):
super(MagicDirectory, self).__setattr__(name, value)
e = None
if group_uuid_pattern.match(k):
- project_object = self.api.groups().get(
- uuid=k).execute(num_retries=self.num_retries)
- if project_object[u'group_class'] != "project":
+ project = self.api.groups().list(
+ filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
+ if project[u'items_available'] == 0:
return False
e = self.inodes.add_entry(ProjectDirectory(
- self.inode, self.inodes, self.api, self.num_retries, project_object))
+ self.inode, self.inodes, self.api, self.num_retries,
+ project[u'items'][0], storage_classes=self.storage_classes))
else:
e = self.inodes.add_entry(CollectionDirectory(
self.inode, self.inodes, self.api, self.num_retries, k))
"""A special directory that contains as subdirectories all tags visible to the user."""
def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
- super(TagsDirectory, self).__init__(parent_inode, inodes)
+ super(TagsDirectory, self).__init__(parent_inode, inodes, api.config)
self.api = api
self.num_retries = num_retries
self._poll = True
def __init__(self, parent_inode, inodes, api, num_retries, tag,
poll=False, poll_time=60):
- super(TagDirectory, self).__init__(parent_inode, inodes)
+ super(TagDirectory, self).__init__(parent_inode, inodes, api.config)
self.api = api
self.num_retries = num_retries
self.tag = tag
"""A special directory that contains the contents of a project."""
def __init__(self, parent_inode, inodes, api, num_retries, project_object,
- poll=False, poll_time=60):
- super(ProjectDirectory, self).__init__(parent_inode, inodes)
+ poll=True, poll_time=3, storage_classes=None):
+ super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config)
self.api = api
self.num_retries = num_retries
self.project_object = project_object
self._updating_lock = threading.Lock()
self._current_user = None
self._full_listing = False
+ self.storage_classes = storage_classes
def want_event_subscribe(self):
return True
if collection_uuid_pattern.match(i['uuid']):
return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
elif group_uuid_pattern.match(i['uuid']):
- return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
+ return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time, self.storage_classes)
elif link_uuid_pattern.match(i['uuid']):
if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
elif user_uuid_pattern.match(self.project_uuid):
self.project_object = self.api.users().get(
uuid=self.project_uuid).execute(num_retries=self.num_retries)
-
- contents = arvados.util.list_all(self.api.groups().list,
- self.num_retries,
- filters=[["owner_uuid", "=", self.project_uuid],
- ["group_class", "=", "project"]])
- contents.extend(arvados.util.list_all(self.api.collections().list,
- self.num_retries,
- filters=[["owner_uuid", "=", self.project_uuid]]))
+ # do this in 2 steps until #17424 is fixed
+ contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
+ order_key="uuid",
+ num_retries=self.num_retries,
+ uuid=self.project_uuid,
+ filters=[["uuid", "is_a", "arvados#group"],
+ ["groups.group_class", "in", ["project","filter"]]]))
+ contents.extend(arvados.util.keyset_list_all(self.api.groups().contents,
+ order_key="uuid",
+ num_retries=self.num_retries,
+ uuid=self.project_uuid,
+ filters=[["uuid", "is_a", "arvados#collection"]]))
# end with llfuse.lock_released, re-acquire lock
elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
return super(ProjectDirectory, self).__getitem__(k)
with llfuse.lock_released:
+ k2 = self.unsanitize_filename(k)
+ if k2 == k:
+ namefilter = ["name", "=", k]
+ else:
+ namefilter = ["name", "in", [k, k2]]
contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
- ["group_class", "=", "project"],
- ["name", "=", k]],
- limit=1).execute(num_retries=self.num_retries)["items"]
+ ["group_class", "in", ["project","filter"]],
+ namefilter],
+ limit=2).execute(num_retries=self.num_retries)["items"]
if not contents:
contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
- ["name", "=", k]],
- limit=1).execute(num_retries=self.num_retries)["items"]
+ namefilter],
+ limit=2).execute(num_retries=self.num_retries)["items"]
if contents:
- name = sanitize_filename(self.namefn(contents[0]))
+ if len(contents) > 1 and contents[1]['name'] == k:
+ # If "foo/bar" and "foo[SUBST]bar" both exist, use
+ # "foo[SUBST]bar".
+ contents = [contents[1]]
+ name = self.sanitize_filename(self.namefn(contents[0]))
if name != k:
raise KeyError(k)
return self._add_entry(contents[0], name)
with llfuse.lock_released:
if not self._current_user:
self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
- return self._current_user["uuid"] in self.project_object["writable_by"]
+ return self._current_user["uuid"] in self.project_object.get("writable_by", [])
def persisted(self):
return True
def mkdir(self, name):
try:
with llfuse.lock_released:
- self.api.collections().create(body={"owner_uuid": self.project_uuid,
- "name": name,
- "manifest_text": ""}).execute(num_retries=self.num_retries)
+ c = {
+ "owner_uuid": self.project_uuid,
+ "name": name,
+ "manifest_text": "" }
+ if self.storage_classes is not None:
+ c["storage_classes_desired"] = self.storage_classes
+ try:
+ self.api.collections().create(body=c).execute(num_retries=self.num_retries)
+ except Exception as e:
+ raise
self.invalidate()
except apiclient_errors.Error as error:
_logger.error(error)
new_attrs = properties.get("new_attributes") or {}
old_attrs["uuid"] = ev["object_uuid"]
new_attrs["uuid"] = ev["object_uuid"]
- old_name = sanitize_filename(self.namefn(old_attrs))
- new_name = sanitize_filename(self.namefn(new_attrs))
+ old_name = self.sanitize_filename(self.namefn(old_attrs))
+ new_name = self.sanitize_filename(self.namefn(new_attrs))
# create events will have a new name, but not an old name
# delete events will have an old name, but not a new name
"""A special directory that represents users or groups who have shared projects with me."""
def __init__(self, parent_inode, inodes, api, num_retries, exclude,
- poll=False, poll_time=60):
- super(SharedDirectory, self).__init__(parent_inode, inodes)
+ poll=False, poll_time=60, storage_classes=None):
+ super(SharedDirectory, self).__init__(parent_inode, inodes, api.config)
self.api = api
self.num_retries = num_retries
self.current_user = api.users().current().execute(num_retries=num_retries)
self._poll = True
self._poll_time = poll_time
self._updating_lock = threading.Lock()
+ self.storage_classes = storage_classes
@use_counter
def update(self):
if not self.stale():
return
- all_projects = arvados.util.list_all(
- self.api.groups().list, self.num_retries,
- filters=[['group_class','=','project']],
- select=["uuid", "owner_uuid"])
- objects = {}
- for ob in all_projects:
- objects[ob['uuid']] = ob
-
+ contents = {}
roots = []
root_owners = set()
- current_uuid = self.current_user['uuid']
- for ob in all_projects:
- if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
- roots.append(ob['uuid'])
- root_owners.add(ob['owner_uuid'])
-
- lusers = arvados.util.list_all(
- self.api.users().list, self.num_retries,
- filters=[['uuid','in', list(root_owners)]])
- lgroups = arvados.util.list_all(
- self.api.groups().list, self.num_retries,
- filters=[['uuid','in', list(root_owners)+roots]])
-
- for l in lusers:
- objects[l["uuid"]] = l
- for l in lgroups:
- objects[l["uuid"]] = l
+ objects = {}
+
+ methods = self.api._rootDesc.get('resources')["groups"]['methods']
+ if 'httpMethod' in methods.get('shared', {}):
+ page = []
+ while True:
+ resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
+ order="uuid",
+ limit=10000,
+ count="none",
+ include="owner_uuid").execute()
+ if not resp["items"]:
+ break
+ page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
+ for r in resp["items"]:
+ objects[r["uuid"]] = r
+ roots.append(r["uuid"])
+ for r in resp["included"]:
+ objects[r["uuid"]] = r
+ root_owners.add(r["uuid"])
+ else:
+ all_projects = list(arvados.util.keyset_list_all(
+ self.api.groups().list,
+ order_key="uuid",
+ num_retries=self.num_retries,
+ filters=[['group_class','in',['project','filter']]],
+ select=["uuid", "owner_uuid"]))
+ for ob in all_projects:
+ objects[ob['uuid']] = ob
+
+ current_uuid = self.current_user['uuid']
+ for ob in all_projects:
+ if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
+ roots.append(ob['uuid'])
+ root_owners.add(ob['owner_uuid'])
+
+ lusers = arvados.util.keyset_list_all(
+ self.api.users().list,
+ order_key="uuid",
+ num_retries=self.num_retries,
+ filters=[['uuid','in', list(root_owners)]])
+ lgroups = arvados.util.keyset_list_all(
+ self.api.groups().list,
+ order_key="uuid",
+ num_retries=self.num_retries,
+ filters=[['uuid','in', list(root_owners)+roots]])
+
+ for l in lusers:
+ objects[l["uuid"]] = l
+ for l in lgroups:
+ objects[l["uuid"]] = l
- contents = {}
for r in root_owners:
if r in objects:
obr = objects[r]
if obr.get("name"):
contents[obr["name"]] = obr
- #elif obr.get("username"):
- # contents[obr["username"]] = obr
elif "first_name" in obr:
contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
# end with llfuse.lock_released, re-acquire lock
- self.merge(contents.items(),
+ self.merge(viewitems(contents),
lambda i: i[0],
lambda a, i: a.uuid() == i[1]['uuid'],
- lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
+ lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
except Exception:
_logger.exception("arv-mount shared dir error")
finally: