1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import absolute_import
6 from __future__ import division
7 from future.utils import viewitems
8 from future.utils import itervalues
9 from builtins import dict
18 from apiclient import errors as apiclient_errors
22 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
23 from .fresh import FreshBase, convertTime, use_counter, check_update
25 import arvados.collection
26 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
28 _logger = logging.getLogger('arvados.arvados_fuse')
31 # Match any character which FUSE or Linux cannot accommodate as part
32 # of a filename. (If present in a collection filename, they will
33 # appear as underscores in the fuse mount.)
34 _disallowed_filename_characters = re.compile('[\x00/]')
37 class Directory(FreshBase):
38 """Generic directory object, backed by a dict.
40 Consists of a set of entries with the key representing the filename
41 and the value referencing a File or Directory object.
44 def __init__(self, parent_inode, inodes, apiconfig):
45 """parent_inode is the integer inode number"""
47 super(Directory, self).__init__()
50 if not isinstance(parent_inode, int):
51 raise Exception("parent_inode should be an int")
52 self.parent_inode = parent_inode
54 self.apiconfig = apiconfig
56 self._mtime = time.time()
58 def unsanitize_filename(self, incoming):
59 """Replace ForwardSlashNameSubstitution value with /"""
60 fsns = self.apiconfig()["Collections"]["ForwardSlashNameSubstitution"]
61 if isinstance(fsns, str) and fsns != '' and fsns != '/':
62 return incoming.replace(fsns, '/')
66 def sanitize_filename(self, dirty):
67 """Replace disallowed filename characters according to
68 ForwardSlashNameSubstitution in self.api_config."""
69 # '.' and '..' are not reachable if API server is newer than #6277
79 fsns = self.apiconfig()["Collections"]["ForwardSlashNameSubstitution"]
80 if isinstance(fsns, str) and fsns != '':
81 dirty = dirty.replace('/', fsns)
82 return _disallowed_filename_characters.sub('_', dirty)
85 # Overridden by subclasses to implement logic to update the
86 # entries dict when the directory is stale
91 # Only used when computing the size of the disk footprint of the directory
99 def checkupdate(self):
103 except apiclient.errors.HttpError as e:
108 def __getitem__(self, item):
109 return self._entries[item]
114 return list(self._entries.items())
118 def __contains__(self, k):
119 return k in self._entries
124 return len(self._entries)
127 self.inodes.touch(self)
128 super(Directory, self).fresh()
130 def merge(self, items, fn, same, new_entry):
131 """Helper method for updating the contents of the directory.
133 Takes a list describing the new contents of the directory, reuse
134 entries that are the same in both the old and new lists, create new
135 entries, and delete old entries missing from the new list.
137 :items: iterable with new directory contents
139 :fn: function to take an entry in 'items' and return the desired file or
140 directory name, or None if this entry should be skipped
142 :same: function to compare an existing entry (a File or Directory
143 object) with an entry in the items list to determine whether to keep
146 :new_entry: function to create a new directory entry (File or Directory
147 object) from an entry in the items list.
151 oldentries = self._entries
155 name = self.sanitize_filename(fn(i))
157 if name in oldentries and same(oldentries[name], i):
158 # move existing directory entry over
159 self._entries[name] = oldentries[name]
162 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
163 # create new directory entry
166 self._entries[name] = self.inodes.add_entry(ent)
169 # delete any other directory entries that were not in found in 'items'
171 _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
172 self.inodes.invalidate_entry(self, i)
173 self.inodes.del_entry(oldentries[i])
177 self.inodes.invalidate_inode(self)
178 self._mtime = time.time()
183 if super(Directory, self).in_use():
185 for v in itervalues(self._entries):
190 def has_ref(self, only_children):
191 if super(Directory, self).has_ref(only_children):
193 for v in itervalues(self._entries):
199 """Delete all entries"""
200 oldentries = self._entries
203 oldentries[n].clear()
204 self.inodes.del_entry(oldentries[n])
207 def kernel_invalidate(self):
208 # Invalidating the dentry on the parent implies invalidating all paths
210 parent = self.inodes[self.parent_inode]
212 # Find self on the parent in order to invalidate this path.
213 # Calling the public items() method might trigger a refresh,
214 # which we definitely don't want, so read the internal dict directly.
215 for k,v in viewitems(parent._entries):
217 self.inodes.invalidate_entry(parent, k)
229 def want_event_subscribe(self):
230 raise NotImplementedError()
232 def create(self, name):
233 raise NotImplementedError()
235 def mkdir(self, name):
236 raise NotImplementedError()
238 def unlink(self, name):
239 raise NotImplementedError()
241 def rmdir(self, name):
242 raise NotImplementedError()
244 def rename(self, name_old, name_new, src):
245 raise NotImplementedError()
248 class CollectionDirectoryBase(Directory):
249 """Represent an Arvados Collection as a directory.
251 This class is used for Subcollections, and is also the base class for
252 CollectionDirectory, which implements collection loading/saving on
255 Most operations act only the underlying Arvados `Collection` object. The
256 `Collection` object signals via a notify callback to
257 `CollectionDirectoryBase.on_event` that an item was added, removed or
258 modified. FUSE inodes and directory entries are created, deleted or
259 invalidated in response to these events.
263 def __init__(self, parent_inode, inodes, apiconfig, collection):
264 super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig)
265 self.apiconfig = apiconfig
266 self.collection = collection
268 def new_entry(self, name, item, mtime):
269 name = self.sanitize_filename(name)
270 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
271 if item.fuse_entry.dead is not True:
272 raise Exception("Can only reparent dead inode entry")
273 if item.fuse_entry.inode is None:
274 raise Exception("Reparented entry must still have valid inode")
275 item.fuse_entry.dead = False
276 self._entries[name] = item.fuse_entry
277 elif isinstance(item, arvados.collection.RichCollectionBase):
278 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, item))
279 self._entries[name].populate(mtime)
281 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
282 item.fuse_entry = self._entries[name]
284 def on_event(self, event, collection, name, item):
285 if collection == self.collection:
286 name = self.sanitize_filename(name)
287 _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
289 if event == arvados.collection.ADD:
290 self.new_entry(name, item, self.mtime())
291 elif event == arvados.collection.DEL:
292 ent = self._entries[name]
293 del self._entries[name]
294 self.inodes.invalidate_entry(self, name)
295 self.inodes.del_entry(ent)
296 elif event == arvados.collection.MOD:
297 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
298 self.inodes.invalidate_inode(item.fuse_entry)
299 elif name in self._entries:
300 self.inodes.invalidate_inode(self._entries[name])
302 def populate(self, mtime):
304 self.collection.subscribe(self.on_event)
305 for entry, item in viewitems(self.collection):
306 self.new_entry(entry, item, self.mtime())
309 return self.collection.writable()
313 with llfuse.lock_released:
314 self.collection.root_collection().save()
318 def create(self, name):
319 with llfuse.lock_released:
320 self.collection.open(name, "w").close()
324 def mkdir(self, name):
325 with llfuse.lock_released:
326 self.collection.mkdirs(name)
330 def unlink(self, name):
331 with llfuse.lock_released:
332 self.collection.remove(name)
337 def rmdir(self, name):
338 with llfuse.lock_released:
339 self.collection.remove(name)
344 def rename(self, name_old, name_new, src):
345 if not isinstance(src, CollectionDirectoryBase):
346 raise llfuse.FUSEError(errno.EPERM)
351 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
353 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
355 raise llfuse.FUSEError(errno.ENOTEMPTY)
356 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
357 raise llfuse.FUSEError(errno.ENOTDIR)
358 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
359 raise llfuse.FUSEError(errno.EISDIR)
361 with llfuse.lock_released:
362 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
367 super(CollectionDirectoryBase, self).clear()
368 self.collection = None
371 class CollectionDirectory(CollectionDirectoryBase):
372 """Represents the root of a directory tree representing a collection."""
374 def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
375 super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, None)
377 self.num_retries = num_retries
378 self.collection_record_file = None
379 self.collection_record = None
382 self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
384 _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
385 self._poll_time = 60*60
387 if isinstance(collection_record, dict):
388 self.collection_locator = collection_record['uuid']
389 self._mtime = convertTime(collection_record.get('modified_at'))
391 self.collection_locator = collection_record
393 self._manifest_size = 0
394 if self.collection_locator:
395 self._writable = (uuid_pattern.match(self.collection_locator) is not None)
396 self._updating_lock = threading.Lock()
399 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
402 return self.collection.writable() if self.collection is not None else self._writable
404 def want_event_subscribe(self):
405 return (uuid_pattern.match(self.collection_locator) is not None)
407 # Used by arv-web.py to switch the contents of the CollectionDirectory
408 def change_collection(self, new_locator):
409 """Switch the contents of the CollectionDirectory.
411 Must be called with llfuse.lock held.
414 self.collection_locator = new_locator
415 self.collection_record = None
418 def new_collection(self, new_collection_record, coll_reader):
422 self.collection_record = new_collection_record
424 if self.collection_record:
425 self._mtime = convertTime(self.collection_record.get('modified_at'))
426 self.collection_locator = self.collection_record["uuid"]
427 if self.collection_record_file is not None:
428 self.collection_record_file.update(self.collection_record)
430 self.collection = coll_reader
431 self.populate(self.mtime())
434 return self.collection_locator
437 def update(self, to_record_version=None):
439 if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
442 if self.collection_locator is None:
447 with llfuse.lock_released:
448 self._updating_lock.acquire()
452 _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
453 if self.collection is not None:
454 if self.collection.known_past_version(to_record_version):
455 _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
457 self.collection.update()
459 if uuid_pattern.match(self.collection_locator):
460 coll_reader = arvados.collection.Collection(
461 self.collection_locator, self.api, self.api.keep,
462 num_retries=self.num_retries)
464 coll_reader = arvados.collection.CollectionReader(
465 self.collection_locator, self.api, self.api.keep,
466 num_retries=self.num_retries)
467 new_collection_record = coll_reader.api_response() or {}
468 # If the Collection only exists in Keep, there will be no API
469 # response. Fill in the fields we need.
470 if 'uuid' not in new_collection_record:
471 new_collection_record['uuid'] = self.collection_locator
472 if "portable_data_hash" not in new_collection_record:
473 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
474 if 'manifest_text' not in new_collection_record:
475 new_collection_record['manifest_text'] = coll_reader.manifest_text()
477 if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
478 self.new_collection(new_collection_record, coll_reader)
480 self._manifest_size = len(coll_reader.manifest_text())
481 _logger.debug("%s manifest_size %i", self, self._manifest_size)
482 # end with llfuse.lock_released, re-acquire lock
487 self._updating_lock.release()
488 except arvados.errors.NotFoundError as e:
489 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
490 except arvados.errors.ArgumentError as detail:
491 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
492 if self.collection_record is not None and "manifest_text" in self.collection_record:
493 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
495 _logger.exception("arv-mount %s: error", self.collection_locator)
496 if self.collection_record is not None and "manifest_text" in self.collection_record:
497 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
503 def __getitem__(self, item):
504 if item == '.arvados#collection':
505 if self.collection_record_file is None:
506 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
507 self.inodes.add_entry(self.collection_record_file)
508 return self.collection_record_file
510 return super(CollectionDirectory, self).__getitem__(item)
512 def __contains__(self, k):
513 if k == '.arvados#collection':
516 return super(CollectionDirectory, self).__contains__(k)
518 def invalidate(self):
519 self.collection_record = None
520 self.collection_record_file = None
521 super(CollectionDirectory, self).invalidate()
524 return (self.collection_locator is not None)
527 # This is an empirically-derived heuristic to estimate the memory used
528 # to store this collection's metadata. Calculating the memory
529 # footprint directly would be more accurate, but also more complicated.
530 return self._manifest_size * 128
533 if self.collection is not None:
535 self.collection.save()
536 self.collection.stop_threads()
539 if self.collection is not None:
540 self.collection.stop_threads()
541 super(CollectionDirectory, self).clear()
542 self._manifest_size = 0
545 class TmpCollectionDirectory(CollectionDirectoryBase):
546 """A directory backed by an Arvados collection that never gets saved.
548 This supports using Keep as scratch space. A userspace program can
549 read the .arvados#collection file to get a current manifest in
550 order to save a snapshot of the scratch data or use it as a crunch
554 class UnsaveableCollection(arvados.collection.Collection):
560 def __init__(self, parent_inode, inodes, api_client, num_retries):
561 collection = self.UnsaveableCollection(
562 api_client=api_client,
563 keep_client=api_client.keep,
564 num_retries=num_retries)
565 super(TmpCollectionDirectory, self).__init__(
566 parent_inode, inodes, api_client.config, collection)
567 self.collection_record_file = None
568 self.populate(self.mtime())
570 def on_event(self, *args, **kwargs):
571 super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
572 if self.collection_record_file:
574 self.collection_record_file.invalidate()
575 self.inodes.invalidate_inode(self.collection_record_file)
576 _logger.debug("%s invalidated collection record", self)
578 def collection_record(self):
579 with llfuse.lock_released:
582 "manifest_text": self.collection.manifest_text(),
583 "portable_data_hash": self.collection.portable_data_hash(),
586 def __contains__(self, k):
587 return (k == '.arvados#collection' or
588 super(TmpCollectionDirectory, self).__contains__(k))
591 def __getitem__(self, item):
592 if item == '.arvados#collection':
593 if self.collection_record_file is None:
594 self.collection_record_file = FuncToJSONFile(
595 self.inode, self.collection_record)
596 self.inodes.add_entry(self.collection_record_file)
597 return self.collection_record_file
598 return super(TmpCollectionDirectory, self).__getitem__(item)
606 def want_event_subscribe(self):
610 self.collection.stop_threads()
612 def invalidate(self):
613 if self.collection_record_file:
614 self.collection_record_file.invalidate()
615 super(TmpCollectionDirectory, self).invalidate()
618 class MagicDirectory(Directory):
619 """A special directory that logically contains the set of all extant keep locators.
621 When a file is referenced by lookup(), it is tested to see if it is a valid
622 keep locator to a manifest, and if so, loads the manifest contents as a
623 subdirectory of this directory with the locator as the directory name.
624 Since querying a list of all extant keep locators is impractical, only
625 collections that have already been accessed are visible to readdir().
630 This directory provides access to Arvados collections as subdirectories listed
631 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
632 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
633 (in the form 'zzzzz-j7d0g-1234567890abcde').
635 Note that this directory will appear empty until you attempt to access a
636 specific collection or project subdirectory (such as trying to 'cd' into it),
637 at which point the collection or project will actually be looked up on the server
638 and the directory will appear if it exists.
642 def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
643 super(MagicDirectory, self).__init__(parent_inode, inodes, api.config)
645 self.num_retries = num_retries
646 self.pdh_only = pdh_only
648 def __setattr__(self, name, value):
649 super(MagicDirectory, self).__setattr__(name, value)
650 # When we're assigned an inode, add a README.
651 if ((name == 'inode') and (self.inode is not None) and
652 (not self._entries)):
653 self._entries['README'] = self.inodes.add_entry(
654 StringFile(self.inode, self.README_TEXT, time.time()))
655 # If we're the root directory, add an identical by_id subdirectory.
656 if self.inode == llfuse.ROOT_INODE:
657 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
658 self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))
660 def __contains__(self, k):
661 if k in self._entries:
664 if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
670 if group_uuid_pattern.match(k):
671 project = self.api.groups().list(
672 filters=[['group_class', '=', 'project'], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
673 if project[u'items_available'] == 0:
675 e = self.inodes.add_entry(ProjectDirectory(
676 self.inode, self.inodes, self.api, self.num_retries, project[u'items'][0]))
679 e = self.inodes.add_entry(CollectionDirectory(
680 self.inode, self.inodes, self.api, self.num_retries, k))
683 if k not in self._entries:
686 self.inodes.del_entry(e)
689 self.inodes.invalidate_entry(self, k)
690 self.inodes.del_entry(e)
692 except Exception as ex:
693 _logger.exception("arv-mount lookup '%s':", k)
695 self.inodes.del_entry(e)
698 def __getitem__(self, item):
700 return self._entries[item]
702 raise KeyError("No collection with id " + item)
707 def want_event_subscribe(self):
708 return not self.pdh_only
711 class TagsDirectory(Directory):
712 """A special directory that contains as subdirectories all tags visible to the user."""
714 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
715 super(TagsDirectory, self).__init__(parent_inode, inodes, api.config)
717 self.num_retries = num_retries
719 self._poll_time = poll_time
722 def want_event_subscribe(self):
727 with llfuse.lock_released:
728 tags = self.api.links().list(
729 filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
730 select=['name'], distinct=True, limit=1000
731 ).execute(num_retries=self.num_retries)
733 self.merge(tags['items']+[{"name": n} for n in self._extra],
735 lambda a, i: a.tag == i['name'],
736 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
740 def __getitem__(self, item):
741 if super(TagsDirectory, self).__contains__(item):
742 return super(TagsDirectory, self).__getitem__(item)
743 with llfuse.lock_released:
744 tags = self.api.links().list(
745 filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
746 ).execute(num_retries=self.num_retries)
748 self._extra.add(item)
750 return super(TagsDirectory, self).__getitem__(item)
754 def __contains__(self, k):
755 if super(TagsDirectory, self).__contains__(k):
765 class TagDirectory(Directory):
766 """A special directory that contains as subdirectories all collections visible
767 to the user that are tagged with a particular tag.
770 def __init__(self, parent_inode, inodes, api, num_retries, tag,
771 poll=False, poll_time=60):
772 super(TagDirectory, self).__init__(parent_inode, inodes, api.config)
774 self.num_retries = num_retries
777 self._poll_time = poll_time
779 def want_event_subscribe(self):
784 with llfuse.lock_released:
785 taggedcollections = self.api.links().list(
786 filters=[['link_class', '=', 'tag'],
787 ['name', '=', self.tag],
788 ['head_uuid', 'is_a', 'arvados#collection']],
790 ).execute(num_retries=self.num_retries)
791 self.merge(taggedcollections['items'],
792 lambda i: i['head_uuid'],
793 lambda a, i: a.collection_locator == i['head_uuid'],
794 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
797 class ProjectDirectory(Directory):
798 """A special directory that contains the contents of a project."""
800 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
801 poll=False, poll_time=60):
802 super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config)
804 self.num_retries = num_retries
805 self.project_object = project_object
806 self.project_object_file = None
807 self.project_uuid = project_object['uuid']
809 self._poll_time = poll_time
810 self._updating_lock = threading.Lock()
811 self._current_user = None
812 self._full_listing = False
814 def want_event_subscribe(self):
817 def createDirectory(self, i):
818 if collection_uuid_pattern.match(i['uuid']):
819 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
820 elif group_uuid_pattern.match(i['uuid']):
821 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
822 elif link_uuid_pattern.match(i['uuid']):
823 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
824 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
827 elif uuid_pattern.match(i['uuid']):
828 return ObjectFile(self.parent_inode, i)
833 return self.project_uuid
836 self._full_listing = True
837 return super(ProjectDirectory, self).items()
841 if i['name'] is None or len(i['name']) == 0:
843 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
844 # collection or subproject
846 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
849 elif 'kind' in i and i['kind'].startswith('arvados#'):
851 return "{}.{}".format(i['name'], i['kind'][8:])
858 if self.project_object_file == None:
859 self.project_object_file = ObjectFile(self.inode, self.project_object)
860 self.inodes.add_entry(self.project_object_file)
862 if not self._full_listing:
866 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
867 return a.uuid() == i['uuid']
868 elif isinstance(a, ObjectFile):
869 return a.uuid() == i['uuid'] and not a.stale()
873 with llfuse.lock_released:
874 self._updating_lock.acquire()
878 if group_uuid_pattern.match(self.project_uuid):
879 self.project_object = self.api.groups().get(
880 uuid=self.project_uuid).execute(num_retries=self.num_retries)
881 elif user_uuid_pattern.match(self.project_uuid):
882 self.project_object = self.api.users().get(
883 uuid=self.project_uuid).execute(num_retries=self.num_retries)
885 contents = arvados.util.list_all(self.api.groups().list,
887 filters=[["owner_uuid", "=", self.project_uuid],
888 ["group_class", "=", "project"]])
889 contents.extend(arvados.util.list_all(self.api.collections().list,
891 filters=[["owner_uuid", "=", self.project_uuid]]))
893 # end with llfuse.lock_released, re-acquire lock
898 self.createDirectory)
901 self._updating_lock.release()
903 def _add_entry(self, i, name):
904 ent = self.createDirectory(i)
905 self._entries[name] = self.inodes.add_entry(ent)
906 return self._entries[name]
910 def __getitem__(self, k):
911 if k == '.arvados#project':
912 return self.project_object_file
913 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
914 return super(ProjectDirectory, self).__getitem__(k)
915 with llfuse.lock_released:
916 k2 = self.unsanitize_filename(k)
918 namefilter = ["name", "=", k]
920 namefilter = ["name", "in", [k, k2]]
921 contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
922 ["group_class", "=", "project"],
924 limit=2).execute(num_retries=self.num_retries)["items"]
926 contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
928 limit=2).execute(num_retries=self.num_retries)["items"]
930 if len(contents) > 1 and contents[1].name == k:
931 # If "foo/bar" and "foo[SUBST]bar" both exist, use
933 contents = [contents[1]]
934 name = self.sanitize_filename(self.namefn(contents[0]))
937 return self._add_entry(contents[0], name)
942 def __contains__(self, k):
943 if k == '.arvados#project':
955 with llfuse.lock_released:
956 if not self._current_user:
957 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
958 return self._current_user["uuid"] in self.project_object.get("writable_by", [])
965 def mkdir(self, name):
967 with llfuse.lock_released:
968 self.api.collections().create(body={"owner_uuid": self.project_uuid,
970 "manifest_text": ""}).execute(num_retries=self.num_retries)
972 except apiclient_errors.Error as error:
974 raise llfuse.FUSEError(errno.EEXIST)
978 def rmdir(self, name):
980 raise llfuse.FUSEError(errno.ENOENT)
981 if not isinstance(self[name], CollectionDirectory):
982 raise llfuse.FUSEError(errno.EPERM)
983 if len(self[name]) > 0:
984 raise llfuse.FUSEError(errno.ENOTEMPTY)
985 with llfuse.lock_released:
986 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
991 def rename(self, name_old, name_new, src):
992 if not isinstance(src, ProjectDirectory):
993 raise llfuse.FUSEError(errno.EPERM)
997 if not isinstance(ent, CollectionDirectory):
998 raise llfuse.FUSEError(errno.EPERM)
1000 if name_new in self:
1001 # POSIX semantics for replacing one directory with another is
1002 # tricky (the target directory must be empty, the operation must be
1003 # atomic which isn't possible with the Arvados API as of this
1004 # writing) so don't support that.
1005 raise llfuse.FUSEError(errno.EPERM)
1007 self.api.collections().update(uuid=ent.uuid(),
1008 body={"owner_uuid": self.uuid(),
1009 "name": name_new}).execute(num_retries=self.num_retries)
1011 # Acually move the entry from source directory to this directory.
1012 del src._entries[name_old]
1013 self._entries[name_new] = ent
1014 self.inodes.invalidate_entry(src, name_old)
1017 def child_event(self, ev):
1018 properties = ev.get("properties") or {}
1019 old_attrs = properties.get("old_attributes") or {}
1020 new_attrs = properties.get("new_attributes") or {}
1021 old_attrs["uuid"] = ev["object_uuid"]
1022 new_attrs["uuid"] = ev["object_uuid"]
1023 old_name = self.sanitize_filename(self.namefn(old_attrs))
1024 new_name = self.sanitize_filename(self.namefn(new_attrs))
1026 # create events will have a new name, but not an old name
1027 # delete events will have an old name, but not a new name
1028 # update events will have an old and new name, and they may be same or different
1029 # if they are the same, an unrelated field changed and there is nothing to do.
1031 if old_attrs.get("owner_uuid") != self.project_uuid:
1032 # Was moved from somewhere else, so don't try to remove entry.
1034 if ev.get("object_owner_uuid") != self.project_uuid:
1035 # Was moved to somewhere else, so don't try to add entry
1038 if old_attrs.get("is_trashed"):
1039 # Was previously deleted
1041 if new_attrs.get("is_trashed"):
1045 if new_name != old_name:
1047 if old_name in self._entries:
1048 ent = self._entries[old_name]
1049 del self._entries[old_name]
1050 self.inodes.invalidate_entry(self, old_name)
1054 self._entries[new_name] = ent
1056 self._add_entry(new_attrs, new_name)
1057 elif ent is not None:
1058 self.inodes.del_entry(ent)
1061 class SharedDirectory(Directory):
1062 """A special directory that represents users or groups who have shared projects with me."""
1064 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
1065 poll=False, poll_time=60):
1066 super(SharedDirectory, self).__init__(parent_inode, inodes, api.config)
1068 self.num_retries = num_retries
1069 self.current_user = api.users().current().execute(num_retries=num_retries)
1071 self._poll_time = poll_time
1072 self._updating_lock = threading.Lock()
1077 with llfuse.lock_released:
1078 self._updating_lock.acquire()
1079 if not self.stale():
1087 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1088 if 'httpMethod' in methods.get('shared', {}):
1091 resp = self.api.groups().shared(filters=[['group_class', '=', 'project']]+page,
1095 include="owner_uuid").execute()
1096 if not resp["items"]:
1098 page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1099 for r in resp["items"]:
1100 objects[r["uuid"]] = r
1101 roots.append(r["uuid"])
1102 for r in resp["included"]:
1103 objects[r["uuid"]] = r
1104 root_owners.add(r["uuid"])
1106 all_projects = arvados.util.list_all(
1107 self.api.groups().list, self.num_retries,
1108 filters=[['group_class','=','project']],
1109 select=["uuid", "owner_uuid"])
1110 for ob in all_projects:
1111 objects[ob['uuid']] = ob
1113 current_uuid = self.current_user['uuid']
1114 for ob in all_projects:
1115 if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1116 roots.append(ob['uuid'])
1117 root_owners.add(ob['owner_uuid'])
1119 lusers = arvados.util.list_all(
1120 self.api.users().list, self.num_retries,
1121 filters=[['uuid','in', list(root_owners)]])
1122 lgroups = arvados.util.list_all(
1123 self.api.groups().list, self.num_retries,
1124 filters=[['uuid','in', list(root_owners)+roots]])
1127 objects[l["uuid"]] = l
1129 objects[l["uuid"]] = l
1131 for r in root_owners:
1135 contents[obr["name"]] = obr
1136 #elif obr.get("username"):
1137 # contents[obr["username"]] = obr
1138 elif "first_name" in obr:
1139 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1144 if obr['owner_uuid'] not in objects:
1145 contents[obr["name"]] = obr
1147 # end with llfuse.lock_released, re-acquire lock
1149 self.merge(viewitems(contents),
1151 lambda a, i: a.uuid() == i[1]['uuid'],
1152 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
1154 _logger.exception("arv-mount shared dir error")
1156 self._updating_lock.release()
1158 def want_event_subscribe(self):