1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
15 from apiclient import errors as apiclient_errors
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
# Module-level logger shared by all directory classes in this module.
_logger = logging.getLogger('arvados.arvados_fuse')

# Match any character which FUSE or Linux cannot accommodate as part
# of a filename. (If present in a collection filename, they will
# appear as underscores in the fuse mount.)
_disallowed_filename_characters = re.compile('[\x00/]')
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.

    def __init__(self, parent_inode, inodes, apiconfig, enable_write):
        """parent_inode is the integer inode number"""
        super(Directory, self).__init__()

        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        # apiconfig is a callable returning the API server configuration
        # dict (used by forward_slash_subst below).
        self.apiconfig = apiconfig
        self._mtime = time.time()
        self._enable_write = enable_write

    def forward_slash_subst(self):
        # Look up Collections.ForwardSlashNameSubstitution from the API
        # server config, caching the result in self._fsns on first call.
        if not hasattr(self, '_fsns'):
            config = self.apiconfig()
            self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
            # old API server with no FSNS config
            # An empty or '/' substitution value means "no substitution".
            if self._fsns == '' or self._fsns == '/':

    def unsanitize_filename(self, incoming):
        """Replace ForwardSlashNameSubstitution value with /"""
        fsns = self.forward_slash_subst()
        if isinstance(fsns, str):
            return incoming.replace(fsns, '/')

    def sanitize_filename(self, dirty):
        """Replace disallowed filename characters according to
        ForwardSlashNameSubstitution in self.api_config."""
        # '.' and '..' are not reachable if API server is newer than #6277
        fsns = self.forward_slash_subst()
        if isinstance(fsns, str):
            dirty = dirty.replace('/', fsns)
        # Any remaining NUL or '/' characters become underscores.
        return _disallowed_filename_characters.sub('_', dirty)

    # Overridden by subclasses to implement logic to update the
    # entries dict when the directory is stale

    # Only used when computing the size of the disk footprint of the directory

    def checkupdate(self):
        except apiclient.errors.HttpError as e:

    def __getitem__(self, item):
        return self._entries[item]

        return list(self._entries.items())

    def __contains__(self, k):
        return k in self._entries

        return len(self._entries)

        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        oldentries = self._entries
            name = self.sanitize_filename(fn(i))
            if name in oldentries and same(oldentries[name], i):
                # move existing directory entry over
                self._entries[name] = oldentries[name]
                _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                # create new directory entry
                self._entries[name] = self.inodes.add_entry(ent)

        # delete any other directory entries that were not in found in 'items'
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self, i)
            self.inodes.del_entry(oldentries[i])

            self.inodes.invalidate_inode(self)
            self._mtime = time.time()

        if super(Directory, self).in_use():
        for v in self._entries.values():

    def has_ref(self, only_children):
        # A directory holds a reference if the base class says so or if
        # any of its entries do.
        if super(Directory, self).has_ref(only_children):
        for v in self._entries.values():

        """Delete all entries"""
        oldentries = self._entries
            oldentries[n].clear()
            self.inodes.del_entry(oldentries[n])

    def kernel_invalidate(self):
        # Invalidating the dentry on the parent implies invalidating all paths
        parent = self.inodes[self.parent_inode]

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k,v in parent._entries.items():
            self.inodes.invalidate_entry(parent, k)

    # The operations below are unsupported on a generic Directory;
    # subclasses override the ones they support.
    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on

    Most operations act only the underlying Arvados `Collection` object. The
    `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified. FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.

    def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection, collection_root):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write)
        self.apiconfig = apiconfig
        # The arvados.collection object backing this directory, and the
        # root directory of the tree (flush() delegates to it, see below).
        self.collection = collection
        self.collection_root = collection_root
        # Set lazily by subclasses that expose a '.arvados#collection' file.
        self.collection_record_file = None

    def new_entry(self, name, item, mtime):
        # Create (or reattach) the FUSE entry for one collection item.
        name = self.sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            # The item already had a FUSE entry; it may only be reattached
            # if it was previously marked dead and still has its inode.
            if item.fuse_entry.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if item.fuse_entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            item.fuse_entry.dead = False
            self._entries[name] = item.fuse_entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            # Subdirectory within the collection.
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, self._enable_write, item, self.collection_root))
            self._entries[name].populate(mtime)
            # Plain file within the collection.
            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        # These are events from the Collection object (ADD/DEL/MOD)
        # emitted by operations on the Collection object (like
        # "mkdirs" or "remove"), and by "update", which we need to
        # synchronize with our FUSE objects that are assigned inodes.
        if collection == self.collection:
            name = self.sanitize_filename(name)

            # It's possible for another thread to have llfuse.lock and
            # be waiting on collection.lock. Meanwhile, we released
            # llfuse.lock earlier in the stack, but are still holding
            # on to the collection lock, and now we need to re-acquire
            # llfuse.lock. If we don't release the collection lock,
            # we'll deadlock where we're holding the collection lock
            # waiting for llfuse.lock and the other thread is holding
            # llfuse.lock and waiting for the collection lock.

            # The correct locking order here is to take llfuse.lock
            # first, then the collection lock.

            # Since collection.lock is an RLock, it might be locked
            # multiple times, so we need to release it multiple times,
            # keep a count, then re-lock it the correct number of
            self.collection.lock.release()

            with self.collection.lock:
                if event == arvados.collection.ADD:
                    self.new_entry(name, item, self.mtime())
                elif event == arvados.collection.DEL:
                    ent = self._entries[name]
                    del self._entries[name]
                    self.inodes.invalidate_entry(self, name)
                    self.inodes.del_entry(ent)
                elif event == arvados.collection.MOD:
                    if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                        self.inodes.invalidate_inode(item.fuse_entry)
                    elif name in self._entries:
                        self.inodes.invalidate_inode(self._entries[name])

                if self.collection_record_file is not None:
                    self.collection_record_file.invalidate()
                    self.inodes.invalidate_inode(self.collection_record_file)

            self.collection.lock.acquire()

    def populate(self, mtime):
        # Subscribe for change notifications and create FUSE entries for
        # the collection's current contents.
        with self.collection.lock:
            self.collection.subscribe(self.on_event)
            for entry, item in self.collection.items():
                self.new_entry(entry, item, self.mtime())

        return self._enable_write and self.collection.writable()

        self.collection_root.flush()

    def create(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            # Creating an empty file is just open-for-write + close.
            self.collection.open(name, "w").close()

    def mkdir(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    def unlink(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)

    def rmdir(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)

    def rename(self, name_old, name_new, src):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        # Renaming across mounts is only supported between collection-backed
        # directories.
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        # POSIX-style checks when the target name already exists:
        # file-over-file is allowed; the mismatched combinations are errors.
        if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
        elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
            raise llfuse.FUSEError(errno.ENOTDIR)
        elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)

        super(CollectionDirectoryBase, self).clear()
        self.collection = None
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, collection_record=None, explicit_collection=None):
        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None, self)
        self.num_retries = num_retries
            # Poll at half the blob signature TTL so block signatures are
            # refreshed before they expire.
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            # Full API record supplied up front.
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
            # Otherwise collection_record is a bare locator (uuid or PDH).
            self.collection_locator = collection_record
        self._manifest_size = 0
        if self.collection_locator:
            # Only uuid-addressed collections can be written; a portable
            # data hash identifies immutable content.
            self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
        self._updating_lock = threading.Lock()

        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

        return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)

        if not self.writable():
        with llfuse.lock_released:
            with self._updating_lock:
                if self.collection.committed():
                    self.collection.update()
                    self.collection.save()
                self.new_collection_record(self.collection.api_response())

    def want_event_subscribe(self):
        # Only uuid-addressed collections can change, so only those need
        # event updates.
        return (uuid_pattern.match(self.collection_locator) is not None)

    def new_collection(self, new_collection_record, coll_reader):
        # Adopt a freshly-fetched collection object and its API record.
        self.collection = coll_reader
        self.new_collection_record(new_collection_record)
        self.populate(self.mtime())

    def new_collection_record(self, new_collection_record):
        if not new_collection_record:
            raise Exception("invalid new_collection_record")
        self._mtime = convertTime(new_collection_record.get('modified_at'))
        self._manifest_size = len(new_collection_record["manifest_text"])
        self.collection_locator = new_collection_record["uuid"]
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
            _logger.debug("%s invalidated collection record file", self)

        return self.collection_locator

        if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator):
            # It's immutable, nothing to update
        if self.collection_locator is None:
            # No collection locator to retrieve from

        new_collection_record = None
            with llfuse.lock_released:
                self._updating_lock.acquire()
                _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
                if self.collection is not None:
                    # Already have a collection object
                    self.collection.update()
                    new_collection_record = self.collection.api_response()
                    # too much prefetch and you end up stepping on your own transfers
                    # experimentally the optimal somewhere between 4 and 6
                    get_threads = min(max((self.api.keep.block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 6)
                    # Create a new collection object
                    if uuid_pattern.match(self.collection_locator):
                        # uuid locator: potentially writable Collection.
                        coll_reader = arvados.collection.Collection(
                            self.collection_locator, self.api, self.api.keep,
                            num_retries=self.num_retries,
                            get_threads=get_threads)
                        # PDH locator: read-only CollectionReader.
                        coll_reader = arvados.collection.CollectionReader(
                            self.collection_locator, self.api, self.api.keep,
                            num_retries=self.num_retries,
                            get_threads=get_threads)
                    new_collection_record = coll_reader.api_response() or {}
                    # If the Collection only exists in Keep, there will be no API
                    # response. Fill in the fields we need.
                    if 'uuid' not in new_collection_record:
                        new_collection_record['uuid'] = self.collection_locator
                    if "portable_data_hash" not in new_collection_record:
                        new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                    if 'manifest_text' not in new_collection_record:
                        new_collection_record['manifest_text'] = coll_reader.manifest_text()
                    if 'storage_classes_desired' not in new_collection_record:
                        new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()

            # end with llfuse.lock_released, re-acquire lock

            if new_collection_record is not None:
                if coll_reader is not None:
                    self.new_collection(new_collection_record, coll_reader)
                    self.new_collection_record(new_collection_record)
            self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])

    def collection_record(self):
        return self.collection.api_response()

    def __getitem__(self, item):
        # The magic name '.arvados#collection' exposes the collection's API
        # record as a JSON file.
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            self.invalidate() # use lookup as a signal to force update
            return self.collection_record_file
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
        super(CollectionDirectory, self).invalidate()

        return (self.collection_locator is not None)

        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata. Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

        if self.collection is not None:
                self.collection.save()
                self.collection.stop_threads()

        if self.collection is not None:
            self.collection.stop_threads()
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This supports using Keep as scratch space. A userspace program can
    read the .arvados#collection file to get a current manifest in
    order to save a snapshot of the scratch data or use it as a crunch

    # Collection subclass used so the scratch contents are never committed
    # to the API server (see class docstring above).
    class UnsaveableCollection(arvados.collection.Collection):

    def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, storage_classes=None):
        collection = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries,
            storage_classes_desired=storage_classes)
        # This is always enable_write=True because it never tries to
        # save to the backend
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, api_client.config, True, collection, self)
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if self.collection_record_file is None:

        # See discussion in CollectionDirectoryBase.on_event
        self.collection.lock.release()

        with self.collection.lock:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
            _logger.debug("%s invalidated collection record", self)

        self.collection.lock.acquire()

    def collection_record(self):
        # Build a minimal, current record for the scratch collection.
        with llfuse.lock_released:
                "manifest_text": self.collection.manifest_text(),
                "portable_data_hash": self.collection.portable_data_hash(),
                "storage_classes_desired": self.collection.storage_classes_desired(),

    def __contains__(self, k):
        return (k == '.arvados#collection' or
                super(TmpCollectionDirectory, self).__contains__(k))

    def __getitem__(self, item):
        # '.arvados#collection' exposes the current manifest as JSON.
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        return super(TmpCollectionDirectory, self).__getitem__(item)

    def want_event_subscribe(self):

        self.collection.stop_threads()

    def invalidate(self):
        if self.collection_record_file:
            self.collection_record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    This directory provides access to Arvados collections as subdirectories listed
    by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
    the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
    (in the form 'zzzzz-j7d0g-1234567890abcde').

    Note that this directory will appear empty until you attempt to access a
    specific collection or project subdirectory (such as trying to 'cd' into it),
    at which point the collection or project will actually be looked up on the server
    and the directory will appear if it exists.

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, pdh_only=False, storage_classes=None):
        super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.num_retries = num_retries
        # When pdh_only is True, only portable data hashes (not uuids) are
        # accepted as subdirectory names (see __contains__).
        self.pdh_only = pdh_only
        self.storage_classes = storage_classes

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
            (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write,

    def __contains__(self, k):
        # Membership testing performs the actual lookup: a valid locator or
        # uuid is fetched from the API server and cached in _entries.
        if k in self._entries:
        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):

            if group_uuid_pattern.match(k):
                # Project uuid: mount it as a ProjectDirectory.
                project = self.api.groups().list(
                    filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                e = self.inodes.add_entry(ProjectDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                    project[u'items'][0], storage_classes=self.storage_classes))
                # Otherwise treat it as a collection locator.
                e = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write, k))

            if k not in self._entries:
                self.inodes.del_entry(e)
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(e)
        except Exception as ex:
            _logger.exception("arv-mount lookup '%s':", k)
            self.inodes.del_entry(e)

    def __getitem__(self, item):
            return self._entries[item]
            raise KeyError("No collection with id " + item)

    def want_event_subscribe(self):
        return not self.pdh_only
class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.num_retries = num_retries
        self._poll_time = poll_time

    def want_event_subscribe(self):

        with llfuse.lock_released:
            # Fetch the current set of distinct tag names (capped at 1000).
            tags = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
                select=['name'], distinct=True, limit=1000
                ).execute(num_retries=self.num_retries)
        # Merge the server's tag list (plus any extras recorded by
        # __getitem__) into our entries, one TagDirectory per tag name.
        self.merge(tags['items']+[{"name": n} for n in self._extra],
                   lambda a, i: a.tag == i['name'],
                   lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                          i['name'], poll=self._poll, poll_time=self._poll_time))

    def __getitem__(self, item):
        if super(TagsDirectory, self).__contains__(item):
            return super(TagsDirectory, self).__getitem__(item)
        # Not cached yet; check the API server for a tag with this name.
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
            ).execute(num_retries=self.num_retries)
            # Remember this name so later updates keep showing it.
            self._extra.add(item)
        return super(TagsDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if super(TagsDirectory, self).__contains__(k):
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.num_retries = num_retries
        self._poll_time = poll_time

    def want_event_subscribe(self):

        with llfuse.lock_released:
            # List all collections carrying this tag.
            taggedcollections = self.api.links().list(
                filters=[['link_class', '=', 'tag'],
                         ['name', '=', self.tag],
                         ['head_uuid', 'is_a', 'arvados#collection']],
                ).execute(num_retries=self.num_retries)
        # One CollectionDirectory per tagged collection, keyed by head_uuid.
        self.merge(taggedcollections['items'],
                   lambda i: i['head_uuid'],
                   lambda a, i: a.collection_locator == i['head_uuid'],
                   lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid']))
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, project_object,
                 poll=True, poll_time=3, storage_classes=None):
        super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.num_retries = num_retries
        self.project_object = project_object
        # Lazily-created ObjectFile exposing the project's API record.
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll_time = poll_time
        self._updating_lock = threading.Lock()
        self._current_user = None
        # Becomes True once a full listing has been requested via items().
        self._full_listing = False
        self.storage_classes = storage_classes

    def want_event_subscribe(self):

    def createDirectory(self, i):
        # Pick the right FUSE object for one API record, based on its uuid.
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                    i, self._poll, self._poll_time, self.storage_classes)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid'])
        elif uuid_pattern.match(i['uuid']):
            # Other Arvados objects appear as read-only JSON files.
            return ObjectFile(self.parent_inode, i)

        return self.project_uuid

        self._full_listing = True
        return super(ProjectDirectory, self).items()

        if i['name'] is None or len(i['name']) == 0:
        elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
            # collection or subproject
        elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
        elif 'kind' in i and i['kind'].startswith('arvados#'):
            # strip the 'arvados#' prefix (8 chars), e.g. "name.workflow"
            return "{}.{}".format(i['name'], i['kind'][8:])

        if self.project_object_file == None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        if not self._full_listing:

        if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
            return a.uuid() == i['uuid']
        elif isinstance(a, ObjectFile):
            return a.uuid() == i['uuid'] and not a.stale()

        with llfuse.lock_released:
            self._updating_lock.acquire()
            # Refresh our own record (group- or user-owned project).
            if group_uuid_pattern.match(self.project_uuid):
                self.project_object = self.api.groups().get(
                    uuid=self.project_uuid).execute(num_retries=self.num_retries)
            elif user_uuid_pattern.match(self.project_uuid):
                self.project_object = self.api.users().get(
                    uuid=self.project_uuid).execute(num_retries=self.num_retries)
            # do this in 2 steps until #17424 is fixed
            contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
                                                         num_retries=self.num_retries,
                                                         uuid=self.project_uuid,
                                                         filters=[["uuid", "is_a", "arvados#group"],
                                                                  ["groups.group_class", "in", ["project","filter"]]]))
            # Only current collection versions are listed.
            contents.extend(filter(lambda i: i["current_version_uuid"] == i["uuid"],
                                   arvados.util.keyset_list_all(self.api.groups().contents,
                                                                num_retries=self.num_retries,
                                                                uuid=self.project_uuid,
                                                                filters=[["uuid", "is_a", "arvados#collection"]])))

        # end with llfuse.lock_released, re-acquire lock

        self.merge(contents,
                   self.createDirectory)
        self._updating_lock.release()

    def _add_entry(self, i, name):
        # Create and register a FUSE entry for a single API record.
        ent = self.createDirectory(i)
        self._entries[name] = self.inodes.add_entry(ent)
        return self._entries[name]

    def __getitem__(self, k):
        if k == '.arvados#project':
            return self.project_object_file
        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
            return super(ProjectDirectory, self).__getitem__(k)
        with llfuse.lock_released:
            # A name containing '/' may be stored with the substitution
            # character, so search for both spellings.
            k2 = self.unsanitize_filename(k)
                namefilter = ["name", "=", k]
                namefilter = ["name", "in", [k, k2]]
            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                       ["group_class", "in", ["project","filter"]],
                                              limit=2).execute(num_retries=self.num_retries)["items"]
                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                       limit=2).execute(num_retries=self.num_retries)["items"]
        if len(contents) > 1 and contents[1]['name'] == k:
            # If "foo/bar" and "foo[SUBST]bar" both exist, use
            contents = [contents[1]]
        name = self.sanitize_filename(self.namefn(contents[0]))
            return self._add_entry(contents[0], name)

    def __contains__(self, k):
        if k == '.arvados#project':

        if not self._enable_write:
        with llfuse.lock_released:
            # Cache the current user record; writability comes from the
            # project record's writable_by list.
            if not self._current_user:
                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
            return self._current_user["uuid"] in self.project_object.get("writable_by", [])

    def persisted(self):

    def mkdir(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            # mkdir creates a new, empty collection in this project.
                "owner_uuid": self.project_uuid,
                "manifest_text": "" }
            if self.storage_classes is not None:
                c["storage_classes_desired"] = self.storage_classes
                self.api.collections().create(body=c).execute(num_retries=self.num_retries)
            except Exception as e:
        except apiclient_errors.Error as error:
            _logger.error(error)
            raise llfuse.FUSEError(errno.EEXIST)

    def rmdir(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        if name not in self:
            raise llfuse.FUSEError(errno.ENOENT)
        # Only empty collections can be removed this way.
        if not isinstance(self[name], CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if len(self[name]) > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        with llfuse.lock_released:
            self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)

    def rename(self, name_old, name_new, src):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)

        # Moving between projects is a metadata-only change: update the
        # collection's owner_uuid and name on the API server.
        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Acually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src, name_old)

    def child_event(self, ev):
        # Handle a change event for an object owned by (or moved into or out
        # of) this project, keeping _entries in sync with the server.
        properties = ev.get("properties") or {}
        old_attrs = properties.get("old_attributes") or {}
        new_attrs = properties.get("new_attributes") or {}
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        old_name = self.sanitize_filename(self.namefn(old_attrs))
        new_name = self.sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry

        if old_attrs.get("is_trashed"):
            # Was previously deleted
        if new_attrs.get("is_trashed"):

        if new_name != old_name:
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)
                self._entries[new_name] = ent
                self._add_entry(new_attrs, new_name)
            elif ent is not None:
                self.inodes.del_entry(ent)
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, exclude,
                 poll=False, poll_time=60, storage_classes=None):
        """parent_inode is the integer inode number.

        NOTE(review): `self.api = api` and `self._poll = True` were elided
        in this excerpt but are required — update() reads both.  `exclude`
        is accepted but not used in the visible code; presumably consumed
        elsewhere or vestigial — confirm.
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.api = api
        self.num_retries = num_retries
        # Fetch the current user once up front; update() needs its uuid to
        # tell "my" projects from shared ones.
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll = True
        self._poll_time = poll_time
        # Serializes concurrent update() calls.
        self._updating_lock = threading.Lock()
        self.storage_classes = storage_classes
1215 with llfuse.lock_released:
1216 self._updating_lock.acquire()
1217 if not self.stale():
1225 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1226 if 'httpMethod' in methods.get('shared', {}):
1229 resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
1233 include="owner_uuid").execute()
1234 if not resp["items"]:
1236 page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1237 for r in resp["items"]:
1238 objects[r["uuid"]] = r
1239 roots.append(r["uuid"])
1240 for r in resp["included"]:
1241 objects[r["uuid"]] = r
1242 root_owners.add(r["uuid"])
1244 all_projects = list(arvados.util.keyset_list_all(
1245 self.api.groups().list,
1247 num_retries=self.num_retries,
1248 filters=[['group_class','in',['project','filter']]],
1249 select=["uuid", "owner_uuid"]))
1250 for ob in all_projects:
1251 objects[ob['uuid']] = ob
1253 current_uuid = self.current_user['uuid']
1254 for ob in all_projects:
1255 if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1256 roots.append(ob['uuid'])
1257 root_owners.add(ob['owner_uuid'])
1259 lusers = arvados.util.keyset_list_all(
1260 self.api.users().list,
1262 num_retries=self.num_retries,
1263 filters=[['uuid','in', list(root_owners)]])
1264 lgroups = arvados.util.keyset_list_all(
1265 self.api.groups().list,
1267 num_retries=self.num_retries,
1268 filters=[['uuid','in', list(root_owners)+roots]])
1271 objects[l["uuid"]] = l
1273 objects[l["uuid"]] = l
1275 for r in root_owners:
1279 contents[obr["name"]] = obr
1280 elif "first_name" in obr:
1281 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1286 if obr['owner_uuid'] not in objects:
1287 contents[obr["name"]] = obr
1289 # end with llfuse.lock_released, re-acquire lock
1291 self.merge(contents.items(),
1293 lambda a, i: a.uuid() == i[1]['uuid'],
1294 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
1295 i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
1297 _logger.exception("arv-mount shared dir error")
1299 self._updating_lock.release()
1301 def want_event_subscribe(self):