1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import absolute_import
6 from __future__ import division
7 from future.utils import viewitems
8 from future.utils import itervalues
9 from builtins import dict
18 from apiclient import errors as apiclient_errors
22 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
23 from .fresh import FreshBase, convertTime, use_counter, check_update
25 import arvados.collection
26 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
28 _logger = logging.getLogger('arvados.arvados_fuse')
31 # Match any character which FUSE or Linux cannot accommodate as part
32 # of a filename. (If present in a collection filename, they will
33 # appear as underscores in the fuse mount.)
34 _disallowed_filename_characters = re.compile('[\x00/]')
37 class Directory(FreshBase):
38 """Generic directory object, backed by a dict.
40 Consists of a set of entries with the key representing the filename
41 and the value referencing a File or Directory object.
44 def __init__(self, parent_inode, inodes, apiconfig):
45 """parent_inode is the integer inode number"""
47 super(Directory, self).__init__()
50 if not isinstance(parent_inode, int):
51 raise Exception("parent_inode should be an int")
52 self.parent_inode = parent_inode
54 self.apiconfig = apiconfig
56 self._mtime = time.time()
58 def forward_slash_subst(self):
59 if not hasattr(self, '_fsns'):
61 config = self.apiconfig()
63 self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
65 # old API server with no FSNS config
68 if self._fsns == '' or self._fsns == '/':
72 def unsanitize_filename(self, incoming):
73 """Replace ForwardSlashNameSubstitution value with /"""
74 fsns = self.forward_slash_subst()
75 if isinstance(fsns, str):
76 return incoming.replace(fsns, '/')
80 def sanitize_filename(self, dirty):
81 """Replace disallowed filename characters according to
82 ForwardSlashNameSubstitution in the config returned by self.apiconfig()."""
83 # '.' and '..' are not reachable if API server is newer than #6277
93 fsns = self.forward_slash_subst()
94 if isinstance(fsns, str):
95 dirty = dirty.replace('/', fsns)
96 return _disallowed_filename_characters.sub('_', dirty)
99 # Overridden by subclasses to implement logic to update the
100 # entries dict when the directory is stale
105 # Only used when computing the size of the disk footprint of the directory
113 def checkupdate(self):
117 except apiclient.errors.HttpError as e:
def __getitem__(self, item):
    """Return the entry stored under the name `item`.

    The lookup goes straight to the internal ``_entries`` mapping, so a
    missing name surfaces as that mapping's own lookup error.
    """
    entries = self._entries
    return entries[item]
128 return list(self._entries.items())
def __contains__(self, k):
    """Report whether an entry named `k` exists in ``_entries``."""
    present = k in self._entries
    return present
138 return len(self._entries)
141 self.inodes.touch(self)
142 super(Directory, self).fresh()
144 def merge(self, items, fn, same, new_entry):
145 """Helper method for updating the contents of the directory.
147 Takes a list describing the new contents of the directory, reuses
148 entries that are the same in both the old and new lists, creates new
149 entries, and deletes old entries missing from the new list.
151 :items: iterable with new directory contents
153 :fn: function to take an entry in 'items' and return the desired file or
154 directory name, or None if this entry should be skipped
156 :same: function to compare an existing entry (a File or Directory
157 object) with an entry in the items list to determine whether to keep
160 :new_entry: function to create a new directory entry (File or Directory
161 object) from an entry in the items list.
165 oldentries = self._entries
169 name = self.sanitize_filename(fn(i))
171 if name in oldentries and same(oldentries[name], i):
172 # move existing directory entry over
173 self._entries[name] = oldentries[name]
176 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
177 # create new directory entry
180 self._entries[name] = self.inodes.add_entry(ent)
183 # delete any other directory entries that were not found in 'items'
185 _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
186 self.inodes.invalidate_entry(self, i)
187 self.inodes.del_entry(oldentries[i])
191 self.inodes.invalidate_inode(self)
192 self._mtime = time.time()
197 if super(Directory, self).in_use():
199 for v in itervalues(self._entries):
204 def has_ref(self, only_children):
205 if super(Directory, self).has_ref(only_children):
207 for v in itervalues(self._entries):
213 """Delete all entries"""
214 oldentries = self._entries
217 oldentries[n].clear()
218 self.inodes.del_entry(oldentries[n])
221 def kernel_invalidate(self):
222 # Invalidating the dentry on the parent implies invalidating all paths
224 parent = self.inodes[self.parent_inode]
226 # Find self on the parent in order to invalidate this path.
227 # Calling the public items() method might trigger a refresh,
228 # which we definitely don't want, so read the internal dict directly.
229 for k,v in viewitems(parent._entries):
231 self.inodes.invalidate_entry(parent, k)
def want_event_subscribe(self):
    """Base-class placeholder; always raises NotImplementedError.

    Directory subclasses override this to say whether they want event
    notifications.
    """
    raise NotImplementedError()

def create(self, name):
    """Base-class placeholder for creating the file `name`.

    Always raises NotImplementedError; subclasses that support the
    operation override it.
    """
    raise NotImplementedError()

def mkdir(self, name):
    """Base-class placeholder for creating the subdirectory `name`.

    Always raises NotImplementedError; subclasses that support the
    operation override it.
    """
    raise NotImplementedError()

def unlink(self, name):
    """Base-class placeholder for removing the file `name`.

    Always raises NotImplementedError; subclasses that support the
    operation override it.
    """
    raise NotImplementedError()

def rmdir(self, name):
    """Base-class placeholder for removing the subdirectory `name`.

    Always raises NotImplementedError; subclasses that support the
    operation override it.
    """
    raise NotImplementedError()

def rename(self, name_old, name_new, src):
    """Base-class placeholder for renaming `name_old` (in `src`) to `name_new`.

    Always raises NotImplementedError; subclasses that support the
    operation override it.
    """
    raise NotImplementedError()
262 class CollectionDirectoryBase(Directory):
263 """Represent an Arvados Collection as a directory.
265 This class is used for Subcollections, and is also the base class for
266 CollectionDirectory, which implements collection loading/saving on
269 Most operations act only on the underlying Arvados `Collection` object. The
270 `Collection` object signals via a notify callback to
271 `CollectionDirectoryBase.on_event` that an item was added, removed or
272 modified. FUSE inodes and directory entries are created, deleted or
273 invalidated in response to these events.
def __init__(self, parent_inode, inodes, apiconfig, collection):
    """Set up a directory that presents `collection`.

    `parent_inode`, `inodes` and `apiconfig` are passed unchanged to the
    Directory initializer.  `collection` is kept on ``self.collection``
    and `apiconfig` on ``self.apiconfig``.
    """
    super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig)
    self.collection = collection
    self.apiconfig = apiconfig
282 def new_entry(self, name, item, mtime):
283 name = self.sanitize_filename(name)
284 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
285 if item.fuse_entry.dead is not True:
286 raise Exception("Can only reparent dead inode entry")
287 if item.fuse_entry.inode is None:
288 raise Exception("Reparented entry must still have valid inode")
289 item.fuse_entry.dead = False
290 self._entries[name] = item.fuse_entry
291 elif isinstance(item, arvados.collection.RichCollectionBase):
292 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, item))
293 self._entries[name].populate(mtime)
295 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
296 item.fuse_entry = self._entries[name]
298 def on_event(self, event, collection, name, item):
299 if collection == self.collection:
300 name = self.sanitize_filename(name)
301 _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
303 if event == arvados.collection.ADD:
304 self.new_entry(name, item, self.mtime())
305 elif event == arvados.collection.DEL:
306 ent = self._entries[name]
307 del self._entries[name]
308 self.inodes.invalidate_entry(self, name)
309 self.inodes.del_entry(ent)
310 elif event == arvados.collection.MOD:
311 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
312 self.inodes.invalidate_inode(item.fuse_entry)
313 elif name in self._entries:
314 self.inodes.invalidate_inode(self._entries[name])
316 def populate(self, mtime):
318 self.collection.subscribe(self.on_event)
319 for entry, item in viewitems(self.collection):
320 self.new_entry(entry, item, self.mtime())
323 return self.collection.writable()
327 with llfuse.lock_released:
328 self.collection.root_collection().save()
def create(self, name):
    """Create the file `name` in the backing collection.

    Opens `name` on the collection in "w" mode and closes the handle
    immediately.  The llfuse lock is released for the duration of the
    collection calls.
    """
    with llfuse.lock_released:
        new_file = self.collection.open(name, "w")
        new_file.close()
def mkdir(self, name):
    """Ask the backing collection to create `name` via its ``mkdirs`` method.

    The llfuse lock is released while the collection call runs.
    """
    with llfuse.lock_released:
        make_dirs = self.collection.mkdirs
        make_dirs(name)
344 def unlink(self, name):
345 with llfuse.lock_released:
346 self.collection.remove(name)
351 def rmdir(self, name):
352 with llfuse.lock_released:
353 self.collection.remove(name)
358 def rename(self, name_old, name_new, src):
359 if not isinstance(src, CollectionDirectoryBase):
360 raise llfuse.FUSEError(errno.EPERM)
365 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
367 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
369 raise llfuse.FUSEError(errno.ENOTEMPTY)
370 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
371 raise llfuse.FUSEError(errno.ENOTDIR)
372 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
373 raise llfuse.FUSEError(errno.EISDIR)
375 with llfuse.lock_released:
376 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
381 super(CollectionDirectoryBase, self).clear()
382 self.collection = None
385 class CollectionDirectory(CollectionDirectoryBase):
386 """Represents the root of a directory tree representing a collection."""
388 def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
389 super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, None)
391 self.num_retries = num_retries
392 self.collection_record_file = None
393 self.collection_record = None
396 self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
398 _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
399 self._poll_time = 60*60
401 if isinstance(collection_record, dict):
402 self.collection_locator = collection_record['uuid']
403 self._mtime = convertTime(collection_record.get('modified_at'))
405 self.collection_locator = collection_record
407 self._manifest_size = 0
408 if self.collection_locator:
409 self._writable = (uuid_pattern.match(self.collection_locator) is not None)
410 self._updating_lock = threading.Lock()
413 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
416 return self.collection.writable() if self.collection is not None else self._writable
def want_event_subscribe(self):
    """Report whether this directory should receive change events.

    Returns True only when the collection locator matches
    ``uuid_pattern``, i.e. the collection is addressed by UUID.

    A directory whose locator is unset (None or empty) never qualifies.
    Returning False for it, rather than passing None to the regex and
    raising TypeError, matches the ``if self.collection_locator:`` guard
    that ``__init__`` applies before matching the locator.
    """
    locator = self.collection_locator
    if not locator:
        return False
    return uuid_pattern.match(locator) is not None
421 # Used by arv-web.py to switch the contents of the CollectionDirectory
422 def change_collection(self, new_locator):
423 """Switch the contents of the CollectionDirectory.
425 Must be called with llfuse.lock held.
428 self.collection_locator = new_locator
429 self.collection_record = None
432 def new_collection(self, new_collection_record, coll_reader):
436 self.collection_record = new_collection_record
438 if self.collection_record:
439 self._mtime = convertTime(self.collection_record.get('modified_at'))
440 self.collection_locator = self.collection_record["uuid"]
441 if self.collection_record_file is not None:
442 self.collection_record_file.update(self.collection_record)
444 self.collection = coll_reader
445 self.populate(self.mtime())
448 return self.collection_locator
451 def update(self, to_record_version=None):
453 if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
456 if self.collection_locator is None:
461 with llfuse.lock_released:
462 self._updating_lock.acquire()
466 _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
467 if self.collection is not None:
468 if self.collection.known_past_version(to_record_version):
469 _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
471 self.collection.update()
473 if uuid_pattern.match(self.collection_locator):
474 coll_reader = arvados.collection.Collection(
475 self.collection_locator, self.api, self.api.keep,
476 num_retries=self.num_retries)
478 coll_reader = arvados.collection.CollectionReader(
479 self.collection_locator, self.api, self.api.keep,
480 num_retries=self.num_retries)
481 new_collection_record = coll_reader.api_response() or {}
482 # If the Collection only exists in Keep, there will be no API
483 # response. Fill in the fields we need.
484 if 'uuid' not in new_collection_record:
485 new_collection_record['uuid'] = self.collection_locator
486 if "portable_data_hash" not in new_collection_record:
487 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
488 if 'manifest_text' not in new_collection_record:
489 new_collection_record['manifest_text'] = coll_reader.manifest_text()
491 if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
492 self.new_collection(new_collection_record, coll_reader)
494 self._manifest_size = len(coll_reader.manifest_text())
495 _logger.debug("%s manifest_size %i", self, self._manifest_size)
496 # end with llfuse.lock_released, re-acquire lock
501 self._updating_lock.release()
502 except arvados.errors.NotFoundError as e:
503 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
504 except arvados.errors.ArgumentError as detail:
505 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
506 if self.collection_record is not None and "manifest_text" in self.collection_record:
507 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
509 _logger.exception("arv-mount %s: error", self.collection_locator)
510 if self.collection_record is not None and "manifest_text" in self.collection_record:
511 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
517 def __getitem__(self, item):
518 if item == '.arvados#collection':
519 if self.collection_record_file is None:
520 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
521 self.inodes.add_entry(self.collection_record_file)
522 return self.collection_record_file
524 return super(CollectionDirectory, self).__getitem__(item)
526 def __contains__(self, k):
527 if k == '.arvados#collection':
530 return super(CollectionDirectory, self).__contains__(k)
def invalidate(self):
    """Forget the cached collection record and its record-file object,
    then run the inherited invalidation."""
    self.collection_record = self.collection_record_file = None
    super(CollectionDirectory, self).invalidate()
538 return (self.collection_locator is not None)
541 # This is an empirically-derived heuristic to estimate the memory used
542 # to store this collection's metadata. Calculating the memory
543 # footprint directly would be more accurate, but also more complicated.
544 return self._manifest_size * 128
547 if self.collection is not None:
549 self.collection.save()
550 self.collection.stop_threads()
553 if self.collection is not None:
554 self.collection.stop_threads()
555 super(CollectionDirectory, self).clear()
556 self._manifest_size = 0
559 class TmpCollectionDirectory(CollectionDirectoryBase):
560 """A directory backed by an Arvados collection that never gets saved.
562 This supports using Keep as scratch space. A userspace program can
563 read the .arvados#collection file to get a current manifest in
564 order to save a snapshot of the scratch data or use it as a crunch
568 class UnsaveableCollection(arvados.collection.Collection):
def __init__(self, parent_inode, inodes, api_client, num_retries):
    """Build the directory around a fresh ``UnsaveableCollection``.

    The collection is created from `api_client`, its ``keep`` attribute
    and `num_retries`.  It is passed to the base initializer with
    `parent_inode`, `inodes` and ``api_client.config``.  The record file
    starts out unset and the directory is populated using the current
    mtime.
    """
    collection_args = {
        "api_client": api_client,
        "keep_client": api_client.keep,
        "num_retries": num_retries,
    }
    scratch = self.UnsaveableCollection(**collection_args)
    super(TmpCollectionDirectory, self).__init__(
        parent_inode, inodes, api_client.config, scratch)
    self.collection_record_file = None
    self.populate(self.mtime())
584 def on_event(self, *args, **kwargs):
585 super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
586 if self.collection_record_file:
588 self.collection_record_file.invalidate()
589 self.inodes.invalidate_inode(self.collection_record_file)
590 _logger.debug("%s invalidated collection record", self)
592 def collection_record(self):
593 with llfuse.lock_released:
596 "manifest_text": self.collection.manifest_text(),
597 "portable_data_hash": self.collection.portable_data_hash(),
def __contains__(self, k):
    """Report whether `k` names something in this directory.

    The special name '.arvados#collection' is always present; any other
    name is checked by the parent class.
    """
    if k == '.arvados#collection':
        return True
    return super(TmpCollectionDirectory, self).__contains__(k)
def __getitem__(self, item):
    """Look up `item`, serving '.arvados#collection' from a lazily built file.

    For the special name, a ``FuncToJSONFile`` wrapping
    ``self.collection_record`` is created on first use, registered with
    ``self.inodes`` and cached on ``self.collection_record_file``.  Later
    lookups return the cached object.  Every other name is resolved by
    the parent class.
    """
    if item != '.arvados#collection':
        return super(TmpCollectionDirectory, self).__getitem__(item)
    if self.collection_record_file is None:
        record_file = FuncToJSONFile(self.inode, self.collection_record)
        self.collection_record_file = record_file
        self.inodes.add_entry(record_file)
    return self.collection_record_file
620 def want_event_subscribe(self):
624 self.collection.stop_threads()
def invalidate(self):
    """Invalidate the collection record file, if one exists, and then the
    directory itself via the parent class."""
    record_file = self.collection_record_file
    if record_file:
        record_file.invalidate()
    super(TmpCollectionDirectory, self).invalidate()
632 class MagicDirectory(Directory):
633 """A special directory that logically contains the set of all extant keep locators.
635 When a file is referenced by lookup(), it is tested to see if it is a valid
636 keep locator to a manifest, and if so, loads the manifest contents as a
637 subdirectory of this directory with the locator as the directory name.
638 Since querying a list of all extant keep locators is impractical, only
639 collections that have already been accessed are visible to readdir().
644 This directory provides access to Arvados collections as subdirectories listed
645 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
646 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
647 (in the form 'zzzzz-j7d0g-1234567890abcde').
649 Note that this directory will appear empty until you attempt to access a
650 specific collection or project subdirectory (such as trying to 'cd' into it),
651 at which point the collection or project will actually be looked up on the server
652 and the directory will appear if it exists.
656 def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
657 super(MagicDirectory, self).__init__(parent_inode, inodes, api.config)
659 self.num_retries = num_retries
660 self.pdh_only = pdh_only
def __setattr__(self, name, value):
    """Set the attribute, seeding the directory when it first gets an inode.

    After the normal assignment, if the attribute just set is ``inode``,
    the inode is not None and the directory has no entries yet, a
    'README' entry holding ``README_TEXT`` is added.  If the inode is
    also ``llfuse.ROOT_INODE``, a nested MagicDirectory is added under
    'by_id', built with the same api, num_retries and pdh_only settings.
    """
    super(MagicDirectory, self).__setattr__(name, value)
    # Nothing more to do unless a real inode has just been assigned to a
    # directory that is still empty.
    if name != 'inode' or self.inode is None or self._entries:
        return
    readme = StringFile(self.inode, self.README_TEXT, time.time())
    self._entries['README'] = self.inodes.add_entry(readme)
    # If we're the root directory, add an identical by_id subdirectory.
    if self.inode == llfuse.ROOT_INODE:
        by_id = MagicDirectory(
            self.inode, self.inodes, self.api, self.num_retries, self.pdh_only)
        self._entries['by_id'] = self.inodes.add_entry(by_id)
674 def __contains__(self, k):
675 if k in self._entries:
678 if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
684 if group_uuid_pattern.match(k):
685 project = self.api.groups().list(
686 filters=[['group_class', '=', 'project'], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
687 if project[u'items_available'] == 0:
689 e = self.inodes.add_entry(ProjectDirectory(
690 self.inode, self.inodes, self.api, self.num_retries, project[u'items'][0]))
693 e = self.inodes.add_entry(CollectionDirectory(
694 self.inode, self.inodes, self.api, self.num_retries, k))
697 if k not in self._entries:
700 self.inodes.del_entry(e)
703 self.inodes.invalidate_entry(self, k)
704 self.inodes.del_entry(e)
706 except Exception as ex:
707 _logger.exception("arv-mount lookup '%s':", k)
709 self.inodes.del_entry(e)
712 def __getitem__(self, item):
714 return self._entries[item]
716 raise KeyError("No collection with id " + item)
def want_event_subscribe(self):
    """Return False when ``pdh_only`` is set, True otherwise."""
    if self.pdh_only:
        return False
    return True
725 class TagsDirectory(Directory):
726 """A special directory that contains as subdirectories all tags visible to the user."""
728 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
729 super(TagsDirectory, self).__init__(parent_inode, inodes, api.config)
731 self.num_retries = num_retries
733 self._poll_time = poll_time
736 def want_event_subscribe(self):
741 with llfuse.lock_released:
742 tags = self.api.links().list(
743 filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
744 select=['name'], distinct=True, limit=1000
745 ).execute(num_retries=self.num_retries)
747 self.merge(tags['items']+[{"name": n} for n in self._extra],
749 lambda a, i: a.tag == i['name'],
750 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
754 def __getitem__(self, item):
755 if super(TagsDirectory, self).__contains__(item):
756 return super(TagsDirectory, self).__getitem__(item)
757 with llfuse.lock_released:
758 tags = self.api.links().list(
759 filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
760 ).execute(num_retries=self.num_retries)
762 self._extra.add(item)
764 return super(TagsDirectory, self).__getitem__(item)
768 def __contains__(self, k):
769 if super(TagsDirectory, self).__contains__(k):
779 class TagDirectory(Directory):
780 """A special directory that contains as subdirectories all collections visible
781 to the user that are tagged with a particular tag.
784 def __init__(self, parent_inode, inodes, api, num_retries, tag,
785 poll=False, poll_time=60):
786 super(TagDirectory, self).__init__(parent_inode, inodes, api.config)
788 self.num_retries = num_retries
791 self._poll_time = poll_time
793 def want_event_subscribe(self):
798 with llfuse.lock_released:
799 taggedcollections = self.api.links().list(
800 filters=[['link_class', '=', 'tag'],
801 ['name', '=', self.tag],
802 ['head_uuid', 'is_a', 'arvados#collection']],
804 ).execute(num_retries=self.num_retries)
805 self.merge(taggedcollections['items'],
806 lambda i: i['head_uuid'],
807 lambda a, i: a.collection_locator == i['head_uuid'],
808 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
811 class ProjectDirectory(Directory):
812 """A special directory that contains the contents of a project."""
814 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
815 poll=False, poll_time=60):
816 super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config)
818 self.num_retries = num_retries
819 self.project_object = project_object
820 self.project_object_file = None
821 self.project_uuid = project_object['uuid']
823 self._poll_time = poll_time
824 self._updating_lock = threading.Lock()
825 self._current_user = None
826 self._full_listing = False
828 def want_event_subscribe(self):
831 def createDirectory(self, i):
832 if collection_uuid_pattern.match(i['uuid']):
833 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
834 elif group_uuid_pattern.match(i['uuid']):
835 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
836 elif link_uuid_pattern.match(i['uuid']):
837 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
838 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
841 elif uuid_pattern.match(i['uuid']):
842 return ObjectFile(self.parent_inode, i)
847 return self.project_uuid
850 self._full_listing = True
851 return super(ProjectDirectory, self).items()
855 if i['name'] is None or len(i['name']) == 0:
857 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
858 # collection or subproject
860 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
863 elif 'kind' in i and i['kind'].startswith('arvados#'):
865 return "{}.{}".format(i['name'], i['kind'][8:])
872 if self.project_object_file == None:
873 self.project_object_file = ObjectFile(self.inode, self.project_object)
874 self.inodes.add_entry(self.project_object_file)
876 if not self._full_listing:
880 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
881 return a.uuid() == i['uuid']
882 elif isinstance(a, ObjectFile):
883 return a.uuid() == i['uuid'] and not a.stale()
887 with llfuse.lock_released:
888 self._updating_lock.acquire()
892 if group_uuid_pattern.match(self.project_uuid):
893 self.project_object = self.api.groups().get(
894 uuid=self.project_uuid).execute(num_retries=self.num_retries)
895 elif user_uuid_pattern.match(self.project_uuid):
896 self.project_object = self.api.users().get(
897 uuid=self.project_uuid).execute(num_retries=self.num_retries)
899 contents = arvados.util.list_all(self.api.groups().list,
901 filters=[["owner_uuid", "=", self.project_uuid],
902 ["group_class", "=", "project"]])
903 contents.extend(arvados.util.list_all(self.api.collections().list,
905 filters=[["owner_uuid", "=", self.project_uuid]]))
907 # end with llfuse.lock_released, re-acquire lock
912 self.createDirectory)
915 self._updating_lock.release()
def _add_entry(self, i, name):
    """Create the entry for item `i`, register it, and file it under `name`.

    The object built by ``createDirectory(i)`` is passed to
    ``self.inodes.add_entry``.  Whatever that returns is stored in
    ``_entries[name]`` and returned to the caller.
    """
    registered = self.inodes.add_entry(self.createDirectory(i))
    self._entries[name] = registered
    return registered
924 def __getitem__(self, k):
925 if k == '.arvados#project':
926 return self.project_object_file
927 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
928 return super(ProjectDirectory, self).__getitem__(k)
929 with llfuse.lock_released:
930 k2 = self.unsanitize_filename(k)
932 namefilter = ["name", "=", k]
934 namefilter = ["name", "in", [k, k2]]
935 contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
936 ["group_class", "=", "project"],
938 limit=2).execute(num_retries=self.num_retries)["items"]
940 contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
942 limit=2).execute(num_retries=self.num_retries)["items"]
944 if len(contents) > 1 and contents[1]['name'] == k:
945 # If "foo/bar" and "foo[SUBST]bar" both exist, use
947 contents = [contents[1]]
948 name = self.sanitize_filename(self.namefn(contents[0]))
951 return self._add_entry(contents[0], name)
956 def __contains__(self, k):
957 if k == '.arvados#project':
969 with llfuse.lock_released:
970 if not self._current_user:
971 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
972 return self._current_user["uuid"] in self.project_object.get("writable_by", [])
979 def mkdir(self, name):
981 with llfuse.lock_released:
982 self.api.collections().create(body={"owner_uuid": self.project_uuid,
984 "manifest_text": ""}).execute(num_retries=self.num_retries)
986 except apiclient_errors.Error as error:
988 raise llfuse.FUSEError(errno.EEXIST)
992 def rmdir(self, name):
994 raise llfuse.FUSEError(errno.ENOENT)
995 if not isinstance(self[name], CollectionDirectory):
996 raise llfuse.FUSEError(errno.EPERM)
997 if len(self[name]) > 0:
998 raise llfuse.FUSEError(errno.ENOTEMPTY)
999 with llfuse.lock_released:
1000 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
1005 def rename(self, name_old, name_new, src):
1006 if not isinstance(src, ProjectDirectory):
1007 raise llfuse.FUSEError(errno.EPERM)
1011 if not isinstance(ent, CollectionDirectory):
1012 raise llfuse.FUSEError(errno.EPERM)
1014 if name_new in self:
1015 # POSIX semantics for replacing one directory with another is
1016 # tricky (the target directory must be empty, the operation must be
1017 # atomic which isn't possible with the Arvados API as of this
1018 # writing) so don't support that.
1019 raise llfuse.FUSEError(errno.EPERM)
1021 self.api.collections().update(uuid=ent.uuid(),
1022 body={"owner_uuid": self.uuid(),
1023 "name": name_new}).execute(num_retries=self.num_retries)
1025 # Actually move the entry from source directory to this directory.
1026 del src._entries[name_old]
1027 self._entries[name_new] = ent
1028 self.inodes.invalidate_entry(src, name_old)
1031 def child_event(self, ev):
1032 properties = ev.get("properties") or {}
1033 old_attrs = properties.get("old_attributes") or {}
1034 new_attrs = properties.get("new_attributes") or {}
1035 old_attrs["uuid"] = ev["object_uuid"]
1036 new_attrs["uuid"] = ev["object_uuid"]
1037 old_name = self.sanitize_filename(self.namefn(old_attrs))
1038 new_name = self.sanitize_filename(self.namefn(new_attrs))
1040 # create events will have a new name, but not an old name
1041 # delete events will have an old name, but not a new name
1042 # update events will have an old and new name, and they may be same or different
1043 # if they are the same, an unrelated field changed and there is nothing to do.
1045 if old_attrs.get("owner_uuid") != self.project_uuid:
1046 # Was moved from somewhere else, so don't try to remove entry.
1048 if ev.get("object_owner_uuid") != self.project_uuid:
1049 # Was moved to somewhere else, so don't try to add entry
1052 if old_attrs.get("is_trashed"):
1053 # Was previously deleted
1055 if new_attrs.get("is_trashed"):
1059 if new_name != old_name:
1061 if old_name in self._entries:
1062 ent = self._entries[old_name]
1063 del self._entries[old_name]
1064 self.inodes.invalidate_entry(self, old_name)
1068 self._entries[new_name] = ent
1070 self._add_entry(new_attrs, new_name)
1071 elif ent is not None:
1072 self.inodes.del_entry(ent)
1075 class SharedDirectory(Directory):
1076 """A special directory that represents users or groups who have shared projects with me."""
1078 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
1079 poll=False, poll_time=60):
1080 super(SharedDirectory, self).__init__(parent_inode, inodes, api.config)
1082 self.num_retries = num_retries
1083 self.current_user = api.users().current().execute(num_retries=num_retries)
1085 self._poll_time = poll_time
1086 self._updating_lock = threading.Lock()
1091 with llfuse.lock_released:
1092 self._updating_lock.acquire()
1093 if not self.stale():
1101 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1102 if 'httpMethod' in methods.get('shared', {}):
1105 resp = self.api.groups().shared(filters=[['group_class', '=', 'project']]+page,
1109 include="owner_uuid").execute()
1110 if not resp["items"]:
1112 page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1113 for r in resp["items"]:
1114 objects[r["uuid"]] = r
1115 roots.append(r["uuid"])
1116 for r in resp["included"]:
1117 objects[r["uuid"]] = r
1118 root_owners.add(r["uuid"])
1120 all_projects = arvados.util.list_all(
1121 self.api.groups().list, self.num_retries,
1122 filters=[['group_class','=','project']],
1123 select=["uuid", "owner_uuid"])
1124 for ob in all_projects:
1125 objects[ob['uuid']] = ob
1127 current_uuid = self.current_user['uuid']
1128 for ob in all_projects:
1129 if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1130 roots.append(ob['uuid'])
1131 root_owners.add(ob['owner_uuid'])
1133 lusers = arvados.util.list_all(
1134 self.api.users().list, self.num_retries,
1135 filters=[['uuid','in', list(root_owners)]])
1136 lgroups = arvados.util.list_all(
1137 self.api.groups().list, self.num_retries,
1138 filters=[['uuid','in', list(root_owners)+roots]])
1141 objects[l["uuid"]] = l
1143 objects[l["uuid"]] = l
1145 for r in root_owners:
1149 contents[obr["name"]] = obr
1150 #elif obr.get("username"):
1151 # contents[obr["username"]] = obr
1152 elif "first_name" in obr:
1153 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1158 if obr['owner_uuid'] not in objects:
1159 contents[obr["name"]] = obr
1161 # end with llfuse.lock_released, re-acquire lock
1163 self.merge(viewitems(contents),
1165 lambda a, i: a.uuid() == i[1]['uuid'],
1166 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
1168 _logger.exception("arv-mount shared dir error")
1170 self._updating_lock.release()
1172 def want_event_subscribe(self):