1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
15 from apiclient import errors as apiclient_errors
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
23 _logger = logging.getLogger('arvados.arvados_fuse')
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (If present in a collection filename, they will
28 # appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile('[\x00/]')
32 class Directory(FreshBase):
33 """Generic directory object, backed by a dict.
35 Consists of a set of entries with the key representing the filename
36 and the value referencing a File or Directory object.
39 def __init__(self, parent_inode, inodes, apiconfig, enable_write):
40 """parent_inode is the integer inode number"""
42 super(Directory, self).__init__()
45 if not isinstance(parent_inode, int):
46 raise Exception("parent_inode should be an int")
47 self.parent_inode = parent_inode
49 self.apiconfig = apiconfig
51 self._mtime = time.time()
52 self._enable_write = enable_write
54 def forward_slash_subst(self):
55 if not hasattr(self, '_fsns'):
57 config = self.apiconfig()
59 self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
61 # old API server with no FSNS config
64 if self._fsns == '' or self._fsns == '/':
68 def unsanitize_filename(self, incoming):
69 """Replace ForwardSlashNameSubstitution value with /"""
70 fsns = self.forward_slash_subst()
71 if isinstance(fsns, str):
72 return incoming.replace(fsns, '/')
76 def sanitize_filename(self, dirty):
77 """Replace disallowed filename characters according to
ForwardSlashNameSubstitution in self.apiconfig."""
79 # '.' and '..' are not reachable if API server is newer than #6277
89 fsns = self.forward_slash_subst()
90 if isinstance(fsns, str):
91 dirty = dirty.replace('/', fsns)
92 return _disallowed_filename_characters.sub('_', dirty)
95 # Overridden by subclasses to implement logic to update the
96 # entries dict when the directory is stale
101 # Only used when computing the size of the disk footprint of the directory
109 def checkupdate(self):
113 except apiclient.errors.HttpError as e:
def __getitem__(self, item):
    """Return the entry stored under the name `item`.

    The lookup goes straight to the backing `_entries` mapping, so a
    name that is not present propagates that mapping's KeyError.
    """
    entries = self._entries
    return entries[item]
124 return list(self._entries.items())
def __contains__(self, k):
    """Report whether an entry named `k` exists in `_entries`."""
    known = self._entries
    return k in known
134 return len(self._entries)
137 self.inodes.touch(self)
138 super(Directory, self).fresh()
140 def merge(self, items, fn, same, new_entry):
141 """Helper method for updating the contents of the directory.
Takes a list describing the new contents of the directory, reuses
entries that are the same in both the old and new lists, creates new
entries, and deletes old entries missing from the new list.
147 :items: iterable with new directory contents
149 :fn: function to take an entry in 'items' and return the desired file or
150 directory name, or None if this entry should be skipped
152 :same: function to compare an existing entry (a File or Directory
153 object) with an entry in the items list to determine whether to keep
156 :new_entry: function to create a new directory entry (File or Directory
157 object) from an entry in the items list.
161 oldentries = self._entries
165 name = self.sanitize_filename(fn(i))
167 if name in oldentries and same(oldentries[name], i):
168 # move existing directory entry over
169 self._entries[name] = oldentries[name]
172 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
173 # create new directory entry
176 self._entries[name] = self.inodes.add_entry(ent)
# delete any other directory entries that were not found in 'items'
181 _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
182 self.inodes.invalidate_entry(self, i)
183 self.inodes.del_entry(oldentries[i])
187 self.inodes.invalidate_inode(self)
188 self._mtime = time.time()
193 if super(Directory, self).in_use():
195 for v in self._entries.values():
200 def has_ref(self, only_children):
201 if super(Directory, self).has_ref(only_children):
203 for v in self._entries.values():
209 """Delete all entries"""
210 oldentries = self._entries
213 oldentries[n].clear()
214 self.inodes.del_entry(oldentries[n])
217 def kernel_invalidate(self):
218 # Invalidating the dentry on the parent implies invalidating all paths
220 parent = self.inodes[self.parent_inode]
222 # Find self on the parent in order to invalidate this path.
223 # Calling the public items() method might trigger a refresh,
224 # which we definitely don't want, so read the internal dict directly.
225 for k,v in parent._entries.items():
227 self.inodes.invalidate_entry(parent, k)
def want_event_subscribe(self):
    """Not implemented by the generic directory.

    Always raises NotImplementedError; directory classes that support
    this query (for example CollectionDirectory and MagicDirectory)
    define their own version.
    """
    raise NotImplementedError
def create(self, name):
    """Not implemented by the generic directory.

    Always raises NotImplementedError; CollectionDirectoryBase supplies
    a working override.
    """
    raise NotImplementedError
def mkdir(self, name):
    """Not implemented by the generic directory.

    Always raises NotImplementedError; CollectionDirectoryBase and
    ProjectDirectory supply working overrides.
    """
    raise NotImplementedError
def unlink(self, name):
    """Not implemented by the generic directory.

    Always raises NotImplementedError; CollectionDirectoryBase supplies
    a working override.
    """
    raise NotImplementedError
def rmdir(self, name):
    """Not implemented by the generic directory.

    Always raises NotImplementedError; CollectionDirectoryBase and
    ProjectDirectory supply working overrides.
    """
    raise NotImplementedError
def rename(self, name_old, name_new, src):
    """Not implemented by the generic directory.

    Always raises NotImplementedError; CollectionDirectoryBase and
    ProjectDirectory supply working overrides.
    """
    raise NotImplementedError
258 class CollectionDirectoryBase(Directory):
259 """Represent an Arvados Collection as a directory.
261 This class is used for Subcollections, and is also the base class for
262 CollectionDirectory, which implements collection loading/saving on
Most operations act only on the underlying Arvados `Collection` object. The
266 `Collection` object signals via a notify callback to
267 `CollectionDirectoryBase.on_event` that an item was added, removed or
268 modified. FUSE inodes and directory entries are created, deleted or
269 invalidated in response to these events.
def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection):
    """Initialise a directory that presents `collection`.

    `parent_inode`, `inodes`, `apiconfig` and `enable_write` are passed
    unchanged to the Directory constructor. `apiconfig` and `collection`
    are then stored on the instance.
    """
    super(CollectionDirectoryBase, self).__init__(
        parent_inode,
        inodes,
        apiconfig,
        enable_write,
    )
    self.apiconfig = apiconfig
    self.collection = collection
278 def new_entry(self, name, item, mtime):
279 name = self.sanitize_filename(name)
280 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
281 if item.fuse_entry.dead is not True:
282 raise Exception("Can only reparent dead inode entry")
283 if item.fuse_entry.inode is None:
284 raise Exception("Reparented entry must still have valid inode")
285 item.fuse_entry.dead = False
286 self._entries[name] = item.fuse_entry
287 elif isinstance(item, arvados.collection.RichCollectionBase):
288 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, self._enable_write, item))
289 self._entries[name].populate(mtime)
291 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
292 item.fuse_entry = self._entries[name]
294 def on_event(self, event, collection, name, item):
295 if collection == self.collection:
296 name = self.sanitize_filename(name)
299 # It's possible for another thread to have llfuse.lock and
300 # be waiting on collection.lock. Meanwhile, we released
301 # llfuse.lock earlier in the stack, but are still holding
302 # on to the collection lock, and now we need to re-acquire
303 # llfuse.lock. If we don't release the collection lock,
304 # we'll deadlock where we're holding the collection lock
305 # waiting for llfuse.lock and the other thread is holding
306 # llfuse.lock and waiting for the collection lock.
308 # The correct locking order here is to take llfuse.lock
309 # first, then the collection lock.
311 # Since collection.lock is an RLock, it might be locked
312 # multiple times, so we need to release it multiple times,
313 # keep a count, then re-lock it the correct number of
319 self.collection.lock.release()
326 with self.collection.lock:
327 if event == arvados.collection.ADD:
328 self.new_entry(name, item, self.mtime())
329 elif event == arvados.collection.DEL:
330 ent = self._entries[name]
331 del self._entries[name]
332 self.inodes.invalidate_entry(self, name)
333 self.inodes.del_entry(ent)
334 elif event == arvados.collection.MOD:
335 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
336 self.inodes.invalidate_inode(item.fuse_entry)
337 elif name in self._entries:
338 self.inodes.invalidate_inode(self._entries[name])
341 self.collection.lock.acquire()
344 def populate(self, mtime):
346 with self.collection.lock:
347 self.collection.subscribe(self.on_event)
348 for entry, item in self.collection.items():
349 self.new_entry(entry, item, self.mtime())
352 return self._enable_write and self.collection.writable()
356 if not self.writable():
358 with llfuse.lock_released:
359 self.collection.root_collection().save()
363 def create(self, name):
364 if not self.writable():
365 raise llfuse.FUSEError(errno.EROFS)
366 with llfuse.lock_released:
367 self.collection.open(name, "w").close()
371 def mkdir(self, name):
372 if not self.writable():
373 raise llfuse.FUSEError(errno.EROFS)
374 with llfuse.lock_released:
375 self.collection.mkdirs(name)
379 def unlink(self, name):
380 if not self.writable():
381 raise llfuse.FUSEError(errno.EROFS)
382 with llfuse.lock_released:
383 self.collection.remove(name)
388 def rmdir(self, name):
389 if not self.writable():
390 raise llfuse.FUSEError(errno.EROFS)
391 with llfuse.lock_released:
392 self.collection.remove(name)
397 def rename(self, name_old, name_new, src):
398 if not self.writable():
399 raise llfuse.FUSEError(errno.EROFS)
401 if not isinstance(src, CollectionDirectoryBase):
402 raise llfuse.FUSEError(errno.EPERM)
407 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
409 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
411 raise llfuse.FUSEError(errno.ENOTEMPTY)
412 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
413 raise llfuse.FUSEError(errno.ENOTDIR)
414 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
415 raise llfuse.FUSEError(errno.EISDIR)
417 with llfuse.lock_released:
418 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
423 super(CollectionDirectoryBase, self).clear()
424 self.collection = None
427 class CollectionDirectory(CollectionDirectoryBase):
428 """Represents the root of a directory tree representing a collection."""
430 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, collection_record=None, explicit_collection=None):
431 super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None)
433 self.num_retries = num_retries
434 self.collection_record_file = None
435 self.collection_record = None
438 self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
440 _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
441 self._poll_time = 60*60
443 if isinstance(collection_record, dict):
444 self.collection_locator = collection_record['uuid']
445 self._mtime = convertTime(collection_record.get('modified_at'))
447 self.collection_locator = collection_record
449 self._manifest_size = 0
450 if self.collection_locator:
451 self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
452 self._updating_lock = threading.Lock()
455 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
458 return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)
def want_event_subscribe(self):
    """Tell whether this directory's locator is an Arvados UUID.

    Returns True only when `collection_locator` is set and matches
    `uuid_pattern`. An unset locator (None or an empty string), or one
    that does not match the pattern, gives False.
    """
    locator = self.collection_locator
    # The constructor's collection_record argument defaults to None, which
    # can leave the locator unset. Matching a regex against None raises
    # TypeError, so answer False up front instead of crashing.
    if not locator:
        return False
    return uuid_pattern.match(locator) is not None
463 # Used by arv-web.py to switch the contents of the CollectionDirectory
464 def change_collection(self, new_locator):
465 """Switch the contents of the CollectionDirectory.
467 Must be called with llfuse.lock held.
470 self.collection_locator = new_locator
471 self.collection_record = None
474 def new_collection(self, new_collection_record, coll_reader):
478 self.collection_record = new_collection_record
480 if self.collection_record:
481 self._mtime = convertTime(self.collection_record.get('modified_at'))
482 self.collection_locator = self.collection_record["uuid"]
483 if self.collection_record_file is not None:
484 self.collection_record_file.update(self.collection_record)
486 self.collection = coll_reader
487 self.populate(self.mtime())
490 return self.collection_locator
493 def update(self, to_record_version=None):
495 if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
498 if self.collection_locator is None:
503 with llfuse.lock_released:
504 self._updating_lock.acquire()
508 _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
509 new_collection_record = None
510 if self.collection is not None:
511 if self.collection.known_past_version(to_record_version):
512 _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
514 self.collection.update()
516 if uuid_pattern.match(self.collection_locator):
517 coll_reader = arvados.collection.Collection(
518 self.collection_locator, self.api, self.api.keep,
519 num_retries=self.num_retries)
521 coll_reader = arvados.collection.CollectionReader(
522 self.collection_locator, self.api, self.api.keep,
523 num_retries=self.num_retries)
524 new_collection_record = coll_reader.api_response() or {}
525 # If the Collection only exists in Keep, there will be no API
526 # response. Fill in the fields we need.
527 if 'uuid' not in new_collection_record:
528 new_collection_record['uuid'] = self.collection_locator
529 if "portable_data_hash" not in new_collection_record:
530 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
531 if 'manifest_text' not in new_collection_record:
532 new_collection_record['manifest_text'] = coll_reader.manifest_text()
533 if 'storage_classes_desired' not in new_collection_record:
534 new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
536 # end with llfuse.lock_released, re-acquire lock
537 if (new_collection_record is not None and
538 (self.collection_record is None or
539 self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"))):
540 self.new_collection(new_collection_record, coll_reader)
541 self._manifest_size = len(coll_reader.manifest_text())
542 _logger.debug("%s manifest_size %i", self, self._manifest_size)
547 self._updating_lock.release()
548 except arvados.errors.NotFoundError as e:
549 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
550 except arvados.errors.ArgumentError as detail:
551 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
552 if self.collection_record is not None and "manifest_text" in self.collection_record:
553 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
555 _logger.exception("arv-mount %s: error", self.collection_locator)
556 if self.collection_record is not None and "manifest_text" in self.collection_record:
557 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
563 def __getitem__(self, item):
564 if item == '.arvados#collection':
565 if self.collection_record_file is None:
566 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
567 self.inodes.add_entry(self.collection_record_file)
568 return self.collection_record_file
570 return super(CollectionDirectory, self).__getitem__(item)
572 def __contains__(self, k):
573 if k == '.arvados#collection':
576 return super(CollectionDirectory, self).__contains__(k)
def invalidate(self):
    """Forget the cached collection record and its file object, then
    run the inherited invalidate()."""
    # Chained assignment stores to collection_record first and
    # collection_record_file second.
    self.collection_record = self.collection_record_file = None
    super(CollectionDirectory, self).invalidate()
584 return (self.collection_locator is not None)
587 # This is an empirically-derived heuristic to estimate the memory used
588 # to store this collection's metadata. Calculating the memory
589 # footprint directly would be more accurate, but also more complicated.
590 return self._manifest_size * 128
593 if self.collection is not None:
595 self.collection.save()
596 self.collection.stop_threads()
599 if self.collection is not None:
600 self.collection.stop_threads()
601 super(CollectionDirectory, self).clear()
602 self._manifest_size = 0
605 class TmpCollectionDirectory(CollectionDirectoryBase):
606 """A directory backed by an Arvados collection that never gets saved.
608 This supports using Keep as scratch space. A userspace program can
609 read the .arvados#collection file to get a current manifest in
610 order to save a snapshot of the scratch data or use it as a crunch
614 class UnsaveableCollection(arvados.collection.Collection):
def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, storage_classes=None):
    """Create a fresh UnsaveableCollection and present it as a directory.

    The collection is built from `api_client`, its `keep` client,
    `num_retries` and `storage_classes`. The directory is populated
    immediately after the base class has been initialised.
    """
    collection_args = {
        "api_client": api_client,
        "keep_client": api_client.keep,
        "num_retries": num_retries,
        "storage_classes_desired": storage_classes,
    }
    scratch = self.UnsaveableCollection(**collection_args)

    # The caller's enable_write is not forwarded: writing is always
    # enabled here because this directory never tries to save to the
    # backend.
    super(TmpCollectionDirectory, self).__init__(
        parent_inode,
        inodes,
        api_client.config,
        True,
        scratch,
    )
    self.collection_record_file = None
    current_mtime = self.mtime()
    self.populate(current_mtime)
633 def on_event(self, *args, **kwargs):
634 super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
635 if self.collection_record_file:
637 # See discussion in CollectionDirectoryBase.on_event
641 self.collection.lock.release()
648 with self.collection.lock:
649 self.collection_record_file.invalidate()
650 self.inodes.invalidate_inode(self.collection_record_file)
651 _logger.debug("%s invalidated collection record", self)
654 self.collection.lock.acquire()
657 def collection_record(self):
658 with llfuse.lock_released:
661 "manifest_text": self.collection.manifest_text(),
662 "portable_data_hash": self.collection.portable_data_hash(),
663 "storage_classes_desired": self.collection.storage_classes_desired(),
def __contains__(self, k):
    """Report whether `k` names something in this directory.

    The special name '.arvados#collection' is always present; every
    other name is delegated to the base class.
    """
    if k == '.arvados#collection':
        return True
    return super(TmpCollectionDirectory, self).__contains__(k)
def __getitem__(self, item):
    """Look up `item`, creating the collection-record file on demand.

    For '.arvados#collection' the cached FuncToJSONFile is returned. On
    first access it is built from `self.collection_record`, cached, and
    registered with `self.inodes`. Any other name is delegated to the
    base class.
    """
    if item != '.arvados#collection':
        return super(TmpCollectionDirectory, self).__getitem__(item)

    record_file = self.collection_record_file
    if record_file is None:
        record_file = FuncToJSONFile(self.inode, self.collection_record)
        self.collection_record_file = record_file
        self.inodes.add_entry(record_file)
    return record_file
686 def want_event_subscribe(self):
690 self.collection.stop_threads()
def invalidate(self):
    """Invalidate the collection-record file, if one has been created,
    and then run the inherited invalidate()."""
    record_file = self.collection_record_file
    if record_file:
        record_file.invalidate()
    super(TmpCollectionDirectory, self).invalidate()
698 class MagicDirectory(Directory):
699 """A special directory that logically contains the set of all extant keep locators.
701 When a file is referenced by lookup(), it is tested to see if it is a valid
702 keep locator to a manifest, and if so, loads the manifest contents as a
703 subdirectory of this directory with the locator as the directory name.
704 Since querying a list of all extant keep locators is impractical, only
705 collections that have already been accessed are visible to readdir().
710 This directory provides access to Arvados collections as subdirectories listed
711 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
712 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
713 (in the form 'zzzzz-j7d0g-1234567890abcde').
715 Note that this directory will appear empty until you attempt to access a
716 specific collection or project subdirectory (such as trying to 'cd' into it),
717 at which point the collection or project will actually be looked up on the server
718 and the directory will appear if it exists.
722 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, pdh_only=False, storage_classes=None):
723 super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
725 self.num_retries = num_retries
726 self.pdh_only = pdh_only
727 self.storage_classes = storage_classes
729 def __setattr__(self, name, value):
730 super(MagicDirectory, self).__setattr__(name, value)
731 # When we're assigned an inode, add a README.
732 if ((name == 'inode') and (self.inode is not None) and
733 (not self._entries)):
734 self._entries['README'] = self.inodes.add_entry(
735 StringFile(self.inode, self.README_TEXT, time.time()))
736 # If we're the root directory, add an identical by_id subdirectory.
737 if self.inode == llfuse.ROOT_INODE:
738 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
739 self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
742 def __contains__(self, k):
743 if k in self._entries:
746 if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
752 if group_uuid_pattern.match(k):
753 project = self.api.groups().list(
754 filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
755 if project[u'items_available'] == 0:
757 e = self.inodes.add_entry(ProjectDirectory(
758 self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
759 project[u'items'][0], storage_classes=self.storage_classes))
761 e = self.inodes.add_entry(CollectionDirectory(
762 self.inode, self.inodes, self.api, self.num_retries, self._enable_write, k))
765 if k not in self._entries:
768 self.inodes.del_entry(e)
771 self.inodes.invalidate_entry(self, k)
772 self.inodes.del_entry(e)
774 except Exception as ex:
775 _logger.exception("arv-mount lookup '%s':", k)
777 self.inodes.del_entry(e)
780 def __getitem__(self, item):
782 return self._entries[item]
784 raise KeyError("No collection with id " + item)
def want_event_subscribe(self):
    """Return False when `pdh_only` is set, and True otherwise."""
    if self.pdh_only:
        return False
    return True
793 class TagsDirectory(Directory):
794 """A special directory that contains as subdirectories all tags visible to the user."""
796 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, poll_time=60):
797 super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
799 self.num_retries = num_retries
801 self._poll_time = poll_time
804 def want_event_subscribe(self):
809 with llfuse.lock_released:
810 tags = self.api.links().list(
811 filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
812 select=['name'], distinct=True, limit=1000
813 ).execute(num_retries=self.num_retries)
815 self.merge(tags['items']+[{"name": n} for n in self._extra],
817 lambda a, i: a.tag == i['name'],
818 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
819 i['name'], poll=self._poll, poll_time=self._poll_time))
823 def __getitem__(self, item):
824 if super(TagsDirectory, self).__contains__(item):
825 return super(TagsDirectory, self).__getitem__(item)
826 with llfuse.lock_released:
827 tags = self.api.links().list(
828 filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
829 ).execute(num_retries=self.num_retries)
831 self._extra.add(item)
833 return super(TagsDirectory, self).__getitem__(item)
837 def __contains__(self, k):
838 if super(TagsDirectory, self).__contains__(k):
848 class TagDirectory(Directory):
849 """A special directory that contains as subdirectories all collections visible
850 to the user that are tagged with a particular tag.
853 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, tag,
854 poll=False, poll_time=60):
855 super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
857 self.num_retries = num_retries
860 self._poll_time = poll_time
862 def want_event_subscribe(self):
867 with llfuse.lock_released:
868 taggedcollections = self.api.links().list(
869 filters=[['link_class', '=', 'tag'],
870 ['name', '=', self.tag],
871 ['head_uuid', 'is_a', 'arvados#collection']],
873 ).execute(num_retries=self.num_retries)
874 self.merge(taggedcollections['items'],
875 lambda i: i['head_uuid'],
876 lambda a, i: a.collection_locator == i['head_uuid'],
877 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid']))
880 class ProjectDirectory(Directory):
881 """A special directory that contains the contents of a project."""
883 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, project_object,
884 poll=True, poll_time=3, storage_classes=None):
885 super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
887 self.num_retries = num_retries
888 self.project_object = project_object
889 self.project_object_file = None
890 self.project_uuid = project_object['uuid']
892 self._poll_time = poll_time
893 self._updating_lock = threading.Lock()
894 self._current_user = None
895 self._full_listing = False
896 self.storage_classes = storage_classes
898 def want_event_subscribe(self):
901 def createDirectory(self, i):
902 if collection_uuid_pattern.match(i['uuid']):
903 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i)
904 elif group_uuid_pattern.match(i['uuid']):
905 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
906 i, self._poll, self._poll_time, self.storage_classes)
907 elif link_uuid_pattern.match(i['uuid']):
908 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
909 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid'])
912 elif uuid_pattern.match(i['uuid']):
913 return ObjectFile(self.parent_inode, i)
918 return self.project_uuid
921 self._full_listing = True
922 return super(ProjectDirectory, self).items()
926 if i['name'] is None or len(i['name']) == 0:
928 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
929 # collection or subproject
931 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
934 elif 'kind' in i and i['kind'].startswith('arvados#'):
936 return "{}.{}".format(i['name'], i['kind'][8:])
943 if self.project_object_file == None:
944 self.project_object_file = ObjectFile(self.inode, self.project_object)
945 self.inodes.add_entry(self.project_object_file)
947 if not self._full_listing:
951 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
952 return a.uuid() == i['uuid']
953 elif isinstance(a, ObjectFile):
954 return a.uuid() == i['uuid'] and not a.stale()
958 with llfuse.lock_released:
959 self._updating_lock.acquire()
963 if group_uuid_pattern.match(self.project_uuid):
964 self.project_object = self.api.groups().get(
965 uuid=self.project_uuid).execute(num_retries=self.num_retries)
966 elif user_uuid_pattern.match(self.project_uuid):
967 self.project_object = self.api.users().get(
968 uuid=self.project_uuid).execute(num_retries=self.num_retries)
969 # do this in 2 steps until #17424 is fixed
970 contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
972 num_retries=self.num_retries,
973 uuid=self.project_uuid,
974 filters=[["uuid", "is_a", "arvados#group"],
975 ["groups.group_class", "in", ["project","filter"]]]))
976 contents.extend(arvados.util.keyset_list_all(self.api.groups().contents,
978 num_retries=self.num_retries,
979 uuid=self.project_uuid,
980 filters=[["uuid", "is_a", "arvados#collection"]]))
982 # end with llfuse.lock_released, re-acquire lock
987 self.createDirectory)
990 self._updating_lock.release()
def _add_entry(self, i, name):
    """Build the directory object for item `i` and file it under `name`.

    The object comes from `self.createDirectory(i)`, is registered
    through `self.inodes.add_entry`, and the value that call returns is
    stored in `_entries[name]` and returned to the caller.
    """
    registered = self.inodes.add_entry(self.createDirectory(i))
    self._entries[name] = registered
    return registered
999 def __getitem__(self, k):
1000 if k == '.arvados#project':
1001 return self.project_object_file
1002 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
1003 return super(ProjectDirectory, self).__getitem__(k)
1004 with llfuse.lock_released:
1005 k2 = self.unsanitize_filename(k)
1007 namefilter = ["name", "=", k]
1009 namefilter = ["name", "in", [k, k2]]
1010 contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
1011 ["group_class", "in", ["project","filter"]],
1013 limit=2).execute(num_retries=self.num_retries)["items"]
1015 contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
1017 limit=2).execute(num_retries=self.num_retries)["items"]
1019 if len(contents) > 1 and contents[1]['name'] == k:
1020 # If "foo/bar" and "foo[SUBST]bar" both exist, use
1022 contents = [contents[1]]
1023 name = self.sanitize_filename(self.namefn(contents[0]))
1026 return self._add_entry(contents[0], name)
1031 def __contains__(self, k):
1032 if k == '.arvados#project':
1044 if not self._enable_write:
1046 with llfuse.lock_released:
1047 if not self._current_user:
1048 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
1049 return self._current_user["uuid"] in self.project_object.get("writable_by", [])
1051 def persisted(self):
1056 def mkdir(self, name):
1057 if not self.writable():
1058 raise llfuse.FUSEError(errno.EROFS)
1061 with llfuse.lock_released:
1063 "owner_uuid": self.project_uuid,
1065 "manifest_text": "" }
1066 if self.storage_classes is not None:
1067 c["storage_classes_desired"] = self.storage_classes
1069 self.api.collections().create(body=c).execute(num_retries=self.num_retries)
1070 except Exception as e:
1073 except apiclient_errors.Error as error:
1074 _logger.error(error)
1075 raise llfuse.FUSEError(errno.EEXIST)
1079 def rmdir(self, name):
1080 if not self.writable():
1081 raise llfuse.FUSEError(errno.EROFS)
1083 if name not in self:
1084 raise llfuse.FUSEError(errno.ENOENT)
1085 if not isinstance(self[name], CollectionDirectory):
1086 raise llfuse.FUSEError(errno.EPERM)
1087 if len(self[name]) > 0:
1088 raise llfuse.FUSEError(errno.ENOTEMPTY)
1089 with llfuse.lock_released:
1090 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
1095 def rename(self, name_old, name_new, src):
1096 if not self.writable():
1097 raise llfuse.FUSEError(errno.EROFS)
1099 if not isinstance(src, ProjectDirectory):
1100 raise llfuse.FUSEError(errno.EPERM)
1104 if not isinstance(ent, CollectionDirectory):
1105 raise llfuse.FUSEError(errno.EPERM)
1107 if name_new in self:
# POSIX semantics for replacing one directory with another are
1109 # tricky (the target directory must be empty, the operation must be
1110 # atomic which isn't possible with the Arvados API as of this
1111 # writing) so don't support that.
1112 raise llfuse.FUSEError(errno.EPERM)
1114 self.api.collections().update(uuid=ent.uuid(),
1115 body={"owner_uuid": self.uuid(),
1116 "name": name_new}).execute(num_retries=self.num_retries)
# Actually move the entry from source directory to this directory.
1119 del src._entries[name_old]
1120 self._entries[name_new] = ent
1121 self.inodes.invalidate_entry(src, name_old)
1124 def child_event(self, ev):
1125 properties = ev.get("properties") or {}
1126 old_attrs = properties.get("old_attributes") or {}
1127 new_attrs = properties.get("new_attributes") or {}
1128 old_attrs["uuid"] = ev["object_uuid"]
1129 new_attrs["uuid"] = ev["object_uuid"]
1130 old_name = self.sanitize_filename(self.namefn(old_attrs))
1131 new_name = self.sanitize_filename(self.namefn(new_attrs))
1133 # create events will have a new name, but not an old name
1134 # delete events will have an old name, but not a new name
1135 # update events will have an old and new name, and they may be same or different
1136 # if they are the same, an unrelated field changed and there is nothing to do.
1138 if old_attrs.get("owner_uuid") != self.project_uuid:
1139 # Was moved from somewhere else, so don't try to remove entry.
1141 if ev.get("object_owner_uuid") != self.project_uuid:
1142 # Was moved to somewhere else, so don't try to add entry
1145 if old_attrs.get("is_trashed"):
1146 # Was previously deleted
1148 if new_attrs.get("is_trashed"):
1152 if new_name != old_name:
1154 if old_name in self._entries:
1155 ent = self._entries[old_name]
1156 del self._entries[old_name]
1157 self.inodes.invalidate_entry(self, old_name)
1161 self._entries[new_name] = ent
1163 self._add_entry(new_attrs, new_name)
1164 elif ent is not None:
1165 self.inodes.del_entry(ent)
1168 class SharedDirectory(Directory):
1169 """A special directory that represents users or groups who have shared projects with me."""
1171 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, exclude,
1172 poll=False, poll_time=60, storage_classes=None):
1173 super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
1175 self.num_retries = num_retries
1176 self.current_user = api.users().current().execute(num_retries=num_retries)
1178 self._poll_time = poll_time
1179 self._updating_lock = threading.Lock()
1180 self.storage_classes = storage_classes
1185 with llfuse.lock_released:
1186 self._updating_lock.acquire()
1187 if not self.stale():
1195 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1196 if 'httpMethod' in methods.get('shared', {}):
1199 resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
1203 include="owner_uuid").execute()
1204 if not resp["items"]:
1206 page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1207 for r in resp["items"]:
1208 objects[r["uuid"]] = r
1209 roots.append(r["uuid"])
1210 for r in resp["included"]:
1211 objects[r["uuid"]] = r
1212 root_owners.add(r["uuid"])
1214 all_projects = list(arvados.util.keyset_list_all(
1215 self.api.groups().list,
1217 num_retries=self.num_retries,
1218 filters=[['group_class','in',['project','filter']]],
1219 select=["uuid", "owner_uuid"]))
1220 for ob in all_projects:
1221 objects[ob['uuid']] = ob
1223 current_uuid = self.current_user['uuid']
1224 for ob in all_projects:
1225 if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1226 roots.append(ob['uuid'])
1227 root_owners.add(ob['owner_uuid'])
1229 lusers = arvados.util.keyset_list_all(
1230 self.api.users().list,
1232 num_retries=self.num_retries,
1233 filters=[['uuid','in', list(root_owners)]])
1234 lgroups = arvados.util.keyset_list_all(
1235 self.api.groups().list,
1237 num_retries=self.num_retries,
1238 filters=[['uuid','in', list(root_owners)+roots]])
1241 objects[l["uuid"]] = l
1243 objects[l["uuid"]] = l
1245 for r in root_owners:
1249 contents[obr["name"]] = obr
1250 elif "first_name" in obr:
1251 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1256 if obr['owner_uuid'] not in objects:
1257 contents[obr["name"]] = obr
1259 # end with llfuse.lock_released, re-acquire lock
1261 self.merge(contents.items(),
1263 lambda a, i: a.uuid() == i[1]['uuid'],
1264 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
1265 i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
1267 _logger.exception("arv-mount shared dir error")
1269 self._updating_lock.release()
1271 def want_event_subscribe(self):