1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
15 from apiclient import errors as apiclient_errors
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
23 _logger = logging.getLogger('arvados.arvados_fuse')
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (If present in a collection filename, they will
28 # appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile(r'[\x00/]')
32 class Directory(FreshBase):
33 """Generic directory object, backed by a dict.
35 Consists of a set of entries with the key representing the filename
36 and the value referencing a File or Directory object.
39 def __init__(self, parent_inode, inodes, apiconfig, enable_write, filters):
40 """parent_inode is the integer inode number"""
42 super(Directory, self).__init__()
45 if not isinstance(parent_inode, int):
46 raise Exception("parent_inode should be an int")
47 self.parent_inode = parent_inode
49 self.apiconfig = apiconfig
51 self._mtime = time.time()
52 self._enable_write = enable_write
53 self._filters = filters or []
55 def _filters_for(self, subtype, *, qualified):
56 for f in self._filters:
57 f_type, _, f_name = f[0].partition('.')
60 elif f_type != subtype:
65 yield [f_name, *f[1:]]
67 def forward_slash_subst(self):
68 if not hasattr(self, '_fsns'):
70 config = self.apiconfig()
72 self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
74 # old API server with no FSNS config
77 if self._fsns == '' or self._fsns == '/':
81 def unsanitize_filename(self, incoming):
82 """Replace ForwardSlashNameSubstitution value with /"""
83 fsns = self.forward_slash_subst()
84 if isinstance(fsns, str):
85 return incoming.replace(fsns, '/')
89 def sanitize_filename(self, dirty):
90 """Replace disallowed filename characters according to
91 ForwardSlashNameSubstitution in self.api_config."""
92 # '.' and '..' are not reachable if API server is newer than #6277
102 fsns = self.forward_slash_subst()
103 if isinstance(fsns, str):
104 dirty = dirty.replace('/', fsns)
105 return _disallowed_filename_characters.sub('_', dirty)
108 # Overridden by subclasses to implement logic to update the
109 # entries dict when the directory is stale
114 # Only used when computing the size of the disk footprint of the directory
122 def checkupdate(self):
126 except apiclient.errors.HttpError as e:
def __getitem__(self, item):
    """Return the File or Directory entry registered under *item*.

    Raises KeyError when no such entry exists.
    """
    entries = self._entries
    return entries[item]
137 return list(self._entries.items())
def __contains__(self, k):
    """Report whether a child entry named *k* is currently present."""
    return self._entries.__contains__(k)
147 return len(self._entries)
150 self.inodes.touch(self)
151 super(Directory, self).fresh()
153 def merge(self, items, fn, same, new_entry):
154 """Helper method for updating the contents of the directory.
156 Takes a list describing the new contents of the directory, reuse
157 entries that are the same in both the old and new lists, create new
158 entries, and delete old entries missing from the new list.
160 :items: iterable with new directory contents
162 :fn: function to take an entry in 'items' and return the desired file or
163 directory name, or None if this entry should be skipped
165 :same: function to compare an existing entry (a File or Directory
166 object) with an entry in the items list to determine whether to keep
169 :new_entry: function to create a new directory entry (File or Directory
170 object) from an entry in the items list.
174 oldentries = self._entries
178 name = self.sanitize_filename(fn(i))
181 if name in oldentries:
182 ent = oldentries[name]
185 # move existing directory entry over
186 self._entries[name] = ent
190 name = self.sanitize_filename(fn(i))
193 if name not in self._entries:
194 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
195 # create new directory entry
199 self._entries[name] = self.inodes.add_entry(ent)
202 # delete any other directory entries that were not in found in 'items'
204 _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
205 self.inodes.invalidate_entry(self, i)
206 self.inodes.del_entry(oldentries[i])
211 self.inodes.invalidate_inode(self)
212 self._mtime = time.time()
214 self.inodes.inode_cache.cap_cache()
216 for ent in self._entries.values():
222 if super(Directory, self).in_use():
224 for v in self._entries.values():
229 def has_ref(self, only_children):
230 if super(Directory, self).has_ref(only_children):
232 for v in self._entries.values():
238 """Delete all entries"""
239 oldentries = self._entries
242 oldentries[n].clear()
243 self.inodes.del_entry(oldentries[n])
246 def kernel_invalidate(self):
247 # Invalidating the dentry on the parent implies invalidating all paths
249 parent = self.inodes[self.parent_inode]
251 # Find self on the parent in order to invalidate this path.
252 # Calling the public items() method might trigger a refresh,
253 # which we definitely don't want, so read the internal dict directly.
254 for k,v in parent._entries.items():
256 self.inodes.invalidate_entry(parent, k)
def want_event_subscribe(self):
    """Say whether this directory needs event updates; subclasses decide."""
    raise NotImplementedError()

def create(self, name):
    """Create a file named *name*; not supported on the base directory."""
    raise NotImplementedError()

def mkdir(self, name):
    """Create a subdirectory named *name*; not supported on the base directory."""
    raise NotImplementedError()

def unlink(self, name):
    """Remove the file named *name*; not supported on the base directory."""
    raise NotImplementedError()

def rmdir(self, name):
    """Remove the subdirectory named *name*; not supported on the base directory."""
    raise NotImplementedError()

def rename(self, name_old, name_new, src):
    """Move *name_old* from directory *src* to *name_new* here; not supported."""
    raise NotImplementedError()
287 class CollectionDirectoryBase(Directory):
288 """Represent an Arvados Collection as a directory.
290 This class is used for Subcollections, and is also the base class for
291 CollectionDirectory, which implements collection loading/saving on
294 Most operations act only the underlying Arvados `Collection` object. The
295 `Collection` object signals via a notify callback to
296 `CollectionDirectoryBase.on_event` that an item was added, removed or
297 modified. FUSE inodes and directory entries are created, deleted or
298 invalidated in response to these events.
def __init__(self, parent_inode, inodes, apiconfig, enable_write, filters, collection, collection_root):
    """Wrap an Arvados `Collection` subtree as a FUSE directory.

    *collection_root* is the directory object at the top of the tree
    (used elsewhere to flush changes back — see flush()).  The cached
    `.arvados#collection` record file starts out absent.
    """
    super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write, filters)
    self.collection_root = collection_root
    self.collection = collection
    self.apiconfig = apiconfig
    self.collection_record_file = None
309 def new_entry(self, name, item, mtime):
310 name = self.sanitize_filename(name)
311 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
312 if item.fuse_entry.dead is not True:
313 raise Exception("Can only reparent dead inode entry")
314 if item.fuse_entry.inode is None:
315 raise Exception("Reparented entry must still have valid inode")
316 item.fuse_entry.dead = False
317 self._entries[name] = item.fuse_entry
318 elif isinstance(item, arvados.collection.RichCollectionBase):
319 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
326 self.collection_root,
328 self._entries[name].populate(mtime)
330 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
331 item.fuse_entry = self._entries[name]
333 def on_event(self, event, collection, name, item):
334 # These are events from the Collection object (ADD/DEL/MOD)
335 # emitted by operations on the Collection object (like
336 # "mkdirs" or "remove"), and by "update", which we need to
337 # synchronize with our FUSE objects that are assigned inodes.
338 if collection == self.collection:
339 name = self.sanitize_filename(name)
342 # It's possible for another thread to have llfuse.lock and
343 # be waiting on collection.lock. Meanwhile, we released
344 # llfuse.lock earlier in the stack, but are still holding
345 # on to the collection lock, and now we need to re-acquire
346 # llfuse.lock. If we don't release the collection lock,
347 # we'll deadlock where we're holding the collection lock
348 # waiting for llfuse.lock and the other thread is holding
349 # llfuse.lock and waiting for the collection lock.
351 # The correct locking order here is to take llfuse.lock
352 # first, then the collection lock.
354 # Since collection.lock is an RLock, it might be locked
355 # multiple times, so we need to release it multiple times,
356 # keep a count, then re-lock it the correct number of
362 self.collection.lock.release()
369 with self.collection.lock:
370 if event == arvados.collection.ADD:
371 self.new_entry(name, item, self.mtime())
372 elif event == arvados.collection.DEL:
373 ent = self._entries[name]
374 del self._entries[name]
375 self.inodes.invalidate_entry(self, name)
376 self.inodes.del_entry(ent)
377 elif event == arvados.collection.MOD:
378 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
379 self.inodes.invalidate_inode(item.fuse_entry)
380 elif name in self._entries:
381 self.inodes.invalidate_inode(self._entries[name])
383 if self.collection_record_file is not None:
384 self.collection_record_file.invalidate()
385 self.inodes.invalidate_inode(self.collection_record_file)
388 self.collection.lock.acquire()
391 def populate(self, mtime):
393 with self.collection.lock:
394 self.collection.subscribe(self.on_event)
395 for entry, item in self.collection.items():
396 self.new_entry(entry, item, self.mtime())
399 return self._enable_write and self.collection.writable()
403 self.collection_root.flush()
407 def create(self, name):
408 if not self.writable():
409 raise llfuse.FUSEError(errno.EROFS)
410 with llfuse.lock_released:
411 self.collection.open(name, "w").close()
415 def mkdir(self, name):
416 if not self.writable():
417 raise llfuse.FUSEError(errno.EROFS)
418 with llfuse.lock_released:
419 self.collection.mkdirs(name)
423 def unlink(self, name):
424 if not self.writable():
425 raise llfuse.FUSEError(errno.EROFS)
426 with llfuse.lock_released:
427 self.collection.remove(name)
432 def rmdir(self, name):
433 if not self.writable():
434 raise llfuse.FUSEError(errno.EROFS)
435 with llfuse.lock_released:
436 self.collection.remove(name)
441 def rename(self, name_old, name_new, src):
442 if not self.writable():
443 raise llfuse.FUSEError(errno.EROFS)
445 if not isinstance(src, CollectionDirectoryBase):
446 raise llfuse.FUSEError(errno.EPERM)
451 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
453 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
455 raise llfuse.FUSEError(errno.ENOTEMPTY)
456 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
457 raise llfuse.FUSEError(errno.ENOTDIR)
458 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
459 raise llfuse.FUSEError(errno.EISDIR)
461 with llfuse.lock_released:
462 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
467 super(CollectionDirectoryBase, self).clear()
468 self.collection = None
471 class CollectionDirectory(CollectionDirectoryBase):
472 """Represents the root of a directory tree representing a collection."""
474 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
475 super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters, None, self)
477 self.num_retries = num_retries
480 self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
482 _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
483 self._poll_time = 60*60
485 if isinstance(collection_record, dict):
486 self.collection_locator = collection_record['uuid']
487 self._mtime = convertTime(collection_record.get('modified_at'))
489 self.collection_locator = collection_record
491 self._manifest_size = 0
492 if self.collection_locator:
493 self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
494 self._updating_lock = threading.Lock()
497 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
500 return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)
504 if not self.writable():
506 with llfuse.lock_released:
507 with self._updating_lock:
508 if self.collection.committed():
509 self.collection.update()
511 self.collection.save()
512 self.new_collection_record(self.collection.api_response())
def want_event_subscribe(self):
    """Subscribe to events only for UUID mounts; a portable data hash
    mount refers to immutable content and never changes."""
    matched = uuid_pattern.match(self.collection_locator)
    return matched is not None
517 def new_collection(self, new_collection_record, coll_reader):
520 self.collection = coll_reader
521 self.new_collection_record(new_collection_record)
522 self.populate(self.mtime())
def new_collection_record(self, new_collection_record):
    """Adopt a fresh API record for this collection.

    Updates the cached mtime, manifest-size estimate and locator from
    the record, and invalidates the lazily-created
    `.arvados#collection` file so readers see the new content.
    Raises Exception when the record is empty/falsy.
    """
    if not new_collection_record:
        raise Exception("invalid new_collection_record")
    self._mtime = convertTime(new_collection_record.get('modified_at'))
    self._manifest_size = len(new_collection_record["manifest_text"])
    self.collection_locator = new_collection_record["uuid"]
    record_file = self.collection_record_file
    if record_file is not None:
        record_file.invalidate()
        self.inodes.invalidate_inode(record_file)
        _logger.debug("%s invalidated collection record file", self)
537 return self.collection_locator
542 if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator):
543 # It's immutable, nothing to update
546 if self.collection_locator is None:
547 # No collection locator to retrieve from
551 new_collection_record = None
553 with llfuse.lock_released:
554 self._updating_lock.acquire()
558 _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
560 if self.collection is not None:
561 # Already have a collection object
562 self.collection.update()
563 new_collection_record = self.collection.api_response()
565 # Create a new collection object
566 if uuid_pattern.match(self.collection_locator):
567 coll_reader = arvados.collection.Collection(
568 self.collection_locator, self.api, self.api.keep,
569 num_retries=self.num_retries)
571 coll_reader = arvados.collection.CollectionReader(
572 self.collection_locator, self.api, self.api.keep,
573 num_retries=self.num_retries)
574 new_collection_record = coll_reader.api_response() or {}
575 # If the Collection only exists in Keep, there will be no API
576 # response. Fill in the fields we need.
577 if 'uuid' not in new_collection_record:
578 new_collection_record['uuid'] = self.collection_locator
579 if "portable_data_hash" not in new_collection_record:
580 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
581 if 'manifest_text' not in new_collection_record:
582 new_collection_record['manifest_text'] = coll_reader.manifest_text()
583 if 'storage_classes_desired' not in new_collection_record:
584 new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
586 # end with llfuse.lock_released, re-acquire lock
588 if new_collection_record is not None:
589 if coll_reader is not None:
590 self.new_collection(new_collection_record, coll_reader)
592 self.new_collection_record(new_collection_record)
596 self._updating_lock.release()
597 except arvados.errors.NotFoundError as e:
598 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
599 except arvados.errors.ArgumentError as detail:
600 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
601 if new_collection_record is not None and "manifest_text" in new_collection_record:
602 _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
604 _logger.exception("arv-mount %s: error", self.collection_locator)
605 if new_collection_record is not None and "manifest_text" in new_collection_record:
606 _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
611 def collection_record(self):
613 return self.collection.api_response()
617 def __getitem__(self, item):
618 if item == '.arvados#collection':
619 if self.collection_record_file is None:
620 self.collection_record_file = FuncToJSONFile(
621 self.inode, self.collection_record)
622 self.inodes.add_entry(self.collection_record_file)
623 self.invalidate() # use lookup as a signal to force update
624 return self.collection_record_file
626 return super(CollectionDirectory, self).__getitem__(item)
628 def __contains__(self, k):
629 if k == '.arvados#collection':
632 return super(CollectionDirectory, self).__contains__(k)
def invalidate(self):
    """Mark this directory stale, invalidating the cached collection
    record file (if it has been created) along with it."""
    record_file = self.collection_record_file
    if record_file is not None:
        record_file.invalidate()
        self.inodes.invalidate_inode(record_file)
    super(CollectionDirectory, self).invalidate()
641 return (self.collection_locator is not None)
644 # This is an empirically-derived heuristic to estimate the memory used
645 # to store this collection's metadata. Calculating the memory
646 # footprint directly would be more accurate, but also more complicated.
647 return self._manifest_size * 128
650 if self.collection is not None:
652 self.collection.save()
653 self.collection.stop_threads()
656 if self.collection is not None:
657 self.collection.stop_threads()
658 super(CollectionDirectory, self).clear()
659 self._manifest_size = 0
662 class TmpCollectionDirectory(CollectionDirectoryBase):
663 """A directory backed by an Arvados collection that never gets saved.
665 This supports using Keep as scratch space. A userspace program can
666 read the .arvados#collection file to get a current manifest in
667 order to save a snapshot of the scratch data or use it as a crunch
671 class UnsaveableCollection(arvados.collection.Collection):
677 def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
678 collection = self.UnsaveableCollection(
679 api_client=api_client,
680 keep_client=api_client.keep,
681 num_retries=num_retries,
682 storage_classes_desired=storage_classes)
683 # This is always enable_write=True because it never tries to
684 # save to the backend
685 super(TmpCollectionDirectory, self).__init__(
686 parent_inode, inodes, api_client.config, True, filters, collection, self)
687 self.populate(self.mtime())
689 def on_event(self, *args, **kwargs):
690 super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
691 if self.collection_record_file is None:
694 # See discussion in CollectionDirectoryBase.on_event
698 self.collection.lock.release()
705 with self.collection.lock:
706 self.collection_record_file.invalidate()
707 self.inodes.invalidate_inode(self.collection_record_file)
708 _logger.debug("%s invalidated collection record", self)
711 self.collection.lock.acquire()
714 def collection_record(self):
715 with llfuse.lock_released:
718 "manifest_text": self.collection.manifest_text(),
719 "portable_data_hash": self.collection.portable_data_hash(),
720 "storage_classes_desired": self.collection.storage_classes_desired(),
def __contains__(self, k):
    """The magic '.arvados#collection' entry always exists here."""
    if k == '.arvados#collection':
        return True
    return super(TmpCollectionDirectory, self).__contains__(k)
def __getitem__(self, item):
    """Serve the lazily-created '.arvados#collection' JSON file;
    defer every other name to the base class."""
    if item != '.arvados#collection':
        return super(TmpCollectionDirectory, self).__getitem__(item)
    if self.collection_record_file is None:
        # Create the record file on first access and register its inode.
        self.collection_record_file = FuncToJSONFile(
            self.inode, self.collection_record)
        self.inodes.add_entry(self.collection_record_file)
    return self.collection_record_file
746 def want_event_subscribe(self):
750 self.collection.stop_threads()
def invalidate(self):
    """Mark this directory stale and invalidate the cached record file.

    Uses an explicit `is not None` test for the lazily-created record
    file, consistent with CollectionDirectory.invalidate (the original
    relied on truthiness, which tests the same condition less directly).
    """
    if self.collection_record_file is not None:
        self.collection_record_file.invalidate()
    super(TmpCollectionDirectory, self).invalidate()
758 class MagicDirectory(Directory):
759 """A special directory that logically contains the set of all extant keep locators.
761 When a file is referenced by lookup(), it is tested to see if it is a valid
762 keep locator to a manifest, and if so, loads the manifest contents as a
763 subdirectory of this directory with the locator as the directory name.
764 Since querying a list of all extant keep locators is impractical, only
765 collections that have already been accessed are visible to readdir().
770 This directory provides access to Arvados collections as subdirectories listed
771 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
772 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
773 (in the form 'zzzzz-j7d0g-1234567890abcde').
775 Note that this directory will appear empty until you attempt to access a
776 specific collection or project subdirectory (such as trying to 'cd' into it),
777 at which point the collection or project will actually be looked up on the server
778 and the directory will appear if it exists.
782 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
783 super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
785 self.num_retries = num_retries
786 self.pdh_only = pdh_only
787 self.storage_classes = storage_classes
789 def __setattr__(self, name, value):
790 super(MagicDirectory, self).__setattr__(name, value)
791 # When we're assigned an inode, add a README.
792 if ((name == 'inode') and (self.inode is not None) and
793 (not self._entries)):
794 self._entries['README'] = self.inodes.add_entry(
795 StringFile(self.inode, self.README_TEXT, time.time()))
796 # If we're the root directory, add an identical by_id subdirectory.
797 if self.inode == llfuse.ROOT_INODE:
798 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
808 def __contains__(self, k):
809 if k in self._entries:
812 if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
818 if group_uuid_pattern.match(k):
819 project = self.api.groups().list(
821 ['group_class', 'in', ['project','filter']],
823 *self._filters_for('groups', qualified=False),
825 ).execute(num_retries=self.num_retries)
826 if project[u'items_available'] == 0:
828 e = self.inodes.add_entry(ProjectDirectory(
835 project[u'items'][0],
836 storage_classes=self.storage_classes,
839 e = self.inodes.add_entry(CollectionDirectory(
850 if k not in self._entries:
853 self.inodes.del_entry(e)
856 self.inodes.invalidate_entry(self, k)
857 self.inodes.del_entry(e)
859 except Exception as ex:
860 _logger.exception("arv-mount lookup '%s':", k)
862 self.inodes.del_entry(e)
865 def __getitem__(self, item):
867 return self._entries[item]
869 raise KeyError("No collection with id " + item)
def want_event_subscribe(self):
    """Event updates are only useful when UUID lookups are allowed
    (PDH-only mounts reference immutable content)."""
    return False if self.pdh_only else True
878 class TagsDirectory(Directory):
879 """A special directory that contains as subdirectories all tags visible to the user."""
881 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
882 super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
884 self.num_retries = num_retries
886 self._poll_time = poll_time
889 def want_event_subscribe(self):
894 with llfuse.lock_released:
895 tags = self.api.links().list(
897 ['link_class', '=', 'tag'],
899 *self._filters_for('links', qualified=False),
904 ).execute(num_retries=self.num_retries)
907 tags['items']+[{"name": n} for n in self._extra],
909 lambda a, i: a.tag == i['name'],
910 lambda i: TagDirectory(
919 poll_time=self._poll_time,
925 def __getitem__(self, item):
926 if super(TagsDirectory, self).__contains__(item):
927 return super(TagsDirectory, self).__getitem__(item)
928 with llfuse.lock_released:
929 tags = self.api.links().list(
931 ['link_class', '=', 'tag'],
933 *self._filters_for('links', qualified=False),
936 ).execute(num_retries=self.num_retries)
938 self._extra.add(item)
940 return super(TagsDirectory, self).__getitem__(item)
944 def __contains__(self, k):
945 if super(TagsDirectory, self).__contains__(k):
955 class TagDirectory(Directory):
956 """A special directory that contains as subdirectories all collections visible
957 to the user that are tagged with a particular tag.
960 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
961 poll=False, poll_time=60):
962 super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
964 self.num_retries = num_retries
967 self._poll_time = poll_time
969 def want_event_subscribe(self):
974 with llfuse.lock_released:
975 taggedcollections = self.api.links().list(
977 ['link_class', '=', 'tag'],
978 ['name', '=', self.tag],
979 ['head_uuid', 'is_a', 'arvados#collection'],
980 *self._filters_for('links', qualified=False),
982 select=['head_uuid'],
983 ).execute(num_retries=self.num_retries)
985 taggedcollections['items'],
986 lambda i: i['head_uuid'],
987 lambda a, i: a.collection_locator == i['head_uuid'],
988 lambda i: CollectionDirectory(
1000 class ProjectDirectory(Directory):
1001 """A special directory that contains the contents of a project."""
1003 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
1004 project_object, poll=True, poll_time=3, storage_classes=None):
1005 super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
1007 self.num_retries = num_retries
1008 self.project_object = project_object
1009 self.project_object_file = None
1010 self.project_uuid = project_object['uuid']
1012 self._poll_time = poll_time
1013 self._updating_lock = threading.Lock()
1014 self._current_user = None
1015 self._full_listing = False
1016 self.storage_classes = storage_classes
1018 def want_event_subscribe(self):
1021 def createDirectory(self, i):
1022 common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
1023 if collection_uuid_pattern.match(i['uuid']):
1024 return CollectionDirectory(*common_args, i)
1025 elif group_uuid_pattern.match(i['uuid']):
1026 return ProjectDirectory(*common_args, i, self._poll, self._poll_time, self.storage_classes)
1027 elif link_uuid_pattern.match(i['uuid']):
1028 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
1029 return CollectionDirectory(*common_args, i['head_uuid'])
1032 elif uuid_pattern.match(i['uuid']):
1033 return ObjectFile(self.parent_inode, i)
1038 return self.project_uuid
1041 self._full_listing = True
1042 return super(ProjectDirectory, self).items()
1044 def namefn(self, i):
1046 if i['name'] is None or len(i['name']) == 0:
1048 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
1049 # collection or subproject
1051 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
1054 elif 'kind' in i and i['kind'].startswith('arvados#'):
1056 return "{}.{}".format(i['name'], i['kind'][8:])
1063 if self.project_object_file == None:
1064 self.project_object_file = ObjectFile(self.inode, self.project_object)
1065 self.inodes.add_entry(self.project_object_file)
1067 if not self._full_listing:
1071 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
1072 return a.uuid() == i['uuid']
1073 elif isinstance(a, ObjectFile):
1074 return a.uuid() == i['uuid'] and not a.stale()
1078 with llfuse.lock_released:
1079 _logger.debug("Getting lock to update %s", self.project_uuid)
1080 self._updating_lock.acquire()
1081 if not self.stale():
1082 _logger.debug("%s was updated already", self.project_uuid)
1085 _logger.debug("Requesting update of %s", self.project_uuid)
1087 if group_uuid_pattern.match(self.project_uuid):
1088 self.project_object = self.api.groups().get(
1089 uuid=self.project_uuid).execute(num_retries=self.num_retries)
1090 elif user_uuid_pattern.match(self.project_uuid):
1091 self.project_object = self.api.users().get(
1092 uuid=self.project_uuid).execute(num_retries=self.num_retries)
1093 # do this in 2 steps until #17424 is fixed
1094 contents = list(arvados.util.keyset_list_all(
1095 self.api.groups().contents,
1097 num_retries=self.num_retries,
1098 uuid=self.project_uuid,
1100 ['uuid', 'is_a', 'arvados#group'],
1101 ['groups.group_class', 'in', ['project', 'filter']],
1102 *self._filters_for('groups', qualified=True),
1105 contents.extend(obj for obj in arvados.util.keyset_list_all(
1106 self.api.groups().contents,
1108 num_retries=self.num_retries,
1109 uuid=self.project_uuid,
1111 ['uuid', 'is_a', 'arvados#collection'],
1112 *self._filters_for('collections', qualified=True),
1114 ) if obj['current_version_uuid'] == obj['uuid'])
1115 # end with llfuse.lock_released, re-acquire lock
1117 self.merge(contents,
1120 self.createDirectory)
1123 self._updating_lock.release()
def _add_entry(self, i, name):
    """Build a FUSE object for project item *i*, register it under
    *name*, and return the registered entry."""
    new_obj = self.createDirectory(i)
    registered = self.inodes.add_entry(new_obj)
    self._entries[name] = registered
    return registered
1132 def __getitem__(self, k):
1133 if k == '.arvados#project':
1134 return self.project_object_file
1135 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
1136 return super(ProjectDirectory, self).__getitem__(k)
1137 with llfuse.lock_released:
1138 k2 = self.unsanitize_filename(k)
1140 namefilter = ["name", "=", k]
1142 namefilter = ["name", "in", [k, k2]]
1143 contents = self.api.groups().list(
1145 ["owner_uuid", "=", self.project_uuid],
1146 ["group_class", "in", ["project","filter"]],
1148 *self._filters_for('groups', qualified=False),
1151 ).execute(num_retries=self.num_retries)["items"]
1153 contents = self.api.collections().list(
1155 ["owner_uuid", "=", self.project_uuid],
1157 *self._filters_for('collections', qualified=False),
1160 ).execute(num_retries=self.num_retries)["items"]
1162 if len(contents) > 1 and contents[1]['name'] == k:
1163 # If "foo/bar" and "foo[SUBST]bar" both exist, use
1165 contents = [contents[1]]
1166 name = self.sanitize_filename(self.namefn(contents[0]))
1169 return self._add_entry(contents[0], name)
1174 def __contains__(self, k):
1175 if k == '.arvados#project':
1187 if not self._enable_write:
1189 with llfuse.lock_released:
1190 if not self._current_user:
1191 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
1192 return self._current_user["uuid"] in self.project_object.get("writable_by", [])
1194 def persisted(self):
1199 def mkdir(self, name):
1200 if not self.writable():
1201 raise llfuse.FUSEError(errno.EROFS)
1204 with llfuse.lock_released:
1206 "owner_uuid": self.project_uuid,
1208 "manifest_text": "" }
1209 if self.storage_classes is not None:
1210 c["storage_classes_desired"] = self.storage_classes
1212 self.api.collections().create(body=c).execute(num_retries=self.num_retries)
1213 except Exception as e:
1216 except apiclient_errors.Error as error:
1217 _logger.error(error)
1218 raise llfuse.FUSEError(errno.EEXIST)
1222 def rmdir(self, name):
1223 if not self.writable():
1224 raise llfuse.FUSEError(errno.EROFS)
1226 if name not in self:
1227 raise llfuse.FUSEError(errno.ENOENT)
1228 if not isinstance(self[name], CollectionDirectory):
1229 raise llfuse.FUSEError(errno.EPERM)
1230 if len(self[name]) > 0:
1231 raise llfuse.FUSEError(errno.ENOTEMPTY)
1232 with llfuse.lock_released:
1233 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
# Rename/move an entry. Source and destination must both be project
# directories and the entry itself a collection; the move is performed by
# updating the collection's owner_uuid and name in one API call.
1238 def rename(self, name_old, name_new, src):
1239 if not self.writable():
1240 raise llfuse.FUSEError(errno.EROFS)
1242 if not isinstance(src, ProjectDirectory):
1243 raise llfuse.FUSEError(errno.EPERM)
# NOTE(review): lines elided here — presumably `ent = src[name_old]`.
1247 if not isinstance(ent, CollectionDirectory):
1248 raise llfuse.FUSEError(errno.EPERM)
1250 if name_new in self:
1251 # POSIX semantics for replacing one directory with another is
1252 # tricky (the target directory must be empty, the operation must be
1253 # atomic which isn't possible with the Arvados API as of this
1254 # writing) so don't support that.
1255 raise llfuse.FUSEError(errno.EPERM)
1257 self.api.collections().update(uuid=ent.uuid(),
1258 body={"owner_uuid": self.uuid(),
1259 "name": name_new}).execute(num_retries=self.num_retries)
1261 # Actually move the entry from source directory to this directory.
1262 del src._entries[name_old]
1263 self._entries[name_new] = ent
# Tell the kernel the old dentry in the source directory is gone.
1264 self.inodes.invalidate_entry(src, name_old)
# Handle a change event for an object owned by (or moved out of) this
# project, keeping self._entries and the kernel dentry cache in sync.
1267 def child_event(self, ev):
1268 properties = ev.get("properties") or {}
1269 old_attrs = properties.get("old_attributes") or {}
1270 new_attrs = properties.get("new_attributes") or {}
1271 old_attrs["uuid"] = ev["object_uuid"]
1272 new_attrs["uuid"] = ev["object_uuid"]
# Filesystem names corresponding to the object's previous/current state.
1273 old_name = self.sanitize_filename(self.namefn(old_attrs))
1274 new_name = self.sanitize_filename(self.namefn(new_attrs))
1276 # create events will have a new name, but not an old name
1277 # delete events will have an old name, but not a new name
1278 # update events will have an old and new name, and they may be same or different
1279 # if they are the same, an unrelated field changed and there is nothing to do.
1281 if old_attrs.get("owner_uuid") != self.project_uuid:
1282 # Was moved from somewhere else, so don't try to remove entry.
1284 if ev.get("object_owner_uuid") != self.project_uuid:
1285 # Was moved to somewhere else, so don't try to add entry
1288 if old_attrs.get("is_trashed"):
1289 # Was previously deleted
1291 if new_attrs.get("is_trashed"):
# NOTE(review): several lines are elided around these branches (likely
# statements clearing old_name/new_name for the no-remove/no-add cases).
1295 if new_name != old_name:
1297 if old_name in self._entries:
# Drop the stale entry and invalidate its dentry in the kernel cache.
1298 ent = self._entries[old_name]
1299 del self._entries[old_name]
1300 self.inodes.invalidate_entry(self, old_name)
# NOTE(review): elided here — presumably chooses between reusing `ent`
# under the new name and creating a fresh entry from new_attrs.
1304 self._entries[new_name] = ent
1306 self._add_entry(new_attrs, new_name)
1307 elif ent is not None:
# No longer present under any name: release the inode reference.
1308 self.inodes.del_entry(ent)
1311 class SharedDirectory(Directory):
1312 """A special directory that represents users or groups who have shared projects with me."""
1314 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
1315 exclude, poll=False, poll_time=60, storage_classes=None):
# Note: Directory.__init__ takes the API *config* callable, not the client.
1316 super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
1318 self.num_retries = num_retries
# Resolve the current user once up front; update() uses it to distinguish
# projects shared with me from projects I own.
1319 self.current_user = api.users().current().execute(num_retries=num_retries)
1321 self._poll_time = poll_time
# Serializes concurrent update() runs.
1322 self._updating_lock = threading.Lock()
1323 self.storage_classes = storage_classes
# NOTE(review): the enclosing `def update(self)` line is elided from this
# listing; the lines below are its body. It rebuilds the mapping of
# "users/groups who shared projects with me" -> directory entries.
1328 with llfuse.lock_released:
# Only one updater at a time; skip if another thread already refreshed.
1329 self._updating_lock.acquire()
1330 if not self.stale():
# Prefer the dedicated groups().shared endpoint when the API server
# advertises it; it returns shared groups plus owner records in one call.
1338 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1339 if 'httpMethod' in methods.get('shared', {}):
1342 resp = self.api.groups().shared(
1344 ['group_class', 'in', ['project','filter']],
# Apply the mount's extra configured filters for group records.
1346 *self._filters_for('groups', qualified=False),
1351 include="owner_uuid",
1353 if not resp["items"]:
# Advance the keyset-paging cursor past the last uuid seen.
1355 page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1356 for r in resp["items"]:
1357 objects[r["uuid"]] = r
1358 roots.append(r["uuid"])
# "included" carries the owner records requested via include="owner_uuid".
1359 for r in resp["included"]:
1360 objects[r["uuid"]] = r
1361 root_owners.add(r["uuid"])
# Fallback for older API servers without groups().shared: list all
# readable projects and infer the shared roots below.
1363 all_projects = list(arvados.util.keyset_list_all(
1364 self.api.groups().list,
1366 num_retries=self.num_retries,
1368 ['group_class', 'in', ['project','filter']],
1369 *self._filters_for('groups', qualified=False),
1371 select=["uuid", "owner_uuid"],
1373 for ob in all_projects:
1374 objects[ob['uuid']] = ob
1376 current_uuid = self.current_user['uuid']
# A project is a shared "root" if it is neither mine nor owned by
# another project I can already see.
1377 for ob in all_projects:
1378 if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1379 roots.append(ob['uuid'])
1380 root_owners.add(ob['owner_uuid'])
# Fetch user and group records for all owners so entries can be
# displayed by name.
1382 lusers = arvados.util.keyset_list_all(
1383 self.api.users().list,
1385 num_retries=self.num_retries,
1387 ['uuid', 'in', list(root_owners)],
1388 *self._filters_for('users', qualified=False),
1391 lgroups = arvados.util.keyset_list_all(
1392 self.api.groups().list,
1394 num_retries=self.num_retries,
1396 ['uuid', 'in', list(root_owners)+roots],
1397 *self._filters_for('groups', qualified=False),
1401 objects[l["uuid"]] = l
1403 objects[l["uuid"]] = l
# Build directory contents: one entry per owner, named by the group
# name or by the user's "First Last".
1405 for r in root_owners:
1409 contents[obr["name"]] = obr
1410 elif "first_name" in obr:
1411 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
# Root projects whose owner record is not visible are listed directly.
1416 if obr['owner_uuid'] not in objects:
1417 contents[obr["name"]] = obr
1419 # end with llfuse.lock_released, re-acquire lock
# Merge into self._entries: match existing entries by uuid, creating a
# new ProjectDirectory for each unmatched item.
1424 lambda a, i: a.uuid() == i[1]['uuid'],
1425 lambda i: ProjectDirectory(
1434 poll_time=self._poll_time,
1435 storage_classes=self.storage_classes,
# Best-effort refresh: log and continue; the lock is always released.
1439 _logger.exception("arv-mount shared dir error")
1441 self._updating_lock.release()
1443 def want_event_subscribe(self):