1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import absolute_import
6 from __future__ import division
7 from future.utils import viewitems
8 from builtins import dict
17 from apiclient import errors as apiclient_errors
21 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
22 from .fresh import FreshBase, convertTime, use_counter, check_update
24 import arvados.collection
25 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
27 _logger = logging.getLogger('arvados.arvados_fuse')
30 # Match any character which FUSE or Linux cannot accommodate as part
31 # of a filename. (If present in a collection filename, they will
32 # appear as underscores in the fuse mount.)
33 _disallowed_filename_characters = re.compile('[\x00/]')
35 # '.' and '..' are not reachable if API server is newer than #6277
36 def sanitize_filename(dirty):
37 """Replace disallowed filename characters with harmless "_"."""
47 return _disallowed_filename_characters.sub('_', dirty)
50 class Directory(FreshBase):
51 """Generic directory object, backed by a dict.
53 Consists of a set of entries with the key representing the filename
54 and the value referencing a File or Directory object.
57 def __init__(self, parent_inode, inodes):
58 """parent_inode is the integer inode number"""
60 super(Directory, self).__init__()
63 if not isinstance(parent_inode, int):
64 raise Exception("parent_inode should be an int")
65 self.parent_inode = parent_inode
68 self._mtime = time.time()
70 # Overridden by subclasses to implement logic to update the entries dict
71 # when the directory is stale
76 # Only used when computing the size of the disk footprint of the directory
84 def checkupdate(self):
88 except apiclient.errors.HttpError as e:
93 def __getitem__(self, item):
94 return self._entries[item]
99 return list(self._entries.items())
103 def __contains__(self, k):
104 return k in self._entries
109 return len(self._entries)
112 self.inodes.touch(self)
113 super(Directory, self).fresh()
115 def merge(self, items, fn, same, new_entry):
116 """Helper method for updating the contents of the directory.
118 Takes a list describing the new contents of the directory, reuse
119 entries that are the same in both the old and new lists, create new
120 entries, and delete old entries missing from the new list.
122 :items: iterable with new directory contents
124 :fn: function to take an entry in 'items' and return the desired file or
125 directory name, or None if this entry should be skipped
127 :same: function to compare an existing entry (a File or Directory
128 object) with an entry in the items list to determine whether to keep
131 :new_entry: function to create a new directory entry (File or Directory
132 object) from an entry in the items list.
136 oldentries = self._entries
140 name = sanitize_filename(fn(i))
142 if name in oldentries and same(oldentries[name], i):
143 # move existing directory entry over
144 self._entries[name] = oldentries[name]
147 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
148 # create new directory entry
151 self._entries[name] = self.inodes.add_entry(ent)
154 # delete any other directory entries that were not found in 'items'
156 _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
157 self.inodes.invalidate_entry(self, i)
158 self.inodes.del_entry(oldentries[i])
162 self.inodes.invalidate_inode(self)
163 self._mtime = time.time()
168 if super(Directory, self).in_use():
170 for v in self._entries.values():
175 def has_ref(self, only_children):
176 if super(Directory, self).has_ref(only_children):
178 for v in self._entries.values():
184 """Delete all entries"""
185 oldentries = self._entries
188 oldentries[n].clear()
189 self.inodes.del_entry(oldentries[n])
192 def kernel_invalidate(self):
193 # Invalidating the dentry on the parent implies invalidating all paths
195 parent = self.inodes[self.parent_inode]
197 # Find self on the parent in order to invalidate this path.
198 # Calling the public items() method might trigger a refresh,
199 # which we definitely don't want, so read the internal dict directly.
200 for k,v in viewitems(parent._entries):
202 self.inodes.invalidate_entry(parent, k)
def want_event_subscribe(self):
    """Placeholder: the generic directory does not answer this question.

    Always raises NotImplementedError; the concrete directory classes in
    this file supply their own implementation.
    """
    raise NotImplementedError
def create(self, name):
    """Placeholder for creating a file called ``name``.

    Always raises NotImplementedError; directory types that support file
    creation override this method.
    """
    raise NotImplementedError
def mkdir(self, name):
    """Placeholder for creating a subdirectory called ``name``.

    Always raises NotImplementedError; directory types that support
    subdirectory creation override this method.
    """
    raise NotImplementedError
def unlink(self, name):
    """Placeholder for removing the file called ``name``.

    Always raises NotImplementedError; directory types that support file
    removal override this method.
    """
    raise NotImplementedError
def rmdir(self, name):
    """Placeholder for removing the subdirectory called ``name``.

    Always raises NotImplementedError; directory types that support
    subdirectory removal override this method.
    """
    raise NotImplementedError
def rename(self, name_old, name_new, src):
    """Placeholder for moving entry ``name_old`` of ``src`` here as ``name_new``.

    Always raises NotImplementedError; directory types that support
    renaming override this method.
    """
    raise NotImplementedError
233 class CollectionDirectoryBase(Directory):
234 """Represent an Arvados Collection as a directory.
236 This class is used for Subcollections, and is also the base class for
237 CollectionDirectory, which implements collection loading/saving on
240 Most operations act only on the underlying Arvados `Collection` object. The
241 `Collection` object signals via a notify callback to
242 `CollectionDirectoryBase.on_event` that an item was added, removed or
243 modified. FUSE inodes and directory entries are created, deleted or
244 invalidated in response to these events.
def __init__(self, parent_inode, inodes, collection):
    """Initialise the directory and remember the collection backing it.

    ``parent_inode`` and ``inodes`` are handed unchanged to the parent
    class constructor; ``collection`` is kept on ``self.collection``.
    """
    base = super(CollectionDirectoryBase, self)
    base.__init__(parent_inode, inodes)
    self.collection = collection
252 def new_entry(self, name, item, mtime):
253 name = sanitize_filename(name)
254 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
255 if item.fuse_entry.dead is not True:
256 raise Exception("Can only reparent dead inode entry")
257 if item.fuse_entry.inode is None:
258 raise Exception("Reparented entry must still have valid inode")
259 item.fuse_entry.dead = False
260 self._entries[name] = item.fuse_entry
261 elif isinstance(item, arvados.collection.RichCollectionBase):
262 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
263 self._entries[name].populate(mtime)
265 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
266 item.fuse_entry = self._entries[name]
268 def on_event(self, event, collection, name, item):
269 if collection == self.collection:
270 name = sanitize_filename(name)
271 _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
273 if event == arvados.collection.ADD:
274 self.new_entry(name, item, self.mtime())
275 elif event == arvados.collection.DEL:
276 ent = self._entries[name]
277 del self._entries[name]
278 self.inodes.invalidate_entry(self, name)
279 self.inodes.del_entry(ent)
280 elif event == arvados.collection.MOD:
281 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
282 self.inodes.invalidate_inode(item.fuse_entry)
283 elif name in self._entries:
284 self.inodes.invalidate_inode(self._entries[name])
286 def populate(self, mtime):
288 self.collection.subscribe(self.on_event)
289 for entry, item in viewitems(self.collection):
290 self.new_entry(entry, item, self.mtime())
293 return self.collection.writable()
297 with llfuse.lock_released:
298 self.collection.root_collection().save()
302 def create(self, name):
303 with llfuse.lock_released:
304 self.collection.open(name, "w").close()
308 def mkdir(self, name):
309 with llfuse.lock_released:
310 self.collection.mkdirs(name)
314 def unlink(self, name):
315 with llfuse.lock_released:
316 self.collection.remove(name)
321 def rmdir(self, name):
322 with llfuse.lock_released:
323 self.collection.remove(name)
328 def rename(self, name_old, name_new, src):
329 if not isinstance(src, CollectionDirectoryBase):
330 raise llfuse.FUSEError(errno.EPERM)
335 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
337 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
339 raise llfuse.FUSEError(errno.ENOTEMPTY)
340 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
341 raise llfuse.FUSEError(errno.ENOTDIR)
342 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
343 raise llfuse.FUSEError(errno.EISDIR)
345 with llfuse.lock_released:
346 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
351 super(CollectionDirectoryBase, self).clear()
352 self.collection = None
355 class CollectionDirectory(CollectionDirectoryBase):
356 """Represents the root of a directory tree representing a collection."""
358 def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
359 super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
361 self.num_retries = num_retries
362 self.collection_record_file = None
363 self.collection_record = None
366 self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
368 _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
369 self._poll_time = 60*60
371 if isinstance(collection_record, dict):
372 self.collection_locator = collection_record['uuid']
373 self._mtime = convertTime(collection_record.get('modified_at'))
375 self.collection_locator = collection_record
377 self._manifest_size = 0
378 if self.collection_locator:
379 self._writable = (uuid_pattern.match(self.collection_locator) is not None)
380 self._updating_lock = threading.Lock()
383 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
386 return self.collection.writable() if self.collection is not None else self._writable
def want_event_subscribe(self):
    """Tell whether this directory wants to receive change events.

    Returns True only when ``collection_locator`` matches ``uuid_pattern``.
    A directory with no locator set (``collection_locator`` is None, which
    other methods of this class explicitly check for) returns False
    instead of letting the regular-expression match raise TypeError.
    """
    locator = self.collection_locator
    if locator is None:
        return False
    return uuid_pattern.match(locator) is not None
391 # Used by arv-web.py to switch the contents of the CollectionDirectory
392 def change_collection(self, new_locator):
393 """Switch the contents of the CollectionDirectory.
395 Must be called with llfuse.lock held.
398 self.collection_locator = new_locator
399 self.collection_record = None
402 def new_collection(self, new_collection_record, coll_reader):
406 self.collection_record = new_collection_record
408 if self.collection_record:
409 self._mtime = convertTime(self.collection_record.get('modified_at'))
410 self.collection_locator = self.collection_record["uuid"]
411 if self.collection_record_file is not None:
412 self.collection_record_file.update(self.collection_record)
414 self.collection = coll_reader
415 self.populate(self.mtime())
418 return self.collection_locator
421 def update(self, to_record_version=None):
423 if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
426 if self.collection_locator is None:
431 with llfuse.lock_released:
432 self._updating_lock.acquire()
436 _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
437 if self.collection is not None:
438 if self.collection.known_past_version(to_record_version):
439 _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
441 self.collection.update()
443 if uuid_pattern.match(self.collection_locator):
444 coll_reader = arvados.collection.Collection(
445 self.collection_locator, self.api, self.api.keep,
446 num_retries=self.num_retries)
448 coll_reader = arvados.collection.CollectionReader(
449 self.collection_locator, self.api, self.api.keep,
450 num_retries=self.num_retries)
451 new_collection_record = coll_reader.api_response() or {}
452 # If the Collection only exists in Keep, there will be no API
453 # response. Fill in the fields we need.
454 if 'uuid' not in new_collection_record:
455 new_collection_record['uuid'] = self.collection_locator
456 if "portable_data_hash" not in new_collection_record:
457 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
458 if 'manifest_text' not in new_collection_record:
459 new_collection_record['manifest_text'] = coll_reader.manifest_text()
461 if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
462 self.new_collection(new_collection_record, coll_reader)
464 self._manifest_size = len(coll_reader.manifest_text())
465 _logger.debug("%s manifest_size %i", self, self._manifest_size)
466 # end with llfuse.lock_released, re-acquire lock
471 self._updating_lock.release()
472 except arvados.errors.NotFoundError as e:
473 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
474 except arvados.errors.ArgumentError as detail:
475 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
476 if self.collection_record is not None and "manifest_text" in self.collection_record:
477 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
479 _logger.exception("arv-mount %s: error", self.collection_locator)
480 if self.collection_record is not None and "manifest_text" in self.collection_record:
481 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
487 def __getitem__(self, item):
488 if item == '.arvados#collection':
489 if self.collection_record_file is None:
490 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
491 self.inodes.add_entry(self.collection_record_file)
492 return self.collection_record_file
494 return super(CollectionDirectory, self).__getitem__(item)
496 def __contains__(self, k):
497 if k == '.arvados#collection':
500 return super(CollectionDirectory, self).__contains__(k)
def invalidate(self):
    """Forget the cached collection record and its file object.

    Both cached references are reset to None before the parent class's
    ``invalidate()`` is run.
    """
    self.collection_record_file = None
    self.collection_record = None
    super(CollectionDirectory, self).invalidate()
508 return (self.collection_locator is not None)
511 # This is an empirically-derived heuristic to estimate the memory used
512 # to store this collection's metadata. Calculating the memory
513 # footprint directly would be more accurate, but also more complicated.
514 return self._manifest_size * 128
517 if self.collection is not None:
519 self.collection.save()
520 self.collection.stop_threads()
523 if self.collection is not None:
524 self.collection.stop_threads()
525 super(CollectionDirectory, self).clear()
526 self._manifest_size = 0
529 class TmpCollectionDirectory(CollectionDirectoryBase):
530 """A directory backed by an Arvados collection that never gets saved.
532 This supports using Keep as scratch space. A userspace program can
533 read the .arvados#collection file to get a current manifest in
534 order to save a snapshot of the scratch data or use it as a crunch
538 class UnsaveableCollection(arvados.collection.Collection):
def __init__(self, parent_inode, inodes, api_client, num_retries):
    """Create the directory on top of a fresh ``UnsaveableCollection``.

    The collection is built from ``api_client`` (its ``keep`` attribute is
    used as the Keep client) and ``num_retries``, then passed to the parent
    class constructor together with ``parent_inode`` and ``inodes``.
    Afterwards the cached record file is cleared and the directory is
    populated using its current mtime.
    """
    scratch = self.UnsaveableCollection(
        api_client=api_client,
        keep_client=api_client.keep,
        num_retries=num_retries)
    super(TmpCollectionDirectory, self).__init__(parent_inode, inodes, scratch)
    self.collection_record_file = None
    now = self.mtime()
    self.populate(now)
554 def on_event(self, *args, **kwargs):
555 super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
556 if self.collection_record_file:
558 self.collection_record_file.invalidate()
559 self.inodes.invalidate_inode(self.collection_record_file)
560 _logger.debug("%s invalidated collection record", self)
562 def collection_record(self):
563 with llfuse.lock_released:
566 "manifest_text": self.collection.manifest_text(),
567 "portable_data_hash": self.collection.portable_data_hash(),
570 def __contains__(self, k):
571 return (k == '.arvados#collection' or
572 super(TmpCollectionDirectory, self).__contains__(k))
575 def __getitem__(self, item):
576 if item == '.arvados#collection':
577 if self.collection_record_file is None:
578 self.collection_record_file = FuncToJSONFile(
579 self.inode, self.collection_record)
580 self.inodes.add_entry(self.collection_record_file)
581 return self.collection_record_file
582 return super(TmpCollectionDirectory, self).__getitem__(item)
590 def want_event_subscribe(self):
594 self.collection.stop_threads()
def invalidate(self):
    """Invalidate the cached record file (if any), then the directory itself."""
    record_file = self.collection_record_file
    if record_file:
        record_file.invalidate()
    super(TmpCollectionDirectory, self).invalidate()
602 class MagicDirectory(Directory):
603 """A special directory that logically contains the set of all extant keep locators.
605 When a file is referenced by lookup(), it is tested to see if it is a valid
606 keep locator to a manifest, and if so, loads the manifest contents as a
607 subdirectory of this directory with the locator as the directory name.
608 Since querying a list of all extant keep locators is impractical, only
609 collections that have already been accessed are visible to readdir().
614 This directory provides access to Arvados collections as subdirectories listed
615 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
616 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
617 (in the form 'zzzzz-j7d0g-1234567890abcde').
619 Note that this directory will appear empty until you attempt to access a
620 specific collection or project subdirectory (such as trying to 'cd' into it),
621 at which point the collection or project will actually be looked up on the server
622 and the directory will appear if it exists.
626 def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
627 super(MagicDirectory, self).__init__(parent_inode, inodes)
629 self.num_retries = num_retries
630 self.pdh_only = pdh_only
def __setattr__(self, name, value):
    """Store the attribute, then seed the directory once it gets an inode.

    After the normal attribute assignment, nothing more happens unless the
    attribute is ``inode``, the stored inode is not None and the directory
    has no entries yet. In that case a README entry is added, and when the
    inode equals ``llfuse.ROOT_INODE`` a nested ``by_id`` MagicDirectory
    with the same settings is added as well.
    """
    super(MagicDirectory, self).__setattr__(name, value)
    # The checks run in this order so that only ``name`` is inspected for
    # assignments to unrelated attributes.
    if name != 'inode' or self.inode is None or self._entries:
        return
    # When we're assigned an inode, add a README.
    readme = StringFile(self.inode, self.README_TEXT, time.time())
    self._entries['README'] = self.inodes.add_entry(readme)
    # If we're the root directory, add an identical by_id subdirectory.
    if self.inode == llfuse.ROOT_INODE:
        by_id = MagicDirectory(
            self.inode, self.inodes, self.api, self.num_retries, self.pdh_only)
        self._entries['by_id'] = self.inodes.add_entry(by_id)
644 def __contains__(self, k):
645 if k in self._entries:
648 if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
654 if group_uuid_pattern.match(k):
655 project = self.api.groups().list(
656 filters=[['group_class', '=', 'project'], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
657 if project[u'items_available'] == 0:
659 e = self.inodes.add_entry(ProjectDirectory(
660 self.inode, self.inodes, self.api, self.num_retries, project[u'items'][0]))
662 e = self.inodes.add_entry(CollectionDirectory(
663 self.inode, self.inodes, self.api, self.num_retries, k))
666 if k not in self._entries:
669 self.inodes.del_entry(e)
672 self.inodes.invalidate_entry(self, k)
673 self.inodes.del_entry(e)
675 except Exception as ex:
676 _logger.exception("arv-mount lookup '%s':", k)
678 self.inodes.del_entry(e)
681 def __getitem__(self, item):
683 return self._entries[item]
685 raise KeyError("No collection with id " + item)
def want_event_subscribe(self):
    """Return False when ``pdh_only`` is set, True otherwise."""
    if self.pdh_only:
        return False
    return True
694 class TagsDirectory(Directory):
695 """A special directory that contains as subdirectories all tags visible to the user."""
697 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
698 super(TagsDirectory, self).__init__(parent_inode, inodes)
700 self.num_retries = num_retries
702 self._poll_time = poll_time
705 def want_event_subscribe(self):
710 with llfuse.lock_released:
711 tags = self.api.links().list(
712 filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
713 select=['name'], distinct=True, limit=1000
714 ).execute(num_retries=self.num_retries)
716 self.merge(tags['items']+[{"name": n} for n in self._extra],
718 lambda a, i: a.tag == i['name'],
719 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
723 def __getitem__(self, item):
724 if super(TagsDirectory, self).__contains__(item):
725 return super(TagsDirectory, self).__getitem__(item)
726 with llfuse.lock_released:
727 tags = self.api.links().list(
728 filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
729 ).execute(num_retries=self.num_retries)
731 self._extra.add(item)
733 return super(TagsDirectory, self).__getitem__(item)
737 def __contains__(self, k):
738 if super(TagsDirectory, self).__contains__(k):
748 class TagDirectory(Directory):
749 """A special directory that contains as subdirectories all collections visible
750 to the user that are tagged with a particular tag.
753 def __init__(self, parent_inode, inodes, api, num_retries, tag,
754 poll=False, poll_time=60):
755 super(TagDirectory, self).__init__(parent_inode, inodes)
757 self.num_retries = num_retries
760 self._poll_time = poll_time
762 def want_event_subscribe(self):
767 with llfuse.lock_released:
768 taggedcollections = self.api.links().list(
769 filters=[['link_class', '=', 'tag'],
770 ['name', '=', self.tag],
771 ['head_uuid', 'is_a', 'arvados#collection']],
773 ).execute(num_retries=self.num_retries)
774 self.merge(taggedcollections['items'],
775 lambda i: i['head_uuid'],
776 lambda a, i: a.collection_locator == i['head_uuid'],
777 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
780 class ProjectDirectory(Directory):
781 """A special directory that contains the contents of a project."""
783 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
784 poll=False, poll_time=60):
785 super(ProjectDirectory, self).__init__(parent_inode, inodes)
787 self.num_retries = num_retries
788 self.project_object = project_object
789 self.project_object_file = None
790 self.project_uuid = project_object['uuid']
792 self._poll_time = poll_time
793 self._updating_lock = threading.Lock()
794 self._current_user = None
795 self._full_listing = False
797 def want_event_subscribe(self):
800 def createDirectory(self, i):
801 if collection_uuid_pattern.match(i['uuid']):
802 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
803 elif group_uuid_pattern.match(i['uuid']):
804 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
805 elif link_uuid_pattern.match(i['uuid']):
806 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
807 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
810 elif uuid_pattern.match(i['uuid']):
811 return ObjectFile(self.parent_inode, i)
816 return self.project_uuid
819 self._full_listing = True
820 return super(ProjectDirectory, self).items()
824 if i['name'] is None or len(i['name']) == 0:
826 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
827 # collection or subproject
829 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
832 elif 'kind' in i and i['kind'].startswith('arvados#'):
834 return "{}.{}".format(i['name'], i['kind'][8:])
841 if self.project_object_file == None:
842 self.project_object_file = ObjectFile(self.inode, self.project_object)
843 self.inodes.add_entry(self.project_object_file)
845 if not self._full_listing:
849 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
850 return a.uuid() == i['uuid']
851 elif isinstance(a, ObjectFile):
852 return a.uuid() == i['uuid'] and not a.stale()
856 with llfuse.lock_released:
857 self._updating_lock.acquire()
861 if group_uuid_pattern.match(self.project_uuid):
862 self.project_object = self.api.groups().get(
863 uuid=self.project_uuid).execute(num_retries=self.num_retries)
864 elif user_uuid_pattern.match(self.project_uuid):
865 self.project_object = self.api.users().get(
866 uuid=self.project_uuid).execute(num_retries=self.num_retries)
868 contents = arvados.util.list_all(self.api.groups().list,
870 filters=[["owner_uuid", "=", self.project_uuid],
871 ["group_class", "=", "project"]])
872 contents.extend(arvados.util.list_all(self.api.collections().list,
874 filters=[["owner_uuid", "=", self.project_uuid]]))
876 # end with llfuse.lock_released, re-acquire lock
881 self.createDirectory)
884 self._updating_lock.release()
886 def _add_entry(self, i, name):
887 ent = self.createDirectory(i)
888 self._entries[name] = self.inodes.add_entry(ent)
889 return self._entries[name]
893 def __getitem__(self, k):
894 if k == '.arvados#project':
895 return self.project_object_file
896 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
897 return super(ProjectDirectory, self).__getitem__(k)
898 with llfuse.lock_released:
899 contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
900 ["group_class", "=", "project"],
902 limit=1).execute(num_retries=self.num_retries)["items"]
904 contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
906 limit=1).execute(num_retries=self.num_retries)["items"]
908 name = sanitize_filename(self.namefn(contents[0]))
911 return self._add_entry(contents[0], name)
916 def __contains__(self, k):
917 if k == '.arvados#project':
929 with llfuse.lock_released:
930 if not self._current_user:
931 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
932 return self._current_user["uuid"] in self.project_object.get("writable_by", [])
939 def mkdir(self, name):
941 with llfuse.lock_released:
942 self.api.collections().create(body={"owner_uuid": self.project_uuid,
944 "manifest_text": ""}).execute(num_retries=self.num_retries)
946 except apiclient_errors.Error as error:
948 raise llfuse.FUSEError(errno.EEXIST)
952 def rmdir(self, name):
954 raise llfuse.FUSEError(errno.ENOENT)
955 if not isinstance(self[name], CollectionDirectory):
956 raise llfuse.FUSEError(errno.EPERM)
957 if len(self[name]) > 0:
958 raise llfuse.FUSEError(errno.ENOTEMPTY)
959 with llfuse.lock_released:
960 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
965 def rename(self, name_old, name_new, src):
966 if not isinstance(src, ProjectDirectory):
967 raise llfuse.FUSEError(errno.EPERM)
971 if not isinstance(ent, CollectionDirectory):
972 raise llfuse.FUSEError(errno.EPERM)
975 # POSIX semantics for replacing one directory with another is
976 # tricky (the target directory must be empty, the operation must be
977 # atomic which isn't possible with the Arvados API as of this
978 # writing) so don't support that.
979 raise llfuse.FUSEError(errno.EPERM)
981 self.api.collections().update(uuid=ent.uuid(),
982 body={"owner_uuid": self.uuid(),
983 "name": name_new}).execute(num_retries=self.num_retries)
985 # Actually move the entry from source directory to this directory.
986 del src._entries[name_old]
987 self._entries[name_new] = ent
988 self.inodes.invalidate_entry(src, name_old)
991 def child_event(self, ev):
992 properties = ev.get("properties") or {}
993 old_attrs = properties.get("old_attributes") or {}
994 new_attrs = properties.get("new_attributes") or {}
995 old_attrs["uuid"] = ev["object_uuid"]
996 new_attrs["uuid"] = ev["object_uuid"]
997 old_name = sanitize_filename(self.namefn(old_attrs))
998 new_name = sanitize_filename(self.namefn(new_attrs))
1000 # create events will have a new name, but not an old name
1001 # delete events will have an old name, but not a new name
1002 # update events will have an old and new name, and they may be same or different
1003 # if they are the same, an unrelated field changed and there is nothing to do.
1005 if old_attrs.get("owner_uuid") != self.project_uuid:
1006 # Was moved from somewhere else, so don't try to remove entry.
1008 if ev.get("object_owner_uuid") != self.project_uuid:
1009 # Was moved to somewhere else, so don't try to add entry
1012 if old_attrs.get("is_trashed"):
1013 # Was previously deleted
1015 if new_attrs.get("is_trashed"):
1019 if new_name != old_name:
1021 if old_name in self._entries:
1022 ent = self._entries[old_name]
1023 del self._entries[old_name]
1024 self.inodes.invalidate_entry(self, old_name)
1028 self._entries[new_name] = ent
1030 self._add_entry(new_attrs, new_name)
1031 elif ent is not None:
1032 self.inodes.del_entry(ent)
1035 class SharedDirectory(Directory):
1036 """A special directory that represents users or groups who have shared projects with me."""
1038 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
1039 poll=False, poll_time=60):
1040 super(SharedDirectory, self).__init__(parent_inode, inodes)
1042 self.num_retries = num_retries
1043 self.current_user = api.users().current().execute(num_retries=num_retries)
1045 self._poll_time = poll_time
1046 self._updating_lock = threading.Lock()
1051 with llfuse.lock_released:
1052 self._updating_lock.acquire()
1053 if not self.stale():
1061 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1062 if 'httpMethod' in methods.get('shared', {}):
1065 resp = self.api.groups().shared(filters=[['group_class', '=', 'project']]+page,
1069 include="owner_uuid").execute()
1070 if not resp["items"]:
1072 page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1073 for r in resp["items"]:
1074 objects[r["uuid"]] = r
1075 roots.append(r["uuid"])
1076 for r in resp["included"]:
1077 objects[r["uuid"]] = r
1078 root_owners.add(r["uuid"])
1080 all_projects = arvados.util.list_all(
1081 self.api.groups().list, self.num_retries,
1082 filters=[['group_class','=','project']],
1083 select=["uuid", "owner_uuid"])
1084 for ob in all_projects:
1085 objects[ob['uuid']] = ob
1087 current_uuid = self.current_user['uuid']
1088 for ob in all_projects:
1089 if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1090 roots.append(ob['uuid'])
1091 root_owners.add(ob['owner_uuid'])
1093 lusers = arvados.util.list_all(
1094 self.api.users().list, self.num_retries,
1095 filters=[['uuid','in', list(root_owners)]])
1096 lgroups = arvados.util.list_all(
1097 self.api.groups().list, self.num_retries,
1098 filters=[['uuid','in', list(root_owners)+roots]])
1101 objects[l["uuid"]] = l
1103 objects[l["uuid"]] = l
1105 for r in root_owners:
1109 contents[obr["name"]] = obr
1110 #elif obr.get("username"):
1111 # contents[obr["username"]] = obr
1112 elif "first_name" in obr:
1113 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1118 if obr['owner_uuid'] not in objects:
1119 contents[obr["name"]] = obr
1121 # end with llfuse.lock_released, re-acquire lock
1123 self.merge(viewitems(contents),
1125 lambda a, i: a.uuid() == i[1]['uuid'],
1126 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
1128 _logger.exception("arv-mount shared dir error")
1130 self._updating_lock.release()
1132 def want_event_subscribe(self):