1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
13 from apiclient import errors as apiclient_errors
17 from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from fresh import FreshBase, convertTime, use_counter, check_update
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
23 _logger = logging.getLogger('arvados.arvados_fuse')
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (If present in a collection filename, they will
28 # appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile('[\x00/]')
31 # '.' and '..' are not reachable if API server is newer than #6277
32 def sanitize_filename(dirty):
33 """Replace disallowed filename characters with harmless "_"."""
43 return _disallowed_filename_characters.sub('_', dirty)
46 class Directory(FreshBase):
47 """Generic directory object, backed by a dict.
49 Consists of a set of entries with the key representing the filename
50 and the value referencing a File or Directory object.
53 def __init__(self, parent_inode, inodes):
54 """parent_inode is the integer inode number"""
56 super(Directory, self).__init__()
59 if not isinstance(parent_inode, int):
60 raise Exception("parent_inode should be an int")
61 self.parent_inode = parent_inode
64 self._mtime = time.time()
66 # Overriden by subclasses to implement logic to update the entries dict
67 # when the directory is stale
72 # Only used when computing the size of the disk footprint of the directory
80 def checkupdate(self):
84 except apiclient.errors.HttpError as e:
89 def __getitem__(self, item):
90 return self._entries[item]
95 return list(self._entries.items())
99 def __contains__(self, k):
100 return k in self._entries
105 return len(self._entries)
108 self.inodes.touch(self)
109 super(Directory, self).fresh()
111 def merge(self, items, fn, same, new_entry):
112 """Helper method for updating the contents of the directory.
114 Takes a list describing the new contents of the directory, reuse
115 entries that are the same in both the old and new lists, create new
116 entries, and delete old entries missing from the new list.
118 :items: iterable with new directory contents
120 :fn: function to take an entry in 'items' and return the desired file or
121 directory name, or None if this entry should be skipped
123 :same: function to compare an existing entry (a File or Directory
124 object) with an entry in the items list to determine whether to keep
127 :new_entry: function to create a new directory entry (File or Directory
128 object) from an entry in the items list.
132 oldentries = self._entries
136 name = sanitize_filename(fn(i))
138 if name in oldentries and same(oldentries[name], i):
139 # move existing directory entry over
140 self._entries[name] = oldentries[name]
143 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
144 # create new directory entry
147 self._entries[name] = self.inodes.add_entry(ent)
150 # delete any other directory entries that were not in found in 'items'
152 _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
153 self.inodes.invalidate_entry(self, i)
154 self.inodes.del_entry(oldentries[i])
158 self.inodes.invalidate_inode(self)
159 self._mtime = time.time()
164 if super(Directory, self).in_use():
166 for v in self._entries.itervalues():
171 def has_ref(self, only_children):
172 if super(Directory, self).has_ref(only_children):
174 for v in self._entries.itervalues():
180 """Delete all entries"""
181 oldentries = self._entries
184 oldentries[n].clear()
185 self.inodes.del_entry(oldentries[n])
188 def kernel_invalidate(self):
189 # Invalidating the dentry on the parent implies invalidating all paths
191 parent = self.inodes[self.parent_inode]
193 # Find self on the parent in order to invalidate this path.
194 # Calling the public items() method might trigger a refresh,
195 # which we definitely don't want, so read the internal dict directly.
196 for k,v in parent._entries.items():
198 self.inodes.invalidate_entry(parent, k)
210 def want_event_subscribe(self):
211 raise NotImplementedError()
213 def create(self, name):
214 raise NotImplementedError()
216 def mkdir(self, name):
217 raise NotImplementedError()
219 def unlink(self, name):
220 raise NotImplementedError()
222 def rmdir(self, name):
223 raise NotImplementedError()
225 def rename(self, name_old, name_new, src):
226 raise NotImplementedError()
229 class CollectionDirectoryBase(Directory):
230 """Represent an Arvados Collection as a directory.
232 This class is used for Subcollections, and is also the base class for
233 CollectionDirectory, which implements collection loading/saving on
236 Most operations act only the underlying Arvados `Collection` object. The
237 `Collection` object signals via a notify callback to
238 `CollectionDirectoryBase.on_event` that an item was added, removed or
239 modified. FUSE inodes and directory entries are created, deleted or
240 invalidated in response to these events.
244 def __init__(self, parent_inode, inodes, collection):
245 super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
246 self.collection = collection
248 def new_entry(self, name, item, mtime):
249 name = sanitize_filename(name)
250 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
251 if item.fuse_entry.dead is not True:
252 raise Exception("Can only reparent dead inode entry")
253 if item.fuse_entry.inode is None:
254 raise Exception("Reparented entry must still have valid inode")
255 item.fuse_entry.dead = False
256 self._entries[name] = item.fuse_entry
257 elif isinstance(item, arvados.collection.RichCollectionBase):
258 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
259 self._entries[name].populate(mtime)
261 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
262 item.fuse_entry = self._entries[name]
264 def on_event(self, event, collection, name, item):
265 if collection == self.collection:
266 name = sanitize_filename(name)
267 _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
269 if event == arvados.collection.ADD:
270 self.new_entry(name, item, self.mtime())
271 elif event == arvados.collection.DEL:
272 ent = self._entries[name]
273 del self._entries[name]
274 self.inodes.invalidate_entry(self, name)
275 self.inodes.del_entry(ent)
276 elif event == arvados.collection.MOD:
277 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
278 self.inodes.invalidate_inode(item.fuse_entry)
279 elif name in self._entries:
280 self.inodes.invalidate_inode(self._entries[name])
282 def populate(self, mtime):
284 self.collection.subscribe(self.on_event)
285 for entry, item in self.collection.items():
286 self.new_entry(entry, item, self.mtime())
289 return self.collection.writable()
293 with llfuse.lock_released:
294 self.collection.root_collection().save()
298 def create(self, name):
299 with llfuse.lock_released:
300 self.collection.open(name, "w").close()
304 def mkdir(self, name):
305 with llfuse.lock_released:
306 self.collection.mkdirs(name)
310 def unlink(self, name):
311 with llfuse.lock_released:
312 self.collection.remove(name)
317 def rmdir(self, name):
318 with llfuse.lock_released:
319 self.collection.remove(name)
324 def rename(self, name_old, name_new, src):
325 if not isinstance(src, CollectionDirectoryBase):
326 raise llfuse.FUSEError(errno.EPERM)
331 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
333 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
335 raise llfuse.FUSEError(errno.ENOTEMPTY)
336 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
337 raise llfuse.FUSEError(errno.ENOTDIR)
338 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
339 raise llfuse.FUSEError(errno.EISDIR)
341 with llfuse.lock_released:
342 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
347 super(CollectionDirectoryBase, self).clear()
348 self.collection = None
351 class CollectionDirectory(CollectionDirectoryBase):
352 """Represents the root of a directory tree representing a collection."""
354 def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
355 super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
357 self.num_retries = num_retries
358 self.collection_record_file = None
359 self.collection_record = None
362 self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2)/2)
364 _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
365 self._poll_time = 60*60
367 if isinstance(collection_record, dict):
368 self.collection_locator = collection_record['uuid']
369 self._mtime = convertTime(collection_record.get('modified_at'))
371 self.collection_locator = collection_record
373 self._manifest_size = 0
374 if self.collection_locator:
375 self._writable = (uuid_pattern.match(self.collection_locator) is not None)
376 self._updating_lock = threading.Lock()
379 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
382 return self.collection.writable() if self.collection is not None else self._writable
384 def want_event_subscribe(self):
385 return (uuid_pattern.match(self.collection_locator) is not None)
387 # Used by arv-web.py to switch the contents of the CollectionDirectory
388 def change_collection(self, new_locator):
389 """Switch the contents of the CollectionDirectory.
391 Must be called with llfuse.lock held.
394 self.collection_locator = new_locator
395 self.collection_record = None
398 def new_collection(self, new_collection_record, coll_reader):
402 self.collection_record = new_collection_record
404 if self.collection_record:
405 self._mtime = convertTime(self.collection_record.get('modified_at'))
406 self.collection_locator = self.collection_record["uuid"]
407 if self.collection_record_file is not None:
408 self.collection_record_file.update(self.collection_record)
410 self.collection = coll_reader
411 self.populate(self.mtime())
414 return self.collection_locator
417 def update(self, to_record_version=None):
419 if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
422 if self.collection_locator is None:
427 with llfuse.lock_released:
428 self._updating_lock.acquire()
432 _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
433 if self.collection is not None:
434 if self.collection.known_past_version(to_record_version):
435 _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
437 self.collection.update()
439 if uuid_pattern.match(self.collection_locator):
440 coll_reader = arvados.collection.Collection(
441 self.collection_locator, self.api, self.api.keep,
442 num_retries=self.num_retries)
444 coll_reader = arvados.collection.CollectionReader(
445 self.collection_locator, self.api, self.api.keep,
446 num_retries=self.num_retries)
447 new_collection_record = coll_reader.api_response() or {}
448 # If the Collection only exists in Keep, there will be no API
449 # response. Fill in the fields we need.
450 if 'uuid' not in new_collection_record:
451 new_collection_record['uuid'] = self.collection_locator
452 if "portable_data_hash" not in new_collection_record:
453 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
454 if 'manifest_text' not in new_collection_record:
455 new_collection_record['manifest_text'] = coll_reader.manifest_text()
457 if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
458 self.new_collection(new_collection_record, coll_reader)
460 self._manifest_size = len(coll_reader.manifest_text())
461 _logger.debug("%s manifest_size %i", self, self._manifest_size)
462 # end with llfuse.lock_released, re-acquire lock
467 self._updating_lock.release()
468 except arvados.errors.NotFoundError as e:
469 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
470 except arvados.errors.ArgumentError as detail:
471 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
472 if self.collection_record is not None and "manifest_text" in self.collection_record:
473 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
475 _logger.exception("arv-mount %s: error", self.collection_locator)
476 if self.collection_record is not None and "manifest_text" in self.collection_record:
477 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
483 def __getitem__(self, item):
484 if item == '.arvados#collection':
485 if self.collection_record_file is None:
486 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
487 self.inodes.add_entry(self.collection_record_file)
488 return self.collection_record_file
490 return super(CollectionDirectory, self).__getitem__(item)
492 def __contains__(self, k):
493 if k == '.arvados#collection':
496 return super(CollectionDirectory, self).__contains__(k)
498 def invalidate(self):
499 self.collection_record = None
500 self.collection_record_file = None
501 super(CollectionDirectory, self).invalidate()
504 return (self.collection_locator is not None)
507 # This is an empirically-derived heuristic to estimate the memory used
508 # to store this collection's metadata. Calculating the memory
509 # footprint directly would be more accurate, but also more complicated.
510 return self._manifest_size * 128
513 if self.collection is not None:
515 self.collection.save()
516 self.collection.stop_threads()
519 if self.collection is not None:
520 self.collection.stop_threads()
521 super(CollectionDirectory, self).clear()
522 self._manifest_size = 0
525 class TmpCollectionDirectory(CollectionDirectoryBase):
526 """A directory backed by an Arvados collection that never gets saved.
528 This supports using Keep as scratch space. A userspace program can
529 read the .arvados#collection file to get a current manifest in
530 order to save a snapshot of the scratch data or use it as a crunch
534 class UnsaveableCollection(arvados.collection.Collection):
540 def __init__(self, parent_inode, inodes, api_client, num_retries):
541 collection = self.UnsaveableCollection(
542 api_client=api_client,
543 keep_client=api_client.keep,
544 num_retries=num_retries)
545 super(TmpCollectionDirectory, self).__init__(
546 parent_inode, inodes, collection)
547 self.collection_record_file = None
548 self.populate(self.mtime())
550 def on_event(self, *args, **kwargs):
551 super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
552 if self.collection_record_file:
554 self.collection_record_file.invalidate()
555 self.inodes.invalidate_inode(self.collection_record_file)
556 _logger.debug("%s invalidated collection record", self)
558 def collection_record(self):
559 with llfuse.lock_released:
562 "manifest_text": self.collection.manifest_text(),
563 "portable_data_hash": self.collection.portable_data_hash(),
566 def __contains__(self, k):
567 return (k == '.arvados#collection' or
568 super(TmpCollectionDirectory, self).__contains__(k))
571 def __getitem__(self, item):
572 if item == '.arvados#collection':
573 if self.collection_record_file is None:
574 self.collection_record_file = FuncToJSONFile(
575 self.inode, self.collection_record)
576 self.inodes.add_entry(self.collection_record_file)
577 return self.collection_record_file
578 return super(TmpCollectionDirectory, self).__getitem__(item)
586 def want_event_subscribe(self):
590 self.collection.stop_threads()
592 def invalidate(self):
593 if self.collection_record_file:
594 self.collection_record_file.invalidate()
595 super(TmpCollectionDirectory, self).invalidate()
598 class MagicDirectory(Directory):
599 """A special directory that logically contains the set of all extant keep locators.
601 When a file is referenced by lookup(), it is tested to see if it is a valid
602 keep locator to a manifest, and if so, loads the manifest contents as a
603 subdirectory of this directory with the locator as the directory name.
604 Since querying a list of all extant keep locators is impractical, only
605 collections that have already been accessed are visible to readdir().
610 This directory provides access to Arvados collections as subdirectories listed
611 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
612 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
613 (in the form 'zzzzz-j7d0g-1234567890abcde').
615 Note that this directory will appear empty until you attempt to access a
616 specific collection or project subdirectory (such as trying to 'cd' into it),
617 at which point the collection or project will actually be looked up on the server
618 and the directory will appear if it exists.
622 def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
623 super(MagicDirectory, self).__init__(parent_inode, inodes)
625 self.num_retries = num_retries
626 self.pdh_only = pdh_only
628 def __setattr__(self, name, value):
629 super(MagicDirectory, self).__setattr__(name, value)
630 # When we're assigned an inode, add a README.
631 if ((name == 'inode') and (self.inode is not None) and
632 (not self._entries)):
633 self._entries['README'] = self.inodes.add_entry(
634 StringFile(self.inode, self.README_TEXT, time.time()))
635 # If we're the root directory, add an identical by_id subdirectory.
636 if self.inode == llfuse.ROOT_INODE:
637 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
638 self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))
640 def __contains__(self, k):
641 if k in self._entries:
644 if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
650 if group_uuid_pattern.match(k):
651 project = self.api.groups().list(
652 filters=[['group_class', '=', 'project'], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
653 if project[u'items_available'] == 0:
655 e = self.inodes.add_entry(ProjectDirectory(
656 self.inode, self.inodes, self.api, self.num_retries, project[u'items'][0]))
658 e = self.inodes.add_entry(CollectionDirectory(
659 self.inode, self.inodes, self.api, self.num_retries, k))
662 if k not in self._entries:
665 self.inodes.del_entry(e)
668 self.inodes.invalidate_entry(self, k)
669 self.inodes.del_entry(e)
671 except Exception as ex:
672 _logger.exception("arv-mount lookup '%s':", k)
674 self.inodes.del_entry(e)
677 def __getitem__(self, item):
679 return self._entries[item]
681 raise KeyError("No collection with id " + item)
686 def want_event_subscribe(self):
687 return not self.pdh_only
690 class TagsDirectory(Directory):
691 """A special directory that contains as subdirectories all tags visible to the user."""
693 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
694 super(TagsDirectory, self).__init__(parent_inode, inodes)
696 self.num_retries = num_retries
698 self._poll_time = poll_time
701 def want_event_subscribe(self):
706 with llfuse.lock_released:
707 tags = self.api.links().list(
708 filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
709 select=['name'], distinct=True, limit=1000
710 ).execute(num_retries=self.num_retries)
712 self.merge(tags['items']+[{"name": n} for n in self._extra],
714 lambda a, i: a.tag == i['name'],
715 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
719 def __getitem__(self, item):
720 if super(TagsDirectory, self).__contains__(item):
721 return super(TagsDirectory, self).__getitem__(item)
722 with llfuse.lock_released:
723 tags = self.api.links().list(
724 filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
725 ).execute(num_retries=self.num_retries)
727 self._extra.add(item)
729 return super(TagsDirectory, self).__getitem__(item)
733 def __contains__(self, k):
734 if super(TagsDirectory, self).__contains__(k):
744 class TagDirectory(Directory):
745 """A special directory that contains as subdirectories all collections visible
746 to the user that are tagged with a particular tag.
749 def __init__(self, parent_inode, inodes, api, num_retries, tag,
750 poll=False, poll_time=60):
751 super(TagDirectory, self).__init__(parent_inode, inodes)
753 self.num_retries = num_retries
756 self._poll_time = poll_time
758 def want_event_subscribe(self):
763 with llfuse.lock_released:
764 taggedcollections = self.api.links().list(
765 filters=[['link_class', '=', 'tag'],
766 ['name', '=', self.tag],
767 ['head_uuid', 'is_a', 'arvados#collection']],
769 ).execute(num_retries=self.num_retries)
770 self.merge(taggedcollections['items'],
771 lambda i: i['head_uuid'],
772 lambda a, i: a.collection_locator == i['head_uuid'],
773 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
776 class ProjectDirectory(Directory):
777 """A special directory that contains the contents of a project."""
779 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
780 poll=False, poll_time=60):
781 super(ProjectDirectory, self).__init__(parent_inode, inodes)
783 self.num_retries = num_retries
784 self.project_object = project_object
785 self.project_object_file = None
786 self.project_uuid = project_object['uuid']
788 self._poll_time = poll_time
789 self._updating_lock = threading.Lock()
790 self._current_user = None
791 self._full_listing = False
793 def want_event_subscribe(self):
796 def createDirectory(self, i):
797 if collection_uuid_pattern.match(i['uuid']):
798 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
799 elif group_uuid_pattern.match(i['uuid']):
800 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
801 elif link_uuid_pattern.match(i['uuid']):
802 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
803 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
806 elif uuid_pattern.match(i['uuid']):
807 return ObjectFile(self.parent_inode, i)
812 return self.project_uuid
815 self._full_listing = True
816 return super(ProjectDirectory, self).items()
820 if i['name'] is None or len(i['name']) == 0:
822 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
823 # collection or subproject
825 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
828 elif 'kind' in i and i['kind'].startswith('arvados#'):
830 return "{}.{}".format(i['name'], i['kind'][8:])
837 if self.project_object_file == None:
838 self.project_object_file = ObjectFile(self.inode, self.project_object)
839 self.inodes.add_entry(self.project_object_file)
841 if not self._full_listing:
845 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
846 return a.uuid() == i['uuid']
847 elif isinstance(a, ObjectFile):
848 return a.uuid() == i['uuid'] and not a.stale()
852 with llfuse.lock_released:
853 self._updating_lock.acquire()
857 if group_uuid_pattern.match(self.project_uuid):
858 self.project_object = self.api.groups().get(
859 uuid=self.project_uuid).execute(num_retries=self.num_retries)
860 elif user_uuid_pattern.match(self.project_uuid):
861 self.project_object = self.api.users().get(
862 uuid=self.project_uuid).execute(num_retries=self.num_retries)
864 contents = arvados.util.list_all(self.api.groups().list,
866 filters=[["owner_uuid", "=", self.project_uuid],
867 ["group_class", "=", "project"]])
868 contents.extend(arvados.util.list_all(self.api.collections().list,
870 filters=[["owner_uuid", "=", self.project_uuid]]))
872 # end with llfuse.lock_released, re-acquire lock
877 self.createDirectory)
880 self._updating_lock.release()
882 def _add_entry(self, i, name):
883 ent = self.createDirectory(i)
884 self._entries[name] = self.inodes.add_entry(ent)
885 return self._entries[name]
889 def __getitem__(self, k):
890 if k == '.arvados#project':
891 return self.project_object_file
892 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
893 return super(ProjectDirectory, self).__getitem__(k)
894 with llfuse.lock_released:
895 contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
896 ["group_class", "=", "project"],
898 limit=1).execute(num_retries=self.num_retries)["items"]
900 contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
902 limit=1).execute(num_retries=self.num_retries)["items"]
904 name = sanitize_filename(self.namefn(contents[0]))
907 return self._add_entry(contents[0], name)
912 def __contains__(self, k):
913 if k == '.arvados#project':
925 with llfuse.lock_released:
926 if not self._current_user:
927 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
928 return self._current_user["uuid"] in self.project_object["writable_by"]
935 def mkdir(self, name):
937 with llfuse.lock_released:
938 self.api.collections().create(body={"owner_uuid": self.project_uuid,
940 "manifest_text": ""}).execute(num_retries=self.num_retries)
942 except apiclient_errors.Error as error:
944 raise llfuse.FUSEError(errno.EEXIST)
948 def rmdir(self, name):
950 raise llfuse.FUSEError(errno.ENOENT)
951 if not isinstance(self[name], CollectionDirectory):
952 raise llfuse.FUSEError(errno.EPERM)
953 if len(self[name]) > 0:
954 raise llfuse.FUSEError(errno.ENOTEMPTY)
955 with llfuse.lock_released:
956 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
961 def rename(self, name_old, name_new, src):
962 if not isinstance(src, ProjectDirectory):
963 raise llfuse.FUSEError(errno.EPERM)
967 if not isinstance(ent, CollectionDirectory):
968 raise llfuse.FUSEError(errno.EPERM)
971 # POSIX semantics for replacing one directory with another is
972 # tricky (the target directory must be empty, the operation must be
973 # atomic which isn't possible with the Arvados API as of this
974 # writing) so don't support that.
975 raise llfuse.FUSEError(errno.EPERM)
977 self.api.collections().update(uuid=ent.uuid(),
978 body={"owner_uuid": self.uuid(),
979 "name": name_new}).execute(num_retries=self.num_retries)
981 # Acually move the entry from source directory to this directory.
982 del src._entries[name_old]
983 self._entries[name_new] = ent
984 self.inodes.invalidate_entry(src, name_old)
987 def child_event(self, ev):
988 properties = ev.get("properties") or {}
989 old_attrs = properties.get("old_attributes") or {}
990 new_attrs = properties.get("new_attributes") or {}
991 old_attrs["uuid"] = ev["object_uuid"]
992 new_attrs["uuid"] = ev["object_uuid"]
993 old_name = sanitize_filename(self.namefn(old_attrs))
994 new_name = sanitize_filename(self.namefn(new_attrs))
996 # create events will have a new name, but not an old name
997 # delete events will have an old name, but not a new name
998 # update events will have an old and new name, and they may be same or different
999 # if they are the same, an unrelated field changed and there is nothing to do.
1001 if old_attrs.get("owner_uuid") != self.project_uuid:
1002 # Was moved from somewhere else, so don't try to remove entry.
1004 if ev.get("object_owner_uuid") != self.project_uuid:
1005 # Was moved to somewhere else, so don't try to add entry
1008 if old_attrs.get("is_trashed"):
1009 # Was previously deleted
1011 if new_attrs.get("is_trashed"):
1015 if new_name != old_name:
1017 if old_name in self._entries:
1018 ent = self._entries[old_name]
1019 del self._entries[old_name]
1020 self.inodes.invalidate_entry(self, old_name)
1024 self._entries[new_name] = ent
1026 self._add_entry(new_attrs, new_name)
1027 elif ent is not None:
1028 self.inodes.del_entry(ent)
1031 class SharedDirectory(Directory):
1032 """A special directory that represents users or groups who have shared projects with me."""
1034 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
1035 poll=False, poll_time=60):
1036 super(SharedDirectory, self).__init__(parent_inode, inodes)
1038 self.num_retries = num_retries
1039 self.current_user = api.users().current().execute(num_retries=num_retries)
1041 self._poll_time = poll_time
1042 self._updating_lock = threading.Lock()
1047 with llfuse.lock_released:
1048 self._updating_lock.acquire()
1049 if not self.stale():
1052 all_projects = arvados.util.list_all(
1053 self.api.groups().list, self.num_retries,
1054 filters=[['group_class','=','project']],
1055 select=["uuid", "owner_uuid"])
1057 for ob in all_projects:
1058 objects[ob['uuid']] = ob
1062 current_uuid = self.current_user['uuid']
1063 for ob in all_projects:
1064 if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1065 roots.append(ob['uuid'])
1066 root_owners.add(ob['owner_uuid'])
1068 lusers = arvados.util.list_all(
1069 self.api.users().list, self.num_retries,
1070 filters=[['uuid','in', list(root_owners)]])
1071 lgroups = arvados.util.list_all(
1072 self.api.groups().list, self.num_retries,
1073 filters=[['uuid','in', list(root_owners)+roots]])
1076 objects[l["uuid"]] = l
1078 objects[l["uuid"]] = l
1081 for r in root_owners:
1085 contents[obr["name"]] = obr
1086 #elif obr.get("username"):
1087 # contents[obr["username"]] = obr
1088 elif "first_name" in obr:
1089 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1094 if obr['owner_uuid'] not in objects:
1095 contents[obr["name"]] = obr
1097 # end with llfuse.lock_released, re-acquire lock
1099 self.merge(contents.items(),
1101 lambda a, i: a.uuid() == i[1]['uuid'],
1102 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
1104 _logger.exception("arv-mount shared dir error")
1106 self._updating_lock.release()
1108 def want_event_subscribe(self):