1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
15 from apiclient import errors as apiclient_errors
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
23 _logger = logging.getLogger('arvados.arvados_fuse')
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (If present in a collection filename, they will
28 # appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile(r'[\x00/]')
32 class Directory(FreshBase):
33 """Generic directory object, backed by a dict.
35 Consists of a set of entries with the key representing the filename
36 and the value referencing a File or Directory object.
39 __slots__ = ("inode", "parent_inode", "inodes", "_entries", "_mtime", "_enable_write", "_filters")
41 def __init__(self, parent_inode, inodes, enable_write, filters):
42 """parent_inode is the integer inode number"""
44 super(Directory, self).__init__()
47 if not isinstance(parent_inode, int):
48 raise Exception("parent_inode should be an int")
49 self.parent_inode = parent_inode
52 self._mtime = time.time()
53 self._enable_write = enable_write
54 self._filters = filters or []
56 def _filters_for(self, subtype, *, qualified):
57 for f in self._filters:
58 f_type, _, f_name = f[0].partition('.')
61 elif f_type != subtype:
66 yield [f_name, *f[1:]]
68 def unsanitize_filename(self, incoming):
69 """Replace ForwardSlashNameSubstitution value with /"""
70 fsns = self.inodes.forward_slash_subst()
71 if isinstance(fsns, str):
72 return incoming.replace(fsns, '/')
76 def sanitize_filename(self, dirty):
77 """Replace disallowed filename characters according to
78 ForwardSlashNameSubstitution in self.api_config."""
79 # '.' and '..' are not reachable if API server is newer than #6277
89 fsns = self.inodes.forward_slash_subst()
90 if isinstance(fsns, str):
91 dirty = dirty.replace('/', fsns)
92 return _disallowed_filename_characters.sub('_', dirty)
95 # Overridden by subclasses to implement logic to update the
96 # entries dict when the directory is stale
101 # Only used when computing the size of the disk footprint of the directory
109 def checkupdate(self):
113 except apiclient.errors.HttpError as e:
118 def __getitem__(self, item):
119 return self._entries[item]
124 return list(self._entries.items())
128 def __contains__(self, k):
129 return k in self._entries
134 return len(self._entries)
137 self.inodes.touch(self)
138 super(Directory, self).fresh()
141 # Rough estimate of memory footprint based on using pympler
142 return len(self._entries) * 1024
144 def merge(self, items, fn, same, new_entry):
145 """Helper method for updating the contents of the directory.
147 Takes a list describing the new contents of the directory, reuse
148 entries that are the same in both the old and new lists, create new
149 entries, and delete old entries missing from the new list.
152 * items: Iterable --- New directory contents
154 * fn: Callable --- Takes an entry in 'items' and return the desired file or
155 directory name, or None if this entry should be skipped
157 * same: Callable --- Compare an existing entry (a File or Directory
158 object) with an entry in the items list to determine whether to keep
161 * new_entry: Callable --- Create a new directory entry (File or Directory
162 object) from an entry in the items list.
166 oldentries = self._entries
170 name = self.sanitize_filename(fn(i))
173 if name in oldentries:
174 ent = oldentries[name]
175 if same(ent, i) and ent.parent_inode == self.inode:
176 # move existing directory entry over
177 self._entries[name] = ent
179 self.inodes.inode_cache.touch(ent)
182 name = self.sanitize_filename(fn(i))
185 if name not in self._entries:
186 # create new directory entry
189 self._entries[name] = self.inodes.add_entry(ent)
190 # need to invalidate this just in case there was a
191 # previous entry that couldn't be moved over or a
192 # lookup that returned file not found and cached
194 self.inodes.invalidate_entry(self, name)
196 _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)
198 # delete any other directory entries that were not in found in 'items'
199 for name, ent in oldentries.items():
200 _logger.debug("Detaching entry '%s' from parent_inode %i", name, self.inode)
201 self.inodes.invalidate_entry(self, name)
202 self.inodes.del_entry(ent)
206 self._mtime = time.time()
207 self.inodes.inode_cache.update_cache_size(self)
212 if super(Directory, self).in_use():
214 for v in self._entries.values():
220 """Delete all entries"""
221 if not self._entries:
223 oldentries = self._entries
226 for name, ent in oldentries.items():
228 self.inodes.invalidate_entry(self, name)
229 self.inodes.del_entry(ent)
230 self.inodes.inode_cache.update_cache_size(self)
232 def kernel_invalidate(self):
233 # Invalidating the dentry on the parent implies invalidating all paths
235 if self.parent_inode in self.inodes:
236 parent = self.inodes[self.parent_inode]
238 # parent was removed already.
241 # Find self on the parent in order to invalidate this path.
242 # Calling the public items() method might trigger a refresh,
243 # which we definitely don't want, so read the internal dict directly.
244 for k,v in parent._entries.items():
246 self.inodes.invalidate_entry(parent, k)
    def want_event_subscribe(self):
        """Return True if this directory wants websocket event updates; subclasses must override."""
        raise NotImplementedError()

    def create(self, name):
        """Create file *name*; subclasses that support writing must override."""
        raise NotImplementedError()

    def mkdir(self, name):
        """Create subdirectory *name*; subclasses that support writing must override."""
        raise NotImplementedError()

    def unlink(self, name):
        """Remove file *name*; subclasses that support writing must override."""
        raise NotImplementedError()

    def rmdir(self, name):
        """Remove subdirectory *name*; subclasses that support writing must override."""
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        """Move *name_old* from directory *src* to *name_new* here; override to support."""
        raise NotImplementedError()
277 class CollectionDirectoryBase(Directory):
278 """Represent an Arvados Collection as a directory.
280 This class is used for Subcollections, and is also the base class for
281 CollectionDirectory, which implements collection loading/saving on
284 Most operations act only the underlying Arvados `Collection` object. The
285 `Collection` object signals via a notify callback to
286 `CollectionDirectoryBase.on_event` that an item was added, removed or
287 modified. FUSE inodes and directory entries are created, deleted or
288 invalidated in response to these events.
292 __slots__ = ("collection", "collection_root", "collection_record_file")
    def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root):
        """Wrap *collection* (an arvados.collection object) as a FUSE directory.

        *collection_root* is the CollectionDirectory at the root of this
        collection's directory tree.
        """
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
        # Backing arvados.collection object for this (sub)directory.
        self.collection = collection
        # Root CollectionDirectory of the tree (owns flush/save behavior).
        self.collection_root = collection_root
        # Lazily-created file exposing the API record; see __getitem__ in subclasses.
        self.collection_record_file = None
300 def new_entry(self, name, item, mtime):
301 name = self.sanitize_filename(name)
302 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
303 if item.fuse_entry.parent_inode is not None:
304 raise Exception("Can only reparent unparented inode entry")
305 if item.fuse_entry.inode is None:
306 raise Exception("Reparented entry must still have valid inode")
307 item.fuse_entry.parent_inode = self.inode
308 self._entries[name] = item.fuse_entry
309 elif isinstance(item, arvados.collection.RichCollectionBase):
310 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
316 self.collection_root,
318 self._entries[name].populate(mtime)
320 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
321 item.fuse_entry = self._entries[name]
323 def on_event(self, event, collection, name, item):
324 # These are events from the Collection object (ADD/DEL/MOD)
325 # emitted by operations on the Collection object (like
326 # "mkdirs" or "remove"), and by "update", which we need to
327 # synchronize with our FUSE objects that are assigned inodes.
328 if collection == self.collection:
329 name = self.sanitize_filename(name)
332 # It's possible for another thread to have llfuse.lock and
333 # be waiting on collection.lock. Meanwhile, we released
334 # llfuse.lock earlier in the stack, but are still holding
335 # on to the collection lock, and now we need to re-acquire
336 # llfuse.lock. If we don't release the collection lock,
337 # we'll deadlock where we're holding the collection lock
338 # waiting for llfuse.lock and the other thread is holding
339 # llfuse.lock and waiting for the collection lock.
341 # The correct locking order here is to take llfuse.lock
342 # first, then the collection lock.
344 # Since collection.lock is an RLock, it might be locked
345 # multiple times, so we need to release it multiple times,
346 # keep a count, then re-lock it the correct number of
352 self.collection.lock.release()
359 with self.collection.lock:
360 if event == arvados.collection.ADD:
361 self.new_entry(name, item, self.mtime())
362 elif event == arvados.collection.DEL:
363 ent = self._entries[name]
364 del self._entries[name]
365 self.inodes.invalidate_entry(self, name)
366 self.inodes.del_entry(ent)
367 elif event == arvados.collection.MOD:
368 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
369 self.inodes.invalidate_inode(item.fuse_entry)
370 elif name in self._entries:
371 self.inodes.invalidate_inode(self._entries[name])
373 if self.collection_record_file is not None:
374 self.collection_record_file.invalidate()
375 self.inodes.invalidate_inode(self.collection_record_file)
378 self.collection.lock.acquire()
381 def populate(self, mtime):
383 with self.collection.lock:
384 self.collection.subscribe(self.on_event)
385 for entry, item in self.collection.items():
386 self.new_entry(entry, item, self.mtime())
389 return self._enable_write and self.collection.writable()
393 self.collection_root.flush()
    def create(self, name):
        """Create an empty file *name* in the backing collection."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        # Release the FUSE global lock while touching the collection,
        # which may block on I/O.
        with llfuse.lock_released:
            # open(..., "w") creates the entry; close immediately — actual
            # writes arrive later through the FUSE file handle.
            self.collection.open(name, "w").close()
    def mkdir(self, name):
        """Create subdirectory *name* in the backing collection (via Collection.mkdirs)."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        # Release the FUSE global lock while touching the collection,
        # which may block on I/O.
        with llfuse.lock_released:
            self.collection.mkdirs(name)
    def unlink(self, name):
        """Remove file *name* from the backing collection."""
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        # Release the FUSE global lock while touching the collection,
        # which may block on I/O.
        with llfuse.lock_released:
            self.collection.remove(name)
422 def rmdir(self, name):
423 if not self.writable():
424 raise llfuse.FUSEError(errno.EROFS)
425 with llfuse.lock_released:
426 self.collection.remove(name)
431 def rename(self, name_old, name_new, src):
432 if not self.writable():
433 raise llfuse.FUSEError(errno.EROFS)
435 if not isinstance(src, CollectionDirectoryBase):
436 raise llfuse.FUSEError(errno.EPERM)
441 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
443 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
445 raise llfuse.FUSEError(errno.ENOTEMPTY)
446 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
447 raise llfuse.FUSEError(errno.ENOTDIR)
448 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
449 raise llfuse.FUSEError(errno.EISDIR)
451 with llfuse.lock_released:
452 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
457 super(CollectionDirectoryBase, self).clear()
458 if self.collection is not None:
459 self.collection.unsubscribe()
460 self.collection = None
463 # objsize for the whole collection is represented at the root,
464 # don't double-count it
467 class CollectionDirectory(CollectionDirectoryBase):
468 """Represents the root of a directory tree representing a collection."""
470 __slots__ = ("api", "num_retries", "collection_locator",
471 "_manifest_size", "_writable", "_updating_lock")
473 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
474 super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
476 self.num_retries = num_retries
479 self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
481 _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
482 self._poll_time = 60*60
484 if isinstance(collection_record, dict):
485 self.collection_locator = collection_record['uuid']
486 self._mtime = convertTime(collection_record.get('modified_at'))
488 self.collection_locator = collection_record
490 self._manifest_size = 0
491 if self.collection_locator:
492 self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
493 self._updating_lock = threading.Lock()
496 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
499 return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)
503 if not self.writable():
505 with llfuse.lock_released:
506 with self._updating_lock:
507 if self.collection.committed():
508 self.collection.update()
510 self.collection.save()
511 self.new_collection_record(self.collection.api_response())
    def want_event_subscribe(self):
        # Subscribe only for collections referenced by UUID; portable data
        # hashes are content-addressed and their contents never change.
        return (uuid_pattern.match(self.collection_locator) is not None)
516 def new_collection(self, new_collection_record, coll_reader):
519 self.collection = coll_reader
520 self.new_collection_record(new_collection_record)
521 self.populate(self.mtime())
    def new_collection_record(self, new_collection_record):
        """Adopt a fresh API record for this collection and refresh cached metadata."""
        if not new_collection_record:
            raise Exception("invalid new_collection_record")
        self._mtime = convertTime(new_collection_record.get('modified_at'))
        # Manifest size is used to estimate this collection's memory footprint.
        self._manifest_size = len(new_collection_record["manifest_text"])
        self.collection_locator = new_collection_record["uuid"]
        if self.collection_record_file is not None:
            # The .arvados#collection file content is now stale; expire it and
            # tell the kernel so the next read re-fetches it.
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
            _logger.debug("parent_inode %s invalidated collection record file inode %s", self.inode,
            self.collection_record_file.inode)
        self.inodes.update_uuid(self)
        self.inodes.inode_cache.update_cache_size(self)
539 return self.collection_locator
544 if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator):
545 # It's immutable, nothing to update
548 if self.collection_locator is None:
549 # No collection locator to retrieve from
553 new_collection_record = None
555 with llfuse.lock_released:
556 self._updating_lock.acquire()
560 _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
562 if self.collection is not None:
563 # Already have a collection object
564 self.collection.update()
565 new_collection_record = self.collection.api_response()
567 # Create a new collection object
568 if uuid_pattern.match(self.collection_locator):
569 coll_reader = arvados.collection.Collection(
570 self.collection_locator, self.api, self.api.keep,
571 num_retries=self.num_retries)
573 coll_reader = arvados.collection.CollectionReader(
574 self.collection_locator, self.api, self.api.keep,
575 num_retries=self.num_retries)
576 new_collection_record = coll_reader.api_response() or {}
577 # If the Collection only exists in Keep, there will be no API
578 # response. Fill in the fields we need.
579 if 'uuid' not in new_collection_record:
580 new_collection_record['uuid'] = self.collection_locator
581 if "portable_data_hash" not in new_collection_record:
582 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
583 if 'manifest_text' not in new_collection_record:
584 new_collection_record['manifest_text'] = coll_reader.manifest_text()
585 if 'storage_classes_desired' not in new_collection_record:
586 new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
588 # end with llfuse.lock_released, re-acquire lock
590 if new_collection_record is not None:
591 if coll_reader is not None:
592 self.new_collection(new_collection_record, coll_reader)
594 self.new_collection_record(new_collection_record)
598 self._updating_lock.release()
599 except arvados.errors.NotFoundError as e:
600 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
601 except arvados.errors.ArgumentError as detail:
602 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
603 if new_collection_record is not None and "manifest_text" in new_collection_record:
604 _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
606 _logger.exception("arv-mount %s: error", self.collection_locator)
607 if new_collection_record is not None and "manifest_text" in new_collection_record:
608 _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
614 def collection_record(self):
616 return self.collection.api_response()
620 def __getitem__(self, item):
621 if item == '.arvados#collection':
622 if self.collection_record_file is None:
623 self.collection_record_file = FuncToJSONFile(
624 self.inode, self.collection_record)
625 self.inodes.add_entry(self.collection_record_file)
626 self.invalidate() # use lookup as a signal to force update
627 return self.collection_record_file
629 return super(CollectionDirectory, self).__getitem__(item)
631 def __contains__(self, k):
632 if k == '.arvados#collection':
635 return super(CollectionDirectory, self).__contains__(k)
637 def invalidate(self):
638 if self.collection_record_file is not None:
639 self.collection_record_file.invalidate()
640 self.inodes.invalidate_inode(self.collection_record_file)
641 super(CollectionDirectory, self).invalidate()
644 return (self.collection_locator is not None)
647 # This is a rough guess of the amount of overhead involved for
648 # a collection; the assumptions are that that each file
649 # averages 128 bytes in the manifest, but consume 1024 bytes
650 # of Python data structures, so 1024/128=8 means we estimate
651 # the RAM footprint at 8 times the size of bare manifest text.
652 return self._manifest_size * 8
655 if self.collection is None:
660 self.collection.save()
661 except Exception as e:
662 _logger.exception("Failed to save collection %s", self.collection_locator)
663 self.collection.stop_threads()
666 if self.collection is not None:
667 self.collection.stop_threads()
668 self._manifest_size = 0
669 super(CollectionDirectory, self).clear()
670 if self.collection_record_file is not None:
671 self.inodes.del_entry(self.collection_record_file)
672 self.collection_record_file = None
675 class TmpCollectionDirectory(CollectionDirectoryBase):
676 """A directory backed by an Arvados collection that never gets saved.
678 This supports using Keep as scratch space. A userspace program can
679 read the .arvados#collection file to get a current manifest in
680 order to save a snapshot of the scratch data or use it as a crunch
684 class UnsaveableCollection(arvados.collection.Collection):
690 def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
691 collection = self.UnsaveableCollection(
692 api_client=api_client,
693 keep_client=api_client.keep,
694 num_retries=num_retries,
695 storage_classes_desired=storage_classes)
696 # This is always enable_write=True because it never tries to
697 # save to the backend
698 super(TmpCollectionDirectory, self).__init__(
699 parent_inode, inodes, True, filters, collection, self)
700 self.populate(self.mtime())
702 def on_event(self, *args, **kwargs):
703 super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
704 if self.collection_record_file is None:
707 # See discussion in CollectionDirectoryBase.on_event
711 self.collection.lock.release()
718 with self.collection.lock:
719 self.collection_record_file.invalidate()
720 self.inodes.invalidate_inode(self.collection_record_file)
721 _logger.debug("%s invalidated collection record", self.inode)
724 self.collection.lock.acquire()
727 def collection_record(self):
728 with llfuse.lock_released:
731 "manifest_text": self.collection.manifest_text(),
732 "portable_data_hash": self.collection.portable_data_hash(),
733 "storage_classes_desired": self.collection.storage_classes_desired(),
736 def __contains__(self, k):
737 return (k == '.arvados#collection' or
738 super(TmpCollectionDirectory, self).__contains__(k))
741 def __getitem__(self, item):
742 if item == '.arvados#collection':
743 if self.collection_record_file is None:
744 self.collection_record_file = FuncToJSONFile(
745 self.inode, self.collection_record)
746 self.inodes.add_entry(self.collection_record_file)
747 return self.collection_record_file
748 return super(TmpCollectionDirectory, self).__getitem__(item)
759 def want_event_subscribe(self):
763 self.collection.stop_threads()
765 def invalidate(self):
766 if self.collection_record_file:
767 self.collection_record_file.invalidate()
768 super(TmpCollectionDirectory, self).invalidate()
771 class MagicDirectory(Directory):
772 """A special directory that logically contains the set of all extant keep locators.
774 When a file is referenced by lookup(), it is tested to see if it is a valid
775 keep locator to a manifest, and if so, loads the manifest contents as a
776 subdirectory of this directory with the locator as the directory name.
777 Since querying a list of all extant keep locators is impractical, only
778 collections that have already been accessed are visible to readdir().
783 This directory provides access to Arvados collections as subdirectories listed
784 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
785 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
786 (in the form 'zzzzz-j7d0g-1234567890abcde').
788 Note that this directory will appear empty until you attempt to access a
789 specific collection or project subdirectory (such as trying to 'cd' into it),
790 at which point the collection or project will actually be looked up on the server
791 and the directory will appear if it exists.
795 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
796 super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
798 self.num_retries = num_retries
799 self.pdh_only = pdh_only
800 self.storage_classes = storage_classes
802 def __setattr__(self, name, value):
803 super(MagicDirectory, self).__setattr__(name, value)
804 # When we're assigned an inode, add a README.
805 if ((name == 'inode') and (self.inode is not None) and
806 (not self._entries)):
807 self._entries['README'] = self.inodes.add_entry(
808 StringFile(self.inode, self.README_TEXT, time.time()))
809 # If we're the root directory, add an identical by_id subdirectory.
810 if self.inode == llfuse.ROOT_INODE:
811 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
821 def __contains__(self, k):
822 if k in self._entries:
825 if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
831 if group_uuid_pattern.match(k):
832 project = self.api.groups().list(
834 ['group_class', 'in', ['project','filter']],
836 *self._filters_for('groups', qualified=False),
838 ).execute(num_retries=self.num_retries)
839 if project[u'items_available'] == 0:
841 e = self.inodes.add_entry(ProjectDirectory(
848 project[u'items'][0],
849 storage_classes=self.storage_classes,
852 e = self.inodes.add_entry(CollectionDirectory(
863 if k not in self._entries:
866 self.inodes.del_entry(e)
869 self.inodes.invalidate_entry(self, k)
870 self.inodes.del_entry(e)
872 except Exception as ex:
873 _logger.exception("arv-mount lookup '%s':", k)
875 self.inodes.del_entry(e)
878 def __getitem__(self, item):
880 return self._entries[item]
882 raise KeyError("No collection with id " + item)
    def want_event_subscribe(self):
        # Event updates are only useful when mutable (uuid-addressed)
        # collections can appear under this directory.
        return not self.pdh_only
891 class TagsDirectory(Directory):
892 """A special directory that contains as subdirectories all tags visible to the user."""
894 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
895 super(TagsDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
897 self.num_retries = num_retries
899 self._poll_time = poll_time
902 def want_event_subscribe(self):
907 with llfuse.lock_released:
908 tags = self.api.links().list(
910 ['link_class', '=', 'tag'],
912 *self._filters_for('links', qualified=False),
917 ).execute(num_retries=self.num_retries)
920 tags['items']+[{"name": n} for n in self._extra],
922 lambda a, i: a.tag == i['name'],
923 lambda i: TagDirectory(
932 poll_time=self._poll_time,
938 def __getitem__(self, item):
939 if super(TagsDirectory, self).__contains__(item):
940 return super(TagsDirectory, self).__getitem__(item)
941 with llfuse.lock_released:
942 tags = self.api.links().list(
944 ['link_class', '=', 'tag'],
946 *self._filters_for('links', qualified=False),
949 ).execute(num_retries=self.num_retries)
951 self._extra.add(item)
953 return super(TagsDirectory, self).__getitem__(item)
957 def __contains__(self, k):
958 if super(TagsDirectory, self).__contains__(k):
968 class TagDirectory(Directory):
969 """A special directory that contains as subdirectories all collections visible
970 to the user that are tagged with a particular tag.
973 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
974 poll=False, poll_time=60):
975 super(TagDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
977 self.num_retries = num_retries
980 self._poll_time = poll_time
982 def want_event_subscribe(self):
987 with llfuse.lock_released:
988 taggedcollections = self.api.links().list(
990 ['link_class', '=', 'tag'],
991 ['name', '=', self.tag],
992 ['head_uuid', 'is_a', 'arvados#collection'],
993 *self._filters_for('links', qualified=False),
995 select=['head_uuid'],
996 ).execute(num_retries=self.num_retries)
998 taggedcollections['items'],
999 lambda i: i['head_uuid'],
1000 lambda a, i: a.collection_locator == i['head_uuid'],
1001 lambda i: CollectionDirectory(
1013 class ProjectDirectory(Directory):
1014 """A special directory that contains the contents of a project."""
1016 __slots__ = ("api", "num_retries", "project_object", "project_object_file",
1017 "project_uuid", "_updating_lock",
1018 "_current_user", "_full_listing", "storage_classes", "recursively_contained")
1020 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
1021 project_object, poll=True, poll_time=3, storage_classes=None):
1022 super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
1024 self.num_retries = num_retries
1025 self.project_object = project_object
1026 self.project_object_file = None
1027 self.project_uuid = project_object['uuid']
1029 self._poll_time = poll_time
1030 self._updating_lock = threading.Lock()
1031 self._current_user = None
1032 self._full_listing = False
1033 self.storage_classes = storage_classes
1034 self.recursively_contained = False
1036 # Filter groups can contain themselves, which causes tools
1037 # that walk the filesystem to get stuck in an infinite loop,
1038 # so suppress returning a listing in that case.
1039 if self.project_object.get("group_class") == "filter":
1040 iter_parent_inode = parent_inode
1041 while iter_parent_inode != llfuse.ROOT_INODE:
1042 parent_dir = self.inodes[iter_parent_inode]
1043 if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid:
1044 self.recursively_contained = True
1046 iter_parent_inode = parent_dir.parent_inode
1048 def want_event_subscribe(self):
1051 def createDirectory(self, i):
1052 common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
1053 if collection_uuid_pattern.match(i['uuid']):
1054 return CollectionDirectory(*common_args, i)
1055 elif group_uuid_pattern.match(i['uuid']):
1056 return ProjectDirectory(*common_args, i, self._poll, self._poll_time, self.storage_classes)
1057 elif link_uuid_pattern.match(i['uuid']):
1058 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
1059 return CollectionDirectory(*common_args, i['head_uuid'])
1062 elif uuid_pattern.match(i['uuid']):
1063 return ObjectFile(self.parent_inode, i)
1068 return self.project_uuid
1071 self._full_listing = True
1072 return super(ProjectDirectory, self).items()
1074 def namefn(self, i):
1076 if i['name'] is None or len(i['name']) == 0:
1078 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
1079 # collection or subproject
1081 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
1084 elif 'kind' in i and i['kind'].startswith('arvados#'):
1086 return "{}.{}".format(i['name'], i['kind'][8:])
1093 if self.project_object_file == None:
1094 self.project_object_file = ObjectFile(self.inode, self.project_object)
1095 self.inodes.add_entry(self.project_object_file)
1097 if self.recursively_contained or not self._full_listing:
1101 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
1102 return a.uuid() == i['uuid']
1103 elif isinstance(a, ObjectFile):
1104 return a.uuid() == i['uuid'] and not a.stale()
1108 with llfuse.lock_released:
1109 self._updating_lock.acquire()
1110 if not self.stale():
1113 if group_uuid_pattern.match(self.project_uuid):
1114 self.project_object = self.api.groups().get(
1115 uuid=self.project_uuid).execute(num_retries=self.num_retries)
1116 elif user_uuid_pattern.match(self.project_uuid):
1117 self.project_object = self.api.users().get(
1118 uuid=self.project_uuid).execute(num_retries=self.num_retries)
1119 # do this in 2 steps until #17424 is fixed
1120 contents = list(arvados.util.keyset_list_all(
1121 self.api.groups().contents,
1123 num_retries=self.num_retries,
1124 uuid=self.project_uuid,
1126 ['uuid', 'is_a', 'arvados#group'],
1127 ['groups.group_class', 'in', ['project', 'filter']],
1128 *self._filters_for('groups', qualified=True),
1131 contents.extend(obj for obj in arvados.util.keyset_list_all(
1132 self.api.groups().contents,
1134 num_retries=self.num_retries,
1135 uuid=self.project_uuid,
1137 ['uuid', 'is_a', 'arvados#collection'],
1138 *self._filters_for('collections', qualified=True),
1140 ) if obj['current_version_uuid'] == obj['uuid'])
1141 # end with llfuse.lock_released, re-acquire lock
1143 self.merge(contents,
1146 self.createDirectory)
1149 self._updating_lock.release()
1151 def _add_entry(self, i, name):
1152 ent = self.createDirectory(i)
1153 self._entries[name] = self.inodes.add_entry(ent)
1154 return self._entries[name]
1158 def __getitem__(self, k):
1159 if k == '.arvados#project':
1160 return self.project_object_file
1161 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
1162 return super(ProjectDirectory, self).__getitem__(k)
1163 with llfuse.lock_released:
1164 k2 = self.unsanitize_filename(k)
1166 namefilter = ["name", "=", k]
1168 namefilter = ["name", "in", [k, k2]]
1169 contents = self.api.groups().list(
1171 ["owner_uuid", "=", self.project_uuid],
1172 ["group_class", "in", ["project","filter"]],
1174 *self._filters_for('groups', qualified=False),
1177 ).execute(num_retries=self.num_retries)["items"]
1179 contents = self.api.collections().list(
1181 ["owner_uuid", "=", self.project_uuid],
1183 *self._filters_for('collections', qualified=False),
1186 ).execute(num_retries=self.num_retries)["items"]
1188 if len(contents) > 1 and contents[1]['name'] == k:
1189 # If "foo/bar" and "foo[SUBST]bar" both exist, use
1191 contents = [contents[1]]
1192 name = self.sanitize_filename(self.namefn(contents[0]))
1195 return self._add_entry(contents[0], name)
def __contains__(self, k):
    # Membership test: the magic '.arvados#project' entry always exists.
    # NOTE(review): the remainder of __contains__ and the 'def' line of
    # the next method are elided from this view; the lines after the gap
    # appear to implement a writability check (likely writable()) --
    # confirm against the full source.
    if k == '.arvados#project':
        # -- large elision here (original lines between the two methods) --
        if not self._enable_write:
            # Mount is read-only; presumably returns False here (elided).
            with llfuse.lock_released:
                if not self._current_user:
                    # Fetch and cache the current user record on first use.
                    self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
                # Writable iff the current user appears in the project's
                # writable_by list.
                return self._current_user["uuid"] in self.project_object.get("writable_by", [])
def persisted(self):
    # NOTE(review): the body of persisted() is elided from this view
    # (presumably just "return False"); the lines below appear to belong
    # to a clear() method whose 'def' line is also elided.
    super(ProjectDirectory, self).clear()
    # Drop the cached project-metadata file node, if any, so a later
    # update recreates it.
    if self.project_object_file is not None:
        self.inodes.del_entry(self.project_object_file)
    self.project_object_file = None
def mkdir(self, name):
    """Create a new (empty) collection named *name* in this project.

    Raises FUSEError(EROFS) when the mount is read-only and
    FUSEError(EEXIST) when the API reports an error (most likely a name
    conflict).

    NOTE(review): the try-block scaffolding and the opening of the
    request-body dict are elided from this view.
    """
    if not self.writable():
        raise llfuse.FUSEError(errno.EROFS)
    # Release the FUSE lock around the network round trip.
    with llfuse.lock_released:
        # -- elided: "c = {" opens the request body here --
            "owner_uuid": self.project_uuid,
            "manifest_text": "" }
        if self.storage_classes is not None:
            # Honor the mount's requested storage classes for the new
            # collection.
            c["storage_classes_desired"] = self.storage_classes
        self.api.collections().create(body=c).execute(num_retries=self.num_retries)
    except Exception as e:
        # -- elided handler body --
    except apiclient_errors.Error as error:
        # Surface API-level failures as EEXIST.
        _logger.error(error)
        raise llfuse.FUSEError(errno.EEXIST)
def rmdir(self, name):
    """Remove child collection *name* by deleting it on the API server.

    Raises FUSEError with: EROFS when the mount is read-only, ENOENT
    when no such entry exists, EPERM when the entry is not a collection
    (subprojects cannot be removed this way), and ENOTEMPTY when the
    collection still has contents.

    NOTE(review): the statement after the lock_released block (probably
    an invalidate/refresh call) is elided from this view.
    """
    if not self.writable():
        raise llfuse.FUSEError(errno.EROFS)
    if name not in self:
        raise llfuse.FUSEError(errno.ENOENT)
    if not isinstance(self[name], CollectionDirectory):
        # Only collection entries may be deleted through rmdir.
        raise llfuse.FUSEError(errno.EPERM)
    if len(self[name]) > 0:
        raise llfuse.FUSEError(errno.ENOTEMPTY)
    # Release the FUSE lock around the network round trip.
    with llfuse.lock_released:
        self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
def rename(self, name_old, name_new, src):
    """Move collection *name_old* from ProjectDirectory *src* into this
    project under *name_new*.

    Only collection entries can be moved, and only between project
    directories; anything else raises FUSEError(EPERM).  Replacing an
    existing target is refused (see comment below).

    NOTE(review): the lookup that binds `ent` (presumably
    ent = src[name_old]) is elided from this view, as is the
    lock_released block around the update call.
    """
    if not self.writable():
        raise llfuse.FUSEError(errno.EROFS)
    if not isinstance(src, ProjectDirectory):
        raise llfuse.FUSEError(errno.EPERM)
    # -- elided: `ent` is presumably fetched from src here --
    if not isinstance(ent, CollectionDirectory):
        raise llfuse.FUSEError(errno.EPERM)
    if name_new in self:
        # POSIX semantics for replacing one directory with another is
        # tricky (the target directory must be empty, the operation must be
        # atomic which isn't possible with the Arvados API as of this
        # writing) so don't support that.
        raise llfuse.FUSEError(errno.EPERM)
    # Server-side move: reassign ownership and name in a single update.
    self.api.collections().update(uuid=ent.uuid(),
        body={"owner_uuid": self.uuid(),
        "name": name_new}).execute(num_retries=self.num_retries)
    # Actually move the entry from source directory to this directory.
    del src._entries[name_old]
    self._entries[name_new] = ent
    self.inodes.invalidate_entry(src, name_old)
def child_event(self, ev):
    """Apply a websocket change event for an object owned by this project.

    Keeps the in-memory entry table in sync with server-side renames,
    moves, trashes and creations without refetching the full listing.

    NOTE(review): several branch bodies are elided from this view; the
    reconstructed structure should be confirmed against the full source.
    """
    properties = ev.get("properties") or {}
    old_attrs = properties.get("old_attributes") or {}
    new_attrs = properties.get("new_attributes") or {}
    old_attrs["uuid"] = ev["object_uuid"]
    new_attrs["uuid"] = ev["object_uuid"]
    # Names as shown in the mount (disallowed characters substituted).
    old_name = self.sanitize_filename(self.namefn(old_attrs))
    new_name = self.sanitize_filename(self.namefn(new_attrs))

    # create events will have a new name, but not an old name
    # delete events will have an old name, but not a new name
    # update events will have an old and new name, and they may be same or different
    # if they are the same, an unrelated field changed and there is nothing to do.

    if old_attrs.get("owner_uuid") != self.project_uuid:
        # Was moved from somewhere else, so don't try to remove entry.
        # -- elided: presumably clears old_name --
    if ev.get("object_owner_uuid") != self.project_uuid:
        # Was moved to somewhere else, so don't try to add entry
        # -- elided: presumably clears new_name --
    if old_attrs.get("is_trashed"):
        # Was previously deleted
        # -- elided --
    if new_attrs.get("is_trashed"):
        # -- elided: newly trashed, presumably treated as a removal --

    if new_name != old_name:
        # -- elided: `ent` is presumably initialized to None here --
        if old_name in self._entries:
            # Unhook the node from its old name and notify the kernel.
            ent = self._entries[old_name]
            del self._entries[old_name]
            self.inodes.invalidate_entry(self, old_name)
        # -- elided: "if new_name:" branch reusing the existing node --
            self._entries[new_name] = ent
        # -- elided: otherwise create a fresh entry --
            self._add_entry(new_attrs, new_name)
        elif ent is not None:
            # Renamed away with no new name: discard the node entirely.
            self.inodes.del_entry(ent)
1343 class SharedDirectory(Directory):
1344 """A special directory that represents users or groups who have shared projects with me."""
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
             exclude, poll=False, poll_time=60, storage_classes=None):
    """Set up the shared-with-me directory.

    NOTE(review): two assignments are elided from this view (presumably
    self.api = api and the poll flag) -- confirm against the full
    source.
    """
    super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
    self.num_retries = num_retries
    # Resolve the current user eagerly; update() compares project owner
    # UUIDs against it.
    self.current_user = api.users().current().execute(num_retries=num_retries)
    self._poll_time = poll_time
    # Serializes concurrent refreshes of this directory.
    self._updating_lock = threading.Lock()
    self.storage_classes = storage_classes
# NOTE(review): this fragment is the body of SharedDirectory.update();
# its 'def' line and many statements are elided from this view.  It
# builds `objects` (uuid -> record), `roots` / `root_owners`, and a
# `contents` mapping of display names, then merges that into the
# directory entries.  Confirm reconstruction against the full source.
with llfuse.lock_released:
    self._updating_lock.acquire()
    if not self.stale():
        # Another thread refreshed while we waited; presumably returns
        # early here (elided).

    # Prefer the dedicated groups().shared endpoint when the server
    # advertises it; otherwise fall back to listing all projects.
    methods = self.api._rootDesc.get('resources')["groups"]['methods']
    if 'httpMethod' in methods.get('shared', {}):
        # -- elided: keyset paging loop opens here --
        resp = self.api.groups().shared(
            # -- elided: filters list opens here --
            ['group_class', 'in', ['project','filter']],
            *self._filters_for('groups', qualified=False),
            # include="owner_uuid" makes the response carry the owner
            # records in resp["included"].
            include="owner_uuid",
        if not resp["items"]:
            # -- elided: presumably breaks out of the paging loop --
        # Advance the keyset page past the last uuid seen.
        page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
        for r in resp["items"]:
            objects[r["uuid"]] = r
            roots.append(r["uuid"])
        for r in resp["included"]:
            # Owner records of the shared items.
            objects[r["uuid"]] = r
            root_owners.add(r["uuid"])
        # -- elided: else branch (no shared endpoint) begins --
        all_projects = list(arvados.util.keyset_list_all(
            self.api.groups().list,
            num_retries=self.num_retries,
            # -- elided: filters list opens here --
            ['group_class', 'in', ['project','filter']],
            *self._filters_for('groups', qualified=False),
            select=["uuid", "owner_uuid"],
        for ob in all_projects:
            objects[ob['uuid']] = ob

        # A project is a "root" of the shared view when its owner is not
        # the current user and not an object we already fetched.
        current_uuid = self.current_user['uuid']
        for ob in all_projects:
            if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
                roots.append(ob['uuid'])
                root_owners.add(ob['owner_uuid'])

        # Fetch display records for the owners: users first, then groups.
        lusers = arvados.util.keyset_list_all(
            self.api.users().list,
            num_retries=self.num_retries,
            ['uuid', 'in', list(root_owners)],
            *self._filters_for('users', qualified=False),
        lgroups = arvados.util.keyset_list_all(
            self.api.groups().list,
            num_retries=self.num_retries,
            ['uuid', 'in', list(root_owners)+roots],
            *self._filters_for('groups', qualified=False),
        # -- elided: loops over lusers / lgroups binding `l` --
        objects[l["uuid"]] = l
        objects[l["uuid"]] = l

        # Build the display-name -> record mapping for the entries.
        for r in root_owners:
            # -- elided: obr = objects[r]; group owners use their name --
            contents[obr["name"]] = obr
        elif "first_name" in obr:
            # User owners are shown as "First Last".
            contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
        # -- elided: loop over roots binding obr --
        if obr['owner_uuid'] not in objects:
            contents[obr["name"]] = obr
# end with llfuse.lock_released, re-acquire lock

# Reconcile fetched contents with in-memory entries, creating
# ProjectDirectory nodes for newly appearing items (merge call elided).
lambda a, i: a.uuid() == i[1]['uuid'],
lambda i: ProjectDirectory(
    # -- elided: remaining constructor arguments --
    poll_time=self._poll_time,
    storage_classes=self.storage_classes,
# -- elided: except clause --
_logger.exception("arv-mount shared dir error")
# -- elided: finally clause --
self._updating_lock.release()
1475 def want_event_subscribe(self):