1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import absolute_import
6 from __future__ import division
7 from future.utils import viewitems
8 from future.utils import itervalues
9 from builtins import dict
20 from apiclient import errors as apiclient_errors
22 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
23 from .fresh import FreshBase, convertTime, use_counter, check_update
25 import arvados.collection
26 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
_logger = logging.getLogger('arvados.arvados_fuse')

# Neither FUSE nor the Linux VFS can store a NUL byte or '/' inside a single
# path component.  Directory.sanitize_filename replaces any such character
# that survives ForwardSlashNameSubstitution with an underscore, so it shows
# up as '_' in the mount.
_disallowed_filename_characters = re.compile(r'[\x00/]')
# Directory: base class for the FUSE directory inodes in this module.
# Entries are held in the self._entries dict (filename -> File or Directory
# object); subclasses provide refresh logic and any mutation hooks they
# support (the base versions at the end of the class raise
# NotImplementedError).
37 class Directory(FreshBase):
38 """Generic directory object, backed by a dict.
40 Consists of a set of entries with the key representing the filename
41 and the value referencing a File or Directory object.
44 def __init__(self, parent_inode, inodes, apiconfig):
45 """parent_inode is the integer inode number"""
47 super(Directory, self).__init__()
# parent_inode must be a plain int; anything else is rejected up front.
50 if not isinstance(parent_inode, int):
51 raise Exception("parent_inode should be an int")
52 self.parent_inode = parent_inode
# apiconfig is a callable returning the cluster config (see
# forward_slash_subst).
54 self.apiconfig = apiconfig
56 self._mtime = time.time()
58 def forward_slash_subst(self):
# Return the cluster's Collections.ForwardSlashNameSubstitution setting.
# The config is read once via self.apiconfig() and cached in self._fsns.
# Failure to find the setting is handled as an old API server (comment
# below). A '' or '/' value is special-cased, presumably meaning
# "no substitution" -- confirm against the branch following that check.
59 if not hasattr(self, '_fsns'):
61 config = self.apiconfig()
63 self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
65 # old API server with no FSNS config
68 if self._fsns == '' or self._fsns == '/':
72 def unsanitize_filename(self, incoming):
73 """Replace ForwardSlashNameSubstitution value with /"""
# The reverse mapping is only applied when forward_slash_subst() returns a str.
74 fsns = self.forward_slash_subst()
75 if isinstance(fsns, str):
76 return incoming.replace(fsns, '/')
80 def sanitize_filename(self, dirty):
81 """Replace disallowed filename characters according to
82 ForwardSlashNameSubstitution in the cluster config (self.apiconfig())."""
83 # '.' and '..' are not reachable if API server is newer than #6277
# '/' is first replaced by the configured substitution string (if any);
# whatever still matches _disallowed_filename_characters (NUL or '/')
# then becomes '_'.
93 fsns = self.forward_slash_subst()
94 if isinstance(fsns, str):
95 dirty = dirty.replace('/', fsns)
96 return _disallowed_filename_characters.sub('_', dirty)
99 # Overridden by subclasses to implement logic to update the
100 # entries dict when the directory is stale
105 # Only used when computing the size of the disk footprint of the directory
113 def checkupdate(self):
# NOTE(review): this handler spells the exception as apiclient.errors,
# while other code in this module uses the `apiclient_errors` alias
# (from apiclient import errors as apiclient_errors); this requires the
# `apiclient` package itself to be imported as well -- confirm.
117 except apiclient.errors.HttpError as e:
# Mapping-style accessors operate directly on self._entries.
122 def __getitem__(self, item):
123 return self._entries[item]
# items() returns a list snapshot rather than a live view.
128 return list(self._entries.items())
132 def __contains__(self, k):
133 return k in self._entries
138 return len(self._entries)
# fresh(): mark this inode as recently used in the inode table, then defer
# to FreshBase.fresh().
141 self.inodes.touch(self)
142 super(Directory, self).fresh()
144 def merge(self, items, fn, same, new_entry):
145 """Helper method for updating the contents of the directory.
147 Takes a list describing the new contents of the directory, reuse
148 entries that are the same in both the old and new lists, create new
149 entries, and delete old entries missing from the new list.
151 :items: iterable with new directory contents
153 :fn: function to take an entry in 'items' and return the desired file or
154 directory name, or None if this entry should be skipped
156 :same: function to compare an existing entry (a File or Directory
157 object) with an entry in the items list to determine whether to keep
160 :new_entry: function to create a new directory entry (File or Directory
161 object) from an entry in the items list.
165 oldentries = self._entries
# Names for which same() holds keep their existing entry object; other
# names get a new entry built by new_entry() and registered in the inode
# table.
169 name = self.sanitize_filename(fn(i))
171 if name in oldentries and same(oldentries[name], i):
172 # move existing directory entry over
173 self._entries[name] = oldentries[name]
176 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
177 # create new directory entry
180 self._entries[name] = self.inodes.add_entry(ent)
183 # delete any other directory entries that were not found in 'items'
185 _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
186 self.inodes.invalidate_entry(self, i)
187 self.inodes.del_entry(oldentries[i])
191 self.inodes.invalidate_inode(self)
192 self._mtime = time.time()
# in_use() / has_ref(): true if FreshBase reports it for this directory,
# and presumably also if any child in self._entries does (loops below).
197 if super(Directory, self).in_use():
199 for v in itervalues(self._entries):
204 def has_ref(self, only_children):
205 if super(Directory, self).has_ref(only_children):
207 for v in itervalues(self._entries):
213 """Delete all entries"""
# Each child is cleared first, then removed from the inode table.
214 oldentries = self._entries
217 oldentries[n].clear()
218 self.inodes.del_entry(oldentries[n])
221 def kernel_invalidate(self):
222 # Invalidating the dentry on the parent implies invalidating all paths
224 parent = self.inodes[self.parent_inode]
226 # Find self on the parent in order to invalidate this path.
227 # Calling the public items() method might trigger a refresh,
228 # which we definitely don't want, so read the internal dict directly.
229 for k,v in viewitems(parent._entries):
231 self.inodes.invalidate_entry(parent, k)
# Optional capabilities. The base class supports none of them; subclasses
# that allow event subscription or filesystem mutation override these.
243 def want_event_subscribe(self):
244 raise NotImplementedError()
246 def create(self, name):
247 raise NotImplementedError()
249 def mkdir(self, name):
250 raise NotImplementedError()
252 def unlink(self, name):
253 raise NotImplementedError()
255 def rmdir(self, name):
256 raise NotImplementedError()
258 def rename(self, name_old, name_new, src):
259 raise NotImplementedError()
# CollectionDirectoryBase: FUSE view of an arvados.collection object.
# Entry changes are driven by the collection's notification callback
# (on_event), as described in the class docstring.
262 class CollectionDirectoryBase(Directory):
263 """Represent an Arvados Collection as a directory.
265 This class is used for Subcollections, and is also the base class for
266 CollectionDirectory, which implements collection loading/saving on
269 Most operations act only on the underlying Arvados `Collection` object. The
270 `Collection` object signals via a notify callback to
271 `CollectionDirectoryBase.on_event` that an item was added, removed or
272 modified. FUSE inodes and directory entries are created, deleted or
273 invalidated in response to these events.
277 def __init__(self, parent_inode, inodes, apiconfig, collection):
278 super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig)
# Directory.__init__ already stores apiconfig, so this assignment is redundant.
279 self.apiconfig = apiconfig
280 self.collection = collection
282 def new_entry(self, name, item, mtime):
# Create or re-attach the FUSE entry for collection item `item` under the
# sanitized `name`:
# - an item still holding a fuse_entry must be a dead entry with a valid
#   inode; it is revived and reused;
# - a RichCollectionBase (subcollection) becomes a nested
#   CollectionDirectoryBase, populated with `mtime`;
# - otherwise a FuseArvadosFile is created.
# The resulting entry is recorded on the item as item.fuse_entry.
283 name = self.sanitize_filename(name)
284 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
285 if item.fuse_entry.dead is not True:
286 raise Exception("Can only reparent dead inode entry")
287 if item.fuse_entry.inode is None:
288 raise Exception("Reparented entry must still have valid inode")
289 item.fuse_entry.dead = False
290 self._entries[name] = item.fuse_entry
291 elif isinstance(item, arvados.collection.RichCollectionBase):
292 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, item))
293 self._entries[name].populate(mtime)
295 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
296 item.fuse_entry = self._entries[name]
298 def on_event(self, event, collection, name, item):
# Notification callback registered in populate(). Events for other
# collections are ignored. ADD creates an entry; DEL drops the entry and
# invalidates/deletes it in the kernel cache and inode table; MOD
# invalidates the affected inode (found via item.fuse_entry, else by name).
299 if collection == self.collection:
300 name = self.sanitize_filename(name)
301 _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
303 if event == arvados.collection.ADD:
304 self.new_entry(name, item, self.mtime())
305 elif event == arvados.collection.DEL:
306 ent = self._entries[name]
307 del self._entries[name]
308 self.inodes.invalidate_entry(self, name)
309 self.inodes.del_entry(ent)
310 elif event == arvados.collection.MOD:
311 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
312 self.inodes.invalidate_inode(item.fuse_entry)
313 elif name in self._entries:
314 self.inodes.invalidate_inode(self._entries[name])
316 def populate(self, mtime):
# Subscribe to change notifications, then add entries for every item
# already present in the collection.
318 self.collection.subscribe(self.on_event)
319 for entry, item in viewitems(self.collection):
320 self.new_entry(entry, item, self.mtime())
# writable(): delegates to the collection.
323 return self.collection.writable()
# flush(): saves the root collection with the llfuse lock released.
327 with llfuse.lock_released:
328 self.collection.root_collection().save()
# create/mkdir/unlink/rmdir act on the Collection with the llfuse lock
# released; the resulting notifications update FUSE entries via on_event.
332 def create(self, name):
333 with llfuse.lock_released:
334 self.collection.open(name, "w").close()
338 def mkdir(self, name):
339 with llfuse.lock_released:
340 self.collection.mkdirs(name)
344 def unlink(self, name):
345 with llfuse.lock_released:
346 self.collection.remove(name)
351 def rmdir(self, name):
352 with llfuse.lock_released:
353 self.collection.remove(name)
358 def rename(self, name_old, name_new, src):
# Only renames between collection directories are allowed (EPERM
# otherwise). Replacing a directory with a directory raises ENOTEMPTY
# (presumably only when the target is non-empty); directory-over-file is
# ENOTDIR and file-over-directory is EISDIR. The rename itself is done by
# the collection with overwrite=True and the llfuse lock released.
359 if not isinstance(src, CollectionDirectoryBase):
360 raise llfuse.FUSEError(errno.EPERM)
365 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
367 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
369 raise llfuse.FUSEError(errno.ENOTEMPTY)
370 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
371 raise llfuse.FUSEError(errno.ENOTDIR)
372 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
373 raise llfuse.FUSEError(errno.EISDIR)
375 with llfuse.lock_released:
376 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
# clear(): drop all entries, then detach from the collection.
381 super(CollectionDirectoryBase, self).clear()
382 self.collection = None
# CollectionDirectory: root directory of a single collection, addressed by
# uuid or portable data hash (collection_locator). update() loads or
# refreshes the collection; '.arvados#collection' exposes its API record.
385 class CollectionDirectory(CollectionDirectoryBase):
386 """Represents the root of a directory tree representing a collection."""
388 def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
389 super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, None)
391 self.num_retries = num_retries
392 self.collection_record_file = None
393 self.collection_record = None
# Poll interval: half of the cluster's blobSignatureTtl (default two
# hours), or one hour if reading the discovery document fails.
# NOTE(review): the debug message below uses sys.exc_info(), so this
# module must import sys -- confirm.
396 self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
398 _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
399 self._poll_time = 60*60
# collection_record may be a full API record (dict) or just a locator.
401 if isinstance(collection_record, dict):
402 self.collection_locator = collection_record['uuid']
403 self._mtime = convertTime(collection_record.get('modified_at'))
405 self.collection_locator = collection_record
407 self._manifest_size = 0
# Before a collection object is loaded, writability is derived from the
# locator: only uuid locators are considered writable.
408 if self.collection_locator:
409 self._writable = (uuid_pattern.match(self.collection_locator) is not None)
410 self._updating_lock = threading.Lock()
# same(): an API record matches if its uuid or portable data hash equals
# collection_locator.
413 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
416 return self.collection.writable() if self.collection is not None else self._writable
418 def want_event_subscribe(self):
# Only uuid-addressed collections subscribe to events; a portable data
# hash names fixed content.
419 return (uuid_pattern.match(self.collection_locator) is not None)
421 # Used by arv-web.py to switch the contents of the CollectionDirectory
422 def change_collection(self, new_locator):
423 """Switch the contents of the CollectionDirectory.
425 Must be called with llfuse.lock held.
428 self.collection_locator = new_locator
429 self.collection_record = None
# new_collection(): install a newly loaded record and collection object:
# update mtime/locator from the record, refresh an existing
# '.arvados#collection' file, and repopulate the entries.
432 def new_collection(self, new_collection_record, coll_reader):
436 self.collection_record = new_collection_record
438 if self.collection_record:
439 self._mtime = convertTime(self.collection_record.get('modified_at'))
440 self.collection_locator = self.collection_record["uuid"]
441 if self.collection_record_file is not None:
442 self.collection_record_file.update(self.collection_record)
444 self.collection = coll_reader
445 self.populate(self.mtime())
448 return self.collection_locator
# update(): load or refresh the collection. A portable-data-hash
# collection whose record is already loaded is presumably left as is.
# Otherwise, with the llfuse lock released and _updating_lock held:
# - an already-loaded collection is updated in place, unless it already
#   knows to_record_version;
# - if none is loaded, a Collection (uuid locator) or CollectionReader
#   (portable data hash) is created and missing record fields are filled
#   in from it; new_collection() runs when the portable data hash changed.
# The manifest size estimate is refreshed, and errors are logged by the
# handlers at the end.
451 def update(self, to_record_version=None):
453 if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
456 if self.collection_locator is None:
461 with llfuse.lock_released:
462 self._updating_lock.acquire()
466 _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
467 if self.collection is not None:
468 if self.collection.known_past_version(to_record_version):
469 _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
471 self.collection.update()
473 if uuid_pattern.match(self.collection_locator):
474 coll_reader = arvados.collection.Collection(
475 self.collection_locator, self.api, self.api.keep,
476 num_retries=self.num_retries)
478 coll_reader = arvados.collection.CollectionReader(
479 self.collection_locator, self.api, self.api.keep,
480 num_retries=self.num_retries)
481 new_collection_record = coll_reader.api_response() or {}
482 # If the Collection only exists in Keep, there will be no API
483 # response. Fill in the fields we need.
484 if 'uuid' not in new_collection_record:
485 new_collection_record['uuid'] = self.collection_locator
486 if "portable_data_hash" not in new_collection_record:
487 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
488 if 'manifest_text' not in new_collection_record:
489 new_collection_record['manifest_text'] = coll_reader.manifest_text()
490 if 'storage_classes_desired' not in new_collection_record:
491 new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
493 if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
494 self.new_collection(new_collection_record, coll_reader)
496 self._manifest_size = len(coll_reader.manifest_text())
497 _logger.debug("%s manifest_size %i", self, self._manifest_size)
498 # end with llfuse.lock_released, re-acquire lock
503 self._updating_lock.release()
504 except arvados.errors.NotFoundError as e:
505 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
506 except arvados.errors.ArgumentError as detail:
507 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
508 if self.collection_record is not None and "manifest_text" in self.collection_record:
509 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
511 _logger.exception("arv-mount %s: error", self.collection_locator)
512 if self.collection_record is not None and "manifest_text" in self.collection_record:
513 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
# '.arvados#collection' is a virtual ObjectFile backed by collection_record,
# created and registered in the inode table on first access.
519 def __getitem__(self, item):
520 if item == '.arvados#collection':
521 if self.collection_record_file is None:
522 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
523 self.inodes.add_entry(self.collection_record_file)
524 return self.collection_record_file
526 return super(CollectionDirectory, self).__getitem__(item)
528 def __contains__(self, k):
529 if k == '.arvados#collection':
532 return super(CollectionDirectory, self).__contains__(k)
# Dropping the cached record also drops the '.arvados#collection' file object.
534 def invalidate(self):
535 self.collection_record = None
536 self.collection_record_file = None
537 super(CollectionDirectory, self).invalidate()
540 return (self.collection_locator is not None)
543 # This is an empirically-derived heuristic to estimate the memory used
544 # to store this collection's metadata. Calculating the memory
545 # footprint directly would be more accurate, but also more complicated.
546 return self._manifest_size * 128
# Teardown: the collection is saved (presumably only when writable) and
# its background threads are stopped.
549 if self.collection is not None:
551 self.collection.save()
552 self.collection.stop_threads()
# clear(): stop background threads, drop entries, reset the size estimate.
555 if self.collection is not None:
556 self.collection.stop_threads()
557 super(CollectionDirectory, self).clear()
558 self._manifest_size = 0
# TmpCollectionDirectory: writable scratch directory backed by a collection
# object (UnsaveableCollection) that is never saved.
561 class TmpCollectionDirectory(CollectionDirectoryBase):
562 """A directory backed by an Arvados collection that never gets saved.
564 This supports using Keep as scratch space. A userspace program can
565 read the .arvados#collection file to get a current manifest in
566 order to save a snapshot of the scratch data or use it as a crunch
570 class UnsaveableCollection(arvados.collection.Collection):
576 def __init__(self, parent_inode, inodes, api_client, num_retries, storage_classes=None):
# The backing collection is created here with the requested storage
# classes, and the directory is populated immediately.
577 collection = self.UnsaveableCollection(
578 api_client=api_client,
579 keep_client=api_client.keep,
580 num_retries=num_retries,
581 storage_classes_desired=storage_classes)
582 super(TmpCollectionDirectory, self).__init__(
583 parent_inode, inodes, api_client.config, collection)
584 self.collection_record_file = None
585 self.populate(self.mtime())
587 def on_event(self, *args, **kwargs):
# After the normal entry handling, invalidate the cached
# '.arvados#collection' file so readers see the current manifest.
588 super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
589 if self.collection_record_file:
591 self.collection_record_file.invalidate()
592 self.inodes.invalidate_inode(self.collection_record_file)
593 _logger.debug("%s invalidated collection record", self)
595 def collection_record(self):
# Build the record on demand from the live collection (with the llfuse
# lock released), including manifest_text, portable_data_hash and
# storage_classes_desired.
596 with llfuse.lock_released:
599 "manifest_text": self.collection.manifest_text(),
600 "portable_data_hash": self.collection.portable_data_hash(),
601 "storage_classes_desired": self.collection.storage_classes_desired(),
604 def __contains__(self, k):
605 return (k == '.arvados#collection' or
606 super(TmpCollectionDirectory, self).__contains__(k))
609 def __getitem__(self, item):
# '.arvados#collection' is a FuncToJSONFile wrapping collection_record,
# created and registered in the inode table on first access.
610 if item == '.arvados#collection':
611 if self.collection_record_file is None:
612 self.collection_record_file = FuncToJSONFile(
613 self.inode, self.collection_record)
614 self.inodes.add_entry(self.collection_record_file)
615 return self.collection_record_file
616 return super(TmpCollectionDirectory, self).__getitem__(item)
624 def want_event_subscribe(self):
628 self.collection.stop_threads()
630 def invalidate(self):
631 if self.collection_record_file:
632 self.collection_record_file.invalidate()
633 super(TmpCollectionDirectory, self).invalidate()
# MagicDirectory: resolves collection uuids / portable data hashes and
# project uuids on lookup (see the class docstring and README text).
636 class MagicDirectory(Directory):
637 """A special directory that logically contains the set of all extant keep locators.
639 When a file is referenced by lookup(), it is tested to see if it is a valid
640 keep locator to a manifest, and if so, loads the manifest contents as a
641 subdirectory of this directory with the locator as the directory name.
642 Since querying a list of all extant keep locators is impractical, only
643 collections that have already been accessed are visible to readdir().
648 This directory provides access to Arvados collections as subdirectories listed
649 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
650 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
651 (in the form 'zzzzz-j7d0g-1234567890abcde').
653 Note that this directory will appear empty until you attempt to access a
654 specific collection or project subdirectory (such as trying to 'cd' into it),
655 at which point the collection or project will actually be looked up on the server
656 and the directory will appear if it exists.
660 def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False, storage_classes=None):
661 super(MagicDirectory, self).__init__(parent_inode, inodes, api.config)
# pdh_only restricts lookups to portable data hashes; storage_classes is
# passed to ProjectDirectory entries created by __contains__.
663 self.num_retries = num_retries
664 self.pdh_only = pdh_only
665 self.storage_classes = storage_classes
667 def __setattr__(self, name, value):
# Entries need this directory's inode number as their parent, so the
# README (and, on the root inode, a by_id alias) is added when the inode
# attribute is assigned a non-None value and no entries exist yet.
# NOTE(review): by_id is constructed without storage_classes, so
# ProjectDirectory entries created through by_id do not receive this
# directory's storage_classes -- confirm whether that is intended.
668 super(MagicDirectory, self).__setattr__(name, value)
669 # When we're assigned an inode, add a README.
670 if ((name == 'inode') and (self.inode is not None) and
671 (not self._entries)):
672 self._entries['README'] = self.inodes.add_entry(
673 StringFile(self.inode, self.README_TEXT, time.time()))
674 # If we're the root directory, add an identical by_id subdirectory.
675 if self.inode == llfuse.ROOT_INODE:
676 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
677 self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))
679 def __contains__(self, k):
# Lookup on demand. Names that are neither a portable data hash nor
# (unless pdh_only) an Arvados uuid are rejected. Group uuids are looked
# up as project/filter groups and become ProjectDirectory entries; other
# names become CollectionDirectory entries. The tentative entry is removed
# again if it presumably fails to appear in self._entries, or if an
# exception is raised (which is logged).
680 if k in self._entries:
683 if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
689 if group_uuid_pattern.match(k):
690 project = self.api.groups().list(
691 filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
692 if project[u'items_available'] == 0:
694 e = self.inodes.add_entry(ProjectDirectory(
695 self.inode, self.inodes, self.api, self.num_retries,
696 project[u'items'][0], storage_classes=self.storage_classes))
698 e = self.inodes.add_entry(CollectionDirectory(
699 self.inode, self.inodes, self.api, self.num_retries, k))
702 if k not in self._entries:
705 self.inodes.del_entry(e)
708 self.inodes.invalidate_entry(self, k)
709 self.inodes.del_entry(e)
711 except Exception as ex:
712 _logger.exception("arv-mount lookup '%s':", k)
714 self.inodes.del_entry(e)
717 def __getitem__(self, item):
# Known (or successfully looked-up) entries are returned; otherwise a
# KeyError is raised.
719 return self._entries[item]
721 raise KeyError("No collection with id " + item)
726 def want_event_subscribe(self):
727 return not self.pdh_only
# TagsDirectory: one TagDirectory per distinct tag name visible to the user.
730 class TagsDirectory(Directory):
731 """A special directory that contains as subdirectories all tags visible to the user."""
733 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
734 super(TagsDirectory, self).__init__(parent_inode, inodes, api.config)
736 self.num_retries = num_retries
738 self._poll_time = poll_time
741 def want_event_subscribe(self):
# update(): list distinct non-empty tag names (limit=1000) with the llfuse
# lock released, add the names remembered in self._extra, and merge them
# as TagDirectory entries.
# NOTE(review): a single request with limit=1000 is made here, so tags
# beyond the first 1000 presumably appear only after being looked up by
# name through __getitem__ -- confirm.
746 with llfuse.lock_released:
747 tags = self.api.links().list(
748 filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
749 select=['name'], distinct=True, limit=1000
750 ).execute(num_retries=self.num_retries)
752 self.merge(tags['items']+[{"name": n} for n in self._extra],
754 lambda a, i: a.tag == i['name'],
755 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
759 def __getitem__(self, item):
# Unknown names are checked directly against the API; a tag that exists is
# (presumably only then) remembered in self._extra so update() keeps it.
760 if super(TagsDirectory, self).__contains__(item):
761 return super(TagsDirectory, self).__getitem__(item)
762 with llfuse.lock_released:
763 tags = self.api.links().list(
764 filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
765 ).execute(num_retries=self.num_retries)
767 self._extra.add(item)
769 return super(TagsDirectory, self).__getitem__(item)
773 def __contains__(self, k):
774 if super(TagsDirectory, self).__contains__(k):
# TagDirectory: the collections that carry one particular tag.
784 class TagDirectory(Directory):
785 """A special directory that contains as subdirectories all collections visible
786 to the user that are tagged with a particular tag.
789 def __init__(self, parent_inode, inodes, api, num_retries, tag,
790 poll=False, poll_time=60):
791 super(TagDirectory, self).__init__(parent_inode, inodes, api.config)
793 self.num_retries = num_retries
796 self._poll_time = poll_time
798 def want_event_subscribe(self):
# update(): with the llfuse lock released, list tag links named self.tag
# whose head is a collection, then merge them as CollectionDirectory
# entries named by head_uuid.
803 with llfuse.lock_released:
804 taggedcollections = self.api.links().list(
805 filters=[['link_class', '=', 'tag'],
806 ['name', '=', self.tag],
807 ['head_uuid', 'is_a', 'arvados#collection']],
809 ).execute(num_retries=self.num_retries)
810 self.merge(taggedcollections['items'],
811 lambda i: i['head_uuid'],
812 lambda a, i: a.collection_locator == i['head_uuid'],
813 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
# ProjectDirectory: lists the subprojects and collections owned by one
# project (group) or user, identified by project_uuid.
816 class ProjectDirectory(Directory):
817 """A special directory that contains the contents of a project."""
819 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
820 poll=True, poll_time=3, storage_classes=None):
# project_object is the API record whose uuid becomes project_uuid;
# storage_classes is passed to child projects and applied to collections
# created by mkdir().
821 super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config)
823 self.num_retries = num_retries
824 self.project_object = project_object
825 self.project_object_file = None
826 self.project_uuid = project_object['uuid']
828 self._poll_time = poll_time
829 self._updating_lock = threading.Lock()
830 self._current_user = None
831 self._full_listing = False
832 self.storage_classes = storage_classes
834 def want_event_subscribe(self):
837 def createDirectory(self, i):
# Build the child object for API record `i`: collections (and links whose
# head is a collection) become CollectionDirectory, groups become nested
# ProjectDirectory, and other uuid-bearing records become an ObjectFile.
# NOTE(review): the ObjectFile is given self.parent_inode as its parent,
# whereas every directory child uses self.inode -- confirm this is intended.
838 if collection_uuid_pattern.match(i['uuid']):
839 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
840 elif group_uuid_pattern.match(i['uuid']):
841 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time, self.storage_classes)
842 elif link_uuid_pattern.match(i['uuid']):
843 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
844 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
847 elif uuid_pattern.match(i['uuid']):
848 return ObjectFile(self.parent_inode, i)
853 return self.project_uuid
# items(): asking for the full listing switches the directory into
# full-listing mode, which update() and __getitem__ consult.
856 self._full_listing = True
857 return super(ProjectDirectory, self).items()
# namefn(): entry name for an API record. Records whose name is None or
# empty are presumably skipped; collections, groups and collection links
# use their name; other arvados objects get "<name>.<kind>" with the
# "arvados#" prefix removed from kind.
861 if i['name'] is None or len(i['name']) == 0:
863 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
864 # collection or subproject
866 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
869 elif 'kind' in i and i['kind'].startswith('arvados#'):
871 return "{}.{}".format(i['name'], i['kind'][8:])
# NOTE(review): `== None` below; `is None` is the idiomatic comparison.
878 if self.project_object_file == None:
879 self.project_object_file = ObjectFile(self.inode, self.project_object)
880 self.inodes.add_entry(self.project_object_file)
# Contents are presumably only fetched once a full listing has been
# requested (see items()).
882 if not self._full_listing:
# Existing children are reused when their uuid matches; ObjectFile
# children must also not be stale.
886 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
887 return a.uuid() == i['uuid']
888 elif isinstance(a, ObjectFile):
889 return a.uuid() == i['uuid'] and not a.stale()
# The project record and its contents are re-fetched with the llfuse lock
# released while _updating_lock is held.
893 with llfuse.lock_released:
894 self._updating_lock.acquire()
898 if group_uuid_pattern.match(self.project_uuid):
899 self.project_object = self.api.groups().get(
900 uuid=self.project_uuid).execute(num_retries=self.num_retries)
901 elif user_uuid_pattern.match(self.project_uuid):
902 self.project_object = self.api.users().get(
903 uuid=self.project_uuid).execute(num_retries=self.num_retries)
904 # do this in 2 steps until #17424 is fixed
905 contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
907 num_retries=self.num_retries,
908 uuid=self.project_uuid,
909 filters=[["uuid", "is_a", "arvados#group"],
910 ["groups.group_class", "in", ["project","filter"]]]))
911 contents.extend(arvados.util.keyset_list_all(self.api.groups().contents,
913 num_retries=self.num_retries,
914 uuid=self.project_uuid,
915 filters=[["uuid", "is_a", "arvados#collection"]]))
917 # end with llfuse.lock_released, re-acquire lock
922 self.createDirectory)
925 self._updating_lock.release()
# _add_entry(): create the child for record `i`, register it in the inode
# table under `name`, and return it.
927 def _add_entry(self, i, name):
928 ent = self.createDirectory(i)
929 self._entries[name] = self.inodes.add_entry(ent)
930 return self._entries[name]
934 def __getitem__(self, k):
# '.arvados#project' is served from project_object_file. With a full
# listing, or for a cached entry, the normal lookup is used. Otherwise the
# name is queried directly on the API (subprojects first, then presumably
# collections if none matched), matching both k and its unsanitized form
# k2; if both spellings exist, the literal match on k is preferred.
935 if k == '.arvados#project':
936 return self.project_object_file
937 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
938 return super(ProjectDirectory, self).__getitem__(k)
939 with llfuse.lock_released:
940 k2 = self.unsanitize_filename(k)
942 namefilter = ["name", "=", k]
944 namefilter = ["name", "in", [k, k2]]
945 contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
946 ["group_class", "in", ["project","filter"]],
948 limit=2).execute(num_retries=self.num_retries)["items"]
950 contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
952 limit=2).execute(num_retries=self.num_retries)["items"]
954 if len(contents) > 1 and contents[1]['name'] == k:
955 # If "foo/bar" and "foo[SUBST]bar" both exist, use
957 contents = [contents[1]]
958 name = self.sanitize_filename(self.namefn(contents[0]))
961 return self._add_entry(contents[0], name)
966 def __contains__(self, k):
967 if k == '.arvados#project':
# writable(): true when the current user's uuid (fetched once and cached in
# _current_user) appears in the project's writable_by list.
979 with llfuse.lock_released:
980 if not self._current_user:
981 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
982 return self._current_user["uuid"] in self.project_object.get("writable_by", [])
989 def mkdir(self, name):
# mkdir creates an empty collection owned by this project (with
# storage_classes_desired when configured), with the llfuse lock released.
# API errors are logged and reported as EEXIST.
991 with llfuse.lock_released:
993 "owner_uuid": self.project_uuid,
995 "manifest_text": "" }
996 if self.storage_classes is not None:
997 c["storage_classes_desired"] = self.storage_classes
999 self.api.collections().create(body=c).execute(num_retries=self.num_retries)
1000 except Exception as e:
1003 except apiclient_errors.Error as error:
1004 _logger.error(error)
1005 raise llfuse.FUSEError(errno.EEXIST)
1009 def rmdir(self, name):
# Only empty collections can be removed (ENOENT / EPERM / ENOTEMPTY
# otherwise); the delete runs with the llfuse lock released.
1010 if name not in self:
1011 raise llfuse.FUSEError(errno.ENOENT)
1012 if not isinstance(self[name], CollectionDirectory):
1013 raise llfuse.FUSEError(errno.EPERM)
1014 if len(self[name]) > 0:
1015 raise llfuse.FUSEError(errno.ENOTEMPTY)
1016 with llfuse.lock_released:
1017 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
1022 def rename(self, name_old, name_new, src):
# Moves a collection between projects by updating its owner_uuid and name
# through the API. Only CollectionDirectory entries may be moved, and an
# existing target name is refused with EPERM.
# NOTE(review): unlike mkdir/rmdir, no `with llfuse.lock_released:` is
# seen around the update call below -- confirm whether the lock is meant
# to be held during this request.
1023 if not isinstance(src, ProjectDirectory):
1024 raise llfuse.FUSEError(errno.EPERM)
1028 if not isinstance(ent, CollectionDirectory):
1029 raise llfuse.FUSEError(errno.EPERM)
1031 if name_new in self:
1032 # POSIX semantics for replacing one directory with another is
1033 # tricky (the target directory must be empty, the operation must be
1034 # atomic which isn't possible with the Arvados API as of this
1035 # writing) so don't support that.
1036 raise llfuse.FUSEError(errno.EPERM)
1038 self.api.collections().update(uuid=ent.uuid(),
1039 body={"owner_uuid": self.uuid(),
1040 "name": name_new}).execute(num_retries=self.num_retries)
1042 # Actually move the entry from source directory to this directory.
1043 del src._entries[name_old]
1044 self._entries[name_new] = ent
1045 self.inodes.invalidate_entry(src, name_old)
1048 def child_event(self, ev):
# Apply a change event for an object in (or moving into/out of) this
# project, using the old/new attribute snapshots in ev["properties"]:
# entries are renamed, added, or removed to match the object's new name,
# owner and trash state.
# NOTE(review): when an event carries no old or new attributes,
# old_attrs/new_attrs hold only "uuid", and namefn() indexes i['name']
# directly -- confirm it cannot raise KeyError here.
1049 properties = ev.get("properties") or {}
1050 old_attrs = properties.get("old_attributes") or {}
1051 new_attrs = properties.get("new_attributes") or {}
1052 old_attrs["uuid"] = ev["object_uuid"]
1053 new_attrs["uuid"] = ev["object_uuid"]
1054 old_name = self.sanitize_filename(self.namefn(old_attrs))
1055 new_name = self.sanitize_filename(self.namefn(new_attrs))
1057 # create events will have a new name, but not an old name
1058 # delete events will have an old name, but not a new name
1059 # update events will have an old and new name, and they may be same or different
1060 # if they are the same, an unrelated field changed and there is nothing to do.
1062 if old_attrs.get("owner_uuid") != self.project_uuid:
1063 # Was moved from somewhere else, so don't try to remove entry.
1065 if ev.get("object_owner_uuid") != self.project_uuid:
1066 # Was moved to somewhere else, so don't try to add entry
1069 if old_attrs.get("is_trashed"):
1070 # Was previously deleted
1072 if new_attrs.get("is_trashed"):
1076 if new_name != old_name:
1078 if old_name in self._entries:
1079 ent = self._entries[old_name]
1080 del self._entries[old_name]
1081 self.inodes.invalidate_entry(self, old_name)
1085 self._entries[new_name] = ent
1087 self._add_entry(new_attrs, new_name)
1088 elif ent is not None:
1089 self.inodes.del_entry(ent)
# SharedDirectory: top-level directory of users/groups that own projects
# shared with the current user; each owner is shown as a ProjectDirectory.
1092 class SharedDirectory(Directory):
1093 """A special directory that represents users or groups who have shared projects with me."""
1095 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
1096 poll=False, poll_time=60, storage_classes=None):
# The current user's record is fetched from the API at construction time;
# storage_classes is passed on to the ProjectDirectory children.
1097 super(SharedDirectory, self).__init__(parent_inode, inodes, api.config)
1099 self.num_retries = num_retries
1100 self.current_user = api.users().current().execute(num_retries=num_retries)
1102 self._poll_time = poll_time
1103 self._updating_lock = threading.Lock()
1104 self.storage_classes = storage_classes
# update(): rebuild the listing with the llfuse lock released and
# _updating_lock held; presumably skipped once the directory is no longer
# stale (check below).
1109 with llfuse.lock_released:
1110 self._updating_lock.acquire()
1111 if not self.stale():
# Use the groups().shared API when the discovery document advertises it:
# returned projects become roots, and the included owner records become
# root_owners. Results are paged by uuid.
1119 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1120 if 'httpMethod' in methods.get('shared', {}):
# NOTE(review): unlike the other API calls in this method, this
# execute() passes no num_retries -- confirm whether that is intended.
1123 resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
1127 include="owner_uuid").execute()
1128 if not resp["items"]:
1130 page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1131 for r in resp["items"]:
1132 objects[r["uuid"]] = r
1133 roots.append(r["uuid"])
1134 for r in resp["included"]:
1135 objects[r["uuid"]] = r
1136 root_owners.add(r["uuid"])
# Fallback (presumably when groups().shared is unavailable): list every
# readable project. A project whose owner is neither the current user nor
# another visible project becomes a root, and its owner a root_owner.
1138 all_projects = list(arvados.util.keyset_list_all(
1139 self.api.groups().list,
1141 num_retries=self.num_retries,
1142 filters=[['group_class','in',['project','filter']]],
1143 select=["uuid", "owner_uuid"]))
1144 for ob in all_projects:
1145 objects[ob['uuid']] = ob
1147 current_uuid = self.current_user['uuid']
1148 for ob in all_projects:
1149 if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1150 roots.append(ob['uuid'])
1151 root_owners.add(ob['owner_uuid'])
# Fetch the user and group records for the root owners (and roots).
1153 lusers = arvados.util.keyset_list_all(
1154 self.api.users().list,
1156 num_retries=self.num_retries,
1157 filters=[['uuid','in', list(root_owners)]])
1158 lgroups = arvados.util.keyset_list_all(
1159 self.api.groups().list,
1161 num_retries=self.num_retries,
1162 filters=[['uuid','in', list(root_owners)+roots]])
1165 objects[l["uuid"]] = l
1167 objects[l["uuid"]] = l
# Each owner is listed under its group name, or "<first_name> <last_name>"
# for users.
1169 for r in root_owners:
1173 contents[obr["name"]] = obr
1174 elif "first_name" in obr:
1175 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
# Roots whose owner record is unknown are listed directly by project name.
1180 if obr['owner_uuid'] not in objects:
1181 contents[obr["name"]] = obr
1183 # end with llfuse.lock_released, re-acquire lock
# Merge (name, record) pairs as ProjectDirectory entries, reusing children
# whose uuid matches. Failures are logged by the handler below.
1185 self.merge(viewitems(contents),
1187 lambda a, i: a.uuid() == i[1]['uuid'],
1188 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
1190 _logger.exception("arv-mount shared dir error")
1192 self._updating_lock.release()
1194 def want_event_subscribe(self):