1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
15 from apiclient import errors as apiclient_errors
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
23 _logger = logging.getLogger('arvados.arvados_fuse')
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (If present in a collection filename, they will
28 # appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile(r'[\x00/]')
# NOTE(review): this excerpt is elided — each line is prefixed with its
# original file line number and many intervening lines are missing, so the
# bodies below are fragments. Comments describe only what the visible code
# demonstrates; anything else is hedged.
32 class Directory(FreshBase):
33 """Generic directory object, backed by a dict.
35 Consists of a set of entries with the key representing the filename
36 and the value referencing a File or Directory object.
39 __slots__ = ("inode", "parent_inode", "inodes", "_entries", "_mtime", "_enable_write", "_filters")
41 def __init__(self, parent_inode, inodes, enable_write, filters):
42 """parent_inode is the integer inode number"""
44 super(Directory, self).__init__()
# FUSE identifies directories by integer inode number; fail fast on
# anything else.
47 if not isinstance(parent_inode, int):
48 raise Exception("parent_inode should be an int")
49 self.parent_inode = parent_inode
52 self._mtime = time.time()
53 self._enable_write = enable_write
# Normalize a falsy filters argument to [] so iteration is always safe.
54 self._filters = filters or []
# Yield the subset of self._filters applying to `subtype` (e.g. 'groups',
# 'collections', 'links'); filters qualified as "subtype.attr" have the
# prefix stripped before being yielded.
# NOTE(review): the branches between lines 58 and 66 are elided here —
# confirm the unqualified/qualified handling against the full source.
56 def _filters_for(self, subtype, *, qualified):
57 for f in self._filters:
58 f_type, _, f_name = f[0].partition('.')
61 elif f_type != subtype:
66 yield [f_name, *f[1:]]
68 def unsanitize_filename(self, incoming):
69 """Replace ForwardSlashNameSubstitution value with /"""
70 fsns = self.inodes.forward_slash_subst()
71 if isinstance(fsns, str):
72 return incoming.replace(fsns, '/')
76 def sanitize_filename(self, dirty):
77 """Replace disallowed filename characters according to
78 ForwardSlashNameSubstitution in self.api_config."""
79 # '.' and '..' are not reachable if API server is newer than #6277
89 fsns = self.inodes.forward_slash_subst()
90 if isinstance(fsns, str):
91 dirty = dirty.replace('/', fsns)
# Any remaining NUL or '/' characters become underscores.
92 return _disallowed_filename_characters.sub('_', dirty)
95 # Overridden by subclasses to implement logic to update the
96 # entries dict when the directory is stale
101 # Only used when computing the size of the disk footprint of the directory
109 def checkupdate(self):
113 except apiclient.errors.HttpError as e:
# Plain dict-style accessors over the internal entries mapping.
118 def __getitem__(self, item):
119 return self._entries[item]
124 return list(self._entries.items())
128 def __contains__(self, k):
129 return k in self._entries
134 return len(self._entries)
137 self.inodes.touch(self)
138 super(Directory, self).fresh()
141 # Rough estimate of memory footprint based on using pympler
142 return len(self._entries) * 1024
144 def merge(self, items, fn, same, new_entry):
145 """Helper method for updating the contents of the directory.
147 Takes a list describing the new contents of the directory, reuse
148 entries that are the same in both the old and new lists, create new
149 entries, and delete old entries missing from the new list.
152 * items: Iterable --- New directory contents
154 * fn: Callable --- Takes an entry in 'items' and return the desired file or
155 directory name, or None if this entry should be skipped
157 * same: Callable --- Compare an existing entry (a File or Directory
158 object) with an entry in the items list to determine whether to keep
161 * new_entry: Callable --- Create a new directory entry (File or Directory
162 object) from an entry in the items list.
# First pass (elided loop header): carry over entries that are unchanged
# and still parented to this inode.
166 oldentries = self._entries
170 name = self.sanitize_filename(fn(i))
173 if name in oldentries:
174 ent = oldentries[name]
175 if same(ent, i) and ent.parent_inode == self.inode:
176 # move existing directory entry over
177 self._entries[name] = ent
179 self.inodes.inode_cache.touch(ent)
# Second pass (elided loop header): create entries for items that did not
# carry over.
182 name = self.sanitize_filename(fn(i))
185 if name not in self._entries:
186 # create new directory entry
189 self._entries[name] = self.inodes.add_entry(ent)
190 # need to invalidate this just in case there was a
191 # previous entry that couldn't be moved over or a
192 # lookup that returned file not found and cached
194 self.inodes.invalidate_entry(self, name)
196 _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)
198 # delete any other directory entries that were not found in 'items'
199 for name, ent in oldentries.items():
200 _logger.debug("Detaching entry '%s' from parent_inode %i", name, self.inode)
201 self.inodes.invalidate_entry(self, name)
202 self.inodes.del_entry(ent)
206 self._mtime = time.time()
207 self.inodes.inode_cache.update_cache_size(self)
# in_use (header elided): this directory counts as in use if the base
# class says so or any child entry does.
212 if super(Directory, self).in_use():
214 for v in self._entries.values():
220 """Delete all entries"""
221 if not self._entries:
223 oldentries = self._entries
226 for name, ent in oldentries.items():
228 self.inodes.invalidate_entry(self, name)
229 self.inodes.del_entry(ent)
230 self.inodes.inode_cache.update_cache_size(self)
232 def kernel_invalidate(self):
233 # Invalidating the dentry on the parent implies invalidating all paths
235 if self.parent_inode in self.inodes:
236 parent = self.inodes[self.parent_inode]
238 # parent was removed already.
241 # Find self on the parent in order to invalidate this path.
242 # Calling the public items() method might trigger a refresh,
243 # which we definitely don't want, so read the internal dict directly.
244 for k,v in parent._entries.items():
246 self.inodes.invalidate_entry(parent, k)
# Mutating operations default to "not implemented"; writable subclasses
# override the ones they support.
258 def want_event_subscribe(self):
259 raise NotImplementedError()
261 def create(self, name):
262 raise NotImplementedError()
264 def mkdir(self, name):
265 raise NotImplementedError()
267 def unlink(self, name):
268 raise NotImplementedError()
270 def rmdir(self, name):
271 raise NotImplementedError()
273 def rename(self, name_old, name_new, src):
274 raise NotImplementedError()
# NOTE(review): elided excerpt — lines are prefixed with their original
# file line numbers and some lines are missing.
277 class CollectionDirectoryBase(Directory):
278 """Represent an Arvados Collection as a directory.
280 This class is used for Subcollections, and is also the base class for
281 CollectionDirectory, which implements collection loading/saving on
284 Most operations act only on the underlying Arvados `Collection` object. The
285 `Collection` object signals via a notify callback to
286 `CollectionDirectoryBase.on_event` that an item was added, removed or
287 modified. FUSE inodes and directory entries are created, deleted or
288 invalidated in response to these events.
292 __slots__ = ("collection", "collection_root", "collection_record_file")
294 def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root):
295 super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
296 self.collection = collection
297 self.collection_root = collection_root
298 self.collection_record_file = None
# Create (or re-attach) the FUSE entry for one collection item under the
# sanitized name.
300 def new_entry(self, name, item, mtime):
301 name = self.sanitize_filename(name)
# If the item already carries a fuse_entry (e.g. re-added after a MOD/DEL
# cycle), reparent that existing inode instead of allocating a new one.
302 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
303 if item.fuse_entry.parent_inode is not None:
304 raise Exception("Can only reparent unparented inode entry")
305 if item.fuse_entry.inode is None:
306 raise Exception("Reparented entry must still have valid inode")
307 item.fuse_entry.parent_inode = self.inode
308 self._entries[name] = item.fuse_entry
# Subcollections become nested CollectionDirectoryBase directories.
309 elif isinstance(item, arvados.collection.RichCollectionBase):
310 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
316 self.collection_root,
318 self._entries[name].populate(mtime)
# Otherwise (elided else branch) the item becomes a file entry.
320 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write, self._poll, self._poll_time))
321 item.fuse_entry = self._entries[name]
323 def on_event(self, event, collection, name, item):
324 # These are events from the Collection object (ADD/DEL/MOD)
325 # emitted by operations on the Collection object (like
326 # "mkdirs" or "remove"), and by "update", which we need to
327 # synchronize with our FUSE objects that are assigned inodes.
328 if collection == self.collection:
329 name = self.sanitize_filename(name)
332 # It's possible for another thread to have llfuse.lock and
333 # be waiting on collection.lock. Meanwhile, we released
334 # llfuse.lock earlier in the stack, but are still holding
335 # on to the collection lock, and now we need to re-acquire
336 # llfuse.lock. If we don't release the collection lock,
337 # we'll deadlock where we're holding the collection lock
338 # waiting for llfuse.lock and the other thread is holding
339 # llfuse.lock and waiting for the collection lock.
341 # The correct locking order here is to take llfuse.lock
342 # first, then the collection lock.
344 # Since collection.lock is an RLock, it might be locked
345 # multiple times, so we need to release it multiple times,
346 # keep a count, then re-lock it the correct number of
# (elided: loop that releases the RLock repeatedly while counting)
352 self.collection.lock.release()
# Re-acquire in the correct order: llfuse.lock (elided) first, then the
# collection lock.
359 with self.collection.lock:
360 if event == arvados.collection.ADD:
361 self.new_entry(name, item, self.mtime())
362 elif event == arvados.collection.DEL:
363 ent = self._entries[name]
364 del self._entries[name]
365 self.inodes.invalidate_entry(self, name)
366 self.inodes.del_entry(ent)
367 elif event == arvados.collection.MOD:
368 # MOD events have (modified_from, newitem)
# Prefer the inode already attached to the new item; fall back to the
# entry registered under this name.
371 if hasattr(newitem, "fuse_entry") and newitem.fuse_entry is not None:
372 entry = newitem.fuse_entry
373 elif name in self._entries:
374 entry = self._entries[name]
376 if entry is not None:
378 self.inodes.invalidate_inode(entry)
380 if name in self._entries:
381 self.inodes.invalidate_entry(self, name)
383 # we don't care about TOK events, those mean
384 # only token signatures were updated
# Any event also stales the .arvados#collection record file, if present.
386 if self.collection_record_file is not None:
387 self.collection_record_file.invalidate()
388 self.inodes.invalidate_inode(self.collection_record_file)
# (elided finally/cleanup) restore the collection lock count taken above.
391 self.collection.lock.acquire()
# Build the initial entry set from the current Collection contents and
# subscribe for future change events.
394 def populate(self, mtime):
396 with self.collection.lock:
397 self.collection.subscribe(self.on_event)
398 for entry, item in self.collection.items():
399 self.new_entry(entry, item, self.mtime())
# writable (header elided): writes require both the mount flag and a
# writable Collection object.
402 return self._enable_write and self.collection.writable()
406 self.collection_root.flush()
# FUSE write operations; each checks writability, then drops llfuse.lock
# while calling into the (potentially blocking) Collection API.
410 def create(self, name):
411 if not self.writable():
412 raise llfuse.FUSEError(errno.EROFS)
413 with llfuse.lock_released:
414 self.collection.open(name, "w").close()
418 def mkdir(self, name):
419 if not self.writable():
420 raise llfuse.FUSEError(errno.EROFS)
421 with llfuse.lock_released:
422 self.collection.mkdirs(name)
426 def unlink(self, name):
427 if not self.writable():
428 raise llfuse.FUSEError(errno.EROFS)
429 with llfuse.lock_released:
430 self.collection.remove(name)
435 def rmdir(self, name):
436 if not self.writable():
437 raise llfuse.FUSEError(errno.EROFS)
438 with llfuse.lock_released:
439 self.collection.remove(name)
444 def rename(self, name_old, name_new, src):
445 if not self.writable():
446 raise llfuse.FUSEError(errno.EROFS)
# Cross-filesystem renames into a collection only make sense from another
# collection directory.
448 if not isinstance(src, CollectionDirectoryBase):
449 raise llfuse.FUSEError(errno.EPERM)
# POSIX-style checks when the target name already exists (tgt): only
# file-over-file and dir-over-empty-dir are permitted.
454 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
456 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
458 raise llfuse.FUSEError(errno.ENOTEMPTY)
459 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
460 raise llfuse.FUSEError(errno.ENOTDIR)
461 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
462 raise llfuse.FUSEError(errno.EISDIR)
464 with llfuse.lock_released:
465 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
# clear (header elided): detach from the Collection before dropping
# entries so no further events arrive.
470 super(CollectionDirectoryBase, self).clear()
471 if self.collection is not None:
472 self.collection.unsubscribe()
473 self.collection = None
476 # objsize for the whole collection is represented at the root,
477 # don't double-count it
# NOTE(review): elided excerpt — lines are prefixed with their original
# file line numbers and some lines are missing.
480 class CollectionDirectory(CollectionDirectoryBase):
481 """Represents the root of a directory tree representing a collection."""
483 __slots__ = ("api", "num_retries", "collection_locator",
484 "_manifest_size", "_writable", "_updating_lock")
486 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
487 super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
489 self.num_retries = num_retries
# collection_record may be a full API record (dict) or just a
# locator/uuid string.
492 if isinstance(collection_record, dict):
493 self.collection_locator = collection_record['uuid']
494 self._mtime = convertTime(collection_record.get('modified_at'))
496 self.collection_locator = collection_record
499 is_uuid = (self.collection_locator is not None) and (uuid_pattern.match(self.collection_locator) is not None)
502 # It is a uuid, it may be updated upstream, so recheck it periodically.
505 # It is not a uuid. For immutable collections, collection
506 # only needs to be refreshed if it is very long lived
507 # (long enough that there's a risk of the blob signatures
# Poll at half the signature TTL so signatures are refreshed before they
# can expire; fall back to one hour if the discovery doc lacks the value.
510 self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
512 _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
513 self._poll_time = 60*60
# Only uuid-addressed collections can be written back.
515 self._writable = is_uuid and enable_write
516 self._manifest_size = 0
517 self._updating_lock = threading.Lock()
# same_record (header elided): a record matches if either its uuid or its
# portable data hash equals our locator.
520 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
# writable (header elided): defer to the live Collection when we have
# one, else the flag computed in __init__.
523 return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)
# flush (header elided): save pending changes outside llfuse.lock,
# serialized by _updating_lock.
527 with llfuse.lock_released:
528 with self._updating_lock:
529 if self.collection.committed():
530 self.collection.update()
532 self.collection.save()
533 self.new_collection_record(self.collection.api_response())
535 def want_event_subscribe(self):
536 return (uuid_pattern.match(self.collection_locator) is not None)
# Swap in a freshly loaded Collection object and rebuild entries.
538 def new_collection(self, new_collection_record, coll_reader):
541 self.collection = coll_reader
542 self.new_collection_record(new_collection_record)
543 self.populate(self.mtime())
# Adopt a new API record: refresh mtime/locator bookkeeping and
# invalidate the cached .arvados#collection view.
545 def new_collection_record(self, new_collection_record):
546 if not new_collection_record:
547 raise Exception("invalid new_collection_record")
548 self._mtime = convertTime(new_collection_record.get('modified_at'))
549 self._manifest_size = len(new_collection_record["manifest_text"])
550 self.collection_locator = new_collection_record["uuid"]
551 if self.collection_record_file is not None:
552 self.collection_record_file.invalidate()
553 self.inodes.invalidate_inode(self.collection_record_file)
554 _logger.debug("parent_inode %s invalidated collection record file inode %s", self.inode,
555 self.collection_record_file.inode)
556 self.inodes.update_uuid(self)
557 self.inodes.inode_cache.update_cache_size(self)
# uuid (header elided).
561 return self.collection_locator
# update (header elided): (re)load the collection from the API/Keep.
566 if self.collection_locator is None:
567 # No collection locator to retrieve from
571 new_collection_record = None
573 with llfuse.lock_released:
574 self._updating_lock.acquire()
578 _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
580 if self.collection is not None:
581 # Already have a collection object
582 self.collection.update()
583 new_collection_record = self.collection.api_response()
585 # Create a new collection object
# uuid locators get a writable Collection; PDH locators get a read-only
# CollectionReader.
586 if uuid_pattern.match(self.collection_locator):
587 coll_reader = arvados.collection.Collection(
588 self.collection_locator, self.api, self.api.keep,
589 num_retries=self.num_retries)
591 coll_reader = arvados.collection.CollectionReader(
592 self.collection_locator, self.api, self.api.keep,
593 num_retries=self.num_retries)
594 new_collection_record = coll_reader.api_response() or {}
595 # If the Collection only exists in Keep, there will be no API
596 # response. Fill in the fields we need.
597 if 'uuid' not in new_collection_record:
598 new_collection_record['uuid'] = self.collection_locator
599 if "portable_data_hash" not in new_collection_record:
600 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
601 if 'manifest_text' not in new_collection_record:
602 new_collection_record['manifest_text'] = coll_reader.manifest_text()
603 if 'storage_classes_desired' not in new_collection_record:
604 new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
606 # end with llfuse.lock_released, re-acquire lock
608 if new_collection_record is not None:
609 if coll_reader is not None:
610 self.new_collection(new_collection_record, coll_reader)
612 self.new_collection_record(new_collection_record)
616 self._updating_lock.release()
# Error handling: not-found and bad-argument are logged and swallowed so
# the mount stays up; anything else is logged with traceback (elided
# except clause at 623).
617 except arvados.errors.NotFoundError as e:
618 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
619 except arvados.errors.ArgumentError as detail:
620 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
621 if new_collection_record is not None and "manifest_text" in new_collection_record:
622 _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
624 _logger.exception("arv-mount %s: error", self.collection_locator)
625 if new_collection_record is not None and "manifest_text" in new_collection_record:
626 _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
# Backing callable for the .arvados#collection JSON file.
632 def collection_record(self):
634 return self.collection.api_response()
638 def __getitem__(self, item):
639 if item == '.arvados#collection':
640 if self.collection_record_file is None:
641 self.collection_record_file = FuncToJSONFile(
642 self.inode, self.collection_record)
643 self.inodes.add_entry(self.collection_record_file)
644 self.invalidate() # use lookup as a signal to force update
645 return self.collection_record_file
647 return super(CollectionDirectory, self).__getitem__(item)
649 def __contains__(self, k):
650 if k == '.arvados#collection':
653 return super(CollectionDirectory, self).__contains__(k)
655 def invalidate(self):
656 if self.collection_record_file is not None:
657 self.collection_record_file.invalidate()
658 self.inodes.invalidate_inode(self.collection_record_file)
659 super(CollectionDirectory, self).invalidate()
# persisted (header elided).
662 return (self.collection_locator is not None)
665 # This is a rough guess of the amount of overhead involved for
666 # a collection; the assumptions are that each file
667 # averages 128 bytes in the manifest, but consumes 1024 bytes
668 # of Python data structures, so 1024/128=8 means we estimate
669 # the RAM footprint at 8 times the size of bare manifest text.
670 return self._manifest_size * 8
# finalize (header elided): best-effort save, then stop background
# threads; save failures are logged, not raised.
673 if self.collection is None:
678 self.collection.save()
679 except Exception as e:
680 _logger.exception("Failed to save collection %s", self.collection_locator)
681 self.collection.stop_threads()
# clear (header elided): also release the record-file inode.
684 if self.collection is not None:
685 self.collection.stop_threads()
686 self._manifest_size = 0
687 super(CollectionDirectory, self).clear()
688 if self.collection_record_file is not None:
689 self.inodes.del_entry(self.collection_record_file)
690 self.collection_record_file = None
# NOTE(review): elided excerpt — lines are prefixed with their original
# file line numbers and some lines are missing.
693 class TmpCollectionDirectory(CollectionDirectoryBase):
694 """A directory backed by an Arvados collection that never gets saved.
696 This supports using Keep as scratch space. A userspace program can
697 read the .arvados#collection file to get a current manifest in
698 order to save a snapshot of the scratch data or use it as a crunch
# Collection subclass whose save methods are no-ops (bodies elided), so
# the scratch data can never be committed to the API server.
702 class UnsaveableCollection(arvados.collection.Collection):
708 def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
709 collection = self.UnsaveableCollection(
710 api_client=api_client,
711 keep_client=api_client.keep,
712 num_retries=num_retries,
713 storage_classes_desired=storage_classes)
714 # This is always enable_write=True because it never tries to
715 # save to the backend
716 super(TmpCollectionDirectory, self).__init__(
717 parent_inode, inodes, True, filters, collection, self)
718 self.populate(self.mtime())
719 
720 def on_event(self, *args, **kwargs):
721 super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
722 if self.collection_record_file is None:
# Keep the .arvados#collection view in sync with every change event.
725 # See discussion in CollectionDirectoryBase.on_event
# Same RLock release/re-acquire dance as the base class (count loop
# elided) to avoid deadlocking against llfuse.lock.
729 self.collection.lock.release()
736 with self.collection.lock:
737 self.collection_record_file.invalidate()
738 self.inodes.invalidate_inode(self.collection_record_file)
739 _logger.debug("%s invalidated collection record", self.inode)
742 self.collection.lock.acquire()
# Backing callable for .arvados#collection: a synthetic record computed
# from the live in-memory collection (no API round trip).
745 def collection_record(self):
746 with llfuse.lock_released:
749 "manifest_text": self.collection.manifest_text(),
750 "portable_data_hash": self.collection.portable_data_hash(),
751 "storage_classes_desired": self.collection.storage_classes_desired(),
754 def __contains__(self, k):
755 return (k == '.arvados#collection' or
756 super(TmpCollectionDirectory, self).__contains__(k))
759 def __getitem__(self, item):
760 if item == '.arvados#collection':
761 if self.collection_record_file is None:
762 self.collection_record_file = FuncToJSONFile(
763 self.inode, self.collection_record)
764 self.inodes.add_entry(self.collection_record_file)
765 return self.collection_record_file
766 return super(TmpCollectionDirectory, self).__getitem__(item)
777 def want_event_subscribe(self):
# finalize (header elided): nothing to save, just stop threads.
781 self.collection.stop_threads()
783 def invalidate(self):
784 if self.collection_record_file:
785 self.collection_record_file.invalidate()
786 super(TmpCollectionDirectory, self).invalidate()
# NOTE(review): elided excerpt — lines are prefixed with their original
# file line numbers and some lines are missing.
789 class MagicDirectory(Directory):
790 """A special directory that logically contains the set of all extant keep locators.
792 When a file is referenced by lookup(), it is tested to see if it is a valid
793 keep locator to a manifest, and if so, loads the manifest contents as a
794 subdirectory of this directory with the locator as the directory name.
795 Since querying a list of all extant keep locators is impractical, only
796 collections that have already been accessed are visible to readdir().
# README_TEXT (assignment elided in this view) is served as a README file
# below; its body begins here.
801 This directory provides access to Arvados collections as subdirectories listed
802 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
803 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
804 (in the form 'zzzzz-j7d0g-1234567890abcde').
806 Note that this directory will appear empty until you attempt to access a
807 specific collection or project subdirectory (such as trying to 'cd' into it),
808 at which point the collection or project will actually be looked up on the server
809 and the directory will appear if it exists.
813 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
814 super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
816 self.num_retries = num_retries
817 self.pdh_only = pdh_only
818 self.storage_classes = storage_classes
# Hook attribute assignment: the README (and by_id alias at the root)
# can only be created once this directory has been given an inode.
820 def __setattr__(self, name, value):
821 super(MagicDirectory, self).__setattr__(name, value)
822 # When we're assigned an inode, add a README.
823 if ((name == 'inode') and (self.inode is not None) and
824 (not self._entries)):
825 self._entries['README'] = self.inodes.add_entry(
826 StringFile(self.inode, self.README_TEXT, time.time()))
827 # If we're the root directory, add an identical by_id subdirectory.
828 if self.inode == llfuse.ROOT_INODE:
829 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
# Membership test doubles as the lazy loader: looking up a plausible
# uuid/PDH fetches the project or collection on demand.
839 def __contains__(self, k):
840 if k in self._entries:
# Reject names that are neither a PDH nor (unless pdh_only) a uuid.
843 if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
849 if group_uuid_pattern.match(k):
850 project = self.api.groups().list(
852 ['group_class', 'in', ['project','filter']],
854 *self._filters_for('groups', qualified=False),
856 ).execute(num_retries=self.num_retries)
857 if project[u'items_available'] == 0:
859 e = self.inodes.add_entry(ProjectDirectory(
866 project[u'items'][0],
867 storage_classes=self.storage_classes,
870 e = self.inodes.add_entry(CollectionDirectory(
# If the lookup ultimately failed, drop the half-created entry again
# (branches elided); on any exception, log and clean up likewise.
881 if k not in self._entries:
884 self.inodes.del_entry(e)
887 self.inodes.invalidate_entry(self, k)
888 self.inodes.del_entry(e)
890 except Exception as ex:
891 _logger.exception("arv-mount lookup '%s':", k)
893 self.inodes.del_entry(e)
896 def __getitem__(self, item):
# (elided: `if item in self:` guard that triggers the lazy load above)
898 return self._entries[item]
900 raise KeyError("No collection with id " + item)
905 def want_event_subscribe(self):
906 return not self.pdh_only
# NOTE(review): elided excerpt — lines are prefixed with their original
# file line numbers and some lines are missing.
909 class TagsDirectory(Directory):
910 """A special directory that contains as subdirectories all tags visible to the user."""
912 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
913 super(TagsDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
915 self.num_retries = num_retries
917 self._poll_time = poll_time
920 def want_event_subscribe(self):
# update (header elided): list tag links from the API outside
# llfuse.lock, then merge them (plus any _extra names learned via direct
# lookup) into the entries dict as TagDirectory children.
925 with llfuse.lock_released:
926 tags = self.api.links().list(
928 ['link_class', '=', 'tag'],
930 *self._filters_for('links', qualified=False),
935 ).execute(num_retries=self.num_retries)
938 tags['items']+[{"name": n} for n in self._extra],
940 lambda a, i: a.tag == i['name'],
941 lambda i: TagDirectory(
950 poll_time=self._poll_time,
956 def __getitem__(self, item):
957 if super(TagsDirectory, self).__contains__(item):
958 return super(TagsDirectory, self).__getitem__(item)
# Unknown name: query the API for that tag directly; if found, remember
# it in _extra (elided update call) so it survives future refreshes.
959 with llfuse.lock_released:
960 tags = self.api.links().list(
962 ['link_class', '=', 'tag'],
964 *self._filters_for('links', qualified=False),
967 ).execute(num_retries=self.num_retries)
969 self._extra.add(item)
971 return super(TagsDirectory, self).__getitem__(item)
975 def __contains__(self, k):
976 if super(TagsDirectory, self).__contains__(k):
# NOTE(review): elided excerpt — lines are prefixed with their original
# file line numbers and some lines are missing.
986 class TagDirectory(Directory):
987 """A special directory that contains as subdirectories all collections visible
988 to the user that are tagged with a particular tag.
991 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
992 poll=False, poll_time=60):
993 super(TagDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
995 self.num_retries = num_retries
998 self._poll_time = poll_time
1000 def want_event_subscribe(self):
# update (header elided): fetch the head_uuids of collections carrying
# this tag, then merge them into the entries dict as CollectionDirectory
# children keyed by head_uuid.
1005 with llfuse.lock_released:
1006 taggedcollections = self.api.links().list(
1008 ['link_class', '=', 'tag'],
1009 ['name', '=', self.tag],
1010 ['head_uuid', 'is_a', 'arvados#collection'],
1011 *self._filters_for('links', qualified=False),
1013 select=['head_uuid'],
1014 ).execute(num_retries=self.num_retries)
1016 taggedcollections['items'],
1017 lambda i: i['head_uuid'],
1018 lambda a, i: a.collection_locator == i['head_uuid'],
1019 lambda i: CollectionDirectory(
1031 class ProjectDirectory(Directory):
1032 """A special directory that contains the contents of a project."""
1034 __slots__ = ("api", "num_retries", "project_object", "project_object_file",
1035 "project_uuid", "_updating_lock",
1036 "_current_user", "_full_listing", "storage_classes", "recursively_contained")
1038 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
1039 project_object, poll=True, poll_time=15, storage_classes=None):
1040 super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
1042 self.num_retries = num_retries
1043 self.project_object = project_object
1044 self.project_object_file = None
1045 self.project_uuid = project_object['uuid']
1047 self._poll_time = poll_time
1048 self._updating_lock = threading.Lock()
1049 self._current_user = None
1050 self._full_listing = False
1051 self.storage_classes = storage_classes
1052 self.recursively_contained = False
1054 # Filter groups can contain themselves, which causes tools
1055 # that walk the filesystem to get stuck in an infinite loop,
1056 # so suppress returning a listing in that case.
1057 if self.project_object.get("group_class") == "filter":
1058 iter_parent_inode = parent_inode
1059 while iter_parent_inode != llfuse.ROOT_INODE:
1060 parent_dir = self.inodes[iter_parent_inode]
1061 if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid:
1062 self.recursively_contained = True
1064 iter_parent_inode = parent_dir.parent_inode
1066 def want_event_subscribe(self):
1069 def createDirectory(self, i):
1070 common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
1071 if collection_uuid_pattern.match(i['uuid']):
1072 return CollectionDirectory(*common_args, i)
1073 elif group_uuid_pattern.match(i['uuid']):
1074 return ProjectDirectory(*common_args, i, self._poll, self._poll_time, self.storage_classes)
1075 elif link_uuid_pattern.match(i['uuid']):
1076 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
1077 return CollectionDirectory(*common_args, i['head_uuid'])
1080 elif uuid_pattern.match(i['uuid']):
1081 return ObjectFile(self.parent_inode, i)
1086 return self.project_uuid
1089 self._full_listing = True
1090 return super(ProjectDirectory, self).items()
1092 def namefn(self, i):
1094 if i['name'] is None or len(i['name']) == 0:
1096 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
1097 # collection or subproject
1099 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
1102 elif 'kind' in i and i['kind'].startswith('arvados#'):
1104 return "{}.{}".format(i['name'], i['kind'][8:])
1111 if self.project_object_file == None:
1112 self.project_object_file = ObjectFile(self.inode, self.project_object)
1113 self.inodes.add_entry(self.project_object_file)
1115 if self.recursively_contained or not self._full_listing:
1119 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
1120 return a.uuid() == i['uuid']
1121 elif isinstance(a, ObjectFile):
1122 return a.uuid() == i['uuid'] and not a.stale()
1126 with llfuse.lock_released:
1127 self._updating_lock.acquire()
1128 if not self.stale():
1131 if group_uuid_pattern.match(self.project_uuid):
1132 self.project_object = self.api.groups().get(
1133 uuid=self.project_uuid).execute(num_retries=self.num_retries)
1134 elif user_uuid_pattern.match(self.project_uuid):
1135 self.project_object = self.api.users().get(
1136 uuid=self.project_uuid).execute(num_retries=self.num_retries)
1137 # do this in 2 steps until #17424 is fixed
1138 contents = list(arvados.util.keyset_list_all(
1139 self.api.groups().contents,
1141 num_retries=self.num_retries,
1142 uuid=self.project_uuid,
1144 ['uuid', 'is_a', 'arvados#group'],
1145 ['groups.group_class', 'in', ['project', 'filter']],
1146 *self._filters_for('groups', qualified=True),
1149 contents.extend(obj for obj in arvados.util.keyset_list_all(
1150 self.api.groups().contents,
1152 num_retries=self.num_retries,
1153 uuid=self.project_uuid,
1155 ['uuid', 'is_a', 'arvados#collection'],
1156 *self._filters_for('collections', qualified=True),
1158 ) if obj['current_version_uuid'] == obj['uuid'])
1159 # end with llfuse.lock_released, re-acquire lock
1161 self.merge(contents,
1164 self.createDirectory)
1167 self._updating_lock.release()
1169 def _add_entry(self, i, name):
1170 ent = self.createDirectory(i)
1171 self._entries[name] = self.inodes.add_entry(ent)
1172 return self._entries[name]
# NOTE(review): this excerpt is a numbered listing with some source lines
# elided (visible as gaps in the inline numbering).  Code lines are kept
# verbatim below; only comments have been added.
# Lazy lookup of project entry *k*: serve the synthetic ".arvados#project"
# file, return an already-materialized entry, or query the API for a
# subproject/filter group or collection with a matching name.
1176 def __getitem__(self, k):
1177 if k == '.arvados#project':
1178 return self.project_object_file
1179 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
1180 return super(ProjectDirectory, self).__getitem__(k)
# Release the FUSE lock around the (potentially slow) API round-trips.
1181 with llfuse.lock_released:
# k2 is k with the ForwardSlashNameSubstitution token turned back into "/",
# so the name filter can match either spelling.
1182 k2 = self.unsanitize_filename(k)
# NOTE(review): the if/else that selects between these two namefilter
# forms is elided here — presumably "=" when k == k2, "in" otherwise.
1184 namefilter = ["name", "=", k]
1186 namefilter = ["name", "in", [k, k2]]
# First try groups (projects and filter groups) owned by this project...
1187 contents = self.api.groups().list(
1189 ["owner_uuid", "=", self.project_uuid],
1190 ["group_class", "in", ["project","filter"]],
1192 *self._filters_for('groups', qualified=False),
1195 ).execute(num_retries=self.num_retries)["items"]
# ...then collections (the guarding "if not contents:" appears elided).
1197 contents = self.api.collections().list(
1199 ["owner_uuid", "=", self.project_uuid],
1201 *self._filters_for('collections', qualified=False),
1204 ).execute(num_retries=self.num_retries)["items"]
1206 if len(contents) > 1 and contents[1]['name'] == k:
# If both "foo/bar" and "foo[SUBST]bar" exist, prefer the entry whose
# name is the exact (unsubstituted) match for k.
1209 contents = [contents[1]]
1210 name = self.sanitize_filename(self.namefn(contents[0]))
# NOTE(review): the KeyError path for no matches is elided from this view.
1213 return self._add_entry(contents[0], name)
# Membership test; ".arvados#project" is always present (body elided here).
1218 def __contains__(self, k):
1219 if k == '.arvados#project':
# NOTE(review): the lines below appear to belong to a separate writability
# check (likely a writable() method) whose def line is elided: read-only
# mounts report not-writable, otherwise defer to the project's can_write.
1231 if not self._enable_write:
1233 return self.project_object.get("can_write") is True
# NOTE(review): persisted()'s body is elided from this view.
1235 def persisted(self):
# NOTE(review): the following lines appear to be the body of clear()
# (its def line is elided): drop all entries, then release the synthetic
# project-object file's inode.
1239 super(ProjectDirectory, self).clear()
1240 if self.project_object_file is not None:
1241 self.inodes.del_entry(self.project_object_file)
1242 self.project_object_file = None
# Create a subdirectory by creating an empty collection named *name*
# owned by this project.  Raises EROFS on a read-only mount, EEXIST when
# the API reports a conflict.
1246 def mkdir(self, name):
1247 if not self.writable():
1248 raise llfuse.FUSEError(errno.EROFS)
# Release the FUSE lock for the API call.
1251 with llfuse.lock_released:
# NOTE(review): the try: and the start of the collection body dict
# (presumably including "name": name) are elided from this excerpt.
1253 "owner_uuid": self.project_uuid,
1255 "manifest_text": "" }
1256 if self.storage_classes is not None:
1257 c["storage_classes_desired"] = self.storage_classes
1259 self.api.collections().create(body=c).execute(num_retries=self.num_retries)
1260 except Exception as e:
# API-level failures are logged and surfaced to FUSE as EEXIST.
1263 except apiclient_errors.Error as error:
1264 _logger.error(error)
1265 raise llfuse.FUSEError(errno.EEXIST)
def rmdir(self, name):
    """Remove the collection directory *name* from this project.

    Raises llfuse.FUSEError with EROFS when the mount is read-only,
    ENOENT when no such entry exists, EPERM when the entry is not a
    collection, and ENOTEMPTY when the collection still has contents.
    """
    error_code = None
    if not self.writable():
        error_code = errno.EROFS
    elif name not in self:
        error_code = errno.ENOENT
    elif not isinstance(self[name], CollectionDirectory):
        error_code = errno.EPERM
    elif len(self[name]) > 0:
        error_code = errno.ENOTEMPTY
    if error_code is not None:
        raise llfuse.FUSEError(error_code)
    # Release the FUSE lock while making the (potentially slow) API call.
    with llfuse.lock_released:
        self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
# Move/rename a collection directory between project directories by
# updating the collection's owner_uuid and name via the API, then moving
# the in-memory entry.  Only collection-in-project moves are supported.
1285 def rename(self, name_old, name_new, src):
1286 if not self.writable():
1287 raise llfuse.FUSEError(errno.EROFS)
# The source must itself be a project directory.
1289 if not isinstance(src, ProjectDirectory):
1290 raise llfuse.FUSEError(errno.EPERM)
# NOTE(review): the assignment of ent (presumably ent = src[name_old])
# is elided from this excerpt.
1294 if not isinstance(ent, CollectionDirectory):
1295 raise llfuse.FUSEError(errno.EPERM)
1297 if name_new in self:
1298 # POSIX semantics for replacing one directory with another is
1299 # tricky (the target directory must be empty, the operation must be
1300 # atomic which isn't possible with the Arvados API as of this
1301 # writing) so don't support that.
1302 raise llfuse.FUSEError(errno.EPERM)
# NOTE(review): the "with llfuse.lock_released:" wrapping this API call
# is elided from this excerpt.
1304 self.api.collections().update(uuid=ent.uuid(),
1305 body={"owner_uuid": self.uuid(),
1306 "name": name_new}).execute(num_retries=self.num_retries)
# Actually move the entry from the source directory to this directory,
# and tell the kernel the old name is gone.
1309 del src._entries[name_old]
1310 self._entries[name_new] = ent
1311 self.inodes.invalidate_entry(src, name_old)
# React to a websocket event for an object owned by this project:
# add, remove, or rename the corresponding directory entry.
# NOTE(review): several control-flow lines (early returns, if/else
# branches) are elided in this excerpt — gaps in the inline numbering.
1314 def child_event(self, ev):
1315 properties = ev.get("properties") or {}
1316 old_attrs = properties.get("old_attributes") or {}
1317 new_attrs = properties.get("new_attributes") or {}
# Both attribute dicts are stamped with the event's object uuid so
# namefn can build a stable name from either.
1318 old_attrs["uuid"] = ev["object_uuid"]
1319 new_attrs["uuid"] = ev["object_uuid"]
1320 old_name = self.sanitize_filename(self.namefn(old_attrs))
1321 new_name = self.sanitize_filename(self.namefn(new_attrs))
1323 # create events will have a new name, but not an old name
1324 # delete events will have an old name, but not a new name
1325 # update events will have an old and new name, and they may be same or different
1326 # if they are the same, an unrelated field changed and there is nothing to do.
1328 if old_attrs.get("owner_uuid") != self.project_uuid:
1329 # Was moved from somewhere else, so don't try to remove entry.
1331 if ev.get("object_owner_uuid") != self.project_uuid:
1332 # Was moved to somewhere else, so don't try to add entry
1335 if old_attrs.get("is_trashed"):
1336 # Was previously deleted
1338 if new_attrs.get("is_trashed"):
# Rename path: drop the old entry and invalidate its dentry...
1342 if new_name != old_name:
1344 if old_name in self._entries:
1345 ent = self._entries[old_name]
1346 del self._entries[old_name]
1347 self.inodes.invalidate_entry(self, old_name)
# ...then either re-register the existing node under the new name,
# create a fresh entry, or release the node (branch guards elided).
1351 self._entries[new_name] = ent
1353 self._add_entry(new_attrs, new_name)
1354 elif ent is not None:
1355 self.inodes.del_entry(ent)
1358 class SharedDirectory(Directory):
1359 """A special directory that represents users or groups who have shared projects with me."""
# Parameters mirror Directory plus the API client, retry count, an
# *exclude* uuid, optional polling configuration, and storage classes
# passed down to child ProjectDirectory nodes.
1361 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
1362 exclude, poll=False, poll_time=60, storage_classes=None):
1363 super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
# NOTE(review): some assignments are elided in this excerpt (the inline
# numbering skips 1364 and 1367) — presumably self.api and the poll flag.
1365 self.num_retries = num_retries
# Fetch the calling user once; used later to distinguish projects owned
# by the current user from shared ones.
1366 self.current_user = api.users().current().execute(num_retries=num_retries)
1368 self._poll_time = poll_time
# Serializes concurrent update() runs.
1369 self._updating_lock = threading.Lock()
1370 self.storage_classes = storage_classes
# NOTE(review): this is the interior of SharedDirectory.update(); the
# def line itself is elided from this excerpt, as are various other
# lines (gaps in the inline numbering).  Code lines are verbatim.
# Overall flow: collect the root shared projects and their owners into
# `objects`/`roots`/`root_owners`, resolve owner users/groups, build a
# display-name -> object mapping in `contents`, then merge it into the
# directory entries as ProjectDirectory children.
1375 with llfuse.lock_released:
1376 self._updating_lock.acquire()
1377 if not self.stale():
# Feature-detect the groups.shared endpoint from the API discovery doc.
1385 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1386 if 'httpMethod' in methods.get('shared', {}):
# Preferred path: page through groups().shared, which also returns the
# owners via include="owner_uuid".
1389 resp = self.api.groups().shared(
1391 ['group_class', 'in', ['project','filter']],
1393 *self._filters_for('groups', qualified=False),
1398 include="owner_uuid",
1400 if not resp["items"]:
# Keyset pagination: next page starts after the last uuid seen.
1402 page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1403 for r in resp["items"]:
1404 objects[r["uuid"]] = r
1405 roots.append(r["uuid"])
# "included" carries the owner objects of the shared items.
1406 for r in resp["included"]:
1407 objects[r["uuid"]] = r
1408 root_owners.add(r["uuid"])
# Fallback path: list every visible project and infer the shared roots.
1410 all_projects = list(arvados.util.keyset_list_all(
1411 self.api.groups().list,
1413 num_retries=self.num_retries,
1415 ['group_class', 'in', ['project','filter']],
1416 *self._filters_for('groups', qualified=False),
1418 select=["uuid", "owner_uuid"],
1420 for ob in all_projects:
1421 objects[ob['uuid']] = ob
# A project is a shared root when it is not owned by the current user
# and its owner is not itself a visible project.
1423 current_uuid = self.current_user['uuid']
1424 for ob in all_projects:
1425 if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1426 roots.append(ob['uuid'])
1427 root_owners.add(ob['owner_uuid'])
# Resolve owner uuids to user and group records.
1429 lusers = arvados.util.keyset_list_all(
1430 self.api.users().list,
1432 num_retries=self.num_retries,
1434 ['uuid', 'in', list(root_owners)],
1435 *self._filters_for('users', qualified=False),
1438 lgroups = arvados.util.keyset_list_all(
1439 self.api.groups().list,
1441 num_retries=self.num_retries,
1443 ['uuid', 'in', list(root_owners)+roots],
1444 *self._filters_for('groups', qualified=False),
# NOTE(review): the two loops over lusers/lgroups are elided; both
# register the fetched records into `objects`.
1448 objects[l["uuid"]] = l
1450 objects[l["uuid"]] = l
# Build display names: groups use "name", users use "first last".
1452 for r in root_owners:
1456 contents[obr["name"]] = obr
1457 elif "first_name" in obr:
1458 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
# Roots whose owner record is not visible are listed under their own name.
1463 if obr['owner_uuid'] not in objects:
1464 contents[obr["name"]] = obr
1466 # end with llfuse.lock_released, re-acquire lock
# Merge into the entry table: match existing ProjectDirectory nodes by
# uuid, create new ones otherwise (merge() call line itself elided).
1471 lambda a, i: a.uuid() == i[1]['uuid'],
1472 lambda i: ProjectDirectory(
1481 poll_time=self._poll_time,
1482 storage_classes=self.storage_classes,
# Errors are logged (best-effort refresh), and the update lock is always
# released in what is presumably the finally: block (elided).
1486 _logger.exception("arv-mount shared dir error")
1488 self._updating_lock.release()
1490 def want_event_subscribe(self):