1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import absolute_import
6 from __future__ import division
7 from future.utils import viewitems
8 from future.utils import itervalues
9 from builtins import dict
18 from apiclient import errors as apiclient_errors
22 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
23 from .fresh import FreshBase, convertTime, use_counter, check_update
25 import arvados.collection
26 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
28 _logger = logging.getLogger('arvados.arvados_fuse')
31 # Match any character which FUSE or Linux cannot accommodate as part
32 # of a filename. (If present in a collection filename, they will
33 # appear as underscores in the fuse mount.)
34 _disallowed_filename_characters = re.compile('[\x00/]')
36 # '.' and '..' are not reachable if API server is newer than #6277
37 def sanitize_filename(dirty):
38 """Replace disallowed filename characters with harmless "_"."""
48 return _disallowed_filename_characters.sub('_', dirty)
51 class Directory(FreshBase):
52 """Generic directory object, backed by a dict.
54 Consists of a set of entries with the key representing the filename
55 and the value referencing a File or Directory object.
58 def __init__(self, parent_inode, inodes):
59 """parent_inode is the integer inode number"""
61 super(Directory, self).__init__()
64 if not isinstance(parent_inode, int):
65 raise Exception("parent_inode should be an int")
66 self.parent_inode = parent_inode
69 self._mtime = time.time()
71 # Overridden by subclasses to implement logic to update the entries dict
72 # when the directory is stale
77 # Only used when computing the size of the disk footprint of the directory
85 def checkupdate(self):
89 except apiclient.errors.HttpError as e:
def __getitem__(self, item):
    """Return the entry stored in this directory under the name ``item``.

    The lookup goes straight to the internal entries mapping, so a
    missing name surfaces as whatever that mapping raises (KeyError for
    a plain dict).
    """
    entries = self._entries
    return entries[item]
100 return list(self._entries.items())
def __contains__(self, k):
    """Report whether this directory currently has an entry named ``k``."""
    entries = self._entries
    return k in entries
110 return len(self._entries)
113 self.inodes.touch(self)
114 super(Directory, self).fresh()
116 def merge(self, items, fn, same, new_entry):
117 """Helper method for updating the contents of the directory.
119 Takes a list describing the new contents of the directory, reuses
120 entries that are the same in both the old and new lists, creates new
121 entries, and deletes old entries missing from the new list.
123 :items: iterable with new directory contents
125 :fn: function to take an entry in 'items' and return the desired file or
126 directory name, or None if this entry should be skipped
128 :same: function to compare an existing entry (a File or Directory
129 object) with an entry in the items list to determine whether to keep
132 :new_entry: function to create a new directory entry (File or Directory
133 object) from an entry in the items list.
137 oldentries = self._entries
141 name = sanitize_filename(fn(i))
143 if name in oldentries and same(oldentries[name], i):
144 # move existing directory entry over
145 self._entries[name] = oldentries[name]
148 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
149 # create new directory entry
152 self._entries[name] = self.inodes.add_entry(ent)
155 # delete any other directory entries that were not found in 'items'
157 _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
158 self.inodes.invalidate_entry(self, i)
159 self.inodes.del_entry(oldentries[i])
163 self.inodes.invalidate_inode(self)
164 self._mtime = time.time()
169 if super(Directory, self).in_use():
171 for v in itervalues(self._entries):
176 def has_ref(self, only_children):
177 if super(Directory, self).has_ref(only_children):
179 for v in itervalues(self._entries):
185 """Delete all entries"""
186 oldentries = self._entries
189 oldentries[n].clear()
190 self.inodes.del_entry(oldentries[n])
193 def kernel_invalidate(self):
194 # Invalidating the dentry on the parent implies invalidating all paths
196 parent = self.inodes[self.parent_inode]
198 # Find self on the parent in order to invalidate this path.
199 # Calling the public items() method might trigger a refresh,
200 # which we definitely don't want, so read the internal dict directly.
201 for k,v in viewitems(parent._entries):
203 self.inodes.invalidate_entry(parent, k)
def want_event_subscribe(self):
    """Placeholder on the generic directory; always raises.

    The generic Directory gives no answer here. Concrete directory
    classes in this file provide their own implementation.

    Raises:
        NotImplementedError: unconditionally.
    """
    raise NotImplementedError()
def create(self, name):
    """Placeholder for creating an entry called ``name``; always raises.

    The generic Directory does not support this operation.

    Raises:
        NotImplementedError: unconditionally.
    """
    raise NotImplementedError()
def mkdir(self, name):
    """Placeholder for making a subdirectory ``name``; always raises.

    The generic Directory does not support this operation.

    Raises:
        NotImplementedError: unconditionally.
    """
    raise NotImplementedError()
def unlink(self, name):
    """Placeholder for removing the entry ``name``; always raises.

    The generic Directory does not support this operation.

    Raises:
        NotImplementedError: unconditionally.
    """
    raise NotImplementedError()
def rmdir(self, name):
    """Placeholder for removing the subdirectory ``name``; always raises.

    The generic Directory does not support this operation.

    Raises:
        NotImplementedError: unconditionally.
    """
    raise NotImplementedError()
def rename(self, name_old, name_new, src):
    """Placeholder for renaming ``name_old`` to ``name_new``; always raises.

    ``src`` is accepted for signature compatibility with the subclasses
    that implement renaming; it is not inspected here.

    Raises:
        NotImplementedError: unconditionally.
    """
    raise NotImplementedError()
234 class CollectionDirectoryBase(Directory):
235 """Represent an Arvados Collection as a directory.
237 This class is used for Subcollections, and is also the base class for
238 CollectionDirectory, which implements collection loading/saving on
241 Most operations act only on the underlying Arvados `Collection` object. The
242 `Collection` object signals via a notify callback to
243 `CollectionDirectoryBase.on_event` that an item was added, removed or
244 modified. FUSE inodes and directory entries are created, deleted or
245 invalidated in response to these events.
def __init__(self, parent_inode, inodes, collection):
    """Set up the directory and remember its backing collection.

    ``parent_inode`` and ``inodes`` are handed unchanged to the
    Directory initializer; ``collection`` is stored on the instance as
    ``self.collection``.
    """
    base = super(CollectionDirectoryBase, self)
    base.__init__(parent_inode, inodes)
    self.collection = collection
253 def new_entry(self, name, item, mtime):
254 name = sanitize_filename(name)
255 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
256 if item.fuse_entry.dead is not True:
257 raise Exception("Can only reparent dead inode entry")
258 if item.fuse_entry.inode is None:
259 raise Exception("Reparented entry must still have valid inode")
260 item.fuse_entry.dead = False
261 self._entries[name] = item.fuse_entry
262 elif isinstance(item, arvados.collection.RichCollectionBase):
263 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
264 self._entries[name].populate(mtime)
266 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
267 item.fuse_entry = self._entries[name]
269 def on_event(self, event, collection, name, item):
270 if collection == self.collection:
271 name = sanitize_filename(name)
272 _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
274 if event == arvados.collection.ADD:
275 self.new_entry(name, item, self.mtime())
276 elif event == arvados.collection.DEL:
277 ent = self._entries[name]
278 del self._entries[name]
279 self.inodes.invalidate_entry(self, name)
280 self.inodes.del_entry(ent)
281 elif event == arvados.collection.MOD:
282 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
283 self.inodes.invalidate_inode(item.fuse_entry)
284 elif name in self._entries:
285 self.inodes.invalidate_inode(self._entries[name])
287 def populate(self, mtime):
289 self.collection.subscribe(self.on_event)
290 for entry, item in viewitems(self.collection):
291 self.new_entry(entry, item, self.mtime())
294 return self.collection.writable()
298 with llfuse.lock_released:
299 self.collection.root_collection().save()
def create(self, name):
    """Open ``name`` for writing on the backing collection and close it at once.

    The collection call runs inside ``llfuse.lock_released``.
    """
    with llfuse.lock_released:
        handle = self.collection.open(name, "w")
        handle.close()
def mkdir(self, name):
    """Ask the backing collection to create the directory path ``name``.

    The collection call runs inside ``llfuse.lock_released``.
    """
    with llfuse.lock_released:
        backing = self.collection
        backing.mkdirs(name)
315 def unlink(self, name):
316 with llfuse.lock_released:
317 self.collection.remove(name)
322 def rmdir(self, name):
323 with llfuse.lock_released:
324 self.collection.remove(name)
329 def rename(self, name_old, name_new, src):
330 if not isinstance(src, CollectionDirectoryBase):
331 raise llfuse.FUSEError(errno.EPERM)
336 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
338 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
340 raise llfuse.FUSEError(errno.ENOTEMPTY)
341 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
342 raise llfuse.FUSEError(errno.ENOTDIR)
343 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
344 raise llfuse.FUSEError(errno.EISDIR)
346 with llfuse.lock_released:
347 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
352 super(CollectionDirectoryBase, self).clear()
353 self.collection = None
356 class CollectionDirectory(CollectionDirectoryBase):
357 """Represents the root of a directory tree representing a collection."""
359 def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
360 super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
362 self.num_retries = num_retries
363 self.collection_record_file = None
364 self.collection_record = None
367 self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
369 _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
370 self._poll_time = 60*60
372 if isinstance(collection_record, dict):
373 self.collection_locator = collection_record['uuid']
374 self._mtime = convertTime(collection_record.get('modified_at'))
376 self.collection_locator = collection_record
378 self._manifest_size = 0
379 if self.collection_locator:
380 self._writable = (uuid_pattern.match(self.collection_locator) is not None)
381 self._updating_lock = threading.Lock()
384 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
387 return self.collection.writable() if self.collection is not None else self._writable
def want_event_subscribe(self):
    """Return True when the collection locator matches ``uuid_pattern``.

    A locator that does not match (for example a portable data hash)
    yields False.
    """
    matched = uuid_pattern.match(self.collection_locator)
    return matched is not None
392 # Used by arv-web.py to switch the contents of the CollectionDirectory
393 def change_collection(self, new_locator):
394 """Switch the contents of the CollectionDirectory.
396 Must be called with llfuse.lock held.
399 self.collection_locator = new_locator
400 self.collection_record = None
403 def new_collection(self, new_collection_record, coll_reader):
407 self.collection_record = new_collection_record
409 if self.collection_record:
410 self._mtime = convertTime(self.collection_record.get('modified_at'))
411 self.collection_locator = self.collection_record["uuid"]
412 if self.collection_record_file is not None:
413 self.collection_record_file.update(self.collection_record)
415 self.collection = coll_reader
416 self.populate(self.mtime())
419 return self.collection_locator
422 def update(self, to_record_version=None):
424 if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
427 if self.collection_locator is None:
432 with llfuse.lock_released:
433 self._updating_lock.acquire()
437 _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
438 if self.collection is not None:
439 if self.collection.known_past_version(to_record_version):
440 _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
442 self.collection.update()
444 if uuid_pattern.match(self.collection_locator):
445 coll_reader = arvados.collection.Collection(
446 self.collection_locator, self.api, self.api.keep,
447 num_retries=self.num_retries)
449 coll_reader = arvados.collection.CollectionReader(
450 self.collection_locator, self.api, self.api.keep,
451 num_retries=self.num_retries)
452 new_collection_record = coll_reader.api_response() or {}
453 # If the Collection only exists in Keep, there will be no API
454 # response. Fill in the fields we need.
455 if 'uuid' not in new_collection_record:
456 new_collection_record['uuid'] = self.collection_locator
457 if "portable_data_hash" not in new_collection_record:
458 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
459 if 'manifest_text' not in new_collection_record:
460 new_collection_record['manifest_text'] = coll_reader.manifest_text()
462 if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
463 self.new_collection(new_collection_record, coll_reader)
465 self._manifest_size = len(coll_reader.manifest_text())
466 _logger.debug("%s manifest_size %i", self, self._manifest_size)
467 # end with llfuse.lock_released, re-acquire lock
472 self._updating_lock.release()
473 except arvados.errors.NotFoundError as e:
474 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
475 except arvados.errors.ArgumentError as detail:
476 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
477 if self.collection_record is not None and "manifest_text" in self.collection_record:
478 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
480 _logger.exception("arv-mount %s: error", self.collection_locator)
481 if self.collection_record is not None and "manifest_text" in self.collection_record:
482 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
488 def __getitem__(self, item):
489 if item == '.arvados#collection':
490 if self.collection_record_file is None:
491 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
492 self.inodes.add_entry(self.collection_record_file)
493 return self.collection_record_file
495 return super(CollectionDirectory, self).__getitem__(item)
497 def __contains__(self, k):
498 if k == '.arvados#collection':
501 return super(CollectionDirectory, self).__contains__(k)
def invalidate(self):
    """Drop the cached collection record and its file, then invalidate.

    Both cached attributes are reset to None before delegating to the
    parent class's ``invalidate``.
    """
    self.collection_record = self.collection_record_file = None
    super(CollectionDirectory, self).invalidate()
509 return (self.collection_locator is not None)
512 # This is an empirically-derived heuristic to estimate the memory used
513 # to store this collection's metadata. Calculating the memory
514 # footprint directly would be more accurate, but also more complicated.
515 return self._manifest_size * 128
518 if self.collection is not None:
520 self.collection.save()
521 self.collection.stop_threads()
524 if self.collection is not None:
525 self.collection.stop_threads()
526 super(CollectionDirectory, self).clear()
527 self._manifest_size = 0
530 class TmpCollectionDirectory(CollectionDirectoryBase):
531 """A directory backed by an Arvados collection that never gets saved.
533 This supports using Keep as scratch space. A userspace program can
534 read the .arvados#collection file to get a current manifest in
535 order to save a snapshot of the scratch data or use it as a crunch
539 class UnsaveableCollection(arvados.collection.Collection):
545 def __init__(self, parent_inode, inodes, api_client, num_retries):
546 collection = self.UnsaveableCollection(
547 api_client=api_client,
548 keep_client=api_client.keep,
549 num_retries=num_retries)
550 super(TmpCollectionDirectory, self).__init__(
551 parent_inode, inodes, collection)
552 self.collection_record_file = None
553 self.populate(self.mtime())
555 def on_event(self, *args, **kwargs):
556 super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
557 if self.collection_record_file:
559 self.collection_record_file.invalidate()
560 self.inodes.invalidate_inode(self.collection_record_file)
561 _logger.debug("%s invalidated collection record", self)
563 def collection_record(self):
564 with llfuse.lock_released:
567 "manifest_text": self.collection.manifest_text(),
568 "portable_data_hash": self.collection.portable_data_hash(),
def __contains__(self, k):
    """Report whether ``k`` names an entry of this directory.

    The special name '.arvados#collection' is always reported as
    present; any other name is checked by the parent class.
    """
    if k == '.arvados#collection':
        return True
    return super(TmpCollectionDirectory, self).__contains__(k)
def __getitem__(self, item):
    """Look up ``item``, creating the collection-record file on demand.

    Any name other than '.arvados#collection' is resolved by the parent
    class. For the special name, a FuncToJSONFile built from
    ``self.collection_record`` is created on first use, registered with
    the inode table, cached on the instance, and returned on every
    later request.
    """
    if item != '.arvados#collection':
        return super(TmpCollectionDirectory, self).__getitem__(item)
    if self.collection_record_file is None:
        record_file = FuncToJSONFile(self.inode, self.collection_record)
        self.collection_record_file = record_file
        self.inodes.add_entry(record_file)
    return self.collection_record_file
591 def want_event_subscribe(self):
595 self.collection.stop_threads()
def invalidate(self):
    """Invalidate the record file (if one exists), then this directory."""
    record_file = self.collection_record_file
    if record_file:
        record_file.invalidate()
    super(TmpCollectionDirectory, self).invalidate()
603 class MagicDirectory(Directory):
604 """A special directory that logically contains the set of all extant keep locators.
606 When a file is referenced by lookup(), it is tested to see if it is a valid
607 keep locator to a manifest, and if so, loads the manifest contents as a
608 subdirectory of this directory with the locator as the directory name.
609 Since querying a list of all extant keep locators is impractical, only
610 collections that have already been accessed are visible to readdir().
615 This directory provides access to Arvados collections as subdirectories listed
616 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
617 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
618 (in the form 'zzzzz-j7d0g-1234567890abcde').
620 Note that this directory will appear empty until you attempt to access a
621 specific collection or project subdirectory (such as trying to 'cd' into it),
622 at which point the collection or project will actually be looked up on the server
623 and the directory will appear if it exists.
627 def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
628 super(MagicDirectory, self).__init__(parent_inode, inodes)
630 self.num_retries = num_retries
631 self.pdh_only = pdh_only
def __setattr__(self, name, value):
    """Store the attribute, and populate the directory once it has an inode.

    After the normal attribute store, nothing more happens unless the
    attribute just set is ``inode``, the inode is not None, and the
    directory has no entries yet. In that case a README entry is added,
    and, when this directory is the FUSE root inode, a ``by_id``
    MagicDirectory configured like this one is added as well.
    """
    super(MagicDirectory, self).__setattr__(name, value)
    # Anything other than the first assignment of a real inode is a
    # plain attribute store.
    if name != 'inode' or self.inode is None or self._entries:
        return
    # When we're assigned an inode, add a README.
    readme = StringFile(self.inode, self.README_TEXT, time.time())
    self._entries['README'] = self.inodes.add_entry(readme)
    # If we're the root directory, add an identical by_id subdirectory.
    if self.inode == llfuse.ROOT_INODE:
        by_id = MagicDirectory(
            self.inode, self.inodes, self.api, self.num_retries, self.pdh_only)
        self._entries['by_id'] = self.inodes.add_entry(by_id)
645 def __contains__(self, k):
646 if k in self._entries:
649 if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
655 if group_uuid_pattern.match(k):
656 project = self.api.groups().list(
657 filters=[['group_class', '=', 'project'], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
658 if project[u'items_available'] == 0:
660 e = self.inodes.add_entry(ProjectDirectory(
661 self.inode, self.inodes, self.api, self.num_retries, project[u'items'][0]))
663 e = self.inodes.add_entry(CollectionDirectory(
664 self.inode, self.inodes, self.api, self.num_retries, k))
667 if k not in self._entries:
670 self.inodes.del_entry(e)
673 self.inodes.invalidate_entry(self, k)
674 self.inodes.del_entry(e)
676 except Exception as ex:
677 _logger.exception("arv-mount lookup '%s':", k)
679 self.inodes.del_entry(e)
682 def __getitem__(self, item):
684 return self._entries[item]
686 raise KeyError("No collection with id " + item)
def want_event_subscribe(self):
    """Return False when ``pdh_only`` is set, True otherwise."""
    if self.pdh_only:
        return False
    return True
695 class TagsDirectory(Directory):
696 """A special directory that contains as subdirectories all tags visible to the user."""
698 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
699 super(TagsDirectory, self).__init__(parent_inode, inodes)
701 self.num_retries = num_retries
703 self._poll_time = poll_time
706 def want_event_subscribe(self):
711 with llfuse.lock_released:
712 tags = self.api.links().list(
713 filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
714 select=['name'], distinct=True, limit=1000
715 ).execute(num_retries=self.num_retries)
717 self.merge(tags['items']+[{"name": n} for n in self._extra],
719 lambda a, i: a.tag == i['name'],
720 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
724 def __getitem__(self, item):
725 if super(TagsDirectory, self).__contains__(item):
726 return super(TagsDirectory, self).__getitem__(item)
727 with llfuse.lock_released:
728 tags = self.api.links().list(
729 filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
730 ).execute(num_retries=self.num_retries)
732 self._extra.add(item)
734 return super(TagsDirectory, self).__getitem__(item)
738 def __contains__(self, k):
739 if super(TagsDirectory, self).__contains__(k):
749 class TagDirectory(Directory):
750 """A special directory that contains as subdirectories all collections visible
751 to the user that are tagged with a particular tag.
754 def __init__(self, parent_inode, inodes, api, num_retries, tag,
755 poll=False, poll_time=60):
756 super(TagDirectory, self).__init__(parent_inode, inodes)
758 self.num_retries = num_retries
761 self._poll_time = poll_time
763 def want_event_subscribe(self):
768 with llfuse.lock_released:
769 taggedcollections = self.api.links().list(
770 filters=[['link_class', '=', 'tag'],
771 ['name', '=', self.tag],
772 ['head_uuid', 'is_a', 'arvados#collection']],
774 ).execute(num_retries=self.num_retries)
775 self.merge(taggedcollections['items'],
776 lambda i: i['head_uuid'],
777 lambda a, i: a.collection_locator == i['head_uuid'],
778 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
781 class ProjectDirectory(Directory):
782 """A special directory that contains the contents of a project."""
784 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
785 poll=False, poll_time=60):
786 super(ProjectDirectory, self).__init__(parent_inode, inodes)
788 self.num_retries = num_retries
789 self.project_object = project_object
790 self.project_object_file = None
791 self.project_uuid = project_object['uuid']
793 self._poll_time = poll_time
794 self._updating_lock = threading.Lock()
795 self._current_user = None
796 self._full_listing = False
798 def want_event_subscribe(self):
801 def createDirectory(self, i):
802 if collection_uuid_pattern.match(i['uuid']):
803 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
804 elif group_uuid_pattern.match(i['uuid']):
805 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
806 elif link_uuid_pattern.match(i['uuid']):
807 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
808 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
811 elif uuid_pattern.match(i['uuid']):
812 return ObjectFile(self.parent_inode, i)
817 return self.project_uuid
820 self._full_listing = True
821 return super(ProjectDirectory, self).items()
825 if i['name'] is None or len(i['name']) == 0:
827 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
828 # collection or subproject
830 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
833 elif 'kind' in i and i['kind'].startswith('arvados#'):
835 return "{}.{}".format(i['name'], i['kind'][8:])
842 if self.project_object_file == None:
843 self.project_object_file = ObjectFile(self.inode, self.project_object)
844 self.inodes.add_entry(self.project_object_file)
846 if not self._full_listing:
850 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
851 return a.uuid() == i['uuid']
852 elif isinstance(a, ObjectFile):
853 return a.uuid() == i['uuid'] and not a.stale()
857 with llfuse.lock_released:
858 self._updating_lock.acquire()
862 if group_uuid_pattern.match(self.project_uuid):
863 self.project_object = self.api.groups().get(
864 uuid=self.project_uuid).execute(num_retries=self.num_retries)
865 elif user_uuid_pattern.match(self.project_uuid):
866 self.project_object = self.api.users().get(
867 uuid=self.project_uuid).execute(num_retries=self.num_retries)
869 contents = arvados.util.list_all(self.api.groups().list,
871 filters=[["owner_uuid", "=", self.project_uuid],
872 ["group_class", "=", "project"]])
873 contents.extend(arvados.util.list_all(self.api.collections().list,
875 filters=[["owner_uuid", "=", self.project_uuid]]))
877 # end with llfuse.lock_released, re-acquire lock
882 self.createDirectory)
885 self._updating_lock.release()
def _add_entry(self, i, name):
    """Build the entry for record ``i`` and file it under ``name``.

    The object comes from ``createDirectory(i)``, is registered with
    the inode table, stored in this directory's entries under ``name``,
    and returned.
    """
    created = self.createDirectory(i)
    registered = self.inodes.add_entry(created)
    self._entries[name] = registered
    return self._entries[name]
894 def __getitem__(self, k):
895 if k == '.arvados#project':
896 return self.project_object_file
897 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
898 return super(ProjectDirectory, self).__getitem__(k)
899 with llfuse.lock_released:
900 contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
901 ["group_class", "=", "project"],
903 limit=1).execute(num_retries=self.num_retries)["items"]
905 contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
907 limit=1).execute(num_retries=self.num_retries)["items"]
909 name = sanitize_filename(self.namefn(contents[0]))
912 return self._add_entry(contents[0], name)
917 def __contains__(self, k):
918 if k == '.arvados#project':
930 with llfuse.lock_released:
931 if not self._current_user:
932 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
933 return self._current_user["uuid"] in self.project_object.get("writable_by", [])
940 def mkdir(self, name):
942 with llfuse.lock_released:
943 self.api.collections().create(body={"owner_uuid": self.project_uuid,
945 "manifest_text": ""}).execute(num_retries=self.num_retries)
947 except apiclient_errors.Error as error:
949 raise llfuse.FUSEError(errno.EEXIST)
953 def rmdir(self, name):
955 raise llfuse.FUSEError(errno.ENOENT)
956 if not isinstance(self[name], CollectionDirectory):
957 raise llfuse.FUSEError(errno.EPERM)
958 if len(self[name]) > 0:
959 raise llfuse.FUSEError(errno.ENOTEMPTY)
960 with llfuse.lock_released:
961 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
966 def rename(self, name_old, name_new, src):
967 if not isinstance(src, ProjectDirectory):
968 raise llfuse.FUSEError(errno.EPERM)
972 if not isinstance(ent, CollectionDirectory):
973 raise llfuse.FUSEError(errno.EPERM)
976 # POSIX semantics for replacing one directory with another are
977 # tricky (the target directory must be empty, the operation must be
978 # atomic which isn't possible with the Arvados API as of this
979 # writing) so don't support that.
980 raise llfuse.FUSEError(errno.EPERM)
982 self.api.collections().update(uuid=ent.uuid(),
983 body={"owner_uuid": self.uuid(),
984 "name": name_new}).execute(num_retries=self.num_retries)
986 # Actually move the entry from source directory to this directory.
987 del src._entries[name_old]
988 self._entries[name_new] = ent
989 self.inodes.invalidate_entry(src, name_old)
992 def child_event(self, ev):
993 properties = ev.get("properties") or {}
994 old_attrs = properties.get("old_attributes") or {}
995 new_attrs = properties.get("new_attributes") or {}
996 old_attrs["uuid"] = ev["object_uuid"]
997 new_attrs["uuid"] = ev["object_uuid"]
998 old_name = sanitize_filename(self.namefn(old_attrs))
999 new_name = sanitize_filename(self.namefn(new_attrs))
1001 # create events will have a new name, but not an old name
1002 # delete events will have an old name, but not a new name
1003 # update events will have an old and new name, and they may be same or different
1004 # if they are the same, an unrelated field changed and there is nothing to do.
1006 if old_attrs.get("owner_uuid") != self.project_uuid:
1007 # Was moved from somewhere else, so don't try to remove entry.
1009 if ev.get("object_owner_uuid") != self.project_uuid:
1010 # Was moved to somewhere else, so don't try to add entry
1013 if old_attrs.get("is_trashed"):
1014 # Was previously deleted
1016 if new_attrs.get("is_trashed"):
1020 if new_name != old_name:
1022 if old_name in self._entries:
1023 ent = self._entries[old_name]
1024 del self._entries[old_name]
1025 self.inodes.invalidate_entry(self, old_name)
1029 self._entries[new_name] = ent
1031 self._add_entry(new_attrs, new_name)
1032 elif ent is not None:
1033 self.inodes.del_entry(ent)
1036 class SharedDirectory(Directory):
1037 """A special directory that represents users or groups who have shared projects with me."""
1039 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
1040 poll=False, poll_time=60):
1041 super(SharedDirectory, self).__init__(parent_inode, inodes)
1043 self.num_retries = num_retries
1044 self.current_user = api.users().current().execute(num_retries=num_retries)
1046 self._poll_time = poll_time
1047 self._updating_lock = threading.Lock()
1052 with llfuse.lock_released:
1053 self._updating_lock.acquire()
1054 if not self.stale():
1062 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1063 if 'httpMethod' in methods.get('shared', {}):
1066 resp = self.api.groups().shared(filters=[['group_class', '=', 'project']]+page,
1070 include="owner_uuid").execute()
1071 if not resp["items"]:
1073 page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1074 for r in resp["items"]:
1075 objects[r["uuid"]] = r
1076 roots.append(r["uuid"])
1077 for r in resp["included"]:
1078 objects[r["uuid"]] = r
1079 root_owners.add(r["uuid"])
1081 all_projects = arvados.util.list_all(
1082 self.api.groups().list, self.num_retries,
1083 filters=[['group_class','=','project']],
1084 select=["uuid", "owner_uuid"])
1085 for ob in all_projects:
1086 objects[ob['uuid']] = ob
1088 current_uuid = self.current_user['uuid']
1089 for ob in all_projects:
1090 if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1091 roots.append(ob['uuid'])
1092 root_owners.add(ob['owner_uuid'])
1094 lusers = arvados.util.list_all(
1095 self.api.users().list, self.num_retries,
1096 filters=[['uuid','in', list(root_owners)]])
1097 lgroups = arvados.util.list_all(
1098 self.api.groups().list, self.num_retries,
1099 filters=[['uuid','in', list(root_owners)+roots]])
1102 objects[l["uuid"]] = l
1104 objects[l["uuid"]] = l
1106 for r in root_owners:
1110 contents[obr["name"]] = obr
1111 #elif obr.get("username"):
1112 # contents[obr["username"]] = obr
1113 elif "first_name" in obr:
1114 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1119 if obr['owner_uuid'] not in objects:
1120 contents[obr["name"]] = obr
1122 # end with llfuse.lock_released, re-acquire lock
1124 self.merge(viewitems(contents),
1126 lambda a, i: a.uuid() == i[1]['uuid'],
1127 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
1129 _logger.exception("arv-mount shared dir error")
1131 self._updating_lock.release()
1133 def want_event_subscribe(self):