1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
15 from apiclient import errors as apiclient_errors
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
23 _logger = logging.getLogger('arvados.arvados_fuse')
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (If present in a collection filename, they will
28 # appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile(r'[\x00/]')
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.

    NOTE(review): this is an elided view of the original source; lines
    marked "[elided]" exist in the full file but are not visible here.
    Only comments/docstrings were changed in this pass.
    """

    def __init__(self, parent_inode, inodes, enable_write, filters):
        """parent_inode is the integer inode number"""
        super(Directory, self).__init__()
        # [elided: additional attribute initialization, e.g. inodes/_entries]
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        # [elided]
        self._mtime = time.time()
        self._enable_write = enable_write
        # Extra API-side filters to apply when listing contents; normalized
        # so _filters is always a list even when None was passed.
        self._filters = filters or []

    def _filters_for(self, subtype, *, qualified):
        # Yield only the filters relevant to `subtype` (e.g. 'groups',
        # 'collections').  Filter names may be prefixed like
        # 'collections.name'; when `qualified` is false the prefix is
        # stripped before yielding.
        for f in self._filters:
            f_type, _, f_name = f[0].partition('.')
            # [elided: branch(es) for unprefixed / non-matching filters]
            elif f_type != subtype:
                # [elided: intermediate branches; the yield below is under
                #  a different (elided) branch in the full source]
                yield [f_name, *f[1:]]

    def unsanitize_filename(self, incoming):
        """Replace ForwardSlashNameSubstitution value with /"""
        fsns = self.inodes.forward_slash_subst()
        if isinstance(fsns, str):
            return incoming.replace(fsns, '/')
        # [elided: fallthrough — presumably returns `incoming` unchanged;
        #  TODO confirm]

    def sanitize_filename(self, dirty):
        """Replace disallowed filename characters according to
        ForwardSlashNameSubstitution in self.api_config."""
        # '.' and '..' are not reachable if API server is newer than #6277
        # [elided: additional special-case handling]
        fsns = self.inodes.forward_slash_subst()
        if isinstance(fsns, str):
            dirty = dirty.replace('/', fsns)
        # Any remaining NUL or '/' characters become underscores
        # (see _disallowed_filename_characters at module level).
        return _disallowed_filename_characters.sub('_', dirty)

    # Overridden by subclasses to implement logic to update the
    # entries dict when the directory is stale
    # [elided: update() stub / decorators]

    # Only used when computing the size of the disk footprint of the directory
    # [elided: size-related stub]

    def checkupdate(self):
        # [elided: body — appears to call the update path inside a try block]
            # NOTE(review): only `from apiclient import errors as
            # apiclient_errors` is visible in the imports; confirm a plain
            # `import apiclient` exists in the elided header, otherwise this
            # should be `apiclient_errors.HttpError`.
            except apiclient.errors.HttpError as e:
                # [elided: error handling]

    def __getitem__(self, item):
        # Direct dict lookup; raises KeyError for unknown names.
        return self._entries[item]

    # [elided: items() definition — TODO confirm name]
        return list(self._entries.items())

    def __contains__(self, k):
        return k in self._entries

    # [elided: __len__() definition — TODO confirm name]
        return len(self._entries)

    # [elided: fresh() definition — TODO confirm name]
        self.inodes.touch(self)
        super(Directory, self).fresh()

    # [elided: objsize()-style definition — TODO confirm name]
        # This is a very rough guess of the amount of overhead involved for
        # each directory entry (128 bytes is 16 * 8-byte pointers).
        return len(self._entries) * 128

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        Arguments:

        * items: Iterable --- New directory contents

        * fn: Callable --- Takes an entry in 'items' and return the desired file or
          directory name, or None if this entry should be skipped

        * same: Callable --- Compare an existing entry (a File or Directory
          object) with an entry in the items list to determine whether to keep
          the existing entry

        * new_entry: Callable --- Create a new directory entry (File or Directory
          object) from an entry in the items list.
        """
        oldentries = self._entries
        # [elided: first pass over `items`, reusing matching old entries]
            name = self.sanitize_filename(fn(i))
            # [elided]
            if name in oldentries:
                ent = oldentries[name]
                # [elided: `same(ent, i)` comparison]
                    # move existing directory entry over
                    self._entries[name] = ent
                    # [elided]
                    self.inodes.inode_cache.touch(ent)
        # [elided: second pass creating entries for new items]
            name = self.sanitize_filename(fn(i))
            # [elided]
            if name not in self._entries:
                # create new directory entry
                # [elided: ent = new_entry(i), presumably — TODO confirm]
                self._entries[name] = self.inodes.add_entry(ent)
                # [elided]
                _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)

        # delete any other directory entries that were not found in 'items'
        # [elided: loop over leftover oldentries keys `i`]
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self, i)
            self.inodes.del_entry(oldentries[i])
        # [elided: condition guarding the inode invalidation below]
            self.inodes.invalidate_inode(self)
        self._mtime = time.time()
        self.inodes.inode_cache.update_cache_size(self)

    # [elided: in_use() definition — TODO confirm name]
        if super(Directory, self).in_use():
            # [elided]
        for v in self._entries.values():
            # [elided: check whether any child is in use]

    # [elided: clear() definition — TODO confirm name]
        """Delete all entries"""
        oldentries = self._entries
        # [elided: loop over oldentries keys `n`]
            self.inodes.del_entry(oldentries[n])
        self.inodes.inode_cache.update_cache_size(self)

    def kernel_invalidate(self):
        # Invalidating the dentry on the parent implies invalidating all paths
        # [elided: rest of comment]
        if self.parent_inode in self.inodes:
            parent = self.inodes[self.parent_inode]
        # [elided: else branch]
            # parent was removed already.
            # [elided]

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k,v in parent._entries.items():
            # [elided: identity test against self]
                self.inodes.invalidate_entry(parent, k)
                # [elided]
        self.inodes.invalidate_inode(self)

    # [elided: lines 241-250 of the original]

    def want_event_subscribe(self):
        # Subclasses decide whether they need websocket event updates.
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    [elided: rest of sentence]

    Most operations act only the underlying Arvados `Collection` object. The
    `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified. FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.
    """

    def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
        # The underlying arvados.collection object backing this directory.
        self.collection = collection
        # The CollectionDirectory at the root of this collection's tree
        # (flush() delegates to it below).
        self.collection_root = collection_root
        # Lazily-created .arvados#collection virtual file, set by subclasses.
        self.collection_record_file = None

    def new_entry(self, name, item, mtime):
        # Create (or re-attach) the FUSE entry corresponding to one
        # collection item; `item` may carry a cached `fuse_entry`.
        name = self.sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            if item.fuse_entry.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if item.fuse_entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            item.fuse_entry.dead = False
            self._entries[name] = item.fuse_entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            # Subdirectory within the collection.
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
                # [elided: constructor arguments]
                self.collection_root,
            # [elided: close of call]
            self._entries[name].populate(mtime)
        # [elided: else branch — plain file]
            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        # These are events from the Collection object (ADD/DEL/MOD)
        # emitted by operations on the Collection object (like
        # "mkdirs" or "remove"), and by "update", which we need to
        # synchronize with our FUSE objects that are assigned inodes.
        if collection == self.collection:
            name = self.sanitize_filename(name)

            # It's possible for another thread to have llfuse.lock and
            # be waiting on collection.lock. Meanwhile, we released
            # llfuse.lock earlier in the stack, but are still holding
            # on to the collection lock, and now we need to re-acquire
            # llfuse.lock. If we don't release the collection lock,
            # we'll deadlock where we're holding the collection lock
            # waiting for llfuse.lock and the other thread is holding
            # llfuse.lock and waiting for the collection lock.
            #
            # The correct locking order here is to take llfuse.lock
            # first, then the collection lock.
            #
            # Since collection.lock is an RLock, it might be locked
            # multiple times, so we need to release it multiple times,
            # keep a count, then re-lock it the correct number of
            # [elided: rest of comment + release loop]
                self.collection.lock.release()
            # [elided: re-acquire llfuse lock]
                with self.collection.lock:
                    if event == arvados.collection.ADD:
                        self.new_entry(name, item, self.mtime())
                    elif event == arvados.collection.DEL:
                        ent = self._entries[name]
                        del self._entries[name]
                        self.inodes.invalidate_entry(self, name)
                        self.inodes.del_entry(ent)
                    elif event == arvados.collection.MOD:
                        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                            self.inodes.invalidate_inode(item.fuse_entry)
                        elif name in self._entries:
                            self.inodes.invalidate_inode(self._entries[name])

                    if self.collection_record_file is not None:
                        self.collection_record_file.invalidate()
                        self.inodes.invalidate_inode(self.collection_record_file)
            # [elided: finally — re-lock the RLock the counted number of times]
                    self.collection.lock.acquire()

    def populate(self, mtime):
        # Build FUSE entries for everything currently in the collection and
        # subscribe for future change events.
        # [elided]
        with self.collection.lock:
            self.collection.subscribe(self.on_event)
            for entry, item in self.collection.items():
                self.new_entry(entry, item, self.mtime())

    # [elided: writable() definition — TODO confirm name]
        return self._enable_write and self.collection.writable()

    # [elided: flush() definition — delegates to the collection root]
        self.collection_root.flush()

    def create(self, name):
        # Create an empty file; EROFS when the mount/collection is read-only.
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.open(name, "w").close()

    def mkdir(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    def unlink(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)

    def rmdir(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            # NOTE(review): same call as unlink(); Collection.remove
            # presumably handles directories — TODO confirm recursive flag
            # in the elided source.
            self.collection.remove(name)

    def rename(self, name_old, name_new, src):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        # Cross-mount renames are only supported between collection dirs.
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)
        # [elided: look up `ent` (source) and `tgt` (existing target)]
        # Enforce POSIX rename() semantics for file/dir combinations:
        if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
            # [elided: file-over-file is allowed]
        elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
            # [elided: only allowed when target dir is empty]
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
            raise llfuse.FUSEError(errno.ENOTDIR)
        elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EISDIR)
        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)

    # [elided: clear() definition]
        super(CollectionDirectoryBase, self).clear()
        self.collection = None

    # objsize for the whole collection is represented at the root,
    # don't double-count it
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
        super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
        # [elided: self.api and related attributes]
        self.num_retries = num_retries
        # [elided: try]
            # Poll at half the blob signature TTL so signatures never expire
            # while cached.
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
        # [elided: except]
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
            self._poll_time = 60*60

        # collection_record may be a full API record (dict) or a bare
        # uuid/PDH locator string.
        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        # [elided: else branch]
            self.collection_locator = collection_record
            # [elided]
        self._manifest_size = 0
        if self.collection_locator:
            # Only real uuids (not portable data hashes) are writable.
            self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
        self._updating_lock = threading.Lock()

    # [elided: same()-style comparison method — TODO confirm name]
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    # [elided: writable() definition]
        return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)

    # [elided: flush() definition + decorators]
        if not self.writable():
            # [elided: early return]
        with llfuse.lock_released:
            with self._updating_lock:
                if self.collection.committed():
                    self.collection.update()
                # [elided: else]
                    self.collection.save()
                self.new_collection_record(self.collection.api_response())

    def want_event_subscribe(self):
        # Only uuid-addressed collections can change; PDH-addressed ones are
        # immutable so no event subscription is needed.
        return (uuid_pattern.match(self.collection_locator) is not None)

    def new_collection(self, new_collection_record, coll_reader):
        # Adopt a freshly-fetched Collection object and rebuild entries.
        # [elided]
        self.collection = coll_reader
        self.new_collection_record(new_collection_record)
        self.populate(self.mtime())

    def new_collection_record(self, new_collection_record):
        if not new_collection_record:
            raise Exception("invalid new_collection_record")
        self._mtime = convertTime(new_collection_record.get('modified_at'))
        self._manifest_size = len(new_collection_record["manifest_text"])
        self.collection_locator = new_collection_record["uuid"]
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
            _logger.debug("%s invalidated collection record file", self)
        self.inodes.inode_cache.update_cache_size(self)

    # [elided: uuid() definition]
        return self.collection_locator

    # [elided: update() definition + decorators]
        if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator):
            # It's immutable, nothing to update
            # [elided: early return]

        if self.collection_locator is None:
            # No collection locator to retrieve from
            # [elided: early return]

        # [elided: try]
        new_collection_record = None
        with llfuse.lock_released:
            self._updating_lock.acquire()
            # [elided: bail out if not stale]

            _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
            # [elided: coll_reader initialization]
            if self.collection is not None:
                # Already have a collection object
                self.collection.update()
                new_collection_record = self.collection.api_response()
            # [elided: else]
                # Create a new collection object
                if uuid_pattern.match(self.collection_locator):
                    coll_reader = arvados.collection.Collection(
                        self.collection_locator, self.api, self.api.keep,
                        num_retries=self.num_retries)
                # [elided: else — PDH locator gets a read-only reader]
                    coll_reader = arvados.collection.CollectionReader(
                        self.collection_locator, self.api, self.api.keep,
                        num_retries=self.num_retries)
                new_collection_record = coll_reader.api_response() or {}
                # If the Collection only exists in Keep, there will be no API
                # response. Fill in the fields we need.
                if 'uuid' not in new_collection_record:
                    new_collection_record['uuid'] = self.collection_locator
                if "portable_data_hash" not in new_collection_record:
                    new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                if 'manifest_text' not in new_collection_record:
                    new_collection_record['manifest_text'] = coll_reader.manifest_text()
                if 'storage_classes_desired' not in new_collection_record:
                    new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()

        # end with llfuse.lock_released, re-acquire lock

        if new_collection_record is not None:
            if coll_reader is not None:
                self.new_collection(new_collection_record, coll_reader)
            # [elided: else — just refresh the record]
                self.new_collection_record(new_collection_record)
            # [elided]
        # [elided: finally]
            self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
        # [elided: bare except Exception handler]
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])

    def collection_record(self):
        # Backing callable for the .arvados#collection virtual file.
        # [elided: decorators / lock handling]
        return self.collection.api_response()

    def __getitem__(self, item):
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            self.invalidate()  # use lookup as a signal to force update
            return self.collection_record_file
        # [elided: else]
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            # [elided: return True]
        # [elided: else]
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
        super(CollectionDirectory, self).invalidate()

    # [elided: persisted() definition]
        return (self.collection_locator is not None)

    # [elided: objsize() definition]
        # This is a very rough guess of the amount of overhead
        # involved for a collection; you've got the manifest text
        # itself which is not discarded by the Collection class, then
        # the block identifiers that get copied into their own
        # strings, then the rest of the overhead of the Python
        # [elided: rest of comment]
        return self._manifest_size * 4

    # [elided: finalize() definition — best-effort save on teardown]
        if self.collection is not None:
            # [elided: try]
                self.collection.save()
            except Exception as e:
                # Deliberately best-effort: log and continue shutdown.
                _logger.exception("Failed to save collection %s", self.collection_locator)
            self.collection.stop_threads()

    # [elided: clear() definition]
        if self.collection is not None:
            self.collection.stop_threads()
        self._manifest_size = 0
        super(CollectionDirectory, self).clear()
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This supports using Keep as scratch space. A userspace program can
    read the .arvados#collection file to get a current manifest in
    order to save a snapshot of the scratch data or use it as a crunch
    [elided: rest of sentence]
    """

    class UnsaveableCollection(arvados.collection.Collection):
        # [elided: no-op save/save_new overrides — TODO confirm]

    def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
        collection = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries,
            storage_classes_desired=storage_classes)
        # This is always enable_write=True because it never tries to
        # save to the backend
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, True, filters, collection, self)
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if self.collection_record_file is None:
            # [elided: early return]

        # See discussion in CollectionDirectoryBase.on_event
        # [elided: counted RLock release loop]
            self.collection.lock.release()
        # [elided: re-acquire llfuse lock]
            with self.collection.lock:
                self.collection_record_file.invalidate()
                self.inodes.invalidate_inode(self.collection_record_file)
                _logger.debug("%s invalidated collection record", self)
        # [elided: finally — re-lock the counted number of times]
                self.collection.lock.acquire()

    def collection_record(self):
        # Synthesizes a minimal record from the in-memory collection;
        # there is no API-server record for a tmp collection.
        with llfuse.lock_released:
            # [elided: return { opening and other keys]
            "manifest_text": self.collection.manifest_text(),
            "portable_data_hash": self.collection.portable_data_hash(),
            "storage_classes_desired": self.collection.storage_classes_desired(),
            # [elided: closing brace]

    def __contains__(self, k):
        return (k == '.arvados#collection' or
                super(TmpCollectionDirectory, self).__contains__(k))

    def __getitem__(self, item):
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        return super(TmpCollectionDirectory, self).__getitem__(item)

    # [elided: lines 728-737 — additional overrides]

    def want_event_subscribe(self):
        # [elided: body — presumably returns False (nothing server-side to
        #  watch); TODO confirm]

    # [elided: finalize() definition]
        self.collection.stop_threads()

    def invalidate(self):
        if self.collection_record_file:
            self.collection_record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().
    """

    # [elided: opening of a string assignment (presumably README_TEXT = """);
    #  the following lines are part of that user-visible runtime string and
    #  are reproduced verbatim — do not edit them as documentation]
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.
    # [elided: closing of the string]

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
        super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        # [elided: self.api assignment]
        self.num_retries = num_retries
        # When True, only portable data hashes (not uuids) are resolved.
        self.pdh_only = pdh_only
        self.storage_classes = storage_classes

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
            (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                    # [elided: constructor arguments]

    def __contains__(self, k):
        # Side-effectful membership test: a successful lookup loads the
        # collection/project as a new subdirectory entry.
        if k in self._entries:
            # [elided: return True]

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            # [elided: return False — not a resolvable identifier]

        # [elided: try]
            if group_uuid_pattern.match(k):
                project = self.api.groups().list(
                    # [elided: filters=[ opening]
                    ['group_class', 'in', ['project','filter']],
                    # [elided]
                    *self._filters_for('groups', qualified=False),
                    # [elided]
                ).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                    # [elided: return False]
                e = self.inodes.add_entry(ProjectDirectory(
                    # [elided: constructor arguments]
                    project[u'items'][0],
                    storage_classes=self.storage_classes,
            # [elided: else — collection locator]
                e = self.inodes.add_entry(CollectionDirectory(
                    # [elided: constructor arguments]

            # [elided: update/validation of the new entry `e`]
            if k not in self._entries:
                # [elided]
                self.inodes.del_entry(e)
                # [elided]
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(e)
        except Exception as ex:
            # Best-effort lookup: log and report "not found" rather than
            # propagating to FUSE.
            _logger.exception("arv-mount lookup '%s':", k)
            # [elided]
            self.inodes.del_entry(e)

    def __getitem__(self, item):
        # [elided: membership check (which triggers the load above)]
        return self._entries[item]
        # [elided: else]
        raise KeyError("No collection with id " + item)

    def want_event_subscribe(self):
        # uuid lookups can go stale; PDH-only mounts are immutable.
        return not self.pdh_only
class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        # [elided: self.api assignment]
        self.num_retries = num_retries
        # [elided: polling flag]
        self._poll_time = poll_time
        # [elided: presumably self._extra = set() — referenced below;
        #  TODO confirm]

    def want_event_subscribe(self):
        # [elided: body]

    # [elided: update() definition + decorators]
        with llfuse.lock_released:
            tags = self.api.links().list(
                # [elided: filters=[ opening]
                ['link_class', '=', 'tag'],
                # [elided]
                *self._filters_for('links', qualified=False),
                # [elided: closing, select/distinct arguments]
            ).execute(num_retries=self.num_retries)
        # [elided: guard + self.merge( opening — merges server tags plus any
        #  names cached in self._extra]
            tags['items']+[{"name": n} for n in self._extra],
            # [elided: name function argument]
            lambda a, i: a.tag == i['name'],
            lambda i: TagDirectory(
                # [elided: constructor arguments]
                poll_time=self._poll_time,
            # [elided: closing]

    # [elided: decorators]
    def __getitem__(self, item):
        if super(TagsDirectory, self).__contains__(item):
            return super(TagsDirectory, self).__getitem__(item)
        # Unknown tag: query the API directly so a tag can be looked up by
        # name even before a full listing has happened.
        with llfuse.lock_released:
            tags = self.api.links().list(
                # [elided: filters=[ opening]
                ['link_class', '=', 'tag'],
                # [elided: name filter]
                *self._filters_for('links', qualified=False),
                # [elided: closing/limit]
            ).execute(num_retries=self.num_retries)
        # [elided: guard on результат — if found, remember the tag name]
            self._extra.add(item)
            # [elided: refresh]
        return super(TagsDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if super(TagsDirectory, self).__contains__(k):
            # [elided: return True]
        # [elided: fall back to a __getitem__ attempt — TODO confirm]
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
        # [elided: self.api / self.tag / polling flag assignments]
        self.num_retries = num_retries
        # [elided]
        self._poll_time = poll_time

    def want_event_subscribe(self):
        # [elided: body]

    # [elided: update() definition + decorators]
        with llfuse.lock_released:
            # Find all 'tag' links with this tag name that point at
            # collections; only head_uuid is needed to build entries.
            taggedcollections = self.api.links().list(
                # [elided: filters=[ opening]
                ['link_class', '=', 'tag'],
                ['name', '=', self.tag],
                ['head_uuid', 'is_a', 'arvados#collection'],
                *self._filters_for('links', qualified=False),
                # [elided: closing bracket]
                select=['head_uuid'],
            ).execute(num_retries=self.num_retries)
        # [elided: self.merge( opening]
            taggedcollections['items'],
            lambda i: i['head_uuid'],
            lambda a, i: a.collection_locator == i['head_uuid'],
            lambda i: CollectionDirectory(
                # [elided: constructor arguments and closing]
992 class ProjectDirectory(Directory):
993 """A special directory that contains the contents of a project."""
995 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
996 project_object, poll=True, poll_time=3, storage_classes=None):
997 super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
999 self.num_retries = num_retries
1000 self.project_object = project_object
1001 self.project_object_file = None
1002 self.project_uuid = project_object['uuid']
1004 self._poll_time = poll_time
1005 self._updating_lock = threading.Lock()
1006 self._current_user = None
1007 self._full_listing = False
1008 self.storage_classes = storage_classes
1009 self.recursively_contained = False
1011 # Filter groups can contain themselves, which causes tools
1012 # that walk the filesystem to get stuck in an infinite loop,
1013 # so suppress returning a listing in that case.
1014 if self.project_object.get("group_class") == "filter":
1015 iter_parent_inode = parent_inode
1016 while iter_parent_inode != llfuse.ROOT_INODE:
1017 parent_dir = self.inodes[iter_parent_inode]
1018 if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid:
1019 self.recursively_contained = True
1021 iter_parent_inode = parent_dir.parent_inode
1023 def want_event_subscribe(self):
1026 def createDirectory(self, i):
1027 common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
1028 if collection_uuid_pattern.match(i['uuid']):
1029 return CollectionDirectory(*common_args, i)
1030 elif group_uuid_pattern.match(i['uuid']):
1031 return ProjectDirectory(*common_args, i, self._poll, self._poll_time, self.storage_classes)
1032 elif link_uuid_pattern.match(i['uuid']):
1033 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
1034 return CollectionDirectory(*common_args, i['head_uuid'])
1037 elif uuid_pattern.match(i['uuid']):
1038 return ObjectFile(self.parent_inode, i)
1043 return self.project_uuid
1046 self._full_listing = True
1047 return super(ProjectDirectory, self).items()
1049 def namefn(self, i):
1051 if i['name'] is None or len(i['name']) == 0:
1053 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
1054 # collection or subproject
1056 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
1059 elif 'kind' in i and i['kind'].startswith('arvados#'):
1061 return "{}.{}".format(i['name'], i['kind'][8:])
1068 if self.project_object_file == None:
1069 self.project_object_file = ObjectFile(self.inode, self.project_object)
1070 self.inodes.add_entry(self.project_object_file)
1072 if self.recursively_contained or not self._full_listing:
1076 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
1077 return a.uuid() == i['uuid']
1078 elif isinstance(a, ObjectFile):
1079 return a.uuid() == i['uuid'] and not a.stale()
1083 with llfuse.lock_released:
1084 self._updating_lock.acquire()
1085 if not self.stale():
1088 if group_uuid_pattern.match(self.project_uuid):
1089 self.project_object = self.api.groups().get(
1090 uuid=self.project_uuid).execute(num_retries=self.num_retries)
1091 elif user_uuid_pattern.match(self.project_uuid):
1092 self.project_object = self.api.users().get(
1093 uuid=self.project_uuid).execute(num_retries=self.num_retries)
1094 # do this in 2 steps until #17424 is fixed
1095 contents = list(arvados.util.keyset_list_all(
1096 self.api.groups().contents,
1098 num_retries=self.num_retries,
1099 uuid=self.project_uuid,
1101 ['uuid', 'is_a', 'arvados#group'],
1102 ['groups.group_class', 'in', ['project', 'filter']],
1103 *self._filters_for('groups', qualified=True),
1106 contents.extend(obj for obj in arvados.util.keyset_list_all(
1107 self.api.groups().contents,
1109 num_retries=self.num_retries,
1110 uuid=self.project_uuid,
1112 ['uuid', 'is_a', 'arvados#collection'],
1113 *self._filters_for('collections', qualified=True),
1115 ) if obj['current_version_uuid'] == obj['uuid'])
1116 # end with llfuse.lock_released, re-acquire lock
1118 self.merge(contents,
1121 self.createDirectory)
1124 self._updating_lock.release()
1126 def _add_entry(self, i, name):
1127 ent = self.createDirectory(i)
1128 self._entries[name] = self.inodes.add_entry(ent)
1129 return self._entries[name]
1133 def __getitem__(self, k):
1134 if k == '.arvados#project':
1135 return self.project_object_file
1136 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
1137 return super(ProjectDirectory, self).__getitem__(k)
1138 with llfuse.lock_released:
1139 k2 = self.unsanitize_filename(k)
1141 namefilter = ["name", "=", k]
1143 namefilter = ["name", "in", [k, k2]]
1144 contents = self.api.groups().list(
1146 ["owner_uuid", "=", self.project_uuid],
1147 ["group_class", "in", ["project","filter"]],
1149 *self._filters_for('groups', qualified=False),
1152 ).execute(num_retries=self.num_retries)["items"]
1154 contents = self.api.collections().list(
1156 ["owner_uuid", "=", self.project_uuid],
1158 *self._filters_for('collections', qualified=False),
1161 ).execute(num_retries=self.num_retries)["items"]
1163 if len(contents) > 1 and contents[1]['name'] == k:
1164 # If "foo/bar" and "foo[SUBST]bar" both exist, use
1166 contents = [contents[1]]
1167 name = self.sanitize_filename(self.namefn(contents[0]))
1170 return self._add_entry(contents[0], name)
1175 def __contains__(self, k):
1176 if k == '.arvados#project':
1188 if not self._enable_write:
1190 with llfuse.lock_released:
1191 if not self._current_user:
1192 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
1193 return self._current_user["uuid"] in self.project_object.get("writable_by", [])
1195 def persisted(self):
1200 def mkdir(self, name):
1201 if not self.writable():
1202 raise llfuse.FUSEError(errno.EROFS)
1205 with llfuse.lock_released:
1207 "owner_uuid": self.project_uuid,
1209 "manifest_text": "" }
1210 if self.storage_classes is not None:
1211 c["storage_classes_desired"] = self.storage_classes
1213 self.api.collections().create(body=c).execute(num_retries=self.num_retries)
1214 except Exception as e:
1217 except apiclient_errors.Error as error:
1218 _logger.error(error)
1219 raise llfuse.FUSEError(errno.EEXIST)
# Remove the collection backing subdirectory `name`, with POSIX-style guards.
1223 def rmdir(self, name):
# EROFS: mount/project not writable.
1224 if not self.writable():
1225 raise llfuse.FUSEError(errno.EROFS)
# ENOENT: no such entry.
1227 if name not in self:
1228 raise llfuse.FUSEError(errno.ENOENT)
# EPERM: only collection-backed subdirectories can be removed this way.
1229 if not isinstance(self[name], CollectionDirectory):
1230 raise llfuse.FUSEError(errno.EPERM)
# ENOTEMPTY: refuse to delete a non-empty collection.
1231 if len(self[name]) > 0:
1232 raise llfuse.FUSEError(errno.ENOTEMPTY)
# Delete the backing collection with the FUSE lock released.
# NOTE(review): lines 1235-1238 (presumably local entry removal and kernel
# cache invalidation) are missing from this listing.
1233 with llfuse.lock_released:
1234 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
# Move a collection-backed entry `name_old` from project directory `src`
# into this project as `name_new`, by updating owner_uuid/name via the API.
# NOTE(review): lines 1242, 1245-1247 (where `ent` is presumably fetched
# from `src`), 1250, 1257 and 1261 are missing from this listing.
1239 def rename(self, name_old, name_new, src):
1240 if not self.writable():
1241 raise llfuse.FUSEError(errno.EROFS)
# Cross-type renames are not supported: the source must be a project...
1243 if not isinstance(src, ProjectDirectory):
1244 raise llfuse.FUSEError(errno.EPERM)
# ...and the thing being moved must be a collection directory.
1248 if not isinstance(ent, CollectionDirectory):
1249 raise llfuse.FUSEError(errno.EPERM)
1251 if name_new in self:
1252 # POSIX semantics for replacing one directory with another is
1253 # tricky (the target directory must be empty, the operation must be
1254 # atomic which isn't possible with the Arvados API as of this
1255 # writing) so don't support that.
1256 raise llfuse.FUSEError(errno.EPERM)
# Server-side move: re-own and rename the collection in a single update.
1258 self.api.collections().update(uuid=ent.uuid(),
1259 body={"owner_uuid": self.uuid(),
1260 "name": name_new}).execute(num_retries=self.num_retries)
1262 # Actually move the entry from source directory to this directory.
1263 del src._entries[name_old]
1264 self._entries[name_new] = ent
# Tell the kernel the old dentry in `src` is gone.
1265 self.inodes.invalidate_entry(src, name_old)
# Handle a websocket event for an object owned by (or moved in/out of)
# this project, keeping self._entries in sync with the server.
# NOTE(review): several lines fall in listing gaps (1276, 1281, 1284,
# 1287-1288, 1291, 1293-1295, 1297, 1302-1304, 1306), including the early
# returns / branch bodies that some comments below refer to.
1268 def child_event(self, ev):
# Event payloads may omit properties / old / new attributes; default to {}.
1269 properties = ev.get("properties") or {}
1270 old_attrs = properties.get("old_attributes") or {}
1271 new_attrs = properties.get("new_attributes") or {}
1272 old_attrs["uuid"] = ev["object_uuid"]
1273 new_attrs["uuid"] = ev["object_uuid"]
# Map the before/after records to the filenames they appear under.
1274 old_name = self.sanitize_filename(self.namefn(old_attrs))
1275 new_name = self.sanitize_filename(self.namefn(new_attrs))
1277 # create events will have a new name, but not an old name
1278 # delete events will have an old name, but not a new name
1279 # update events will have an old and new name, and they may be same or different
1280 # if they are the same, an unrelated field changed and there is nothing to do.
1282 if old_attrs.get("owner_uuid") != self.project_uuid:
1283 # Was moved from somewhere else, so don't try to remove entry.
1285 if ev.get("object_owner_uuid") != self.project_uuid:
1286 # Was moved to somewhere else, so don't try to add entry
# Trash transitions behave like delete/create of the entry.
1289 if old_attrs.get("is_trashed"):
1290 # Was previously deleted
1292 if new_attrs.get("is_trashed"):
# Only a visible name change requires touching the entries dict.
1296 if new_name != old_name:
1298 if old_name in self._entries:
1299 ent = self._entries[old_name]
1300 del self._entries[old_name]
1301 self.inodes.invalidate_entry(self, old_name)
# Re-home the existing entry under its new name, or create a fresh one.
1305 self._entries[new_name] = ent
1307 self._add_entry(new_attrs, new_name)
1308 elif ent is not None:
1309 self.inodes.del_entry(ent)
1312 class SharedDirectory(Directory):
1313 """A special directory that represents users or groups who have shared projects with me."""
# NOTE(review): listing gaps -- e.g. lines 1314, 1318 (presumably
# `self.api = api`) and 1321 are missing.  Code left byte-identical.
1315 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
1316 exclude, poll=False, poll_time=60, storage_classes=None):
1317 super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
1319 self.num_retries = num_retries
# Fetch and remember who we are; the refresh logic groups shared projects
# by owner relative to this user.
1320 self.current_user = api.users().current().execute(num_retries=num_retries)
1322 self._poll_time = poll_time
# Serializes concurrent refresh runs.
1323 self._updating_lock = threading.Lock()
1324 self.storage_classes = storage_classes
# NOTE(review): fragment of SharedDirectory's refresh method -- its `def`
# line and many body lines fall in listing gaps.  It rebuilds the directory
# contents: a mapping of {owner display name: owner record} for every
# user/group owning a project shared with the current user.  Code left
# byte-identical; comments only.
1329 with llfuse.lock_released:
1330 self._updating_lock.acquire()
# Another thread refreshed while we waited for the lock; nothing to do.
1331 if not self.stale():
# Prefer the dedicated groups.shared endpoint when the API server
# advertises it in its discovery document...
1339 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1340 if 'httpMethod' in methods.get('shared', {}):
1343 resp = self.api.groups().shared(
1345 ['group_class', 'in', ['project','filter']],
1347 *self._filters_for('groups', qualified=False),
# include="owner_uuid" makes the server return the owner records
# in resp["included"] alongside the shared groups.
1352 include="owner_uuid",
1354 if not resp["items"]:
# Keyset pagination: continue after the last uuid seen.
1356 page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1357 for r in resp["items"]:
1358 objects[r["uuid"]] = r
1359 roots.append(r["uuid"])
1360 for r in resp["included"]:
1361 objects[r["uuid"]] = r
1362 root_owners.add(r["uuid"])
# ...otherwise fall back to listing every readable project and
# deriving the shared "roots" client-side.
1364 all_projects = list(arvados.util.keyset_list_all(
1365 self.api.groups().list,
1367 num_retries=self.num_retries,
1369 ['group_class', 'in', ['project','filter']],
1370 *self._filters_for('groups', qualified=False),
1372 select=["uuid", "owner_uuid"],
1374 for ob in all_projects:
1375 objects[ob['uuid']] = ob
# A project counts as a shared root when its owner is neither me nor
# another project already visible in `objects`.
1377 current_uuid = self.current_user['uuid']
1378 for ob in all_projects:
1379 if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1380 roots.append(ob['uuid'])
1381 root_owners.add(ob['owner_uuid'])
# Resolve owner uuids to user and group records for display names.
1383 lusers = arvados.util.keyset_list_all(
1384 self.api.users().list,
1386 num_retries=self.num_retries,
1388 ['uuid', 'in', list(root_owners)],
1389 *self._filters_for('users', qualified=False),
1392 lgroups = arvados.util.keyset_list_all(
1393 self.api.groups().list,
1395 num_retries=self.num_retries,
1397 ['uuid', 'in', list(root_owners)+roots],
1398 *self._filters_for('groups', qualified=False),
1402 objects[l["uuid"]] = l
1404 objects[l["uuid"]] = l
# Build the name -> record mapping: groups appear under their name,
# users under "First Last".
1406 for r in root_owners:
1410 contents[obr["name"]] = obr
1411 elif "first_name" in obr:
1412 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
# Root projects whose owner record could not be resolved still show
# under their own name.
1417 if obr['owner_uuid'] not in objects:
1418 contents[obr["name"]] = obr
1420 # end with llfuse.lock_released, re-acquire lock
# Merge the fresh listing into the entry cache, wrapping each record
# in a ProjectDirectory matched by uuid.
1425 lambda a, i: a.uuid() == i[1]['uuid'],
1426 lambda i: ProjectDirectory(
1435 poll_time=self._poll_time,
1436 storage_classes=self.storage_classes,
# Log and swallow: a failed refresh must not crash the mount.
1440 _logger.exception("arv-mount shared dir error")
1442 self._updating_lock.release()
1444 def want_event_subscribe(self):