# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0

import apiclient
import errno
import llfuse
import logging
import re
import sys
import threading
import time

from apiclient import errors as apiclient_errors

from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
from .fresh import FreshBase, convertTime, use_counter, check_update

import arvados
import arvados.collection
from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern

_logger = logging.getLogger('arvados.arvados_fuse')

# Match any character which FUSE or Linux cannot accommodate as part
# of a filename. (If present in a collection filename, they will
# appear as underscores in the fuse mount.)
_disallowed_filename_characters = re.compile('[\x00/]')


class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    def __init__(self, parent_inode, inodes, apiconfig, enable_write):
        """parent_inode is the integer inode number"""

        super(Directory, self).__init__()

        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self.apiconfig = apiconfig
        self._entries = {}
        self._mtime = time.time()
        self._enable_write = enable_write

    def forward_slash_subst(self):
        if not hasattr(self, '_fsns'):
            config = self.apiconfig()
            self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
            # old API server with no FSNS config
            if self._fsns == '' or self._fsns == '/':

    def unsanitize_filename(self, incoming):
        """Replace ForwardSlashNameSubstitution value with /"""
        fsns = self.forward_slash_subst()
        if isinstance(fsns, str):
            return incoming.replace(fsns, '/')

    def sanitize_filename(self, dirty):
        """Replace disallowed filename characters according to
        ForwardSlashNameSubstitution in self.api_config."""
        # '.' and '..' are not reachable if API server is newer than #6277
        fsns = self.forward_slash_subst()
        if isinstance(fsns, str):
            dirty = dirty.replace('/', fsns)
        return _disallowed_filename_characters.sub('_', dirty)
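
    # Illustrative sketch (added comment, not in the original source): assuming
    # the cluster's ForwardSlashNameSubstitution value is "%2F" (a hypothetical
    # setting), the two helpers above behave roughly like this:
    #
    #   d.sanitize_filename("sub/dir")      -> "sub%2Fdir"
    #   d.unsanitize_filename("sub%2Fdir")  -> "sub/dir"
    #
    # With no substitution configured (forward_slash_subst() returns None),
    # a "/" in a collection item name simply becomes "_":
    #
    #   d.sanitize_filename("sub/dir")      -> "sub_dir"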

    # Overridden by subclasses to implement logic to update the
    # entries dict when the directory is stale

    # Only used when computing the size of the disk footprint of the directory

    def checkupdate(self):
        except apiclient.errors.HttpError as e:

    def __getitem__(self, item):
        return self._entries[item]

        return list(self._entries.items())

    def __contains__(self, k):
        return k in self._entries

        return len(self._entries)

        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory; reuses
        entries that are the same in both the old and new lists, creates new
        entries, and deletes old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries

            name = self.sanitize_filename(fn(i))
            if name in oldentries and same(oldentries[name], i):
                # move existing directory entry over
                self._entries[name] = oldentries[name]

                _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                # create new directory entry
                self._entries[name] = self.inodes.add_entry(ent)

        # delete any other directory entries that were not found in 'items'
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self, i)
            self.inodes.del_entry(oldentries[i])

            self.inodes.invalidate_inode(self)
            self._mtime = time.time()
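
    # Illustrative sketch (added comment, not in the original source): a
    # subclass's update() typically fetches records from the API server and
    # hands them to merge(), e.g. with a hypothetical 'records' list of
    # collection records:
    #
    #   self.merge(records,
    #              lambda i: i['name'],
    #              lambda a, i: a.uuid() == i['uuid'],
    #              lambda i: CollectionDirectory(self.inode, self.inodes, self.api,
    #                                            self.num_retries, self._enable_write, i))
    #
    # Entries whose name and identity match an existing child are reused; new
    # items get fresh inodes, and children absent from 'records' are dropped.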

        if super(Directory, self).in_use():
        for v in self._entries.values():

    def has_ref(self, only_children):
        if super(Directory, self).has_ref(only_children):
        for v in self._entries.values():

        """Delete all entries"""
        oldentries = self._entries
            oldentries[n].clear()
            self.inodes.del_entry(oldentries[n])

    def kernel_invalidate(self):
        # Invalidating the dentry on the parent implies invalidating all paths
        # leading to it.
        parent = self.inodes[self.parent_inode]

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k, v in parent._entries.items():
                self.inodes.invalidate_entry(parent, k)

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()


class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    Collection records.

    Most operations act only on the underlying Arvados `Collection` object. The
    `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified. FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.
    """
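
    # Illustrative sketch (added comment, not in the original source) of the
    # notify flow described above: a write through the Collection object, e.g.
    #
    #   collection.mkdirs("results/run1")
    #
    # causes the Collection to call back into on_event() (subscribed in
    # populate() below) with an ADD event, which allocates a FUSE inode and
    # directory entry for the new item; DEL and MOD events likewise remove or
    # invalidate the corresponding entries.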

    def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection, collection_root):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write)
        self.apiconfig = apiconfig
        self.collection = collection
        self.collection_root = collection_root
        self.collection_record_file = None

    def new_entry(self, name, item, mtime):
        name = self.sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            if item.fuse_entry.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if item.fuse_entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            item.fuse_entry.dead = False
            self._entries[name] = item.fuse_entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, self._enable_write, item, self.collection_root))
            self._entries[name].populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        # These are events from the Collection object (ADD/DEL/MOD)
        # emitted by operations on the Collection object (like
        # "mkdirs" or "remove"), and by "update", which we need to
        # synchronize with our FUSE objects that are assigned inodes.
        if collection == self.collection:
            name = self.sanitize_filename(name)

            # It's possible for another thread to have llfuse.lock and
            # be waiting on collection.lock. Meanwhile, we released
            # llfuse.lock earlier in the stack, but are still holding
            # on to the collection lock, and now we need to re-acquire
            # llfuse.lock. If we don't release the collection lock,
            # we'll deadlock where we're holding the collection lock
            # waiting for llfuse.lock and the other thread is holding
            # llfuse.lock and waiting for the collection lock.
            #
            # The correct locking order here is to take llfuse.lock
            # first, then the collection lock.
            #
            # Since collection.lock is an RLock, it might be locked
            # multiple times, so we need to release it multiple times,
            # keep a count, then re-lock it the correct number of
            # times.
            self.collection.lock.release()

            with self.collection.lock:
                if event == arvados.collection.ADD:
                    self.new_entry(name, item, self.mtime())
                elif event == arvados.collection.DEL:
                    ent = self._entries[name]
                    del self._entries[name]
                    self.inodes.invalidate_entry(self, name)
                    self.inodes.del_entry(ent)
                elif event == arvados.collection.MOD:
                    if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                        self.inodes.invalidate_inode(item.fuse_entry)
                    elif name in self._entries:
                        self.inodes.invalidate_inode(self._entries[name])

                if self.collection_record_file is not None:
                    self.collection_record_file.invalidate()
                    self.inodes.invalidate_inode(self.collection_record_file)

            self.collection.lock.acquire()

    def populate(self, mtime):
        with self.collection.lock:
            self.collection.subscribe(self.on_event)
            for entry, item in self.collection.items():
                self.new_entry(entry, item, self.mtime())

        return self._enable_write and self.collection.writable()

            self.collection_root.flush()

    def create(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.open(name, "w").close()

    def mkdir(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    def unlink(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)

    def rmdir(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)

    def rename(self, name_old, name_new, src):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
        elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
            raise llfuse.FUSEError(errno.ENOTDIR)
        elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)

        super(CollectionDirectoryBase, self).clear()
        self.collection = None


class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, collection_record=None, explicit_collection=None):
        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None, self)
        self.num_retries = num_retries
        try:
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
        except:
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
        self._manifest_size = 0
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
        self._updating_lock = threading.Lock()

        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

        return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)

        if not self.writable():
        with llfuse.lock_released:
            with self._updating_lock:
                if self.collection.committed():
                    self.collection.update()
                else:
                    self.collection.save()
                self.new_collection_record(self.collection.api_response())

    def want_event_subscribe(self):
        return (uuid_pattern.match(self.collection_locator) is not None)

    def new_collection(self, new_collection_record, coll_reader):
        self.collection = coll_reader
        self.new_collection_record(new_collection_record)
        self.populate(self.mtime())

    def new_collection_record(self, new_collection_record):
        if not new_collection_record:
            raise Exception("invalid new_collection_record")
        self._mtime = convertTime(new_collection_record.get('modified_at'))
        self._manifest_size = len(new_collection_record["manifest_text"])
        self.collection_locator = new_collection_record["uuid"]
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
            _logger.debug("%s invalidated collection record file", self)

        return self.collection_locator

    def update(self):
        try:
            if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator):
                # It's immutable, nothing to update

            if self.collection_locator is None:
                # No collection locator to retrieve from

            new_collection_record = None
            with llfuse.lock_released:
                self._updating_lock.acquire()

                _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)

                if self.collection is not None:
                    # Already have a collection object
                    self.collection.update()
                    new_collection_record = self.collection.api_response()
                else:
                    # If there are too many prefetch threads and you
                    # max out the CPU, delivering data to the FUSE
                    # layer actually ends up being slower.
                    # Experimentally, capping at 7 threads seems to
                    # be a sweet spot.
                    get_threads = min(max((self.api.keep.block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
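                    # Worked example (added comment, not in the original
                    # source): with a 256 MiB block cache this yields
                    # 256//64 - 1 = 3 prefetch threads; a 1 GiB cache yields
                    # 1024//64 - 1 = 15, which min() clamps to the 7-thread
                    # cap mentioned above.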
                    # Create a new collection object
                    if uuid_pattern.match(self.collection_locator):
                        coll_reader = arvados.collection.Collection(
                            self.collection_locator, self.api, self.api.keep,
                            num_retries=self.num_retries,
                            get_threads=get_threads)
                    else:
                        coll_reader = arvados.collection.CollectionReader(
                            self.collection_locator, self.api, self.api.keep,
                            num_retries=self.num_retries,
                            get_threads=get_threads)
                    new_collection_record = coll_reader.api_response() or {}
                    # If the Collection only exists in Keep, there will be no API
                    # response. Fill in the fields we need.
                    if 'uuid' not in new_collection_record:
                        new_collection_record['uuid'] = self.collection_locator
                    if "portable_data_hash" not in new_collection_record:
                        new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                    if 'manifest_text' not in new_collection_record:
                        new_collection_record['manifest_text'] = coll_reader.manifest_text()
                    if 'storage_classes_desired' not in new_collection_record:
                        new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()

            # end with llfuse.lock_released, re-acquire lock

            if new_collection_record is not None:
                if coll_reader is not None:
                    self.new_collection(new_collection_record, coll_reader)
                else:
                    self.new_collection_record(new_collection_record)

            self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])

            _logger.exception("arv-mount %s: error", self.collection_locator)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])

    def collection_record(self):
        return self.collection.api_response()

    def __getitem__(self, item):
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            self.invalidate()  # use lookup as a signal to force update
            return self.collection_record_file
        return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
        return super(CollectionDirectory, self).__contains__(k)
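
    # Illustrative note (added comment, not in the original source): the
    # special '.arvados#collection' entry lets a program read the collection
    # record as JSON straight out of the mount, e.g. with a hypothetical
    # mount point /mnt/keep:
    #
    #   import json
    #   with open('/mnt/keep/by_id/zzzzz-4zz18-1234567890abcde/.arvados#collection') as f:
    #       record = json.load(f)
    #   print(record['portable_data_hash'])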

    def invalidate(self):
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
        super(CollectionDirectory, self).invalidate()

        return (self.collection_locator is not None)

        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata. Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128
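        # Worked example (added comment, not in the original source): a
        # collection whose manifest_text is 64 KiB long is estimated at
        # 64 * 1024 * 128 bytes, i.e. about 8 MiB of metadata footprint.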

        if self.collection is not None:
            self.collection.save()
            self.collection.stop_threads()

        if self.collection is not None:
            self.collection.stop_threads()
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0


class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This supports using Keep as scratch space. A userspace program can
    read the .arvados#collection file to get a current manifest in
    order to save a snapshot of the scratch data or use it as a crunch
    job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):

    def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, storage_classes=None):
        collection = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries,
            storage_classes_desired=storage_classes)
        # This is always enable_write=True because it never tries to
        # save to the backend
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, api_client.config, True, collection, self)
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if self.collection_record_file is None:

        # See discussion in CollectionDirectoryBase.on_event
        self.collection.lock.release()

        with self.collection.lock:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
            _logger.debug("%s invalidated collection record", self)

        self.collection.lock.acquire()

    def collection_record(self):
        with llfuse.lock_released:
                "manifest_text": self.collection.manifest_text(),
                "portable_data_hash": self.collection.portable_data_hash(),
                "storage_classes_desired": self.collection.storage_classes_desired(),

    def __contains__(self, k):
        return (k == '.arvados#collection' or
                super(TmpCollectionDirectory, self).__contains__(k))

    def __getitem__(self, item):
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        return super(TmpCollectionDirectory, self).__getitem__(item)

    def want_event_subscribe(self):

        self.collection.stop_threads()

    def invalidate(self):
        if self.collection_record_file:
            self.collection_record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()


class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().
    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.
"""
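
    # Illustrative sketch (added comment, not in the original source): with
    # the magic directory mounted at a hypothetical path /mnt/keep, the first
    # access to a path such as
    #
    #   /mnt/keep/1f4b0bc7583c2a7f9102c395f4ffc5e3+45/foo.txt
    #   /mnt/keep/zzzzz-4zz18-1234567890abcde/bar.txt
    #
    # triggers the lookup in __contains__ below, which fetches the collection
    # (or project) from the API server and caches it as a subdirectory entry.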

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, pdh_only=False, storage_classes=None):
        super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.num_retries = num_retries
        self.pdh_only = pdh_only
        self.storage_classes = storage_classes

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
            (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write,

    def __contains__(self, k):
        if k in self._entries:

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):

        try:
            if group_uuid_pattern.match(k):
                project = self.api.groups().list(
                    filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                e = self.inodes.add_entry(ProjectDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                    project[u'items'][0], storage_classes=self.storage_classes))
            else:
                e = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write, k))

            if k not in self._entries:
                self.inodes.del_entry(e)

                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(e)
        except Exception as ex:
            _logger.exception("arv-mount lookup '%s':", k)
            self.inodes.del_entry(e)

    def __getitem__(self, item):
            return self._entries[item]
            raise KeyError("No collection with id " + item)

    def want_event_subscribe(self):
        return not self.pdh_only


class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.num_retries = num_retries
        self._poll_time = poll_time

    def want_event_subscribe(self):

    def update(self):
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
                select=['name'], distinct=True, limit=1000
                ).execute(num_retries=self.num_retries)

        self.merge(tags['items']+[{"name": n} for n in self._extra],
                   lambda a, i: a.tag == i['name'],
                   lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                          i['name'], poll=self._poll, poll_time=self._poll_time))

    def __getitem__(self, item):
        if super(TagsDirectory, self).__contains__(item):
            return super(TagsDirectory, self).__getitem__(item)
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
            ).execute(num_retries=self.num_retries)
            self._extra.add(item)
        return super(TagsDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if super(TagsDirectory, self).__contains__(k):


class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.num_retries = num_retries
        self._poll_time = poll_time

    def want_event_subscribe(self):

    def update(self):
        with llfuse.lock_released:
            taggedcollections = self.api.links().list(
                filters=[['link_class', '=', 'tag'],
                         ['name', '=', self.tag],
                         ['head_uuid', 'is_a', 'arvados#collection']],
                ).execute(num_retries=self.num_retries)
        self.merge(taggedcollections['items'],
                   lambda i: i['head_uuid'],
                   lambda a, i: a.collection_locator == i['head_uuid'],
                   lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid']))


class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, project_object,
                 poll=True, poll_time=3, storage_classes=None):
        super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.num_retries = num_retries
        self.project_object = project_object
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll_time = poll_time
        self._updating_lock = threading.Lock()
        self._current_user = None
        self._full_listing = False
        self.storage_classes = storage_classes

    def want_event_subscribe(self):

    def createDirectory(self, i):
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                    i, self._poll, self._poll_time, self.storage_classes)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid'])
        elif uuid_pattern.match(i['uuid']):
            return ObjectFile(self.parent_inode, i)

        return self.project_uuid

        self._full_listing = True
        return super(ProjectDirectory, self).items()

        if i['name'] is None or len(i['name']) == 0:
        elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
            # collection or subproject
        elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
        elif 'kind' in i and i['kind'].startswith('arvados#'):
            return "{}.{}".format(i['name'], i['kind'][8:])

    def update(self):
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        if not self._full_listing:

            if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()

        with llfuse.lock_released:
            self._updating_lock.acquire()

            if group_uuid_pattern.match(self.project_uuid):
                self.project_object = self.api.groups().get(
                    uuid=self.project_uuid).execute(num_retries=self.num_retries)
            elif user_uuid_pattern.match(self.project_uuid):
                self.project_object = self.api.users().get(
                    uuid=self.project_uuid).execute(num_retries=self.num_retries)

            # do this in 2 steps until #17424 is fixed
            contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
                                                         num_retries=self.num_retries,
                                                         uuid=self.project_uuid,
                                                         filters=[["uuid", "is_a", "arvados#group"],
                                                                  ["groups.group_class", "in", ["project","filter"]]]))
            contents.extend(filter(lambda i: i["current_version_uuid"] == i["uuid"],
                                   arvados.util.keyset_list_all(self.api.groups().contents,
                                                                num_retries=self.num_retries,
                                                                uuid=self.project_uuid,
                                                                filters=[["uuid", "is_a", "arvados#collection"]])))

        # end with llfuse.lock_released, re-acquire lock

        self.merge(contents,
                   self.createDirectory)

            self._updating_lock.release()

    def _add_entry(self, i, name):
        ent = self.createDirectory(i)
        self._entries[name] = self.inodes.add_entry(ent)
        return self._entries[name]

    def __getitem__(self, k):
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)
        with llfuse.lock_released:
            k2 = self.unsanitize_filename(k)
                namefilter = ["name", "=", k]
                namefilter = ["name", "in", [k, k2]]
            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                       ["group_class", "in", ["project","filter"]],
                                              limit=2).execute(num_retries=self.num_retries)["items"]
                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                       limit=2).execute(num_retries=self.num_retries)["items"]
            if len(contents) > 1 and contents[1]['name'] == k:
                # If "foo/bar" and "foo[SUBST]bar" both exist, use the entry
                # whose stored name matches the requested name exactly.
                contents = [contents[1]]
            name = self.sanitize_filename(self.namefn(contents[0]))
            return self._add_entry(contents[0], name)
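
            # Illustrative note (added comment, not in the original source):
            # if ForwardSlashNameSubstitution were "%2F" (hypothetical), a
            # lookup of "sub%2Fdir" queries for records named "sub%2Fdir" or
            # "sub/dir", and the branch above prefers a record literally
            # named "sub%2Fdir" when both exist.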

    def __contains__(self, k):
        if k == '.arvados#project':

        if not self._enable_write:
        with llfuse.lock_released:
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
            return self._current_user["uuid"] in self.project_object.get("writable_by", [])

    def persisted(self):

    def mkdir(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        with llfuse.lock_released:
                "owner_uuid": self.project_uuid,
                "manifest_text": "" }
            if self.storage_classes is not None:
                c["storage_classes_desired"] = self.storage_classes
            self.api.collections().create(body=c).execute(num_retries=self.num_retries)
        except Exception as e:
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    def rmdir(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        if not isinstance(self[name], CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(self[name]) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        with llfuse.lock_released:
            self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)

    def rename(self, name_old, name_new, src):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another are
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src, name_old)

    def child_event(self, ev):
        properties = ev.get("properties") or {}
        old_attrs = properties.get("old_attributes") or {}
        new_attrs = properties.get("new_attributes") or {}
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = self.sanitize_filename(self.namefn(old_attrs))
        new_name = self.sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be the same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
        if old_attrs.get("is_trashed"):
            # Was previously deleted
        if new_attrs.get("is_trashed"):

        if new_name != old_name:
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)
                self._entries[new_name] = ent
                self._add_entry(new_attrs, new_name)
            elif ent is not None:
                self.inodes.del_entry(ent)
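
    # Illustrative sketch (added comment, not in the original source):
    # child_event() consumes change events shaped roughly like this
    # hypothetical example, from which the old/new names above are derived:
    #
    #   {
    #       "object_uuid": "zzzzz-4zz18-1234567890abcde",
    #       "object_owner_uuid": "zzzzz-j7d0g-1234567890abcde",
    #       "properties": {
    #           "old_attributes": {"name": "data.txt",
    #                              "owner_uuid": "zzzzz-j7d0g-1234567890abcde",
    #                              "is_trashed": False},
    #           "new_attributes": {"name": "data-v2.txt",
    #                              "owner_uuid": "zzzzz-j7d0g-1234567890abcde",
    #                              "is_trashed": False},
    #       },
    #   }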


class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, exclude,
                 poll=False, poll_time=60, storage_classes=None):
        super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll_time = poll_time
        self._updating_lock = threading.Lock()
        self.storage_classes = storage_classes

    def update(self):
        with llfuse.lock_released:
            self._updating_lock.acquire()
            if not self.stale():

            methods = self.api._rootDesc.get('resources')["groups"]['methods']
            if 'httpMethod' in methods.get('shared', {}):
                    resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
                                                    include="owner_uuid").execute()
                    if not resp["items"]:
                    page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
                    for r in resp["items"]:
                        objects[r["uuid"]] = r
                        roots.append(r["uuid"])
                    for r in resp["included"]:
                        objects[r["uuid"]] = r
                        root_owners.add(r["uuid"])
            else:
                all_projects = list(arvados.util.keyset_list_all(
                    self.api.groups().list,
                    num_retries=self.num_retries,
                    filters=[['group_class','in',['project','filter']]],
                    select=["uuid", "owner_uuid"]))
                for ob in all_projects:
                    objects[ob['uuid']] = ob

                current_uuid = self.current_user['uuid']
                for ob in all_projects:
                    if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
                        roots.append(ob['uuid'])
                        root_owners.add(ob['owner_uuid'])

            lusers = arvados.util.keyset_list_all(
                self.api.users().list,
                num_retries=self.num_retries,
                filters=[['uuid','in', list(root_owners)]])
            lgroups = arvados.util.keyset_list_all(
                self.api.groups().list,
                num_retries=self.num_retries,
                filters=[['uuid','in', list(root_owners)+roots]])

            for l in lusers:
                objects[l["uuid"]] = l
            for l in lgroups:
                objects[l["uuid"]] = l

            for r in root_owners:
                    contents[obr["name"]] = obr
                elif "first_name" in obr:
                    contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

                    if obr['owner_uuid'] not in objects:
                        contents[obr["name"]] = obr

        # end with llfuse.lock_released, re-acquire lock

        self.merge(contents.items(),
                   lambda a, i: a.uuid() == i[1]['uuid'],
                   lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                              i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))

        _logger.exception("arv-mount shared dir error")

        self._updating_lock.release()

    def want_event_subscribe(self):