1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
15 from apiclient import errors as apiclient_errors
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
23 _logger = logging.getLogger('arvados.arvados_fuse')
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (If present in a collection filename, they will
28 # appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile('[\x00/]')
32 class Directory(FreshBase):
33 """Generic directory object, backed by a dict.
35 Consists of a set of entries with the key representing the filename
36 and the value referencing a File or Directory object.
39 def __init__(self, parent_inode, inodes, apiconfig):
40 """parent_inode is the integer inode number"""
42 super(Directory, self).__init__()
45 if not isinstance(parent_inode, int):
46 raise Exception("parent_inode should be an int")
47 self.parent_inode = parent_inode
49 self.apiconfig = apiconfig
51 self._mtime = time.time()
53 def forward_slash_subst(self):
54 if not hasattr(self, '_fsns'):
56 config = self.apiconfig()
58 self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
60 # old API server with no FSNS config
63 if self._fsns == '' or self._fsns == '/':
67 def unsanitize_filename(self, incoming):
68 """Replace ForwardSlashNameSubstitution value with /"""
69 fsns = self.forward_slash_subst()
70 if isinstance(fsns, str):
71 return incoming.replace(fsns, '/')
75 def sanitize_filename(self, dirty):
76 """Replace disallowed filename characters according to
77 ForwardSlashNameSubstitution in self.apiconfig."""
78 # '.' and '..' are not reachable if API server is newer than #6277
88 fsns = self.forward_slash_subst()
89 if isinstance(fsns, str):
90 dirty = dirty.replace('/', fsns)
91 return _disallowed_filename_characters.sub('_', dirty)
94 # Overridden by subclasses to implement logic to update the
95 # entries dict when the directory is stale
100 # Only used when computing the size of the disk footprint of the directory
108 def checkupdate(self):
112 except apiclient.errors.HttpError as e:
117 def __getitem__(self, item):
118 return self._entries[item]
123 return list(self._entries.items())
127 def __contains__(self, k):
128 return k in self._entries
133 return len(self._entries)
136 self.inodes.touch(self)
137 super(Directory, self).fresh()
139 def merge(self, items, fn, same, new_entry):
140 """Helper method for updating the contents of the directory.
142 Takes a list describing the new contents of the directory, reuse
143 entries that are the same in both the old and new lists, create new
144 entries, and delete old entries missing from the new list.
146 :items: iterable with new directory contents
148 :fn: function to take an entry in 'items' and return the desired file or
149 directory name, or None if this entry should be skipped
151 :same: function to compare an existing entry (a File or Directory
152 object) with an entry in the items list to determine whether to keep
155 :new_entry: function to create a new directory entry (File or Directory
156 object) from an entry in the items list.
160 oldentries = self._entries
164 name = self.sanitize_filename(fn(i))
166 if name in oldentries and same(oldentries[name], i):
167 # move existing directory entry over
168 self._entries[name] = oldentries[name]
171 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
172 # create new directory entry
175 self._entries[name] = self.inodes.add_entry(ent)
178 # delete any other directory entries that were not found in 'items'
180 _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
181 self.inodes.invalidate_entry(self, i)
182 self.inodes.del_entry(oldentries[i])
186 self.inodes.invalidate_inode(self)
187 self._mtime = time.time()
192 if super(Directory, self).in_use():
194 for v in self._entries.values():
199 def has_ref(self, only_children):
200 if super(Directory, self).has_ref(only_children):
202 for v in self._entries.values():
208 """Delete all entries"""
209 oldentries = self._entries
212 oldentries[n].clear()
213 self.inodes.del_entry(oldentries[n])
216 def kernel_invalidate(self):
217 # Invalidating the dentry on the parent implies invalidating all paths
219 parent = self.inodes[self.parent_inode]
221 # Find self on the parent in order to invalidate this path.
222 # Calling the public items() method might trigger a refresh,
223 # which we definitely don't want, so read the internal dict directly.
224 for k,v in parent._entries.items():
226 self.inodes.invalidate_entry(parent, k)
def want_event_subscribe(self):
    """Report whether this directory wants to subscribe to events.

    The generic Directory has no answer: it always raises
    NotImplementedError and leaves the decision to subclasses.
    """
    raise NotImplementedError
def create(self, name):
    """Hook for creating a new entry called `name` in this directory.

    Not supported by the generic Directory; always raises
    NotImplementedError. Writable subclasses override it.
    """
    raise NotImplementedError
def mkdir(self, name):
    """Hook for creating a subdirectory called `name`.

    Not supported by the generic Directory; always raises
    NotImplementedError. Writable subclasses override it.
    """
    raise NotImplementedError
def unlink(self, name):
    """Hook for removing the entry called `name` from this directory.

    Not supported by the generic Directory; always raises
    NotImplementedError. Writable subclasses override it.
    """
    raise NotImplementedError
def rmdir(self, name):
    """Hook for removing the subdirectory called `name`.

    Not supported by the generic Directory; always raises
    NotImplementedError. Writable subclasses override it.
    """
    raise NotImplementedError
def rename(self, name_old, name_new, src):
    """Hook for moving entry `name_old` of directory `src` here as `name_new`.

    Not supported by the generic Directory; always raises
    NotImplementedError. Writable subclasses override it.
    """
    raise NotImplementedError
257 class CollectionDirectoryBase(Directory):
258 """Represent an Arvados Collection as a directory.
260 This class is used for Subcollections, and is also the base class for
261 CollectionDirectory, which implements collection loading/saving on
264 Most operations act only on the underlying Arvados `Collection` object. The
265 `Collection` object signals via a notify callback to
266 `CollectionDirectoryBase.on_event` that an item was added, removed or
267 modified. FUSE inodes and directory entries are created, deleted or
268 invalidated in response to these events.
def __init__(self, parent_inode, inodes, apiconfig, collection):
    """Set up a directory view over `collection`.

    parent_inode, inodes and apiconfig are forwarded unchanged to
    Directory.__init__; `collection` is the Arvados collection object
    whose contents this directory represents.
    """
    # Directory.__init__ already stores apiconfig on self, so it is not
    # assigned a second time here.
    super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig)
    self.collection = collection
277 def new_entry(self, name, item, mtime):
278 name = self.sanitize_filename(name)
279 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
280 if item.fuse_entry.dead is not True:
281 raise Exception("Can only reparent dead inode entry")
282 if item.fuse_entry.inode is None:
283 raise Exception("Reparented entry must still have valid inode")
284 item.fuse_entry.dead = False
285 self._entries[name] = item.fuse_entry
286 elif isinstance(item, arvados.collection.RichCollectionBase):
287 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, item))
288 self._entries[name].populate(mtime)
290 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
291 item.fuse_entry = self._entries[name]
293 def on_event(self, event, collection, name, item):
294 if collection == self.collection:
295 name = self.sanitize_filename(name)
298 # It's possible for another thread to have llfuse.lock and
299 # be waiting on collection.lock. Meanwhile, we released
300 # llfuse.lock earlier in the stack, but are still holding
301 # on to the collection lock, and now we need to re-acquire
302 # llfuse.lock. If we don't release the collection lock,
303 # we'll deadlock where we're holding the collection lock
304 # waiting for llfuse.lock and the other thread is holding
305 # llfuse.lock and waiting for the collection lock.
307 # The correct locking order here is to take llfuse.lock
308 # first, then the collection lock.
310 # Since collection.lock is an RLock, it might be locked
311 # multiple times, so we need to release it multiple times,
312 # keep a count, then re-lock it the correct number of
318 self.collection.lock.release()
325 with self.collection.lock:
326 if event == arvados.collection.ADD:
327 self.new_entry(name, item, self.mtime())
328 elif event == arvados.collection.DEL:
329 ent = self._entries[name]
330 del self._entries[name]
331 self.inodes.invalidate_entry(self, name)
332 self.inodes.del_entry(ent)
333 elif event == arvados.collection.MOD:
334 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
335 self.inodes.invalidate_inode(item.fuse_entry)
336 elif name in self._entries:
337 self.inodes.invalidate_inode(self._entries[name])
340 self.collection.lock.acquire()
343 def populate(self, mtime):
345 with self.collection.lock:
346 self.collection.subscribe(self.on_event)
347 for entry, item in self.collection.items():
348 self.new_entry(entry, item, self.mtime())
351 return self.collection.writable()
355 with llfuse.lock_released:
356 self.collection.root_collection().save()
def create(self, name):
    """Create `name` in the backing collection.

    The file is opened in "w" mode and closed again straight away. The
    llfuse lock is released for the duration of the collection call.
    """
    with llfuse.lock_released:
        handle = self.collection.open(name, "w")
        handle.close()
def mkdir(self, name):
    """Create the directory `name` in the backing collection.

    Delegates to the collection's `mkdirs`, with the llfuse lock
    released for the duration of the call.
    """
    with llfuse.lock_released:
        target = self.collection
        target.mkdirs(name)
372 def unlink(self, name):
373 with llfuse.lock_released:
374 self.collection.remove(name)
379 def rmdir(self, name):
380 with llfuse.lock_released:
381 self.collection.remove(name)
386 def rename(self, name_old, name_new, src):
387 if not isinstance(src, CollectionDirectoryBase):
388 raise llfuse.FUSEError(errno.EPERM)
393 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
395 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
397 raise llfuse.FUSEError(errno.ENOTEMPTY)
398 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
399 raise llfuse.FUSEError(errno.ENOTDIR)
400 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
401 raise llfuse.FUSEError(errno.EISDIR)
403 with llfuse.lock_released:
404 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
409 super(CollectionDirectoryBase, self).clear()
410 self.collection = None
413 class CollectionDirectory(CollectionDirectoryBase):
414 """Represents the root of a directory tree representing a collection."""
416 def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
417 super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, None)
419 self.num_retries = num_retries
420 self.collection_record_file = None
421 self.collection_record = None
424 self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
426 _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
427 self._poll_time = 60*60
429 if isinstance(collection_record, dict):
430 self.collection_locator = collection_record['uuid']
431 self._mtime = convertTime(collection_record.get('modified_at'))
433 self.collection_locator = collection_record
435 self._manifest_size = 0
436 if self.collection_locator:
437 self._writable = (uuid_pattern.match(self.collection_locator) is not None)
438 self._updating_lock = threading.Lock()
441 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
444 return self.collection.writable() if self.collection is not None else self._writable
def want_event_subscribe(self):
    """Return True only when this directory is addressed by a uuid.

    A locator that does not match `uuid_pattern` yields False. So does
    a directory that has no locator yet.
    """
    locator = self.collection_locator
    # collection_locator is None when the directory was built without a
    # collection record; matching None against the pattern would raise
    # TypeError, so answer False instead.
    if locator is None:
        return False
    return uuid_pattern.match(locator) is not None
449 # Used by arv-web.py to switch the contents of the CollectionDirectory
450 def change_collection(self, new_locator):
451 """Switch the contents of the CollectionDirectory.
453 Must be called with llfuse.lock held.
456 self.collection_locator = new_locator
457 self.collection_record = None
460 def new_collection(self, new_collection_record, coll_reader):
464 self.collection_record = new_collection_record
466 if self.collection_record:
467 self._mtime = convertTime(self.collection_record.get('modified_at'))
468 self.collection_locator = self.collection_record["uuid"]
469 if self.collection_record_file is not None:
470 self.collection_record_file.update(self.collection_record)
472 self.collection = coll_reader
473 self.populate(self.mtime())
476 return self.collection_locator
479 def update(self, to_record_version=None):
481 if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
484 if self.collection_locator is None:
489 with llfuse.lock_released:
490 self._updating_lock.acquire()
494 _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
495 new_collection_record = None
496 if self.collection is not None:
497 if self.collection.known_past_version(to_record_version):
498 _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
500 self.collection.update()
502 if uuid_pattern.match(self.collection_locator):
503 coll_reader = arvados.collection.Collection(
504 self.collection_locator, self.api, self.api.keep,
505 num_retries=self.num_retries)
507 coll_reader = arvados.collection.CollectionReader(
508 self.collection_locator, self.api, self.api.keep,
509 num_retries=self.num_retries)
510 new_collection_record = coll_reader.api_response() or {}
511 # If the Collection only exists in Keep, there will be no API
512 # response. Fill in the fields we need.
513 if 'uuid' not in new_collection_record:
514 new_collection_record['uuid'] = self.collection_locator
515 if "portable_data_hash" not in new_collection_record:
516 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
517 if 'manifest_text' not in new_collection_record:
518 new_collection_record['manifest_text'] = coll_reader.manifest_text()
519 if 'storage_classes_desired' not in new_collection_record:
520 new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
522 # end with llfuse.lock_released, re-acquire lock
523 if (new_collection_record is not None and
524 (self.collection_record is None or
525 self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"))):
526 self.new_collection(new_collection_record, coll_reader)
527 self._manifest_size = len(coll_reader.manifest_text())
528 _logger.debug("%s manifest_size %i", self, self._manifest_size)
533 self._updating_lock.release()
534 except arvados.errors.NotFoundError as e:
535 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
536 except arvados.errors.ArgumentError as detail:
537 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
538 if self.collection_record is not None and "manifest_text" in self.collection_record:
539 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
541 _logger.exception("arv-mount %s: error", self.collection_locator)
542 if self.collection_record is not None and "manifest_text" in self.collection_record:
543 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
549 def __getitem__(self, item):
550 if item == '.arvados#collection':
551 if self.collection_record_file is None:
552 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
553 self.inodes.add_entry(self.collection_record_file)
554 return self.collection_record_file
556 return super(CollectionDirectory, self).__getitem__(item)
558 def __contains__(self, k):
559 if k == '.arvados#collection':
562 return super(CollectionDirectory, self).__contains__(k)
def invalidate(self):
    """Drop the cached collection record and record file, then invalidate.

    Both cached references are reset to None before the parent class's
    invalidate() runs.
    """
    self.collection_record = self.collection_record_file = None
    super(CollectionDirectory, self).invalidate()
570 return (self.collection_locator is not None)
573 # This is an empirically-derived heuristic to estimate the memory used
574 # to store this collection's metadata. Calculating the memory
575 # footprint directly would be more accurate, but also more complicated.
576 return self._manifest_size * 128
579 if self.collection is not None:
581 self.collection.save()
582 self.collection.stop_threads()
585 if self.collection is not None:
586 self.collection.stop_threads()
587 super(CollectionDirectory, self).clear()
588 self._manifest_size = 0
591 class TmpCollectionDirectory(CollectionDirectoryBase):
592 """A directory backed by an Arvados collection that never gets saved.
594 This supports using Keep as scratch space. A userspace program can
595 read the .arvados#collection file to get a current manifest in
596 order to save a snapshot of the scratch data or use it as a crunch
600 class UnsaveableCollection(arvados.collection.Collection):
def __init__(self, parent_inode, inodes, api_client, num_retries, storage_classes=None):
    """Build a directory backed by a fresh UnsaveableCollection.

    The collection is created from `api_client`, its `keep` client,
    `num_retries` and `storage_classes` (passed as
    storage_classes_desired). The directory is populated immediately,
    and the record file is left unset until it is first requested.
    """
    scratch = self.UnsaveableCollection(
        api_client=api_client,
        keep_client=api_client.keep,
        num_retries=num_retries,
        storage_classes_desired=storage_classes)
    super(TmpCollectionDirectory, self).__init__(
        parent_inode, inodes, api_client.config, scratch)
    # Created lazily by __getitem__('.arvados#collection').
    self.collection_record_file = None
    self.populate(self.mtime())
617 def on_event(self, *args, **kwargs):
618 super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
619 if self.collection_record_file:
621 # See discussion in CollectionDirectoryBase.on_event
625 self.collection.lock.release()
632 with self.collection.lock:
633 self.collection_record_file.invalidate()
634 self.inodes.invalidate_inode(self.collection_record_file)
635 _logger.debug("%s invalidated collection record", self)
638 self.collection.lock.acquire()
641 def collection_record(self):
642 with llfuse.lock_released:
645 "manifest_text": self.collection.manifest_text(),
646 "portable_data_hash": self.collection.portable_data_hash(),
647 "storage_classes_desired": self.collection.storage_classes_desired(),
650 def __contains__(self, k):
651 return (k == '.arvados#collection' or
652 super(TmpCollectionDirectory, self).__contains__(k))
655 def __getitem__(self, item):
656 if item == '.arvados#collection':
657 if self.collection_record_file is None:
658 self.collection_record_file = FuncToJSONFile(
659 self.inode, self.collection_record)
660 self.inodes.add_entry(self.collection_record_file)
661 return self.collection_record_file
662 return super(TmpCollectionDirectory, self).__getitem__(item)
670 def want_event_subscribe(self):
674 self.collection.stop_threads()
def invalidate(self):
    """Invalidate the record file, if one exists, then this directory."""
    record_file = self.collection_record_file
    if record_file:
        record_file.invalidate()
    super(TmpCollectionDirectory, self).invalidate()
682 class MagicDirectory(Directory):
683 """A special directory that logically contains the set of all extant keep locators.
685 When a file is referenced by lookup(), it is tested to see if it is a valid
686 keep locator to a manifest, and if so, loads the manifest contents as a
687 subdirectory of this directory with the locator as the directory name.
688 Since querying a list of all extant keep locators is impractical, only
689 collections that have already been accessed are visible to readdir().
694 This directory provides access to Arvados collections as subdirectories listed
695 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
696 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
697 (in the form 'zzzzz-j7d0g-1234567890abcde').
699 Note that this directory will appear empty until you attempt to access a
700 specific collection or project subdirectory (such as trying to 'cd' into it),
701 at which point the collection or project will actually be looked up on the server
702 and the directory will appear if it exists.
706 def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False, storage_classes=None):
707 super(MagicDirectory, self).__init__(parent_inode, inodes, api.config)
709 self.num_retries = num_retries
710 self.pdh_only = pdh_only
711 self.storage_classes = storage_classes
def __setattr__(self, name, value):
    """Set an attribute, adding the default entries once an inode is assigned.

    When `inode` is set to a non-None value and the directory has no
    entries yet, a README file is added. If this directory is the FUSE
    root, a nested MagicDirectory named 'by_id' is added as well.
    """
    super(MagicDirectory, self).__setattr__(name, value)
    # When we're assigned an inode, add a README.
    if ((name == 'inode') and (self.inode is not None) and
            (not self._entries)):
        self._entries['README'] = self.inodes.add_entry(
            StringFile(self.inode, self.README_TEXT, time.time()))
        # If we're the root directory, add an identical by_id subdirectory.
        # storage_classes is forwarded too, so directories reached through
        # by_id get the same storage classes as those reached directly.
        if self.inode == llfuse.ROOT_INODE:
            self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                self.inode, self.inodes, self.api, self.num_retries,
                self.pdh_only, storage_classes=self.storage_classes))
725 def __contains__(self, k):
726 if k in self._entries:
729 if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
735 if group_uuid_pattern.match(k):
736 project = self.api.groups().list(
737 filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
738 if project[u'items_available'] == 0:
740 e = self.inodes.add_entry(ProjectDirectory(
741 self.inode, self.inodes, self.api, self.num_retries,
742 project[u'items'][0], storage_classes=self.storage_classes))
744 e = self.inodes.add_entry(CollectionDirectory(
745 self.inode, self.inodes, self.api, self.num_retries, k))
748 if k not in self._entries:
751 self.inodes.del_entry(e)
754 self.inodes.invalidate_entry(self, k)
755 self.inodes.del_entry(e)
757 except Exception as ex:
758 _logger.exception("arv-mount lookup '%s':", k)
760 self.inodes.del_entry(e)
763 def __getitem__(self, item):
765 return self._entries[item]
767 raise KeyError("No collection with id " + item)
def want_event_subscribe(self):
    """Return False for a pdh_only directory, True otherwise."""
    if self.pdh_only:
        return False
    return True
776 class TagsDirectory(Directory):
777 """A special directory that contains as subdirectories all tags visible to the user."""
779 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
780 super(TagsDirectory, self).__init__(parent_inode, inodes, api.config)
782 self.num_retries = num_retries
784 self._poll_time = poll_time
787 def want_event_subscribe(self):
792 with llfuse.lock_released:
793 tags = self.api.links().list(
794 filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
795 select=['name'], distinct=True, limit=1000
796 ).execute(num_retries=self.num_retries)
798 self.merge(tags['items']+[{"name": n} for n in self._extra],
800 lambda a, i: a.tag == i['name'],
801 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
805 def __getitem__(self, item):
806 if super(TagsDirectory, self).__contains__(item):
807 return super(TagsDirectory, self).__getitem__(item)
808 with llfuse.lock_released:
809 tags = self.api.links().list(
810 filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
811 ).execute(num_retries=self.num_retries)
813 self._extra.add(item)
815 return super(TagsDirectory, self).__getitem__(item)
819 def __contains__(self, k):
820 if super(TagsDirectory, self).__contains__(k):
830 class TagDirectory(Directory):
831 """A special directory that contains as subdirectories all collections visible
832 to the user that are tagged with a particular tag.
835 def __init__(self, parent_inode, inodes, api, num_retries, tag,
836 poll=False, poll_time=60):
837 super(TagDirectory, self).__init__(parent_inode, inodes, api.config)
839 self.num_retries = num_retries
842 self._poll_time = poll_time
844 def want_event_subscribe(self):
849 with llfuse.lock_released:
850 taggedcollections = self.api.links().list(
851 filters=[['link_class', '=', 'tag'],
852 ['name', '=', self.tag],
853 ['head_uuid', 'is_a', 'arvados#collection']],
855 ).execute(num_retries=self.num_retries)
856 self.merge(taggedcollections['items'],
857 lambda i: i['head_uuid'],
858 lambda a, i: a.collection_locator == i['head_uuid'],
859 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
862 class ProjectDirectory(Directory):
863 """A special directory that contains the contents of a project."""
865 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
866 poll=True, poll_time=3, storage_classes=None):
867 super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config)
869 self.num_retries = num_retries
870 self.project_object = project_object
871 self.project_object_file = None
872 self.project_uuid = project_object['uuid']
874 self._poll_time = poll_time
875 self._updating_lock = threading.Lock()
876 self._current_user = None
877 self._full_listing = False
878 self.storage_classes = storage_classes
880 def want_event_subscribe(self):
883 def createDirectory(self, i):
884 if collection_uuid_pattern.match(i['uuid']):
885 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
886 elif group_uuid_pattern.match(i['uuid']):
887 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time, self.storage_classes)
888 elif link_uuid_pattern.match(i['uuid']):
889 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
890 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
893 elif uuid_pattern.match(i['uuid']):
894 return ObjectFile(self.parent_inode, i)
899 return self.project_uuid
902 self._full_listing = True
903 return super(ProjectDirectory, self).items()
907 if i['name'] is None or len(i['name']) == 0:
909 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
910 # collection or subproject
912 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
915 elif 'kind' in i and i['kind'].startswith('arvados#'):
917 return "{}.{}".format(i['name'], i['kind'][8:])
924 if self.project_object_file == None:
925 self.project_object_file = ObjectFile(self.inode, self.project_object)
926 self.inodes.add_entry(self.project_object_file)
928 if not self._full_listing:
932 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
933 return a.uuid() == i['uuid']
934 elif isinstance(a, ObjectFile):
935 return a.uuid() == i['uuid'] and not a.stale()
939 with llfuse.lock_released:
940 self._updating_lock.acquire()
944 if group_uuid_pattern.match(self.project_uuid):
945 self.project_object = self.api.groups().get(
946 uuid=self.project_uuid).execute(num_retries=self.num_retries)
947 elif user_uuid_pattern.match(self.project_uuid):
948 self.project_object = self.api.users().get(
949 uuid=self.project_uuid).execute(num_retries=self.num_retries)
950 # do this in 2 steps until #17424 is fixed
951 contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
953 num_retries=self.num_retries,
954 uuid=self.project_uuid,
955 filters=[["uuid", "is_a", "arvados#group"],
956 ["groups.group_class", "in", ["project","filter"]]]))
957 contents.extend(arvados.util.keyset_list_all(self.api.groups().contents,
959 num_retries=self.num_retries,
960 uuid=self.project_uuid,
961 filters=[["uuid", "is_a", "arvados#collection"]]))
963 # end with llfuse.lock_released, re-acquire lock
968 self.createDirectory)
971 self._updating_lock.release()
973 def _add_entry(self, i, name):
974 ent = self.createDirectory(i)
975 self._entries[name] = self.inodes.add_entry(ent)
976 return self._entries[name]
980 def __getitem__(self, k):
981 if k == '.arvados#project':
982 return self.project_object_file
983 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
984 return super(ProjectDirectory, self).__getitem__(k)
985 with llfuse.lock_released:
986 k2 = self.unsanitize_filename(k)
988 namefilter = ["name", "=", k]
990 namefilter = ["name", "in", [k, k2]]
991 contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
992 ["group_class", "in", ["project","filter"]],
994 limit=2).execute(num_retries=self.num_retries)["items"]
996 contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
998 limit=2).execute(num_retries=self.num_retries)["items"]
1000 if len(contents) > 1 and contents[1]['name'] == k:
1001 # If "foo/bar" and "foo[SUBST]bar" both exist, use
1003 contents = [contents[1]]
1004 name = self.sanitize_filename(self.namefn(contents[0]))
1007 return self._add_entry(contents[0], name)
1012 def __contains__(self, k):
1013 if k == '.arvados#project':
1025 with llfuse.lock_released:
1026 if not self._current_user:
1027 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
1028 return self._current_user["uuid"] in self.project_object.get("writable_by", [])
1030 def persisted(self):
1035 def mkdir(self, name):
1037 with llfuse.lock_released:
1039 "owner_uuid": self.project_uuid,
1041 "manifest_text": "" }
1042 if self.storage_classes is not None:
1043 c["storage_classes_desired"] = self.storage_classes
1045 self.api.collections().create(body=c).execute(num_retries=self.num_retries)
1046 except Exception as e:
1049 except apiclient_errors.Error as error:
1050 _logger.error(error)
1051 raise llfuse.FUSEError(errno.EEXIST)
1055 def rmdir(self, name):
1056 if name not in self:
1057 raise llfuse.FUSEError(errno.ENOENT)
1058 if not isinstance(self[name], CollectionDirectory):
1059 raise llfuse.FUSEError(errno.EPERM)
1060 if len(self[name]) > 0:
1061 raise llfuse.FUSEError(errno.ENOTEMPTY)
1062 with llfuse.lock_released:
1063 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
1068 def rename(self, name_old, name_new, src):
1069 if not isinstance(src, ProjectDirectory):
1070 raise llfuse.FUSEError(errno.EPERM)
1074 if not isinstance(ent, CollectionDirectory):
1075 raise llfuse.FUSEError(errno.EPERM)
1077 if name_new in self:
1078 # POSIX semantics for replacing one directory with another is
1079 # tricky (the target directory must be empty, the operation must be
1080 # atomic which isn't possible with the Arvados API as of this
1081 # writing) so don't support that.
1082 raise llfuse.FUSEError(errno.EPERM)
1084 self.api.collections().update(uuid=ent.uuid(),
1085 body={"owner_uuid": self.uuid(),
1086 "name": name_new}).execute(num_retries=self.num_retries)
1088 # Actually move the entry from source directory to this directory.
1089 del src._entries[name_old]
1090 self._entries[name_new] = ent
1091 self.inodes.invalidate_entry(src, name_old)
1094 def child_event(self, ev):
1095 properties = ev.get("properties") or {}
1096 old_attrs = properties.get("old_attributes") or {}
1097 new_attrs = properties.get("new_attributes") or {}
1098 old_attrs["uuid"] = ev["object_uuid"]
1099 new_attrs["uuid"] = ev["object_uuid"]
1100 old_name = self.sanitize_filename(self.namefn(old_attrs))
1101 new_name = self.sanitize_filename(self.namefn(new_attrs))
1103 # create events will have a new name, but not an old name
1104 # delete events will have an old name, but not a new name
1105 # update events will have an old and new name, and they may be same or different
1106 # if they are the same, an unrelated field changed and there is nothing to do.
1108 if old_attrs.get("owner_uuid") != self.project_uuid:
1109 # Was moved from somewhere else, so don't try to remove entry.
1111 if ev.get("object_owner_uuid") != self.project_uuid:
1112 # Was moved to somewhere else, so don't try to add entry
1115 if old_attrs.get("is_trashed"):
1116 # Was previously deleted
1118 if new_attrs.get("is_trashed"):
1122 if new_name != old_name:
1124 if old_name in self._entries:
1125 ent = self._entries[old_name]
1126 del self._entries[old_name]
1127 self.inodes.invalidate_entry(self, old_name)
1131 self._entries[new_name] = ent
1133 self._add_entry(new_attrs, new_name)
1134 elif ent is not None:
1135 self.inodes.del_entry(ent)
1138 class SharedDirectory(Directory):
1139 """A special directory that represents users or groups who have shared projects with me."""
1141 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
1142 poll=False, poll_time=60, storage_classes=None):
1143 super(SharedDirectory, self).__init__(parent_inode, inodes, api.config)
1145 self.num_retries = num_retries
1146 self.current_user = api.users().current().execute(num_retries=num_retries)
1148 self._poll_time = poll_time
1149 self._updating_lock = threading.Lock()
1150 self.storage_classes = storage_classes
1155 with llfuse.lock_released:
1156 self._updating_lock.acquire()
1157 if not self.stale():
1165 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1166 if 'httpMethod' in methods.get('shared', {}):
1169 resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
1173 include="owner_uuid").execute()
1174 if not resp["items"]:
1176 page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1177 for r in resp["items"]:
1178 objects[r["uuid"]] = r
1179 roots.append(r["uuid"])
1180 for r in resp["included"]:
1181 objects[r["uuid"]] = r
1182 root_owners.add(r["uuid"])
1184 all_projects = list(arvados.util.keyset_list_all(
1185 self.api.groups().list,
1187 num_retries=self.num_retries,
1188 filters=[['group_class','in',['project','filter']]],
1189 select=["uuid", "owner_uuid"]))
1190 for ob in all_projects:
1191 objects[ob['uuid']] = ob
1193 current_uuid = self.current_user['uuid']
1194 for ob in all_projects:
1195 if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1196 roots.append(ob['uuid'])
1197 root_owners.add(ob['owner_uuid'])
1199 lusers = arvados.util.keyset_list_all(
1200 self.api.users().list,
1202 num_retries=self.num_retries,
1203 filters=[['uuid','in', list(root_owners)]])
1204 lgroups = arvados.util.keyset_list_all(
1205 self.api.groups().list,
1207 num_retries=self.num_retries,
1208 filters=[['uuid','in', list(root_owners)+roots]])
1211 objects[l["uuid"]] = l
1213 objects[l["uuid"]] = l
1215 for r in root_owners:
1219 contents[obr["name"]] = obr
1220 elif "first_name" in obr:
1221 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1226 if obr['owner_uuid'] not in objects:
1227 contents[obr["name"]] = obr
1229 # end with llfuse.lock_released, re-acquire lock
1231 self.merge(contents.items(),
1233 lambda a, i: a.uuid() == i[1]['uuid'],
1234 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
1236 _logger.exception("arv-mount shared dir error")
1238 self._updating_lock.release()
1240 def want_event_subscribe(self):