1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
15 from apiclient import errors as apiclient_errors
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
23 _logger = logging.getLogger('arvados.arvados_fuse')
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (If present in a collection filename, they will
28 # appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile(r'[\x00/]')
# NOTE(review): This SOURCE is a partial excerpt — each line carries its original
# file line number and many intermediate lines are elided (the numbering jumps),
# so this class is documented in place rather than rewritten.
# Directory: generic FUSE directory backed by a dict mapping sanitized filenames
# to File/Directory inode entries. Subclasses implement refresh ("update") logic.
32 class Directory(FreshBase):
33 """Generic directory object, backed by a dict.
35 Consists of a set of entries with the key representing the filename
36 and the value referencing a File or Directory object.
39     __slots__ = ("inode", "parent_inode", "inodes", "_entries", "_mtime", "_enable_write", "_filters")
# __init__: parent_inode is the inode number of the containing directory;
# inodes is the shared inode table; filters are extra Arvados API filters.
# (Lines 43, 45-46, 50-51 elided in this excerpt — presumably inode/inodes/_entries
# initialization; TODO confirm against the full file.)
41     def __init__(self, parent_inode, inodes, enable_write, filters):
42         """parent_inode is the integer inode number"""
44         super(Directory, self).__init__()
47         if not isinstance(parent_inode, int):
48             raise Exception("parent_inode should be an int")
49         self.parent_inode = parent_inode
52         self._mtime = time.time()
53         self._enable_write = enable_write
54         self._filters = filters or []
# _filters_for: yield the subset of self._filters that apply to `subtype`
# (e.g. 'collections'), stripping the "subtype." prefix when qualified=False.
# (Lines 59-60 and 62-65 elided — the branch bodies for qualified handling.)
56     def _filters_for(self, subtype, *, qualified):
57         for f in self._filters:
58             f_type, _, f_name = f[0].partition('.')
61             elif f_type != subtype:
66                 yield [f_name, *f[1:]]
# unsanitize_filename: inverse of sanitize_filename for the '/' substitution.
# (The fallthrough return for the non-str case is elided here.)
68     def unsanitize_filename(self, incoming):
69         """Replace ForwardSlashNameSubstitution value with /"""
70         fsns = self.inodes.forward_slash_subst()
71         if isinstance(fsns, str):
72             return incoming.replace(fsns, '/')
# sanitize_filename: make a collection item name safe for FUSE — substitute '/'
# per cluster config, then replace NUL and '/' with '_'.
76     def sanitize_filename(self, dirty):
77         """Replace disallowed filename characters according to
78         ForwardSlashNameSubstitution in self.api_config."""
79         # '.' and '..' are not reachable if API server is newer than #6277
89         fsns = self.inodes.forward_slash_subst()
90         if isinstance(fsns, str):
91             dirty = dirty.replace('/', fsns)
92         return _disallowed_filename_characters.sub('_', dirty)
95     # Overridden by subclasses to implement logic to update the
96     # entries dict when the directory is stale
101     # Only used when computing the size of the disk footprint of the directory
# checkupdate: refresh if stale; swallows/logs HTTP errors from the API client.
# NOTE(review): references `apiclient.errors` — presumably `import apiclient`
# exists among the elided imports at the top of the full file; verify.
109     def checkupdate(self):
113         except apiclient.errors.HttpError as e:
118     def __getitem__(self, item):
119         return self._entries[item]
124         return list(self._entries.items())
128     def __contains__(self, k):
129         return k in self._entries
134         return len(self._entries)
137         self.inodes.touch(self)
138         super(Directory, self).fresh()
141         # Rough estimate of memory footprint based on using pympler
142         return len(self._entries) * 1024
# merge: reconcile self._entries with a fresh listing — reuse entries judged
# "same", create new ones, detach those no longer present. Several loop/else
# lines are elided in this excerpt.
144     def merge(self, items, fn, same, new_entry):
145         """Helper method for updating the contents of the directory.
147         Takes a list describing the new contents of the directory, reuse
148         entries that are the same in both the old and new lists, create new
149         entries, and delete old entries missing from the new list.
152         * items: Iterable --- New directory contents
154         * fn: Callable --- Takes an entry in 'items' and return the desired file or
155         directory name, or None if this entry should be skipped
157         * same: Callable --- Compare an existing entry (a File or Directory
158         object) with an entry in the items list to determine whether to keep
161         * new_entry: Callable --- Create a new directory entry (File or Directory
162         object) from an entry in the items list.
166         oldentries = self._entries
170             name = self.sanitize_filename(fn(i))
173             if name in oldentries:
174                 ent = oldentries[name]
175                 if same(ent, i) and ent.parent_inode == self.inode:
176                     # move existing directory entry over
177                     self._entries[name] = ent
179                     self.inodes.inode_cache.touch(ent)
182             name = self.sanitize_filename(fn(i))
185             if name not in self._entries:
186                 # create new directory entry
189                 self._entries[name] = self.inodes.add_entry(ent)
190                 # need to invalidate this just in case there was a
191                 # previous entry that couldn't be moved over or a
192                 # lookup that returned file not found and cached
194                 self.inodes.invalidate_entry(self, name)
196                 _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)
198         # delete any other directory entries that were not in found in 'items'
199         for name, ent in oldentries.items():
200             _logger.debug("Detaching entry '%s' from parent_inode %i", name, self.inode)
201             self.inodes.invalidate_entry(self, name)
202             self.inodes.del_entry(ent)
206         self._mtime = time.time()
207         self.inodes.inode_cache.update_cache_size(self)
# in_use / clear fragments: a directory is in use if its FreshBase state says so
# or (presumably) any child entry is; clear() detaches and deletes all children.
212         if super(Directory, self).in_use():
214             for v in self._entries.values():
220         """Delete all entries"""
221         if not self._entries:
223         oldentries = self._entries
226         for name, ent in oldentries.items():
228             self.inodes.invalidate_entry(self, name)
229             self.inodes.del_entry(ent)
230         self.inodes.inode_cache.update_cache_size(self)
# kernel_invalidate: tell the kernel to drop the dentry for this directory by
# locating our name in the parent's entry dict (bypassing items() to avoid
# triggering a refresh while invalidating).
232     def kernel_invalidate(self):
233         # Invalidating the dentry on the parent implies invalidating all paths
235         if self.parent_inode in self.inodes:
236             parent = self.inodes[self.parent_inode]
238             # parent was removed already.
241         # Find self on the parent in order to invalidate this path.
242         # Calling the public items() method might trigger a refresh,
243         # which we definitely don't want, so read the internal dict directly.
244         for k,v in parent._entries.items():
246                 self.inodes.invalidate_entry(parent, k)
# Abstract mutation hooks: read-only base; writable subclasses override these.
258     def want_event_subscribe(self):
259         raise NotImplementedError()
261     def create(self, name):
262         raise NotImplementedError()
264     def mkdir(self, name):
265         raise NotImplementedError()
267     def unlink(self, name):
268         raise NotImplementedError()
270     def rmdir(self, name):
271         raise NotImplementedError()
273     def rename(self, name_old, name_new, src):
274         raise NotImplementedError()
# NOTE(review): Partial excerpt — lines carry original numbering and interior
# lines are elided; documented in place rather than rewritten.
# CollectionDirectoryBase: mirrors an arvados.collection.Collection as a FUSE
# directory; Collection notify callbacks (on_event) keep inodes in sync.
277 class CollectionDirectoryBase(Directory):
278     """Represent an Arvados Collection as a directory.
280     This class is used for Subcollections, and is also the base class for
281     CollectionDirectory, which implements collection loading/saving on
284     Most operations act only the underlying Arvados `Collection` object. The
285     `Collection` object signals via a notify callback to
286     `CollectionDirectoryBase.on_event` that an item was added, removed or
287     modified. FUSE inodes and directory entries are created, deleted or
288     invalidated in response to these events.
292     __slots__ = ("collection", "collection_root", "collection_record_file")
294     def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root):
295         super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
296         self.collection = collection
297         self.collection_root = collection_root
298         self.collection_record_file = None
# new_entry: attach a Collection item under `name` — reuse an already-built
# fuse_entry (reparenting it), build a sub-CollectionDirectoryBase for
# subcollections, else a FuseArvadosFile. (Constructor-argument lines 311-315
# and the else: lines are elided here.)
300     def new_entry(self, name, item, mtime):
301         name = self.sanitize_filename(name)
302         if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
303             if item.fuse_entry.parent_inode is not None:
304                 raise Exception("Can only reparent unparented inode entry")
305             if item.fuse_entry.inode is None:
306                 raise Exception("Reparented entry must still have valid inode")
307             item.fuse_entry.parent_inode = self.inode
308             self._entries[name] = item.fuse_entry
309         elif isinstance(item, arvados.collection.RichCollectionBase):
310             self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
316                 self.collection_root,
318             self._entries[name].populate(mtime)
320             self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
321             item.fuse_entry = self._entries[name]
# on_event: Collection change callback. Releases the (reentrant) collection
# lock before re-taking llfuse.lock to avoid a lock-ordering deadlock, then
# re-acquires it afterward; the counting loop lines are elided in this excerpt.
323     def on_event(self, event, collection, name, item):
324         # These are events from the Collection object (ADD/DEL/MOD)
325         # emitted by operations on the Collection object (like
326         # "mkdirs" or "remove"), and by "update", which we need to
327         # synchronize with our FUSE objects that are assigned inodes.
328         if collection == self.collection:
329             name = self.sanitize_filename(name)
332             # It's possible for another thread to have llfuse.lock and
333             # be waiting on collection.lock. Meanwhile, we released
334             # llfuse.lock earlier in the stack, but are still holding
335             # on to the collection lock, and now we need to re-acquire
336             # llfuse.lock. If we don't release the collection lock,
337             # we'll deadlock where we're holding the collection lock
338             # waiting for llfuse.lock and the other thread is holding
339             # llfuse.lock and waiting for the collection lock.
341             # The correct locking order here is to take llfuse.lock
342             # first, then the collection lock.
344             # Since collection.lock is an RLock, it might be locked
345             # multiple times, so we need to release it multiple times,
346             # keep a count, then re-lock it the correct number of
352                 self.collection.lock.release()
359                 with self.collection.lock:
360                     if event == arvados.collection.ADD:
361                         self.new_entry(name, item, self.mtime())
362                     elif event == arvados.collection.DEL:
363                         ent = self._entries[name]
364                         del self._entries[name]
365                         self.inodes.invalidate_entry(self, name)
366                         self.inodes.del_entry(ent)
367                     elif event == arvados.collection.MOD:
368                         # MOD events have (modified_from, newitem)
370                         if hasattr(newitem, "fuse_entry") and newitem.fuse_entry is not None:
371                             newitem.fuse_entry.invalidate()
372                             self.inodes.invalidate_inode(newitem.fuse_entry)
373                         elif name in self._entries:
374                             self._entries[name].invalidate()
375                             self.inodes.invalidate_inode(self._entries[name])
376                     # we don't care about TOK events, those mean
377                     # only token signatures were updated
379                     if self.collection_record_file is not None:
380                         self.collection_record_file.invalidate()
381                         self.inodes.invalidate_inode(self.collection_record_file)
384                     self.collection.lock.acquire()
# populate: subscribe to Collection events and build entries for its items.
387     def populate(self, mtime):
389         with self.collection.lock:
390             self.collection.subscribe(self.on_event)
391             for entry, item in self.collection.items():
392                 self.new_entry(entry, item, self.mtime())
395         return self._enable_write and self.collection.writable()
399         self.collection_root.flush()
# FUSE mutation operations: each checks writability (EROFS otherwise) and
# delegates to the Collection with llfuse.lock released. Trailing flush/
# invalidate lines appear elided in this excerpt.
403     def create(self, name):
404         if not self.writable():
405             raise llfuse.FUSEError(errno.EROFS)
406         with llfuse.lock_released:
407             self.collection.open(name, "w").close()
411     def mkdir(self, name):
412         if not self.writable():
413             raise llfuse.FUSEError(errno.EROFS)
414         with llfuse.lock_released:
415             self.collection.mkdirs(name)
419     def unlink(self, name):
420         if not self.writable():
421             raise llfuse.FUSEError(errno.EROFS)
422         with llfuse.lock_released:
423             self.collection.remove(name)
# rmdir uses the same Collection.remove() as unlink; presumably emptiness is
# checked in elided lines (424-427/433+) — TODO confirm against the full file.
428     def rmdir(self, name):
429         if not self.writable():
430             raise llfuse.FUSEError(errno.EROFS)
431         with llfuse.lock_released:
432             self.collection.remove(name)
# rename: only between collection-backed directories; maps type mismatches to
# the conventional POSIX errors (ENOTEMPTY/ENOTDIR/EISDIR).
437     def rename(self, name_old, name_new, src):
438         if not self.writable():
439             raise llfuse.FUSEError(errno.EROFS)
441         if not isinstance(src, CollectionDirectoryBase):
442             raise llfuse.FUSEError(errno.EPERM)
447             if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
449             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
451                     raise llfuse.FUSEError(errno.ENOTEMPTY)
452             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
453                 raise llfuse.FUSEError(errno.ENOTDIR)
454             elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
455                 raise llfuse.FUSEError(errno.EISDIR)
457         with llfuse.lock_released:
458             self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
# clear: drop entries, unsubscribe from Collection events, release reference.
463         super(CollectionDirectoryBase, self).clear()
464         if self.collection is not None:
465             self.collection.unsubscribe()
466         self.collection = None
469     # objsize for the whole collection is represented at the root,
470     # don't double-count it
# NOTE(review): Partial excerpt — lines carry original numbering and interior
# lines are elided; documented in place rather than rewritten.
# CollectionDirectory: root directory for one collection, identified by UUID or
# portable data hash; handles lazy load, periodic refresh, and save-back.
473 class CollectionDirectory(CollectionDirectoryBase):
474     """Represents the root of a directory tree representing a collection."""
476     __slots__ = ("api", "num_retries", "collection_locator",
477                  "_manifest_size", "_writable", "_updating_lock")
# __init__: collection_record may be a full API record (dict) or a bare
# locator string. PDH-addressed (immutable) collections poll only on the blob
# signature TTL; UUID collections poll for upstream changes. Several branch
# lines (if/else/try headers) are elided in this excerpt.
479     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
480         super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
482         self.num_retries = num_retries
485         if isinstance(collection_record, dict):
486             self.collection_locator = collection_record['uuid']
487             self._mtime = convertTime(collection_record.get('modified_at'))
489             self.collection_locator = collection_record
492         is_uuid = (self.collection_locator is not None) and (uuid_pattern.match(self.collection_locator) is not None)
495             # It is a uuid, it may be updated upstream, so recheck it periodically.
498             # It is not a uuid. For immutable collections, collection
499             # only needs to be refreshed if it is very long lived
500             # (long enough that there's a risk of the blob signatures
503                 self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
505                 _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
506                 self._poll_time = 60*60
508         self._writable = is_uuid and enable_write
509         self._manifest_size = 0
510         self._updating_lock = threading.Lock()
# same_uuid-style predicate: matches either identifier form of this collection.
513         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
516         return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)
# flush: update-then-save under _updating_lock with llfuse.lock released, then
# refresh the cached record from the API response.
520         if not self.writable():
522         with llfuse.lock_released:
523             with self._updating_lock:
524                 if self.collection.committed():
525                     self.collection.update()
527                     self.collection.save()
528                 self.new_collection_record(self.collection.api_response())
530     def want_event_subscribe(self):
531         return (uuid_pattern.match(self.collection_locator) is not None)
533     def new_collection(self, new_collection_record, coll_reader):
536         self.collection = coll_reader
537         self.new_collection_record(new_collection_record)
538         self.populate(self.mtime())
# new_collection_record: refresh cached metadata (mtime, manifest size,
# locator) and invalidate the .arvados#collection record file if materialized.
540     def new_collection_record(self, new_collection_record):
541         if not new_collection_record:
542             raise Exception("invalid new_collection_record")
543         self._mtime = convertTime(new_collection_record.get('modified_at'))
544         self._manifest_size = len(new_collection_record["manifest_text"])
545         self.collection_locator = new_collection_record["uuid"]
546         if self.collection_record_file is not None:
547             self.collection_record_file.invalidate()
548             self.inodes.invalidate_inode(self.collection_record_file)
549             _logger.debug("parent_inode %s invalidated collection record file inode %s", self.inode,
550                           self.collection_record_file.inode)
551         self.inodes.update_uuid(self)
552         self.inodes.inode_cache.update_cache_size(self)
556         return self.collection_locator
# update: fetch/refresh the backing Collection. Uses a writable Collection for
# UUID locators, a read-only CollectionReader for PDHs, and synthesizes record
# fields for Keep-only collections. The surrounding try/finally and else lines
# are elided; _updating_lock.release() at 611 and the except clauses suggest a
# try/except/finally structure — TODO confirm against the full file.
561         if self.collection_locator is None:
562             # No collection locator to retrieve from
566         new_collection_record = None
568         with llfuse.lock_released:
569             self._updating_lock.acquire()
573                 _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
575                 if self.collection is not None:
576                     # Already have a collection object
577                     self.collection.update()
578                     new_collection_record = self.collection.api_response()
580                     # Create a new collection object
581                     if uuid_pattern.match(self.collection_locator):
582                         coll_reader = arvados.collection.Collection(
583                             self.collection_locator, self.api, self.api.keep,
584                             num_retries=self.num_retries)
586                         coll_reader = arvados.collection.CollectionReader(
587                             self.collection_locator, self.api, self.api.keep,
588                             num_retries=self.num_retries)
589                     new_collection_record = coll_reader.api_response() or {}
590                     # If the Collection only exists in Keep, there will be no API
591                     # response. Fill in the fields we need.
592                     if 'uuid' not in new_collection_record:
593                         new_collection_record['uuid'] = self.collection_locator
594                     if "portable_data_hash" not in new_collection_record:
595                         new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
596                     if 'manifest_text' not in new_collection_record:
597                         new_collection_record['manifest_text'] = coll_reader.manifest_text()
598                     if 'storage_classes_desired' not in new_collection_record:
599                         new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
601         # end with llfuse.lock_released, re-acquire lock
603         if new_collection_record is not None:
604             if coll_reader is not None:
605                 self.new_collection(new_collection_record, coll_reader)
607                 self.new_collection_record(new_collection_record)
611                 self._updating_lock.release()
612         except arvados.errors.NotFoundError as e:
613             _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
614         except arvados.errors.ArgumentError as detail:
615             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
616             if new_collection_record is not None and "manifest_text" in new_collection_record:
617                 _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
619             _logger.exception("arv-mount %s: error", self.collection_locator)
620             if new_collection_record is not None and "manifest_text" in new_collection_record:
621                 _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
627     def collection_record(self):
629         return self.collection.api_response()
# Magic '.arvados#collection' entry: lazily materialize a JSON file exposing
# the collection's API record; a lookup also forces a refresh.
633     def __getitem__(self, item):
634         if item == '.arvados#collection':
635             if self.collection_record_file is None:
636                 self.collection_record_file = FuncToJSONFile(
637                     self.inode, self.collection_record)
638                 self.inodes.add_entry(self.collection_record_file)
639             self.invalidate()  # use lookup as a signal to force update
640             return self.collection_record_file
642             return super(CollectionDirectory, self).__getitem__(item)
644     def __contains__(self, k):
645         if k == '.arvados#collection':
648             return super(CollectionDirectory, self).__contains__(k)
650     def invalidate(self):
651         if self.collection_record_file is not None:
652             self.collection_record_file.invalidate()
653             self.inodes.invalidate_inode(self.collection_record_file)
654         super(CollectionDirectory, self).invalidate()
657         return (self.collection_locator is not None)
660         # This is a rough guess of the amount of overhead involved for
661         # a collection; the assumptions are that that each file
662         # averages 128 bytes in the manifest, but consume 1024 bytes
663         # of Python data structures, so 1024/128=8 means we estimate
664         # the RAM footprint at 8 times the size of bare manifest text.
665         return self._manifest_size * 8
# Teardown fragments: best-effort save (failure logged, not raised), then stop
# the Collection's background threads and release the cached record file.
668         if self.collection is None:
673                 self.collection.save()
674             except Exception as e:
675                 _logger.exception("Failed to save collection %s", self.collection_locator)
676         self.collection.stop_threads()
679         if self.collection is not None:
680             self.collection.stop_threads()
681         self._manifest_size = 0
682         super(CollectionDirectory, self).clear()
683         if self.collection_record_file is not None:
684             self.inodes.del_entry(self.collection_record_file)
685             self.collection_record_file = None
# NOTE(review): Partial excerpt — lines carry original numbering and interior
# lines are elided; documented in place rather than rewritten.
# TmpCollectionDirectory: scratch directory backed by a Collection that is
# never saved to the API server; Keep blocks are written but no record exists.
688 class TmpCollectionDirectory(CollectionDirectoryBase):
689     """A directory backed by an Arvados collection that never gets saved.
691     This supports using Keep as scratch space. A userspace program can
692     read the .arvados#collection file to get a current manifest in
693     order to save a snapshot of the scratch data or use it as a crunch
# UnsaveableCollection: presumably overrides save()/save_new() to no-op —
# its body (lines 698-702) is elided here; TODO confirm.
697     class UnsaveableCollection(arvados.collection.Collection):
703     def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
704         collection = self.UnsaveableCollection(
705             api_client=api_client,
706             keep_client=api_client.keep,
707             num_retries=num_retries,
708             storage_classes_desired=storage_classes)
709         # This is always enable_write=True because it never tries to
710         # save to the backend
711         super(TmpCollectionDirectory, self).__init__(
712             parent_inode, inodes, True, filters, collection, self)
713         self.populate(self.mtime())
# on_event: after the base handling, invalidate the record file so readers of
# .arvados#collection see the current manifest; uses the same release/re-acquire
# dance on the collection RLock as the base class (count loop elided).
715     def on_event(self, *args, **kwargs):
716         super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
717         if self.collection_record_file is None:
720         # See discussion in CollectionDirectoryBase.on_event
724             self.collection.lock.release()
731             with self.collection.lock:
732                 self.collection_record_file.invalidate()
733                 self.inodes.invalidate_inode(self.collection_record_file)
734                 _logger.debug("%s invalidated collection record", self.inode)
737                 self.collection.lock.acquire()
# collection_record: synthesize a minimal record from the live Collection
# (no API round trip; the dict opener/closer lines are elided here).
740     def collection_record(self):
741         with llfuse.lock_released:
744                 "manifest_text": self.collection.manifest_text(),
745                 "portable_data_hash": self.collection.portable_data_hash(),
746                 "storage_classes_desired": self.collection.storage_classes_desired(),
749     def __contains__(self, k):
750         return (k == '.arvados#collection' or
751                 super(TmpCollectionDirectory, self).__contains__(k))
754     def __getitem__(self, item):
755         if item == '.arvados#collection':
756             if self.collection_record_file is None:
757                 self.collection_record_file = FuncToJSONFile(
758                     self.inode, self.collection_record)
759                 self.inodes.add_entry(self.collection_record_file)
760             return self.collection_record_file
761         return super(TmpCollectionDirectory, self).__getitem__(item)
772     def want_event_subscribe(self):
776         self.collection.stop_threads()
778     def invalidate(self):
779         if self.collection_record_file:
780             self.collection_record_file.invalidate()
781         super(TmpCollectionDirectory, self).invalidate()
# NOTE(review): Partial excerpt — lines carry original numbering and interior
# lines are elided; documented in place rather than rewritten.
# MagicDirectory ("by_id"): lazily resolves any UUID/PDH path component into a
# collection or project subdirectory on first lookup.
784 class MagicDirectory(Directory):
785     """A special directory that logically contains the set of all extant keep locators.
787     When a file is referenced by lookup(), it is tested to see if it is a valid
788     keep locator to a manifest, and if so, loads the manifest contents as a
789     subdirectory of this directory with the locator as the directory name.
790     Since querying a list of all extant keep locators is impractical, only
791     collections that have already been accessed are visible to readdir().
796 This directory provides access to Arvados collections as subdirectories listed
797 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
798 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
799 (in the form 'zzzzz-j7d0g-1234567890abcde').
801 Note that this directory will appear empty until you attempt to access a
802 specific collection or project subdirectory (such as trying to 'cd' into it),
803 at which point the collection or project will actually be looked up on the server
804 and the directory will appear if it exists.
808     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
809         super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
811         self.num_retries = num_retries
812         self.pdh_only = pdh_only
813         self.storage_classes = storage_classes
# __setattr__ hook: once an inode is assigned, seed the directory with a README
# and, at the mount root, a 'by_id' alias of this directory (ctor-arg lines
# 825-833 elided here).
815     def __setattr__(self, name, value):
816         super(MagicDirectory, self).__setattr__(name, value)
817         # When we're assigned an inode, add a README.
818         if ((name == 'inode') and (self.inode is not None) and
819             (not self._entries)):
820             self._entries['README'] = self.inodes.add_entry(
821                 StringFile(self.inode, self.README_TEXT, time.time()))
822             # If we're the root directory, add an identical by_id subdirectory.
823             if self.inode == llfuse.ROOT_INODE:
824                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
# __contains__ doubles as the lookup/instantiation path: validate the key as a
# PDH/UUID, create a ProjectDirectory (group UUID) or CollectionDirectory, and
# discard the entry again if the update fails. Many lines (try headers, early
# returns, constructor args) are elided in this excerpt.
834     def __contains__(self, k):
835         if k in self._entries:
838         if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
844             if group_uuid_pattern.match(k):
845                 project = self.api.groups().list(
847                         ['group_class', 'in', ['project','filter']],
849                         *self._filters_for('groups', qualified=False),
851                 ).execute(num_retries=self.num_retries)
852                 if project[u'items_available'] == 0:
854                 e = self.inodes.add_entry(ProjectDirectory(
861                     project[u'items'][0],
862                     storage_classes=self.storage_classes,
865                 e = self.inodes.add_entry(CollectionDirectory(
876                 if k not in self._entries:
879                     self.inodes.del_entry(e)
882                     self.inodes.invalidate_entry(self, k)
883                     self.inodes.del_entry(e)
885         except Exception as ex:
886             _logger.exception("arv-mount lookup '%s':", k)
888             self.inodes.del_entry(e)
891     def __getitem__(self, item):
893             return self._entries[item]
895             raise KeyError("No collection with id " + item)
900     def want_event_subscribe(self):
901         return not self.pdh_only
# NOTE(review): Partial excerpt — lines carry original numbering and interior
# lines are elided; documented in place rather than rewritten.
# TagsDirectory: one TagDirectory child per distinct tag link name visible to
# the user; refreshed by polling (poll_time seconds).
904 class TagsDirectory(Directory):
905     """A special directory that contains as subdirectories all tags visible to the user."""
907     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
908         super(TagsDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
910         self.num_retries = num_retries
912         self._poll_time = poll_time
915     def want_event_subscribe(self):
# update: list distinct tag names (plus any names cached in self._extra —
# presumably initialized in an elided line of __init__) and merge them in as
# TagDirectory children. Several list()/merge() argument lines are elided.
920         with llfuse.lock_released:
921             tags = self.api.links().list(
923                     ['link_class', '=', 'tag'],
925                     *self._filters_for('links', qualified=False),
930             ).execute(num_retries=self.num_retries)
933                 tags['items']+[{"name": n} for n in self._extra],
935                 lambda a, i: a.tag == i['name'],
936                 lambda i: TagDirectory(
945                     poll_time=self._poll_time,
# __getitem__: on a miss, query the API for the tag name; if found, remember it
# in self._extra so subsequent updates include it.
951     def __getitem__(self, item):
952         if super(TagsDirectory, self).__contains__(item):
953             return super(TagsDirectory, self).__getitem__(item)
954         with llfuse.lock_released:
955             tags = self.api.links().list(
957                     ['link_class', '=', 'tag'],
959                     *self._filters_for('links', qualified=False),
962             ).execute(num_retries=self.num_retries)
964             self._extra.add(item)
966         return super(TagsDirectory, self).__getitem__(item)
970     def __contains__(self, k):
971         if super(TagsDirectory, self).__contains__(k):
# NOTE(review): Partial excerpt — lines carry original numbering and interior
# lines are elided; documented in place rather than rewritten.
# TagDirectory: lists, as CollectionDirectory children, every collection the
# user can see that carries one particular tag name.
981 class TagDirectory(Directory):
982     """A special directory that contains as subdirectories all collections visible
983     to the user that are tagged with a particular tag.
986     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
987                  poll=False, poll_time=60):
988         super(TagDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
990         self.num_retries = num_retries
993         self._poll_time = poll_time
995     def want_event_subscribe(self):
# update: fetch head_uuids of 'tag' links matching self.tag that point at
# collections, then merge them into the entry dict keyed by head_uuid, reusing
# children whose collection_locator already matches. The merge() call header
# and CollectionDirectory constructor arguments are elided in this excerpt.
1000         with llfuse.lock_released:
1001             taggedcollections = self.api.links().list(
1003                     ['link_class', '=', 'tag'],
1004                     ['name', '=', self.tag],
1005                     ['head_uuid', 'is_a', 'arvados#collection'],
1006                     *self._filters_for('links', qualified=False),
1008                 select=['head_uuid'],
1009             ).execute(num_retries=self.num_retries)
1011             taggedcollections['items'],
1012             lambda i: i['head_uuid'],
1013             lambda a, i: a.collection_locator == i['head_uuid'],
1014             lambda i: CollectionDirectory(
1026 class ProjectDirectory(Directory):
1027 """A special directory that contains the contents of a project."""
1029 __slots__ = ("api", "num_retries", "project_object", "project_object_file",
1030 "project_uuid", "_updating_lock",
1031 "_current_user", "_full_listing", "storage_classes", "recursively_contained")
1033 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
1034 project_object, poll=True, poll_time=3, storage_classes=None):
1035 super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
1037 self.num_retries = num_retries
1038 self.project_object = project_object
1039 self.project_object_file = None
1040 self.project_uuid = project_object['uuid']
1042 self._poll_time = poll_time
1043 self._updating_lock = threading.Lock()
1044 self._current_user = None
1045 self._full_listing = False
1046 self.storage_classes = storage_classes
1047 self.recursively_contained = False
1049 # Filter groups can contain themselves, which causes tools
1050 # that walk the filesystem to get stuck in an infinite loop,
1051 # so suppress returning a listing in that case.
1052 if self.project_object.get("group_class") == "filter":
1053 iter_parent_inode = parent_inode
1054 while iter_parent_inode != llfuse.ROOT_INODE:
1055 parent_dir = self.inodes[iter_parent_inode]
1056 if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid:
1057 self.recursively_contained = True
1059 iter_parent_inode = parent_dir.parent_inode
1061 def want_event_subscribe(self):
1064 def createDirectory(self, i):
1065 common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
1066 if collection_uuid_pattern.match(i['uuid']):
1067 return CollectionDirectory(*common_args, i)
1068 elif group_uuid_pattern.match(i['uuid']):
1069 return ProjectDirectory(*common_args, i, self._poll, self._poll_time, self.storage_classes)
1070 elif link_uuid_pattern.match(i['uuid']):
1071 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
1072 return CollectionDirectory(*common_args, i['head_uuid'])
1075 elif uuid_pattern.match(i['uuid']):
1076 return ObjectFile(self.parent_inode, i)
1081 return self.project_uuid
1084 self._full_listing = True
1085 return super(ProjectDirectory, self).items()
1087 def namefn(self, i):
1089 if i['name'] is None or len(i['name']) == 0:
1091 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
1092 # collection or subproject
1094 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
1097 elif 'kind' in i and i['kind'].startswith('arvados#'):
1099 return "{}.{}".format(i['name'], i['kind'][8:])
1106 if self.project_object_file == None:
1107 self.project_object_file = ObjectFile(self.inode, self.project_object)
1108 self.inodes.add_entry(self.project_object_file)
1110 if self.recursively_contained or not self._full_listing:
1114 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
1115 return a.uuid() == i['uuid']
1116 elif isinstance(a, ObjectFile):
1117 return a.uuid() == i['uuid'] and not a.stale()
1121 with llfuse.lock_released:
1122 self._updating_lock.acquire()
1123 if not self.stale():
1126 if group_uuid_pattern.match(self.project_uuid):
1127 self.project_object = self.api.groups().get(
1128 uuid=self.project_uuid).execute(num_retries=self.num_retries)
1129 elif user_uuid_pattern.match(self.project_uuid):
1130 self.project_object = self.api.users().get(
1131 uuid=self.project_uuid).execute(num_retries=self.num_retries)
1132 # do this in 2 steps until #17424 is fixed
1133 contents = list(arvados.util.keyset_list_all(
1134 self.api.groups().contents,
1136 num_retries=self.num_retries,
1137 uuid=self.project_uuid,
1139 ['uuid', 'is_a', 'arvados#group'],
1140 ['groups.group_class', 'in', ['project', 'filter']],
1141 *self._filters_for('groups', qualified=True),
1144 contents.extend(obj for obj in arvados.util.keyset_list_all(
1145 self.api.groups().contents,
1147 num_retries=self.num_retries,
1148 uuid=self.project_uuid,
1150 ['uuid', 'is_a', 'arvados#collection'],
1151 *self._filters_for('collections', qualified=True),
1153 ) if obj['current_version_uuid'] == obj['uuid'])
1154 # end with llfuse.lock_released, re-acquire lock
1156 self.merge(contents,
1159 self.createDirectory)
1162 self._updating_lock.release()
1164 def _add_entry(self, i, name):
1165 ent = self.createDirectory(i)
1166 self._entries[name] = self.inodes.add_entry(ent)
1167 return self._entries[name]
1171 def __getitem__(self, k):
1172 if k == '.arvados#project':
1173 return self.project_object_file
1174 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
1175 return super(ProjectDirectory, self).__getitem__(k)
1176 with llfuse.lock_released:
1177 k2 = self.unsanitize_filename(k)
1179 namefilter = ["name", "=", k]
1181 namefilter = ["name", "in", [k, k2]]
1182 contents = self.api.groups().list(
1184 ["owner_uuid", "=", self.project_uuid],
1185 ["group_class", "in", ["project","filter"]],
1187 *self._filters_for('groups', qualified=False),
1190 ).execute(num_retries=self.num_retries)["items"]
1192 contents = self.api.collections().list(
1194 ["owner_uuid", "=", self.project_uuid],
1196 *self._filters_for('collections', qualified=False),
1199 ).execute(num_retries=self.num_retries)["items"]
1201 if len(contents) > 1 and contents[1]['name'] == k:
1202 # If "foo/bar" and "foo[SUBST]bar" both exist, use
1204 contents = [contents[1]]
1205 name = self.sanitize_filename(self.namefn(contents[0]))
1208 return self._add_entry(contents[0], name)
1213 def __contains__(self, k):
1214 if k == '.arvados#project':
1226 if not self._enable_write:
1228 with llfuse.lock_released:
1229 if not self._current_user:
1230 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
1231 return self._current_user["uuid"] in self.project_object.get("writable_by", [])
1233 def persisted(self):
1237 super(ProjectDirectory, self).clear()
1238 if self.project_object_file is not None:
1239 self.inodes.del_entry(self.project_object_file)
1240 self.project_object_file = None
1244 def mkdir(self, name):
1245 if not self.writable():
1246 raise llfuse.FUSEError(errno.EROFS)
1249 with llfuse.lock_released:
1251 "owner_uuid": self.project_uuid,
1253 "manifest_text": "" }
1254 if self.storage_classes is not None:
1255 c["storage_classes_desired"] = self.storage_classes
1257 self.api.collections().create(body=c).execute(num_retries=self.num_retries)
1258 except Exception as e:
1261 except apiclient_errors.Error as error:
1262 _logger.error(error)
1263 raise llfuse.FUSEError(errno.EEXIST)
1267 def rmdir(self, name):
1268 if not self.writable():
1269 raise llfuse.FUSEError(errno.EROFS)
1271 if name not in self:
1272 raise llfuse.FUSEError(errno.ENOENT)
1273 if not isinstance(self[name], CollectionDirectory):
1274 raise llfuse.FUSEError(errno.EPERM)
1275 if len(self[name]) > 0:
1276 raise llfuse.FUSEError(errno.ENOTEMPTY)
1277 with llfuse.lock_released:
1278 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
1283 def rename(self, name_old, name_new, src):
1284 if not self.writable():
1285 raise llfuse.FUSEError(errno.EROFS)
1287 if not isinstance(src, ProjectDirectory):
1288 raise llfuse.FUSEError(errno.EPERM)
1292 if not isinstance(ent, CollectionDirectory):
1293 raise llfuse.FUSEError(errno.EPERM)
1295 if name_new in self:
1296 # POSIX semantics for replacing one directory with another is
1297 # tricky (the target directory must be empty, the operation must be
1298 # atomic which isn't possible with the Arvados API as of this
1299 # writing) so don't support that.
1300 raise llfuse.FUSEError(errno.EPERM)
1302 self.api.collections().update(uuid=ent.uuid(),
1303 body={"owner_uuid": self.uuid(),
1304 "name": name_new}).execute(num_retries=self.num_retries)
1306 # Acually move the entry from source directory to this directory.
1307 del src._entries[name_old]
1308 self._entries[name_new] = ent
1309 self.inodes.invalidate_entry(src, name_old)
1312 def child_event(self, ev):
1313 properties = ev.get("properties") or {}
1314 old_attrs = properties.get("old_attributes") or {}
1315 new_attrs = properties.get("new_attributes") or {}
1316 old_attrs["uuid"] = ev["object_uuid"]
1317 new_attrs["uuid"] = ev["object_uuid"]
1318 old_name = self.sanitize_filename(self.namefn(old_attrs))
1319 new_name = self.sanitize_filename(self.namefn(new_attrs))
1321 # create events will have a new name, but not an old name
1322 # delete events will have an old name, but not a new name
1323 # update events will have an old and new name, and they may be same or different
1324 # if they are the same, an unrelated field changed and there is nothing to do.
1326 if old_attrs.get("owner_uuid") != self.project_uuid:
1327 # Was moved from somewhere else, so don't try to remove entry.
1329 if ev.get("object_owner_uuid") != self.project_uuid:
1330 # Was moved to somewhere else, so don't try to add entry
1333 if old_attrs.get("is_trashed"):
1334 # Was previously deleted
1336 if new_attrs.get("is_trashed"):
1340 if new_name != old_name:
1342 if old_name in self._entries:
1343 ent = self._entries[old_name]
1344 del self._entries[old_name]
1345 self.inodes.invalidate_entry(self, old_name)
1349 self._entries[new_name] = ent
1351 self._add_entry(new_attrs, new_name)
1352 elif ent is not None:
1353 self.inodes.del_entry(ent)
1356 class SharedDirectory(Directory):
1357 """A special directory that represents users or groups who have shared projects with me."""
1359 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
1360 exclude, poll=False, poll_time=60, storage_classes=None):
1361 super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
1363 self.num_retries = num_retries
1364 self.current_user = api.users().current().execute(num_retries=num_retries)
1366 self._poll_time = poll_time
1367 self._updating_lock = threading.Lock()
1368 self.storage_classes = storage_classes
1373 with llfuse.lock_released:
1374 self._updating_lock.acquire()
1375 if not self.stale():
1383 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1384 if 'httpMethod' in methods.get('shared', {}):
1387 resp = self.api.groups().shared(
1389 ['group_class', 'in', ['project','filter']],
1391 *self._filters_for('groups', qualified=False),
1396 include="owner_uuid",
1398 if not resp["items"]:
1400 page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1401 for r in resp["items"]:
1402 objects[r["uuid"]] = r
1403 roots.append(r["uuid"])
1404 for r in resp["included"]:
1405 objects[r["uuid"]] = r
1406 root_owners.add(r["uuid"])
1408 all_projects = list(arvados.util.keyset_list_all(
1409 self.api.groups().list,
1411 num_retries=self.num_retries,
1413 ['group_class', 'in', ['project','filter']],
1414 *self._filters_for('groups', qualified=False),
1416 select=["uuid", "owner_uuid"],
1418 for ob in all_projects:
1419 objects[ob['uuid']] = ob
1421 current_uuid = self.current_user['uuid']
1422 for ob in all_projects:
1423 if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1424 roots.append(ob['uuid'])
1425 root_owners.add(ob['owner_uuid'])
1427 lusers = arvados.util.keyset_list_all(
1428 self.api.users().list,
1430 num_retries=self.num_retries,
1432 ['uuid', 'in', list(root_owners)],
1433 *self._filters_for('users', qualified=False),
1436 lgroups = arvados.util.keyset_list_all(
1437 self.api.groups().list,
1439 num_retries=self.num_retries,
1441 ['uuid', 'in', list(root_owners)+roots],
1442 *self._filters_for('groups', qualified=False),
1446 objects[l["uuid"]] = l
1448 objects[l["uuid"]] = l
1450 for r in root_owners:
1454 contents[obr["name"]] = obr
1455 elif "first_name" in obr:
1456 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1461 if obr['owner_uuid'] not in objects:
1462 contents[obr["name"]] = obr
1464 # end with llfuse.lock_released, re-acquire lock
1469 lambda a, i: a.uuid() == i[1]['uuid'],
1470 lambda i: ProjectDirectory(
1479 poll_time=self._poll_time,
1480 storage_classes=self.storage_classes,
1484 _logger.exception("arv-mount shared dir error")
1486 self._updating_lock.release()
1488 def want_event_subscribe(self):