1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
15 from apiclient import errors as apiclient_errors
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
# NOTE(review): this excerpt is a sampled/garbled extraction of the original
# module — each line begins with its original file line number and many
# original lines are missing. Code text is preserved byte-for-byte; only
# comments were added.
# Module-level logger shared by all directory classes below.
23 _logger = logging.getLogger('arvados.arvados_fuse')
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (If present in a collection filename, they will
28 # appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile(r'[\x00/]')
# NOTE(review): lines below are a sampled extraction (embedded original line
# numbers, missing lines). Only comments were added; visible code unchanged.
32 class Directory(FreshBase):
33 """Generic directory object, backed by a dict.
35 Consists of a set of entries with the key representing the filename
36 and the value referencing a File or Directory object.
39 __slots__ = ("inode", "parent_inode", "inodes", "_entries", "_mtime", "_enable_write", "_filters")
# Constructor: records the parent inode number, the shared inode table,
# whether writes are enabled, and optional API filters (defaults to []).
41 def __init__(self, parent_inode, inodes, enable_write, filters):
42 """parent_inode is the integer inode number"""
44 super(Directory, self).__init__()
47 if not isinstance(parent_inode, int):
48 raise Exception("parent_inode should be an int")
49 self.parent_inode = parent_inode
52 self._mtime = time.time()
53 self._enable_write = enable_write
54 self._filters = filters or []
# Generator: yield filters from self._filters that apply to `subtype`,
# splitting each filter's attribute on the first '.' to separate the
# record-type prefix from the attribute name. (Full branch logic not
# visible in this excerpt — TODO confirm against original source.)
56 def _filters_for(self, subtype, *, qualified):
57 for f in self._filters:
58 f_type, _, f_name = f[0].partition('.')
61 elif f_type != subtype:
66 yield [f_name, *f[1:]]
68 def unsanitize_filename(self, incoming):
69 """Replace ForwardSlashNameSubstitution value with /"""
70 fsns = self.inodes.forward_slash_subst()
71 if isinstance(fsns, str):
72 return incoming.replace(fsns, '/')
# Map '/' to the configured substitution string, then replace any remaining
# disallowed characters (NUL, '/') with underscores.
76 def sanitize_filename(self, dirty):
77 """Replace disallowed filename characters according to
78 ForwardSlashNameSubstitution in self.api_config."""
79 # '.' and '..' are not reachable if API server is newer than #6277
89 fsns = self.inodes.forward_slash_subst()
90 if isinstance(fsns, str):
91 dirty = dirty.replace('/', fsns)
92 return _disallowed_filename_characters.sub('_', dirty)
95 # Overridden by subclasses to implement logic to update the
96 # entries dict when the directory is stale
101 # Only used when computing the size of the disk footprint of the directory
# Refresh the entries dict if stale; API HTTP errors are caught here
# (handler body not visible in this excerpt).
109 def checkupdate(self):
113 except apiclient.errors.HttpError as e:
118 def __getitem__(self, item):
119 return self._entries[item]
124 return list(self._entries.items())
128 def __contains__(self, k):
129 return k in self._entries
134 return len(self._entries)
137 self.inodes.touch(self)
138 super(Directory, self).fresh()
141 # Rough estimate of memory footprint based on using pympler
142 return len(self._entries) * 1024
# merge(): reconcile directory contents against a fresh listing — reuse
# entries judged "same", create new entries for new items, and detach any
# old entries absent from `items`.
144 def merge(self, items, fn, same, new_entry):
145 """Helper method for updating the contents of the directory.
147 Takes a list describing the new contents of the directory, reuse
148 entries that are the same in both the old and new lists, create new
149 entries, and delete old entries missing from the new list.
152 * items: Iterable --- New directory contents
154 * fn: Callable --- Takes an entry in 'items' and return the desired file or
155 directory name, or None if this entry should be skipped
157 * same: Callable --- Compare an existing entry (a File or Directory
158 object) with an entry in the items list to determine whether to keep
161 * new_entry: Callable --- Create a new directory entry (File or Directory
162 object) from an entry in the items list.
166 oldentries = self._entries
170 name = self.sanitize_filename(fn(i))
173 if name in oldentries:
174 ent = oldentries[name]
175 if same(ent, i) and ent.parent_inode == self.inode:
176 # move existing directory entry over
177 self._entries[name] = ent
179 self.inodes.inode_cache.touch(ent)
182 name = self.sanitize_filename(fn(i))
185 if name not in self._entries:
186 # create new directory entry
189 self._entries[name] = self.inodes.add_entry(ent)
190 # need to invalidate this just in case there was a
191 # previous entry that couldn't be moved over or a
192 # lookup that returned file not found and cached
194 self.inodes.invalidate_entry(self, name)
196 _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)
198 # delete any other directory entries that were not in found in 'items'
199 for name, ent in oldentries.items():
200 _logger.debug("Detaching entry '%s' from parent_inode %i", name, self.inode)
201 self.inodes.invalidate_entry(self, name)
202 self.inodes.del_entry(ent)
206 self._mtime = time.time()
207 self.inodes.inode_cache.update_cache_size(self)
# in_use(): true if this directory or any child entry is in use.
212 if super(Directory, self).in_use():
214 for v in self._entries.values():
# clear(): drop every entry, invalidating each name in the kernel cache
# and releasing its inode.
220 """Delete all entries"""
221 if not self._entries:
223 oldentries = self._entries
226 for name, ent in oldentries.items():
228 self.inodes.invalidate_entry(self, name)
229 self.inodes.del_entry(ent)
230 self.inodes.inode_cache.update_cache_size(self)
232 def kernel_invalidate(self):
233 # Invalidating the dentry on the parent implies invalidating all paths
235 if self.parent_inode in self.inodes:
236 parent = self.inodes[self.parent_inode]
238 # parent was removed already.
241 # Find self on the parent in order to invalidate this path.
242 # Calling the public items() method might trigger a refresh,
243 # which we definitely don't want, so read the internal dict directly.
244 for k,v in parent._entries.items():
246 self.inodes.invalidate_entry(parent, k)
# Abstract mutation hooks — writable subclasses override these; the base
# directory rejects all of them.
258 def want_event_subscribe(self):
259 raise NotImplementedError()
261 def create(self, name):
262 raise NotImplementedError()
264 def mkdir(self, name):
265 raise NotImplementedError()
267 def unlink(self, name):
268 raise NotImplementedError()
270 def rmdir(self, name):
271 raise NotImplementedError()
273 def rename(self, name_old, name_new, src):
274 raise NotImplementedError()
# NOTE(review): sampled extraction — visible code preserved byte-for-byte,
# comments only added.
277 class CollectionDirectoryBase(Directory):
278 """Represent an Arvados Collection as a directory.
280 This class is used for Subcollections, and is also the base class for
281 CollectionDirectory, which implements collection loading/saving on
284 Most operations act only the underlying Arvados `Collection` object. The
285 `Collection` object signals via a notify callback to
286 `CollectionDirectoryBase.on_event` that an item was added, removed or
287 modified. FUSE inodes and directory entries are created, deleted or
288 invalidated in response to these events.
292 __slots__ = ("collection", "collection_root", "collection_record_file")
294 def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root):
295 super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
296 self.collection = collection
297 self.collection_root = collection_root
298 self.collection_record_file = None
# new_entry(): create (or re-attach) the FUSE object for one collection
# item — reuse an existing unparented fuse_entry, wrap subcollections in a
# new CollectionDirectoryBase, and wrap files in FuseArvadosFile.
300 def new_entry(self, name, item, mtime):
301 name = self.sanitize_filename(name)
302 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
303 if item.fuse_entry.parent_inode is not None:
304 raise Exception("Can only reparent unparented inode entry")
305 if item.fuse_entry.inode is None:
306 raise Exception("Reparented entry must still have valid inode")
307 item.fuse_entry.parent_inode = self.inode
308 self._entries[name] = item.fuse_entry
309 elif isinstance(item, arvados.collection.RichCollectionBase):
310 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
316 self.collection_root,
318 self._entries[name].populate(mtime)
320 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write, self._poll, self._poll_time))
321 item.fuse_entry = self._entries[name]
# on_event(): notify callback from the Collection; synchronizes ADD/DEL/MOD
# (and TOK/WRITE) events with the FUSE inode table. Note the careful
# release/re-acquire of the collection RLock to avoid deadlock with
# llfuse.lock (explained in the comments below).
323 def on_event(self, event, collection, name, item):
325 # These are events from the Collection object (ADD/DEL/MOD)
326 # emitted by operations on the Collection object (like
327 # "mkdirs" or "remove"), and by "update", which we need to
328 # synchronize with our FUSE objects that are assigned inodes.
329 if collection != self.collection:
332 name = self.sanitize_filename(name)
335 # It's possible for another thread to have llfuse.lock and
336 # be waiting on collection.lock. Meanwhile, we released
337 # llfuse.lock earlier in the stack, but are still holding
338 # on to the collection lock, and now we need to re-acquire
339 # llfuse.lock. If we don't release the collection lock,
340 # we'll deadlock where we're holding the collection lock
341 # waiting for llfuse.lock and the other thread is holding
342 # llfuse.lock and waiting for the collection lock.
344 # The correct locking order here is to take llfuse.lock
345 # first, then the collection lock.
347 # Since collection.lock is an RLock, it might be locked
348 # multiple times, so we need to release it multiple times,
349 # keep a count, then re-lock it the correct number of
355 self.collection.lock.release()
362 with self.collection.lock:
363 if event == arvados.collection.ADD:
364 self.new_entry(name, item, self.mtime())
365 elif event == arvados.collection.DEL:
366 ent = self._entries[name]
367 del self._entries[name]
368 self.inodes.invalidate_entry(self, name)
369 self.inodes.del_entry(ent)
370 elif event == arvados.collection.MOD:
371 # MOD events have (modified_from, newitem)
374 if hasattr(newitem, "fuse_entry") and newitem.fuse_entry is not None:
375 entry = newitem.fuse_entry
376 elif name in self._entries:
377 entry = self._entries[name]
379 if entry is not None:
381 self.inodes.invalidate_inode(entry)
383 if name in self._entries:
384 self.inodes.invalidate_entry(self, name)
386 # TOK and WRITE events just invalidate the
387 # collection record file.
389 if self.collection_record_file is not None:
390 self.collection_record_file.invalidate()
391 self.inodes.invalidate_inode(self.collection_record_file)
# Re-acquire the collection lock released above (count-based re-lock;
# loop not visible in this excerpt).
394 self.collection.lock.acquire()
# populate(): subscribe to collection events and create an entry for each
# existing item.
397 def populate(self, mtime):
399 with self.collection.lock:
400 self.collection.subscribe(self.on_event)
401 for entry, item in self.collection.items():
402 self.new_entry(entry, item, self.mtime())
# writable() delegates to the underlying collection; flush() saves via the
# root collection directory.
405 return self._enable_write and self.collection.writable()
409 self.collection_root.flush()
# FUSE mutation operations: each checks writability, then performs the
# operation on the Collection with llfuse.lock released (Collection calls
# may block on network/Keep I/O).
413 def create(self, name):
414 if not self.writable():
415 raise llfuse.FUSEError(errno.EROFS)
416 with llfuse.lock_released:
417 self.collection.open(name, "w").close()
421 def mkdir(self, name):
422 if not self.writable():
423 raise llfuse.FUSEError(errno.EROFS)
424 with llfuse.lock_released:
425 self.collection.mkdirs(name)
429 def unlink(self, name):
430 if not self.writable():
431 raise llfuse.FUSEError(errno.EROFS)
432 with llfuse.lock_released:
433 self.collection.remove(name)
438 def rmdir(self, name):
439 if not self.writable():
440 raise llfuse.FUSEError(errno.EROFS)
441 with llfuse.lock_released:
442 self.collection.remove(name)
# rename(): enforce POSIX-like type rules between source entry and target
# before delegating to Collection.rename (overwrite=True).
447 def rename(self, name_old, name_new, src):
448 if not self.writable():
449 raise llfuse.FUSEError(errno.EROFS)
451 if not isinstance(src, CollectionDirectoryBase):
452 raise llfuse.FUSEError(errno.EPERM)
457 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
459 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
461 raise llfuse.FUSEError(errno.ENOTEMPTY)
462 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
463 raise llfuse.FUSEError(errno.ENOTDIR)
464 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
465 raise llfuse.FUSEError(errno.EISDIR)
467 with llfuse.lock_released:
468 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
# clear(): unsubscribe from the collection and drop the reference.
473 super(CollectionDirectoryBase, self).clear()
474 if self.collection is not None:
475 self.collection.unsubscribe()
476 self.collection = None
479 # objsize for the whole collection is represented at the root,
480 # don't double-count it
# NOTE(review): sampled extraction — visible code preserved byte-for-byte,
# comments only added.
483 class CollectionDirectory(CollectionDirectoryBase):
484 """Represents the root of a directory tree representing a collection."""
486 __slots__ = ("api", "num_retries", "collection_locator",
487 "_manifest_size", "_writable", "_updating_lock")
# Constructor: collection_record may be a full API record dict or a bare
# locator string (uuid or portable data hash).
489 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
490 super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
492 self.num_retries = num_retries
495 if isinstance(collection_record, dict):
496 self.collection_locator = collection_record['uuid']
497 self._mtime = convertTime(collection_record.get('modified_at'))
499 self.collection_locator = collection_record
502 is_uuid = (self.collection_locator is not None) and (uuid_pattern.match(self.collection_locator) is not None)
505 # It is a uuid, it may be updated upstream, so recheck it periodically.
508 # It is not a uuid. For immutable collections, collection
509 # only needs to be refreshed if it is very long lived
510 # (long enough that there's a risk of the blob signatures
513 self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
515 _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
516 self._poll_time = 60*60
518 self._writable = is_uuid and enable_write
519 self._manifest_size = 0
520 self._updating_lock = threading.Lock()
# same(): match an API record against our locator by uuid or PDH.
523 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
526 return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)
# flush(): commit pending changes and refresh the cached API record,
# serialized by _updating_lock with llfuse.lock released.
530 with llfuse.lock_released:
531 with self._updating_lock:
532 if self.collection.committed():
533 self.collection.update()
535 self.collection.save()
536 self.new_collection_record(self.collection.api_response())
538 def want_event_subscribe(self):
539 return (uuid_pattern.match(self.collection_locator) is not None)
# new_collection(): adopt a freshly-loaded Collection object and populate
# directory entries from it.
541 def new_collection(self, new_collection_record, coll_reader):
544 self.collection = coll_reader
545 self.new_collection_record(new_collection_record)
546 self.populate(self.mtime())
# new_collection_record(): refresh cached metadata (mtime, manifest size,
# locator) and invalidate the .arvados#collection record file if present.
548 def new_collection_record(self, new_collection_record):
549 if not new_collection_record:
550 raise Exception("invalid new_collection_record")
551 self._mtime = convertTime(new_collection_record.get('modified_at'))
552 self._manifest_size = len(new_collection_record["manifest_text"])
553 self.collection_locator = new_collection_record["uuid"]
554 if self.collection_record_file is not None:
555 self.collection_record_file.invalidate()
556 self.inodes.invalidate_inode(self.collection_record_file)
557 _logger.debug("parent_inode %s invalidated collection record file inode %s", self.inode,
558 self.collection_record_file.inode)
559 self.inodes.update_uuid(self)
560 self.inodes.inode_cache.update_cache_size(self)
564 return self.collection_locator
# update(): (re)load the collection from the API server / Keep. Uses a
# writable Collection for uuids and a read-only CollectionReader for PDHs;
# fills in missing record fields for Keep-only collections.
569 if self.collection_locator is None:
570 # No collection locator to retrieve from
574 new_collection_record = None
576 with llfuse.lock_released:
577 self._updating_lock.acquire()
581 _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
583 if self.collection is not None:
584 # Already have a collection object
585 self.collection.update()
586 new_collection_record = self.collection.api_response()
588 # Create a new collection object
589 if uuid_pattern.match(self.collection_locator):
590 coll_reader = arvados.collection.Collection(
591 self.collection_locator, self.api, self.api.keep,
592 num_retries=self.num_retries)
594 coll_reader = arvados.collection.CollectionReader(
595 self.collection_locator, self.api, self.api.keep,
596 num_retries=self.num_retries)
597 new_collection_record = coll_reader.api_response() or {}
598 # If the Collection only exists in Keep, there will be no API
599 # response. Fill in the fields we need.
600 if 'uuid' not in new_collection_record:
601 new_collection_record['uuid'] = self.collection_locator
602 if "portable_data_hash" not in new_collection_record:
603 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
604 if 'manifest_text' not in new_collection_record:
605 new_collection_record['manifest_text'] = coll_reader.manifest_text()
606 if 'storage_classes_desired' not in new_collection_record:
607 new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
609 # end with llfuse.lock_released, re-acquire lock
611 if new_collection_record is not None:
612 if coll_reader is not None:
613 self.new_collection(new_collection_record, coll_reader)
615 self.new_collection_record(new_collection_record)
619 self._updating_lock.release()
620 except arvados.errors.NotFoundError as e:
621 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
622 except arvados.errors.ArgumentError as detail:
623 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
624 if new_collection_record is not None and "manifest_text" in new_collection_record:
625 _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
627 _logger.exception("arv-mount %s: error", self.collection_locator)
628 if new_collection_record is not None and "manifest_text" in new_collection_record:
629 _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
635 def collection_record(self):
637 return self.collection.api_response()
# Special-case lookup of the magic '.arvados#collection' record file;
# all other names fall through to the generic directory lookup.
641 def __getitem__(self, item):
642 if item == '.arvados#collection':
643 if self.collection_record_file is None:
644 self.collection_record_file = FuncToJSONFile(
645 self.inode, self.collection_record)
646 self.inodes.add_entry(self.collection_record_file)
647 self.invalidate() # use lookup as a signal to force update
648 return self.collection_record_file
650 return super(CollectionDirectory, self).__getitem__(item)
652 def __contains__(self, k):
653 if k == '.arvados#collection':
656 return super(CollectionDirectory, self).__contains__(k)
658 def invalidate(self):
659 if self.collection_record_file is not None:
660 self.collection_record_file.invalidate()
661 self.inodes.invalidate_inode(self.collection_record_file)
662 super(CollectionDirectory, self).invalidate()
665 return (self.collection_locator is not None)
668 # This is a rough guess of the amount of overhead involved for
669 # a collection; the assumptions are that that each file
670 # averages 128 bytes in the manifest, but consume 1024 bytes
671 # of Python data structures, so 1024/128=8 means we estimate
672 # the RAM footprint at 8 times the size of bare manifest text.
673 return self._manifest_size * 8
# finalize(): best-effort save of outstanding changes, then stop the
# collection's background threads.
676 if self.collection is None:
681 self.collection.save()
682 except Exception as e:
683 _logger.exception("Failed to save collection %s", self.collection_locator)
684 self.collection.stop_threads()
# clear(): stop threads, reset manifest size, drop entries and the record
# file inode.
687 if self.collection is not None:
688 self.collection.stop_threads()
689 self._manifest_size = 0
690 super(CollectionDirectory, self).clear()
691 if self.collection_record_file is not None:
692 self.inodes.del_entry(self.collection_record_file)
693 self.collection_record_file = None
# NOTE(review): sampled extraction — visible code preserved byte-for-byte,
# comments only added.
696 class TmpCollectionDirectory(CollectionDirectoryBase):
697 """A directory backed by an Arvados collection that never gets saved.
699 This supports using Keep as scratch space. A userspace program can
700 read the .arvados#collection file to get a current manifest in
701 order to save a snapshot of the scratch data or use it as a crunch
# Collection subclass that disables saving (override bodies not visible in
# this excerpt — TODO confirm against original source).
705 class UnsaveableCollection(arvados.collection.Collection):
711 def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
712 collection = self.UnsaveableCollection(
713 api_client=api_client,
714 keep_client=api_client.keep,
715 num_retries=num_retries,
716 storage_classes_desired=storage_classes)
717 # This is always enable_write=True because it never tries to
718 # save to the backend
719 super(TmpCollectionDirectory, self).__init__(
720 parent_inode, inodes, True, filters, collection, self)
721 self.populate(self.mtime())
# Synthesize an in-memory "collection record" from the live collection
# (computed under released llfuse lock).
723 def collection_record(self):
724 with llfuse.lock_released:
727 "manifest_text": self.collection.manifest_text(),
728 "portable_data_hash": self.collection.portable_data_hash(),
729 "storage_classes_desired": self.collection.storage_classes_desired(),
732 def __contains__(self, k):
733 return (k == '.arvados#collection' or
734 super(TmpCollectionDirectory, self).__contains__(k))
# Lazily create the .arvados#collection record file on first lookup.
737 def __getitem__(self, item):
738 if item == '.arvados#collection':
739 if self.collection_record_file is None:
740 self.collection_record_file = FuncToJSONFile(
741 self.inode, self.collection_record)
742 self.inodes.add_entry(self.collection_record_file)
743 return self.collection_record_file
744 return super(TmpCollectionDirectory, self).__getitem__(item)
755 def want_event_subscribe(self):
759 self.collection.stop_threads()
761 def invalidate(self):
762 if self.collection_record_file:
763 self.collection_record_file.invalidate()
764 super(TmpCollectionDirectory, self).invalidate()
# NOTE(review): sampled extraction — visible code preserved byte-for-byte,
# comments only added.
767 class MagicDirectory(Directory):
768 """A special directory that logically contains the set of all extant keep locators.
770 When a file is referenced by lookup(), it is tested to see if it is a valid
771 keep locator to a manifest, and if so, loads the manifest contents as a
772 subdirectory of this directory with the locator as the directory name.
773 Since querying a list of all extant keep locators is impractical, only
774 collections that have already been accessed are visible to readdir().
# The text below appears to be the README shown inside the mount
# (README_TEXT class attribute; its assignment line is not visible in this
# excerpt).
779 This directory provides access to Arvados collections as subdirectories listed
780 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
781 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
782 (in the form 'zzzzz-j7d0g-1234567890abcde').
784 Note that this directory will appear empty until you attempt to access a
785 specific collection or project subdirectory (such as trying to 'cd' into it),
786 at which point the collection or project will actually be looked up on the server
787 and the directory will appear if it exists.
791 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
792 super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
794 self.num_retries = num_retries
795 self.pdh_only = pdh_only
796 self.storage_classes = storage_classes
# Hook attribute assignment so that once an inode is assigned, the README
# (and, at the root, a 'by_id' alias directory) is installed.
798 def __setattr__(self, name, value):
799 super(MagicDirectory, self).__setattr__(name, value)
800 # When we're assigned an inode, add a README.
801 if ((name == 'inode') and (self.inode is not None) and
802 (not self._entries)):
803 self._entries['README'] = self.inodes.add_entry(
804 StringFile(self.inode, self.README_TEXT, time.time()))
805 # If we're the root directory, add an identical by_id subdirectory.
806 if self.inode == llfuse.ROOT_INODE:
807 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
# __contains__ doubles as the on-demand loader: an unknown name that looks
# like a uuid/PDH triggers a server lookup and, on success, a new
# ProjectDirectory or CollectionDirectory entry.
817 def __contains__(self, k):
818 if k in self._entries:
821 if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
827 if group_uuid_pattern.match(k):
828 project = self.api.groups().list(
830 ['group_class', 'in', ['project','filter']],
832 *self._filters_for('groups', qualified=False),
834 ).execute(num_retries=self.num_retries)
835 if project[u'items_available'] == 0:
837 e = self.inodes.add_entry(ProjectDirectory(
844 project[u'items'][0],
845 storage_classes=self.storage_classes,
848 e = self.inodes.add_entry(CollectionDirectory(
# Cleanup paths: if the lookup ultimately failed, release the tentative
# inode entry and invalidate the cached (negative) name.
859 if k not in self._entries:
862 self.inodes.del_entry(e)
865 self.inodes.invalidate_entry(self, k)
866 self.inodes.del_entry(e)
868 except Exception as ex:
869 _logger.exception("arv-mount lookup '%s':", k)
871 self.inodes.del_entry(e)
874 def __getitem__(self, item):
876 return self._entries[item]
878 raise KeyError("No collection with id " + item)
883 def want_event_subscribe(self):
884 return not self.pdh_only
# NOTE(review): sampled extraction — visible code preserved byte-for-byte,
# comments only added.
887 class TagsDirectory(Directory):
888 """A special directory that contains as subdirectories all tags visible to the user."""
890 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
891 super(TagsDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
893 self.num_retries = num_retries
895 self._poll_time = poll_time
898 def want_event_subscribe(self):
# update(): list tag links from the API (with llfuse lock released) and
# merge them — plus any explicitly-looked-up names in self._extra — into
# the directory as TagDirectory entries.
903 with llfuse.lock_released:
904 tags = self.api.links().list(
906 ['link_class', '=', 'tag'],
908 *self._filters_for('links', qualified=False),
913 ).execute(num_retries=self.num_retries)
916 tags['items']+[{"name": n} for n in self._extra],
918 lambda a, i: a.tag == i['name'],
919 lambda i: TagDirectory(
928 poll_time=self._poll_time,
# __getitem__: on a miss, query the server for the tag name and remember
# it in self._extra so it survives future update() merges.
934 def __getitem__(self, item):
935 if super(TagsDirectory, self).__contains__(item):
936 return super(TagsDirectory, self).__getitem__(item)
937 with llfuse.lock_released:
938 tags = self.api.links().list(
940 ['link_class', '=', 'tag'],
942 *self._filters_for('links', qualified=False),
945 ).execute(num_retries=self.num_retries)
947 self._extra.add(item)
949 return super(TagsDirectory, self).__getitem__(item)
953 def __contains__(self, k):
954 if super(TagsDirectory, self).__contains__(k):
# NOTE(review): sampled extraction — visible code preserved byte-for-byte,
# comments only added.
964 class TagDirectory(Directory):
965 """A special directory that contains as subdirectories all collections visible
966 to the user that are tagged with a particular tag.
969 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
970 poll=False, poll_time=60):
971 super(TagDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
973 self.num_retries = num_retries
976 self._poll_time = poll_time
978 def want_event_subscribe(self):
# update(): list collections carrying this tag (head_uuid only) and merge
# them into the directory as CollectionDirectory entries keyed by locator.
983 with llfuse.lock_released:
984 taggedcollections = self.api.links().list(
986 ['link_class', '=', 'tag'],
987 ['name', '=', self.tag],
988 ['head_uuid', 'is_a', 'arvados#collection'],
989 *self._filters_for('links', qualified=False),
991 select=['head_uuid'],
992 ).execute(num_retries=self.num_retries)
994 taggedcollections['items'],
995 lambda i: i['head_uuid'],
996 lambda a, i: a.collection_locator == i['head_uuid'],
997 lambda i: CollectionDirectory(
1009 class ProjectDirectory(Directory):
1010 """A special directory that contains the contents of a project."""
1012 __slots__ = ("api", "num_retries", "project_object", "project_object_file",
1013 "project_uuid", "_updating_lock",
1014 "_current_user", "_full_listing", "storage_classes", "recursively_contained")
1016 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
1017 project_object, poll=True, poll_time=15, storage_classes=None):
1018 super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
1020 self.num_retries = num_retries
1021 self.project_object = project_object
1022 self.project_object_file = None
1023 self.project_uuid = project_object['uuid']
1025 self._poll_time = poll_time
1026 self._updating_lock = threading.Lock()
1027 self._current_user = None
1028 self._full_listing = False
1029 self.storage_classes = storage_classes
1030 self.recursively_contained = False
1032 # Filter groups can contain themselves, which causes tools
1033 # that walk the filesystem to get stuck in an infinite loop,
1034 # so suppress returning a listing in that case.
1035 if self.project_object.get("group_class") == "filter":
1036 iter_parent_inode = parent_inode
1037 while iter_parent_inode != llfuse.ROOT_INODE:
1038 parent_dir = self.inodes[iter_parent_inode]
1039 if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid:
1040 self.recursively_contained = True
1042 iter_parent_inode = parent_dir.parent_inode
1044 def want_event_subscribe(self):
1047 def createDirectory(self, i):
1048 common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
1049 if collection_uuid_pattern.match(i['uuid']):
1050 return CollectionDirectory(*common_args, i)
1051 elif group_uuid_pattern.match(i['uuid']):
1052 return ProjectDirectory(*common_args, i, self._poll, self._poll_time, self.storage_classes)
1053 elif link_uuid_pattern.match(i['uuid']):
1054 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
1055 return CollectionDirectory(*common_args, i['head_uuid'])
1058 elif uuid_pattern.match(i['uuid']):
1059 return ObjectFile(self.parent_inode, i)
1064 return self.project_uuid
1067 self._full_listing = True
1068 return super(ProjectDirectory, self).items()
1070 def namefn(self, i):
1072 if i['name'] is None or len(i['name']) == 0:
1074 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
1075 # collection or subproject
1077 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
1080 elif 'kind' in i and i['kind'].startswith('arvados#'):
1082 return "{}.{}".format(i['name'], i['kind'][8:])
1089 if self.project_object_file == None:
1090 self.project_object_file = ObjectFile(self.inode, self.project_object)
1091 self.inodes.add_entry(self.project_object_file)
1093 if self.recursively_contained or not self._full_listing:
1097 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
1098 return a.uuid() == i['uuid']
1099 elif isinstance(a, ObjectFile):
1100 return a.uuid() == i['uuid'] and not a.stale()
1104 with llfuse.lock_released:
1105 self._updating_lock.acquire()
1106 if not self.stale():
1109 if group_uuid_pattern.match(self.project_uuid):
1110 self.project_object = self.api.groups().get(
1111 uuid=self.project_uuid).execute(num_retries=self.num_retries)
1112 elif user_uuid_pattern.match(self.project_uuid):
1113 self.project_object = self.api.users().get(
1114 uuid=self.project_uuid).execute(num_retries=self.num_retries)
1115 # do this in 2 steps until #17424 is fixed
1116 contents = list(arvados.util.keyset_list_all(
1117 self.api.groups().contents,
1119 num_retries=self.num_retries,
1120 uuid=self.project_uuid,
1122 ['uuid', 'is_a', 'arvados#group'],
1123 ['groups.group_class', 'in', ['project', 'filter']],
1124 *self._filters_for('groups', qualified=True),
1127 contents.extend(obj for obj in arvados.util.keyset_list_all(
1128 self.api.groups().contents,
1130 num_retries=self.num_retries,
1131 uuid=self.project_uuid,
1133 ['uuid', 'is_a', 'arvados#collection'],
1134 *self._filters_for('collections', qualified=True),
1136 ) if obj['current_version_uuid'] == obj['uuid'])
1137 # end with llfuse.lock_released, re-acquire lock
1139 self.merge(contents,
1142 self.createDirectory)
1145 self._updating_lock.release()
1147 def _add_entry(self, i, name):
1148 ent = self.createDirectory(i)
1149 self._entries[name] = self.inodes.add_entry(ent)
1150 return self._entries[name]
1154 def __getitem__(self, k):
1155 if k == '.arvados#project':
1156 return self.project_object_file
1157 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
1158 return super(ProjectDirectory, self).__getitem__(k)
1159 with llfuse.lock_released:
1160 k2 = self.unsanitize_filename(k)
1162 namefilter = ["name", "=", k]
1164 namefilter = ["name", "in", [k, k2]]
1165 contents = self.api.groups().list(
1167 ["owner_uuid", "=", self.project_uuid],
1168 ["group_class", "in", ["project","filter"]],
1170 *self._filters_for('groups', qualified=False),
1173 ).execute(num_retries=self.num_retries)["items"]
1175 contents = self.api.collections().list(
1177 ["owner_uuid", "=", self.project_uuid],
1179 *self._filters_for('collections', qualified=False),
1182 ).execute(num_retries=self.num_retries)["items"]
1184 if len(contents) > 1 and contents[1]['name'] == k:
1185 # If "foo/bar" and "foo[SUBST]bar" both exist, use
1187 contents = [contents[1]]
1188 name = self.sanitize_filename(self.namefn(contents[0]))
1191 return self._add_entry(contents[0], name)
1196 def __contains__(self, k):
1197 if k == '.arvados#project':
1209 if not self._enable_write:
1211 return self.project_object.get("can_write") is True
1213 def persisted(self):
# Fragment of clear() — its `def` line is missing from this listing.
# Clears all cached entries, then drops the virtual '.arvados#project'
# file so its inode is released as well.
1217 super(ProjectDirectory, self).clear()
1218 if self.project_object_file is not None:
1219 self.inodes.del_entry(self.project_object_file)
1220 self.project_object_file = None
1224 def mkdir(self, name):
# Create a subdirectory by creating a new, empty collection named *name*
# owned by this project. NOTE(review): lines 1227-1228, 1230, 1232, 1236,
# 1239-1240 are missing from this listing (including the try statement(s)
# the except clauses below belong to) — confirm against the full file.
1225 if not self.writable():
1226 raise llfuse.FUSEError(errno.EROFS)
# Release the FUSE lock for the API round-trip.
1229 with llfuse.lock_released:
1231 "owner_uuid": self.project_uuid,
1233 "manifest_text": "" }
# Propagate the mount's desired storage classes when configured.
1234 if self.storage_classes is not None:
1235 c["storage_classes_desired"] = self.storage_classes
1237 self.api.collections().create(body=c).execute(num_retries=self.num_retries)
1238 except Exception as e:
1241 except apiclient_errors.Error as error:
1242 _logger.error(error)
# Surface API-level failures (e.g. a name collision) to the kernel as EEXIST.
1243 raise llfuse.FUSEError(errno.EEXIST)
1247 def rmdir(self, name):
# Remove the collection backing directory entry *name*.
# NOTE(review): line 1250 is missing from this listing — likely blank;
# confirm against the full file.
1248 if not self.writable():
1249 raise llfuse.FUSEError(errno.EROFS)
1251 if name not in self:
1252 raise llfuse.FUSEError(errno.ENOENT)
# Only collection-backed entries can be removed this way; subprojects and
# other entry types are refused.
1253 if not isinstance(self[name], CollectionDirectory):
1254 raise llfuse.FUSEError(errno.EPERM)
# POSIX: rmdir on a non-empty directory must fail.
1255 if len(self[name]) > 0:
1256 raise llfuse.FUSEError(errno.ENOTEMPTY)
# Release the FUSE lock while deleting the collection server-side.
1257 with llfuse.lock_released:
1258 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
1263 def rename(self, name_old, name_new, src):
# Move a collection entry from project directory *src* into this
# directory, renaming it to *name_new*. NOTE(review): lines 1266,
# 1269-1271, 1274, 1281, 1285 are missing from this listing — including
# the assignment of `ent` (presumably `ent = src[name_old]`) and, likely,
# a `with llfuse.lock_released:` around the API update; confirm.
1264 if not self.writable():
1265 raise llfuse.FUSEError(errno.EROFS)
# Only moves between project directories are supported.
1267 if not isinstance(src, ProjectDirectory):
1268 raise llfuse.FUSEError(errno.EPERM)
# Only collection-backed entries can be moved.
1272 if not isinstance(ent, CollectionDirectory):
1273 raise llfuse.FUSEError(errno.EPERM)
1275 if name_new in self:
1276 # POSIX semantics for replacing one directory with another is
1277 # tricky (the target directory must be empty, the operation must be
1278 # atomic which isn't possible with the Arvados API as of this
1279 # writing) so don't support that.
1280 raise llfuse.FUSEError(errno.EPERM)
# Server-side move: reparent and rename the collection in one update call.
1282 self.api.collections().update(uuid=ent.uuid(),
1283 body={"owner_uuid": self.uuid(),
1284 "name": name_new}).execute(num_retries=self.num_retries)
1286 # Actually move the entry from source directory to this directory.
1287 del src._entries[name_old]
1288 self._entries[name_new] = ent
# Tell the kernel the old dentry is gone so cached lookups don't go stale.
1289 self.inodes.invalidate_entry(src, name_old)
1292 def child_event(self, ev):
# Handle a websocket event about an object owned by this project and
# incrementally update the entry map. NOTE(review): many lines are
# missing from this listing (1300, 1305, 1308, 1311-1312, 1315,
# 1317-1319, 1321, 1326-1328, 1330, 1334-1335) — including the early
# returns implied by the comments below and the assignment/initialization
# of `ent` in the add path; confirm against the full file.
1293 properties = ev.get("properties") or {}
1294 old_attrs = properties.get("old_attributes") or {}
1295 new_attrs = properties.get("new_attributes") or {}
# Both attribute snapshots are keyed to the event's object uuid.
1296 old_attrs["uuid"] = ev["object_uuid"]
1297 new_attrs["uuid"] = ev["object_uuid"]
# Map the old/new object attributes to the (sanitized) filenames they
# appear under in this directory.
1298 old_name = self.sanitize_filename(self.namefn(old_attrs))
1299 new_name = self.sanitize_filename(self.namefn(new_attrs))
1301 # create events will have a new name, but not an old name
1302 # delete events will have an old name, but not a new name
1303 # update events will have an old and new name, and they may be same or different
1304 # if they are the same, an unrelated field changed and there is nothing to do.
1306 if old_attrs.get("owner_uuid") != self.project_uuid:
1307 # Was moved from somewhere else, so don't try to remove entry.
1309 if ev.get("object_owner_uuid") != self.project_uuid:
1310 # Was moved to somewhere else, so don't try to add entry
1313 if old_attrs.get("is_trashed"):
1314 # Was previously deleted
1316 if new_attrs.get("is_trashed"):
# Rename path: drop the stale entry and invalidate the kernel dentry.
1320 if new_name != old_name:
1322 if old_name in self._entries:
1323 ent = self._entries[old_name]
1324 del self._entries[old_name]
1325 self.inodes.invalidate_entry(self, old_name)
1329 self._entries[new_name] = ent
# No existing entry to carry over: create a fresh one from the new attrs.
1331 self._add_entry(new_attrs, new_name)
1332 elif ent is not None:
# Entry removed without a replacement name: release its inode.
1333 self.inodes.del_entry(ent)
1336 class SharedDirectory(Directory):
1337 """A special directory that represents users or groups who have shared projects with me."""
1339 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
1340 exclude, poll=False, poll_time=60, storage_classes=None):
# NOTE(review): lines 1342 and 1345 are missing from this listing —
# presumably assignments such as `self.api = api` / `self._poll = poll`;
# confirm against the full file.
1341 super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
1343 self.num_retries = num_retries
# Resolve the current user once up front; update() later uses it to
# decide which shared projects count as roots.
1344 self.current_user = api.users().current().execute(num_retries=num_retries)
1346 self._poll_time = poll_time
# Serializes concurrent refreshes of this directory's contents.
1347 self._updating_lock = threading.Lock()
1348 self.storage_classes = storage_classes
# Fragment of SharedDirectory.update() — the `def` line and many body
# lines are missing from this listing (the embedded numbering is not
# contiguous), including the initialization of `contents`, `objects`,
# `roots`, `root_owners`, and `page`, the paging loop, and the merge()
# call near the end. The comments below describe only what the visible
# lines establish; confirm everything else against the full file.
1353 with llfuse.lock_released:
1354 self._updating_lock.acquire()
1355 if not self.stale():
# Use the server's groups.shared endpoint when the discovery document
# advertises it; the later keyset_list_all path appears to be a fallback
# (the connecting branch lines are missing — confirm).
1363 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1364 if 'httpMethod' in methods.get('shared', {}):
1367 resp = self.api.groups().shared(
1369 ['group_class', 'in', ['project','filter']],
1371 *self._filters_for('groups', qualified=False),
1376 include="owner_uuid",
1378 if not resp["items"]:
# Advance the keyset page past the last uuid seen in this batch.
1380 page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1381 for r in resp["items"]:
1382 objects[r["uuid"]] = r
1383 roots.append(r["uuid"])
# Owner records come back under "included"; they become top-level entries.
1384 for r in resp["included"]:
1385 objects[r["uuid"]] = r
1386 root_owners.add(r["uuid"])
# Fallback path: list every readable project, then derive the roots.
1388 all_projects = list(arvados.util.keyset_list_all(
1389 self.api.groups().list,
1391 num_retries=self.num_retries,
1393 ['group_class', 'in', ['project','filter']],
1394 *self._filters_for('groups', qualified=False),
1396 select=["uuid", "owner_uuid"],
1398 for ob in all_projects:
1399 objects[ob['uuid']] = ob
1401 current_uuid = self.current_user['uuid']
# A project is a root when it is owned by someone other than the current
# user and its owner is not itself among the projects we can see.
1402 for ob in all_projects:
1403 if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1404 roots.append(ob['uuid'])
1405 root_owners.add(ob['owner_uuid'])
# Fetch the user and group records for all root owners.
1407 lusers = arvados.util.keyset_list_all(
1408 self.api.users().list,
1410 num_retries=self.num_retries,
1412 ['uuid', 'in', list(root_owners)],
1413 *self._filters_for('users', qualified=False),
1416 lgroups = arvados.util.keyset_list_all(
1417 self.api.groups().list,
1419 num_retries=self.num_retries,
1421 ['uuid', 'in', list(root_owners)+roots],
1422 *self._filters_for('groups', qualified=False),
1426 objects[l["uuid"]] = l
1428 objects[l["uuid"]] = l
# Build the visible name -> object map: groups keyed by "name", users
# keyed by "First Last".
1430 for r in root_owners:
1434 contents[obr["name"]] = obr
1435 elif "first_name" in obr:
1436 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1441 if obr['owner_uuid'] not in objects:
1442 contents[obr["name"]] = obr
1444 # end with llfuse.lock_released, re-acquire lock
# merge() callbacks (surrounding call lines missing): match existing
# entries by uuid, create ProjectDirectory children otherwise.
1449 lambda a, i: a.uuid() == i[1]['uuid'],
1450 lambda i: ProjectDirectory(
1459 poll_time=self._poll_time,
1460 storage_classes=self.storage_classes,
# presumably inside an except handler (its try line is missing): log the
# refresh failure rather than propagating it out of the FUSE callback.
1464 _logger.exception("arv-mount shared dir error")
# Always release the refresh lock (presumably in a finally: — confirm).
1466 self._updating_lock.release()
1468 def want_event_subscribe(self):