1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
15 from apiclient import errors as apiclient_errors
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
23 _logger = logging.getLogger('arvados.arvados_fuse')
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (If present in a collection filename, they will
28 # appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile(r'[\x00/]')
# NOTE(review): this excerpt is an extracted copy — each line is prefixed with
# its original file line number, indentation was lost, and some lines are
# elided; confirm details against the full source file.
32 class Directory(FreshBase):
33 """Generic directory object, backed by a dict.
35 Consists of a set of entries with the key representing the filename
36 and the value referencing a File or Directory object.
39 __slots__ = ("inode", "parent_inode", "inodes", "_entries", "_mtime", "_enable_write", "_filters")
41 def __init__(self, parent_inode, inodes, enable_write, filters):
42 """parent_inode is the integer inode number"""
44 super(Directory, self).__init__()
# parent_inode must be a plain int inode number allocated by the caller.
47 if not isinstance(parent_inode, int):
48 raise Exception("parent_inode should be an int")
49 self.parent_inode = parent_inode
52 self._mtime = time.time()
53 self._enable_write = enable_write
54 self._filters = filters or []
# Yield stored API filters that apply to records of 'subtype'.  Filter
# names may be qualified like "collections.name"; filters whose prefix
# does not match subtype are skipped, and the unqualified form re-emits
# the filter with the prefix stripped.  NOTE(review): several branches are
# elided in this excerpt — verify against the full file.
56 def _filters_for(self, subtype, *, qualified):
57 for f in self._filters:
58 f_type, _, f_name = f[0].partition('.')
61 elif f_type != subtype:
66 yield [f_name, *f[1:]]
68 def unsanitize_filename(self, incoming):
69 """Replace ForwardSlashNameSubstitution value with /"""
70 fsns = self.inodes.forward_slash_subst()
71 if isinstance(fsns, str):
72 return incoming.replace(fsns, '/')
76 def sanitize_filename(self, dirty):
77 """Replace disallowed filename characters according to
78 ForwardSlashNameSubstitution in self.api_config."""
79 # '.' and '..' are not reachable if API server is newer than #6277
89 fsns = self.inodes.forward_slash_subst()
90 if isinstance(fsns, str):
91 dirty = dirty.replace('/', fsns)
# NUL and '/' can never appear in a FUSE filename; map them to '_'.
92 return _disallowed_filename_characters.sub('_', dirty)
95 # Overridden by subclasses to implement logic to update the
96 # entries dict when the directory is stale
101 # Only used when computing the size of the disk footprint of the directory
# Refresh contents if stale; HTTP errors from the API client are caught
# here (handler body elided in this excerpt).
109 def checkupdate(self):
113 except apiclient.errors.HttpError as e:
118 def __getitem__(self, item):
119 return self._entries[item]
124 return list(self._entries.items())
128 def __contains__(self, k):
129 return k in self._entries
134 return len(self._entries)
137 self.inodes.touch(self)
138 super(Directory, self).fresh()
141 # Rough estimate of memory footprint based on using pympler
142 return len(self._entries) * 1024
144 def merge(self, items, fn, same, new_entry):
145 """Helper method for updating the contents of the directory.
147 Takes a list describing the new contents of the directory, reuse
148 entries that are the same in both the old and new lists, create new
149 entries, and delete old entries missing from the new list.
152 * items: Iterable --- New directory contents
154 * fn: Callable --- Takes an entry in 'items' and return the desired file or
155 directory name, or None if this entry should be skipped
157 * same: Callable --- Compare an existing entry (a File or Directory
158 object) with an entry in the items list to determine whether to keep
161 * new_entry: Callable --- Create a new directory entry (File or Directory
162 object) from an entry in the items list.
166 oldentries = self._entries
# First pass: carry over entries that are unchanged ('same' matches) and
# still parented to this directory.
170 name = self.sanitize_filename(fn(i))
173 if name in oldentries:
174 ent = oldentries[name]
175 if same(ent, i) and ent.parent_inode == self.inode:
176 # move existing directory entry over
177 self._entries[name] = ent
179 self.inodes.inode_cache.touch(ent)
# Second pass: allocate inodes for genuinely new names.
182 name = self.sanitize_filename(fn(i))
185 if name not in self._entries:
186 # create new directory entry
189 self._entries[name] = self.inodes.add_entry(ent)
190 # need to invalidate this just in case there was a
191 # previous entry that couldn't be moved over or a
192 # lookup that returned file not found and cached
194 self.inodes.invalidate_entry(self, name)
196 _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)
198 # delete any other directory entries that were not in found in 'items'
199 for name, ent in oldentries.items():
200 _logger.debug("Detaching entry '%s' from parent_inode %i", name, self.inode)
201 self.inodes.invalidate_entry(self, name)
202 self.inodes.del_entry(ent)
206 self._mtime = time.time()
207 self.inodes.inode_cache.update_cache_size(self)
# A directory counts as in use if the base class says so or any child does.
212 if super(Directory, self).in_use():
214 for v in self._entries.values():
220 """Delete all entries"""
221 if not self._entries:
223 oldentries = self._entries
226 for name, ent in oldentries.items():
228 self.inodes.invalidate_entry(self, name)
229 self.inodes.del_entry(ent)
230 self.inodes.inode_cache.update_cache_size(self)
232 def kernel_invalidate(self):
233 # Invalidating the dentry on the parent implies invalidating all paths
235 if self.parent_inode in self.inodes:
236 parent = self.inodes[self.parent_inode]
238 # parent was removed already.
241 # Find self on the parent in order to invalidate this path.
242 # Calling the public items() method might trigger a refresh,
243 # which we definitely don't want, so read the internal dict directly.
244 for k,v in parent._entries.items():
246 self.inodes.invalidate_entry(parent, k)
# Mutating operations are abstract here; writable subclasses override them.
258 def want_event_subscribe(self):
259 raise NotImplementedError()
261 def create(self, name):
262 raise NotImplementedError()
264 def mkdir(self, name):
265 raise NotImplementedError()
267 def unlink(self, name):
268 raise NotImplementedError()
270 def rmdir(self, name):
271 raise NotImplementedError()
273 def rename(self, name_old, name_new, src):
274 raise NotImplementedError()
277 class CollectionDirectoryBase(Directory):
278 """Represent an Arvados Collection as a directory.
280 This class is used for Subcollections, and is also the base class for
281 CollectionDirectory, which implements collection loading/saving on
284 Most operations act only the underlying Arvados `Collection` object. The
285 `Collection` object signals via a notify callback to
286 `CollectionDirectoryBase.on_event` that an item was added, removed or
287 modified. FUSE inodes and directory entries are created, deleted or
288 invalidated in response to these events.
292 __slots__ = ("collection", "collection_root", "collection_record_file")
294 def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root, poll_time=15):
295 super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
296 self.collection = collection
297 self.collection_root = collection_root
298 self.collection_record_file = None
299 self._poll_time = poll_time
# Create the FUSE entry for 'item': reuse an existing, unparented
# fuse_entry if the item carries one; recurse into subcollections;
# otherwise wrap a plain file in FuseArvadosFile.
301 def new_entry(self, name, item, mtime):
302 name = self.sanitize_filename(name)
303 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
304 if item.fuse_entry.parent_inode is not None:
305 raise Exception("Can only reparent unparented inode entry")
306 if item.fuse_entry.inode is None:
307 raise Exception("Reparented entry must still have valid inode")
308 item.fuse_entry.parent_inode = self.inode
309 self._entries[name] = item.fuse_entry
310 elif isinstance(item, arvados.collection.RichCollectionBase):
311 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
317 self.collection_root,
318 poll_time=self._poll_time
320 self._entries[name].populate(mtime)
322 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime,
324 self._poll, self._poll_time))
325 item.fuse_entry = self._entries[name]
327 def on_event(self, event, collection, name, item):
329 # These are events from the Collection object (ADD/DEL/MOD)
330 # emitted by operations on the Collection object (like
331 # "mkdirs" or "remove"), and by "update", which we need to
332 # synchronize with our FUSE objects that are assigned inodes.
333 if collection != self.collection:
336 name = self.sanitize_filename(name)
339 # It's possible for another thread to have llfuse.lock and
340 # be waiting on collection.lock. Meanwhile, we released
341 # llfuse.lock earlier in the stack, but are still holding
342 # on to the collection lock, and now we need to re-acquire
343 # llfuse.lock. If we don't release the collection lock,
344 # we'll deadlock where we're holding the collection lock
345 # waiting for llfuse.lock and the other thread is holding
346 # llfuse.lock and waiting for the collection lock.
348 # The correct locking order here is to take llfuse.lock
349 # first, then the collection lock.
351 # Since collection.lock is an RLock, it might be locked
352 # multiple times, so we need to release it multiple times,
353 # keep a count, then re-lock it the correct number of
359 self.collection.lock.release()
366 with self.collection.lock:
367 if event == arvados.collection.ADD:
368 self.new_entry(name, item, self.mtime())
369 elif event == arvados.collection.DEL:
370 ent = self._entries[name]
371 del self._entries[name]
372 self.inodes.invalidate_entry(self, name)
373 self.inodes.del_entry(ent)
374 elif event == arvados.collection.MOD:
375 # MOD events have (modified_from, newitem)
378 if hasattr(newitem, "fuse_entry") and newitem.fuse_entry is not None:
379 entry = newitem.fuse_entry
380 elif name in self._entries:
381 entry = self._entries[name]
383 if entry is not None:
385 self.inodes.invalidate_inode(entry)
387 if name in self._entries:
388 self.inodes.invalidate_entry(self, name)
390 # TOK and WRITE events just invalidate the
391 # collection record file.
393 if self.collection_record_file is not None:
394 self.collection_record_file.invalidate()
395 self.inodes.invalidate_inode(self.collection_record_file)
# Re-acquire the collection lock (released above to preserve lock order).
398 self.collection.lock.acquire()
# Subscribe to collection change notifications and build FUSE entries for
# the collection's current contents.
401 def populate(self, mtime):
403 with self.collection.lock:
404 self.collection.subscribe(self.on_event)
405 for entry, item in self.collection.items():
406 self.new_entry(entry, item, self.mtime())
# Writable only when the mount allows writes AND the Collection does.
409 return self._enable_write and self.collection.writable()
413 self.collection_root.flush()
# Write operations below release the global llfuse lock while calling
# into the Collection object, then report EROFS on read-only mounts.
417 def create(self, name):
418 if not self.writable():
419 raise llfuse.FUSEError(errno.EROFS)
420 with llfuse.lock_released:
421 self.collection.open(name, "w").close()
425 def mkdir(self, name):
426 if not self.writable():
427 raise llfuse.FUSEError(errno.EROFS)
428 with llfuse.lock_released:
429 self.collection.mkdirs(name)
433 def unlink(self, name):
434 if not self.writable():
435 raise llfuse.FUSEError(errno.EROFS)
436 with llfuse.lock_released:
437 self.collection.remove(name)
442 def rmdir(self, name):
443 if not self.writable():
444 raise llfuse.FUSEError(errno.EROFS)
445 with llfuse.lock_released:
446 self.collection.remove(name)
# Rename enforces POSIX-style type compatibility between source and
# target before delegating to Collection.rename.
451 def rename(self, name_old, name_new, src):
452 if not self.writable():
453 raise llfuse.FUSEError(errno.EROFS)
455 if not isinstance(src, CollectionDirectoryBase):
456 raise llfuse.FUSEError(errno.EPERM)
461 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
463 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
465 raise llfuse.FUSEError(errno.ENOTEMPTY)
466 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
467 raise llfuse.FUSEError(errno.ENOTDIR)
468 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
469 raise llfuse.FUSEError(errno.EISDIR)
471 with llfuse.lock_released:
472 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
# clear() also unsubscribes from the Collection and drops the reference.
477 super(CollectionDirectoryBase, self).clear()
478 if self.collection is not None:
479 self.collection.unsubscribe()
480 self.collection = None
483 # objsize for the whole collection is represented at the root,
484 # don't double-count it
487 class CollectionDirectory(CollectionDirectoryBase):
488 """Represents the root of a directory tree representing a collection."""
490 __slots__ = ("api", "num_retries", "collection_locator",
491 "_manifest_size", "_writable", "_updating_lock")
493 def __init__(self, parent_inode, inodes, api, num_retries, enable_write,
494 filters=None, collection_record=None,
496 super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
498 self.num_retries = num_retries
# collection_record may be a full API record (dict) or a bare
# uuid / portable data hash string.
501 if isinstance(collection_record, dict):
502 self.collection_locator = collection_record['uuid']
503 self._mtime = convertTime(collection_record.get('modified_at'))
505 self.collection_locator = collection_record
508 is_uuid = (self.collection_locator is not None) and (uuid_pattern.match(self.collection_locator) is not None)
511 # It is a uuid, it may be updated upstream, so recheck it periodically.
512 self._poll_time = poll_time
514 # It is not a uuid. For immutable collections, collection
515 # only needs to be refreshed if it is very long lived
516 # (long enough that there's a risk of the blob signatures
# Refresh at half the blob signature TTL; fall back to hourly if the
# discovery document doesn't provide it.
519 self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
521 _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
522 self._poll_time = 60*60
# Only uuid-backed collections are writable; PDH-backed ones are immutable.
524 self._writable = is_uuid and enable_write
525 self._manifest_size = 0
526 self._updating_lock = threading.Lock()
529 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
532 return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)
# Flush: save pending changes, releasing the llfuse lock around the
# save/update API round trips.
536 with llfuse.lock_released:
537 with self._updating_lock:
538 if self.collection.committed():
539 self.collection.update()
541 self.collection.save()
542 self.new_collection_record(self.collection.api_response())
544 def want_event_subscribe(self):
545 return (uuid_pattern.match(self.collection_locator) is not None)
547 def new_collection(self, new_collection_record, coll_reader):
550 self.collection = coll_reader
551 self.new_collection_record(new_collection_record)
552 self.populate(self.mtime())
# Adopt a fresh API record: refresh mtime/locator/manifest-size and
# invalidate the cached .arvados#collection record file if present.
554 def new_collection_record(self, new_collection_record):
555 if not new_collection_record:
556 raise Exception("invalid new_collection_record")
557 self._mtime = convertTime(new_collection_record.get('modified_at'))
558 self._manifest_size = len(new_collection_record["manifest_text"])
559 self.collection_locator = new_collection_record["uuid"]
560 if self.collection_record_file is not None:
561 self.collection_record_file.invalidate()
562 self.inodes.invalidate_inode(self.collection_record_file)
563 _logger.debug("parent_inode %s invalidated collection record file inode %s", self.inode,
564 self.collection_record_file.inode)
565 self.inodes.update_uuid(self)
566 self.inodes.inode_cache.update_cache_size(self)
570 return self.collection_locator
# Update: (re)load the collection from the API server or Keep, serialized
# by _updating_lock with the llfuse lock released.
575 if self.collection_locator is None:
576 # No collection locator to retrieve from
580 new_collection_record = None
582 with llfuse.lock_released:
583 self._updating_lock.acquire()
587 _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
589 if self.collection is not None:
590 # Already have a collection object
591 self.collection.update()
592 new_collection_record = self.collection.api_response()
594 # Create a new collection object
595 if uuid_pattern.match(self.collection_locator):
596 coll_reader = arvados.collection.Collection(
597 self.collection_locator, self.api, self.api.keep,
598 num_retries=self.num_retries)
600 coll_reader = arvados.collection.CollectionReader(
601 self.collection_locator, self.api, self.api.keep,
602 num_retries=self.num_retries)
603 new_collection_record = coll_reader.api_response() or {}
604 # If the Collection only exists in Keep, there will be no API
605 # response. Fill in the fields we need.
606 if 'uuid' not in new_collection_record:
607 new_collection_record['uuid'] = self.collection_locator
608 if "portable_data_hash" not in new_collection_record:
609 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
610 if 'manifest_text' not in new_collection_record:
611 new_collection_record['manifest_text'] = coll_reader.manifest_text()
612 if 'storage_classes_desired' not in new_collection_record:
613 new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
615 # end with llfuse.lock_released, re-acquire lock
617 if new_collection_record is not None:
618 if coll_reader is not None:
619 self.new_collection(new_collection_record, coll_reader)
621 self.new_collection_record(new_collection_record)
625 self._updating_lock.release()
# Fetch failures are logged (with the manifest text when available) but
# deliberately not propagated to the caller.
626 except arvados.errors.NotFoundError as e:
627 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
628 except arvados.errors.ArgumentError as detail:
629 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
630 if new_collection_record is not None and "manifest_text" in new_collection_record:
631 _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
633 _logger.exception("arv-mount %s: error", self.collection_locator)
634 if new_collection_record is not None and "manifest_text" in new_collection_record:
635 _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
641 def collection_record(self):
643 return self.collection.api_response()
# Special file '.arvados#collection' exposes the collection's API record
# as JSON; it is created lazily on first lookup.
647 def __getitem__(self, item):
648 if item == '.arvados#collection':
649 if self.collection_record_file is None:
650 self.collection_record_file = FuncToJSONFile(
651 self.inode, self.collection_record)
652 self.inodes.add_entry(self.collection_record_file)
653 self.invalidate() # use lookup as a signal to force update
654 return self.collection_record_file
656 return super(CollectionDirectory, self).__getitem__(item)
658 def __contains__(self, k):
659 if k == '.arvados#collection':
662 return super(CollectionDirectory, self).__contains__(k)
664 def invalidate(self):
665 if self.collection_record_file is not None:
666 self.collection_record_file.invalidate()
667 self.inodes.invalidate_inode(self.collection_record_file)
668 super(CollectionDirectory, self).invalidate()
671 return (self.collection_locator is not None)
674 # This is a rough guess of the amount of overhead involved for
675 # a collection; the assumptions are that that each file
676 # averages 128 bytes in the manifest, but consume 1024 bytes
677 # of Python data structures, so 1024/128=8 means we estimate
678 # the RAM footprint at 8 times the size of bare manifest text.
679 return self._manifest_size * 8
# Teardown: best-effort save (failures only logged), then stop the
# collection's background threads.
682 if self.collection is None:
687 self.collection.save()
688 except Exception as e:
689 _logger.exception("Failed to save collection %s", self.collection_locator)
690 self.collection.stop_threads()
693 if self.collection is not None:
694 self.collection.stop_threads()
695 self._manifest_size = 0
696 super(CollectionDirectory, self).clear()
697 if self.collection_record_file is not None:
698 self.inodes.del_entry(self.collection_record_file)
699 self.collection_record_file = None
702 class TmpCollectionDirectory(CollectionDirectoryBase):
703 """A directory backed by an Arvados collection that never gets saved.
705 This supports using Keep as scratch space. A userspace program can
706 read the .arvados#collection file to get a current manifest in
707 order to save a snapshot of the scratch data or use it as a crunch
# Subclass that suppresses saving to the API server (body elided in this
# excerpt); data goes to Keep but no collection record is created.
711 class UnsaveableCollection(arvados.collection.Collection):
717 def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
718 collection = self.UnsaveableCollection(
719 api_client=api_client,
720 keep_client=api_client.keep,
721 num_retries=num_retries,
722 storage_classes_desired=storage_classes)
723 # This is always enable_write=True because it never tries to
724 # save to the backend
725 super(TmpCollectionDirectory, self).__init__(
726 parent_inode, inodes, True, filters, collection, self)
728 self.populate(self.mtime())
# Synthesize an API-record-shaped dict from the live in-memory collection.
730 def collection_record(self):
731 with llfuse.lock_released:
734 "manifest_text": self.collection.manifest_text(),
735 "portable_data_hash": self.collection.portable_data_hash(),
736 "storage_classes_desired": self.collection.storage_classes_desired(),
739 def __contains__(self, k):
740 return (k == '.arvados#collection' or
741 super(TmpCollectionDirectory, self).__contains__(k))
# '.arvados#collection' is created lazily, same pattern as
# CollectionDirectory.__getitem__.
744 def __getitem__(self, item):
745 if item == '.arvados#collection':
746 if self.collection_record_file is None:
747 self.collection_record_file = FuncToJSONFile(
748 self.inode, self.collection_record)
749 self.inodes.add_entry(self.collection_record_file)
750 return self.collection_record_file
751 return super(TmpCollectionDirectory, self).__getitem__(item)
762 def want_event_subscribe(self):
766 self.collection.stop_threads()
768 def invalidate(self):
769 if self.collection_record_file:
770 self.collection_record_file.invalidate()
771 super(TmpCollectionDirectory, self).invalidate()
774 class MagicDirectory(Directory):
775 """A special directory that logically contains the set of all extant keep locators.
777 When a file is referenced by lookup(), it is tested to see if it is a valid
778 keep locator to a manifest, and if so, loads the manifest contents as a
779 subdirectory of this directory with the locator as the directory name.
780 Since querying a list of all extant keep locators is impractical, only
781 collections that have already been accessed are visible to readdir().
# README text shown inside the mount (full text elided in this excerpt).
786 This directory provides access to Arvados collections as subdirectories listed
787 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
788 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
789 (in the form 'zzzzz-j7d0g-1234567890abcde').
791 Note that this directory will appear empty until you attempt to access a
792 specific collection or project subdirectory (such as trying to 'cd' into it),
793 at which point the collection or project will actually be looked up on the server
794 and the directory will appear if it exists.
798 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
799 pdh_only=False, storage_classes=None, poll_time=15):
800 super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
802 self.num_retries = num_retries
803 self.pdh_only = pdh_only
804 self.storage_classes = storage_classes
806 self._poll_time = poll_time
# __setattr__ hook: the README (and, at the root, a 'by_id' alias of this
# directory) can only be added once this object has been assigned an inode.
808 def __setattr__(self, name, value):
809 super(MagicDirectory, self).__setattr__(name, value)
810 # When we're assigned an inode, add a README.
811 if ((name == 'inode') and (self.inode is not None) and
812 (not self._entries)):
813 self._entries['README'] = self.inodes.add_entry(
814 StringFile(self.inode, self.README_TEXT, time.time()))
815 # If we're the root directory, add an identical by_id subdirectory.
816 if self.inode == llfuse.ROOT_INODE:
817 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
824 pdh_only=self.pdh_only,
825 storage_classes=self.storage_classes,
826 poll_time=self._poll_time
# Membership test doubles as on-demand lookup: a uuid/PDH-shaped name is
# fetched from the API and added as a subdirectory if it exists.
829 def __contains__(self, k):
830 if k in self._entries:
833 if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
839 if group_uuid_pattern.match(k):
840 project = self.api.groups().list(
842 ['group_class', 'in', ['project','filter']],
844 *self._filters_for('groups', qualified=False),
846 ).execute(num_retries=self.num_retries)
847 if project[u'items_available'] == 0:
849 e = self.inodes.add_entry(ProjectDirectory(
856 project[u'items'][0],
857 storage_classes=self.storage_classes,
858 poll_time=self._poll_time
861 e = self.inodes.add_entry(CollectionDirectory(
869 poll_time=self._poll_time
# If the entry didn't materialize (or lookup failed), discard the inode
# that was speculatively allocated.
873 if k not in self._entries:
876 self.inodes.del_entry(e)
879 self.inodes.invalidate_entry(self, k)
880 self.inodes.del_entry(e)
882 except Exception as ex:
883 _logger.exception("arv-mount lookup '%s':", k)
885 self.inodes.del_entry(e)
888 def __getitem__(self, item):
890 return self._entries[item]
892 raise KeyError("No collection with id " + item)
897 def want_event_subscribe(self):
898 return not self.pdh_only
901 class TagsDirectory(Directory):
902 """A special directory that contains as subdirectories all tags visible to the user."""
904 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
905 super(TagsDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
907 self.num_retries = num_retries
909 self._poll_time = poll_time
912 def want_event_subscribe(self):
# Update: list 'tag' links from the API (llfuse lock released during the
# call) and merge them — plus explicitly-requested names in self._extra —
# into TagDirectory entries.
917 with llfuse.lock_released:
918 tags = self.api.links().list(
920 ['link_class', '=', 'tag'],
922 *self._filters_for('links', qualified=False),
927 ).execute(num_retries=self.num_retries)
930 tags['items']+[{"name": n} for n in self._extra],
932 lambda a, i: a.tag == i['name'],
933 lambda i: TagDirectory(
942 poll_time=self._poll_time,
# Lookup of an unknown tag name queries the API directly; if found, the
# name is remembered in self._extra so future updates include it.
948 def __getitem__(self, item):
949 if super(TagsDirectory, self).__contains__(item):
950 return super(TagsDirectory, self).__getitem__(item)
951 with llfuse.lock_released:
952 tags = self.api.links().list(
954 ['link_class', '=', 'tag'],
956 *self._filters_for('links', qualified=False),
959 ).execute(num_retries=self.num_retries)
961 self._extra.add(item)
963 return super(TagsDirectory, self).__getitem__(item)
967 def __contains__(self, k):
968 if super(TagsDirectory, self).__contains__(k):
978 class TagDirectory(Directory):
979 """A special directory that contains as subdirectories all collections visible
980 to the user that are tagged with a particular tag.
983 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
984 poll=False, poll_time=60):
985 super(TagDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
987 self.num_retries = num_retries
990 self._poll_time = poll_time
992 def want_event_subscribe(self):
# Update: list collections carrying this tag (only 'head_uuid' is
# selected) and merge them into CollectionDirectory entries.
997 with llfuse.lock_released:
998 taggedcollections = self.api.links().list(
1000 ['link_class', '=', 'tag'],
1001 ['name', '=', self.tag],
1002 ['head_uuid', 'is_a', 'arvados#collection'],
1003 *self._filters_for('links', qualified=False),
1005 select=['head_uuid'],
1006 ).execute(num_retries=self.num_retries)
1008 taggedcollections['items'],
1009 lambda i: i['head_uuid'],
1010 lambda a, i: a.collection_locator == i['head_uuid'],
1011 lambda i: CollectionDirectory(
1023 class ProjectDirectory(Directory):
1024 """A special directory that contains the contents of a project."""
1026 __slots__ = ("api", "num_retries", "project_object", "project_object_file",
1027 "project_uuid", "_updating_lock",
1028 "_current_user", "_full_listing", "storage_classes", "recursively_contained")
1030 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
1031 project_object, poll_time=15, storage_classes=None):
1032 super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
1034 self.num_retries = num_retries
1035 self.project_object = project_object
1036 self.project_object_file = None
1037 self.project_uuid = project_object['uuid']
1039 self._poll_time = poll_time
1040 self._updating_lock = threading.Lock()
1041 self._current_user = None
1042 self._full_listing = False
1043 self.storage_classes = storage_classes
1044 self.recursively_contained = False
1046 # Filter groups can contain themselves, which causes tools
1047 # that walk the filesystem to get stuck in an infinite loop,
1048 # so suppress returning a listing in that case.
1049 if self.project_object.get("group_class") == "filter":
1050 iter_parent_inode = parent_inode
1051 while iter_parent_inode != llfuse.ROOT_INODE:
1052 parent_dir = self.inodes[iter_parent_inode]
1053 if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid:
1054 self.recursively_contained = True
1056 iter_parent_inode = parent_dir.parent_inode
1058 def want_event_subscribe(self):
1061 def createDirectory(self, i):
1062 common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
1063 if collection_uuid_pattern.match(i['uuid']):
1064 return CollectionDirectory(*common_args, i, poll_time=self._poll_time)
1065 elif group_uuid_pattern.match(i['uuid']):
1066 return ProjectDirectory(*common_args, i, poll_time=self._poll_time,
1067 storage_classes=self.storage_classes)
1068 elif link_uuid_pattern.match(i['uuid']):
1069 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
1070 return CollectionDirectory(*common_args, i['head_uuid'], poll_time=self._poll_time)
1073 elif uuid_pattern.match(i['uuid']):
1074 return ObjectFile(self.parent_inode, i)
1079 return self.project_uuid
1082 self._full_listing = True
1083 return super(ProjectDirectory, self).items()
1085 def namefn(self, i):
1087 if i['name'] is None or len(i['name']) == 0:
1089 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
1090 # collection or subproject
1092 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
1095 elif 'kind' in i and i['kind'].startswith('arvados#'):
1097 return "{}.{}".format(i['name'], i['kind'][8:])
1104 if self.project_object_file == None:
1105 self.project_object_file = ObjectFile(self.inode, self.project_object)
1106 self.inodes.add_entry(self.project_object_file)
1108 if self.recursively_contained or not self._full_listing:
1112 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
1113 return a.uuid() == i['uuid']
1114 elif isinstance(a, ObjectFile):
1115 return a.uuid() == i['uuid'] and not a.stale()
1119 with llfuse.lock_released:
1120 self._updating_lock.acquire()
1121 if not self.stale():
1124 if group_uuid_pattern.match(self.project_uuid):
1125 self.project_object = self.api.groups().get(
1126 uuid=self.project_uuid).execute(num_retries=self.num_retries)
1127 elif user_uuid_pattern.match(self.project_uuid):
1128 self.project_object = self.api.users().get(
1129 uuid=self.project_uuid).execute(num_retries=self.num_retries)
1130 # do this in 2 steps until #17424 is fixed
1131 contents = list(arvados.util.keyset_list_all(
1132 self.api.groups().contents,
1134 num_retries=self.num_retries,
1135 uuid=self.project_uuid,
1137 ['uuid', 'is_a', 'arvados#group'],
1138 ['groups.group_class', 'in', ['project', 'filter']],
1139 *self._filters_for('groups', qualified=True),
1142 contents.extend(obj for obj in arvados.util.keyset_list_all(
1143 self.api.groups().contents,
1145 num_retries=self.num_retries,
1146 uuid=self.project_uuid,
1148 ['uuid', 'is_a', 'arvados#collection'],
1149 *self._filters_for('collections', qualified=True),
1151 ) if obj['current_version_uuid'] == obj['uuid'])
1152 # end with llfuse.lock_released, re-acquire lock
1154 self.merge(contents,
1157 self.createDirectory)
1160 self._updating_lock.release()
1162 def _add_entry(self, i, name):
1163 ent = self.createDirectory(i)
1164 self._entries[name] = self.inodes.add_entry(ent)
1165 return self._entries[name]
# Fetch a single directory entry by name, querying the API server on a
# cache miss instead of requiring a full project listing.
1169 def __getitem__(self, k):
# The synthetic ".arvados#project" entry exposes the project record itself.
1170 if k == '.arvados#project':
1171 return self.project_object_file
# Serve from the cached entry table when we already hold a full listing or
# the name is already cached.
1172 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
1173 return super(ProjectDirectory, self).__getitem__(k)
# Cache miss: query the API with the global FUSE lock released.
1174 with llfuse.lock_released:
# k2 is k with the ForwardSlashNameSubstitution string mapped back to "/",
# so we can also match objects whose real name contains a slash.
1175 k2 = self.unsanitize_filename(k)
1177 namefilter = ["name", "=", k]
# NOTE(review): lines are elided between these fragments; presumably the
# "in" filter is used only when k2 differs from k -- confirm upstream.
1179 namefilter = ["name", "in", [k, k2]]
# Look for a subproject or filter group with the requested name.
1180 contents = self.api.groups().list(
1182 ["owner_uuid", "=", self.project_uuid],
1183 ["group_class", "in", ["project","filter"]],
1185 *self._filters_for('groups', qualified=False),
1188 ).execute(num_retries=self.num_retries)["items"]
# Also query collections by name (governing control flow is elided here;
# presumably this runs when no group matched -- confirm upstream).
1190 contents = self.api.collections().list(
1192 ["owner_uuid", "=", self.project_uuid],
1194 *self._filters_for('collections', qualified=False),
1197 ).execute(num_retries=self.num_retries)["items"]
1199 if len(contents) > 1 and contents[1]['name'] == k:
1200 # If "foo/bar" and "foo[SUBST]bar" both exist, use the one whose
# name matches the requested k exactly.
1202 contents = [contents[1]]
1203 name = self.sanitize_filename(self.namefn(contents[0]))
# Materialize and cache a directory entry for the chosen match.
1206 return self._add_entry(contents[0], name)
# Membership test; note the synthetic ".arvados#project" entry always exists.
1211 def __contains__(self, k):
1212 if k == '.arvados#project':
# NOTE(review): the lines between the fragments below are elided in this
# view; 1224-1226 appear to belong to a writable()-style predicate and
# 1232-1235 to clear() -- confirm against the full source.
1224 if not self._enable_write:
# Report writability only when the API record says the user can write.
1226 return self.project_object.get("can_write") is True
1228 def persisted(self):
# Drop cached entries and release the project-object file's inode.
1232 super(ProjectDirectory, self).clear()
1233 if self.project_object_file is not None:
1234 self.inodes.del_entry(self.project_object_file)
1235 self.project_object_file = None
# Create a new empty collection named *name* inside this project.
1239 def mkdir(self, name):
1240 if not self.writable():
1241 raise llfuse.FUSEError(errno.EROFS)
# Call the API with the global FUSE lock released.
1244 with llfuse.lock_released:
# Attributes of the new collection record (the opening of the dict literal
# is elided in this view).
1246 "owner_uuid": self.project_uuid,
1248 "manifest_text": "" }
1249 if self.storage_classes is not None:
1250 c["storage_classes_desired"] = self.storage_classes
1252 self.api.collections().create(body=c).execute(num_retries=self.num_retries)
1253 except Exception as e:
# NOTE(review): handler bodies between these excepts are elided; an API
# error (e.g. a name collision) is surfaced to the kernel as EEXIST.
1256 except apiclient_errors.Error as error:
1257 _logger.error(error)
1258 raise llfuse.FUSEError(errno.EEXIST)
# Remove the collection called *name* from this project.
1262 def rmdir(self, name):
1263 if not self.writable():
1264 raise llfuse.FUSEError(errno.EROFS)
1266 if name not in self:
1267 raise llfuse.FUSEError(errno.ENOENT)
# Only collection entries may be removed this way; subprojects may not.
1268 if not isinstance(self[name], CollectionDirectory):
1269 raise llfuse.FUSEError(errno.EPERM)
# Refuse to delete a non-empty collection, matching POSIX rmdir semantics.
1270 if len(self[name]) > 0:
1271 raise llfuse.FUSEError(errno.ENOTEMPTY)
# Delete the collection record with the global FUSE lock released.
1272 with llfuse.lock_released:
1273 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
# Move/rename a collection entry from directory *src* into this project.
1278 def rename(self, name_old, name_new, src):
1279 if not self.writable():
1280 raise llfuse.FUSEError(errno.EROFS)
# Cross-directory renames are only supported between project directories.
1282 if not isinstance(src, ProjectDirectory):
1283 raise llfuse.FUSEError(errno.EPERM)
# Only collection entries can be moved (ent is looked up from src on a
# line elided from this view).
1287 if not isinstance(ent, CollectionDirectory):
1288 raise llfuse.FUSEError(errno.EPERM)
1290 if name_new in self:
1291 # POSIX semantics for replacing one directory with another is
1292 # tricky (the target directory must be empty, the operation must be
1293 # atomic which isn't possible with the Arvados API as of this
1294 # writing) so don't support that.
1295 raise llfuse.FUSEError(errno.EPERM)
# Reparent and rename the collection record on the API server.
1297 self.api.collections().update(uuid=ent.uuid(),
1298 body={"owner_uuid": self.uuid(),
1299 "name": name_new}).execute(num_retries=self.num_retries)
1301 # Actually move the entry from source directory to this directory.
1302 del src._entries[name_old]
1303 self._entries[name_new] = ent
# Tell the kernel the old name is gone so lookups don't serve stale data.
1304 self.inodes.invalidate_entry(src, name_old)
# Handle an event describing a change to one of this project's children,
# updating the cached entry table incrementally instead of re-listing.
1307 def child_event(self, ev):
1308 properties = ev.get("properties") or {}
1309 old_attrs = properties.get("old_attributes") or {}
1310 new_attrs = properties.get("new_attributes") or {}
1311 old_attrs["uuid"] = ev["object_uuid"]
1312 new_attrs["uuid"] = ev["object_uuid"]
# Map the object's old and new API names to their FUSE-safe filenames.
1313 old_name = self.sanitize_filename(self.namefn(old_attrs))
1314 new_name = self.sanitize_filename(self.namefn(new_attrs))
1316 # create events will have a new name, but not an old name
1317 # delete events will have an old name, but not a new name
1318 # update events will have an old and new name, and they may be same or different
1319 # if they are the same, an unrelated field changed and there is nothing to do.
1321 if old_attrs.get("owner_uuid") != self.project_uuid:
1322 # Was moved from somewhere else, so don't try to remove entry.
# NOTE(review): the statements governed by the conditions below are elided
# in this view (presumably clearing old_name/new_name) -- confirm upstream.
1324 if ev.get("object_owner_uuid") != self.project_uuid:
1325 # Was moved to somewhere else, so don't try to add entry
1328 if old_attrs.get("is_trashed"):
1329 # Was previously deleted
1331 if new_attrs.get("is_trashed"):
# Only touch the entry table when the visible name actually changed.
1335 if new_name != old_name:
1337 if old_name in self._entries:
1338 ent = self._entries[old_name]
1339 del self._entries[old_name]
# Invalidate the kernel's cached lookup of the old name.
1340 self.inodes.invalidate_entry(self, old_name)
1344 self._entries[new_name] = ent
1346 self._add_entry(new_attrs, new_name)
# Entry was removed but not re-added under a new name: release its inode.
1347 elif ent is not None:
1348 self.inodes.del_entry(ent)
1351 class SharedDirectory(Directory):
1352 """A special directory that represents users or groups who have shared projects with me."""
1354 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
1355 exclude, poll_time=60, storage_classes=None):
# parent_inode: integer inode number of the parent directory (see Directory).
# exclude: not used in the visible lines -- presumably an owner uuid to
# omit from the listing; confirm against the full source.
1356 super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
1358 self.num_retries = num_retries
# Fetch the calling user's record once; update() uses it to distinguish
# the user's own projects from shared ones.
1359 self.current_user = api.users().current().execute(num_retries=num_retries)
1361 self._poll_time = poll_time
# Serializes concurrent update() calls.
1362 self._updating_lock = threading.Lock()
1363 self.storage_classes = storage_classes
# Refresh the listing of users/groups that have shared projects with the
# current user.  (Many lines of this method, including its def line, are
# elided from this view; comments describe only the visible fragments.)
1368 with llfuse.lock_released:
1369 self._updating_lock.acquire()
# Another thread refreshed while we waited for the lock; nothing to do.
1370 if not self.stale():
# Prefer the dedicated groups.shared endpoint when the API server's
# discovery document advertises it.
1378 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1379 if 'httpMethod' in methods.get('shared', {}):
1382 resp = self.api.groups().shared(
1384 ['group_class', 'in', ['project','filter']],
1386 *self._filters_for('groups', qualified=False),
# include="owner_uuid" makes the server return the owner records in
# resp["included"] alongside the shared groups themselves.
1391 include="owner_uuid",
1393 if not resp["items"]:
# Manual keyset pagination: the next page starts after the last uuid seen.
1395 page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1396 for r in resp["items"]:
1397 objects[r["uuid"]] = r
1398 roots.append(r["uuid"])
1399 for r in resp["included"]:
1400 objects[r["uuid"]] = r
1401 root_owners.add(r["uuid"])
# Fallback (apparently the branch for servers without groups.shared --
# confirm upstream): list all readable projects and derive the shared
# roots and their owners locally.
1403 all_projects = list(arvados.util.keyset_list_all(
1404 self.api.groups().list,
1406 num_retries=self.num_retries,
1408 ['group_class', 'in', ['project','filter']],
1409 *self._filters_for('groups', qualified=False),
1411 select=["uuid", "owner_uuid"],
1413 for ob in all_projects:
1414 objects[ob['uuid']] = ob
# A project is a shared "root" when its owner is neither the current user
# nor another object we can already see.
1416 current_uuid = self.current_user['uuid']
1417 for ob in all_projects:
1418 if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1419 roots.append(ob['uuid'])
1420 root_owners.add(ob['owner_uuid'])
# Fetch the user and group records for the root owners so the top-level
# entries can be named.
1422 lusers = arvados.util.keyset_list_all(
1423 self.api.users().list,
1425 num_retries=self.num_retries,
1427 ['uuid', 'in', list(root_owners)],
1428 *self._filters_for('users', qualified=False),
1431 lgroups = arvados.util.keyset_list_all(
1432 self.api.groups().list,
1434 num_retries=self.num_retries,
1436 ['uuid', 'in', list(root_owners)+roots],
1437 *self._filters_for('groups', qualified=False),
1441 objects[l["uuid"]] = l
1443 objects[l["uuid"]] = l
# Build the name -> object map for the merge below: groups use their name,
# user entries use "First Last".
1445 for r in root_owners:
1449 contents[obr["name"]] = obr
1450 elif "first_name" in obr:
1451 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1456 if obr['owner_uuid'] not in objects:
1457 contents[obr["name"]] = obr
1459 # end with llfuse.lock_released, re-acquire lock
# Merge the fresh listing into the entry table, matching existing child
# directories to listing items by uuid and creating ProjectDirectory
# entries for new ones.
1464 lambda a, i: a.uuid() == i[1]['uuid'],
1465 lambda i: ProjectDirectory(
1473 poll_time=self._poll_time,
1474 storage_classes=self.storage_classes,
# Best-effort refresh: log the failure and keep the previous contents.
1478 _logger.exception("arv-mount shared dir error")
1480 self._updating_lock.release()
1482 def want_event_subscribe(self):