1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import absolute_import
6 from __future__ import division
7 from future.utils import viewitems
8 from future.utils import itervalues
9 from builtins import dict
20 from apiclient import errors as apiclient_errors
22 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
23 from .fresh import FreshBase, convertTime, use_counter, check_update
25 import arvados.collection
26 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
28 _logger = logging.getLogger('arvados.arvados_fuse')
31 # Match any character which FUSE or Linux cannot accommodate as part
32 # of a filename. (If present in a collection filename, they will
33 # appear as underscores in the fuse mount.)
34 _disallowed_filename_characters = re.compile('[\x00/]')
37 class Directory(FreshBase):
38 """Generic directory object, backed by a dict.
40 Consists of a set of entries with the key representing the filename
41 and the value referencing a File or Directory object.
44 def __init__(self, parent_inode, inodes, apiconfig):
45 """parent_inode is the integer inode number"""
47 super(Directory, self).__init__()
50 if not isinstance(parent_inode, int):
51 raise Exception("parent_inode should be an int")
52 self.parent_inode = parent_inode
54 self.apiconfig = apiconfig
56 self._mtime = time.time()
58 def forward_slash_subst(self):
59 if not hasattr(self, '_fsns'):
61 config = self.apiconfig()
63 self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
65 # old API server with no FSNS config
68 if self._fsns == '' or self._fsns == '/':
72 def unsanitize_filename(self, incoming):
73 """Replace ForwardSlashNameSubstitution value with /"""
74 fsns = self.forward_slash_subst()
75 if isinstance(fsns, str):
76 return incoming.replace(fsns, '/')
80 def sanitize_filename(self, dirty):
81 """Replace disallowed filename characters according to
82 ForwardSlashNameSubstitution in self.api_config."""
83 # '.' and '..' are not reachable if API server is newer than #6277
93 fsns = self.forward_slash_subst()
94 if isinstance(fsns, str):
95 dirty = dirty.replace('/', fsns)
96 return _disallowed_filename_characters.sub('_', dirty)
99 # Overridden by subclasses to implement logic to update the
100 # entries dict when the directory is stale
105 # Only used when computing the size of the disk footprint of the directory
113 def checkupdate(self):
117 except apiclient.errors.HttpError as e:
122 def __getitem__(self, item):
123 return self._entries[item]
128 return list(self._entries.items())
132 def __contains__(self, k):
133 return k in self._entries
138 return len(self._entries)
141 self.inodes.touch(self)
142 super(Directory, self).fresh()
144 def merge(self, items, fn, same, new_entry):
145 """Helper method for updating the contents of the directory.
147 Takes a list describing the new contents of the directory, reuse
148 entries that are the same in both the old and new lists, create new
149 entries, and delete old entries missing from the new list.
151 :items: iterable with new directory contents
153 :fn: function to take an entry in 'items' and return the desired file or
154 directory name, or None if this entry should be skipped
156 :same: function to compare an existing entry (a File or Directory
157 object) with an entry in the items list to determine whether to keep
160 :new_entry: function to create a new directory entry (File or Directory
161 object) from an entry in the items list.
165 oldentries = self._entries
169 name = self.sanitize_filename(fn(i))
171 if name in oldentries and same(oldentries[name], i):
172 # move existing directory entry over
173 self._entries[name] = oldentries[name]
176 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
177 # create new directory entry
180 self._entries[name] = self.inodes.add_entry(ent)
183 # delete any other directory entries that were not in found in 'items'
185 _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
186 self.inodes.invalidate_entry(self, i)
187 self.inodes.del_entry(oldentries[i])
191 self.inodes.invalidate_inode(self)
192 self._mtime = time.time()
197 if super(Directory, self).in_use():
199 for v in itervalues(self._entries):
204 def has_ref(self, only_children):
205 if super(Directory, self).has_ref(only_children):
207 for v in itervalues(self._entries):
213 """Delete all entries"""
214 oldentries = self._entries
217 oldentries[n].clear()
218 self.inodes.del_entry(oldentries[n])
221 def kernel_invalidate(self):
222 # Invalidating the dentry on the parent implies invalidating all paths
224 parent = self.inodes[self.parent_inode]
226 # Find self on the parent in order to invalidate this path.
227 # Calling the public items() method might trigger a refresh,
228 # which we definitely don't want, so read the internal dict directly.
229 for k,v in viewitems(parent._entries):
231 self.inodes.invalidate_entry(parent, k)
243 def want_event_subscribe(self):
244 raise NotImplementedError()
246 def create(self, name):
247 raise NotImplementedError()
249 def mkdir(self, name):
250 raise NotImplementedError()
252 def unlink(self, name):
253 raise NotImplementedError()
255 def rmdir(self, name):
256 raise NotImplementedError()
258 def rename(self, name_old, name_new, src):
259 raise NotImplementedError()
262 class CollectionDirectoryBase(Directory):
263 """Represent an Arvados Collection as a directory.
265 This class is used for Subcollections, and is also the base class for
266 CollectionDirectory, which implements collection loading/saving on
269 Most operations act only the underlying Arvados `Collection` object. The
270 `Collection` object signals via a notify callback to
271 `CollectionDirectoryBase.on_event` that an item was added, removed or
272 modified. FUSE inodes and directory entries are created, deleted or
273 invalidated in response to these events.
277 def __init__(self, parent_inode, inodes, apiconfig, collection):
278 super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig)
279 self.apiconfig = apiconfig
280 self.collection = collection
282 def new_entry(self, name, item, mtime):
283 name = self.sanitize_filename(name)
284 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
285 if item.fuse_entry.dead is not True:
286 raise Exception("Can only reparent dead inode entry")
287 if item.fuse_entry.inode is None:
288 raise Exception("Reparented entry must still have valid inode")
289 item.fuse_entry.dead = False
290 self._entries[name] = item.fuse_entry
291 elif isinstance(item, arvados.collection.RichCollectionBase):
292 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, item))
293 self._entries[name].populate(mtime)
295 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
296 item.fuse_entry = self._entries[name]
298 def on_event(self, event, collection, name, item):
299 if collection == self.collection:
300 name = self.sanitize_filename(name)
301 _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
303 if event == arvados.collection.ADD:
304 self.new_entry(name, item, self.mtime())
305 elif event == arvados.collection.DEL:
306 ent = self._entries[name]
307 del self._entries[name]
308 self.inodes.invalidate_entry(self, name)
309 self.inodes.del_entry(ent)
310 elif event == arvados.collection.MOD:
311 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
312 self.inodes.invalidate_inode(item.fuse_entry)
313 elif name in self._entries:
314 self.inodes.invalidate_inode(self._entries[name])
316 def populate(self, mtime):
318 self.collection.subscribe(self.on_event)
319 for entry, item in viewitems(self.collection):
320 self.new_entry(entry, item, self.mtime())
323 return self.collection.writable()
327 with llfuse.lock_released:
328 self.collection.root_collection().save()
332 def create(self, name):
333 with llfuse.lock_released:
334 self.collection.open(name, "w").close()
338 def mkdir(self, name):
339 with llfuse.lock_released:
340 self.collection.mkdirs(name)
344 def unlink(self, name):
345 with llfuse.lock_released:
346 self.collection.remove(name)
351 def rmdir(self, name):
352 with llfuse.lock_released:
353 self.collection.remove(name)
358 def rename(self, name_old, name_new, src):
359 if not isinstance(src, CollectionDirectoryBase):
360 raise llfuse.FUSEError(errno.EPERM)
365 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
367 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
369 raise llfuse.FUSEError(errno.ENOTEMPTY)
370 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
371 raise llfuse.FUSEError(errno.ENOTDIR)
372 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
373 raise llfuse.FUSEError(errno.EISDIR)
375 with llfuse.lock_released:
376 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
381 super(CollectionDirectoryBase, self).clear()
382 self.collection = None
385 class CollectionDirectory(CollectionDirectoryBase):
386 """Represents the root of a directory tree representing a collection."""
388 def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
389 super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, None)
391 self.num_retries = num_retries
392 self.collection_record_file = None
393 self.collection_record = None
396 self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
398 _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
399 self._poll_time = 60*60
401 if isinstance(collection_record, dict):
402 self.collection_locator = collection_record['uuid']
403 self._mtime = convertTime(collection_record.get('modified_at'))
405 self.collection_locator = collection_record
407 self._manifest_size = 0
408 if self.collection_locator:
409 self._writable = (uuid_pattern.match(self.collection_locator) is not None)
410 self._updating_lock = threading.Lock()
413 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
416 return self.collection.writable() if self.collection is not None else self._writable
418 def want_event_subscribe(self):
419 return (uuid_pattern.match(self.collection_locator) is not None)
421 # Used by arv-web.py to switch the contents of the CollectionDirectory
422 def change_collection(self, new_locator):
423 """Switch the contents of the CollectionDirectory.
425 Must be called with llfuse.lock held.
428 self.collection_locator = new_locator
429 self.collection_record = None
432 def new_collection(self, new_collection_record, coll_reader):
436 self.collection_record = new_collection_record
438 if self.collection_record:
439 self._mtime = convertTime(self.collection_record.get('modified_at'))
440 self.collection_locator = self.collection_record["uuid"]
441 if self.collection_record_file is not None:
442 self.collection_record_file.update(self.collection_record)
444 self.collection = coll_reader
445 self.populate(self.mtime())
448 return self.collection_locator
451 def update(self, to_record_version=None):
453 if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
456 if self.collection_locator is None:
461 with llfuse.lock_released:
462 self._updating_lock.acquire()
466 _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
467 if self.collection is not None:
468 if self.collection.known_past_version(to_record_version):
469 _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
471 self.collection.update()
473 if uuid_pattern.match(self.collection_locator):
474 coll_reader = arvados.collection.Collection(
475 self.collection_locator, self.api, self.api.keep,
476 num_retries=self.num_retries)
478 coll_reader = arvados.collection.CollectionReader(
479 self.collection_locator, self.api, self.api.keep,
480 num_retries=self.num_retries)
481 new_collection_record = coll_reader.api_response() or {}
482 # If the Collection only exists in Keep, there will be no API
483 # response. Fill in the fields we need.
484 if 'uuid' not in new_collection_record:
485 new_collection_record['uuid'] = self.collection_locator
486 if "portable_data_hash" not in new_collection_record:
487 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
488 if 'manifest_text' not in new_collection_record:
489 new_collection_record['manifest_text'] = coll_reader.manifest_text()
491 if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
492 self.new_collection(new_collection_record, coll_reader)
494 self._manifest_size = len(coll_reader.manifest_text())
495 _logger.debug("%s manifest_size %i", self, self._manifest_size)
496 # end with llfuse.lock_released, re-acquire lock
501 self._updating_lock.release()
502 except arvados.errors.NotFoundError as e:
503 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
504 except arvados.errors.ArgumentError as detail:
505 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
506 if self.collection_record is not None and "manifest_text" in self.collection_record:
507 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
509 _logger.exception("arv-mount %s: error", self.collection_locator)
510 if self.collection_record is not None and "manifest_text" in self.collection_record:
511 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
517 def __getitem__(self, item):
518 if item == '.arvados#collection':
519 if self.collection_record_file is None:
520 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
521 self.inodes.add_entry(self.collection_record_file)
522 return self.collection_record_file
524 return super(CollectionDirectory, self).__getitem__(item)
526 def __contains__(self, k):
527 if k == '.arvados#collection':
530 return super(CollectionDirectory, self).__contains__(k)
532 def invalidate(self):
533 self.collection_record = None
534 self.collection_record_file = None
535 super(CollectionDirectory, self).invalidate()
538 return (self.collection_locator is not None)
541 # This is an empirically-derived heuristic to estimate the memory used
542 # to store this collection's metadata. Calculating the memory
543 # footprint directly would be more accurate, but also more complicated.
544 return self._manifest_size * 128
547 if self.collection is not None:
549 self.collection.save()
550 self.collection.stop_threads()
553 if self.collection is not None:
554 self.collection.stop_threads()
555 super(CollectionDirectory, self).clear()
556 self._manifest_size = 0
559 class TmpCollectionDirectory(CollectionDirectoryBase):
560 """A directory backed by an Arvados collection that never gets saved.
562 This supports using Keep as scratch space. A userspace program can
563 read the .arvados#collection file to get a current manifest in
564 order to save a snapshot of the scratch data or use it as a crunch
568 class UnsaveableCollection(arvados.collection.Collection):
574 def __init__(self, parent_inode, inodes, api_client, num_retries):
575 collection = self.UnsaveableCollection(
576 api_client=api_client,
577 keep_client=api_client.keep,
578 num_retries=num_retries)
579 super(TmpCollectionDirectory, self).__init__(
580 parent_inode, inodes, api_client.config, collection)
581 self.collection_record_file = None
582 self.populate(self.mtime())
584 def on_event(self, *args, **kwargs):
585 super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
586 if self.collection_record_file:
588 self.collection_record_file.invalidate()
589 self.inodes.invalidate_inode(self.collection_record_file)
590 _logger.debug("%s invalidated collection record", self)
592 def collection_record(self):
593 with llfuse.lock_released:
596 "manifest_text": self.collection.manifest_text(),
597 "portable_data_hash": self.collection.portable_data_hash(),
600 def __contains__(self, k):
601 return (k == '.arvados#collection' or
602 super(TmpCollectionDirectory, self).__contains__(k))
605 def __getitem__(self, item):
606 if item == '.arvados#collection':
607 if self.collection_record_file is None:
608 self.collection_record_file = FuncToJSONFile(
609 self.inode, self.collection_record)
610 self.inodes.add_entry(self.collection_record_file)
611 return self.collection_record_file
612 return super(TmpCollectionDirectory, self).__getitem__(item)
620 def want_event_subscribe(self):
624 self.collection.stop_threads()
626 def invalidate(self):
627 if self.collection_record_file:
628 self.collection_record_file.invalidate()
629 super(TmpCollectionDirectory, self).invalidate()
632 class MagicDirectory(Directory):
633 """A special directory that logically contains the set of all extant keep locators.
635 When a file is referenced by lookup(), it is tested to see if it is a valid
636 keep locator to a manifest, and if so, loads the manifest contents as a
637 subdirectory of this directory with the locator as the directory name.
638 Since querying a list of all extant keep locators is impractical, only
639 collections that have already been accessed are visible to readdir().
644 This directory provides access to Arvados collections as subdirectories listed
645 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
646 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
647 (in the form 'zzzzz-j7d0g-1234567890abcde').
649 Note that this directory will appear empty until you attempt to access a
650 specific collection or project subdirectory (such as trying to 'cd' into it),
651 at which point the collection or project will actually be looked up on the server
652 and the directory will appear if it exists.
656 def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
657 super(MagicDirectory, self).__init__(parent_inode, inodes, api.config)
659 self.num_retries = num_retries
660 self.pdh_only = pdh_only
662 def __setattr__(self, name, value):
663 super(MagicDirectory, self).__setattr__(name, value)
664 # When we're assigned an inode, add a README.
665 if ((name == 'inode') and (self.inode is not None) and
666 (not self._entries)):
667 self._entries['README'] = self.inodes.add_entry(
668 StringFile(self.inode, self.README_TEXT, time.time()))
669 # If we're the root directory, add an identical by_id subdirectory.
670 if self.inode == llfuse.ROOT_INODE:
671 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
672 self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))
674 def __contains__(self, k):
675 if k in self._entries:
678 if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
684 if group_uuid_pattern.match(k):
685 project = self.api.groups().list(
686 filters=[['group_class', '=', 'project'], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
687 if project[u'items_available'] == 0:
689 e = self.inodes.add_entry(ProjectDirectory(
690 self.inode, self.inodes, self.api, self.num_retries, project[u'items'][0]))
692 e = self.inodes.add_entry(CollectionDirectory(
693 self.inode, self.inodes, self.api, self.num_retries, k))
696 if k not in self._entries:
699 self.inodes.del_entry(e)
702 self.inodes.invalidate_entry(self, k)
703 self.inodes.del_entry(e)
705 except Exception as ex:
706 _logger.exception("arv-mount lookup '%s':", k)
708 self.inodes.del_entry(e)
711 def __getitem__(self, item):
713 return self._entries[item]
715 raise KeyError("No collection with id " + item)
720 def want_event_subscribe(self):
721 return not self.pdh_only
724 class TagsDirectory(Directory):
725 """A special directory that contains as subdirectories all tags visible to the user."""
727 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
728 super(TagsDirectory, self).__init__(parent_inode, inodes, api.config)
730 self.num_retries = num_retries
732 self._poll_time = poll_time
735 def want_event_subscribe(self):
740 with llfuse.lock_released:
741 tags = self.api.links().list(
742 filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
743 select=['name'], distinct=True, limit=1000
744 ).execute(num_retries=self.num_retries)
746 self.merge(tags['items']+[{"name": n} for n in self._extra],
748 lambda a, i: a.tag == i['name'],
749 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
753 def __getitem__(self, item):
754 if super(TagsDirectory, self).__contains__(item):
755 return super(TagsDirectory, self).__getitem__(item)
756 with llfuse.lock_released:
757 tags = self.api.links().list(
758 filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
759 ).execute(num_retries=self.num_retries)
761 self._extra.add(item)
763 return super(TagsDirectory, self).__getitem__(item)
767 def __contains__(self, k):
768 if super(TagsDirectory, self).__contains__(k):
778 class TagDirectory(Directory):
779 """A special directory that contains as subdirectories all collections visible
780 to the user that are tagged with a particular tag.
783 def __init__(self, parent_inode, inodes, api, num_retries, tag,
784 poll=False, poll_time=60):
785 super(TagDirectory, self).__init__(parent_inode, inodes, api.config)
787 self.num_retries = num_retries
790 self._poll_time = poll_time
792 def want_event_subscribe(self):
797 with llfuse.lock_released:
798 taggedcollections = self.api.links().list(
799 filters=[['link_class', '=', 'tag'],
800 ['name', '=', self.tag],
801 ['head_uuid', 'is_a', 'arvados#collection']],
803 ).execute(num_retries=self.num_retries)
804 self.merge(taggedcollections['items'],
805 lambda i: i['head_uuid'],
806 lambda a, i: a.collection_locator == i['head_uuid'],
807 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
810 class ProjectDirectory(Directory):
811 """A special directory that contains the contents of a project."""
813 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
814 poll=False, poll_time=60):
815 super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config)
817 self.num_retries = num_retries
818 self.project_object = project_object
819 self.project_object_file = None
820 self.project_uuid = project_object['uuid']
822 self._poll_time = poll_time
823 self._updating_lock = threading.Lock()
824 self._current_user = None
825 self._full_listing = False
827 def want_event_subscribe(self):
830 def createDirectory(self, i):
831 if collection_uuid_pattern.match(i['uuid']):
832 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
833 elif group_uuid_pattern.match(i['uuid']):
834 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
835 elif link_uuid_pattern.match(i['uuid']):
836 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
837 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
840 elif uuid_pattern.match(i['uuid']):
841 return ObjectFile(self.parent_inode, i)
846 return self.project_uuid
849 self._full_listing = True
850 return super(ProjectDirectory, self).items()
854 if i['name'] is None or len(i['name']) == 0:
856 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
857 # collection or subproject
859 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
862 elif 'kind' in i and i['kind'].startswith('arvados#'):
864 return "{}.{}".format(i['name'], i['kind'][8:])
871 if self.project_object_file == None:
872 self.project_object_file = ObjectFile(self.inode, self.project_object)
873 self.inodes.add_entry(self.project_object_file)
875 if not self._full_listing:
879 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
880 return a.uuid() == i['uuid']
881 elif isinstance(a, ObjectFile):
882 return a.uuid() == i['uuid'] and not a.stale()
886 with llfuse.lock_released:
887 self._updating_lock.acquire()
891 if group_uuid_pattern.match(self.project_uuid):
892 self.project_object = self.api.groups().get(
893 uuid=self.project_uuid).execute(num_retries=self.num_retries)
894 elif user_uuid_pattern.match(self.project_uuid):
895 self.project_object = self.api.users().get(
896 uuid=self.project_uuid).execute(num_retries=self.num_retries)
898 contents = arvados.util.list_all(self.api.groups().list,
900 filters=[["owner_uuid", "=", self.project_uuid],
901 ["group_class", "=", "project"]])
902 contents.extend(arvados.util.list_all(self.api.collections().list,
904 filters=[["owner_uuid", "=", self.project_uuid]]))
906 # end with llfuse.lock_released, re-acquire lock
911 self.createDirectory)
914 self._updating_lock.release()
916 def _add_entry(self, i, name):
917 ent = self.createDirectory(i)
918 self._entries[name] = self.inodes.add_entry(ent)
919 return self._entries[name]
923 def __getitem__(self, k):
924 if k == '.arvados#project':
925 return self.project_object_file
926 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
927 return super(ProjectDirectory, self).__getitem__(k)
928 with llfuse.lock_released:
929 k2 = self.unsanitize_filename(k)
931 namefilter = ["name", "=", k]
933 namefilter = ["name", "in", [k, k2]]
934 contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
935 ["group_class", "=", "project"],
937 limit=2).execute(num_retries=self.num_retries)["items"]
939 contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
941 limit=2).execute(num_retries=self.num_retries)["items"]
943 if len(contents) > 1 and contents[1]['name'] == k:
944 # If "foo/bar" and "foo[SUBST]bar" both exist, use
946 contents = [contents[1]]
947 name = self.sanitize_filename(self.namefn(contents[0]))
950 return self._add_entry(contents[0], name)
955 def __contains__(self, k):
956 if k == '.arvados#project':
968 with llfuse.lock_released:
969 if not self._current_user:
970 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
971 return self._current_user["uuid"] in self.project_object.get("writable_by", [])
978 def mkdir(self, name):
980 with llfuse.lock_released:
981 self.api.collections().create(body={"owner_uuid": self.project_uuid,
983 "manifest_text": ""}).execute(num_retries=self.num_retries)
985 except apiclient_errors.Error as error:
987 raise llfuse.FUSEError(errno.EEXIST)
991 def rmdir(self, name):
993 raise llfuse.FUSEError(errno.ENOENT)
994 if not isinstance(self[name], CollectionDirectory):
995 raise llfuse.FUSEError(errno.EPERM)
996 if len(self[name]) > 0:
997 raise llfuse.FUSEError(errno.ENOTEMPTY)
998 with llfuse.lock_released:
999 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
1004 def rename(self, name_old, name_new, src):
1005 if not isinstance(src, ProjectDirectory):
1006 raise llfuse.FUSEError(errno.EPERM)
1010 if not isinstance(ent, CollectionDirectory):
1011 raise llfuse.FUSEError(errno.EPERM)
1013 if name_new in self:
1014 # POSIX semantics for replacing one directory with another is
1015 # tricky (the target directory must be empty, the operation must be
1016 # atomic which isn't possible with the Arvados API as of this
1017 # writing) so don't support that.
1018 raise llfuse.FUSEError(errno.EPERM)
1020 self.api.collections().update(uuid=ent.uuid(),
1021 body={"owner_uuid": self.uuid(),
1022 "name": name_new}).execute(num_retries=self.num_retries)
1024 # Acually move the entry from source directory to this directory.
1025 del src._entries[name_old]
1026 self._entries[name_new] = ent
1027 self.inodes.invalidate_entry(src, name_old)
1030 def child_event(self, ev):
1031 properties = ev.get("properties") or {}
1032 old_attrs = properties.get("old_attributes") or {}
1033 new_attrs = properties.get("new_attributes") or {}
1034 old_attrs["uuid"] = ev["object_uuid"]
1035 new_attrs["uuid"] = ev["object_uuid"]
1036 old_name = self.sanitize_filename(self.namefn(old_attrs))
1037 new_name = self.sanitize_filename(self.namefn(new_attrs))
1039 # create events will have a new name, but not an old name
1040 # delete events will have an old name, but not a new name
1041 # update events will have an old and new name, and they may be same or different
1042 # if they are the same, an unrelated field changed and there is nothing to do.
1044 if old_attrs.get("owner_uuid") != self.project_uuid:
1045 # Was moved from somewhere else, so don't try to remove entry.
1047 if ev.get("object_owner_uuid") != self.project_uuid:
1048 # Was moved to somewhere else, so don't try to add entry
1051 if old_attrs.get("is_trashed"):
1052 # Was previously deleted
1054 if new_attrs.get("is_trashed"):
1058 if new_name != old_name:
1060 if old_name in self._entries:
1061 ent = self._entries[old_name]
1062 del self._entries[old_name]
1063 self.inodes.invalidate_entry(self, old_name)
1067 self._entries[new_name] = ent
1069 self._add_entry(new_attrs, new_name)
1070 elif ent is not None:
1071 self.inodes.del_entry(ent)
1074 class SharedDirectory(Directory):
1075 """A special directory that represents users or groups who have shared projects with me."""
1077 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
1078 poll=False, poll_time=60):
1079 super(SharedDirectory, self).__init__(parent_inode, inodes, api.config)
1081 self.num_retries = num_retries
1082 self.current_user = api.users().current().execute(num_retries=num_retries)
1084 self._poll_time = poll_time
1085 self._updating_lock = threading.Lock()
1090 with llfuse.lock_released:
1091 self._updating_lock.acquire()
1092 if not self.stale():
1100 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1101 if 'httpMethod' in methods.get('shared', {}):
1104 resp = self.api.groups().shared(filters=[['group_class', '=', 'project']]+page,
1108 include="owner_uuid").execute()
1109 if not resp["items"]:
1111 page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1112 for r in resp["items"]:
1113 objects[r["uuid"]] = r
1114 roots.append(r["uuid"])
1115 for r in resp["included"]:
1116 objects[r["uuid"]] = r
1117 root_owners.add(r["uuid"])
1119 all_projects = arvados.util.list_all(
1120 self.api.groups().list, self.num_retries,
1121 filters=[['group_class','=','project']],
1122 select=["uuid", "owner_uuid"])
1123 for ob in all_projects:
1124 objects[ob['uuid']] = ob
1126 current_uuid = self.current_user['uuid']
1127 for ob in all_projects:
1128 if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1129 roots.append(ob['uuid'])
1130 root_owners.add(ob['owner_uuid'])
1132 lusers = arvados.util.list_all(
1133 self.api.users().list, self.num_retries,
1134 filters=[['uuid','in', list(root_owners)]])
1135 lgroups = arvados.util.list_all(
1136 self.api.groups().list, self.num_retries,
1137 filters=[['uuid','in', list(root_owners)+roots]])
1140 objects[l["uuid"]] = l
1142 objects[l["uuid"]] = l
1144 for r in root_owners:
1148 contents[obr["name"]] = obr
1149 #elif obr.get("username"):
1150 # contents[obr["username"]] = obr
1151 elif "first_name" in obr:
1152 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1157 if obr['owner_uuid'] not in objects:
1158 contents[obr["name"]] = obr
1160 # end with llfuse.lock_released, re-acquire lock
1162 self.merge(viewitems(contents),
1164 lambda a, i: a.uuid() == i[1]['uuid'],
1165 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
1167 _logger.exception("arv-mount shared dir error")
1169 self._updating_lock.release()
1171 def want_event_subscribe(self):