1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
15 from apiclient import errors as apiclient_errors
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
# Module-level logger shared by all directory classes in this file.
_logger = logging.getLogger('arvados.arvados_fuse')

# Match any character which FUSE or Linux cannot accommodate as part
# of a filename. (If present in a collection filename, they will
# appear as underscores in the fuse mount.)
_disallowed_filename_characters = re.compile(r'[\x00/]')
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    # __slots__ keeps per-inode memory low; a mount can hold many directories.
    __slots__ = ("inode", "parent_inode", "inodes", "_entries", "_mtime", "_enable_write", "_filters")
    def __init__(self, parent_inode, inodes, enable_write, filters):
        """parent_inode is the integer inode number"""
        super(Directory, self).__init__()
        # Fail fast: llfuse requires integer inode numbers.
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self._mtime = time.time()
        self._enable_write = enable_write
        # Normalize None to [] so _filters_for can always iterate safely.
        self._filters = filters or []
    def _filters_for(self, subtype, *, qualified):
        """Yield mount-level filters that apply to *subtype* API objects.

        Filter names may be qualified like 'collections.name'; filters for
        other subtypes are skipped.  Handling of the *qualified* flag is
        partly elided here — TODO confirm against full source.
        """
        for f in self._filters:
            f_type, _, f_name = f[0].partition('.')
            elif f_type != subtype:
            # Re-emit the filter with the subtype prefix stripped.
            yield [f_name, *f[1:]]
    def unsanitize_filename(self, incoming):
        """Replace ForwardSlashNameSubstitution value with /"""
        fsns = self.inodes.forward_slash_subst()
        # fsns is only a str when a substitution is configured — otherwise
        # the name passes through unchanged (fall-through not shown here).
        if isinstance(fsns, str):
            return incoming.replace(fsns, '/')
    def sanitize_filename(self, dirty):
        """Replace disallowed filename characters according to
        ForwardSlashNameSubstitution in self.api_config."""
        # '.' and '..' are not reachable if API server is newer than #6277
        fsns = self.inodes.forward_slash_subst()
        if isinstance(fsns, str):
            dirty = dirty.replace('/', fsns)
        # Any remaining NUL or '/' characters become underscores
        # (see _disallowed_filename_characters at module scope).
        return _disallowed_filename_characters.sub('_', dirty)
95 # Overridden by subclasses to implement logic to update the
96 # entries dict when the directory is stale
101 # Only used when computing the size of the disk footprint of the directory
109 def checkupdate(self):
113 except apiclient.errors.HttpError as e:
118 def __getitem__(self, item):
119 return self._entries[item]
        # (items tail) snapshot of the entries as (name, entry) pairs.
        return list(self._entries.items())

    def __contains__(self, k):
        """True when a child entry named *k* exists."""
        return k in self._entries

        # (__len__ tail) number of child entries.
        return len(self._entries)

        # (fresh tail) mark this inode recently used, then FreshBase bookkeeping.
        self.inodes.touch(self)
        super(Directory, self).fresh()

        # Rough estimate of memory footprint based on using pympler
        return len(self._entries) * 1024
    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        Arguments:
        * items: Iterable --- New directory contents

        * fn: Callable --- Takes an entry in 'items' and return the desired file or
          directory name, or None if this entry should be skipped

        * same: Callable --- Compare an existing entry (a File or Directory
          object) with an entry in the items list to determine whether to keep
          the existing entry

        * new_entry: Callable --- Create a new directory entry (File or Directory
          object) from an entry in the items list.
        """
        oldentries = self._entries
        # First pass (loop header elided): carry over unchanged entries.
        name = self.sanitize_filename(fn(i))
        if name in oldentries:
            ent = oldentries[name]
            if same(ent, i) and ent.parent_inode == self.inode:
                # move existing directory entry over
                self._entries[name] = ent
                self.inodes.inode_cache.touch(ent)
        # Second pass (loop header elided): create entries not carried over.
        name = self.sanitize_filename(fn(i))
        if name not in self._entries:
            # create new directory entry
            self._entries[name] = self.inodes.add_entry(ent)
            # need to invalidate this just in case there was a
            # previous entry that couldn't be moved over or a
            # lookup that returned file not found and cached
            self.inodes.invalidate_entry(self, name)
            _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)
        # delete any other directory entries that were not in found in 'items'
        for name, ent in oldentries.items():
            _logger.debug("Detaching entry '%s' from parent_inode %i", name, self.inode)
            self.inodes.invalidate_entry(self, name)
            self.inodes.del_entry(ent)
        self._mtime = time.time()
        self.inodes.inode_cache.update_cache_size(self)
        # (in_use tail) busy if FreshBase says so, or if any child is in use.
        if super(Directory, self).in_use():
        for v in self._entries.values():

        # (clear) remove and detach every child entry.
        """Delete all entries"""
        if not self._entries:
        oldentries = self._entries
        for name, ent in oldentries.items():
            # Tell the kernel the name is gone, then drop our inode reference.
            self.inodes.invalidate_entry(self, name)
            self.inodes.del_entry(ent)
        self.inodes.inode_cache.update_cache_size(self)
    def kernel_invalidate(self):
        """Ask the kernel to drop cached dentries leading to this directory."""
        # Invalidating the dentry on the parent implies invalidating all paths
        if self.parent_inode in self.inodes:
            parent = self.inodes[self.parent_inode]
        # parent was removed already.
        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k,v in parent._entries.items():
            self.inodes.invalidate_entry(parent, k)
    # The operations below are abstract: subclasses override the ones they
    # support; the default is to refuse the operation.
    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    demand.

    Most operations act only the underlying Arvados `Collection` object. The
    `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified. FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.
    """

    __slots__ = ("collection", "collection_root", "collection_record_file")
294 def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root):
295 super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
296 self.collection = collection
297 self.collection_root = collection_root
298 self.collection_record_file = None
    def new_entry(self, name, item, mtime):
        """Create (or re-attach) the FUSE entry for collection item *item*."""
        name = self.sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            # Reuse an existing inode that was detached earlier; it must be
            # unparented but still hold a valid inode number.
            if item.fuse_entry.parent_inode is not None:
                raise Exception("Can only reparent unparented inode entry")
            if item.fuse_entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            item.fuse_entry.parent_inode = self.inode
            self._entries[name] = item.fuse_entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            # Subcollection: wrap it in another CollectionDirectoryBase and
            # populate recursively (constructor arguments partly elided).
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
                self.collection_root,
            self._entries[name].populate(mtime)
            # Plain file: expose through FuseArvadosFile and remember the
            # entry on the item so it can be re-attached after a MOD event.
            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write, self._poll, self._poll_time))
            item.fuse_entry = self._entries[name]
    def on_event(self, event, collection, name, item):
        """Notify callback from the Collection: sync FUSE state with a change."""
        if event in (arvados.collection.TOK, arvados.collection.WRITE):
            # We don't care about TOK events, that means only token
            # signatures were updated, and WRITE events were initiated
            # locally, so nothing needs to change in the FUSE view.

        # These are events from the Collection object (ADD/DEL/MOD)
        # emitted by operations on the Collection object (like
        # "mkdirs" or "remove"), and by "update", which we need to
        # synchronize with our FUSE objects that are assigned inodes.
        if collection != self.collection:

        name = self.sanitize_filename(name)

        # It's possible for another thread to have llfuse.lock and
        # be waiting on collection.lock. Meanwhile, we released
        # llfuse.lock earlier in the stack, but are still holding
        # on to the collection lock, and now we need to re-acquire
        # llfuse.lock. If we don't release the collection lock,
        # we'll deadlock where we're holding the collection lock
        # waiting for llfuse.lock and the other thread is holding
        # llfuse.lock and waiting for the collection lock.
        #
        # The correct locking order here is to take llfuse.lock
        # first, then the collection lock.
        #
        # Since collection.lock is an RLock, it might be locked
        # multiple times, so we need to release it multiple times,
        # keep a count, then re-lock it the correct number of
        # times.
        self.collection.lock.release()

        with self.collection.lock:
            if event == arvados.collection.ADD:
                self.new_entry(name, item, self.mtime())
            elif event == arvados.collection.DEL:
                ent = self._entries[name]
                del self._entries[name]
                self.inodes.invalidate_entry(self, name)
                self.inodes.del_entry(ent)
            elif event == arvados.collection.MOD:
                # MOD events have (modified_from, newitem)
                if hasattr(newitem, "fuse_entry") and newitem.fuse_entry is not None:
                    entry = newitem.fuse_entry
                elif name in self._entries:
                    entry = self._entries[name]
                if entry is not None:
                    self.inodes.invalidate_inode(entry)
                if name in self._entries:
                    self.inodes.invalidate_entry(self, name)
            # we don't care about TOK events, those mean
            # only token signatures were updated
            if self.collection_record_file is not None:
                self.collection_record_file.invalidate()
                self.inodes.invalidate_inode(self.collection_record_file)
        # Re-acquire the collection lock released above (see comment).
        self.collection.lock.acquire()
    def populate(self, mtime):
        """Create FUSE entries for everything currently in the collection."""
        with self.collection.lock:
            # Subscribe first so changes made while iterating are not missed.
            self.collection.subscribe(self.on_event)
            for entry, item in self.collection.items():
                self.new_entry(entry, item, self.mtime())

        # (writable tail) the underlying collection must be writable too.
        return self._enable_write and self.collection.writable()

        # (flush tail) delegate to the root directory, which owns save logic.
        self.collection_root.flush()
    def create(self, name):
        """Create an empty file *name* in this collection directory."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            # Release the FUSE lock around collection I/O (see on_event).
            self.collection.open(name, "w").close()

    def mkdir(self, name):
        """Create subdirectory *name* inside the collection."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    def unlink(self, name):
        """Remove file *name* from the collection."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)

    def rmdir(self, name):
        """Remove subdirectory *name* (same backend call as unlink)."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)
    def rename(self, name_old, name_new, src):
        """Rename/move an entry, possibly between collection directories."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        # Moves from outside a collection are not supported.
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        # POSIX rename semantics: check source/target kind compatibility
        # (ent = source entry, tgt = existing target entry; lookups elided).
        if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
        elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
            raise llfuse.FUSEError(errno.ENOTDIR)
        elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            # overwrite=True: the checks above already enforced POSIX rules.
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        # (clear tail) detach from the Collection before dropping entries.
        super(CollectionDirectoryBase, self).clear()
        if self.collection is not None:
            self.collection.unsubscribe()
        self.collection = None

    # objsize for the whole collection is represented at the root,
    # don't double-count it
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    __slots__ = ("api", "num_retries", "collection_locator",
                 "_manifest_size", "_writable", "_updating_lock")
    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
        super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
        self.num_retries = num_retries
        # collection_record may be a full API record (dict) or a bare
        # locator string (uuid or portable data hash).
        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        self.collection_locator = collection_record
        is_uuid = (self.collection_locator is not None) and (uuid_pattern.match(self.collection_locator) is not None)
        # It is a uuid, it may be updated upstream, so recheck it periodically.
        # It is not a uuid. For immutable collections, collection
        # only needs to be refreshed if it is very long lived
        # (long enough that there's a risk of the blob signatures
        # expiring).
        self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
        _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
        # Fallback poll interval when the discovery document is unavailable.
        self._poll_time = 60*60
        self._writable = is_uuid and enable_write
        self._manifest_size = 0
        self._updating_lock = threading.Lock()
        # (same tail) a record matches by uuid or by portable data hash.
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

        # (writable tail) defer to the live Collection once loaded.
        return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)

        # (flush) commit pending changes and refresh the cached record.
        with llfuse.lock_released:
            with self._updating_lock:
                if self.collection.committed():
                    self.collection.update()
                self.collection.save()
                self.new_collection_record(self.collection.api_response())
543 def want_event_subscribe(self):
544 return (uuid_pattern.match(self.collection_locator) is not None)
    def new_collection(self, new_collection_record, coll_reader):
        """Adopt a freshly loaded Collection object plus its API record."""
        self.collection = coll_reader
        self.new_collection_record(new_collection_record)
        self.populate(self.mtime())
553 def new_collection_record(self, new_collection_record):
554 if not new_collection_record:
555 raise Exception("invalid new_collection_record")
556 self._mtime = convertTime(new_collection_record.get('modified_at'))
557 self._manifest_size = len(new_collection_record["manifest_text"])
558 self.collection_locator = new_collection_record["uuid"]
559 if self.collection_record_file is not None:
560 self.collection_record_file.invalidate()
561 self.inodes.invalidate_inode(self.collection_record_file)
562 _logger.debug("parent_inode %s invalidated collection record file inode %s", self.inode,
563 self.collection_record_file.inode)
564 self.inodes.update_uuid(self)
565 self.inodes.inode_cache.update_cache_size(self)
        # (uuid tail) the locator doubles as this directory's identity.
        return self.collection_locator

    # (update) (Re)load the collection contents from the API server / Keep.
        if self.collection_locator is None:
            # No collection locator to retrieve from

        new_collection_record = None

        with llfuse.lock_released:
            self._updating_lock.acquire()
            _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
            if self.collection is not None:
                # Already have a collection object
                self.collection.update()
                new_collection_record = self.collection.api_response()
                # Create a new collection object
                if uuid_pattern.match(self.collection_locator):
                    # uuid: open read-write so the mount can save changes.
                    coll_reader = arvados.collection.Collection(
                        self.collection_locator, self.api, self.api.keep,
                        num_retries=self.num_retries)
                    # PDH: immutable content, a reader is sufficient.
                    coll_reader = arvados.collection.CollectionReader(
                        self.collection_locator, self.api, self.api.keep,
                        num_retries=self.num_retries)
                new_collection_record = coll_reader.api_response() or {}
                # If the Collection only exists in Keep, there will be no API
                # response. Fill in the fields we need.
                if 'uuid' not in new_collection_record:
                    new_collection_record['uuid'] = self.collection_locator
                if "portable_data_hash" not in new_collection_record:
                    new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                if 'manifest_text' not in new_collection_record:
                    new_collection_record['manifest_text'] = coll_reader.manifest_text()
                if 'storage_classes_desired' not in new_collection_record:
                    new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()

        # end with llfuse.lock_released, re-acquire lock

        if new_collection_record is not None:
            if coll_reader is not None:
                self.new_collection(new_collection_record, coll_reader)
            self.new_collection_record(new_collection_record)

        self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
    def collection_record(self):
        """Return the collection's API record (backs .arvados#collection)."""
        return self.collection.api_response()

    def __getitem__(self, item):
        if item == '.arvados#collection':
            # Lazily create the special JSON record file on first lookup.
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            self.invalidate()  # use lookup as a signal to force update
            return self.collection_record_file
        return super(CollectionDirectory, self).__getitem__(item)
    def __contains__(self, k):
        # The special record file pseudo-entry always exists.
        if k == '.arvados#collection':
        return super(CollectionDirectory, self).__contains__(k)
663 def invalidate(self):
664 if self.collection_record_file is not None:
665 self.collection_record_file.invalidate()
666 self.inodes.invalidate_inode(self.collection_record_file)
667 super(CollectionDirectory, self).invalidate()
        # (persisted tail) persisted iff we have a locator to save to.
        return (self.collection_locator is not None)

        # (objsize tail)
        # This is a rough guess of the amount of overhead involved for
        # a collection; the assumptions are that each file
        # averages 128 bytes in the manifest, but consumes 1024 bytes
        # of Python data structures, so 1024/128=8 means we estimate
        # the RAM footprint at 8 times the size of bare manifest text.
        return self._manifest_size * 8
        # (finalize) save pending changes best-effort, then stop background
        # threads owned by the Collection object.
        if self.collection is None:
            self.collection.save()
        except Exception as e:
            _logger.exception("Failed to save collection %s", self.collection_locator)
        self.collection.stop_threads()

        # (clear) also drop the record file entry along with normal teardown.
        if self.collection is not None:
            self.collection.stop_threads()
        self._manifest_size = 0
        super(CollectionDirectory, self).clear()
        if self.collection_record_file is not None:
            self.inodes.del_entry(self.collection_record_file)
            self.collection_record_file = None
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This supports using Keep as scratch space. A userspace program can
    read the .arvados#collection file to get a current manifest in
    order to save a snapshot of the scratch data or use it as a crunch
    job output.
    """

    # Collection subclass whose save methods are overridden (body not shown
    # in this view) so the scratch data is never persisted to the API server.
    class UnsaveableCollection(arvados.collection.Collection):
716 def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
717 collection = self.UnsaveableCollection(
718 api_client=api_client,
719 keep_client=api_client.keep,
720 num_retries=num_retries,
721 storage_classes_desired=storage_classes)
722 # This is always enable_write=True because it never tries to
723 # save to the backend
724 super(TmpCollectionDirectory, self).__init__(
725 parent_inode, inodes, True, filters, collection, self)
726 self.populate(self.mtime())
    def on_event(self, *args, **kwargs):
        """Forward collection events, then refresh .arvados#collection."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if self.collection_record_file is None:

        # See discussion in CollectionDirectoryBase.on_event
        self.collection.lock.release()

        with self.collection.lock:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
            _logger.debug("%s invalidated collection record", self.inode)

        # Re-acquire the collection lock released above.
        self.collection.lock.acquire()
    def collection_record(self):
        """Build an API-record-shaped dict for the unsaved scratch collection."""
        with llfuse.lock_released:
            # (dict literal delimiters elided in this view)
            "manifest_text": self.collection.manifest_text(),
            "portable_data_hash": self.collection.portable_data_hash(),
            "storage_classes_desired": self.collection.storage_classes_desired(),
762 def __contains__(self, k):
763 return (k == '.arvados#collection' or
764 super(TmpCollectionDirectory, self).__contains__(k))
767 def __getitem__(self, item):
768 if item == '.arvados#collection':
769 if self.collection_record_file is None:
770 self.collection_record_file = FuncToJSONFile(
771 self.inode, self.collection_record)
772 self.inodes.add_entry(self.collection_record_file)
773 return self.collection_record_file
774 return super(TmpCollectionDirectory, self).__getitem__(item)
    def want_event_subscribe(self):

        # (finalize tail) stop the scratch collection's background threads.
        self.collection.stop_threads()
791 def invalidate(self):
792 if self.collection_record_file:
793 self.collection_record_file.invalidate()
794 super(TmpCollectionDirectory, self).invalidate()
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().
    """

This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
        super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.num_retries = num_retries
        # When pdh_only is True, only portable-data-hash names resolve.
        self.pdh_only = pdh_only
        self.storage_classes = storage_classes
    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
            (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
    def __contains__(self, k):
        """Resolve *k* as a collection/project id, creating the entry on demand."""
        if k in self._entries:
        # Reject names that are neither a PDH nor (unless pdh_only) a uuid.
        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
        if group_uuid_pattern.match(k):
            # Project uuid: look it up and mount a ProjectDirectory.
            project = self.api.groups().list(
                ['group_class', 'in', ['project','filter']],
                *self._filters_for('groups', qualified=False),
            ).execute(num_retries=self.num_retries)
            if project[u'items_available'] == 0:
            e = self.inodes.add_entry(ProjectDirectory(
                project[u'items'][0],
                storage_classes=self.storage_classes,
            # Otherwise: collection uuid/PDH, mount a CollectionDirectory.
            e = self.inodes.add_entry(CollectionDirectory(
        if k not in self._entries:
            self.inodes.del_entry(e)
            self.inodes.invalidate_entry(self, k)
            self.inodes.del_entry(e)
        except Exception as ex:
            _logger.exception("arv-mount lookup '%s':", k)
            self.inodes.del_entry(e)

    def __getitem__(self, item):
        # __contains__ does the work of fetching the entry on first access.
        return self._entries[item]
        raise KeyError("No collection with id " + item)
913 def want_event_subscribe(self):
914 return not self.pdh_only
class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.num_retries = num_retries
        # Tags are discovered by periodic polling, not websocket events.
        self._poll_time = poll_time
    def want_event_subscribe(self):

        # (update) List tag links visible to the user, then merge into a
        # TagDirectory per distinct tag name.
        with llfuse.lock_released:
            tags = self.api.links().list(
                ['link_class', '=', 'tag'],
                *self._filters_for('links', qualified=False),
            ).execute(num_retries=self.num_retries)
            # (merge arguments) _extra keeps explicitly-requested tags visible.
            tags['items']+[{"name": n} for n in self._extra],
            lambda a, i: a.tag == i['name'],
            lambda i: TagDirectory(
                poll_time=self._poll_time,
    def __getitem__(self, item):
        """Look up tag *item*, querying the API if not already cached."""
        if super(TagsDirectory, self).__contains__(item):
            return super(TagsDirectory, self).__getitem__(item)
        with llfuse.lock_released:
            tags = self.api.links().list(
                ['link_class', '=', 'tag'],
                *self._filters_for('links', qualified=False),
            ).execute(num_retries=self.num_retries)
            # Remember explicitly-requested tags so update() keeps them.
            self._extra.add(item)
        return super(TagsDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if super(TagsDirectory, self).__contains__(k):
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.num_retries = num_retries
        self._poll_time = poll_time
    def want_event_subscribe(self):

        # (update) Query tag links naming this tag, restricted to collections;
        # select only head_uuid since that is all merge() needs.
        with llfuse.lock_released:
            taggedcollections = self.api.links().list(
                ['link_class', '=', 'tag'],
                ['name', '=', self.tag],
                ['head_uuid', 'is_a', 'arvados#collection'],
                *self._filters_for('links', qualified=False),
                select=['head_uuid'],
            ).execute(num_retries=self.num_retries)
            # (merge arguments) one CollectionDirectory per tagged collection.
            taggedcollections['items'],
            lambda i: i['head_uuid'],
            lambda a, i: a.collection_locator == i['head_uuid'],
            lambda i: CollectionDirectory(
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project."""

    __slots__ = ("api", "num_retries", "project_object", "project_object_file",
                 "project_uuid", "_updating_lock",
                 "_current_user", "_full_listing", "storage_classes", "recursively_contained")
    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
                 project_object, poll=True, poll_time=15, storage_classes=None):
        super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.num_retries = num_retries
        self.project_object = project_object
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll_time = poll_time
        self._updating_lock = threading.Lock()
        self._current_user = None
        self._full_listing = False
        self.storage_classes = storage_classes
        self.recursively_contained = False

        # Filter groups can contain themselves, which causes tools
        # that walk the filesystem to get stuck in an infinite loop,
        # so suppress returning a listing in that case.
        if self.project_object.get("group_class") == "filter":
            iter_parent_inode = parent_inode
            # Walk up the parent chain looking for this same project uuid.
            while iter_parent_inode != llfuse.ROOT_INODE:
                parent_dir = self.inodes[iter_parent_inode]
                if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid:
                    self.recursively_contained = True
                iter_parent_inode = parent_dir.parent_inode
    def want_event_subscribe(self):

    def createDirectory(self, i):
        """Make the appropriate FUSE object for project content item *i*."""
        common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(*common_args, i)
        elif group_uuid_pattern.match(i['uuid']):
            # Subproject (or filter group): nested ProjectDirectory.
            return ProjectDirectory(*common_args, i, self._poll, self._poll_time, self.storage_classes)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(*common_args, i['head_uuid'])
        elif uuid_pattern.match(i['uuid']):
            # Some other object type: expose its record as a JSON file.
            return ObjectFile(self.parent_inode, i)
        # (uuid tail)
        return self.project_uuid

        # (items tail) a readdir forces a full project listing on update.
        self._full_listing = True
        return super(ProjectDirectory, self).items()

    def namefn(self, i):
        """Compute the visible filename for project content item *i*."""
        if i['name'] is None or len(i['name']) == 0:
        elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
            # collection or subproject
        elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
        elif 'kind' in i and i['kind'].startswith('arvados#'):
            # Qualify other object names with their kind, e.g. "foo.workflow".
            return "{}.{}".format(i['name'], i['kind'][8:])
        # Lazily create the .arvados#project record file.
        # NOTE(review): prefer "is None" over "== None" here.
        if self.project_object_file == None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        # Skip the listing for filter-group self-containment or lazy mode.
        if self.recursively_contained or not self._full_listing:

        # (samefn) decide whether an existing entry matches an API item.
            if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()

        with llfuse.lock_released:
            self._updating_lock.acquire()
            if not self.stale():
            # Refresh the project record itself (group or user "home").
            if group_uuid_pattern.match(self.project_uuid):
                self.project_object = self.api.groups().get(
                    uuid=self.project_uuid).execute(num_retries=self.num_retries)
            elif user_uuid_pattern.match(self.project_uuid):
                self.project_object = self.api.users().get(
                    uuid=self.project_uuid).execute(num_retries=self.num_retries)
            # do this in 2 steps until #17424 is fixed
            contents = list(arvados.util.keyset_list_all(
                self.api.groups().contents,
                num_retries=self.num_retries,
                uuid=self.project_uuid,
                ['uuid', 'is_a', 'arvados#group'],
                ['groups.group_class', 'in', ['project', 'filter']],
                *self._filters_for('groups', qualified=True),
            # Second step: collections, current versions only.
            contents.extend(obj for obj in arvados.util.keyset_list_all(
                self.api.groups().contents,
                num_retries=self.num_retries,
                uuid=self.project_uuid,
                ['uuid', 'is_a', 'arvados#collection'],
                *self._filters_for('collections', qualified=True),
            ) if obj['current_version_uuid'] == obj['uuid'])

        # end with llfuse.lock_released, re-acquire lock

        self.merge(contents,
                   self.createDirectory)

        self._updating_lock.release()
1177 def _add_entry(self, i, name):
1178 ent = self.createDirectory(i)
1179 self._entries[name] = self.inodes.add_entry(ent)
1180 return self._entries[name]
    def __getitem__(self, k):
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)
        with llfuse.lock_released:
            # Look up by name; also match the unsanitized variant when the
            # forward-slash substitution applies.
            k2 = self.unsanitize_filename(k)
            namefilter = ["name", "=", k]
            namefilter = ["name", "in", [k, k2]]
            contents = self.api.groups().list(
                ["owner_uuid", "=", self.project_uuid],
                ["group_class", "in", ["project","filter"]],
                *self._filters_for('groups', qualified=False),
            ).execute(num_retries=self.num_retries)["items"]
            contents = self.api.collections().list(
                ["owner_uuid", "=", self.project_uuid],
                *self._filters_for('collections', qualified=False),
            ).execute(num_retries=self.num_retries)["items"]
        if len(contents) > 1 and contents[1]['name'] == k:
            # If "foo/bar" and "foo[SUBST]bar" both exist, use
            # the one whose name is a literal match.
            contents = [contents[1]]
        name = self.sanitize_filename(self.namefn(contents[0]))
        return self._add_entry(contents[0], name)
    def __contains__(self, k):
        """Membership test; '.arvados#project' is always considered present.

        NOTE(review): this region is heavily elided (source numbering jumps
        from the __contains__ body into what appears to be a writable()
        body, then into a clear() body whose ``def`` line is missing) —
        restore from the full file before editing.
        """
        if k == '.arvados#project':
        # (elided) the following two lines appear to belong to a writable()
        # method: a read-only mount is never writable, otherwise defer to
        # the project record's can_write flag — TODO confirm.
        if not self._enable_write:
            return self.project_object.get("can_write") is True

    def persisted(self):
        # (elided body)

        # (elided ``def clear(self):`` header) — the lines below drop the
        # synthetic project-object file when clearing cached entries.
        super(ProjectDirectory, self).clear()
        if self.project_object_file is not None:
            self.inodes.del_entry(self.project_object_file)
            self.project_object_file = None
    def mkdir(self, name):
        """Create a new empty collection named *name* in this project.

        Raises EROFS when the mount is read-only, and EEXIST when the API
        reports an error creating the collection (e.g. a name collision).

        NOTE(review): lines are elided here (source numbering gaps) — the
        ``try:`` line, the opening of the request-body dict (presumably
        including the collection "name" field), and at least one line of the
        first exception handler are missing; confirm against the full file.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        with llfuse.lock_released:
            # (elided: try: and the start of the collection body dict)
                "owner_uuid": self.project_uuid,
                "manifest_text": "" }
            if self.storage_classes is not None:
                # Only request explicit storage classes when configured.
                c["storage_classes_desired"] = self.storage_classes
            self.api.collections().create(body=c).execute(num_retries=self.num_retries)
        except Exception as e:
            # (elided handler body)
        except apiclient_errors.Error as error:
            _logger.error(error)
            # Surface API-level failures to the kernel as EEXIST.
            raise llfuse.FUSEError(errno.EEXIST)
    def rmdir(self, name):
        """Remove the collection directory *name* by deleting its backing collection.

        Raises EROFS when the mount is read-only, ENOENT when no such entry
        exists, EPERM when the entry is not a CollectionDirectory, and
        ENOTEMPTY when the collection still has contents.

        NOTE(review): the chunk's numbering suggests one or more trailing
        lines after the delete call are elided — confirm against the full file.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        if not isinstance(self[name], CollectionDirectory):
            # Only collection-backed directories can be removed this way.
            raise llfuse.FUSEError(errno.EPERM)
        if len(self[name]) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        with llfuse.lock_released:
            # Delete server-side; the FUSE lock is released around the API call.
            self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
    def rename(self, name_old, name_new, src):
        """Move collection *name_old* from ProjectDirectory *src* into this
        project under the name *name_new*.

        NOTE(review): lines are elided here (source numbering gaps),
        including the lookup that binds ``ent`` (presumably
        ``ent = src[name_old]``) and, likely, a ``with llfuse.lock_released:``
        around the API update — confirm against the full file.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if not isinstance(src, ProjectDirectory):
            # Only project-to-project moves are supported.
            raise llfuse.FUSEError(errno.EPERM)

        # (elided: binding of `ent` to the source entry)
        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        # Reparent and rename the collection server-side in one update call.
        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src, name_old)
    def child_event(self, ev):
        """Incrementally apply a websocket event about an object in this project.

        NOTE(review): several lines are elided in this chunk (source
        numbering gaps) — the bodies of the guard branches, the
        initialization of ``ent``, and the surrounding branch structure near
        the end — confirm against the full file before relying on the flow
        described below.
        """
        properties = ev.get("properties") or {}
        old_attrs = properties.get("old_attributes") or {}
        new_attrs = properties.get("new_attributes") or {}
        # The attribute dicts in the event payload lack the uuid; add it back
        # so namefn() builds names the same way a full listing would.
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = self.sanitize_filename(self.namefn(old_attrs))
        new_name = self.sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
        if old_attrs.get("is_trashed"):
            # Was previously deleted
        if new_attrs.get("is_trashed"):
            # (elided branch body)

        if new_name != old_name:
            # (elided: presumably lock acquisition and `ent = None`)
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)
            # (elided branch structure: reuse the existing entry under the
            # new name, or create a fresh one — TODO confirm)
                self._entries[new_name] = ent
            self._add_entry(new_attrs, new_name)
        elif ent is not None:
            # Entry left the project (or was trashed): drop it from the inode table.
            self.inodes.del_entry(ent)
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
                 exclude, poll=False, poll_time=60, storage_classes=None):
        # NOTE(review): some __init__ lines are elided in this chunk (source
        # numbering gaps), e.g. storing `api`, `exclude`, and the poll flag —
        # confirm against the full file.
        super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        self.num_retries = num_retries
        # Synchronous API call at construction time to identify the caller;
        # used later to distinguish projects owned by the current user.
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll_time = poll_time
        # Serializes concurrent update() runs.
        self._updating_lock = threading.Lock()
        # Forwarded to the ProjectDirectory children created during update().
        self.storage_classes = storage_classes
        # NOTE(review): fragment of what appears to be SharedDirectory.update()
        # — the `def` line and many interior lines are elided in this chunk
        # (source numbering gaps). The comments below describe only the
        # visible statements; confirm the structure against the full file.
        with llfuse.lock_released:
            self._updating_lock.acquire()
            if not self.stale():
                # (elided: early return and initialization of the
                # contents/roots/root_owners/objects accumulators)

                # Prefer the dedicated groups().shared endpoint when the API
                # discovery document advertises it.
                methods = self.api._rootDesc.get('resources')["groups"]['methods']
                if 'httpMethod' in methods.get('shared', {}):
                        # (elided: pagination loop header and call arguments)
                        resp = self.api.groups().shared(
                            ['group_class', 'in', ['project','filter']],
                            *self._filters_for('groups', qualified=False),
                            include="owner_uuid",
                        if not resp["items"]:
                        # Keyset pagination: continue after the last uuid seen.
                        page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
                        for r in resp["items"]:
                            objects[r["uuid"]] = r
                            roots.append(r["uuid"])
                        # Owners come back via include="owner_uuid" in "included".
                        for r in resp["included"]:
                            objects[r["uuid"]] = r
                            root_owners.add(r["uuid"])
                    # (elided: else-branch header — fallback path when the
                    # shared endpoint is unavailable)
                    all_projects = list(arvados.util.keyset_list_all(
                        self.api.groups().list,
                        num_retries=self.num_retries,
                        ['group_class', 'in', ['project','filter']],
                        *self._filters_for('groups', qualified=False),
                        select=["uuid", "owner_uuid"],
                    for ob in all_projects:
                        objects[ob['uuid']] = ob

                    # A project is a "root" of the shared view when its owner
                    # is neither the current user nor another listed project.
                    current_uuid = self.current_user['uuid']
                    for ob in all_projects:
                        if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
                            roots.append(ob['uuid'])
                            root_owners.add(ob['owner_uuid'])

                    # Fetch the user and group records for the root owners so
                    # they can be named in the directory listing.
                    lusers = arvados.util.keyset_list_all(
                        self.api.users().list,
                        num_retries=self.num_retries,
                        ['uuid', 'in', list(root_owners)],
                        *self._filters_for('users', qualified=False),
                    lgroups = arvados.util.keyset_list_all(
                        self.api.groups().list,
                        num_retries=self.num_retries,
                        ['uuid', 'in', list(root_owners)+roots],
                        *self._filters_for('groups', qualified=False),
                        objects[l["uuid"]] = l
                        objects[l["uuid"]] = l

                # Build the name -> object map: groups are listed by name,
                # users by "First Last".
                for r in root_owners:
                        contents[obr["name"]] = obr
                    elif "first_name" in obr:
                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
                    # (elided loop over roots)
                    if obr['owner_uuid'] not in objects:
                        contents[obr["name"]] = obr
            # end with llfuse.lock_released, re-acquire lock

            # (elided: self.merge(...) header) — entries are matched to items
            # by uuid; unmatched items become new ProjectDirectory children.
                lambda a, i: a.uuid() == i[1]['uuid'],
                lambda i: ProjectDirectory(
                    # (elided constructor arguments)
                    poll_time=self._poll_time,
                    storage_classes=self.storage_classes,
        # (elided: except clause header)
            _logger.exception("arv-mount shared dir error")
        # (elided: finally clause header)
            self._updating_lock.release()
1498 def want_event_subscribe(self):