1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import absolute_import
6 from __future__ import division
7 from future.utils import viewitems
8 from future.utils import itervalues
9 from builtins import dict
20 from apiclient import errors as apiclient_errors
22 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
23 from .fresh import FreshBase, convertTime, use_counter, check_update
25 import arvados.collection
26 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
28 _logger = logging.getLogger('arvados.arvados_fuse')
31 # Match any character which FUSE or Linux cannot accommodate as part
32 # of a filename. (If present in a collection filename, they will
33 # appear as underscores in the fuse mount.)
34 _disallowed_filename_characters = re.compile('[\x00/]')
37 class Directory(FreshBase):
38 """Generic directory object, backed by a dict.
40 Consists of a set of entries with the key representing the filename
41 and the value referencing a File or Directory object.
44 def __init__(self, parent_inode, inodes, apiconfig):
45 """parent_inode is the integer inode number"""
47 super(Directory, self).__init__()
50 if not isinstance(parent_inode, int):
51 raise Exception("parent_inode should be an int")
52 self.parent_inode = parent_inode
54 self.apiconfig = apiconfig
56 self._mtime = time.time()
58 def forward_slash_subst(self):
59 if not hasattr(self, '_fsns'):
61 config = self.apiconfig()
63 self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
65 # old API server with no FSNS config
68 if self._fsns == '' or self._fsns == '/':
72 def unsanitize_filename(self, incoming):
73 """Replace ForwardSlashNameSubstitution value with /"""
74 fsns = self.forward_slash_subst()
75 if isinstance(fsns, str):
76 return incoming.replace(fsns, '/')
80 def sanitize_filename(self, dirty):
81 """Replace disallowed filename characters according to
82 ForwardSlashNameSubstitution in self.apiconfig."""
83 # '.' and '..' are not reachable if API server is newer than #6277
93 fsns = self.forward_slash_subst()
94 if isinstance(fsns, str):
95 dirty = dirty.replace('/', fsns)
96 return _disallowed_filename_characters.sub('_', dirty)
99 # Overridden by subclasses to implement logic to update the
100 # entries dict when the directory is stale
105 # Only used when computing the size of the disk footprint of the directory
113 def checkupdate(self):
117 except apiclient.errors.HttpError as e:
def __getitem__(self, item):
    """Look up a directory entry by filename.

    Returns whatever object is stored in ``self._entries`` under
    ``item``.  A lookup failure raised by that mapping is not caught
    here and propagates to the caller.
    """
    entries = self._entries
    return entries[item]
128 return list(self._entries.items())
def __contains__(self, k):
    """Report whether ``k`` is currently a key of ``self._entries``."""
    present = k in self._entries
    return present
138 return len(self._entries)
141 self.inodes.touch(self)
142 super(Directory, self).fresh()
144 def merge(self, items, fn, same, new_entry):
145 """Helper method for updating the contents of the directory.
147 Takes a list describing the new contents of the directory, reuses
148 entries that are the same in both the old and new lists, creates new
149 entries, and deletes old entries missing from the new list.
151 :items: iterable with new directory contents
153 :fn: function to take an entry in 'items' and return the desired file or
154 directory name, or None if this entry should be skipped
156 :same: function to compare an existing entry (a File or Directory
157 object) with an entry in the items list to determine whether to keep
160 :new_entry: function to create a new directory entry (File or Directory
161 object) from an entry in the items list.
165 oldentries = self._entries
169 name = self.sanitize_filename(fn(i))
171 if name in oldentries and same(oldentries[name], i):
172 # move existing directory entry over
173 self._entries[name] = oldentries[name]
176 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
177 # create new directory entry
180 self._entries[name] = self.inodes.add_entry(ent)
183 # delete any other directory entries that were not found in 'items'
185 _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
186 self.inodes.invalidate_entry(self, i)
187 self.inodes.del_entry(oldentries[i])
191 self.inodes.invalidate_inode(self)
192 self._mtime = time.time()
197 if super(Directory, self).in_use():
199 for v in itervalues(self._entries):
204 def has_ref(self, only_children):
205 if super(Directory, self).has_ref(only_children):
207 for v in itervalues(self._entries):
213 """Delete all entries"""
214 oldentries = self._entries
217 oldentries[n].clear()
218 self.inodes.del_entry(oldentries[n])
221 def kernel_invalidate(self):
222 # Invalidating the dentry on the parent implies invalidating all paths
224 parent = self.inodes[self.parent_inode]
226 # Find self on the parent in order to invalidate this path.
227 # Calling the public items() method might trigger a refresh,
228 # which we definitely don't want, so read the internal dict directly.
229 for k,v in viewitems(parent._entries):
231 self.inodes.invalidate_entry(parent, k)
def want_event_subscribe(self):
    """Placeholder that subclasses are expected to override.

    The generic directory has no answer of its own, so calling this
    version always raises NotImplementedError.
    """
    raise NotImplementedError()
def create(self, name):
    """Placeholder for creating an entry called ``name``.

    No implementation is provided at this level; unless a subclass
    overrides the method, calling it raises NotImplementedError.
    """
    raise NotImplementedError()
def mkdir(self, name):
    """Placeholder for creating a subdirectory called ``name``.

    No implementation is provided at this level; unless a subclass
    overrides the method, calling it raises NotImplementedError.
    """
    raise NotImplementedError()
def unlink(self, name):
    """Placeholder for removing the entry called ``name``.

    No implementation is provided at this level; unless a subclass
    overrides the method, calling it raises NotImplementedError.
    """
    raise NotImplementedError()
def rmdir(self, name):
    """Placeholder for removing the subdirectory called ``name``.

    No implementation is provided at this level; unless a subclass
    overrides the method, calling it raises NotImplementedError.
    """
    raise NotImplementedError()
def rename(self, name_old, name_new, src):
    """Placeholder for renaming ``name_old`` from ``src`` to ``name_new``.

    No implementation is provided at this level; unless a subclass
    overrides the method, calling it raises NotImplementedError.
    """
    raise NotImplementedError()
262 class CollectionDirectoryBase(Directory):
263 """Represent an Arvados Collection as a directory.
265 This class is used for Subcollections, and is also the base class for
266 CollectionDirectory, which implements collection loading/saving on
269 Most operations act only on the underlying Arvados `Collection` object. The
270 `Collection` object signals via a notify callback to
271 `CollectionDirectoryBase.on_event` that an item was added, removed or
272 modified. FUSE inodes and directory entries are created, deleted or
273 invalidated in response to these events.
def __init__(self, parent_inode, inodes, apiconfig, collection):
    """Set up a directory view over ``collection``.

    ``parent_inode``, ``inodes`` and ``apiconfig`` are handed to the
    Directory initializer unchanged.  ``apiconfig`` and ``collection``
    are then stored on the instance.
    """
    super(CollectionDirectoryBase, self).__init__(
        parent_inode, inodes, apiconfig)
    # Same assignment order as before: apiconfig first, then collection.
    self.apiconfig, self.collection = apiconfig, collection
282 def new_entry(self, name, item, mtime):
283 name = self.sanitize_filename(name)
284 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
285 if item.fuse_entry.dead is not True:
286 raise Exception("Can only reparent dead inode entry")
287 if item.fuse_entry.inode is None:
288 raise Exception("Reparented entry must still have valid inode")
289 item.fuse_entry.dead = False
290 self._entries[name] = item.fuse_entry
291 elif isinstance(item, arvados.collection.RichCollectionBase):
292 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, item))
293 self._entries[name].populate(mtime)
295 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
296 item.fuse_entry = self._entries[name]
298 def on_event(self, event, collection, name, item):
299 if collection == self.collection:
300 name = self.sanitize_filename(name)
301 _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
303 if event == arvados.collection.ADD:
304 self.new_entry(name, item, self.mtime())
305 elif event == arvados.collection.DEL:
306 ent = self._entries[name]
307 del self._entries[name]
308 self.inodes.invalidate_entry(self, name)
309 self.inodes.del_entry(ent)
310 elif event == arvados.collection.MOD:
311 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
312 self.inodes.invalidate_inode(item.fuse_entry)
313 elif name in self._entries:
314 self.inodes.invalidate_inode(self._entries[name])
316 def populate(self, mtime):
318 self.collection.subscribe(self.on_event)
319 for entry, item in viewitems(self.collection):
320 self.new_entry(entry, item, self.mtime())
323 return self.collection.writable()
327 with llfuse.lock_released:
328 self.collection.root_collection().save()
def create(self, name):
    """Create an empty file called ``name`` in the backing collection.

    The file is opened in "w" mode and closed straight away, all inside
    the ``llfuse.lock_released`` context manager.
    """
    with llfuse.lock_released:
        handle = self.collection.open(name, "w")
        handle.close()
def mkdir(self, name):
    """Create the directory ``name`` in the backing collection.

    Delegates to the collection's ``mkdirs`` inside the
    ``llfuse.lock_released`` context manager.
    """
    with llfuse.lock_released:
        backing = self.collection
        backing.mkdirs(name)
344 def unlink(self, name):
345 with llfuse.lock_released:
346 self.collection.remove(name)
351 def rmdir(self, name):
352 with llfuse.lock_released:
353 self.collection.remove(name)
358 def rename(self, name_old, name_new, src):
359 if not isinstance(src, CollectionDirectoryBase):
360 raise llfuse.FUSEError(errno.EPERM)
365 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
367 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
369 raise llfuse.FUSEError(errno.ENOTEMPTY)
370 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
371 raise llfuse.FUSEError(errno.ENOTDIR)
372 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
373 raise llfuse.FUSEError(errno.EISDIR)
375 with llfuse.lock_released:
376 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
381 super(CollectionDirectoryBase, self).clear()
382 self.collection = None
385 class CollectionDirectory(CollectionDirectoryBase):
386 """Represents the root of a directory tree representing a collection."""
388 def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
389 super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, None)
391 self.num_retries = num_retries
392 self.collection_record_file = None
393 self.collection_record = None
396 self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
398 _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
399 self._poll_time = 60*60
401 if isinstance(collection_record, dict):
402 self.collection_locator = collection_record['uuid']
403 self._mtime = convertTime(collection_record.get('modified_at'))
405 self.collection_locator = collection_record
407 self._manifest_size = 0
408 if self.collection_locator:
409 self._writable = (uuid_pattern.match(self.collection_locator) is not None)
410 self._updating_lock = threading.Lock()
413 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
416 return self.collection.writable() if self.collection is not None else self._writable
def want_event_subscribe(self):
    """Return True when the current locator matches ``uuid_pattern``."""
    matched = uuid_pattern.match(self.collection_locator)
    return matched is not None
421 # Used by arv-web.py to switch the contents of the CollectionDirectory
422 def change_collection(self, new_locator):
423 """Switch the contents of the CollectionDirectory.
425 Must be called with llfuse.lock held.
428 self.collection_locator = new_locator
429 self.collection_record = None
432 def new_collection(self, new_collection_record, coll_reader):
436 self.collection_record = new_collection_record
438 if self.collection_record:
439 self._mtime = convertTime(self.collection_record.get('modified_at'))
440 self.collection_locator = self.collection_record["uuid"]
441 if self.collection_record_file is not None:
442 self.collection_record_file.update(self.collection_record)
444 self.collection = coll_reader
445 self.populate(self.mtime())
448 return self.collection_locator
451 def update(self, to_record_version=None):
453 if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
456 if self.collection_locator is None:
461 with llfuse.lock_released:
462 self._updating_lock.acquire()
466 _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
467 if self.collection is not None:
468 if self.collection.known_past_version(to_record_version):
469 _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
471 self.collection.update()
473 if uuid_pattern.match(self.collection_locator):
474 coll_reader = arvados.collection.Collection(
475 self.collection_locator, self.api, self.api.keep,
476 num_retries=self.num_retries)
478 coll_reader = arvados.collection.CollectionReader(
479 self.collection_locator, self.api, self.api.keep,
480 num_retries=self.num_retries)
481 new_collection_record = coll_reader.api_response() or {}
482 # If the Collection only exists in Keep, there will be no API
483 # response. Fill in the fields we need.
484 if 'uuid' not in new_collection_record:
485 new_collection_record['uuid'] = self.collection_locator
486 if "portable_data_hash" not in new_collection_record:
487 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
488 if 'manifest_text' not in new_collection_record:
489 new_collection_record['manifest_text'] = coll_reader.manifest_text()
491 if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
492 self.new_collection(new_collection_record, coll_reader)
494 self._manifest_size = len(coll_reader.manifest_text())
495 _logger.debug("%s manifest_size %i", self, self._manifest_size)
496 # end with llfuse.lock_released, re-acquire lock
501 self._updating_lock.release()
502 except arvados.errors.NotFoundError as e:
503 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
504 except arvados.errors.ArgumentError as detail:
505 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
506 if self.collection_record is not None and "manifest_text" in self.collection_record:
507 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
509 _logger.exception("arv-mount %s: error", self.collection_locator)
510 if self.collection_record is not None and "manifest_text" in self.collection_record:
511 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
517 def __getitem__(self, item):
518 if item == '.arvados#collection':
519 if self.collection_record_file is None:
520 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
521 self.inodes.add_entry(self.collection_record_file)
522 return self.collection_record_file
524 return super(CollectionDirectory, self).__getitem__(item)
526 def __contains__(self, k):
527 if k == '.arvados#collection':
530 return super(CollectionDirectory, self).__contains__(k)
def invalidate(self):
    """Drop the cached collection record and its record file, then run
    the inherited invalidate()."""
    # Chained assignment binds left to right, so the record is cleared
    # before the record file, as before.
    self.collection_record = self.collection_record_file = None
    super(CollectionDirectory, self).invalidate()
538 return (self.collection_locator is not None)
541 # This is an empirically-derived heuristic to estimate the memory used
542 # to store this collection's metadata. Calculating the memory
543 # footprint directly would be more accurate, but also more complicated.
544 return self._manifest_size * 128
547 if self.collection is not None:
549 self.collection.save()
550 self.collection.stop_threads()
553 if self.collection is not None:
554 self.collection.stop_threads()
555 super(CollectionDirectory, self).clear()
556 self._manifest_size = 0
559 class TmpCollectionDirectory(CollectionDirectoryBase):
560 """A directory backed by an Arvados collection that never gets saved.
562 This supports using Keep as scratch space. A userspace program can
563 read the .arvados#collection file to get a current manifest in
564 order to save a snapshot of the scratch data or use it as a crunch
568 class UnsaveableCollection(arvados.collection.Collection):
def __init__(self, parent_inode, inodes, api_client, num_retries, storage_classes=None):
    """Build the directory around a fresh UnsaveableCollection.

    The collection is created from ``api_client``, its ``keep`` client,
    ``num_retries`` and ``storage_classes`` (passed as
    ``storage_classes_desired``).  The parent initializer then receives
    ``api_client.config`` and that collection.  Finally the record-file
    slot is cleared and the directory is populated using its current
    mtime.
    """
    scratch = self.UnsaveableCollection(
        num_retries=num_retries,
        storage_classes_desired=storage_classes,
        api_client=api_client,
        keep_client=api_client.keep)
    super(TmpCollectionDirectory, self).__init__(
        parent_inode, inodes, api_client.config, scratch)
    self.collection_record_file = None
    now = self.mtime()
    self.populate(now)
585 def on_event(self, *args, **kwargs):
586 super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
587 if self.collection_record_file:
589 self.collection_record_file.invalidate()
590 self.inodes.invalidate_inode(self.collection_record_file)
591 _logger.debug("%s invalidated collection record", self)
593 def collection_record(self):
594 with llfuse.lock_released:
597 "manifest_text": self.collection.manifest_text(),
598 "portable_data_hash": self.collection.portable_data_hash(),
def __contains__(self, k):
    """Report whether ``k`` names an entry of this directory.

    The special name '.arvados#collection' always counts as present;
    any other name is checked by the parent class.
    """
    is_record_name = (k == '.arvados#collection')
    if is_record_name:
        return is_record_name
    return super(TmpCollectionDirectory, self).__contains__(k)
def __getitem__(self, item):
    """Return the entry called ``item``.

    Any name other than '.arvados#collection' is looked up by the parent
    class.  For the special name, a FuncToJSONFile wrapping
    ``self.collection_record`` is created on first access, registered
    through ``self.inodes.add_entry``, cached on the instance and
    returned on every later access.
    """
    if not (item == '.arvados#collection'):
        return super(TmpCollectionDirectory, self).__getitem__(item)
    if self.collection_record_file is None:
        record_file = FuncToJSONFile(self.inode, self.collection_record)
        self.collection_record_file = record_file
        self.inodes.add_entry(self.collection_record_file)
    return self.collection_record_file
621 def want_event_subscribe(self):
625 self.collection.stop_threads()
def invalidate(self):
    """Invalidate the cached record file, if one exists, and then run
    the inherited invalidate()."""
    record_file = self.collection_record_file
    if record_file:
        record_file.invalidate()
    super(TmpCollectionDirectory, self).invalidate()
633 class MagicDirectory(Directory):
634 """A special directory that logically contains the set of all extant keep locators.
636 When a file is referenced by lookup(), it is tested to see if it is a valid
637 keep locator to a manifest, and if so, loads the manifest contents as a
638 subdirectory of this directory with the locator as the directory name.
639 Since querying a list of all extant keep locators is impractical, only
640 collections that have already been accessed are visible to readdir().
645 This directory provides access to Arvados collections as subdirectories listed
646 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
647 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
648 (in the form 'zzzzz-j7d0g-1234567890abcde').
650 Note that this directory will appear empty until you attempt to access a
651 specific collection or project subdirectory (such as trying to 'cd' into it),
652 at which point the collection or project will actually be looked up on the server
653 and the directory will appear if it exists.
657 def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False, storage_classes=None):
658 super(MagicDirectory, self).__init__(parent_inode, inodes, api.config)
660 self.num_retries = num_retries
661 self.pdh_only = pdh_only
662 self.storage_classes = storage_classes
def __setattr__(self, name, value):
    """Store the attribute, then add the default entries once an inode
    has been assigned.

    After the normal attribute assignment, nothing more happens unless
    the attribute is 'inode', the stored inode is not None and the
    directory has no entries yet.  In that case a README entry is added.
    When this directory is the FUSE root, a nested MagicDirectory is
    also added under 'by_id'.

    The nested directory now receives this directory's
    ``storage_classes`` as well.  Previously it was built without them,
    so projects reached through 'by_id' got ``storage_classes=None``
    while the same projects reached through the root got the configured
    value.
    """
    super(MagicDirectory, self).__setattr__(name, value)
    # Keep the short-circuit order: self.inode and self._entries are only
    # read when the attribute just set is 'inode'.
    if name != 'inode' or self.inode is None or self._entries:
        return
    # When we're assigned an inode, add a README.
    self._entries['README'] = self.inodes.add_entry(
        StringFile(self.inode, self.README_TEXT, time.time()))
    # If we're the root directory, add an identical by_id subdirectory.
    if self.inode == llfuse.ROOT_INODE:
        self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
            self.inode, self.inodes, self.api, self.num_retries,
            self.pdh_only, storage_classes=self.storage_classes))
676 def __contains__(self, k):
677 if k in self._entries:
680 if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
686 if group_uuid_pattern.match(k):
687 project = self.api.groups().list(
688 filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
689 if project[u'items_available'] == 0:
691 e = self.inodes.add_entry(ProjectDirectory(
692 self.inode, self.inodes, self.api, self.num_retries,
693 project[u'items'][0], storage_classes=self.storage_classes))
695 e = self.inodes.add_entry(CollectionDirectory(
696 self.inode, self.inodes, self.api, self.num_retries, k))
699 if k not in self._entries:
702 self.inodes.del_entry(e)
705 self.inodes.invalidate_entry(self, k)
706 self.inodes.del_entry(e)
708 except Exception as ex:
709 _logger.exception("arv-mount lookup '%s':", k)
711 self.inodes.del_entry(e)
714 def __getitem__(self, item):
716 return self._entries[item]
718 raise KeyError("No collection with id " + item)
def want_event_subscribe(self):
    """Return False when ``self.pdh_only`` is truthy, True otherwise."""
    if self.pdh_only:
        return False
    return True
727 class TagsDirectory(Directory):
728 """A special directory that contains as subdirectories all tags visible to the user."""
730 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
731 super(TagsDirectory, self).__init__(parent_inode, inodes, api.config)
733 self.num_retries = num_retries
735 self._poll_time = poll_time
738 def want_event_subscribe(self):
743 with llfuse.lock_released:
744 tags = self.api.links().list(
745 filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
746 select=['name'], distinct=True, limit=1000
747 ).execute(num_retries=self.num_retries)
749 self.merge(tags['items']+[{"name": n} for n in self._extra],
751 lambda a, i: a.tag == i['name'],
752 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
756 def __getitem__(self, item):
757 if super(TagsDirectory, self).__contains__(item):
758 return super(TagsDirectory, self).__getitem__(item)
759 with llfuse.lock_released:
760 tags = self.api.links().list(
761 filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
762 ).execute(num_retries=self.num_retries)
764 self._extra.add(item)
766 return super(TagsDirectory, self).__getitem__(item)
770 def __contains__(self, k):
771 if super(TagsDirectory, self).__contains__(k):
781 class TagDirectory(Directory):
782 """A special directory that contains as subdirectories all collections visible
783 to the user that are tagged with a particular tag.
786 def __init__(self, parent_inode, inodes, api, num_retries, tag,
787 poll=False, poll_time=60):
788 super(TagDirectory, self).__init__(parent_inode, inodes, api.config)
790 self.num_retries = num_retries
793 self._poll_time = poll_time
795 def want_event_subscribe(self):
800 with llfuse.lock_released:
801 taggedcollections = self.api.links().list(
802 filters=[['link_class', '=', 'tag'],
803 ['name', '=', self.tag],
804 ['head_uuid', 'is_a', 'arvados#collection']],
806 ).execute(num_retries=self.num_retries)
807 self.merge(taggedcollections['items'],
808 lambda i: i['head_uuid'],
809 lambda a, i: a.collection_locator == i['head_uuid'],
810 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
813 class ProjectDirectory(Directory):
814 """A special directory that contains the contents of a project."""
816 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
817 poll=True, poll_time=3, storage_classes=None):
818 super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config)
820 self.num_retries = num_retries
821 self.project_object = project_object
822 self.project_object_file = None
823 self.project_uuid = project_object['uuid']
825 self._poll_time = poll_time
826 self._updating_lock = threading.Lock()
827 self._current_user = None
828 self._full_listing = False
829 self.storage_classes = storage_classes
831 def want_event_subscribe(self):
834 def createDirectory(self, i):
835 if collection_uuid_pattern.match(i['uuid']):
836 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
837 elif group_uuid_pattern.match(i['uuid']):
838 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time, self.storage_classes)
839 elif link_uuid_pattern.match(i['uuid']):
840 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
841 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
844 elif uuid_pattern.match(i['uuid']):
845 return ObjectFile(self.parent_inode, i)
850 return self.project_uuid
853 self._full_listing = True
854 return super(ProjectDirectory, self).items()
858 if i['name'] is None or len(i['name']) == 0:
860 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
861 # collection or subproject
863 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
866 elif 'kind' in i and i['kind'].startswith('arvados#'):
868 return "{}.{}".format(i['name'], i['kind'][8:])
875 if self.project_object_file == None:
876 self.project_object_file = ObjectFile(self.inode, self.project_object)
877 self.inodes.add_entry(self.project_object_file)
879 if not self._full_listing:
883 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
884 return a.uuid() == i['uuid']
885 elif isinstance(a, ObjectFile):
886 return a.uuid() == i['uuid'] and not a.stale()
890 with llfuse.lock_released:
891 self._updating_lock.acquire()
895 if group_uuid_pattern.match(self.project_uuid):
896 self.project_object = self.api.groups().get(
897 uuid=self.project_uuid).execute(num_retries=self.num_retries)
898 elif user_uuid_pattern.match(self.project_uuid):
899 self.project_object = self.api.users().get(
900 uuid=self.project_uuid).execute(num_retries=self.num_retries)
901 # do this in 2 steps until #17424 is fixed
902 contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
904 num_retries=self.num_retries,
905 uuid=self.project_uuid,
906 filters=[["uuid", "is_a", "arvados#group"],
907 ["groups.group_class", "in", ["project","filter"]]]))
908 contents.extend(arvados.util.keyset_list_all(self.api.groups().contents,
910 num_retries=self.num_retries,
911 uuid=self.project_uuid,
912 filters=[["uuid", "is_a", "arvados#collection"]]))
914 # end with llfuse.lock_released, re-acquire lock
919 self.createDirectory)
922 self._updating_lock.release()
def _add_entry(self, i, name):
    """Build the entry object for ``i``, register it and file it under
    ``name``.

    The object comes from ``self.createDirectory(i)`` and is registered
    through ``self.inodes.add_entry``.  The value stored in
    ``self._entries[name]`` is returned.
    """
    registered = self.inodes.add_entry(self.createDirectory(i))
    self._entries[name] = registered
    return self._entries[name]
931 def __getitem__(self, k):
932 if k == '.arvados#project':
933 return self.project_object_file
934 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
935 return super(ProjectDirectory, self).__getitem__(k)
936 with llfuse.lock_released:
937 k2 = self.unsanitize_filename(k)
939 namefilter = ["name", "=", k]
941 namefilter = ["name", "in", [k, k2]]
942 contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
943 ["group_class", "in", ["project","filter"]],
945 limit=2).execute(num_retries=self.num_retries)["items"]
947 contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
949 limit=2).execute(num_retries=self.num_retries)["items"]
951 if len(contents) > 1 and contents[1]['name'] == k:
952 # If "foo/bar" and "foo[SUBST]bar" both exist, use
954 contents = [contents[1]]
955 name = self.sanitize_filename(self.namefn(contents[0]))
958 return self._add_entry(contents[0], name)
963 def __contains__(self, k):
964 if k == '.arvados#project':
976 with llfuse.lock_released:
977 if not self._current_user:
978 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
979 return self._current_user["uuid"] in self.project_object.get("writable_by", [])
986 def mkdir(self, name):
988 with llfuse.lock_released:
990 "owner_uuid": self.project_uuid,
992 "manifest_text": "" }
993 if self.storage_classes is not None:
994 c["storage_classes_desired"] = self.storage_classes
996 self.api.collections().create(body=c).execute(num_retries=self.num_retries)
997 except Exception as e:
1000 except apiclient_errors.Error as error:
1001 _logger.error(error)
1002 raise llfuse.FUSEError(errno.EEXIST)
1006 def rmdir(self, name):
1007 if name not in self:
1008 raise llfuse.FUSEError(errno.ENOENT)
1009 if not isinstance(self[name], CollectionDirectory):
1010 raise llfuse.FUSEError(errno.EPERM)
1011 if len(self[name]) > 0:
1012 raise llfuse.FUSEError(errno.ENOTEMPTY)
1013 with llfuse.lock_released:
1014 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
1019 def rename(self, name_old, name_new, src):
1020 if not isinstance(src, ProjectDirectory):
1021 raise llfuse.FUSEError(errno.EPERM)
1025 if not isinstance(ent, CollectionDirectory):
1026 raise llfuse.FUSEError(errno.EPERM)
1028 if name_new in self:
1029 # POSIX semantics for replacing one directory with another is
1030 # tricky (the target directory must be empty, the operation must be
1031 # atomic which isn't possible with the Arvados API as of this
1032 # writing) so don't support that.
1033 raise llfuse.FUSEError(errno.EPERM)
1035 self.api.collections().update(uuid=ent.uuid(),
1036 body={"owner_uuid": self.uuid(),
1037 "name": name_new}).execute(num_retries=self.num_retries)
1039 # Actually move the entry from source directory to this directory.
1040 del src._entries[name_old]
1041 self._entries[name_new] = ent
1042 self.inodes.invalidate_entry(src, name_old)
1045 def child_event(self, ev):
1046 properties = ev.get("properties") or {}
1047 old_attrs = properties.get("old_attributes") or {}
1048 new_attrs = properties.get("new_attributes") or {}
1049 old_attrs["uuid"] = ev["object_uuid"]
1050 new_attrs["uuid"] = ev["object_uuid"]
1051 old_name = self.sanitize_filename(self.namefn(old_attrs))
1052 new_name = self.sanitize_filename(self.namefn(new_attrs))
1054 # create events will have a new name, but not an old name
1055 # delete events will have an old name, but not a new name
1056 # update events will have an old and new name, and they may be same or different
1057 # if they are the same, an unrelated field changed and there is nothing to do.
1059 if old_attrs.get("owner_uuid") != self.project_uuid:
1060 # Was moved from somewhere else, so don't try to remove entry.
1062 if ev.get("object_owner_uuid") != self.project_uuid:
1063 # Was moved to somewhere else, so don't try to add entry
1066 if old_attrs.get("is_trashed"):
1067 # Was previously deleted
1069 if new_attrs.get("is_trashed"):
1073 if new_name != old_name:
1075 if old_name in self._entries:
1076 ent = self._entries[old_name]
1077 del self._entries[old_name]
1078 self.inodes.invalidate_entry(self, old_name)
1082 self._entries[new_name] = ent
1084 self._add_entry(new_attrs, new_name)
1085 elif ent is not None:
1086 self.inodes.del_entry(ent)
1089 class SharedDirectory(Directory):
1090 """A special directory that represents users or groups who have shared projects with me."""
1092 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
1093 poll=False, poll_time=60, storage_classes=None):
1094 super(SharedDirectory, self).__init__(parent_inode, inodes, api.config)
1096 self.num_retries = num_retries
1097 self.current_user = api.users().current().execute(num_retries=num_retries)
1099 self._poll_time = poll_time
1100 self._updating_lock = threading.Lock()
1101 self.storage_classes = storage_classes
1106 with llfuse.lock_released:
1107 self._updating_lock.acquire()
1108 if not self.stale():
1116 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1117 if 'httpMethod' in methods.get('shared', {}):
1120 resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
1124 include="owner_uuid").execute()
1125 if not resp["items"]:
1127 page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1128 for r in resp["items"]:
1129 objects[r["uuid"]] = r
1130 roots.append(r["uuid"])
1131 for r in resp["included"]:
1132 objects[r["uuid"]] = r
1133 root_owners.add(r["uuid"])
1135 all_projects = list(arvados.util.keyset_list_all(
1136 self.api.groups().list,
1138 num_retries=self.num_retries,
1139 filters=[['group_class','in',['project','filter']]],
1140 select=["uuid", "owner_uuid"]))
1141 for ob in all_projects:
1142 objects[ob['uuid']] = ob
1144 current_uuid = self.current_user['uuid']
1145 for ob in all_projects:
1146 if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1147 roots.append(ob['uuid'])
1148 root_owners.add(ob['owner_uuid'])
1150 lusers = arvados.util.keyset_list_all(
1151 self.api.users().list,
1153 num_retries=self.num_retries,
1154 filters=[['uuid','in', list(root_owners)]])
1155 lgroups = arvados.util.keyset_list_all(
1156 self.api.groups().list,
1158 num_retries=self.num_retries,
1159 filters=[['uuid','in', list(root_owners)+roots]])
1162 objects[l["uuid"]] = l
1164 objects[l["uuid"]] = l
1166 for r in root_owners:
1170 contents[obr["name"]] = obr
1171 elif "first_name" in obr:
1172 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1177 if obr['owner_uuid'] not in objects:
1178 contents[obr["name"]] = obr
1180 # end with llfuse.lock_released, re-acquire lock
1182 self.merge(viewitems(contents),
1184 lambda a, i: a.uuid() == i[1]['uuid'],
1185 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
1187 _logger.exception("arv-mount shared dir error")
1189 self._updating_lock.release()
1191 def want_event_subscribe(self):