1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import absolute_import
6 from __future__ import division
7 from future.utils import viewitems
8 from future.utils import itervalues
9 from builtins import dict
20 from apiclient import errors as apiclient_errors
22 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
23 from .fresh import FreshBase, convertTime, use_counter, check_update
25 import arvados.collection
26 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
28 _logger = logging.getLogger('arvados.arvados_fuse')
31 # Match any character which FUSE or Linux cannot accommodate as part
32 # of a filename. (If present in a collection filename, they will
33 # appear as underscores in the fuse mount.)
34 _disallowed_filename_characters = re.compile('[\x00/]')
37 class Directory(FreshBase):
38 """Generic directory object, backed by a dict.
40 Consists of a set of entries with the key representing the filename
41 and the value referencing a File or Directory object.
44 def __init__(self, parent_inode, inodes, apiconfig):
45 """parent_inode is the integer inode number"""
47 super(Directory, self).__init__()
50 if not isinstance(parent_inode, int):
51 raise Exception("parent_inode should be an int")
52 self.parent_inode = parent_inode
54 self.apiconfig = apiconfig
56 self._mtime = time.time()
58 def forward_slash_subst(self):
59 if not hasattr(self, '_fsns'):
61 config = self.apiconfig()
63 self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
65 # old API server with no FSNS config
68 if self._fsns == '' or self._fsns == '/':
72 def unsanitize_filename(self, incoming):
73 """Replace ForwardSlashNameSubstitution value with /"""
74 fsns = self.forward_slash_subst()
75 if isinstance(fsns, str):
76 return incoming.replace(fsns, '/')
80 def sanitize_filename(self, dirty):
81 """Replace disallowed filename characters according to
82 ForwardSlashNameSubstitution in self.api_config."""
83 # '.' and '..' are not reachable if API server is newer than #6277
93 fsns = self.forward_slash_subst()
94 if isinstance(fsns, str):
95 dirty = dirty.replace('/', fsns)
96 return _disallowed_filename_characters.sub('_', dirty)
99 # Overridden by subclasses to implement logic to update the
100 # entries dict when the directory is stale
105 # Only used when computing the size of the disk footprint of the directory
113 def checkupdate(self):
117 except apiclient.errors.HttpError as e:
def __getitem__(self, item):
    """Return the entry stored in this directory under the name `item`.

    The lookup goes straight to the backing `_entries` mapping, so a
    missing name surfaces as that mapping's lookup error.
    """
    entries = self._entries
    return entries[item]
128 return list(self._entries.items())
def __contains__(self, k):
    """Report whether `k` is the name of an entry in this directory."""
    present = k in self._entries
    return present
138 return len(self._entries)
141 self.inodes.touch(self)
142 super(Directory, self).fresh()
144 def merge(self, items, fn, same, new_entry):
145 """Helper method for updating the contents of the directory.
147 Takes a list describing the new contents of the directory, reuse
148 entries that are the same in both the old and new lists, create new
149 entries, and delete old entries missing from the new list.
151 :items: iterable with new directory contents
153 :fn: function to take an entry in 'items' and return the desired file or
154 directory name, or None if this entry should be skipped
156 :same: function to compare an existing entry (a File or Directory
157 object) with an entry in the items list to determine whether to keep
160 :new_entry: function to create a new directory entry (File or Directory
161 object) from an entry in the items list.
165 oldentries = self._entries
169 name = self.sanitize_filename(fn(i))
171 if name in oldentries and same(oldentries[name], i):
172 # move existing directory entry over
173 self._entries[name] = oldentries[name]
176 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
177 # create new directory entry
180 self._entries[name] = self.inodes.add_entry(ent)
183 # delete any other directory entries that were not found in 'items'
185 _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
186 self.inodes.invalidate_entry(self, i)
187 self.inodes.del_entry(oldentries[i])
191 self.inodes.invalidate_inode(self)
192 self._mtime = time.time()
197 if super(Directory, self).in_use():
199 for v in itervalues(self._entries):
204 def has_ref(self, only_children):
205 if super(Directory, self).has_ref(only_children):
207 for v in itervalues(self._entries):
213 """Delete all entries"""
214 oldentries = self._entries
217 oldentries[n].clear()
218 self.inodes.del_entry(oldentries[n])
221 def kernel_invalidate(self):
222 # Invalidating the dentry on the parent implies invalidating all paths
224 parent = self.inodes[self.parent_inode]
226 # Find self on the parent in order to invalidate this path.
227 # Calling the public items() method might trigger a refresh,
228 # which we definitely don't want, so read the internal dict directly.
229 for k,v in viewitems(parent._entries):
231 self.inodes.invalidate_entry(parent, k)
def want_event_subscribe(self):
    """Placeholder on the generic directory: always raises.

    The base class has no answer of its own; directory classes that
    define this method supply the real result.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError()
def create(self, name):
    """Placeholder for creating a file called `name`: always raises.

    The generic directory does not implement file creation; directory
    classes that support it define their own create().

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError()
def mkdir(self, name):
    """Placeholder for creating a subdirectory `name`: always raises.

    The generic directory does not implement mkdir; directory classes
    that support it define their own mkdir().

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError()
def unlink(self, name):
    """Placeholder for removing the file `name`: always raises.

    The generic directory does not implement unlink; directory classes
    that support it define their own unlink().

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError()
def rmdir(self, name):
    """Placeholder for removing the subdirectory `name`: always raises.

    The generic directory does not implement rmdir; directory classes
    that support it define their own rmdir().

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError()
def rename(self, name_old, name_new, src):
    """Placeholder for moving `name_old` from `src` to `name_new` here.

    The generic directory does not implement rename; directory classes
    that support it define their own rename().

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError()
262 class CollectionDirectoryBase(Directory):
263 """Represent an Arvados Collection as a directory.
265 This class is used for Subcollections, and is also the base class for
266 CollectionDirectory, which implements collection loading/saving on
269 Most operations act only on the underlying Arvados `Collection` object. The
270 `Collection` object signals via a notify callback to
271 `CollectionDirectoryBase.on_event` that an item was added, removed or
272 modified. FUSE inodes and directory entries are created, deleted or
273 invalidated in response to these events.
def __init__(self, parent_inode, inodes, apiconfig, collection):
    """Initialise a directory that mirrors `collection`.

    `parent_inode`, `inodes` and `apiconfig` are handed to the parent
    class initialiser unchanged; `collection` is kept on the instance
    as `self.collection`.
    """
    super(CollectionDirectoryBase, self).__init__(
        parent_inode, inodes, apiconfig)
    self.collection = collection
    self.apiconfig = apiconfig
282 def new_entry(self, name, item, mtime):
283 name = self.sanitize_filename(name)
284 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
285 if item.fuse_entry.dead is not True:
286 raise Exception("Can only reparent dead inode entry")
287 if item.fuse_entry.inode is None:
288 raise Exception("Reparented entry must still have valid inode")
289 item.fuse_entry.dead = False
290 self._entries[name] = item.fuse_entry
291 elif isinstance(item, arvados.collection.RichCollectionBase):
292 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, item))
293 self._entries[name].populate(mtime)
295 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
296 item.fuse_entry = self._entries[name]
298 def on_event(self, event, collection, name, item):
299 if collection == self.collection:
300 name = self.sanitize_filename(name)
303 # It's possible for another thread to have llfuse.lock and
304 # be waiting on collection.lock. Meanwhile, we released
305 # llfuse.lock earlier in the stack, but are still holding
306 # on to the collection lock, and now we need to re-acquire
307 # llfuse.lock. If we don't release the collection lock,
308 # we'll deadlock where we're holding the collection lock
309 # waiting for llfuse.lock and the other thread is holding
310 # llfuse.lock and waiting for the collection lock.
312 # The correct locking order here is to take llfuse.lock
313 # first, then the collection lock.
315 # Since collection.lock is an RLock, it might be locked
316 # multiple times, so we need to release it multiple times,
317 # keep a count, then re-lock it the correct number of
323 self.collection.lock.release()
330 with self.collection.lock:
331 if event == arvados.collection.ADD:
332 self.new_entry(name, item, self.mtime())
333 elif event == arvados.collection.DEL:
334 ent = self._entries[name]
335 del self._entries[name]
336 self.inodes.invalidate_entry(self, name)
337 self.inodes.del_entry(ent)
338 elif event == arvados.collection.MOD:
339 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
340 self.inodes.invalidate_inode(item.fuse_entry)
341 elif name in self._entries:
342 self.inodes.invalidate_inode(self._entries[name])
345 self.collection.lock.acquire()
348 def populate(self, mtime):
350 self.collection.subscribe(self.on_event)
351 for entry, item in viewitems(self.collection):
352 self.new_entry(entry, item, self.mtime())
355 return self.collection.writable()
359 with llfuse.lock_released:
360 self.collection.root_collection().save()
def create(self, name):
    """Create `name` in the collection by opening it for writing and
    closing it straight away.

    The llfuse lock is released for the duration of the collection
    operation.
    """
    with llfuse.lock_released:
        handle = self.collection.open(name, "w")
        handle.close()
def mkdir(self, name):
    """Create the subdirectory `name` through the collection's mkdirs().

    The llfuse lock is released for the duration of the collection
    operation.
    """
    with llfuse.lock_released:
        make_dirs = self.collection.mkdirs
        make_dirs(name)
376 def unlink(self, name):
377 with llfuse.lock_released:
378 self.collection.remove(name)
383 def rmdir(self, name):
384 with llfuse.lock_released:
385 self.collection.remove(name)
390 def rename(self, name_old, name_new, src):
391 if not isinstance(src, CollectionDirectoryBase):
392 raise llfuse.FUSEError(errno.EPERM)
397 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
399 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
401 raise llfuse.FUSEError(errno.ENOTEMPTY)
402 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
403 raise llfuse.FUSEError(errno.ENOTDIR)
404 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
405 raise llfuse.FUSEError(errno.EISDIR)
407 with llfuse.lock_released:
408 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
413 super(CollectionDirectoryBase, self).clear()
414 self.collection = None
417 class CollectionDirectory(CollectionDirectoryBase):
418 """Represents the root of a directory tree representing a collection."""
420 def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
421 super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, None)
423 self.num_retries = num_retries
424 self.collection_record_file = None
425 self.collection_record = None
428 self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
430 _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
431 self._poll_time = 60*60
433 if isinstance(collection_record, dict):
434 self.collection_locator = collection_record['uuid']
435 self._mtime = convertTime(collection_record.get('modified_at'))
437 self.collection_locator = collection_record
439 self._manifest_size = 0
440 if self.collection_locator:
441 self._writable = (uuid_pattern.match(self.collection_locator) is not None)
442 self._updating_lock = threading.Lock()
445 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
448 return self.collection.writable() if self.collection is not None else self._writable
def want_event_subscribe(self):
    """Return True when `collection_locator` matches `uuid_pattern`,
    False otherwise."""
    matched = uuid_pattern.match(self.collection_locator)
    return matched is not None
453 # Used by arv-web.py to switch the contents of the CollectionDirectory
454 def change_collection(self, new_locator):
455 """Switch the contents of the CollectionDirectory.
457 Must be called with llfuse.lock held.
460 self.collection_locator = new_locator
461 self.collection_record = None
464 def new_collection(self, new_collection_record, coll_reader):
468 self.collection_record = new_collection_record
470 if self.collection_record:
471 self._mtime = convertTime(self.collection_record.get('modified_at'))
472 self.collection_locator = self.collection_record["uuid"]
473 if self.collection_record_file is not None:
474 self.collection_record_file.update(self.collection_record)
476 self.collection = coll_reader
477 self.populate(self.mtime())
480 return self.collection_locator
483 def update(self, to_record_version=None):
485 if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
488 if self.collection_locator is None:
493 with llfuse.lock_released:
494 self._updating_lock.acquire()
498 _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
499 if self.collection is not None:
500 if self.collection.known_past_version(to_record_version):
501 _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
503 self.collection.update()
505 if uuid_pattern.match(self.collection_locator):
506 coll_reader = arvados.collection.Collection(
507 self.collection_locator, self.api, self.api.keep,
508 num_retries=self.num_retries)
510 coll_reader = arvados.collection.CollectionReader(
511 self.collection_locator, self.api, self.api.keep,
512 num_retries=self.num_retries)
513 new_collection_record = coll_reader.api_response() or {}
514 # If the Collection only exists in Keep, there will be no API
515 # response. Fill in the fields we need.
516 if 'uuid' not in new_collection_record:
517 new_collection_record['uuid'] = self.collection_locator
518 if "portable_data_hash" not in new_collection_record:
519 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
520 if 'manifest_text' not in new_collection_record:
521 new_collection_record['manifest_text'] = coll_reader.manifest_text()
522 if 'storage_classes_desired' not in new_collection_record:
523 new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
525 if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
526 self.new_collection(new_collection_record, coll_reader)
528 self._manifest_size = len(coll_reader.manifest_text())
529 _logger.debug("%s manifest_size %i", self, self._manifest_size)
530 # end with llfuse.lock_released, re-acquire lock
535 self._updating_lock.release()
536 except arvados.errors.NotFoundError as e:
537 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
538 except arvados.errors.ArgumentError as detail:
539 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
540 if self.collection_record is not None and "manifest_text" in self.collection_record:
541 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
543 _logger.exception("arv-mount %s: error", self.collection_locator)
544 if self.collection_record is not None and "manifest_text" in self.collection_record:
545 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
551 def __getitem__(self, item):
552 if item == '.arvados#collection':
553 if self.collection_record_file is None:
554 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
555 self.inodes.add_entry(self.collection_record_file)
556 return self.collection_record_file
558 return super(CollectionDirectory, self).__getitem__(item)
560 def __contains__(self, k):
561 if k == '.arvados#collection':
564 return super(CollectionDirectory, self).__contains__(k)
def invalidate(self):
    """Forget the cached collection record and its record file, then
    run the parent class invalidate()."""
    self.collection_record_file = None
    self.collection_record = None
    super(CollectionDirectory, self).invalidate()
572 return (self.collection_locator is not None)
575 # This is an empirically-derived heuristic to estimate the memory used
576 # to store this collection's metadata. Calculating the memory
577 # footprint directly would be more accurate, but also more complicated.
578 return self._manifest_size * 128
581 if self.collection is not None:
583 self.collection.save()
584 self.collection.stop_threads()
587 if self.collection is not None:
588 self.collection.stop_threads()
589 super(CollectionDirectory, self).clear()
590 self._manifest_size = 0
593 class TmpCollectionDirectory(CollectionDirectoryBase):
594 """A directory backed by an Arvados collection that never gets saved.
596 This supports using Keep as scratch space. A userspace program can
597 read the .arvados#collection file to get a current manifest in
598 order to save a snapshot of the scratch data or use it as a crunch
602 class UnsaveableCollection(arvados.collection.Collection):
608 def __init__(self, parent_inode, inodes, api_client, num_retries, storage_classes=None):
609 collection = self.UnsaveableCollection(
610 api_client=api_client,
611 keep_client=api_client.keep,
612 num_retries=num_retries,
613 storage_classes_desired=storage_classes)
614 super(TmpCollectionDirectory, self).__init__(
615 parent_inode, inodes, api_client.config, collection)
616 self.collection_record_file = None
617 self.populate(self.mtime())
619 def on_event(self, *args, **kwargs):
620 super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
621 if self.collection_record_file:
623 # See discussion in CollectionDirectoryBase.on_event
627 self.collection.lock.release()
634 with self.collection.lock:
635 self.collection_record_file.invalidate()
636 self.inodes.invalidate_inode(self.collection_record_file)
637 _logger.debug("%s invalidated collection record", self)
640 self.collection.lock.acquire()
643 def collection_record(self):
644 with llfuse.lock_released:
647 "manifest_text": self.collection.manifest_text(),
648 "portable_data_hash": self.collection.portable_data_hash(),
649 "storage_classes_desired": self.collection.storage_classes_desired(),
def __contains__(self, k):
    """Membership test for this directory.

    The synthetic '.arvados#collection' name is always reported as
    present; any other name is delegated to the parent class.
    """
    if k == '.arvados#collection':
        return True
    return super(TmpCollectionDirectory, self).__contains__(k)
def __getitem__(self, item):
    """Look up `item` in this directory.

    For the name '.arvados#collection' the record file is returned.
    It is built on first use from `self.collection_record`, registered
    with the inode table and then reused. Every other name is looked
    up by the parent class.
    """
    if item != '.arvados#collection':
        return super(TmpCollectionDirectory, self).__getitem__(item)

    record_file = self.collection_record_file
    if record_file is None:
        # First access: create the file lazily and cache it on self.
        record_file = FuncToJSONFile(self.inode, self.collection_record)
        self.collection_record_file = record_file
        self.inodes.add_entry(record_file)
    return record_file
672 def want_event_subscribe(self):
676 self.collection.stop_threads()
def invalidate(self):
    """Invalidate the collection record file, if one exists, and then
    run the parent class invalidate()."""
    record_file = self.collection_record_file
    if record_file:
        record_file.invalidate()
    super(TmpCollectionDirectory, self).invalidate()
684 class MagicDirectory(Directory):
685 """A special directory that logically contains the set of all extant keep locators.
687 When a file is referenced by lookup(), it is tested to see if it is a valid
688 keep locator to a manifest, and if so, loads the manifest contents as a
689 subdirectory of this directory with the locator as the directory name.
690 Since querying a list of all extant keep locators is impractical, only
691 collections that have already been accessed are visible to readdir().
696 This directory provides access to Arvados collections as subdirectories listed
697 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
698 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
699 (in the form 'zzzzz-j7d0g-1234567890abcde').
701 Note that this directory will appear empty until you attempt to access a
702 specific collection or project subdirectory (such as trying to 'cd' into it),
703 at which point the collection or project will actually be looked up on the server
704 and the directory will appear if it exists.
708 def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False, storage_classes=None):
709 super(MagicDirectory, self).__init__(parent_inode, inodes, api.config)
711 self.num_retries = num_retries
712 self.pdh_only = pdh_only
713 self.storage_classes = storage_classes
def __setattr__(self, name, value):
    """Set an attribute and populate the directory once it has an inode.

    The assignment itself is delegated to the parent class. When the
    attribute being set is `inode`, its new value is not None and the
    directory has no entries yet, a README entry is added. If the
    directory is the mount root (llfuse.ROOT_INODE), a nested 'by_id'
    MagicDirectory is added as well, configured like this one.
    """
    super(MagicDirectory, self).__setattr__(name, value)
    # When we're assigned an inode, add a README.
    if ((name == 'inode') and (self.inode is not None) and
            (not self._entries)):
        self._entries['README'] = self.inodes.add_entry(
            StringFile(self.inode, self.README_TEXT, time.time()))
        # If we're the root directory, add an identical by_id subdirectory.
        # "Identical" has to include storage_classes: otherwise projects
        # reached through by_id/ are built with storage_classes=None.
        if self.inode == llfuse.ROOT_INODE:
            self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                self.inode, self.inodes, self.api, self.num_retries,
                self.pdh_only, storage_classes=self.storage_classes))
727 def __contains__(self, k):
728 if k in self._entries:
731 if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
737 if group_uuid_pattern.match(k):
738 project = self.api.groups().list(
739 filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
740 if project[u'items_available'] == 0:
742 e = self.inodes.add_entry(ProjectDirectory(
743 self.inode, self.inodes, self.api, self.num_retries,
744 project[u'items'][0], storage_classes=self.storage_classes))
746 e = self.inodes.add_entry(CollectionDirectory(
747 self.inode, self.inodes, self.api, self.num_retries, k))
750 if k not in self._entries:
753 self.inodes.del_entry(e)
756 self.inodes.invalidate_entry(self, k)
757 self.inodes.del_entry(e)
759 except Exception as ex:
760 _logger.exception("arv-mount lookup '%s':", k)
762 self.inodes.del_entry(e)
765 def __getitem__(self, item):
767 return self._entries[item]
769 raise KeyError("No collection with id " + item)
def want_event_subscribe(self):
    """Return False when this directory is PDH-only (`self.pdh_only`
    is truthy), True otherwise."""
    if self.pdh_only:
        return False
    return True
778 class TagsDirectory(Directory):
779 """A special directory that contains as subdirectories all tags visible to the user."""
781 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
782 super(TagsDirectory, self).__init__(parent_inode, inodes, api.config)
784 self.num_retries = num_retries
786 self._poll_time = poll_time
789 def want_event_subscribe(self):
794 with llfuse.lock_released:
795 tags = self.api.links().list(
796 filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
797 select=['name'], distinct=True, limit=1000
798 ).execute(num_retries=self.num_retries)
800 self.merge(tags['items']+[{"name": n} for n in self._extra],
802 lambda a, i: a.tag == i['name'],
803 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
807 def __getitem__(self, item):
808 if super(TagsDirectory, self).__contains__(item):
809 return super(TagsDirectory, self).__getitem__(item)
810 with llfuse.lock_released:
811 tags = self.api.links().list(
812 filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
813 ).execute(num_retries=self.num_retries)
815 self._extra.add(item)
817 return super(TagsDirectory, self).__getitem__(item)
821 def __contains__(self, k):
822 if super(TagsDirectory, self).__contains__(k):
832 class TagDirectory(Directory):
833 """A special directory that contains as subdirectories all collections visible
834 to the user that are tagged with a particular tag.
837 def __init__(self, parent_inode, inodes, api, num_retries, tag,
838 poll=False, poll_time=60):
839 super(TagDirectory, self).__init__(parent_inode, inodes, api.config)
841 self.num_retries = num_retries
844 self._poll_time = poll_time
846 def want_event_subscribe(self):
851 with llfuse.lock_released:
852 taggedcollections = self.api.links().list(
853 filters=[['link_class', '=', 'tag'],
854 ['name', '=', self.tag],
855 ['head_uuid', 'is_a', 'arvados#collection']],
857 ).execute(num_retries=self.num_retries)
858 self.merge(taggedcollections['items'],
859 lambda i: i['head_uuid'],
860 lambda a, i: a.collection_locator == i['head_uuid'],
861 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
864 class ProjectDirectory(Directory):
865 """A special directory that contains the contents of a project."""
867 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
868 poll=True, poll_time=3, storage_classes=None):
869 super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config)
871 self.num_retries = num_retries
872 self.project_object = project_object
873 self.project_object_file = None
874 self.project_uuid = project_object['uuid']
876 self._poll_time = poll_time
877 self._updating_lock = threading.Lock()
878 self._current_user = None
879 self._full_listing = False
880 self.storage_classes = storage_classes
882 def want_event_subscribe(self):
885 def createDirectory(self, i):
886 if collection_uuid_pattern.match(i['uuid']):
887 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
888 elif group_uuid_pattern.match(i['uuid']):
889 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time, self.storage_classes)
890 elif link_uuid_pattern.match(i['uuid']):
891 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
892 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
895 elif uuid_pattern.match(i['uuid']):
896 return ObjectFile(self.parent_inode, i)
901 return self.project_uuid
904 self._full_listing = True
905 return super(ProjectDirectory, self).items()
909 if i['name'] is None or len(i['name']) == 0:
911 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
912 # collection or subproject
914 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
917 elif 'kind' in i and i['kind'].startswith('arvados#'):
919 return "{}.{}".format(i['name'], i['kind'][8:])
926 if self.project_object_file == None:
927 self.project_object_file = ObjectFile(self.inode, self.project_object)
928 self.inodes.add_entry(self.project_object_file)
930 if not self._full_listing:
934 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
935 return a.uuid() == i['uuid']
936 elif isinstance(a, ObjectFile):
937 return a.uuid() == i['uuid'] and not a.stale()
941 with llfuse.lock_released:
942 self._updating_lock.acquire()
946 if group_uuid_pattern.match(self.project_uuid):
947 self.project_object = self.api.groups().get(
948 uuid=self.project_uuid).execute(num_retries=self.num_retries)
949 elif user_uuid_pattern.match(self.project_uuid):
950 self.project_object = self.api.users().get(
951 uuid=self.project_uuid).execute(num_retries=self.num_retries)
952 # do this in 2 steps until #17424 is fixed
953 contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
955 num_retries=self.num_retries,
956 uuid=self.project_uuid,
957 filters=[["uuid", "is_a", "arvados#group"],
958 ["groups.group_class", "in", ["project","filter"]]]))
959 contents.extend(arvados.util.keyset_list_all(self.api.groups().contents,
961 num_retries=self.num_retries,
962 uuid=self.project_uuid,
963 filters=[["uuid", "is_a", "arvados#collection"]]))
965 # end with llfuse.lock_released, re-acquire lock
970 self.createDirectory)
973 self._updating_lock.release()
def _add_entry(self, i, name):
    """Create the directory entry for item `i` and file it under `name`.

    The object built by createDirectory(i) is registered with the
    inode table; whatever add_entry() returns is stored in `_entries`
    under `name` and returned to the caller.
    """
    added = self.inodes.add_entry(self.createDirectory(i))
    self._entries[name] = added
    return added
982 def __getitem__(self, k):
983 if k == '.arvados#project':
984 return self.project_object_file
985 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
986 return super(ProjectDirectory, self).__getitem__(k)
987 with llfuse.lock_released:
988 k2 = self.unsanitize_filename(k)
990 namefilter = ["name", "=", k]
992 namefilter = ["name", "in", [k, k2]]
993 contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
994 ["group_class", "in", ["project","filter"]],
996 limit=2).execute(num_retries=self.num_retries)["items"]
998 contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
1000 limit=2).execute(num_retries=self.num_retries)["items"]
1002 if len(contents) > 1 and contents[1]['name'] == k:
1003 # If "foo/bar" and "foo[SUBST]bar" both exist, use
1005 contents = [contents[1]]
1006 name = self.sanitize_filename(self.namefn(contents[0]))
1009 return self._add_entry(contents[0], name)
1014 def __contains__(self, k):
1015 if k == '.arvados#project':
1027 with llfuse.lock_released:
1028 if not self._current_user:
1029 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
1030 return self._current_user["uuid"] in self.project_object.get("writable_by", [])
1032 def persisted(self):
1037 def mkdir(self, name):
1039 with llfuse.lock_released:
1041 "owner_uuid": self.project_uuid,
1043 "manifest_text": "" }
1044 if self.storage_classes is not None:
1045 c["storage_classes_desired"] = self.storage_classes
1047 self.api.collections().create(body=c).execute(num_retries=self.num_retries)
1048 except Exception as e:
1051 except apiclient_errors.Error as error:
1052 _logger.error(error)
1053 raise llfuse.FUSEError(errno.EEXIST)
1057 def rmdir(self, name):
1058 if name not in self:
1059 raise llfuse.FUSEError(errno.ENOENT)
1060 if not isinstance(self[name], CollectionDirectory):
1061 raise llfuse.FUSEError(errno.EPERM)
1062 if len(self[name]) > 0:
1063 raise llfuse.FUSEError(errno.ENOTEMPTY)
1064 with llfuse.lock_released:
1065 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
1070 def rename(self, name_old, name_new, src):
1071 if not isinstance(src, ProjectDirectory):
1072 raise llfuse.FUSEError(errno.EPERM)
1076 if not isinstance(ent, CollectionDirectory):
1077 raise llfuse.FUSEError(errno.EPERM)
1079 if name_new in self:
1080 # POSIX semantics for replacing one directory with another is
1081 # tricky (the target directory must be empty, the operation must be
1082 # atomic which isn't possible with the Arvados API as of this
1083 # writing) so don't support that.
1084 raise llfuse.FUSEError(errno.EPERM)
1086 self.api.collections().update(uuid=ent.uuid(),
1087 body={"owner_uuid": self.uuid(),
1088 "name": name_new}).execute(num_retries=self.num_retries)
1090 # Actually move the entry from source directory to this directory.
1091 del src._entries[name_old]
1092 self._entries[name_new] = ent
1093 self.inodes.invalidate_entry(src, name_old)
1096 def child_event(self, ev):
1097 properties = ev.get("properties") or {}
1098 old_attrs = properties.get("old_attributes") or {}
1099 new_attrs = properties.get("new_attributes") or {}
1100 old_attrs["uuid"] = ev["object_uuid"]
1101 new_attrs["uuid"] = ev["object_uuid"]
1102 old_name = self.sanitize_filename(self.namefn(old_attrs))
1103 new_name = self.sanitize_filename(self.namefn(new_attrs))
1105 # create events will have a new name, but not an old name
1106 # delete events will have an old name, but not a new name
1107 # update events will have an old and new name, and they may be same or different
1108 # if they are the same, an unrelated field changed and there is nothing to do.
1110 if old_attrs.get("owner_uuid") != self.project_uuid:
1111 # Was moved from somewhere else, so don't try to remove entry.
1113 if ev.get("object_owner_uuid") != self.project_uuid:
1114 # Was moved to somewhere else, so don't try to add entry
1117 if old_attrs.get("is_trashed"):
1118 # Was previously deleted
1120 if new_attrs.get("is_trashed"):
1124 if new_name != old_name:
1126 if old_name in self._entries:
1127 ent = self._entries[old_name]
1128 del self._entries[old_name]
1129 self.inodes.invalidate_entry(self, old_name)
1133 self._entries[new_name] = ent
1135 self._add_entry(new_attrs, new_name)
1136 elif ent is not None:
1137 self.inodes.del_entry(ent)
1140 class SharedDirectory(Directory):
1141 """A special directory that represents users or groups who have shared projects with me."""
1143 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
1144 poll=False, poll_time=60, storage_classes=None):
1145 super(SharedDirectory, self).__init__(parent_inode, inodes, api.config)
1147 self.num_retries = num_retries
1148 self.current_user = api.users().current().execute(num_retries=num_retries)
1150 self._poll_time = poll_time
1151 self._updating_lock = threading.Lock()
1152 self.storage_classes = storage_classes
1157 with llfuse.lock_released:
1158 self._updating_lock.acquire()
1159 if not self.stale():
1167 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1168 if 'httpMethod' in methods.get('shared', {}):
1171 resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
1175 include="owner_uuid").execute()
1176 if not resp["items"]:
1178 page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1179 for r in resp["items"]:
1180 objects[r["uuid"]] = r
1181 roots.append(r["uuid"])
1182 for r in resp["included"]:
1183 objects[r["uuid"]] = r
1184 root_owners.add(r["uuid"])
1186 all_projects = list(arvados.util.keyset_list_all(
1187 self.api.groups().list,
1189 num_retries=self.num_retries,
1190 filters=[['group_class','in',['project','filter']]],
1191 select=["uuid", "owner_uuid"]))
1192 for ob in all_projects:
1193 objects[ob['uuid']] = ob
1195 current_uuid = self.current_user['uuid']
1196 for ob in all_projects:
1197 if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1198 roots.append(ob['uuid'])
1199 root_owners.add(ob['owner_uuid'])
1201 lusers = arvados.util.keyset_list_all(
1202 self.api.users().list,
1204 num_retries=self.num_retries,
1205 filters=[['uuid','in', list(root_owners)]])
1206 lgroups = arvados.util.keyset_list_all(
1207 self.api.groups().list,
1209 num_retries=self.num_retries,
1210 filters=[['uuid','in', list(root_owners)+roots]])
1213 objects[l["uuid"]] = l
1215 objects[l["uuid"]] = l
1217 for r in root_owners:
1221 contents[obr["name"]] = obr
1222 elif "first_name" in obr:
1223 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1228 if obr['owner_uuid'] not in objects:
1229 contents[obr["name"]] = obr
1231 # end with llfuse.lock_released, re-acquire lock
1233 self.merge(viewitems(contents),
1235 lambda a, i: a.uuid() == i[1]['uuid'],
1236 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
1238 _logger.exception("arv-mount shared dir error")
1240 self._updating_lock.release()
1242 def want_event_subscribe(self):